Skip to content

Commit e572ce9

Browse files
author
Shizoqua
committed
Fix #7100: Add cancellation check in run_stream() to stop iteration immediately
- Add cancellation token check before yielding messages in run_stream() - Prevents processing buffered messages after cancellation token is cancelled - Fixes issue where signal handlers (SIGINT/Ctrl+C) couldn't stop iteration immediately - Add test_round_robin_group_chat_run_stream_cancellation to verify fix The issue was that run_stream() would continue processing all buffered messages from the queue even after the cancellation token was cancelled, because the cancellation check only happened when awaiting the next message. This fix adds a check after receiving each message to break immediately if cancellation occurred.
1 parent 13e144e commit e572ce9

File tree

2 files changed

+65
-0
lines changed

2 files changed

+65
-0
lines changed

python/packages/autogen-agentchat/src/autogen_agentchat/teams/_group_chat/_base_group_chat.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,11 @@ async def stop_runtime() -> None:
547547
cancellation_token.link_future(message_future)
548548
# Wait for the next message, this will raise an exception if the task is cancelled.
549549
message = await message_future
550+
# Check if cancellation token was cancelled before processing/yielding the message.
551+
# This ensures that signal handlers (e.g., SIGINT/Ctrl+C) can stop iteration immediately
552+
# even if there are buffered messages in the queue.
553+
if cancellation_token is not None and cancellation_token.is_cancelled():
554+
break
550555
if isinstance(message, GroupChatTermination):
551556
# If the message contains an error, we need to raise it here.
552557
# This will stop the team and propagate the error.

python/packages/autogen-agentchat/tests/test_group_chat.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,66 @@ async def test_round_robin_group_chat_cancellation(runtime: AgentRuntime | None)
763763
assert result.stop_reason is not None and result.stop_reason == "Maximum number of turns 1000 reached."
764764

765765

766+
@pytest.mark.asyncio
767+
async def test_round_robin_group_chat_run_stream_cancellation(runtime: AgentRuntime | None) -> None:
768+
"""Test that run_stream() stops immediately when CancellationToken is cancelled.
769+
770+
This test verifies the fix for issue #7100 where run_stream() would continue
771+
processing buffered messages even after the cancellation token was cancelled.
772+
"""
773+
agent_1 = _EchoAgent("agent_1", description="echo agent 1")
774+
agent_2 = _EchoAgent("agent_2", description="echo agent 2")
775+
agent_3 = _EchoAgent("agent_3", description="echo agent 3")
776+
agent_4 = _EchoAgent("agent_4", description="echo agent 4")
777+
# Set max_turns to a large number to avoid stopping due to max_turns before cancellation.
778+
team = RoundRobinGroupChat(participants=[agent_1, agent_2, agent_3, agent_4], max_turns=1000, runtime=runtime)
779+
cancellation_token = CancellationToken()
780+
781+
events_processed = []
782+
cancellation_occurred = False
783+
784+
async def stream_and_cancel() -> None:
785+
nonlocal cancellation_occurred, events_processed
786+
try:
787+
async for event in team.run_stream(
788+
task="Write a program that prints 'Hello, world!'",
789+
cancellation_token=cancellation_token,
790+
):
791+
events_processed.append(type(event).__name__)
792+
# Cancel after processing a few events to test immediate stopping
793+
if len(events_processed) == 3 and not cancellation_occurred:
794+
cancellation_token.cancel()
795+
cancellation_occurred = True
796+
except asyncio.CancelledError:
797+
# CancelledError is expected when cancellation token is cancelled
798+
# This is the correct behavior - the stream should stop
799+
pass
800+
801+
# Run the stream task
802+
stream_task = asyncio.create_task(stream_and_cancel())
803+
804+
# Wait a bit for events to process and cancellation to occur
805+
await asyncio.sleep(0.5)
806+
807+
# Verify that cancellation occurred
808+
assert cancellation_occurred, "Cancellation should have occurred"
809+
810+
# Wait for the task to complete (it should have stopped due to cancellation)
811+
try:
812+
await asyncio.wait_for(stream_task, timeout=2.0)
813+
except asyncio.TimeoutError:
814+
pytest.fail("Stream task did not complete after cancellation - this indicates the bug still exists")
815+
816+
# With the fix, we should stop immediately after cancellation
817+
# Without the fix, all events would be processed (could be 10+ events)
818+
# With the fix, we should have processed only a few events before stopping
819+
assert len(events_processed) <= 5, (
820+
f"Stream should stop soon after cancellation. "
821+
f"Processed {len(events_processed)} events, expected <= 5. "
822+
f"This indicates the cancellation check is not working properly."
823+
)
824+
825+
766826
@pytest.mark.asyncio
767827
async def test_selector_group_chat(runtime: AgentRuntime | None) -> None:
768828
model_client = ReplayChatCompletionClient(

0 commit comments

Comments
 (0)