diff --git a/src/worker/acp_bridge.rs b/src/worker/acp_bridge.rs
index c981dc558a..8cd1fe03fc 100644
--- a/src/worker/acp_bridge.rs
+++ b/src/worker/acp_bridge.rs
@@ -103,28 +103,31 @@ impl AcpBridgeRuntime {
             })
             .await?;
 
-        // Run the ACP session
-        match self.run_acp_session(&job.description, &extra_env).await {
-            Ok(()) => {
-                self.client
-                    .report_complete(&CompletionReport {
-                        success: true,
-                        message: Some("ACP agent session completed".to_string()),
-                        iterations: 1,
-                    })
-                    .await?;
-            }
+        // Run the ACP session, then emit exactly one terminal "result" event
+        // (so job monitors transition state) followed by a completion report.
+        let (success, message) = match self.run_acp_session(&job.description, &extra_env).await {
+            Ok(()) => (true, "ACP agent session completed".to_string()),
             Err(e) => {
                 tracing::error!(job_id = %self.config.job_id, "ACP session failed: {}", e);
-                self.client
-                    .report_complete(&CompletionReport {
-                        success: false,
-                        message: Some(format!("ACP agent failed: {}", e)),
-                        iterations: 1,
-                    })
-                    .await?;
+                (false, format!("ACP agent failed: {e}"))
             }
-        }
+        };
+        self.client
+            .post_event(&JobEventPayload {
+                event_type: "result".to_string(),
+                data: json!({
+                    "status": if success { "completed" } else { "error" },
+                    "message": &message,
+                }),
+            })
+            .await;
+        self.client
+            .report_complete(&CompletionReport {
+                success,
+                message: Some(message),
+                iterations: 1,
+            })
+            .await?;
 
         Ok(())
     }
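[Note] The hunk above encodes an ordering contract: exactly one terminal "result" event, then the completion report. A minimal sketch of a hypothetical downstream monitor (JobState and apply are illustrative names, not from this codebase) showing what that contract protects:

// Hypothetical consumer: transitions on the first terminal event only.
enum JobState {
    Running,
    Done { success: bool },
}

fn apply(state: &mut JobState, event_type: &str, status: &str) {
    // "turn_result" is informational; only "result" is terminal.
    if event_type == "result" && matches!(*state, JobState::Running) {
        *state = JobState::Done { success: status == "completed" };
    }
}

A second "result" event would be silently dropped here, but monitors that assert single-shot transitions would double-fire; hence the move of terminal emission out of the per-turn path.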
@@ -197,14 +200,7 @@ impl AcpBridgeRuntime {
 
         // Monitor the child process so the follow-up loop can exit if the agent dies.
         // The oneshot is Send, so it crosses the LocalSet boundary cleanly.
-        let (child_exit_tx, child_exit_rx) = tokio::sync::oneshot::channel::<Option<i32>>();
-        tokio::spawn(async move {
-            let exit_code = match child.wait().await {
-                Ok(status) => status.code(),
-                Err(_) => None,
-            };
-            let _ = child_exit_tx.send(exit_code);
-        });
+        let (child_exit_rx, kill_tx) = spawn_child_monitor(child);
 
         let local_set = tokio::task::LocalSet::new();
         let acp_result = local_set
@@ -266,91 +262,34 @@ impl AcpBridgeRuntime {
                 };
 
                 // Report prompt result
-                let result_payload =
-                    stop_reason_to_result(&prompt_response.stop_reason, &session_id.to_string());
+                let result_payload = stop_reason_to_turn_event(
+                    &prompt_response.stop_reason,
+                    &session_id.to_string(),
+                );
                 client_for_acp.post_event(&result_payload).await;
 
                 // Follow-up loop: poll for prompts, send additional prompt() calls.
                 // Exits when: orchestrator sends done, or agent process exits.
-                let mut child_exit_rx = child_exit_rx;
-                loop {
-                    match client_for_followup.poll_prompt().await {
-                        Ok(Some(follow_up)) => {
-                            if follow_up.done {
-                                tracing::info!(job_id = %job_id, "Orchestrator signaled done");
-                                break;
-                            }
-                            tracing::info!(job_id = %job_id, "Got follow-up prompt");
-
-                            let follow_result = conn
-                                .prompt(acp::PromptRequest::new(
-                                    session_id.clone(),
-                                    vec![follow_up.content.into()],
-                                ))
-                                .await;
-
-                            match follow_result {
-                                Ok(resp) => {
-                                    let payload = stop_reason_to_result(
-                                        &resp.stop_reason,
-                                        &session_id.to_string(),
-                                    );
-                                    client_for_followup.post_event(&payload).await;
-                                }
-                                Err(e) => {
-                                    tracing::error!(
-                                        job_id = %job_id,
-                                        "Follow-up prompt failed: {}", e
-                                    );
-                                    client_for_followup
-                                        .post_event(&JobEventPayload {
-                                            event_type: "status".to_string(),
-                                            data: json!({
-                                                "message": format!("Follow-up failed: {}", e),
-                                            }),
-                                        })
-                                        .await;
-                                }
-                            }
-                        }
-                        Ok(None) => {
-                            // No prompt available — wait, but also watch for agent exit.
-                            tokio::select! {
-                                _ = tokio::time::sleep(Duration::from_secs(2)) => {}
-                                exit_code = &mut child_exit_rx => {
-                                    let code = exit_code.ok().flatten();
-                                    tracing::info!(
-                                        job_id = %job_id,
-                                        exit_code = ?code,
-                                        "ACP agent process exited, ending follow-up loop"
-                                    );
-                                    break;
-                                }
-                            }
-                        }
-                        Err(e) => {
-                            tracing::warn!(job_id = %job_id, "Prompt polling error: {}", e);
-                            tokio::select! {
-                                _ = tokio::time::sleep(Duration::from_secs(5)) => {}
-                                exit_code = &mut child_exit_rx => {
-                                    let code = exit_code.ok().flatten();
-                                    tracing::info!(
-                                        job_id = %job_id,
-                                        exit_code = ?code,
-                                        "ACP agent process exited, ending follow-up loop"
-                                    );
-                                    break;
-                                }
-                            }
-                        }
-                    }
-                }
+                let prompt_sender = ConnPromptSender { conn: &conn };
+                run_follow_up_loop(
+                    &client_for_followup,
+                    &prompt_sender,
+                    &client_for_acp,
+                    &session_id,
+                    child_exit_rx,
+                    job_id,
+                )
+                .await?;
 
                 Ok::<(), WorkerError>(())
             })
             .await;
 
-        // Wait for stderr reader to finish
+        // Kill the child on protocol failure so stderr closes (see kill channel above).
+        if acp_result.is_err() {
+            let _ = kill_tx.send(());
+        }
+
        let _ = stderr_handle.await;
 
         acp_result
@@ -373,6 +312,28 @@ impl AcpEventSink for Arc<WorkerHttpClient> {
     }
 }
 
+/// Source of follow-up prompts from the orchestrator.
+pub(crate) trait FollowUpPromptSource {
+    fn poll_prompt(
+        &self,
+    ) -> impl std::future::Future<Output = Result<Option<PromptResponse>, WorkerError>>;
+}
+
+impl FollowUpPromptSource for Arc<WorkerHttpClient> {
+    async fn poll_prompt(&self) -> Result<Option<PromptResponse>, WorkerError> {
+        WorkerHttpClient::poll_prompt(self).await
+    }
+}
+
+/// Sends a follow-up prompt to the ACP agent.
+trait AcpPromptSender {
+    fn send_prompt(
+        &self,
+        session_id: acp::SessionId,
+        content: String,
+    ) -> impl std::future::Future<Output = acp::Result<acp::PromptResponse>>;
+}
+
 /// IronClaw's implementation of the ACP Client trait.
 ///
 /// Handles callbacks from the agent: session notifications (streaming output)
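[Note] The two traits above are test seams: run_follow_up_loop (next hunk) takes them as generic parameters so it can be driven without HTTP or a live agent process. A sketch of how small an alternative source can be, assuming the same PromptResponse and WorkerError types; ChannelPromptSource is hypothetical, not part of this patch:

// Sketch: an in-memory prompt source backed by a tokio mpsc channel.
struct ChannelPromptSource {
    rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<PromptResponse>>,
}

impl FollowUpPromptSource for ChannelPromptSource {
    async fn poll_prompt(&self) -> Result<Option<PromptResponse>, WorkerError> {
        // Map "no prompt queued yet" to Ok(None), mirroring the HTTP long poll.
        // A real implementation would treat a closed channel as an error.
        match self.rx.lock().await.try_recv() {
            Ok(prompt) => Ok(Some(prompt)),
            Err(_) => Ok(None),
        }
    }
}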
@@ -420,6 +381,156 @@ pub(crate) fn ironclaw_init_request() -> acp::InitializeRequest {
     )
 }
 
+// ==================== Child process monitor ====================
+
+/// Spawn a background task that monitors a child process for natural exit
+/// or a kill signal. Returns `(child_exit_rx, kill_tx)`.
+///
+/// - `child_exit_rx`: fires with the exit code when the child terminates
+/// - `kill_tx`: send `()` to terminate the child (e.g. on protocol failure
+///   so stderr closes and the stderr reader task can finish)
+fn spawn_child_monitor(
+    mut child: tokio::process::Child,
+) -> (
+    tokio::sync::oneshot::Receiver<Option<i32>>,
+    tokio::sync::oneshot::Sender<()>,
+) {
+    let (child_exit_tx, child_exit_rx) = tokio::sync::oneshot::channel();
+    let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();
+    tokio::spawn(async move {
+        let exit_code_of =
+            |r: std::io::Result<std::process::ExitStatus>| r.ok().and_then(|s| s.code());
+        let exit_code = tokio::select! {
+            status = child.wait() => exit_code_of(status),
+            // Only fire on explicit send, not on sender drop.
+            Ok(()) = kill_rx => {
+                let _ = child.kill().await;
+                exit_code_of(child.wait().await)
+            }
+        };
+        let _ = child_exit_tx.send(exit_code);
+    });
+    (child_exit_rx, kill_tx)
+}
+
+// ==================== Follow-up loop ====================
+
+/// Wrapper that implements `AcpPromptSender` for an ACP `ClientSideConnection`.
+struct ConnPromptSender<'a, C: acp::Agent> {
+    conn: &'a C,
+}
+
+impl<C: acp::Agent> AcpPromptSender for ConnPromptSender<'_, C> {
+    async fn send_prompt(
+        &self,
+        session_id: acp::SessionId,
+        content: String,
+    ) -> acp::Result<acp::PromptResponse> {
+        self.conn
+            .prompt(acp::PromptRequest::new(session_id, vec![content.into()]))
+            .await
+    }
+}
+
+/// Maximum consecutive transient poll errors before giving up.
+const MAX_CONSECUTIVE_POLL_ERRORS: u32 = 5;
+
+/// Run the follow-up prompt loop.
+///
+/// Polls for follow-up prompts from the orchestrator, sends them to the ACP
+/// agent, and translates results into events. Returns `Err` if any follow-up
+/// prompt fails, so the caller can report `success: false`.
+async fn run_follow_up_loop(
+    prompt_source: &impl FollowUpPromptSource,
+    agent: &impl AcpPromptSender,
+    sink: &impl AcpEventSink,
+    session_id: &acp::SessionId,
+    mut child_exit_rx: tokio::sync::oneshot::Receiver<Option<i32>>,
+    job_id: Uuid,
+) -> Result<(), WorkerError> {
+    let session_id_str = session_id.to_string();
+    let mut consecutive_poll_errors: u32 = 0;
+    loop {
+        // Race poll_prompt against child exit so we detect process death
+        // even during a long-poll HTTP request to the orchestrator.
+        let poll_result = tokio::select! {
+            result = prompt_source.poll_prompt() => result,
+            exit_code = &mut child_exit_rx => {
+                let code = exit_code.ok().flatten();
+                tracing::debug!(job_id = %job_id, exit_code = ?code, "ACP agent exited, ending follow-up loop");
+                break;
+            }
+        };
+
+        let backoff = match poll_result {
+            Ok(Some(follow_up)) => {
+                consecutive_poll_errors = 0;
+                if follow_up.done {
+                    tracing::debug!(job_id = %job_id, "Orchestrator signaled done");
+                    break;
+                }
+                tracing::debug!(job_id = %job_id, "Got follow-up prompt");
+
+                let follow_result = agent
+                    .send_prompt(session_id.clone(), follow_up.content)
+                    .await;
+
+                match follow_result {
+                    Ok(resp) => {
+                        let payload = stop_reason_to_turn_event(&resp.stop_reason, &session_id_str);
+                        sink.emit_event(&payload).await;
+                    }
+                    Err(e) => {
+                        let msg = format!("Follow-up prompt failed: {e}");
+                        tracing::error!(job_id = %job_id, "{}", msg);
+                        return Err(WorkerError::ExecutionFailed { reason: msg });
+                    }
+                }
+                continue;
+            }
+            Ok(None) => {
+                consecutive_poll_errors = 0;
+                Duration::from_secs(2)
+            }
+            Err(e) => {
+                // Permanent errors fail immediately; transient errors retry
+                // with a cap. Follows the is_retryable pattern from llm/retry.rs.
+                let is_retryable = matches!(&e, WorkerError::ConnectionFailed { .. });
+                if !is_retryable {
+                    let msg = format!("Prompt polling failed (permanent): {e}");
+                    tracing::error!(job_id = %job_id, "{}", msg);
+                    return Err(WorkerError::ExecutionFailed { reason: msg });
+                }
+                consecutive_poll_errors += 1;
+                if consecutive_poll_errors >= MAX_CONSECUTIVE_POLL_ERRORS {
+                    let msg = format!(
+                        "Prompt polling exhausted {} retries: {e}",
+                        MAX_CONSECUTIVE_POLL_ERRORS,
+                    );
+                    tracing::error!(job_id = %job_id, "{}", msg);
+                    return Err(WorkerError::ExecutionFailed { reason: msg });
+                }
+                tracing::warn!(
+                    job_id = %job_id,
+                    attempt = consecutive_poll_errors,
+                    "Transient poll error: {}", e,
+                );
+                Duration::from_secs(5)
+            }
+        };
+
+        tokio::select! {
+            _ = tokio::time::sleep(backoff) => {}
+            exit_code = &mut child_exit_rx => {
+                let code = exit_code.ok().flatten();
+                tracing::debug!(job_id = %job_id, exit_code = ?code, "ACP agent exited, ending follow-up loop");
+                break;
+            }
+        }
+    }
+    Ok(())
+}
+
 // ==================== Event translation ====================
 
 /// Convert an ACP `SessionUpdate` into an IronClaw `JobEventPayload`.
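[Note] The crux of run_follow_up_loop is the select! race: awaiting &mut child_exit_rx alongside the long poll means the poll future is simply dropped (cancelled) the moment the child exits. A standalone sketch of the same pattern, runnable with only tokio; names are illustrative:

// Racing a never-completing poll against an exit signal, as above.
#[tokio::main]
async fn main() {
    let (exit_tx, mut exit_rx) = tokio::sync::oneshot::channel::<i32>();
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        let _ = exit_tx.send(0);
    });
    tokio::select! {
        // Stands in for a long-poll HTTP request that never returns.
        _ = std::future::pending::<()>() => unreachable!(),
        code = &mut exit_rx => println!("child exited: {code:?}"),
    }
}

This is sound only if dropping the poll future mid-flight is acceptable, which holds for an idempotent GET-style long poll.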
@@ -469,8 +580,12 @@ fn text_from_content_block(block: &acp::ContentBlock) -> Option<&str> {
     }
 }
 
-/// Convert an ACP `StopReason` into a "result" `JobEventPayload`.
-fn stop_reason_to_result(reason: &acp::StopReason, session_id: &str) -> JobEventPayload {
+/// Convert an ACP `StopReason` into a non-terminal per-turn event.
+///
+/// Uses `event_type: "turn_result"` so the orchestrator maps it to
+/// `AppEvent::JobStatus` (not `JobResult`). The terminal `"result"` event
+/// is emitted exactly once by `run()` after the entire session ends.
+fn stop_reason_to_turn_event(reason: &acp::StopReason, session_id: &str) -> JobEventPayload {
     let (status, message) = match reason {
         acp::StopReason::EndTurn => ("completed", "Agent completed successfully"),
         acp::StopReason::MaxTokens => ("error", "Agent reached max tokens"),
@@ -480,7 +595,7 @@ fn stop_reason_to_turn_event(reason: &acp::StopReason, session_id: &str) -> JobEventPayload {
         _ => ("completed", "Agent finished"),
     };
     JobEventPayload {
-        event_type: "result".to_string(),
+        event_type: "turn_result".to_string(),
         data: json!({
             "status": status,
             "session_id": session_id,
@@ -537,26 +652,26 @@ mod tests {
 
     #[test]
     fn test_stop_reason_end_turn() {
-        let payload = stop_reason_to_result(&acp::StopReason::EndTurn, "sid-1");
-        assert_eq!(payload.event_type, "result");
+        let payload = stop_reason_to_turn_event(&acp::StopReason::EndTurn, "sid-1");
+        assert_eq!(payload.event_type, "turn_result");
         assert_eq!(payload.data["status"], "completed");
     }
 
     #[test]
     fn test_stop_reason_max_tokens() {
-        let payload = stop_reason_to_result(&acp::StopReason::MaxTokens, "sid-1");
+        let payload = stop_reason_to_turn_event(&acp::StopReason::MaxTokens, "sid-1");
         assert_eq!(payload.data["status"], "error");
     }
 
     #[test]
     fn test_stop_reason_cancelled() {
-        let payload = stop_reason_to_result(&acp::StopReason::Cancelled, "sid-1");
+        let payload = stop_reason_to_turn_event(&acp::StopReason::Cancelled, "sid-1");
         assert_eq!(payload.data["status"], "cancelled");
     }
 
     #[test]
     fn test_stop_reason_refusal() {
-        let payload = stop_reason_to_result(&acp::StopReason::Refusal, "sid-1");
+        let payload = stop_reason_to_turn_event(&acp::StopReason::Refusal, "sid-1");
         assert_eq!(payload.data["status"], "error");
         assert_eq!(payload.data["message"], "Agent refused to continue");
     }
@@ -589,14 +704,14 @@ mod tests {
 
     #[test]
     fn test_stop_reason_max_turn_requests() {
-        let payload = stop_reason_to_result(&acp::StopReason::MaxTurnRequests, "sid-1");
+        let payload = stop_reason_to_turn_event(&acp::StopReason::MaxTurnRequests, "sid-1");
         assert_eq!(payload.data["status"], "error");
         assert_eq!(payload.data["message"], "Agent reached max turn requests");
     }
 
     #[test]
     fn test_stop_reason_includes_session_id() {
-        let payload = stop_reason_to_result(&acp::StopReason::EndTurn, "my-session-42");
+        let payload = stop_reason_to_turn_event(&acp::StopReason::EndTurn, "my-session-42");
         assert_eq!(payload.data["session_id"], "my-session-42");
     }
 
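[Note] A sketch of the orchestrator-side routing the new doc comment refers to. AppEvent and this match are assumed from that comment; the real code lives outside this file:

// Illustrative only: how the two event kinds would be routed.
enum AppEvent {
    JobStatus,
    JobResult,
}

fn route(event_type: &str) -> AppEvent {
    match event_type {
        "result" => AppEvent::JobResult, // terminal: exactly one per job
        _ => AppEvent::JobStatus,        // "turn_result", "status", ...
    }
}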
@@ -626,4 +741,386 @@ mod tests {
         assert_eq!(truncate(s, 3), "caf"); // doesn't split the é
         assert_eq!(truncate(s, 5), "café"); // includes full char
     }
+
+    // ==================== Follow-up loop stubs & tests ====================
+
+    use crate::worker::api::PromptResponse;
+    use std::sync::Mutex;
+
+    /// Stub event sink that collects emitted events for assertion.
+    struct CollectingSink {
+        events: Mutex<Vec<JobEventPayload>>,
+    }
+
+    impl CollectingSink {
+        fn new() -> Self {
+            Self {
+                events: Mutex::new(Vec::new()),
+            }
+        }
+
+        fn events(&self) -> Vec<JobEventPayload> {
+            self.events.lock().unwrap().clone()
+        }
+    }
+
+    impl AcpEventSink for CollectingSink {
+        async fn emit_event(&self, payload: &JobEventPayload) {
+            self.events.lock().unwrap().push(payload.clone());
+        }
+    }
+
+    /// Stub prompt source that yields a sequence of responses then returns None.
+    struct StubPromptSource {
+        responses: Mutex<Vec<Result<Option<PromptResponse>, WorkerError>>>,
+    }
+
+    impl StubPromptSource {
+        fn new(mut responses: Vec<Result<Option<PromptResponse>, WorkerError>>) -> Self {
+            responses.reverse(); // so we can pop (FIFO)
+            Self {
+                responses: Mutex::new(responses),
+            }
+        }
+    }
+
+    impl FollowUpPromptSource for StubPromptSource {
+        async fn poll_prompt(&self) -> Result<Option<PromptResponse>, WorkerError> {
+            self.responses.lock().unwrap().pop().unwrap_or(Ok(None))
+        }
+    }
+
+    /// Stub ACP prompt sender that returns pre-configured results.
+    struct StubAcpPromptSender {
+        results: Mutex<Vec<acp::Result<acp::PromptResponse>>>,
+    }
+
+    impl StubAcpPromptSender {
+        fn new(mut results: Vec<acp::Result<acp::PromptResponse>>) -> Self {
+            results.reverse(); // so we can pop (FIFO)
+            Self {
+                results: Mutex::new(results),
+            }
+        }
+    }
+
+    impl AcpPromptSender for StubAcpPromptSender {
+        async fn send_prompt(
+            &self,
+            _session_id: acp::SessionId,
+            _content: String,
+        ) -> acp::Result<acp::PromptResponse> {
+            self.results
+                .lock()
+                .unwrap()
+                .pop()
+                .unwrap_or(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
+        }
+    }
+
+    /// Regression test for #1915: follow-up prompt failure must return Err
+    /// so that run() reports success: false.
+    #[tokio::test]
+    async fn follow_up_prompt_failure_returns_error() {
+        let sink = CollectingSink::new();
+
+        let prompt_source = StubPromptSource::new(vec![Ok(Some(PromptResponse {
+            content: "follow up".to_string(),
+            done: false,
+        }))]);
+
+        let agent = StubAcpPromptSender::new(vec![Err(acp::Error::internal_error())]);
+
+        let (_tx, rx) = tokio::sync::oneshot::channel();
+        let session_id = acp::SessionId::new("test-session");
+
+        let result =
+            run_follow_up_loop(&prompt_source, &agent, &sink, &session_id, rx, Uuid::nil()).await;
+
+        assert!(
+            result.is_err(),
+            "Follow-up prompt failure must propagate as Err"
+        );
+        let err = result.unwrap_err();
+        assert!(
+            matches!(err, WorkerError::ExecutionFailed { .. }),
+            "Error must be ExecutionFailed, got: {err}"
+        );
+
+        // No per-turn events should be emitted on failure — the terminal
+        // "result" event is emitted by run(), not the follow-up loop.
+        let events = sink.events();
+        assert!(
+            events.is_empty(),
+            "Follow-up loop should not emit events on failure (terminal event comes from run())"
+        );
+    }
+
+    /// Verify that a successful follow-up followed by orchestrator "done"
+    /// signal returns Ok.
+    #[tokio::test]
+    async fn follow_up_prompt_success_then_done_returns_ok() {
+        let sink = CollectingSink::new();
+
+        let prompt_source = StubPromptSource::new(vec![
+            Ok(Some(PromptResponse {
+                content: "do more work".to_string(),
+                done: false,
+            })),
+            Ok(Some(PromptResponse {
+                content: String::new(),
+                done: true,
+            })),
+        ]);
+
+        let agent =
+            StubAcpPromptSender::new(vec![Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))]);
+
+        let (_tx, rx) = tokio::sync::oneshot::channel();
+        let session_id = acp::SessionId::new("test-session");
+
+        let result =
+            run_follow_up_loop(&prompt_source, &agent, &sink, &session_id, rx, Uuid::nil()).await;
+
+        assert!(
+            result.is_ok(),
+            "Successful follow-up then done should be Ok"
+        );
+
+        let events = sink.events();
+        assert!(
+            events.iter().any(|e| e.event_type == "turn_result"),
+            "Should emit turn_result (non-terminal) event for successful prompt"
+        );
+        assert!(
+            !events.iter().any(|e| e.event_type == "result"),
+            "Must NOT emit terminal result event (that comes from run())"
+        );
+    }
+
+    /// Stub that blocks forever on poll_prompt, simulating a long-poll HTTP
+    /// request that never returns. Only cancellation (via select!) can end this.
+    struct ForeverPromptSource;
+
+    impl FollowUpPromptSource for ForeverPromptSource {
+        async fn poll_prompt(&self) -> Result<Option<PromptResponse>, WorkerError> {
+            std::future::pending::<()>().await;
+            unreachable!()
+        }
+    }
+
+    /// Regression test: child exit must be detected even when poll_prompt()
+    /// is blocked on a long-poll HTTP request. Without the select! fix around
+    /// poll_prompt, this test times out because the loop never checks
+    /// child_exit_rx during active polling.
+    #[tokio::test]
+    async fn follow_up_loop_exits_during_long_poll_when_child_dies() {
+        let sink = CollectingSink::new();
+        let prompt_source = ForeverPromptSource;
+        let agent = StubAcpPromptSender::new(vec![]);
+
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        let session_id = acp::SessionId::new("test-session");
+
+        // Simulate child exit after a short delay
+        tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_millis(50)).await;
+            let _ = tx.send(Some(0));
+        });
+
+        let result = tokio::time::timeout(
+            Duration::from_secs(5),
+            run_follow_up_loop(&prompt_source, &agent, &sink, &session_id, rx, Uuid::nil()),
+        )
+        .await;
+
+        assert!(
+            result.is_ok(),
+            "Loop should exit promptly when child dies during long poll, not timeout"
+        );
+        assert!(
+            result.unwrap().is_ok(),
+            "Child exit during poll should be a clean exit (Ok), not an error"
+        );
+    }
+
+    /// Regression test for #1981 review feedback: when the ACP protocol fails
+    /// while the subprocess is still alive, the kill channel must terminate the
+    /// child so the stderr reader hits EOF and cleanup completes. Without the
+    /// kill mechanism, stderr_handle.await blocks forever and the job hangs.
+    #[tokio::test]
+    async fn child_monitor_kills_process_so_stderr_reader_completes() {
+        let mut child = tokio::process::Command::new("sleep")
+            .arg("30")
+            .stderr(std::process::Stdio::piped())
+            .kill_on_drop(true)
+            .spawn()
+            .expect("spawn sleep");
+
+        let child_stderr = child.stderr.take().unwrap();
+
+        // Use the actual production function
+        let (child_exit_rx, kill_tx) = spawn_child_monitor(child);
+
+        // Stderr reader — same pattern as production (lines 176-187)
+        let stderr_handle = tokio::spawn(async move {
+            let reader = BufReader::new(child_stderr);
+            let mut lines = reader.lines();
+            while let Ok(Some(_)) = lines.next_line().await {}
+        });
+
+        // Simulate protocol failure → send kill signal
+        let _ = kill_tx.send(());
+
+        // Both must complete; would hang forever without the kill channel
+        let result = tokio::time::timeout(Duration::from_secs(5), async {
+            let _ = child_exit_rx.await;
+            let _ = stderr_handle.await;
+        })
+        .await;
+
+        assert!(
+            result.is_ok(),
+            "kill signal should terminate child and unblock stderr reader"
+        );
+    }
+
+    /// Verify the loop recovers from a transient polling error and
+    /// processes the next prompt successfully.
+    #[tokio::test(start_paused = true)]
+    async fn follow_up_loop_recovers_from_transient_poll_error() {
+        let sink = CollectingSink::new();
+
+        // First poll returns an error, second returns a real prompt, third signals done.
+        let prompt_source = StubPromptSource::new(vec![
+            Err(WorkerError::ConnectionFailed {
+                url: "http://test".to_string(),
+                reason: "transient".to_string(),
+            }),
+            Ok(Some(PromptResponse {
+                content: "do work".to_string(),
+                done: false,
+            })),
+            Ok(Some(PromptResponse {
+                content: String::new(),
+                done: true,
+            })),
+        ]);
+
+        let agent =
+            StubAcpPromptSender::new(vec![Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))]);
+
+        let (_tx, rx) = tokio::sync::oneshot::channel();
+        let session_id = acp::SessionId::new("test-session");
+
+        let result =
+            run_follow_up_loop(&prompt_source, &agent, &sink, &session_id, rx, Uuid::nil()).await;
+
+        assert!(
+            result.is_ok(),
+            "Loop should recover from transient poll error"
+        );
+
+        let events = sink.events();
+        assert!(
+            events.iter().any(|e| e.event_type == "turn_result"),
+            "Should emit turn_result event after recovery"
+        );
+    }
+
+    /// Regression test for #1981: successful follow-up must emit "turn_result"
+    /// (non-terminal), not "result" (terminal). Terminal events are emitted
+    /// by run() so the job monitor sees exactly one completion signal.
+    #[tokio::test]
+    async fn follow_up_success_emits_turn_event_not_terminal() {
+        let sink = CollectingSink::new();
+
+        let prompt_source = StubPromptSource::new(vec![
+            Ok(Some(PromptResponse {
+                content: "do work".to_string(),
+                done: false,
+            })),
+            Ok(Some(PromptResponse {
+                content: String::new(),
+                done: true,
+            })),
+        ]);
+
+        let agent =
+            StubAcpPromptSender::new(vec![Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))]);
+
+        let (_tx, rx) = tokio::sync::oneshot::channel();
+        let session_id = acp::SessionId::new("test-session");
+
+        let result =
+            run_follow_up_loop(&prompt_source, &agent, &sink, &session_id, rx, Uuid::nil()).await;
+        assert!(result.is_ok());
+
+        let events = sink.events();
+        assert_eq!(events.len(), 1, "Exactly one per-turn event expected");
+        assert_eq!(events[0].event_type, "turn_result");
+        assert_eq!(events[0].data["status"], "completed");
+    }
+
+    /// Regression test for #1981: permanent poll errors (OrchestratorRejected,
+    /// LlmProxyFailed) must fail immediately without retry.
+    #[tokio::test]
+    async fn follow_up_loop_fails_on_permanent_poll_error() {
+        let sink = CollectingSink::new();
+
+        let prompt_source = StubPromptSource::new(vec![Err(WorkerError::OrchestratorRejected {
+            job_id: Uuid::nil(),
+            reason: "prompt endpoint returned 404 Not Found".to_string(),
+        })]);
+
+        let agent = StubAcpPromptSender::new(vec![]);
+
+        let (_tx, rx) = tokio::sync::oneshot::channel();
+        let session_id = acp::SessionId::new("test-session");
+
+        let result =
+            run_follow_up_loop(&prompt_source, &agent, &sink, &session_id, rx, Uuid::nil()).await;
+
+        assert!(result.is_err(), "Permanent error must fail immediately");
+        let err = result.unwrap_err();
+        let msg = format!("{err}");
+        assert!(
+            msg.contains("permanent"),
+            "Error message should indicate permanent failure, got: {msg}"
+        );
+    }
+
+    /// Regression test for #1981: repeated transient poll errors must eventually
+    /// give up after MAX_CONSECUTIVE_POLL_ERRORS retries.
+    #[tokio::test(start_paused = true)]
+    async fn follow_up_loop_exhausts_transient_retries() {
+        let sink = CollectingSink::new();
+
+        // Generate more errors than the retry cap
+        let errors: Vec<Result<Option<PromptResponse>, WorkerError>> = (0
+            ..MAX_CONSECUTIVE_POLL_ERRORS + 1)
+            .map(|_| {
+                Err(WorkerError::ConnectionFailed {
+                    url: "http://test".to_string(),
+                    reason: "connection refused".to_string(),
+                })
+            })
+            .collect();
+        let prompt_source = StubPromptSource::new(errors);
+
+        let agent = StubAcpPromptSender::new(vec![]);
+
+        let (_tx, rx) = tokio::sync::oneshot::channel();
+        let session_id = acp::SessionId::new("test-session");
+
+        let result =
+            run_follow_up_loop(&prompt_source, &agent, &sink, &session_id, rx, Uuid::nil()).await;
+
+        assert!(result.is_err(), "Should fail after exhausting retries");
+        let msg = format!("{}", result.unwrap_err());
+        assert!(
+            msg.contains("exhausted"),
+            "Error should mention retry exhaustion, got: {msg}"
+        );
+    }
 }
diff --git a/src/worker/api.rs b/src/worker/api.rs
index 43fda2dd75..9bfda52cd9 100644
--- a/src/worker/api.rs
+++ b/src/worker/api.rs
@@ -91,7 +91,7 @@ pub struct CompletionReport {
 }
 
 /// Payload sent to the orchestrator for each job event (shared by worker and Claude Code bridge).
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct JobEventPayload {
     pub event_type: String,
     pub data: serde_json::Value,
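[Note] The new Clone and PartialEq derives are what the test stubs lean on: CollectingSink clones each payload into its buffer, and equality makes direct assertions possible. A minimal usage sketch, assuming a test context:

// Clone gives the sink an owned copy; PartialEq enables assert_eq!.
let a = JobEventPayload {
    event_type: "turn_result".to_string(),
    data: serde_json::json!({ "status": "completed" }),
};
let b = a.clone();
assert_eq!(a, b);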