tonyxiao left a comment
Good direction — the teardown chain through split() → channel() → merge() is the right fix for the unbounded-buffer leak. Two issues need fixing before merge though: one silently breaks the core mechanism, the other regresses iterator cleanup on error paths.
Verified locally: all existing tests pass (the broken fix degrades to a no-op rather than a regression, so CI stays green), but the stated goal of propagating return() through the pipeline is not actually achieved.
```
@@ -51,6 +64,9 @@ export function channel<T>(): AsyncIterable<T> & {
        r({ value: undefined as any, done: true })
      }
    },
```
Bug (blocks the leak fix): Object.assign() copies plain property values, not accessor descriptors: it reads onReturn from the source (there is no getter, so it gets undefined) and defines an ordinary data property on the target. So when split() later does matches.onReturn = abort, it overwrites that data property instead of invoking the setter — the closed-over onReturn variable that return() actually calls is never updated.
Verified with a minimal Node repro:
```js
let captured;
const o = Object.assign({}, { set onReturn(fn) { captured = fn } });
o.onReturn = () => {};
console.log(typeof captured); // "undefined" — setter was lost
```

Fix: define onReturn directly on iter (before Object.assign), or use Object.defineProperty to preserve the setter descriptor. For example:
```js
Object.defineProperty(result, 'onReturn', {
  set(fn) { onReturn = fn },
  get() { return onReturn },
  configurable: true,
});
```

```
        if (result.done) break
        controller.enqueue(enc.encode(JSON.stringify(result.value) + '\n'))
      }
      controller.close()
```
Cleanup regression: Switching from for await to a manual iterator.next() loop means that if JSON.stringify(), controller.enqueue(), or any expression inside the loop body throws, iterator.return() is never called — the catch block calls controller.error(err) but not iterator.return(). With the previous for await, the spec guarantees IteratorClose on any abrupt exit.
The new cancel() handler only fires on explicit stream cancellation, not on errors inside start().
Fix: add iterator.return?.() in the catch block (or a finally), or keep for await and add just the cancel() handler.
Same issue applies to packages/ts-cli/src/ndjson.ts.
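A minimal sketch of the try/finally shape I'm suggesting, using a hypothetical pump helper in place of the manual loop inside start() (names are mine for illustration, not the actual PR code):

```typescript
// Sketch of the suggested fix: a hypothetical `pump` helper standing in for
// the manual loop inside start(). The finally clause restores the
// IteratorClose guarantee that `for await` provided: the iterator is closed
// on normal completion AND on any abrupt exit (stringify/enqueue throwing).
async function pump<T>(
  iterator: AsyncIterator<T>,
  write: (chunk: string) => void,
): Promise<void> {
  try {
    while (true) {
      const result = await iterator.next()
      if (result.done) break
      // JSON.stringify or write may throw; finally still closes the iterator.
      write(JSON.stringify(result.value) + '\n')
    }
  } finally {
    await iterator.return?.()
  }
}
```

The same shape works whether the error surfaces via controller.error() or a plain rejection — the upstream teardown chain is triggered either way.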
```
      rest.close()
      sourceIterator.return?.()
    }
    matches.onReturn = abort
```
Missing test coverage: There is no test asserting that calling return() on either branch of split() invokes sourceIterator.return() and closes the sibling. That's exactly how the Object.assign setter bug above slipped through — the existing split tests only check data routing and error-channel closing, not upstream teardown.
Suggested test:
```ts
it('propagates return() to source when consumer breaks early', async () => {
  let sourceReturned = false;
  async function* source() {
    try { yield 1; yield 2; yield 3; }
    finally { sourceReturned = true; }
  }
  const isEven = (n: number): n is number => n % 2 === 0;
  const [evens, odds] = split(source(), isEven);
  const it = odds[Symbol.asyncIterator]();
  await it.next();    // pull one item
  await it.return!(); // early exit
  expect(sourceReturned).toBe(true);
});
```

This test would fail today, proving the setter bug is real.
Summary
The engine process leaked memory unboundedly (258MB → 4GB+) during sustained sync operations. The root cause was a missing teardown chain across the async iterable pipeline: when takeLimits ended a 60-second sync window, nothing told the upstream split/channel/source generators to stop. They kept running in the background, accumulating records in unbounded pending arrays that were never GC'd.
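To make the leak mechanics concrete, here is a self-contained sketch of the failure mode; sketchChannel is a simplified stand-in I wrote for illustration, not the real channel() implementation:

```typescript
// Simplified stand-in for channel() (illustration only, not the real code):
// values buffer into an unbounded `pending` array. If the consumer calls
// return() (which `for await` + break does automatically), the generator's
// finally block tears the channel down; if the consumer just stops pulling,
// push() keeps appending and `pending` grows without bound.
function sketchChannel<T>() {
  const pending: T[] = []
  let closed = false
  return {
    push(v: T) { if (!closed) pending.push(v) },
    get buffered() { return pending.length },
    close() { closed = true; pending.length = 0 },
    async *[Symbol.asyncIterator]() {
      try {
        while (!closed) {
          while (pending.length > 0) yield pending.shift()!
          await new Promise(r => setTimeout(r, 1)) // wait for more pushes
        }
      } finally {
        this.close() // teardown: runs when the consumer calls return()
      }
    },
  }
}
```

A consumer that simply abandons the iterator (as the pre-fix pipeline effectively did when takeLimits ended a window) never triggers the finally block, which is exactly the unbounded pending-array growth described above.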
How to test (optional)
Docker compose with a live Stripe account — engine RSS should stay stable (~300-400MB) across successive 60s sync windows instead of growing unboundedly