diff --git a/benchmark/streams/finished.js b/benchmark/streams/finished.js new file mode 100644 index 00000000000000..5fd2820549cd91 --- /dev/null +++ b/benchmark/streams/finished.js @@ -0,0 +1,33 @@ +'use strict'; + +const common = require('../common'); +const { Readable, Writable } = require('stream'); +const { finished } = require('stream/promises'); + +const bench = common.createBenchmark(main, { + n: [1e7], + streamType: ['readable', 'writable'], +}); + +async function main({ n, streamType }) { + bench.start(); + + for (let i = 0; i < n; i++) { + let stream; + + switch (streamType) { + case 'readable': + stream = new Readable({ read() { this.push(null); } }); + stream.resume(); + break; + case 'writable': + stream = new Writable({ write(chunk, enc, cb) { cb(); } }); + stream.end(); + break; + } + + await finished(stream); + } + + bench.end(n); +} diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 850d4314139afd..99587ae254ddd5 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -44,6 +44,9 @@ const { kIsClosedPromise, } = require('internal/streams/utils'); +const { getHookArrays } = require('internal/async_hooks'); +const AsyncContextFrame = require('internal/async_context_frame'); + // Lazy load let AsyncResource; let addAbortListener; @@ -74,9 +77,14 @@ function eos(stream, options, callback) { validateFunction(callback, 'callback'); validateAbortSignal(options.signal, 'options.signal'); - // Avoid AsyncResource.bind() because it calls ObjectDefineProperties which - // is a bottleneck here. - callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM')); + if (AsyncContextFrame.current() || + getHookArrays()[0].length > 0) { + // Avoid AsyncResource.bind() because it calls ObjectDefineProperties which + // is a bottleneck here. + callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM')); + } else { + callback = once(callback); + } if (isReadableStream(stream) || isWritableStream(stream)) { return eosWeb(stream, options, callback); diff --git a/test/parallel/test-stream-finished-async-local-storage.js b/test/parallel/test-stream-finished-async-local-storage.js new file mode 100644 index 00000000000000..1440d2bb0ac991 --- /dev/null +++ b/test/parallel/test-stream-finished-async-local-storage.js @@ -0,0 +1,24 @@ +// Flags: --expose-internals +'use strict'; + +const common = require('../common'); +const { Readable, finished } = require('stream'); +const { AsyncLocalStorage } = require('async_hooks'); +const { strictEqual } = require('assert'); +const AsyncContextFrame = require('internal/async_context_frame'); +const internalAsyncHooks = require('internal/async_hooks'); + +// This test verifies that ALS context is preserved when using stream.finished() + +const als = new AsyncLocalStorage(); +const readable = new Readable(); + +als.run('test-context-1', () => { + finished(readable, common.mustCall(() => { + strictEqual(AsyncContextFrame.enabled || internalAsyncHooks.getHookArrays()[0].length > 0, + true, 'One of AsyncContextFrame or async hooks criteria should be met'); + strictEqual(als.getStore(), 'test-context-1', 'ALS context should be preserved'); + })); +}); + +readable.destroy(); diff --git a/test/parallel/test-stream-finished-bindAsyncResource-path.js b/test/parallel/test-stream-finished-bindAsyncResource-path.js new file mode 100644 index 00000000000000..6bec9587e3ad11 --- /dev/null +++ b/test/parallel/test-stream-finished-bindAsyncResource-path.js @@ -0,0 +1,35 @@ +// Flags: --expose-internals +'use strict'; + +const common = require('../common'); +const { Readable, finished } = require('stream'); +const { createHook, executionAsyncId } = require('async_hooks'); +const { strictEqual } = require('assert'); +const internalAsyncHooks = require('internal/async_hooks'); + +// This test verifies that when there are active async hooks, stream.finished() uses +// the bindAsyncResource path + +createHook({ + init(asyncId, type, triggerAsyncId) { + if (type === 'STREAM_END_OF_STREAM') { + const parentContext = contextMap.get(triggerAsyncId); + contextMap.set(asyncId, parentContext); + } + } +}).enable(); + +const contextMap = new Map(); +const asyncId = executionAsyncId(); +contextMap.set(asyncId, 'abc-123'); +const readable = new Readable(); + +finished(readable, common.mustCall(() => { + const currentAsyncId = executionAsyncId(); + const ctx = contextMap.get(currentAsyncId); + strictEqual(internalAsyncHooks.getHookArrays()[0].length > 0, + true, 'Should have active user async hook'); + strictEqual(ctx, 'abc-123', 'Context should be preserved'); +})); + +readable.destroy(); diff --git a/test/parallel/test-stream-finished-default-path.js b/test/parallel/test-stream-finished-default-path.js new file mode 100644 index 00000000000000..31fcd8d175a3fd --- /dev/null +++ b/test/parallel/test-stream-finished-default-path.js @@ -0,0 +1,21 @@ +// Flags: --expose-internals --no-async-context-frame +'use strict'; + +const common = require('../common'); +const { Readable, finished } = require('stream'); +const { strictEqual } = require('assert'); +const AsyncContextFrame = require('internal/async_context_frame'); +const internalAsyncHooks = require('internal/async_hooks'); + +// This test verifies that when there are no active async hooks, stream.finished() uses the default callback path + +const readable = new Readable(); + +finished(readable, common.mustCall(() => { + strictEqual(internalAsyncHooks.getHookArrays()[0].length === 0, + true, 'Should not have active user async hook'); + strictEqual(AsyncContextFrame.current() || internalAsyncHooks.getHookArrays()[0].length > 0, + false, 'Default callback path should be used'); +})); + +readable.destroy();