perf: replace O(n) partition scan in Ready() with push-based notification #527
Conversation
Code Review
This PR replaces the O(n_all_partitions) scan in Ready() with a push-based notification queue (ConcurrentQueue<TopicPartition>). The intent is sound and the implementation is generally clean. I have a few concerns to raise before merging.
1. Correctness concern: _readyPartitions.Count snapshot is not atomic with the subsequent dequeues
```csharp
var count = _readyPartitions.Count;
for (var i = 0; i < count; i++)
{
    if (!_readyPartitions.TryDequeue(out var tp))
        break;
    ...
}
```

ConcurrentQueue<T>.Count is O(n) and not atomic with the dequeue operations that follow. Because Ready() is documented as "only called from the sender thread" (single consumer), the count snapshot correctly prevents re-enqueued items from causing infinite loops — that part is fine. However, it is worth noting that the break on a failed TryDequeue is effectively dead code in the single-consumer case: no other thread can dequeue, so if count items were present when .Count was sampled, all count TryDequeue calls will succeed. The break gives a false sense of safety and could mask a logic bug if the single-consumer invariant is ever relaxed. Consider asserting the dequeue succeeds rather than silently breaking.
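A minimal sketch of the assert-instead-of-break shape suggested above, assuming the single-consumer invariant holds; the class and method names here are simplified stand-ins for the PR's internals, not its actual API:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

public readonly record struct TopicPartition(string Topic, int Partition);

public sealed class ReadySketch
{
    private readonly ConcurrentQueue<TopicPartition> _readyPartitions = new();

    public void Notify(TopicPartition tp) => _readyPartitions.Enqueue(tp);

    // Single consumer: only the sender thread may call this.
    public List<TopicPartition> DrainOnce()
    {
        var drained = new List<TopicPartition>();
        // Snapshot the count so items re-enqueued during the loop
        // (muted / backoff partitions) are not revisited this cycle.
        var count = _readyPartitions.Count;
        for (var i = 0; i < count; i++)
        {
            // With a sole consumer, all `count` dequeues must succeed;
            // assert instead of silently breaking so a violated
            // invariant fails loudly in debug builds.
            var dequeued = _readyPartitions.TryDequeue(out var tp);
            Debug.Assert(dequeued, "TryDequeue failed despite sole consumer");
            drained.Add(tp);
        }
        return drained;
    }
}
```

The snapshot loop terminates even when items are re-enqueued mid-cycle, because re-enqueued entries land beyond the sampled count.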
2. Missing notification: muted partitions are silently discarded from the queue without re-enqueue
```csharp
if (_mutedPartitions.ContainsKey(tp))
    continue; // ← notification is consumed but not re-queued
```

When a muted partition is dequeued and skipped, the notification is discarded. The PR addresses this in UnmutePartition() by re-enqueueing if the deque has a batch at that time. But there is a subtle race:
- Thread A: `Ready()` dequeues `tp`, checks `_mutedPartitions` → not muted yet → proceeds normally.
- Thread B: `MutePartition(tp)` runs between the mute check and `Drain()`. `Drain()` then skips the partition anyway via its own mute check.
This is pre-existing and unavoidable without heavier synchronization, so it is not a regression. But the inverse race is new and worth noting:
- Thread B: `UnmutePartition(tp)` runs, peeks the deque → has batches → enqueues notification.
- Thread A: `Ready()` dequeues the notification but then sees the partition as still muted (the `TryRemove` in `UnmutePartition` happened before the peek, but `Ready()` runs before the remove is visible due to CPU reordering).
ConcurrentDictionary.TryRemove uses Volatile.Write-strength ordering on the internal array, and ContainsKey reads with Volatile.Read-strength, so in practice this race is extremely unlikely on x86. Nevertheless, the comment in UnmutePartition should document the invariant that TryRemove always precedes the Enqueue.
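A sketch of the ordering invariant in code, with the per-partition deque reduced to a sealed-batch count for brevity; `SealBatch` and `NotificationCount` are names invented for this sketch, while `_mutedPartitions`, `_readyPartitions`, and `UnmutePartition` mirror the PR:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public readonly record struct TopicPartition(string Topic, int Partition);

public sealed class MuteOrderingSketch
{
    private readonly ConcurrentDictionary<TopicPartition, byte> _mutedPartitions = new();
    private readonly ConcurrentQueue<TopicPartition> _readyPartitions = new();
    // Stand-in for "the deque has sealed batches" (hypothetical for this sketch).
    private readonly ConcurrentDictionary<TopicPartition, int> _sealedCounts = new();

    public void SealBatch(TopicPartition tp) =>
        _sealedCounts.AddOrUpdate(tp, 1, (_, n) => n + 1);

    public void MutePartition(TopicPartition tp) => _mutedPartitions.TryAdd(tp, 0);

    public void UnmutePartition(TopicPartition tp)
    {
        // INVARIANT: TryRemove must precede Enqueue. If reversed, Ready()
        // could consume the notification while still observing the partition
        // as muted, and drop it with no later re-notification.
        _mutedPartitions.TryRemove(tp, out _);
        if (_sealedCounts.GetValueOrDefault(tp) > 0)
            _readyPartitions.Enqueue(tp);
    }

    public int NotificationCount => _readyPartitions.Count;
}
```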
3. Backoff re-enqueue loop will fire every sender cycle until backoff expires
```csharp
// Re-enqueue so the sender loop picks this partition up on the next cycle
// after the backoff expires.
_readyPartitions.Enqueue(tp);
```

When a batch is in backoff, the partition is re-enqueued every sender cycle and then immediately re-skipped. This is O(1) per cycle but it means the sender loop wakes and runs Ready() continuously for the duration of the backoff window. The old O(n) scan had the same problem structurally (it would re-check every cycle), so this is not a regression. But now that the queue is the canonical source of truth, it would be cleaner to defer the re-enqueue until the backoff expires — for example, via a Timer callback or by storing pending-backoff partitions in a separate min-heap. This is a future enhancement, not a blocker, but worth a TODO comment.
4. HasPendingWork() still uses an O(n) scan and is now inconsistent with Ready()
```csharp
// Still need O(n) scan for sealed-but-not-yet-sent batches (pd.Count > 0)
foreach (var kvp in _partitionDeques)
{
    if (kvp.Value.Count > 0)
        return true;
}
```

This is called in the sender loop shutdown path. After this PR, _readyPartitions is the authoritative list of partitions with sealed batches, so HasPendingWork() could check !_readyPartitions.IsEmpty as a fast-path before (or instead of) the full scan. This would make the shutdown path consistent with the new design. Again, not a regression — the comment already acknowledges the O(n) — but a natural follow-up.
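The suggested fast-path could look like the following sketch, where `_partitionDeques` is modeled as a dictionary of per-partition sealed-batch counts rather than the real deque type:

```csharp
using System.Collections.Concurrent;

public readonly record struct TopicPartition(string Topic, int Partition);

public sealed class PendingWorkSketch
{
    private readonly ConcurrentQueue<TopicPartition> _readyPartitions = new();
    // Simplified stand-in for the real per-partition deques.
    private readonly ConcurrentDictionary<TopicPartition, int> _partitionDeques = new();

    public void Notify(TopicPartition tp) => _readyPartitions.Enqueue(tp);

    public void SetDequeCount(TopicPartition tp, int n) => _partitionDeques[tp] = n;

    public bool HasPendingWork()
    {
        // Fast path: an undrained notification implies a sealed batch exists.
        if (!_readyPartitions.IsEmpty)
            return true;

        // Slow path: sealed batches whose notification was already dequeued
        // by Ready() but not yet consumed by Drain().
        foreach (var kvp in _partitionDeques)
            if (kvp.Value > 0)
                return true;
        return false;
    }
}
```

The slow path is still needed because a notification can be consumed before the corresponding batch is drained, as the review notes.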
5. Duplicate _readyPartitions.Enqueue calls on the same partition are harmless but accumulate
With multiple append workers producing to the same partition, each sealed batch triggers one enqueue. Because Ready() peeks the head of the deque (which represents only the oldest ready batch per partition), a partition with N sealed batches will appear N times in _readyPartitions but Ready() will only add the broker ID to readyNodes once per cycle. The extra dequeues are O(1) and cheap, but the queue can grow proportionally to the number of sealed batches rather than the number of partitions. Under sustained high-throughput load with many small batches, this queue could hold thousands of entries before the sender thread drains them.
This is a bounded-by-design issue (the queue can have at most one entry per sealed batch, and the total number of sealed batches is bounded by buffer memory), so it does not introduce unbounded growth. A de-duplication step (e.g., a ConcurrentDictionary<TopicPartition, byte> set instead of a queue) would keep the queue size bounded by partition count, but at higher constant cost per enqueue. Given the throughput profile, the current approach is probably the right trade-off — just worth making explicit in the comment.
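The de-duplication alternative mentioned above could be sketched as follows; `_pendingNotification`, `NotifyReady`, and `TryTake` are names invented for this sketch, and a production version would need to re-check the deque after clearing the gate to close the small race window noted in the comment:

```csharp
using System.Collections.Concurrent;

public readonly record struct TopicPartition(string Topic, int Partition);

public sealed class DedupNotifySketch
{
    private readonly ConcurrentQueue<TopicPartition> _readyPartitions = new();
    // Gate: at most one queued notification per partition.
    private readonly ConcurrentDictionary<TopicPartition, byte> _pendingNotification = new();

    public void NotifyReady(TopicPartition tp)
    {
        // Only the first sealer enqueues; later sealed batches for the same
        // partition piggyback on the entry already in the queue.
        if (_pendingNotification.TryAdd(tp, 0))
            _readyPartitions.Enqueue(tp);
    }

    // Consumer side: clear the gate after dequeuing. A seal that lands
    // between TryDequeue and TryRemove would be suppressed, so the real
    // implementation would re-check the deque after clearing the gate.
    public bool TryTake(out TopicPartition tp)
    {
        if (!_readyPartitions.TryDequeue(out tp))
            return false;
        _pendingNotification.TryRemove(tp, out _);
        return true;
    }

    public int QueueLength => _readyPartitions.Count;
}
```

This bounds the queue length by distinct partition count at the cost of one dictionary operation per enqueue and dequeue.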
6. Test quality: good coverage, one missing case
The 8 unit tests cover the main scenarios well. One gap: there is no test verifying that a partition in backoff is re-enqueued and eventually becomes ready after the backoff window. The Ready_UnknownLeader_SetsUnknownLeadersExist test covers the unknown-leader re-enqueue path, which is structurally identical, so this is low priority. Still worth adding for completeness given how critical retry correctness is.
Summary
The design is correct and the performance improvement is real: for a producer writing to a handful of active partitions out of thousands, this eliminates the O(n_all) scan entirely. The implementation is clean, well-commented, and the tests cover the important behavioral properties.
The muted-partition handling in UnmutePartition is the subtlest part of the patch and deserves a documentation comment about the ordering invariant (TryRemove before Enqueue). Items 3 and 4 are natural follow-ups. No blocking issues found.
Follow-up Review (after second commit)
The second commit (911055e49b) addressed some surface-level feedback: comment text was deduplicated across the three _readyPartitions.Enqueue call sites, a stale comment was removed from the test, and the BootstrapServers array initializer was updated to a collection expression. These are clean-ups.
However, the substantive issues raised in the previous review were not addressed. Summarising their current status:
Still open: break on failed TryDequeue is dead code in the single-consumer model
```csharp
var count = _readyPartitions.Count;
for (var i = 0; i < count; i++)
{
    if (!_readyPartitions.TryDequeue(out var tp))
        break; // ← can never trigger while Ready() is the sole consumer
    ...
}
```

TryDequeue can only return false if another consumer has raced ahead and emptied the queue. Because Ready() is documented as exclusively called from the sender thread, no such race exists. The break silently swallows a logic error that would otherwise surface as missing partitions. Replacing it with a Debug.Assert or throw would make the invariant explicit and fail loudly if the single-consumer assumption is ever violated:

```csharp
var dequeued = _readyPartitions.TryDequeue(out var tp);
Debug.Assert(dequeued, "TryDequeue failed despite being the sole consumer of _readyPartitions");
```

Still open: UnmutePartition ordering invariant not documented
The TryRemove must precede the Enqueue to ensure Ready() does not see the notification but then observe the partition as still muted. This invariant is upheld by the current code but is not called out in a comment, making it fragile under future refactoring.
Still open: Backoff re-enqueue busy-loops for the duration of the backoff window
When a batch is in backoff, the partition is re-enqueued at the end of every Ready() cycle. On the next sender wakeup the loop immediately re-skips it. This is functionally correct but wastes CPU during high-retry scenarios. A // TODO: defer re-enqueue until backoff expires comment would signal the known trade-off and prevent reviewers from thinking this was unintentional.
Still open: No test for partition re-enqueue after backoff expiry
The 8 tests cover unknown-leader re-enqueue (structurally identical to backoff), mute/unmute, drain-once, and linger seal. The backoff-specific path (the _readyPartitions.Enqueue(tp) at line ~924) has no direct test. Given that retry correctness is critical for producer reliability, a test simulating a batch in backoff that becomes ready after the window would be valuable.
Not blocking, but still worth a follow-up: HasPendingWork() O(n) inconsistency
After this PR, _readyPartitions.IsEmpty is a cheap O(1) proxy for "no sealed batches ready to send." The existing O(n) scan in HasPendingWork() could use this as a fast-path. Not a regression, but the asymmetry between the two code paths will likely confuse the next reader.
Overall
The core algorithmic change is correct and the performance improvement is real. The second commit shows the author is responsive to feedback. The remaining items are mostly hardening (the dead-code break, missing ordering comment, missing backoff test) rather than correctness bugs. Addressing at least the break/assert and the ordering comment before merge would make the invariants explicit and the code more maintainable.
Code Review: perf: replace O(n) partition scan in Ready() with push-based notification
This is a well-motivated and well-scoped performance optimization. The approach is sound and the implementation is generally clean. Below are my observations, ranging from correctness concerns to style suggestions.
Correctness: Duplicate notifications in _readyPartitions queue
File: src/Dekaf/Producer/RecordAccumulator.cs — SealCurrentBatchUnderLock, SealBatchesAsync, Reenqueue
The _readyPartitions queue can accumulate multiple entries for the same TopicPartition across calls. For example, if a partition has 5 batches sealed in sequence, there will be 5 entries for that TP in the queue. Each pass through Ready() dequeues one, peeks the head batch (which may already have been sent and replaced), and adds the node. This isn't incorrect — the sender will just call Ready() multiple times and Drain() will drain one batch per call, which is exactly the existing per-call contract. However, it means the queue's length is not bounded by the number of partitions; it's bounded by the number of sealed batches across all partitions.
Two consequences worth noting:
- The `count` snapshot at the top of `Ready()` reads `_readyPartitions.Count`. In a high-throughput scenario where batches are being sealed continuously by producer threads while `Ready()` runs (it's single-consumer, so no concurrent drain, but producers are concurrent enqueuers), the snapshot is a point-in-time read on a `ConcurrentQueue`. This is fine for the single-consumer/multi-producer pattern because `Count` is `Volatile.Read` internally, but it's worth ensuring the intent is documented — the snapshot is preventing re-enqueued items (muted/backoff/unknown-leader) from inflating future counts unboundedly.
- Queue size under sustained load: with N partitions each producing M batches, the queue can hold up to N*M entries instead of N. The practical impact is low because `Ready()` is called on the sender thread frequently, but a comment noting this could help future readers understand the amortized behavior.
Correctness: Re-enqueue for muted partitions creates FIFO stacking
File: src/Dekaf/Producer/RecordAccumulator.cs, lines ~905–910
When a partition is muted, every Ready() cycle re-enqueues it at the tail and continues. If a partition is muted for a long time and many batches seal for it, the queue will hold many entries for that partition (the re-enqueued ones plus newly pushed ones). When it is eventually unmuted, UnmutePartition enqueues one more notification, but all the stale ones are already in the queue. This is safe because the `head is null` check handles the empty-deque case, but it could cause a burst of redundant readyNodes.Add(leader.NodeId) calls for the same node on the unmute cycle. The sender is resilient to this (it's a HashSet<int>), so it's not a bug — just a note that the queue can "pile up" for long-muted partitions.
Potential issue: pd is null drop without re-enqueue
File: src/Dekaf/Producer/RecordAccumulator.cs, lines ~913–915
```csharp
var pd = _partitionDeques.GetValueOrDefault(tp);
if (pd is null)
    continue;
```

If pd is null (which should never happen in practice — GetOrCreateDeque is called at append time, so the deque exists before any notification is pushed), the notification is silently dropped. Since the deque cannot be null at notification time (the notification is pushed inside SealCurrentBatchUnderLock which operates on a pd already in the dictionary), this is dead-defensive code. A Debug.Assert(pd is not null) or a log warning here would make any future regression immediately visible rather than silently losing batches.
Minor: nowMs parameter is unused
File: src/Dekaf/Producer/RecordAccumulator.cs, line 890
```csharp
internal (int NextCheckDelayMs, bool UnknownLeadersExist) Ready(
    MetadataManager metadataManager, long nowMs, HashSet<int> readyNodes)
```

The nowMs parameter is accepted but never read in the new implementation (the old scan used it for linger checks, but the new implementation defers linger decisions to the sealing site). If the callers still pass a meaningful timestamp, it might be worth either removing the parameter or noting in the doc comment that it's intentionally unused as a no-op legacy parameter kept for call-site compatibility. Unused parameters in hot-path signatures can be misleading.
Minor: SealBatchesAsync still enqueues inside a foreach over _partitionDeques
File: src/Dekaf/Producer/RecordAccumulator.cs, lines ~2222–2254
SealBatchesAsync still iterates over _partitionDeques (O(n_all_partitions)) when sealing for linger/flush. This is correct and expected — sealing itself must visit all partitions to decide which ones are due. The push notification added inside that loop is the right place. This is just a note to confirm that SealBatchesAsync is not on the steady-state hot path (it runs on the linger timer or on FlushAsync), so the O(n) enumeration there is acceptable.
Test: Ready_AfterSealingBatch_ReturnsReadyNode — fragile trigger assumption
File: tests/Dekaf.Tests.Unit/Producer/RecordAccumulatorReadyTests.cs, lines ~93–113
The test appends 10 records with batchSize: 50 and expects a batch to seal. This relies on the encoding overhead of 10 empty-key/empty-value records exceeding 50 bytes. This is plausible but the test would be more robust if it used a smaller batch size (e.g., batchSize: 1) or appended records with a non-empty value that provably exceeds the threshold. As written, if record encoding overhead changes, the test may silently start testing the "no sealed batch" path instead.
Similarly for Ready_MutedPartition_SkipsNotification, Ready_UnknownLeader_SetsUnknownLeadersExist, Ready_CalledTwice_SecondCallReturnsEmpty, and Ready_UnmutePartition_ReNotifiesReadyPartition — all use the same 10 records / batchSize: 50 pattern. A shared helper like AppendUntilSealed(accumulator, topic, partition, pool) that returns only after a batch is confirmed sealed (e.g., by checking _readyPartitions indirectly via Ready() returning Contains(nodeId)) would eliminate this fragility and centralize the trigger logic.
Test: Ready_MultiplePartitions_OnlyReportsReadyOnes — asserts correct outcome but not the key invariant
File: tests/Dekaf.Tests.Unit/Producer/RecordAccumulatorReadyTests.cs, lines ~306–345
The test verifies that readyNodes.Contains(1) — i.e., the ready node is reported. The more interesting invariant for this test (given that it's explicitly testing "only ready ones") is that readyNodes.Count == 1, meaning the partitions that did NOT seal (partition 7 and partitions 0–2, 4–6, 8–9) are not included. Adding await Assert.That(readyNodes).HasCount().EqualTo(1) would make the test definitively validate the "only reports ready ones" claim.
Nit: Comment drift in Ready() XML doc
File: src/Dekaf/Producer/RecordAccumulator.cs, line 882
"...whose head batch is sendable (batch is complete, linger expired, or flush in progress)"
The parenthetical linger expired, or flush in progress describes the old sealing criteria that were checked at Ready() time in the O(n) version. In the new model, Ready() never checks linger or flush state — those are checked at seal time. The summary could be simplified to: "...whose head batch is sendable (sealed by append, linger expiry, or flush)."
Overall Assessment
The approach is correct, well-commented, and the test coverage is meaningful. The count-snapshot loop to avoid infinite re-enqueue cycles is a clean solution. The UnmutePartition re-notification is complete. The main items worth addressing before merge are:
- Consider a `Debug.Assert` for the `pd is null` path to surface any future regression.
- Clarify or remove the unused `nowMs` parameter to avoid reader confusion.
- Strengthen the `Ready_MultiplePartitions_OnlyReportsReadyOnes` test to assert `readyNodes.Count == 1`.
- Document that the queue is not partitions-deduplicated — its length is proportional to sealed batches, not distinct partitions.
None of the above are blockers; items 1–3 are low-effort improvements and item 4 is purely documentation.
…ackoff test

- Replace dead-code `break` in Ready() with Debug.Assert for TryDequeue invariant
- Document UnmutePartition ordering invariant (TryRemove before Enqueue)
- Add TODO comment for deferring backoff re-enqueue to avoid per-cycle churn
- Add fast-path check (!_readyPartitions.IsEmpty) in HasPendingWork() to skip O(n) scan
- Document that duplicate enqueue entries are harmless and bounded by buffer memory
- Add unit test verifying batch re-enqueue during backoff and readiness after expiry
Code Review
This is a solid, well-scoped performance improvement. The core idea — replacing O(n_all_partitions) dictionary enumeration with a push-based ConcurrentQueue<TopicPartition> — is correct and the implementation is careful about the edge cases. The comments are thorough and the test coverage is good. A few observations below.
Correctness concerns
HasPendingWork still has a partial gap (minor)
The comment at line 2192 says "Still need O(n) scan for sealed-but-not-yet-sent batches (pd.Count > 0)". The explanation in the comment is that the _readyPartitions queue can be empty while a partition deque still has sealed batches — specifically the case where Ready() already dequeued the notification but Drain() has not yet consumed the batch. That's a legitimate case, but it's worth noting that this makes HasPendingWork subtly depend on the caller calling Ready() before Drain() in a tight loop. If Ready() and Drain() are always paired (sender thread only), this is fine; just worth confirming no path calls HasPendingWork in a context where Ready() was already called but a partition notification was successfully dequeued without the corresponding batch being taken.
Muted partition re-enqueue creates unbounded queue growth during a long mute window
When a partition is muted, every Ready() call re-enqueues it:
```csharp
_readyPartitions.Enqueue(tp); // muted path
continue;
```

The comment says "The total duplicate count is bounded by the number of sealed batches", but a muted partition keeps getting re-enqueued on every Ready() call regardless of whether any new batches were sealed. If the sender loop runs at high frequency (e.g. during a retry storm) and a partition is muted for a long time, the queue can grow unboundedly. Consider tracking whether the muted partition is already in the queue (e.g. a ConcurrentDictionary<TopicPartition, byte> of "pending notification" entries) to bound duplicates to one-per-partition rather than one-per-sender-cycle.
The count-snapshot loop is a subtle contract
```csharp
var count = _readyPartitions.Count;
for (var i = 0; i < count; i++)
```

This works correctly given the single-consumer constraint, but ConcurrentQueue.Count takes a full fence and is O(n) internally (it enumerates segments). For a queue with many entries this may be non-trivial. An alternative that avoids the Count call is to mark a sentinel before entering the loop, or to swap with an empty queue atomically. The sentinel approach is not compatible with ConcurrentQueue, but a simple alternative is to drain into a local List<TopicPartition> (which the caller already owns as readyNodes context) and re-enqueue the deferred ones afterward.
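The drain-into-a-local-list alternative might look like this sketch, where `IsDeferred` is a hypothetical stand-in for the muted/backoff/unknown-leader checks:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public readonly record struct TopicPartition(string Topic, int Partition);

public sealed class DrainLocalSketch
{
    private readonly ConcurrentQueue<TopicPartition> _readyPartitions = new();

    // Hypothetical predicate standing in for the muted / backoff checks.
    public Func<TopicPartition, bool> IsDeferred { get; set; } = _ => false;

    public void Notify(TopicPartition tp) => _readyPartitions.Enqueue(tp);

    public List<TopicPartition> ReadyCycle()
    {
        // Drain everything currently queued into a local list — no Count call.
        var pending = new List<TopicPartition>();
        while (_readyPartitions.TryDequeue(out var tp))
            pending.Add(tp);

        var ready = new List<TopicPartition>();
        foreach (var tp in pending)
        {
            if (IsDeferred(tp))
                _readyPartitions.Enqueue(tp); // revisit next cycle, not this one
            else
                ready.Add(tp);
        }
        return ready;
    }
}
```

Deferred entries are only re-enqueued after the drain loop has finished, so they cannot be revisited within the same cycle; concurrently enqueued new items simply extend the drain, which is the desired behavior.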
Minor observations
TODO left in production code
```csharp
// TODO: defer re-enqueue until backoff expires to avoid per-cycle churn
_readyPartitions.Enqueue(tp);
```

The TODO is fine to leave as a known limitation, but it directly implies the muted-partition unbounded growth concern above. If this is a known open issue, a tracking issue reference would be more durable than an inline TODO.
Debug.Assert on single-consumer guarantee is good
The Debug.Assert(dequeued, ...) on the TryDequeue result is a nice safety net. It will surface any violation of the single-consumer invariant in debug builds.
Test uses Task.Delay for timing (fragile on slow CI)
The Ready_BackoffBatch_ReenqueuedThenBecomesReadyAfterExpiry test uses await Task.Delay(150) to wait for a 100ms backoff to expire. Per the project guidelines, timing-dependent tests are fragile on slow CI runners. Consider making the test deterministic by setting RetryNotBefore to a timestamp in the past (i.e., already expired) and testing the "backoff not yet expired" branch with a timestamp far in the future, verifying the re-enqueue semantics without relying on wall-clock expiry.
HasPendingWork O(n) scan can be eliminated entirely
Now that every sealed batch pushes a notification to _readyPartitions, the O(n) fallback scan in HasPendingWork could be replaced by a simple atomic sealed-batch counter (similar to _unsealedBatchCount / _inFlightBatchCount). A _sealedBatchCount incremented in SealCurrentBatchUnderLock / SealBatchesAsync and decremented in Drain / PollFirst would make HasPendingWork fully O(1) and remove the remaining O(n) scan.
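A sketch of the suggested counter; `_sealedBatchCount` and the two callback methods are names invented here, while the real call sites would be the sealing and draining paths named above:

```csharp
using System.Threading;

public sealed class SealedCounterSketch
{
    private int _sealedBatchCount; // hypothetical field, per the suggestion above

    // Real call sites would be SealCurrentBatchUnderLock / SealBatchesAsync.
    public void OnBatchSealed() => Interlocked.Increment(ref _sealedBatchCount);

    // Real call sites would be Drain / PollFirst.
    public void OnBatchConsumed() => Interlocked.Decrement(ref _sealedBatchCount);

    // O(1) replacement for the remaining O(n) deque scan.
    public bool HasPendingWork() => Volatile.Read(ref _sealedBatchCount) > 0;
}
```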
Summary
The algorithmic improvement is sound and the implementation is careful. The main issue worth addressing before merge is the unbounded queue growth for muted partitions under sustained re-enqueue cycles. The Task.Delay in the backoff test is a secondary concern per project guidelines on flaky tests.
Code Review (follow-up after commits 9fce07d and 998a3a5 + 662d20e)
This is a thorough follow-up that closed out nearly all of the prior review's concerns. Below is a status pass on every previously raised item plus fresh observations on the newest code.
Previously raised — now resolved
| Item | Status |
|---|---|
| Dead-code `break` on failed `TryDequeue` | Fixed: replaced with `Debug.Assert(dequeued, ...)` |
| `UnmutePartition` ordering invariant not documented | Fixed: comment added explaining `TryRemove` must precede `Enqueue` |
| `HasPendingWork` O(n) inconsistency | Fixed: `!_readyPartitions.IsEmpty` fast-path added |
| Backoff re-enqueue acknowledged | Fixed: `// TODO: defer re-enqueue until backoff expires` comment added |
| Queue duplicates not documented | Fixed: comment at field declaration explains harmlessness and bounded count |
| `nowMs` parameter unused | Fixed: removed from signature and all call sites |
| Missing backoff test | Fixed: `Ready_BackoffBatch_ReenqueuedThenBecomesReadyAfterExpiry` added |
The review feedback has been addressed diligently across multiple commits. The remaining observations below are either minor or future work.
Remaining concern: muted-partition re-enqueue can grow unboundedly under sustained load
File: src/Dekaf/Producer/RecordAccumulator.cs, lines 905–910
```csharp
if (_mutedPartitions.ContainsKey(tp))
{
    _readyPartitions.Enqueue(tp);
    continue;
}
```

The field comment states:
"The total duplicate count is bounded by the number of sealed batches, which is itself bounded by buffer memory."
This is true for the sealing sites (SealCurrentBatchUnderLock, SealBatchesAsync, Reenqueue). However, the muted-partition re-enqueue path breaks this bound: every Ready() call re-enqueues a muted partition, independent of whether any new batches have been sealed. If the sender loop runs at high frequency (e.g., during a retry storm or when nextCheckDelayMs returns a small value) and a partition stays muted for many seconds, the queue grows proportionally to (senderCycleFrequency × muteDurationSeconds) for that partition, which has no relationship to BufferMemory / BatchSize.
Concretely: if the sender cycles every 10ms and a partition is muted for 10 seconds, that one partition contributes ~1000 spurious entries to _readyPartitions. Each entry is a TopicPartition struct (~16 bytes), so the absolute size is small, but the queue's length is no longer bounded by a design invariant. The field comment's claim is therefore inaccurate for the muted path.
Suggested fix: Use a ConcurrentDictionary<TopicPartition, byte> (a "pending notification set") instead of a ConcurrentQueue. Before enqueueing in any path, check TryAdd; if the partition is already present, skip the enqueue. This bounds the queue to at most one entry per partition at all times, eliminates all duplicate entries, and makes the muted-path bounded by partition count rather than sender frequency.
Alternatively, a lower-effort fix is to correct the field comment so it explicitly excludes the muted re-enqueue path from the boundedness claim, and note the practical size remains small in any realistic scenario.
Minor: Ready_MultiplePartitions_OnlyReportsReadyOnes doesn't assert the "only" part
File: tests/Dekaf.Tests.Unit/Producer/RecordAccumulatorReadyTests.cs, line 343
The test name claims it verifies only ready partitions are reported, but the assertion only checks Contains(1). With all 10 partitions on node 1, a broken implementation that reported all nodes would still pass. The key invariant — that the notification queue was not populated by the 9 non-sealing partitions — is not verified.
Add one line:
```csharp
await Assert.That(readyNodes).HasCount().EqualTo(1);
```

This confirms the push-based model didn't accidentally enqueue the unsealed partitions.
Minor: Backoff test still uses Task.Delay for timing (fragile on slow CI)
File: tests/Dekaf.Tests.Unit/Producer/RecordAccumulatorReadyTests.cs, lines 400–405
```csharp
// Act 2: Wait for backoff to expire, then Ready() should report the node.
await Task.Delay(150);
```

Per the project guidelines, timing-dependent tests are fragile on slow CI runners. The test can be made deterministic by splitting into two independent tests:
- Backoff active: set `RetryNotBefore = Stopwatch.GetTimestamp() + Stopwatch.Frequency * 10` (10s in the future), call `Ready()`, assert empty and `nextCheckDelayMs ≤ 10000`.
- Backoff expired: set `RetryNotBefore = Stopwatch.GetTimestamp() - 1` (already past), re-enqueue, call `Ready()`, assert the ready node is returned.
This avoids any real-time wait, is instantaneous, and is not susceptible to CI scheduler delays.
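The timestamp arithmetic for the two cases can be sketched with `Stopwatch` ticks; `RetryNotBefore` is the PR's batch field, and the assertions here only demonstrate the past/future relationship, not the accumulator's behavior:

```csharp
using System.Diagnostics;

var now = Stopwatch.GetTimestamp();

// Case 1: backoff still active — RetryNotBefore 10 seconds in the future.
var activeRetryNotBefore = now + Stopwatch.Frequency * 10;
var backoffRemaining = activeRetryNotBefore - Stopwatch.GetTimestamp();
Debug.Assert(backoffRemaining > 0); // Ready() should skip and report a delay

// Case 2: backoff already expired — RetryNotBefore one tick in the past.
var expiredRetryNotBefore = now - 1;
Debug.Assert(expiredRetryNotBefore <= Stopwatch.GetTimestamp()); // immediately sendable
```

Because both timestamps are fabricated relative to the current tick count, neither case needs to sleep, so the tests cannot flake on a loaded runner.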
Nit: XML doc comment still describes the old linger-check behavior
File: src/Dekaf/Producer/RecordAccumulator.cs, line 883
"...whose head batch is sendable (batch is complete, linger expired, or flush in progress)"
linger expired and flush in progress are no longer checked inside Ready() — those decisions happen at seal time. This phrasing from the original O(n) implementation persists. The description could be simplified to:
"...whose head batch is sendable (sealed by append overflow, linger expiry, or flush)"
This is purely a documentation fix with no correctness impact.
Summary
The implementation is correct and the performance improvement is real. All blocking concerns from prior reviews have been resolved. The three remaining items are:
- Muted-partition re-enqueue unbounded growth — the field comment's correctness claim is violated by this path; worth either fixing with a deduplication set or correcting the comment.
- `Ready_MultiplePartitions_OnlyReportsReadyOnes` missing count assertion — a one-line addition to make the test actually verify its stated invariant.
- Backoff test `Task.Delay` — low-effort split into two deterministic tests per project guidelines.
None of these are blockers for the core optimization, which is sound and well-implemented.
Code Review (final state — commit 719ac93)
This review covers the full PR including the sixth commit (719ac939) which adds DeferReenqueue and was not covered by the previous review round. Items previously raised and resolved are not repeated here.
New code: DeferReenqueue — no cancellation on disposal
```csharp
private void DeferReenqueue(TopicPartition tp, int delayMs)
{
    _ = Task.Delay(Math.Max(delayMs, 1)).ContinueWith(static (_, state) =>
    {
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
```

The CancellationToken.None means this timer fires and attempts _readyPartitions.Enqueue + SignalWakeup even after the RecordAccumulator has been disposed. If the producer is disposed during a retry backoff window, the ContinueWith callback runs on the thread pool, calls SignalWakeup on a disposed semaphore, and enqueues into a queue that nobody is draining. SignalWakeup calls TryReleaseSemaphore, which already guards against disposal-related exceptions (its doc comment mentions "ignoring disposal and max-count races"), so this is unlikely to throw — but it is still a reference leak: the captured (this, tp) tuple keeps the RecordAccumulator instance alive until the timer fires.
Suggested fix: Pass the CancellationToken from the sender loop's CancellationTokenSource through to DeferReenqueue and use it in Task.Delay. This is consistent with how other background operations in the producer handle cancellation:
```csharp
private void DeferReenqueue(TopicPartition tp, int delayMs, CancellationToken cancellationToken)
{
    _ = Task.Delay(Math.Max(delayMs, 1), cancellationToken).ContinueWith(static (t, state) =>
    {
        if (t.IsCanceled) return;
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
```

If that threading is complex to wire through, at minimum checking IsDisposed (if such a flag exists) at the start of the callback would prevent the post-disposal work.
Still open: muted-partition re-enqueue grows without bound under sustained load
This was raised in the prior review at commit 662d20e1 and is not fixed in the current HEAD. The field comment states:
"The total duplicate count is bounded by the number of sealed batches, which is itself bounded by buffer memory (BufferMemory / BatchSize)."
The muted re-enqueue path:
```csharp
if (_mutedPartitions.ContainsKey(tp))
{
    _readyPartitions.Enqueue(tp); // re-enqueued every Ready() call
    continue;
}
```

...breaks this invariant. Every sender cycle re-enqueues the muted partition regardless of whether new batches were sealed. The field comment's correctness claim is inaccurate for this path. This should be either fixed or the comment corrected to acknowledge the exception. A minimally invasive comment fix:
"Entries from sealing sites are bounded by BufferMemory / BatchSize. Muted-partition re-enqueue entries add one additional entry per sender cycle per muted partition; in practice this is small but unbounded by design invariant."
Still open: Ready_MultiplePartitions_OnlyReportsReadyOnes does not assert the key invariant
The test verifies readyNodes.Contains(1) but the test's name claims it verifies "only ready ones". The test does not assert that non-sealing partitions were excluded. Adding await Assert.That(readyNodes).HasCount().EqualTo(1) would make the test actually verify its stated invariant.
Still open: Ready_BackoffBatch_DeferredReenqueueBecomesReadyAfterExpiry uses Task.Delay(200) for timing
Per project guidelines: "Fix timing-dependent tests by using deterministic synchronization instead of arbitrary delays." The test waits 200ms for a 100ms backoff to expire. On a loaded CI runner this can flake. The test can be split into two deterministic cases without any wall-clock wait:
- **Backoff not yet expired:** set `RetryNotBefore = Stopwatch.GetTimestamp() + Stopwatch.Frequency * 10` (10 s in the future), call `Reenqueue`, then `Ready()`, and assert the result is empty.
- **Backoff already expired:** set `RetryNotBefore = Stopwatch.GetTimestamp() - 1` (already past), call `Reenqueue`, then `Ready()`, and assert the node is reported.

This avoids the `DeferReenqueue` timer entirely in the expired case (since `backoffRemaining <= 0` bypasses the `DeferReenqueue` call) and tests both branches without sleeping.
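Both cases reduce to a single timestamp comparison. A minimal standalone sketch of that arithmetic — `IsBackoffActive` is a hypothetical helper named here for illustration, mirroring the comparison the accumulator presumably makes against `RetryNotBefore`:

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper: a batch is still backing off while "now" is before RetryNotBefore.
static bool IsBackoffActive(long retryNotBefore) =>
    Stopwatch.GetTimestamp() < retryNotBefore;

// Case 1: backoff not yet expired — 10 s in the future, deterministically active.
long farFuture = Stopwatch.GetTimestamp() + Stopwatch.Frequency * 10;
// Case 2: backoff already expired — strictly in the past, deterministically inactive.
long alreadyPast = Stopwatch.GetTimestamp() - 1;

Console.WriteLine(IsBackoffActive(farFuture));   // True  → Ready() reports nothing
Console.WriteLine(IsBackoffActive(alreadyPast)); // False → Ready() reports the node
```

Because `Stopwatch.GetTimestamp()` is monotonic, neither assertion depends on wall-clock timing, which is the whole point of the split.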
Nit: XML doc still describes old linger-check behavior
```csharp
/// whose head batch is sendable (batch is complete, linger expired, or flush in progress)
```

"linger expired" and "flush in progress" are no longer evaluated inside `Ready()` — those decisions happen at seal time. The parenthetical should read: *(sealed by append overflow, linger expiry, or flush)*.
Summary
The sixth commit (DeferReenqueue) correctly eliminates the per-cycle backoff churn that was the primary concern of the previous review. The core algorithmic change is sound and the implementation is careful. The remaining items are:
- `DeferReenqueue` with `CancellationToken.None` — the timer callback fires and touches the accumulator post-disposal; it should respect the sender cancellation token. This is the only new concern introduced in the sixth commit and is worth addressing before merge.
- Muted-partition field comment — inaccurate boundedness claim; fix or qualify the comment.
- `Ready_MultiplePartitions_OnlyReportsReadyOnes` — add one assertion to validate the "only" part of the test name.
- `Ready_BackoffBatch` test — split into two deterministic cases to eliminate `Task.Delay(200)` per project guidelines.
Code Review (follow-up — commit 6873307)
This review covers only what changed in the latest commit. The core algorithmic change is sound and the implementation is clean.
Previous review items: all resolved except one
| Item | Status |
|---|---|
| `DeferReenqueue` with `CancellationToken.None` — timer fires post-disposal | Still open (see below) |
| Muted-partition queue growth — re-enqueued every cycle | ✅ Fixed: muted path now drops notification; `UnmutePartition` re-notifies |
| `_readyPartitions` field comment — inaccurate boundedness claim | ✅ Fixed |
| `Ready()` XML doc — stale "linger expired" language | ✅ Fixed |
| `Ready_MultiplePartitions_OnlyReportsReadyOnes` — missing count assertion | ✅ Fixed: `Count().IsEqualTo(1)` added |
| Backoff test using `Task.Delay(200)` — timing-dependent | ✅ Fixed: split into two deterministic cases |
Remaining open: DeferReenqueue uses CancellationToken.None
```csharp
private void DeferReenqueue(TopicPartition tp, int delayMs)
{
    _ = Task.Delay(Math.Max(delayMs, 1)).ContinueWith(static (_, state) =>
    {
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
```

`Task.Delay` ignores cancellation, so if the accumulator is disposed during a backoff window the timer fires, enqueues into a now-orphaned `_readyPartitions`, and calls `SignalWakeup()` on a disposed semaphore. `SignalWakeup` already tolerates disposal exceptions (its comment says "ignoring disposal and max-count races"), so this will not throw — but the `RecordAccumulator` instance is kept alive by the captured state tuple until every in-flight timer fires. Under a long retry backoff (e.g. 30 s) this is a meaningful reference leak.
Suggested fix:
```csharp
private void DeferReenqueue(TopicPartition tp, int delayMs, CancellationToken cancellationToken)
{
    _ = Task.Delay(Math.Max(delayMs, 1), cancellationToken).ContinueWith(static (t, state) =>
    {
        if (t.IsCanceled) return;
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
```

The sender loop's `CancellationToken` is already available at the call site in `Ready()`, so wiring it through is straightforward. When the producer is disposed, the token is cancelled, `Task.Delay` terminates early, and the continuation returns immediately — releasing the captured reference promptly.
Summary
Everything from the previous round is addressed. One item remains: passing the cancellation token to DeferReenqueue to prevent reference leaks on disposal. Once that is addressed this PR is ready to merge.
…tion queue Ready() previously iterated all _partitionDeques entries on every sender loop cycle, acquiring a SpinLock per partition. With hundreds of partitions, this is O(n_partitions) per cycle on the sender hot path. Now, when a batch is sealed (SealCurrentBatchUnderLock, SealBatchesAsync) or reenqueued for retry, the partition's TopicPartition is pushed onto a ConcurrentQueue. Ready() drains only this queue, converting the cost to O(n_ready_partitions). UnmutePartition re-enqueues if sealed batches exist so they are not lost. Closes #520
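The single-consumer drain described here can be sketched in isolation. This is a minimal stand-in (not the PR's code, and `int` stands in for `TopicPartition`) showing why the `Count` snapshot bounds the loop: entries re-enqueued during a drain wait for the next cycle instead of cycling forever within one call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

var readyPartitions = new ConcurrentQueue<int>();
readyPartitions.Enqueue(1);
readyPartitions.Enqueue(2);

var drained = 0;
var count = readyPartitions.Count; // snapshot BEFORE dequeuing
for (var i = 0; i < count; i++)
{
    var ok = readyPartitions.TryDequeue(out var tp);
    Debug.Assert(ok); // single consumer: all `count` dequeues must succeed
    drained++;
    readyPartitions.Enqueue(tp); // simulate a re-enqueue (partition still has batches)
}

Console.WriteLine(drained);               // 2 — bounded by the snapshot
Console.WriteLine(readyPartitions.Count); // 2 — re-enqueued entries wait for the next cycle
```

Without the snapshot (looping on `TryDequeue` until empty), the re-enqueues inside the loop would make it spin indefinitely.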
When Ready() dequeued a muted partition, the notification was dropped with `continue`. If UnmutePartition raced with Ready(), the notification could be permanently lost, causing one partition's batches to never be sent. Re-enqueue muted partitions so they're retried on the next cycle.
…ackoff test

- Replace dead-code `break` in `Ready()` with `Debug.Assert` for the `TryDequeue` invariant
- Document `UnmutePartition` ordering invariant (`TryRemove` before `Enqueue`)
- Add TODO comment for deferring backoff re-enqueue to avoid per-cycle churn
- Add fast-path check (`!_readyPartitions.IsEmpty`) in `HasPendingWork()` to skip the O(n) scan
- Document that duplicate enqueue entries are harmless and bounded by buffer memory
- Add unit test verifying batch re-enqueue during backoff and readiness after expiry
The push-based notification queue eliminated all uses of nowMs inside Ready(). Remove the dead parameter from the signature and all call sites.
Replace immediate re-enqueue of backoff partitions with Task.Delay + ContinueWith that fires once after backoff expires. This eliminates per-cycle dequeue/re-enqueue churn during retry backoff windows. One Task.Delay allocation per retry batch (not per message) is acceptable per allocation guidelines.
…, docs

- Remove re-enqueue of muted partitions in `Ready()` to prevent unbounded queue growth; rely solely on `UnmutePartition()` for re-notification.
- Update `_readyPartitions` field comment to reflect the boundedness invariant.
- Add count assertion to `Ready_MultiplePartitions_OnlyReportsReadyOnes`.
- Replace the `Task.Delay`-based backoff test with two deterministic tests: one for active backoff (far future) and one for expired backoff (past).
- Fix XML doc on `Ready()` to say "sealed by append overflow, linger expiry, or flush" instead of the old linger-check wording.
…gnature After rebasing onto main (which merged #526's headerCount change), test Append calls had null instead of 0 for the int headerCount parameter.
6873307 to ad63565
…erence leaks Pass _disposalCts.Token to Task.Delay so the timer cancels promptly on disposal. Without this, the captured (this, tp) tuple keeps the RecordAccumulator alive until the backoff timer fires, which could be up to 30 seconds for long retry backoffs.
Code Review (follow-up — commit ad63565)
This is a rebase-conflict fix commit only (updating Append call sites in tests after #526 merged). No production code changed since the previous review round.
Status of all previously raised items
| Item | Status |
|---|---|
| Dead-code `break` → `Debug.Assert` | ✅ Resolved |
| `UnmutePartition` ordering invariant documented | ✅ Resolved |
| Muted-partition queue grows unboundedly | ✅ Resolved — muted path now drops notification; `UnmutePartition` re-notifies |
| `_readyPartitions` field comment — inaccurate boundedness claim | ✅ Resolved |
| `nowMs` parameter unused | ✅ Resolved — removed from signature and call sites |
| Backoff per-cycle churn | ✅ Resolved — `DeferReenqueue` defers re-enqueue via timer |
| `HasPendingWork` O(n) inconsistency | ✅ Resolved — `!_readyPartitions.IsEmpty` fast-path added |
| `Ready()` XML doc — stale linger-check wording | ✅ Resolved |
| `Ready_MultiplePartitions_OnlyReportsReadyOnes` missing count assertion | ✅ Resolved |
| Backoff test using `Task.Delay(200)` — timing-dependent | ✅ Resolved — split into two deterministic tests |
| `DeferReenqueue` uses `CancellationToken.None` — reference leak on disposal | Still open |
Remaining issue: DeferReenqueue captures this with no cancellation on disposal
```csharp
private void DeferReenqueue(TopicPartition tp, int delayMs)
{
    _ = Task.Delay(Math.Max(delayMs, 1)).ContinueWith(static (_, state) =>
    {
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
```

`Task.Delay` ignores cancellation here. If the producer is disposed during a retry backoff window (which can be up to 30 seconds in default config), the timer fires on the thread pool after `DisposeAsync` completes and:
- Calls `_readyPartitions.Enqueue` on an orphaned queue that no sender thread is draining.
- Calls `SignalWakeup` on a disposed `SemaphoreSlim`. `SignalWakeup` already tolerates this gracefully (as documented), so no exception is thrown.
- Holds the `(RecordAccumulator, TopicPartition)` state tuple — and therefore a strong reference to `this` — alive for the full duration of the backoff window after disposal.
The fix is a one-line change at the Task.Delay call site. The sender loop's CancellationToken is already available in SenderLoopAsync at the DeferReenqueue call site — thread it through to the private method:
```csharp
private void DeferReenqueue(TopicPartition tp, int delayMs, CancellationToken cancellationToken)
{
    _ = Task.Delay(Math.Max(delayMs, 1), cancellationToken).ContinueWith(static (t, state) =>
    {
        if (t.IsCanceled) return;
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
```
}When the producer is disposed, the sender's CancellationToken is cancelled, Task.Delay terminates early, the continuation returns immediately, and the captured reference is released promptly. This is consistent with how other background operations in the producer handle disposal-time cancellation.
Overall
The PR is in excellent shape. The algorithmic change is correct, the muted-partition handling is now bounded and sound, the tests are deterministic, and all prior review concerns have been addressed. The one remaining item — passing the cancellation token to DeferReenqueue — is a low-effort fix that eliminates a reference leak on disposal. Once addressed, this PR is ready to merge.
Code Review (follow-up — commit f6c138b)
This commit addresses the one remaining open issue from the previous review round.
Previously open: DeferReenqueue reference leak on disposal — ✅ Resolved
The fix correctly passes _disposalCts.Token to Task.Delay and adds an early-return guard in the ContinueWith callback:
```csharp
_ = Task.Delay(Math.Max(delayMs, 1), _disposalCts.Token).ContinueWith(static (t, state) =>
{
    if (t.IsCanceled) return;
    var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
    self._readyPartitions.Enqueue(partition);
    self.SignalWakeup();
}, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
```

Using `_disposalCts.Token` (the accumulator's own disposal token) rather than the sender loop's `CancellationToken` is actually slightly preferable: the accumulator can be cancelled independently of the sender loop's token, so the timer is cancelled at the earliest possible signal. The early return on `t.IsCanceled` prevents the post-disposal enqueue and wakeup, releasing the captured `(this, tp)` reference promptly.
The XML doc comment update ("Respects _disposalCts so the timer cancels promptly on disposal, preventing reference leaks from captured state keeping the accumulator alive") correctly explains the intent.
Full status of all previously raised items
| Item | Status |
|---|---|
| Dead-code `break` → `Debug.Assert` | ✅ Resolved |
| `UnmutePartition` ordering invariant documented | ✅ Resolved |
| Muted-partition queue grows unboundedly | ✅ Resolved |
| `_readyPartitions` field comment — inaccurate boundedness claim | ✅ Resolved |
| `nowMs` parameter unused | ✅ Resolved |
| Backoff per-cycle churn → `DeferReenqueue` timer | ✅ Resolved |
| `HasPendingWork` O(n) inconsistency — fast-path added | ✅ Resolved |
| `Ready()` XML doc — stale linger-check wording | ✅ Resolved |
| `Ready_MultiplePartitions` missing count assertion | ✅ Resolved |
| Backoff test using `Task.Delay(200)` — timing-dependent | ✅ Resolved |
| `DeferReenqueue` uses `CancellationToken.None` — reference leak | ✅ Resolved in this commit |
One minor observation (non-blocking)
`DeferReenqueue` captures `this` via the state tuple. If `_disposalCts` itself is disposed before `Task.Delay` has finished registering its cancellation callback (possible in adversarial timing during `DisposeAsync`), `Task.Delay` can throw `ObjectDisposedException`. In practice, `CancellationTokenSource.Dispose` after `Cancel` is safe because `Task.Delay` has already observed the cancellation at that point; if `Dispose` does race with the registration, the exception propagates as an unobserved task exception (the `_ =` discard swallows it). This is the same pattern used elsewhere in the codebase for fire-and-forget tasks, and it is an accepted trade-off — not a regression introduced here.
Summary
All review items are now resolved. The implementation is correct, well-tested (9 unit tests covering the main behavioral invariants), and consistent with the project's allocation and threading guidelines. The DeferReenqueue disposal-safety fix is the right approach. This PR is ready to merge.
With push-based notifications, the sender wakes via SignalWakeup() from batch sealing or DeferReenqueue timer expiry. The old dynamic nextCheckDelayMs was never modified after initialization, always falling through to the 100ms safety-net poll. Remove the dead variable and document the 100ms fallback rationale.
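The wakeup-plus-safety-net pattern this commit message describes can be sketched deterministically. This is a stand-in under the assumption (stated in the reviews) that `SignalWakeup` releases a `SemaphoreSlim` the sender waits on; it is not the PR's code:

```csharp
using System;
using System.Threading;

using var wakeup = new SemaphoreSlim(0);

// Sealing site / DeferReenqueue timer: the SignalWakeup equivalent posts a wakeup.
wakeup.Release();

// Sender loop: wait for a signal, but never longer than the 100 ms safety-net poll.
var signaled = wakeup.Wait(TimeSpan.FromMilliseconds(100));
Console.WriteLine(signaled); // True — a signal was pending, no poll needed

// No signal pending now: the wait falls through at the safety net instead of blocking forever.
var timedOut = !wakeup.Wait(TimeSpan.FromMilliseconds(100));
Console.WriteLine(timedOut); // True — the 100 ms fallback fired
```

The fixed 100 ms cap is why a dynamic `nextCheckDelayMs` became dead code: the semaphore, not the timeout, carries the readiness information.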
Follow-up Review — Round 3
All items from the previous two review rounds have been resolved. Here's what I verified against the current diff:
| Previous Item | Status |
|---|---|
| `break` on failed dequeue → `Debug.Assert` | ✅ Resolved |
| `UnmutePartition` ordering invariant comment | ✅ Resolved |
| Backoff per-cycle churn → `DeferReenqueue` timer | ✅ Resolved |
| `HasPendingWork` fast-path with `!_readyPartitions.IsEmpty` | ✅ Resolved |
| `DeferReenqueue` `CancellationToken.None` reference leak | ✅ Resolved — `_disposalCts.Token` cancels the delay, `t.IsCanceled` guard in continuation |
| Stale XML doc / `nowMs` parameter | ✅ Resolved |
Observations on the current diff
DeferReenqueue looks correct. The value-tuple state object (this, tp) avoids a closure allocation. Math.Max(delayMs, 1) prevents a zero-delay degenerate case. TaskContinuationOptions.ExecuteSynchronously is appropriate since the continuation only enqueues and signals. _disposalCts.Token cancels the timer on disposal and the t.IsCanceled guard prevents a post-disposal enqueue.
Unknown-leader re-enqueue is immediate, not deferred. When the leader is unknown, the partition is re-enqueued immediately (_readyPartitions.Enqueue(tp)) rather than via DeferReenqueue. This means it re-appears on the next sender cycle. This is intentional — the sender triggers a metadata refresh on unknownLeadersExist = true, so the loop naturally backs off via the refresh round-trip latency. The count-snapshot loop prevents this from infinite-cycling within a single Ready() call. Correct as-is.
HasPendingWork fast-path may give a false positive if stale duplicate notifications remain in _readyPartitions after the associated batches have been drained. In that case, HasPendingWork returns true, Ready() runs, drains the stale entries (each finds pd.PeekFirst() == null and skips), then HasPendingWork falls through to the O(n) scan which correctly returns false. Net effect is at most one extra idle sender cycle. Not a bug — worth knowing the false-positive is possible.
One minor test gap (non-blocking): Neither backoff test exercises the actual DeferReenqueue timer path end-to-end (i.e., backoff > 0 → timer fires → partition becomes ready). The Ready_BackoffBatch_ActiveBackoff_DoesNotReport test verifies no report during backoff, and Ready_BackoffBatch_ExpiredBackoff_ReportsReady tests the already-expired case via Reenqueue directly. Testing the timer itself would require a small await Task.Delay and would be timing-sensitive — arguably not worth the fragility. The DeferReenqueue logic is simple enough to trust without it.
Summary
The implementation is correct, complete, and consistent with the project's allocation and threading guidelines. All previous feedback has been addressed. No blocking issues. Ready to merge.
…questSize Ready() consumes all notifications from _readyPartitions, but DrainBatchesForOneNode breaks early when maxRequestSize is exceeded. The skipped partitions' notifications were permanently lost — they aren't muted (so UnmutePartition never fires), and no new seals create notifications for their existing batches. This caused orphaned batch delivery timeouts in backpressure integration tests. Add ReenqueueSkippedPartitions to blindly re-enqueue remaining partitions when the size break triggers. No locking or filtering needed: Ready()/Drain re-validate on the next cycle, and duplicate entries are harmless per the ConcurrentQueue invariant.
Round 4 Review — Final Commit
The last commit (c6fd8e48) was added after the Round 3 sign-off. It fixes a correctness bug in the push-based notification design where DrainBatchesForOneNode breaks early on maxRequestSize but Ready() had already consumed all the notifications. Without this fix, the skipped partitions' sealed batches would be permanently orphaned.
ReenqueueSkippedPartitions — analysis
```csharp
private void ReenqueueSkippedPartitions(
    IReadOnlyList<TopicPartition> partitions, int startIndex, int fromOffset, int count)
{
    for (var j = fromOffset; j < count; j++)
        _readyPartitions.Enqueue(partitions[(startIndex + j) % count]);
}
```

**Correctness:** The circular index `(startIndex + j) % count` matches the rotation logic in `DrainBatchesForOneNode`, so the right partitions are re-enqueued. `fromOffset` is the index of the first partition that didn't fit, so partitions `[0, fromOffset)` were drained and notifications are correctly NOT re-enqueued for them.
**Edge cases:**

- If `fromOffset == count` (all partitions fit), the loop is a no-op. ✓
- If `ready.Count == 0` (first batch, condition `ready.Count > 0` is false), the break is never reached regardless of size — so the first batch always makes progress and `ReenqueueSkippedPartitions` is never called with `fromOffset == 0`. ✓
- The `ready.Count > 0` guard in the break condition means this path is only hit on the second and later partitions, so the fix is both necessary and correctly scoped. ✓
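A worked example of the circular index may help. The values below are made up for illustration: 4 partitions, a drain that started at rotated index 2, with only the first 2 rotated partitions fitting under `maxRequestSize` (`fromOffset = 2`):

```csharp
using System;
using System.Collections.Generic;

var partitions = new List<string> { "p0", "p1", "p2", "p3" };
int startIndex = 2, fromOffset = 2, count = partitions.Count;

// Drained in rotated order: index (2+0)%4 = p2, then (2+1)%4 = p3.
// The remaining rotated slots are re-enqueued:
var reenqueued = new List<string>();
for (var j = fromOffset; j < count; j++)
    reenqueued.Add(partitions[(startIndex + j) % count]); // (2+2)%4 = 0, (2+3)%4 = 1

Console.WriteLine(string.Join(",", reenqueued)); // p0,p1
```

The drained prefix (`p2`, `p3`) gets no duplicate notification; the skipped suffix (`p0`, `p1`) is exactly what re-enters the queue.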
**Thread safety:** `DrainBatchesForOneNode` is called from the sender thread, which is also the sole consumer of `_readyPartitions`. Enqueueing from the sender thread is safe (the multi-producer/single-consumer semantics are maintained). ✓
Test
The regression test Drain_MaxRequestSizeExceeded_ReenqueuesSkippedPartitions is deterministic and correctly asserts the observable behavior: after a size-limited drain, a subsequent Ready() call must still find the skipped partitions. The test uses maxRequestSize: 1 which is an extreme case but effective for forcing the break path. ✓
Summary
This commit closes a real correctness hole that the push-based design introduced. The fix is minimal, correct, and well-tested. No issues found.
All feedback across all four rounds is now resolved. Ready to merge.
…ning batches After DrainBatchesForOneNode removes one batch via PollFirst(), any remaining sealed batches in the same partition's deque had no pending notification in _readyPartitions. The code relied on the mute/unmute cycle (UnmutePartition) to re-enqueue, but there is a timing gap between drain and mute where the sender could sleep with no notifications — orphaning the remaining batches. Now PeekFirst() is checked immediately after PollFirst() (still under the spinlock), and if more batches exist the partition is re-enqueued. Duplicates are harmless per the ConcurrentQueue invariant.
Code Review — Final Round (latest commits)
This review covers the two most recent commits: 08885779 (cleanup: remove dead nextCheckDelayMs variable) and 1ad24c06 (fix: re-enqueue notification after PollFirst when partition has remaining batches), plus verifying the overall state of the PR.
Status of all previously raised items
| Item | Status |
|---|---|
| Dead-code `break` → `Debug.Assert` | ✅ Resolved |
| `UnmutePartition` ordering invariant documented | ✅ Resolved |
| Muted-partition queue grows unboundedly per sender cycle | ✅ Resolved — muted path drops notification; `UnmutePartition` re-notifies |
| `_readyPartitions` field comment — inaccurate boundedness claim | ✅ Resolved |
| `nowMs` parameter unused | ✅ Resolved — removed from signature |
| Backoff per-cycle churn | ✅ Resolved — `DeferReenqueue` defers re-enqueue via timer |
| `DeferReenqueue` `CancellationToken.None` reference leak on disposal | ✅ Resolved — `_disposalCts.Token` used |
| `HasPendingWork` O(n) inconsistency — fast-path added | ✅ Resolved |
| `Ready()` XML doc — stale linger-check wording | ✅ Resolved |
| `Ready_MultiplePartitions` missing count assertion | ✅ Resolved |
| Backoff test using `Task.Delay(200)` — timing-dependent | ✅ Resolved — split into two deterministic tests |
| Orphaned partitions when Drain hits `maxRequestSize` | ✅ Resolved — `ReenqueueSkippedPartitions` added |
| Remaining batches orphaned after `PollFirst` | ✅ Resolved — this commit |
Review of 1ad24c06: re-enqueue after PollFirst
```csharp
batch = pd.PollFirst();
if (pd.PeekFirst() is not null)
    _readyPartitions.Enqueue(tp);
```

**Correctness:** `PollFirst` removes the head batch for delivery; `PeekFirst` then checks whether more sealed batches remain in the same partition's deque. If so, a notification is enqueued so the sender does not miss the remaining batches. The comment correctly identifies the timing gap — the mute/unmute cycle would eventually re-enqueue, but there is a window between drain and mute where the sender could sleep with no pending notification.
**Thread safety:** Both `PollFirst` and `PeekFirst` are called under the existing spinlock inside `DrainBatchesForOneNode`. The `_readyPartitions.Enqueue` call is from the sender thread (sole consumer but concurrent enqueuer), which is safe. ✓

**Regression test:** `Drain_MultipleBatchesSamePartition_ReenqueuesRemaining` validates the fix end-to-end: 30 records seal 3 batches on one partition, `Drain` takes one, and a subsequent `Ready()` must still find the remaining batches. ✓
Review of 08885779: dead nextCheckDelayMs variable removed
The old implementation accumulated a minimum check delay across all partitions during Ready() and returned it as NextCheckDelayMs. The new push-based implementation always returns 100 as a safety-net fallback, making the accumulation logic dead code. The cleanup removes the dead variable and adds a comment documenting the 100ms rationale. Clean and correct.
One minor observation (non-blocking)
DrainBatchesForOneNode now has two re-enqueue paths that fire from the sender thread in the same drain cycle:
- `ReenqueueSkippedPartitions` — for partitions not reached due to `maxRequestSize`
- `_readyPartitions.Enqueue(tp)` after `PollFirst` — for the partition that was drained but has more batches
Both correctly re-enqueue for the next sender cycle. The two paths are complementary and there is no overlap: the ReenqueueSkippedPartitions path fires on break (before PollFirst), while the PeekFirst path fires after a successful drain. No double-enqueue for the same partition within a single drain call. ✓
Summary
All review items across all rounds have been addressed. The implementation is correct, well-commented, and consistent with the project's allocation and threading guidelines. The test suite covers all the key behavioral invariants including the two new regression tests added in recent commits.
Ready to merge.
Summary
- Replaces the per-cycle partition scan in `Ready()` with a `ConcurrentQueue<TopicPartition>` that only contains partitions with newly sealed/reenqueued batches, making it O(n_ready_partitions)
- Notifications are pushed from `SealCurrentBatchUnderLock`, `SealBatchesAsync` (linger/flush), and `Reenqueue` (retry)
- `UnmutePartition` re-enqueues if the partition has sealed batches so notifications are not lost while muted

Test plan
Closes #520