Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
new Promise((resolve, reject) => {
setTimeout(() => {
reject(timeoutError)
}, 70)
}, 2000)
})
])
expect.fail('stream pipe with infinite source should never return')
Expand Down
24 changes: 16 additions & 8 deletions packages/transport-webrtc/src/private-to-private/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ export interface ReadCandidatesOptions extends AbortOptions, LoggerOptions, Prog
}

export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream: MessageStream<Message, Stream>, options: ReadCandidatesOptions): Promise<void> => {
try {
const connectedPromise = Promise.withResolvers<void>()
resolveOnConnected(pc, connectedPromise)
const connectedPromise = Promise.withResolvers<void>()
resolveOnConnected(pc, connectedPromise)

try {
// read candidates until we are connected or we reach the end of the stream
while (true) {
// if we connect, stop trying to read from the stream
Expand Down Expand Up @@ -66,10 +66,19 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream
}
}
} catch (err) {
options.log.error('%s error parsing ICE candidate - %e', options.direction, err)

if (options.signal?.aborted === true && pc.connectionState !== 'connected') {
throw err
options.log.error('%s error reading ICE candidates - %e', options.direction, err)

// If the peer connection is not connected, the error may still be
// recoverable — the signaling stream can close just before the
// connectionstatechange event fires. Wait for the connected promise to
// settle: if the PC connects we can ignore the stream error; if it fails
// or was never going to connect, re-throw.
if (pc.connectionState !== 'connected') {
try {
await connectedPromise.promise
} catch {
throw err
}
}
}
}
Expand All @@ -86,7 +95,6 @@ function resolveOnConnected (pc: RTCPeerConnection, promise: DeferredPromise<voi
promise.resolve()
break
case 'failed':
case 'disconnected':
case 'closed':
promise.reject(new ConnectionFailedError(`RTCPeerConnection connection state became "${pc.connectionState}"`))
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ export class WebRTCDirectTransport extends WebRTCDirectBrowserTransport implemen
private getKeychain (): Keychain | undefined {
try {
return this.components.keychain
} catch {}
} catch {
// keychain is an optional service
}
}
}

Expand Down
14 changes: 13 additions & 1 deletion packages/transport-webrtc/src/rtcpeerconnection-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class RTCPeerConnectionMultiaddrConnection extends AbstractMultiaddrConnection {
this.peerConnection.onconnectionstatechange = () => {
this.log.trace('peer connection state change %s initial state %s', this.peerConnection.connectionState, initialState)

if (this.peerConnection.connectionState === 'disconnected' || this.peerConnection.connectionState === 'failed' || this.peerConnection.connectionState === 'closed') {
if (this.peerConnection.connectionState === 'failed' || this.peerConnection.connectionState === 'closed') {
// nothing else to do but close the connection
this.onTransportClosed()

Expand All @@ -29,6 +29,18 @@ class RTCPeerConnectionMultiaddrConnection extends AbstractMultiaddrConnection {
this.peerConnection.close()
}
}

// Handle the case where the peerConnection already reached a terminal state
// before this handler was registered (e.g. ICE failed during SDP exchange).
// Since onconnectionstatechange is a property assignment it won't fire for
// past state transitions, so we need to check the current state explicitly.
// Note: 'disconnected' is transient and may recover to 'connected', so only
// 'failed' and 'closed' are treated as terminal states here.
if (initialState === 'failed' || initialState === 'closed') {
this.log.trace('peer connection already in terminal state %s at construction time', initialState)
this.onTransportClosed()
this.peerConnection.close()
}
}

sendData (data: Uint8ArrayList): SendResult {
Expand Down
179 changes: 179 additions & 0 deletions packages/transport-webrtc/test/maconn.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
/* eslint-disable @typescript-eslint/no-unused-expressions */

import { ConnectionFailedError } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { pbStream, streamPair } from '@libp2p/utils'
import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import delay from 'delay'
import { stubObject } from 'sinon-ts'
import { Message } from '../src/private-to-private/pb/message.js'
import { readCandidatesUntilConnected } from '../src/private-to-private/util.js'
import { toMultiaddrConnection } from '../src/rtcpeerconnection-to-conn.ts'
import { RTCPeerConnection } from '../src/webrtc/index.js'
import type { CounterGroup } from '@libp2p/interface'
Expand Down Expand Up @@ -33,4 +38,178 @@ describe('Multiaddr Connection', () => {
expect(maConn.timeline.close).to.not.be.undefined
expect(metrics.increment.calledWith({ close: true })).to.be.true
})

it('closes immediately when peer connection is already in failed state at construction time', async () => {
// Simulate a peerConnection that already reached 'failed' before
// toMultiaddrConnection is called (race between ICE failure and signaling completion)
const peerConnection = {
connectionState: 'failed' as RTCPeerConnectionState,
onconnectionstatechange: null as any,
close: () => {}
}

const remoteAddr = multiaddr('/ip4/1.2.3.4/udp/1234/webrtc/certhash/uEiAUqV7kzvM1wI5DYDc1RbcekYVmXli_Qprlw3IkiEg6tQ')

const maConn = toMultiaddrConnection({
// @ts-expect-error - intentional mock
peerConnection,
remoteAddr,
direction: 'outbound',
log: defaultLogger().forComponent('libp2p:webrtc:connection')
})

// Give any microtasks or synchronous operations a chance to complete
await delay(0)

expect(maConn.timeline.close).to.not.be.undefined
})

it('closes when peer connection transitions to failed state after construction', async () => {
const peerConnection: any = {
connectionState: 'connected' as RTCPeerConnectionState,
onconnectionstatechange: null as any,
close: () => {}
}

const remoteAddr = multiaddr('/ip4/1.2.3.4/udp/1234/webrtc/certhash/uEiAUqV7kzvM1wI5DYDc1RbcekYVmXli_Qprlw3IkiEg6tQ')

const maConn = toMultiaddrConnection({
peerConnection,
remoteAddr,
direction: 'outbound',
log: defaultLogger().forComponent('libp2p:webrtc:connection')
})

expect(maConn.timeline.close).to.be.undefined

// Simulate peerConnection going to 'failed' after construction
peerConnection.connectionState = 'failed'
peerConnection.onconnectionstatechange?.()

await delay(0)

expect(maConn.timeline.close).to.not.be.undefined
})
})

describe('readCandidatesUntilConnected', () => {
it('throws ConnectionFailedError when the peer connection enters failed state', async () => {
const [localStream, remoteStream] = await streamPair()

// A mock peer connection that never connects — simulates ICE failure
const pc: any = {
connectionState: 'checking' as RTCPeerConnectionState,
onconnectionstatechange: null as any
}

// Schedule ICE failure after a short delay
setTimeout(() => {
pc.connectionState = 'failed'
pc.onconnectionstatechange?.(new Event('connectionstatechange'))
}, 50)

const messageStream = pbStream(localStream).pb(Message)

await expect(
readCandidatesUntilConnected(pc, messageStream, {
direction: 'initiator',
signal: AbortSignal.timeout(5000),
log: defaultLogger().forComponent('test:webrtc')
})
).to.eventually.be.rejectedWith(ConnectionFailedError)

await remoteStream.close()
})

it('does not fail when peer connection is temporarily disconnected then recovers to connected', async () => {
// 'disconnected' is a transient state — the ICE agent may recover without
// intervention. readCandidatesUntilConnected must not abort on 'disconnected'.
const [localStream, remoteStream] = await streamPair()

const pc: any = {
connectionState: 'checking' as RTCPeerConnectionState,
onconnectionstatechange: null as any
}

// ICE briefly goes disconnected at t=30ms, then recovers at t=60ms
setTimeout(() => {
pc.connectionState = 'disconnected'
pc.onconnectionstatechange?.(new Event('connectionstatechange'))
}, 30)

setTimeout(() => {
pc.connectionState = 'connected'
pc.onconnectionstatechange?.(new Event('connectionstatechange'))
}, 60)

const messageStream = pbStream(localStream).pb(Message)

// Should resolve cleanly when the PC connects — 'disconnected' is ignored
await expect(
readCandidatesUntilConnected(pc, messageStream, {
direction: 'recipient',
signal: AbortSignal.timeout(5000),
log: defaultLogger().forComponent('test:webrtc')
})
).to.eventually.be.undefined

await remoteStream.close()
})

it('throws ConnectionFailedError when peer connection goes disconnected then failed', async () => {
// If ICE cannot recover from 'disconnected' and transitions to 'failed',
// the error must still be propagated correctly.
const [localStream, remoteStream] = await streamPair()

const pc: any = {
connectionState: 'checking' as RTCPeerConnectionState,
onconnectionstatechange: null as any
}

// ICE goes disconnected at t=30ms, then fails at t=60ms
setTimeout(() => {
pc.connectionState = 'disconnected'
pc.onconnectionstatechange?.(new Event('connectionstatechange'))
}, 30)

setTimeout(() => {
pc.connectionState = 'failed'
pc.onconnectionstatechange?.(new Event('connectionstatechange'))
}, 60)

const messageStream = pbStream(localStream).pb(Message)

await expect(
readCandidatesUntilConnected(pc, messageStream, {
direction: 'recipient',
signal: AbortSignal.timeout(5000),
log: defaultLogger().forComponent('test:webrtc')
})
).to.eventually.be.rejectedWith(ConnectionFailedError)

await remoteStream.close()
})

it('returns without error when peer connection reaches connected state', async () => {
const [localStream, remoteStream] = await streamPair()

const pc: any = {
connectionState: 'connected' as RTCPeerConnectionState,
onconnectionstatechange: null as any
}

const messageStream = pbStream(localStream).pb(Message)

// Close remote stream — stream.read returns null, which resolves as success
// (since connectedPromise already resolved because state is 'connected')
void remoteStream.close()

await expect(
readCandidatesUntilConnected(pc, messageStream, {
direction: 'initiator',
signal: AbortSignal.timeout(5000),
log: defaultLogger().forComponent('test:webrtc')
})
).to.eventually.be.undefined
})
})
Loading