Add BroadcastHub startAfterNrOfConsumers parameter#8018
Merged
Aaronontheweb merged 4 commits intoakkadotnet:devfrom Jan 25, 2026
Merged
Conversation
Ports two fixes from Apache Pekko to resolve flaky BroadcastHub tests: 1. Add startAfterNrOfConsumers parameter (Pekko PR akkadotnet#275 by He-Pin) - Allows BroadcastHub to buffer elements until N consumers connect - Eliminates need for racy Task.Delay in tests 2. Fix race condition in consumer PostStop (Pekko PR akkadotnet#1841) - Only send UnRegister if offsetInitialized is true - Prevents StreamDetachedException when consumer detaches early Both fixes are Apache 2.0 licensed from the Pekko project.
Aaronontheweb
commented
Jan 24, 2026
Member
Author
Aaronontheweb
left a comment
There was a problem hiding this comment.
Detailed my changes - this is pretty vanilla for the most part.
Member
Author
There was a problem hiding this comment.
API changes are backwards compatible - it's just a new overload.
| var (firstElement, source) = Source.Maybe<int>() | ||
| .Concat(other) | ||
| .ToMaterialized(BroadcastHub.Sink<int>(8), Keep.Both) | ||
| .ToMaterialized(BroadcastHub.Sink<int>(startAfterNrOfConsumers: 2, bufferSize: 8), Keep.Both) |
Member
Author
There was a problem hiding this comment.
Use the new overloads to stop race conditions during testing
| private readonly ImmutableList<Consumer>[] _consumerWheel; | ||
|
|
||
| private int _activeConsumer; | ||
| private bool _initialized; |
Member
Author
There was a problem hiding this comment.
Use to gate whether the hub can emit elements downstream or not
| // Only start pulling immediately if we don't need to wait for consumers | ||
| if (_stage._startAfterNrOfConsumers == 0) | ||
| { | ||
| _initialized = true; |
Member
Author
There was a problem hiding this comment.
Ungate immediately if _startAfterNrOfConsumers == 0
| } | ||
| } | ||
|
|
||
| private void TryPull() |
Member
Author
There was a problem hiding this comment.
Try to pull if ungated AND the upstream(s) haven't been pulled yet.
|
|
||
| // Check if we've reached the consumer threshold to start pulling | ||
| if (_activeConsumer >= _stage._startAfterNrOfConsumers) | ||
| _initialized = true; |
Member
Author
There was a problem hiding this comment.
Ungate after we've reached a sufficient number registered consumers
Aaronontheweb
added a commit
to Aaronontheweb/akka.net
that referenced
this pull request
Jan 26, 2026
…kkadotnet#8018) Ports two fixes from Apache Pekko to resolve flaky BroadcastHub tests: 1. Add startAfterNrOfConsumers parameter (Pekko PR akkadotnet#275 by He-Pin) - Allows BroadcastHub to buffer elements until N consumers connect - Eliminates need for racy Task.Delay in tests 2. Fix race condition in consumer PostStop (Pekko PR akkadotnet#1841) - Only send UnRegister if offsetInitialized is true - Prevents StreamDetachedException when consumer detaches early Both fixes are Apache 2.0 licensed from the Pekko project.
Merged
Aaronontheweb
added a commit
that referenced
this pull request
Jan 26, 2026
Ports two fixes from Apache Pekko to resolve flaky BroadcastHub tests: 1. Add startAfterNrOfConsumers parameter (Pekko PR #275 by He-Pin) - Allows BroadcastHub to buffer elements until N consumers connect - Eliminates need for racy Task.Delay in tests 2. Fix race condition in consumer PostStop (Pekko PR #1841) - Only send UnRegister if offsetInitialized is true - Prevents StreamDetachedException when consumer detaches early Both fixes are Apache 2.0 licensed from the Pekko project.
This was referenced Jan 27, 2026
This was referenced Feb 9, 2026
This was referenced Feb 20, 2026
This was referenced Feb 20, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Ports two fixes from Apache Pekko to resolve flaky BroadcastHub tests:
Add
startAfterNrOfConsumersparameter (Pekko PR #275 by He-Pin)Task.Delayworkarounds in testsBroadcastHub.Sink<T>(startAfterNrOfConsumers: 2, bufferSize: 8)Fix race condition in consumer PostStop (Pekko PR #1841)
UnRegistermessage ifoffsetInitializedis trueStreamDetachedExceptionwhen consumer detaches before receivingInitializeeventBoth fixes are Apache 2.0 licensed from the Apache Pekko project.
Closes #8017
Changes
Hub.cs: Added newstartAfterNrOfConsumersparameter and constructor,_initializedflag,TryPull()method, and race condition fixHubSpec.cs: Updated 6 previously-flaky tests to use the new API instead ofTask.Delay(500)Test plan