Skip to content

Commit 91a94a1

Browse files
authored
fix(agent): classify upstream prompt failures without replay (#1782)
1 parent 40ffcc1 commit 91a94a1

3 files changed

Lines changed: 209 additions & 4 deletions

File tree

packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,32 @@ export type ResultMessageHandlerResult = {
618618
};
619619
};
620620

621+
/**
 * Coarse category for an error surfaced while the agent was running.
 * The two `upstream_*` members describe failures between the agent and the
 * LLM gateway; everything else is `agent_error`.
 */
export type AgentErrorClassification =
  | "upstream_stream_terminated"
  | "upstream_connection_error"
  | "agent_error";

/**
 * Ordered (pattern, classification) rules; the first matching pattern wins.
 * The patterns are unanchored and carry no `g`/`y` flag, so `test()` is
 * stateless and they are safe to share across calls.
 */
const UPSTREAM_ERROR_RULES: ReadonlyArray<
  readonly [RegExp, AgentErrorClassification]
> = [
  // Anthropic SDK surfaces an undici fetch abort as "API Error: terminated".
  [/API Error:\s*terminated\b/i, "upstream_stream_terminated"],
  [/API Error:\s*Connection error\b/i, "upstream_connection_error"],
];

/**
 * Classify an error string surfaced by the Claude CLI via `is_error: true`
 * result messages. Transient upstream-stream terminations (e.g. the fetch body
 * from the LLM gateway is torn down mid-stream) are retriable; most other
 * errors are not.
 *
 * @param result - Raw error text; `undefined` or an empty string is treated as
 *   an unclassifiable agent error.
 * @returns The first matching upstream classification, otherwise
 *   `"agent_error"`.
 */
export function classifyAgentError(
  result: string | undefined,
): AgentErrorClassification {
  if (!result) return "agent_error";
  // No trimming needed: the patterns match anywhere in the text, so leading or
  // trailing whitespace cannot change the outcome.
  for (const [pattern, classification] of UPSTREAM_ERROR_RULES) {
    if (pattern.test(result)) return classification;
  }
  return "agent_error";
}
646+
621647
export function handleResultMessage(
622648
message: SDKResultMessage,
623649
): ResultMessageHandlerResult {
@@ -636,9 +662,13 @@ export function handleResultMessage(
636662
return { shouldStop: true, stopReason: "max_tokens", usage };
637663
}
638664
if (message.is_error) {
665+
const classification = classifyAgentError(message.result);
639666
return {
640667
shouldStop: true,
641-
error: RequestError.internalError(undefined, message.result),
668+
error: RequestError.internalError(
669+
{ classification, result: message.result },
670+
message.result,
671+
),
642672
usage,
643673
};
644674
}

packages/agent/src/server/agent-server.ts

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,17 @@ import {
1414
import { type ServerType, serve } from "@hono/node-server";
1515
import { getCurrentBranch } from "@posthog/git/queries";
1616
import { Hono } from "hono";
17+
import { z } from "zod";
1718
import packageJson from "../../package.json" with { type: "json" };
1819
import { POSTHOG_METHODS, POSTHOG_NOTIFICATIONS } from "../acp-extensions";
1920
import {
2021
createAcpConnection,
2122
type InProcessAcpConnection,
2223
} from "../adapters/acp-connection";
24+
import {
25+
type AgentErrorClassification,
26+
classifyAgentError,
27+
} from "../adapters/claude/conversion/sdk-to-acp";
2328
import { selectRecentTurns } from "../adapters/claude/session/jsonl-hydration";
2429
import type { PermissionMode } from "../execution-mode";
2530
import { DEFAULT_CODEX_MODEL } from "../gateway-models";
@@ -51,6 +56,16 @@ import { type JwtPayload, JwtValidationError, validateJwt } from "./jwt";
5156
import { jsonRpcRequestSchema, validateCommandParams } from "./schemas";
5257
import type { AgentServerConfig } from "./types";
5358

59+
// Runtime counterpart of the `AgentErrorClassification` union. The `satisfies`
// clause has the compiler check this schema against the imported type.
const agentErrorClassificationSchema = z.enum([
  "upstream_stream_terminated",
  "upstream_connection_error",
  "agent_error",
]) satisfies z.ZodType<AgentErrorClassification>;

// Accepts any object exposing `data.classification` with one of the known
// classification values.
const errorWithClassificationSchema = z.object({
  data: z.object({
    classification: agentErrorClassificationSchema,
  }),
});
68+
5469
type MessageCallback = (message: unknown) => void;
5570

5671
class NdJsonTap {
@@ -973,6 +988,41 @@ export class AgentServer {
973988
await this.sendInitialTaskMessage(payload, preTaskRun);
974989
}
975990

991+
/**
 * Derive a classification and a human-readable message from an arbitrary
 * thrown value.
 *
 * The message is taken from `Error#message`, or from a string `message`
 * property on a plain object, before falling back to `String(error)`. Without
 * the plain-object case a non-`Error` object would be rendered as
 * "[object Object]".
 * NOTE(review): presumably errors that crossed a serialization boundary arrive
 * as plain `{ message, data }` objects — confirm against the connection layer.
 *
 * @param error - Whatever was caught; may be an `Error`, a plain object, a
 *   primitive, `null` or `undefined`.
 * @returns The structured `data.classification` when the value carries one,
 *   otherwise the result of classifying the message text, plus the message.
 */
private extractErrorClassification(error: unknown): {
  classification: AgentErrorClassification;
  message: string;
} {
  let message: string;
  if (error instanceof Error) {
    message = error.message;
  } else if (
    typeof error === "object" &&
    error !== null &&
    "message" in error &&
    typeof error.message === "string"
  ) {
    // Error-shaped plain object: use its message instead of stringifying it.
    message = error.message;
  } else {
    message = String(error ?? "");
  }

  // Prefer the structured `data` carried on RequestError if present.
  const parsed = errorWithClassificationSchema.safeParse(error);
  if (parsed.success) {
    return { classification: parsed.data.data.classification, message };
  }

  return { classification: classifyAgentError(message), message };
}
1006+
1007+
/**
 * Classify a failure raised while sending a task message, log it, and signal
 * task completion with stop reason "error".
 *
 * @param payload - JWT payload identifying the task run being completed.
 * @param phase - Which send failed; used only to build the log event name.
 * @param error - The caught value to classify.
 * @returns The promise returned by `signalTaskComplete`.
 */
private classifyAndSignalFailure(
  payload: JwtPayload,
  phase: "initial" | "resume",
  error: unknown,
): Promise<void> {
  const failure = this.extractErrorClassification(error);

  // Upstream failures get a fixed message; anything else reports the original
  // message, or a generic fallback when that message is empty.
  let errorMessage: string;
  switch (failure.classification) {
    case "upstream_stream_terminated":
      errorMessage = "Upstream LLM stream terminated";
      break;
    case "upstream_connection_error":
      errorMessage = "Upstream LLM connection error";
      break;
    default:
      errorMessage = failure.message || "Agent error";
  }

  this.logger.error(`send_${phase}_task_message_failed`, {
    classification: failure.classification,
    message: failure.message,
  });

  return this.signalTaskComplete(payload, "error", errorMessage);
}
1025+
9761026
private async sendInitialTaskMessage(
9771027
payload: JwtPayload,
9781028
prefetchedRun?: TaskRun | null,
@@ -1087,7 +1137,7 @@ export class AgentServer {
10871137
if (this.session) {
10881138
await this.session.logWriter.flushAll();
10891139
}
1090-
await this.signalTaskComplete(payload, "error");
1140+
await this.classifyAndSignalFailure(payload, "initial", error);
10911141
}
10921142
}
10931143

@@ -1176,7 +1226,7 @@ export class AgentServer {
11761226
if (this.session) {
11771227
await this.session.logWriter.flushAll();
11781228
}
1179-
await this.signalTaskComplete(payload, "error");
1229+
await this.classifyAndSignalFailure(payload, "resume", error);
11801230
}
11811231
}
11821232

@@ -1657,6 +1707,7 @@ ${attributionInstructions}
16571707
private async signalTaskComplete(
16581708
payload: JwtPayload,
16591709
stopReason: string,
1710+
errorMessage?: string,
16601711
): Promise<void> {
16611712
if (this.session?.payload.run_id === payload.run_id) {
16621713
try {
@@ -1684,7 +1735,7 @@ ${attributionInstructions}
16841735
try {
16851736
await this.posthogAPI.updateTaskRun(payload.task_id, payload.run_id, {
16861737
status,
1687-
error_message: stopReason === "error" ? "Agent error" : undefined,
1738+
error_message: errorMessage ?? "Agent error",
16881739
});
16891740
this.logger.info("Task completion signaled", { status, stopReason });
16901741
} catch (error) {

packages/agent/src/server/question-relay.test.ts

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { type SetupServerApi, setupServer } from "msw/node";
22
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
3+
import { classifyAgentError } from "../adapters/claude/conversion/sdk-to-acp";
34
import type { PostHogAPIClient } from "../posthog-api";
45
import { createTestRepo, type TestRepo } from "../test/fixtures/api";
56
import { createPostHogHandlers } from "../test/mocks/msw-handlers";
@@ -49,7 +50,42 @@ const QUESTION_META = {
4950
],
5051
};
5152

53+
/**
 * Build an `Error` shaped like a classified upstream stream termination:
 * message "API Error: terminated" plus a `data` payload carrying the
 * `upstream_stream_terminated` classification.
 *
 * `Object.assign` attaches `data` at construction time, so the return type is
 * checked by the compiler instead of being claimed through a type assertion.
 *
 * @returns A fresh error instance on every call.
 */
function createTransientPromptError(): Error & {
  data: { classification: string; result: string };
} {
  return Object.assign(new Error("API Error: terminated"), {
    data: {
      classification: "upstream_stream_terminated",
      result: "API Error: terminated",
    },
  });
}
65+
66+
/**
 * Build an `Error` shaped like a classified upstream connection failure:
 * message "fetch failed" plus a `data` payload carrying the
 * `upstream_connection_error` classification.
 *
 * The message deliberately does not match the text patterns for upstream
 * errors, so only the structured `data.classification` identifies it.
 * `Object.assign` attaches `data` at construction time, so the return type is
 * checked by the compiler instead of being claimed through a type assertion.
 *
 * @returns A fresh error instance on every call.
 */
function createTransientConnectionError(): Error & {
  data: { classification: string; result: string };
} {
  return Object.assign(new Error("fetch failed"), {
    data: {
      classification: "upstream_connection_error",
      result: "fetch failed",
    },
  });
}
78+
5279
describe("Question relay", () => {
80+
// Table of [raw error text, expected classification] pairs for the text-based
// classifier, including the "no text at all" case.
const classificationCases: Array<[string | undefined, string]> = [
  ["API Error: terminated", "upstream_stream_terminated"],
  ["API Error: Connection error", "upstream_connection_error"],
  ["something else", "agent_error"],
  [undefined, "agent_error"],
];

it.each(classificationCases)("classifies %p as %s", (rawText, wanted) => {
  const actual = classifyAgentError(rawText);
  expect(actual).toBe(wanted);
});
88+
5389
let repo: TestRepo;
5490
let server: TestableAgentServer;
5591
let mswServer: SetupServerApi;
@@ -514,5 +550,93 @@ describe("Question relay", () => {
514550
prompt: [{ type: "text", text: "original task description" }],
515551
});
516552
});
553+
554+
it("does not replay a transient upstream termination before any session activity", async () => {
  // Stub the task and run lookups.
  const taskStub = {
    id: "test-task-id",
    title: "t",
    description: "original task description",
  } as unknown as Task;
  const runStub = {
    id: "test-run-id",
    task: "test-task-id",
    state: {},
  } as unknown as TaskRun;
  vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue(taskStub);
  vi.spyOn(server.posthogAPI, "getTaskRun").mockResolvedValue(runStub);

  // The first (and only expected) prompt call rejects with a classified
  // stream-termination error.
  const prompt = vi.fn().mockRejectedValueOnce(createTransientPromptError());
  const updateTaskRun = vi
    .spyOn(server.posthogAPI, "updateTaskRun")
    .mockResolvedValue({} as TaskRun);

  server.session = {
    payload: TEST_PAYLOAD,
    acpSessionId: "acp-session",
    clientConnection: { prompt },
    logWriter: {
      flushAll: vi.fn().mockResolvedValue(undefined),
      getFullAgentResponse: vi.fn().mockReturnValue(null),
      resetTurnMessages: vi.fn(),
      flush: vi.fn().mockResolvedValue(undefined),
      isRegistered: vi.fn().mockReturnValue(true),
    },
  };

  await server.sendInitialTaskMessage(TEST_PAYLOAD);

  // Exactly one prompt attempt, and the run is reported as failed with the
  // stream-termination message.
  expect(prompt).toHaveBeenCalledTimes(1);
  expect(updateTaskRun).toHaveBeenCalledWith("test-task-id", "test-run-id", {
    status: "failed",
    error_message: "Upstream LLM stream terminated",
  });
});
598+
it("surfaces upstream connection errors with the connection-specific message", async () => {
  // Stub the task and run lookups.
  const taskStub = {
    id: "test-task-id",
    title: "t",
    description: "original task description",
  } as unknown as Task;
  const runStub = {
    id: "test-run-id",
    task: "test-task-id",
    state: {},
  } as unknown as TaskRun;
  vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue(taskStub);
  vi.spyOn(server.posthogAPI, "getTaskRun").mockResolvedValue(runStub);

  // The first prompt call rejects with an error whose message is
  // "fetch failed" and whose `data` carries the connection classification.
  const prompt = vi
    .fn()
    .mockImplementationOnce(() =>
      Promise.reject(createTransientConnectionError()),
    );
  const updateTaskRun = vi
    .spyOn(server.posthogAPI, "updateTaskRun")
    .mockResolvedValue({} as TaskRun);

  server.session = {
    payload: TEST_PAYLOAD,
    acpSessionId: "acp-session",
    clientConnection: { prompt },
    logWriter: {
      flushAll: vi.fn().mockResolvedValue(undefined),
      getFullAgentResponse: vi.fn().mockReturnValue(null),
      resetTurnMessages: vi.fn(),
      flush: vi.fn().mockResolvedValue(undefined),
      isRegistered: vi.fn().mockReturnValue(true),
    },
  };

  await server.sendInitialTaskMessage(TEST_PAYLOAD);

  // Exactly one prompt attempt, and the run is reported as failed with the
  // connection-specific message.
  expect(prompt).toHaveBeenCalledTimes(1);
  expect(updateTaskRun).toHaveBeenCalledWith("test-task-id", "test-run-id", {
    status: "failed",
    error_message: "Upstream LLM connection error",
  });
});
});
518642
});

0 commit comments

Comments
 (0)