11import { type Readable , Transform , type TransformCallback } from 'stream' ;
22import { clearTimeout , setTimeout } from 'timers' ;
3- import { promisify } from 'util' ;
43
54import type { BSONSerializeOptions , Document , ObjectId } from '../bson' ;
65import type { AutoEncrypter } from '../client-side-encryption/auto_encrypter' ;
@@ -37,7 +36,7 @@ import {
3736 maxWireVersion ,
3837 type MongoDBNamespace ,
3938 now ,
40- promiseWithResolvers ,
39+ once ,
4140 uuidV4
4241} from '../utils' ;
4342import type { WriteConcern } from '../write_concern' ;
@@ -182,18 +181,18 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
182181 * Once connection is established, command logging can log events (if enabled)
183182 */
184183 public established : boolean ;
184+ /** Indicates that the connection (including underlying TCP socket) has been closed. */
185+ public closed = false ;
185186
186187 private lastUseTime : number ;
187188 private clusterTime : Document | null = null ;
189+ private error : Error | null = null ;
190+ private dataEvents : AsyncGenerator < Buffer , void , void > | null = null ;
188191
189192 private readonly socketTimeoutMS : number ;
190193 private readonly monitorCommands : boolean ;
191194 private readonly socket : Stream ;
192- private readonly controller : AbortController ;
193- private readonly signal : AbortSignal ;
194195 private readonly messageStream : Readable ;
195- private readonly socketWrite : ( buffer : Uint8Array ) => Promise < void > ;
196- private readonly aborted : Promise < never > ;
197196
198197 /** @event */
199198 static readonly COMMAND_STARTED = COMMAND_STARTED ;
@@ -213,6 +212,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
213212 constructor ( stream : Stream , options : ConnectionOptions ) {
214213 super ( ) ;
215214
215+ this . socket = stream ;
216216 this . id = options . id ;
217217 this . address = streamIdentifier ( stream , options ) ;
218218 this . socketTimeoutMS = options . socketTimeoutMS ?? 0 ;
@@ -225,39 +225,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
225225 this . generation = options . generation ;
226226 this . lastUseTime = now ( ) ;
227227
228- this . socket = stream ;
229-
230- // TODO: Remove signal from connection layer
231- this . controller = new AbortController ( ) ;
232- const { signal } = this . controller ;
233- this . signal = signal ;
234- const { promise : aborted , reject } = promiseWithResolvers < never > ( ) ;
235- aborted . then ( undefined , ( ) => null ) ; // Prevent unhandled rejection
236- this . signal . addEventListener (
237- 'abort' ,
238- function onAbort ( ) {
239- reject ( signal . reason ) ;
240- } ,
241- { once : true }
242- ) ;
243- this . aborted = aborted ;
244-
245228 this . messageStream = this . socket
246229 . on ( 'error' , this . onError . bind ( this ) )
247230 . pipe ( new SizedMessageTransform ( { connection : this } ) )
248231 . on ( 'error' , this . onError . bind ( this ) ) ;
249232 this . socket . on ( 'close' , this . onClose . bind ( this ) ) ;
250233 this . socket . on ( 'timeout' , this . onTimeout . bind ( this ) ) ;
251-
252- const socketWrite = promisify ( this . socket . write . bind ( this . socket ) ) ;
253- this . socketWrite = async buffer => {
254- return Promise . race ( [ socketWrite ( buffer ) , this . aborted ] ) ;
255- } ;
256- }
257-
258- /** Indicates that the connection (including underlying TCP socket) has been closed. */
259- public get closed ( ) : boolean {
260- return this . signal . aborted ;
261234 }
262235
263236 public get hello ( ) {
@@ -308,7 +281,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
308281 this . lastUseTime = now ( ) ;
309282 }
310283
311- public onError ( error ? : Error ) {
284+ public onError ( error : Error ) {
312285 this . cleanup ( error ) ;
313286 }
314287
@@ -351,13 +324,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
351324 *
352325 * This method does nothing if the connection is already closed.
353326 */
354- private cleanup ( error ? : Error ) : void {
327+ private cleanup ( error : Error ) : void {
355328 if ( this . closed ) {
356329 return ;
357330 }
358331
359332 this . socket . destroy ( ) ;
360- this . controller . abort ( error ) ;
333+ this . error = error ;
334+ this . dataEvents ?. throw ( error ) . then ( undefined , ( ) => null ) ; // squash unhandled rejection
335+ this . closed = true ;
361336 this . emit ( Connection . CLOSE ) ;
362337 }
363338
@@ -598,7 +573,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
598573 }
599574
600575 private throwIfAborted ( ) {
601- this . signal . throwIfAborted ( ) ;
576+ if ( this . error ) throw this . error ;
602577 }
603578
604579 /**
@@ -621,7 +596,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
621596
622597 const buffer = Buffer . concat ( await finalCommand . toBin ( ) ) ;
623598
624- return this . socketWrite ( buffer ) ;
599+ if ( this . socket . write ( buffer ) ) return ;
600+ return once ( this . socket , 'drain' ) ;
625601 }
626602
627603 /**
@@ -634,13 +610,19 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
634610 * Note that `for-await` loops call `return` automatically when the loop is exited.
635611 */
636612 private async * readMany ( ) : AsyncGenerator < OpMsgResponse | OpQueryResponse > {
637- for await ( const message of onData ( this . messageStream , { signal : this . signal } ) ) {
638- const response = await decompressResponse ( message ) ;
639- yield response ;
613+ try {
614+ this . dataEvents = onData ( this . messageStream ) ;
615+ for await ( const message of this . dataEvents ) {
616+ const response = await decompressResponse ( message ) ;
617+ yield response ;
640618
641- if ( ! response . moreToCome ) {
642- return ;
619+ if ( ! response . moreToCome ) {
620+ return ;
621+ }
643622 }
623+ } finally {
624+ this . dataEvents = null ;
625+ this . throwIfAborted ( ) ;
644626 }
645627 }
646628}
0 commit comments