Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ function asAbsoluteCwd(cwd: string): string {
}

function basenameToken(value: string): string {
return path.basename(value).toLowerCase();
return path
.basename(value)
.toLowerCase()
.replace(/\.(cmd|exe|bat)$/u, "");
}

function isGeminiAcpCommand(command: string, args: readonly string[]): boolean {
Expand Down
11 changes: 11 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ export class GeminiAcpStartupTimeoutError extends AcpxOperationalError {
}
}

export class SessionModeReplayError extends AcpxOperationalError {
constructor(message: string, options?: AcpxErrorOptions) {
super(message, {
outputCode: "RUNTIME",
detailCode: "SESSION_MODE_REPLAY_FAILED",
origin: "acp",
...options,
});
}
}

export class ClaudeAcpSessionCreateTimeoutError extends AcpxOperationalError {
constructor(message: string, options?: AcpxErrorOptions) {
super(message, {
Expand Down
13 changes: 0 additions & 13 deletions src/session-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -985,19 +985,6 @@ export async function runSessionQueueOwner(options: QueueOwnerRuntimeOptions): P
}
await releaseQueueOwnerLease(lease);

// Auto-close the session when the queue owner shuts down and the agent
// has exited, preventing zombie session accumulation (#47).
try {
const record = await resolveSessionRecord(options.sessionId);
if (!record.closed && record.lastAgentExitAt) {
record.closed = true;
record.closedAt = isoNow();
await writeSessionRecord(record);
}
} catch {
// best effort — session may already be cleaned up
}

if (options.verbose) {
process.stderr.write(`[acpx] queue owner stopped for session ${options.sessionId}\n`);
}
Expand Down
27 changes: 20 additions & 7 deletions src/session-runtime/connect-load.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
isAcpQueryClosedBeforeResponseError,
isAcpResourceNotFoundError,
} from "../error-normalization.js";
import { SessionModeReplayError } from "../errors.js";
import { incrementPerfCounter } from "../perf-metrics.js";
import { isProcessAlive } from "../queue-ipc.js";
import type { QueueOwnerActiveSessionController } from "../queue-owner-turn-controller.js";
Expand Down Expand Up @@ -66,6 +67,7 @@ export async function connectAndLoadSession(
const record = options.record;
const client = options.client;
const originalSessionId = record.acpSessionId;
const originalAgentSessionId = record.agentSessionId;
const desiredModeId = getDesiredModeId(record.acpx);
const storedProcessAlive = isProcessAlive(record.pid);
const shouldReconnect = Boolean(record.pid) && !storedProcessAlive;
Expand Down Expand Up @@ -98,6 +100,7 @@ export async function connectAndLoadSession(
let loadError: string | undefined;
let sessionId = record.acpSessionId;
let createdFreshSession = false;
let pendingAgentSessionId = record.agentSessionId;

if (reusingLoadedSession) {
resumed = true;
Expand All @@ -119,15 +122,13 @@ export async function connectAndLoadSession(
const createdSession = await withTimeout(client.createSession(record.cwd), options.timeoutMs);
sessionId = createdSession.sessionId;
createdFreshSession = true;
record.acpSessionId = sessionId;
reconcileAgentSessionId(record, createdSession.agentSessionId);
pendingAgentSessionId = createdSession.agentSessionId;
}
} else {
const createdSession = await withTimeout(client.createSession(record.cwd), options.timeoutMs);
sessionId = createdSession.sessionId;
createdFreshSession = true;
record.acpSessionId = sessionId;
reconcileAgentSessionId(record, createdSession.agentSessionId);
pendingAgentSessionId = createdSession.agentSessionId;
}

if (createdFreshSession && desiredModeId) {
Expand All @@ -139,14 +140,26 @@ export async function connectAndLoadSession(
);
}
} catch (error) {
const message =
`Failed to replay saved session mode ${desiredModeId} on fresh ACP session ${sessionId}: ` +
formatErrorMessage(error);
record.acpSessionId = originalSessionId;
record.agentSessionId = originalAgentSessionId;
if (options.verbose) {
process.stderr.write(
`[acpx] failed to replay desired mode ${desiredModeId} on fresh ACP session ${sessionId}: ${formatErrorMessage(error)}\n`,
);
process.stderr.write(`[acpx] ${message}\n`);
}
throw new SessionModeReplayError(message, {
cause: error instanceof Error ? error : undefined,
retryable: true,
});
}
}

if (createdFreshSession) {
record.acpSessionId = sessionId;
reconcileAgentSessionId(record, pendingAgentSessionId);
}

options.onSessionIdResolved?.(sessionId);

return {
Expand Down
62 changes: 62 additions & 0 deletions test/cli.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const MOCK_AGENT_WITH_DISTINCT_CREATE_AND_LOAD_RUNTIME_SESSION_IDS =
"--supports-load-session --load-runtime-session-id resumed-runtime-session";
const MOCK_AGENT_WITH_LOAD_FALLBACK = `${MOCK_AGENT_COMMAND} --supports-load-session --load-session-fails-on-empty`;
const MOCK_AGENT_WITH_LOAD_SESSION_NOT_FOUND = `${MOCK_AGENT_COMMAND} --supports-load-session --load-session-not-found`;
const MOCK_AGENT_WITH_LOAD_FALLBACK_AND_MODE_FAILURE = `${MOCK_AGENT_COMMAND} --supports-load-session --load-session-fails-on-empty --set-session-mode-fails`;

type CliRunResult = {
code: number | null;
Expand Down Expand Up @@ -761,6 +762,67 @@ test("set-mode persists across load fallback and replays on fresh ACP sessions",
});
});

test("set-mode load fallback failure does not persist the fresh session id to disk", async () => {
await withTempHome(async (homeDir) => {
const cwd = path.join(homeDir, "workspace");
await fs.mkdir(cwd, { recursive: true });
await fs.mkdir(path.join(homeDir, ".acpx"), { recursive: true });
await fs.writeFile(
path.join(homeDir, ".acpx", "config.json"),
`${JSON.stringify(
{
agents: {
codex: {
command: MOCK_AGENT_WITH_LOAD_FALLBACK_AND_MODE_FAILURE,
},
},
},
null,
2,
)}\n`,
"utf8",
);

const sessionId = "mode-replay-session";
await writeSessionRecord(homeDir, {
acpxRecordId: sessionId,
acpSessionId: sessionId,
agentCommand: MOCK_AGENT_WITH_LOAD_FALLBACK_AND_MODE_FAILURE,
cwd,
createdAt: "2026-01-01T00:00:00.000Z",
lastUsedAt: "2026-01-01T00:00:00.000Z",
closed: false,
acpx: {
desired_mode_id: "plan",
},
});

const result = await runCli(
["--cwd", cwd, "--format", "json", "codex", "set-mode", "plan"],
homeDir,
);
assert.equal(result.code, 1, result.stderr);
const error = parseSingleAcpErrorLine(result.stdout);
assert.equal(error.data?.acpxCode, "RUNTIME");
assert.equal(error.data?.detailCode, "SESSION_MODE_REPLAY_FAILED");

const storedRecordPath = path.join(
homeDir,
".acpx",
"sessions",
`${encodeURIComponent(sessionId)}.json`,
);
const storedRecord = JSON.parse(await fs.readFile(storedRecordPath, "utf8")) as {
acp_session_id?: string;
acpx?: {
desired_mode_id?: string;
};
};
assert.equal(storedRecord.acp_session_id, sessionId);
assert.equal(storedRecord.acpx?.desired_mode_id, "plan");
});
});

test("--ttl flag is parsed for sessions commands", async () => {
await withTempHome(async (homeDir) => {
const ok = await runCli(["--ttl", "30", "--format", "json", "sessions"], homeDir);
Expand Down
65 changes: 65 additions & 0 deletions test/connect-load.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type FakeClient = {
options: { suppressReplayUpdates: boolean },
) => Promise<{ agentSessionId?: string }>;
createSession: (cwd: string) => Promise<{ sessionId: string; agentSessionId?: string }>;
setSessionMode: (sessionId: string, modeId: string) => Promise<void>;
};

const ACTIVE_CONTROLLER: QueueOwnerActiveSessionController = {
Expand Down Expand Up @@ -76,6 +77,7 @@ test("connectAndLoadSession resumes an existing load-capable session", async ()
createSession: async () => {
throw new Error("createSession should not be called");
},
setSessionMode: async () => {},
};

const result = await connectAndLoadSession({
Expand Down Expand Up @@ -146,6 +148,7 @@ test("connectAndLoadSession falls back to createSession when load returns resour
agentSessionId: "new-runtime",
};
},
setSessionMode: async () => {},
};

const result = await connectAndLoadSession({
Expand Down Expand Up @@ -196,6 +199,7 @@ test("connectAndLoadSession falls back to createSession for empty sessions on ad
sessionId: "created-for-empty",
agentSessionId: "created-runtime",
}),
setSessionMode: async () => {},
};

const result = await connectAndLoadSession({
Expand Down Expand Up @@ -249,6 +253,7 @@ test("connectAndLoadSession rethrows load failures that should not create a new
createSession: async () => ({
sessionId: "unexpected",
}),
setSessionMode: async () => {},
};

await assert.rejects(
Expand All @@ -271,6 +276,65 @@ test("connectAndLoadSession rethrows load failures that should not create a new
});
});

test("connectAndLoadSession fails when desired mode replay cannot be restored on a fresh session", async () => {
await withTempHome(async (homeDir) => {
const cwd = path.join(homeDir, "workspace");
await fs.mkdir(cwd, { recursive: true });

const record = makeSessionRecord({
acpxRecordId: "mode-replay-record",
acpSessionId: "stale-session",
agentCommand: "agent",
cwd,
acpx: {
desired_mode_id: "plan",
},
});

const client: FakeClient = {
hasReusableSession: () => false,
start: async () => {},
getAgentLifecycleSnapshot: () => ({
running: true,
}),
supportsLoadSession: () => true,
loadSessionWithOptions: async () => {
throw {
error: {
code: -32002,
message: "session not found",
},
};
},
createSession: async () => ({
sessionId: "fresh-session",
agentSessionId: "fresh-runtime",
}),
setSessionMode: async (sessionId, modeId) => {
assert.equal(sessionId, "fresh-session");
assert.equal(modeId, "plan");
throw new Error("mode restore rejected");
},
};

await assert.rejects(
async () =>
await connectAndLoadSession({
client: client as never,
record,
activeController: ACTIVE_CONTROLLER,
}),
(error: unknown) => {
assert(error instanceof Error);
assert.match(error.message, /Failed to replay saved session mode plan/);
return true;
},
);
assert.equal(record.acpSessionId, "stale-session");
assert.equal(record.agentSessionId, undefined);
});
});

test("connectAndLoadSession reuses an already loaded client session", async () => {
await withTempHome(async (homeDir) => {
const cwd = path.join(homeDir, "workspace");
Expand Down Expand Up @@ -303,6 +367,7 @@ test("connectAndLoadSession reuses an already loaded client session", async () =
createSession: async () => {
throw new Error("createSession should not be called");
},
setSessionMode: async () => {},
};

const result = await connectAndLoadSession({
Expand Down
20 changes: 10 additions & 10 deletions test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,11 @@ test("integration: timeout emits structured TIMEOUT json error", async () => {
});
});

test("integration: gemini ACP startup timeout is surfaced as actionable error", async () => {
test("integration: gemini ACP startup timeout is surfaced as actionable error for gemini.cmd too", async () => {
await withTempHome(async (homeDir) => {
const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-"));
const fakeBinDir = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-fake-gemini-"));
const fakeGeminiPath = path.join(fakeBinDir, "gemini");
const fakeGeminiPath = path.join(fakeBinDir, "gemini.cmd");
const previousTimeout = process.env.ACPX_GEMINI_ACP_STARTUP_TIMEOUT_MS;

try {
Expand Down Expand Up @@ -1170,7 +1170,7 @@ test("integration: prompt exits after done while detached owner stays warm", asy
});
});

test("integration: session auto-closes when queue owner exits and agent has exited (#47)", async () => {
test("integration: session remains resumable after queue owner exits and agent has exited", async () => {
await withTempHome(async (homeDir) => {
const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-"));

Expand Down Expand Up @@ -1228,8 +1228,8 @@ test("integration: session auto-closes when queue owner exits and agent has exit
last_agent_exit_code?: number | null;
};

// 5. After the fix for #47, the queue owner auto-closes the session
// when it shuts down and the agent has exited.
// 5. Routine queue-owner shutdown must not permanently close
// a resumable persistent session.
assert.equal(
storedRecord.last_agent_exit_at != null,
true,
Expand All @@ -1238,14 +1238,14 @@ test("integration: session auto-closes when queue owner exits and agent has exit

assert.equal(
storedRecord.closed,
true,
"session should be auto-closed after agent exit and queue owner shutdown (#47)",
false,
"session should remain resumable after queue owner shutdown",
);

assert.equal(
typeof storedRecord.closed_at,
"string",
"closed_at should be set when session is auto-closed",
storedRecord.closed_at,
undefined,
"closed_at should remain unset for resumable sessions",
);
} finally {
// Clean up: close session if it's still around
Expand Down
Loading