Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions packages/reqresp/src/request/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,42 @@ function scheduleStreamAbortIfNotClosed(stream: Stream, timeoutMs: number): void
stream.addEventListener("close", onClose, {once: true});
}

type ClearableSignal = AbortSignal & {clear: () => void};

function createRespSignal(signal: AbortSignal | undefined, timeoutMs: number): ClearableSignal {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lodekeeper can you add a comment to the upstream issue: nodejs/node#54614

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! I'll add a comment with our real-world experience from Lodestar's req/resp networking layer. The composed signals from AbortSignal.any() accumulated ~85MB+ of old-space growth over 24h on a mainnet beacon node — exactly the pattern described in that issue. Our workaround in this PR (manual signal composition with explicit cleanup) eliminated the accumulation.

Drafting the comment now — will post shortly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posted on the upstream Node issue here:

I summarized that we saw the same retention pattern in Lodestar's real req/resp networking workload (not just a tight synthetic loop), and linked this PR as the application-side mitigation.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well my intention with my comment here was that we would document the upstream issue in our own code/project. But I guess that works 🤷

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lodekeeper just re-reading @wemeetagain I think he meant you should add a reference in the code to the upstream issue, can you open a follow up PR for that

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — I misread the intent. Opened #9033 to add a JSDoc comment on createRespSignal() referencing the upstream Node.js issue and the investigation context.

const timeoutSignal = AbortSignal.timeout(timeoutMs);
const signals = signal ? [signal, timeoutSignal] : [timeoutSignal];
const controller = new AbortController();

const clear = (): void => {
for (const entry of signals) {
entry.removeEventListener("abort", onAbort);
}
};

const onAbort = (): void => {
if (controller.signal.aborted) {
return;
}
const reason = signals.find((entry) => entry.aborted)?.reason;
controller.abort(reason);
clear();
};

for (const entry of signals) {
if (entry.aborted) {
onAbort();
break;
}
entry.addEventListener("abort", onAbort);
}

const respSignal = controller.signal as ClearableSignal;
respSignal.clear = clear;

return respSignal;
}

export interface SendRequestOpts {
/** The maximum time for complete response transfer. */
respTimeoutMs?: number;
Expand Down Expand Up @@ -154,9 +190,7 @@ export async function* sendRequest(
}

// RESP_TIMEOUT: Maximum time for complete response transfer
const respSignal = signal
? AbortSignal.any([signal, AbortSignal.timeout(RESP_TIMEOUT)])
: AbortSignal.timeout(RESP_TIMEOUT);
const respSignal = createRespSignal(signal, RESP_TIMEOUT);

let responseError: Error | null = null;
let responseFullyConsumed = false;
Expand Down Expand Up @@ -199,6 +233,7 @@ export async function* sendRequest(
}
}
}
respSignal.clear();
metrics?.outgoingClosedStreams?.inc({method});
logger.verbose("Req stream closed", logCtx);
}
Expand Down
Loading