Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 75 additions & 67 deletions src/kimi_cli/soul/context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import json
from collections.abc import Sequence
from pathlib import Path
Expand All @@ -21,6 +22,7 @@ def __init__(self, file_backend: Path):
self._next_checkpoint_id: int = 0
"""The ID of the next checkpoint, starting from 0, incremented after each checkpoint."""
self._system_prompt: str | None = None
self._write_lock = asyncio.Lock()

async def restore(self) -> bool:
logger.debug("Restoring context from file: {file_backend}", file_backend=self._file_backend)
Expand Down Expand Up @@ -83,20 +85,21 @@ async def write_system_prompt(self, prompt: str) -> None:
"""
prompt_line = json.dumps({"role": "_system_prompt", "content": prompt}) + "\n"

if not self._file_backend.exists() or self._file_backend.stat().st_size == 0:
async with aiofiles.open(self._file_backend, "w", encoding="utf-8") as f:
await f.write(prompt_line)
else:
tmp_path = self._file_backend.with_suffix(".tmp")
async with aiofiles.open(tmp_path, "w", encoding="utf-8") as tmp_f:
await tmp_f.write(prompt_line)
async with aiofiles.open(self._file_backend, encoding="utf-8") as src_f:
while True:
chunk = await src_f.read(64 * 1024)
if not chunk:
break
await tmp_f.write(chunk)
await aiofiles.os.replace(tmp_path, self._file_backend)
async with self._write_lock:
if not self._file_backend.exists() or self._file_backend.stat().st_size == 0:
async with aiofiles.open(self._file_backend, "w", encoding="utf-8") as f:
await f.write(prompt_line)
else:
tmp_path = self._file_backend.with_suffix(".tmp")
async with aiofiles.open(tmp_path, "w", encoding="utf-8") as tmp_f:
await tmp_f.write(prompt_line)
async with aiofiles.open(self._file_backend, encoding="utf-8") as src_f:
while True:
chunk = await src_f.read(64 * 1024)
if not chunk:
break
await tmp_f.write(chunk)
await aiofiles.os.replace(tmp_path, self._file_backend)

self._system_prompt = prompt

Expand All @@ -105,8 +108,9 @@ async def checkpoint(self, add_user_message: bool):
self._next_checkpoint_id += 1
logger.debug("Checkpointing, ID: {id}", id=checkpoint_id)

async with aiofiles.open(self._file_backend, "a", encoding="utf-8") as f:
await f.write(json.dumps({"role": "_checkpoint", "id": checkpoint_id}) + "\n")
async with self._write_lock:
Comment on lines 108 to +111
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Acquire _write_lock before advancing in-memory state

Because run_soul() cancels soul_task on user interrupt, any checkpoint()/append_message()/update_token_count() call that is queued on the new _write_lock can now be cancelled after _next_checkpoint_id, _history, or _token_count has already been mutated. That leaves the live Context ahead of context.jsonl: later turns in the same CLI can see phantom checkpoints/messages/token counts, while a resumed or exported session after restart does not. Moving the in-memory updates inside the locked section avoids this divergence under the exact concurrent-write condition this patch introduces.

Useful? React with 👍 / 👎.

async with aiofiles.open(self._file_backend, "a", encoding="utf-8") as f:
await f.write(json.dumps({"role": "_checkpoint", "id": checkpoint_id}) + "\n")
if add_user_message:
await self.append_message(
Message(role="user", content=[system(f"CHECKPOINT {checkpoint_id}")])
Expand All @@ -131,43 +135,44 @@ async def revert_to(self, checkpoint_id: int):
logger.error("Checkpoint {checkpoint_id} does not exist", checkpoint_id=checkpoint_id)
raise ValueError(f"Checkpoint {checkpoint_id} does not exist")

# rotate the context file
rotated_file_path = await next_available_rotation(self._file_backend)
if rotated_file_path is None:
logger.error("No available rotation path found")
raise RuntimeError("No available rotation path found")
await aiofiles.os.replace(self._file_backend, rotated_file_path)
logger.debug(
"Rotated context file: {rotated_file_path}", rotated_file_path=rotated_file_path
)

# restore the context until the specified checkpoint
self._history.clear()
self._token_count = 0
self._next_checkpoint_id = 0
self._system_prompt = None
async with (
aiofiles.open(rotated_file_path, encoding="utf-8") as old_file,
aiofiles.open(self._file_backend, "w", encoding="utf-8") as new_file,
):
async for line in old_file:
if not line.strip():
continue

line_json = json.loads(line)
if line_json["role"] == "_checkpoint" and line_json["id"] == checkpoint_id:
break
async with self._write_lock:
# rotate the context file
rotated_file_path = await next_available_rotation(self._file_backend)
if rotated_file_path is None:
logger.error("No available rotation path found")
raise RuntimeError("No available rotation path found")
await aiofiles.os.replace(self._file_backend, rotated_file_path)
logger.debug(
"Rotated context file: {rotated_file_path}", rotated_file_path=rotated_file_path
)

await new_file.write(line)
if line_json["role"] == "_system_prompt":
self._system_prompt = line_json["content"]
elif line_json["role"] == "_usage":
self._token_count = line_json["token_count"]
elif line_json["role"] == "_checkpoint":
self._next_checkpoint_id = line_json["id"] + 1
else:
message = Message.model_validate(line_json)
self._history.append(message)
# restore the context until the specified checkpoint
self._history.clear()
self._token_count = 0
self._next_checkpoint_id = 0
self._system_prompt = None
async with (
aiofiles.open(rotated_file_path, encoding="utf-8") as old_file,
aiofiles.open(self._file_backend, "w", encoding="utf-8") as new_file,
):
async for line in old_file:
if not line.strip():
continue

line_json = json.loads(line)
if line_json["role"] == "_checkpoint" and line_json["id"] == checkpoint_id:
break

await new_file.write(line)
if line_json["role"] == "_system_prompt":
self._system_prompt = line_json["content"]
elif line_json["role"] == "_usage":
self._token_count = line_json["token_count"]
elif line_json["role"] == "_checkpoint":
self._next_checkpoint_id = line_json["id"] + 1
else:
message = Message.model_validate(line_json)
self._history.append(message)

async def clear(self):
"""
Expand All @@ -182,16 +187,17 @@ async def clear(self):

logger.debug("Clearing context")

# rotate the context file
rotated_file_path = await next_available_rotation(self._file_backend)
if rotated_file_path is None:
logger.error("No available rotation path found")
raise RuntimeError("No available rotation path found")
await aiofiles.os.replace(self._file_backend, rotated_file_path)
self._file_backend.touch()
logger.debug(
"Rotated context file: {rotated_file_path}", rotated_file_path=rotated_file_path
)
async with self._write_lock:
# rotate the context file
rotated_file_path = await next_available_rotation(self._file_backend)
if rotated_file_path is None:
logger.error("No available rotation path found")
raise RuntimeError("No available rotation path found")
await aiofiles.os.replace(self._file_backend, rotated_file_path)
self._file_backend.touch()
logger.debug(
"Rotated context file: {rotated_file_path}", rotated_file_path=rotated_file_path
)

self._history.clear()
self._token_count = 0
Comment on lines 202 to 203
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 clear() performs memory clearing outside _write_lock, inconsistent with revert_to()

In clear(), the in-memory state clearing (_history.clear(), _token_count = 0, _next_checkpoint_id = 0, _system_prompt = None) at lines 202-205 is outside the async with self._write_lock: block. In contrast, revert_to() performs identical memory operations inside its lock (lines 150-153). While in current CPython's asyncio this is technically safe (no yield between lock release and synchronous statements), it's inconsistent with the locking strategy of revert_to() and fragile — if append_message is fixed to move extend inside its lock, clear()'s memory ops also need to be inside the lock to ensure proper serialization.

(Refers to lines 202-205)

Prompt for agents
In src/kimi_cli/soul/context.py, move the four memory-clearing statements (lines 202-205) inside the async with self._write_lock: block in the clear() method, matching the pattern used by revert_to(). Specifically, self._history.clear(), self._token_count = 0, self._next_checkpoint_id = 0, and self._system_prompt = None should be placed after self._file_backend.touch() but before the async with block ends (i.e., add them indented under the async with, next to the logger.debug call that closes at line 200, so that they run while the lock is still held).
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Expand All @@ -203,13 +209,15 @@ async def append_message(self, message: Message | Sequence[Message]):
messages = [message] if isinstance(message, Message) else message
self._history.extend(messages)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 In-memory state updated before acquiring _write_lock in append_message, enabling file-memory desync under concurrent access

self._history.extend(messages) at line 210 executes before the _write_lock is acquired at line 212. This means a concurrent lock holder (e.g., revert_to() or clear()) can clear _history after the extend but before the file write, leaving the file with data that's missing from memory.

Concrete scenario via asyncio.shield

asyncio.shield(self._grow_context(...)) at kimisoul.py:728 keeps _grow_context alive as a background task after cancellation. _grow_context calls append_message (kimisoul.py:778,785) and update_token_count (kimisoul.py:780). If the outer task is cancelled and a new turn starts calling clear() or revert_to():

  1. Shielded _grow_context calls append_message; self._history.extend(messages) succeeds (pre-lock)
  2. append_message blocks on _write_lock (held by clear()/revert_to())
  3. The lock holder clears _history (wiping the just-extended message)
  4. Lock is released → append_message acquires it and writes the message to the file
  5. Result: file contains the message, _history does not → desync on next restore()

The same pattern exists in update_token_count (line 219), where self._token_count = token_count is set before the lock.

Suggested change
self._history.extend(messages)
async with self._write_lock:
self._history.extend(messages)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


async with aiofiles.open(self._file_backend, "a", encoding="utf-8") as f:
for message in messages:
await f.write(message.model_dump_json(exclude_none=True) + "\n")
async with self._write_lock:
async with aiofiles.open(self._file_backend, "a", encoding="utf-8") as f:
for message in messages:
await f.write(message.model_dump_json(exclude_none=True) + "\n")

async def update_token_count(self, token_count: int):
logger.debug("Updating token count in context: {token_count}", token_count=token_count)
self._token_count = token_count
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 In-memory state updated before acquiring _write_lock in update_token_count, same desync risk as append_message

self._token_count = token_count at line 219 executes before the _write_lock is acquired at line 221, following the same problematic pattern as append_message. Under the asyncio.shield concurrency scenario described for append_message (via _grow_context at kimisoul.py:780), a concurrent clear() or revert_to() can reset _token_count to 0 between the assignment and the file write, causing the stale token count to be written to the file while in-memory state shows 0.

Suggested change
self._token_count = token_count
async with self._write_lock:
self._token_count = token_count
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


async with aiofiles.open(self._file_backend, "a", encoding="utf-8") as f:
await f.write(json.dumps({"role": "_usage", "token_count": token_count}) + "\n")
async with self._write_lock:
async with aiofiles.open(self._file_backend, "a", encoding="utf-8") as f:
await f.write(json.dumps({"role": "_usage", "token_count": token_count}) + "\n")
Loading