Skip to content

Commit f29f8bb

Browse files
authored
[agentserver][responses] fix replay stuck (#45930)
* use handler decorator and refining orchestration * refining orchestration * refined orchestration * fixed conflict and handler registration * fix replay
1 parent 4bd38ab commit f29f8bb

6 files changed

Lines changed: 48 additions & 36 deletions

File tree

sdk/agentserver/azure-ai-agentserver-responses/azure/ai/agentserver/responses/hosting/_orchestrator.py

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -655,11 +655,55 @@ async def _finalize() -> None:
655655

656656
# --- Fast path: no keep-alive ---
657657
if not self._runtime_options.sse_keep_alive_enabled:
658+
if not (ctx.background and ctx.store):
659+
# Simple fast path for non-background streaming.
660+
try:
661+
async for event in self._process_handler_events(ctx, state, handler_iterator):
662+
yield encode_sse_payload(event["type"], event["payload"])
663+
finally:
664+
await _finalize()
665+
return
666+
667+
# Background+stream without keep-alive: run the handler as an independent
668+
# asyncio.Task so that finalization (including subject.complete()) is
669+
# guaranteed to run even when the original SSE connection is dropped before
670+
# all events are delivered. Without this, _live_stream can be abandoned
671+
# mid-iteration by Starlette (the async-generator finalizer may not fire
672+
# promptly), leaving GET-replay subscribers blocked on await q.get() forever.
673+
_SENTINEL_BG = object()
674+
bg_queue: asyncio.Queue[object] = asyncio.Queue()
675+
676+
async def _bg_producer() -> None:
677+
try:
678+
async for event in self._process_handler_events(ctx, state, handler_iterator):
679+
await bg_queue.put(encode_sse_payload(event["type"], event["payload"]))
680+
except Exception as exc: # pylint: disable=broad-exception-caught
681+
state.captured_error = exc
682+
finally:
683+
# Always finalize (includes subject.complete()) — this runs even if
684+
# the original POST SSE connection was dropped and _live_stream is
685+
# never properly closed by Starlette.
686+
await _finalize()
687+
await bg_queue.put(_SENTINEL_BG)
688+
689+
bg_task = asyncio.create_task(_bg_producer())
658690
try:
659-
async for event in self._process_handler_events(ctx, state, handler_iterator):
660-
yield encode_sse_payload(event["type"], event["payload"])
691+
while True:
692+
item = await bg_queue.get()
693+
if item is _SENTINEL_BG:
694+
break
695+
yield item # type: ignore[misc]
696+
except Exception: # pylint: disable=broad-exception-caught
697+
pass # SSE connection dropped; bg_task continues independently
661698
finally:
662-
await _finalize()
699+
# Wait for the handler task so _finalize() has run before we exit.
700+
# Do NOT cancel it — background+stream must reach a terminal state
701+
# regardless of client connectivity.
702+
if not bg_task.done():
703+
try:
704+
await bg_task
705+
except Exception: # pylint: disable=broad-exception-caught
706+
pass
663707
return
664708

665709
# --- Keep-alive path: merge handler events with periodic keep-alive comments ---

sdk/agentserver/azure-ai-agentserver-responses/samples/ConversationHistory/test.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,6 @@ def _assert_ok(response: requests.Response) -> None:
3131
raise RuntimeError(f"HTTP request failed: {response.status_code} {response.text}") from exc
3232

3333

34-
def _ready() -> None:
35-
_print_header("Ready")
36-
response = requests.get(f"{BASE_URL}/ready", timeout=10)
37-
_assert_ok(response)
38-
_pretty_print(response.json())
39-
40-
4134
def _create(payload: dict[str, Any]) -> dict[str, Any]:
4235
response = requests.post(f"{BASE_URL}/responses", json=payload, timeout=10)
4336
_assert_ok(response)
@@ -104,7 +97,6 @@ def _turn_4_stream(previous_response_id: str) -> None:
10497

10598

10699
def main() -> None:
107-
_ready()
108100
response_1_id = _turn_1()
109101
response_2_id = _turn_2(response_1_id)
110102
response_3_id = _turn_3(response_2_id)

sdk/agentserver/azure-ai-agentserver-responses/samples/FunctionCalling/test.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,6 @@ def _assert_ok(response: requests.Response) -> None:
3232
raise RuntimeError(f"HTTP request failed: {response.status_code} {response.text}") from exc
3333

3434

35-
def _ready() -> None:
36-
_print_header("Ready")
37-
response = requests.get(f"{BASE_URL}/ready", timeout=10)
38-
_assert_ok(response)
39-
_pretty_print(response.json())
40-
41-
4235
def _turn_1_request_function_call() -> str:
4336
_print_header("Turn 1: Request function call")
4437
payload = {
@@ -103,7 +96,6 @@ def _turn_2_submit_function_output_streaming(call_id: str) -> None:
10396

10497

10598
def main() -> None:
106-
_ready()
10799
call_id = _turn_1_request_function_call()
108100
_turn_2_submit_function_output(call_id)
109101
_turn_2_submit_function_output_streaming(call_id)

sdk/agentserver/azure-ai-agentserver-responses/samples/GetStarted/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ async def _events() -> AsyncIterable[dict[str, Any]]:
3737
text_content = message_item.add_text_content()
3838
yield text_content.emit_added()
3939
yield text_content.emit_delta("Hello from the Python GettingStarted sample!")
40-
yield text_content.emit_done()
40+
yield text_content.emit_done("Hello from the Python GettingStarted sample!")
4141
yield message_item.emit_content_done(text_content)
4242

4343
yield message_item.emit_done()

sdk/agentserver/azure-ai-agentserver-responses/samples/GetStarted/test.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,6 @@ def _assert_ok(response: requests.Response) -> None:
3232
raise RuntimeError(f"HTTP request failed: {response.status_code} {response.text}") from exc
3333

3434

35-
def _ready() -> None:
36-
_print_header("Ready")
37-
response = requests.get(f"{BASE_URL}/ready", timeout=10)
38-
_assert_ok(response)
39-
_pretty_print(response.json())
40-
41-
4235
def _default_mode() -> None:
4336
_print_header("Default mode (JSON)")
4437
payload = {"model": "gpt-4o-mini", "input": "hello"}
@@ -167,7 +160,6 @@ def _get_replay_mode() -> None:
167160

168161

169162
def main() -> None:
170-
_ready()
171163
_default_mode()
172164
_stream_mode()
173165
_background_mode()

sdk/agentserver/azure-ai-agentserver-responses/samples/MultiOutput/test.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,6 @@ def _assert_ok(response: requests.Response) -> None:
3131
raise RuntimeError(f"HTTP request failed: {response.status_code} {response.text}") from exc
3232

3333

34-
def _ready() -> None:
35-
_print_header("Ready")
36-
response = requests.get(f"{BASE_URL}/ready", timeout=10)
37-
_assert_ok(response)
38-
_pretty_print(response.json())
39-
40-
4134
def _default_mode() -> None:
4235
_print_header("Default mode (JSON) - reasoning + message")
4336
payload = {"model": "test"}
@@ -62,7 +55,6 @@ def _stream_mode() -> None:
6255

6356

6457
def main() -> None:
65-
_ready()
6658
_default_mode()
6759
_stream_mode()
6860

0 commit comments

Comments
 (0)