Skip to content

Add BroadcastHub startAfterNrOfConsumers parameter#8018

Merged
Aaronontheweb merged 4 commits intoakkadotnet:devfrom
Aaronontheweb:feature/broadcast-hub-start-after-consumers-8017
Jan 25, 2026
Merged

Add BroadcastHub startAfterNrOfConsumers parameter#8018
Aaronontheweb merged 4 commits intoakkadotnet:devfrom
Aaronontheweb:feature/broadcast-hub-start-after-consumers-8017

Conversation

@Aaronontheweb
Copy link
Member

Summary

Ports two fixes from Apache Pekko to resolve flaky BroadcastHub tests:

  1. Add startAfterNrOfConsumers parameter (Pekko PR #275 by He-Pin)

    • Allows BroadcastHub to buffer elements until N consumers have connected
    • Eliminates the need for racy Task.Delay workarounds in tests
    • New API: BroadcastHub.Sink<T>(startAfterNrOfConsumers: 2, bufferSize: 8)
  2. Fix race condition in consumer PostStop (Pekko PR #1841)

    • Only sends UnRegister message if offsetInitialized is true
    • Prevents StreamDetachedException when consumer detaches before receiving Initialize event

Both fixes are Apache 2.0 licensed from the Apache Pekko project.

Closes #8017

Changes

  • Hub.cs: Added new startAfterNrOfConsumers parameter and constructor, _initialized flag, TryPull() method, and race condition fix
  • HubSpec.cs: Updated 6 previously-flaky tests to use the new API instead of Task.Delay(500)
  • API approval files updated for new public method

Test plan

  • All 38 HubSpec tests pass (1 pre-existing skip)
  • All 17 API approval tests pass
  • Tests run 10/10 times without flaking

Ports two fixes from Apache Pekko to resolve flaky BroadcastHub tests:

1. Add startAfterNrOfConsumers parameter (Pekko PR akkadotnet#275 by He-Pin)
   - Allows BroadcastHub to buffer elements until N consumers connect
   - Eliminates need for racy Task.Delay in tests

2. Fix race condition in consumer PostStop (Pekko PR akkadotnet#1841)
   - Only send UnRegister if offsetInitialized is true
   - Prevents StreamDetachedException when consumer detaches early

Both fixes are Apache 2.0 licensed from the Pekko project.
Copy link
Member Author

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Detailed my changes - this is pretty vanilla for the most part.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API changes are backwards compatible - it's just a new overload.

var (firstElement, source) = Source.Maybe<int>()
.Concat(other)
.ToMaterialized(BroadcastHub.Sink<int>(8), Keep.Both)
.ToMaterialized(BroadcastHub.Sink<int>(startAfterNrOfConsumers: 2, bufferSize: 8), Keep.Both)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the new overloads to stop race conditions during testing

private readonly ImmutableList<Consumer>[] _consumerWheel;

private int _activeConsumer;
private bool _initialized;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use to gate whether the hub can emit elements downstream or not

// Only start pulling immediately if we don't need to wait for consumers
if (_stage._startAfterNrOfConsumers == 0)
{
_initialized = true;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ungate immediately if _startAfterNrOfConsumers == 0

}
}

private void TryPull()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to pull if ungated AND the upstream(s) haven't been pulled yet.


// Check if we've reached the consumer threshold to start pulling
if (_activeConsumer >= _stage._startAfterNrOfConsumers)
_initialized = true;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ungate after we've reached a sufficient number registered consumers

@Aaronontheweb Aaronontheweb enabled auto-merge (squash) January 24, 2026 22:58
@Aaronontheweb Aaronontheweb merged commit df7d6c4 into akkadotnet:dev Jan 25, 2026
12 checks passed
Aaronontheweb added a commit to Aaronontheweb/akka.net that referenced this pull request Jan 26, 2026
…kkadotnet#8018)

Ports two fixes from Apache Pekko to resolve flaky BroadcastHub tests:

1. Add startAfterNrOfConsumers parameter (Pekko PR akkadotnet#275 by He-Pin)
   - Allows BroadcastHub to buffer elements until N consumers connect
   - Eliminates need for racy Task.Delay in tests

2. Fix race condition in consumer PostStop (Pekko PR akkadotnet#1841)
   - Only send UnRegister if offsetInitialized is true
   - Prevents StreamDetachedException when consumer detaches early

Both fixes are Apache 2.0 licensed from the Pekko project.
Aaronontheweb added a commit that referenced this pull request Jan 26, 2026
Ports two fixes from Apache Pekko to resolve flaky BroadcastHub tests:

1. Add startAfterNrOfConsumers parameter (Pekko PR #275 by He-Pin)
   - Allows BroadcastHub to buffer elements until N consumers connect
   - Eliminates need for racy Task.Delay in tests

2. Fix race condition in consumer PostStop (Pekko PR #1841)
   - Only send UnRegister if offsetInitialized is true
   - Prevents StreamDetachedException when consumer detaches early

Both fixes are Apache 2.0 licensed from the Pekko project.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Port BroadcastHub startAfterNrOfConsumers parameter from Apache Pekko

1 participant