-
Notifications
You must be signed in to change notification settings - Fork 194
Add startAfterNrOfConsumers to BroadcastHub.
#275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -424,6 +424,32 @@ object BroadcastHub { | |
| */ | ||
| def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = Sink.fromGraph(new BroadcastHub[T](bufferSize)) | ||
|
|
||
| /** | ||
| * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set | ||
| * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized | ||
| * value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the | ||
| * broadcast elements from the original [[Sink]]. | ||
| * | ||
| * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own | ||
| * [[Source]] for consuming the [[Sink]] of that materialization. | ||
| * | ||
| * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized | ||
| * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then | ||
| * all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later | ||
| * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are | ||
| * cancelled are simply removed from the dynamic set of consumers. | ||
| * | ||
| * @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected. | ||
| * This is only used initially when the operator is starting up, i.e. it is not honored when consumers have | ||
| * been removed (canceled). | ||
| * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two | ||
| * concurrent consumers can be in terms of element. If this buffer is full, the producer | ||
| * is backpressured. Must be a power of two and less than 4096. | ||
| * @since 1.1.0 | ||
| */ | ||
| def sink[T](startAfterNrOfConsumers: Int, bufferSize: Int): Sink[T, Source[T, NotUsed]] = | ||
| Sink.fromGraph(new BroadcastHub[T](startAfterNrOfConsumers, bufferSize)) | ||
|
|
||
| /** | ||
| * Creates a [[Sink]] with default buffer size 256 that receives elements from its upstream producer and broadcasts them to a dynamic set | ||
| * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized | ||
|
|
@@ -446,11 +472,13 @@ object BroadcastHub { | |
| /** | ||
| * INTERNAL API | ||
| */ | ||
| private[pekko] class BroadcastHub[T](bufferSize: Int) | ||
| private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: Int) | ||
| extends GraphStageWithMaterializedValue[SinkShape[T], Source[T, NotUsed]] { | ||
| require(startAfterNrOfConsumers >= 0, "startAfterNrOfConsumers must >= 0") | ||
| require(bufferSize > 0, "Buffer size must be positive") | ||
| require(bufferSize < 4096, "Buffer size larger than 4095 is not allowed") | ||
| require((bufferSize & bufferSize - 1) == 0, "Buffer size must be a power of two") | ||
| def this(bufferSize: Int) = this(0, bufferSize) | ||
|
|
||
| private val Mask = bufferSize - 1 | ||
| private val WheelMask = (bufferSize * 2) - 1 | ||
|
|
@@ -482,6 +510,7 @@ private[pekko] class BroadcastHub[T](bufferSize: Int) | |
| private[this] val callbackPromise: Promise[AsyncCallback[HubEvent]] = Promise() | ||
| private[this] val noRegistrationsState = Open(callbackPromise.future, Nil) | ||
| val state = new AtomicReference[HubState](noRegistrationsState) | ||
| private var initialized = false | ||
|
|
||
| // Start from values that will almost immediately overflow. This has no effect on performance, any starting | ||
| // number will do, however, this protects from regressions as these values *almost surely* overflow and fail | ||
|
|
@@ -511,7 +540,9 @@ private[pekko] class BroadcastHub[T](bufferSize: Int) | |
| override def preStart(): Unit = { | ||
| setKeepGoing(true) | ||
| callbackPromise.success(getAsyncCallback[HubEvent](onEvent)) | ||
| pull(in) | ||
| if (startAfterNrOfConsumers == 0) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The old behavior |
||
| pull(in) | ||
| } | ||
| } | ||
|
|
||
| // Cannot complete immediately if there is no space in the queue to put the completion marker | ||
|
|
@@ -522,8 +553,29 @@ private[pekko] class BroadcastHub[T](bufferSize: Int) | |
| if (!isFull) pull(in) | ||
| } | ||
|
|
||
| private def tryPull(): Unit = { | ||
| if (initialized && !isClosed(in) && !hasBeenPulled(in) && !isFull) { | ||
| pull(in) | ||
| } | ||
| } | ||
|
|
||
| private def onEvent(ev: HubEvent): Unit = { | ||
| ev match { | ||
| case Advance(id, previousOffset) => | ||
| val newOffset = previousOffset + DemandThreshold | ||
| // Move the consumer from its last known offset to its new one. Check if we are unblocked. | ||
| val consumer = findAndRemoveConsumer(id, previousOffset) | ||
| addConsumer(consumer, newOffset) | ||
| checkUnblock(previousOffset) | ||
| case NeedWakeup(id, previousOffset, currentOffset) => | ||
| // Move the consumer from its last known offset to its new one. Check if we are unblocked. | ||
| val consumer = findAndRemoveConsumer(id, previousOffset) | ||
| addConsumer(consumer, currentOffset) | ||
|
|
||
| // Also check if the consumer is now unblocked since we published an element since it went asleep. | ||
| if (currentOffset != tail) consumer.callback.invoke(Wakeup) | ||
| checkUnblock(previousOffset) | ||
|
|
||
| case RegistrationPending => | ||
| state.getAndSet(noRegistrationsState).asInstanceOf[Open].registrations.foreach { consumer => | ||
| val startFrom = head | ||
|
|
@@ -538,6 +590,10 @@ private[pekko] class BroadcastHub[T](bufferSize: Int) | |
| case _ => () | ||
| } | ||
| } | ||
| if (activeConsumers >= startAfterNrOfConsumers) { | ||
| initialized = true | ||
| } | ||
| tryPull() | ||
|
|
||
| case UnRegister(id, previousOffset, finalOffset) => | ||
| if (findAndRemoveConsumer(id, previousOffset) != null) | ||
|
|
@@ -552,24 +608,10 @@ private[pekko] class BroadcastHub[T](bufferSize: Int) | |
| head += 1 | ||
| } | ||
| head = finalOffset | ||
| if (!hasBeenPulled(in)) pull(in) | ||
| tryPull() | ||
| } | ||
| } else checkUnblock(previousOffset) | ||
|
|
||
| case Advance(id, previousOffset) => | ||
| val newOffset = previousOffset + DemandThreshold | ||
| // Move the consumer from its last known offset to its new one. Check if we are unblocked. | ||
| val consumer = findAndRemoveConsumer(id, previousOffset) | ||
| addConsumer(consumer, newOffset) | ||
| checkUnblock(previousOffset) | ||
| case NeedWakeup(id, previousOffset, currentOffset) => | ||
| // Move the consumer from its last known offset to its new one. Check if we are unblocked. | ||
| val consumer = findAndRemoveConsumer(id, previousOffset) | ||
| addConsumer(consumer, currentOffset) | ||
|
|
||
| // Also check if the consumer is now unblocked since we published an element since it went asleep. | ||
| if (currentOffset != tail) consumer.callback.invoke(Wakeup) | ||
| checkUnblock(previousOffset) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -624,7 +666,7 @@ private[pekko] class BroadcastHub[T](bufferSize: Int) | |
| private def checkUnblock(offsetOfConsumerRemoved: Int): Unit = { | ||
| if (unblockIfPossible(offsetOfConsumerRemoved)) { | ||
| if (isClosed(in)) complete() | ||
| else if (!hasBeenPulled(in)) pull(in) | ||
| else tryPull() | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1106,6 +1148,9 @@ object PartitionHub { | |
| startAfterNrOfConsumers: Int, | ||
| bufferSize: Int) | ||
| extends GraphStageWithMaterializedValue[SinkShape[T], Source[T, NotUsed]] { | ||
| require(partitioner != null, "partitioner must not be null") | ||
| require(startAfterNrOfConsumers >= 0, "startAfterNrOfConsumers must >= 0") | ||
| require(bufferSize > 0, "Buffer size must be positive") | ||
| import PartitionHub.ConsumerInfo | ||
| import PartitionHub.Internal._ | ||
|
|
||
|
|
@@ -1231,20 +1276,20 @@ object PartitionHub { | |
| val newConsumers = (consumerInfo.consumers :+ consumer).sortBy(_.id) | ||
| consumerInfo = new ConsumerInfoImpl(newConsumers) | ||
| queue.init(consumer.id) | ||
| if (newConsumers.size >= startAfterNrOfConsumers) { | ||
| initialized = true | ||
| } | ||
|
|
||
| consumer.callback.invoke(Initialize) | ||
| } | ||
|
|
||
| if (initialized && pending.nonEmpty) { | ||
| pending.foreach(publish) | ||
| pending = Vector.empty[T] | ||
| } | ||
| if (consumerInfo.size >= startAfterNrOfConsumers) { | ||
| initialized = true | ||
| } | ||
|
|
||
| tryPull() | ||
| if (initialized && pending.nonEmpty) { | ||
| pending.foreach(publish) | ||
| pending = Vector.empty[T] | ||
| } | ||
|
|
||
| tryPull() | ||
|
|
||
| case UnRegister(id) => | ||
| val newConsumers = consumerInfo.consumers.filterNot(_.id == id) | ||
| consumerInfo = new ConsumerInfoImpl(newConsumers) | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.