
perf: replace O(n) partition scan in Ready() with push-based notification#527

Merged
thomhurst merged 12 commits into main from perf/push-based-ready-notification
Mar 16, 2026

Conversation

@thomhurst
Owner

Summary

  • Replaces O(n_all_partitions) dictionary enumeration in Ready() with a ConcurrentQueue<TopicPartition> that only contains partitions with newly sealed/reenqueued batches, making it O(n_ready_partitions)
  • Notification is pushed at all batch-entry points: SealCurrentBatchUnderLock, SealBatchesAsync (linger/flush), and Reenqueue (retry)
  • UnmutePartition re-enqueues if the partition has sealed batches so notifications are not lost while muted
  • Uses a count-snapshot loop to prevent infinite re-enqueue cycles for retry-backoff and unknown-leader partitions

Test plan

  • 8 new unit tests covering: sealed batch detection, no-seal case, muted partitions, unknown leaders, drain-once semantics, unmute re-notification, multi-partition filtering, linger-based seal
  • Full unit test suite passes (3015 tests)

Closes #520


@claude claude bot left a comment


Code Review

This PR replaces the O(n_all_partitions) scan in Ready() with a push-based notification queue (ConcurrentQueue<TopicPartition>). The intent is sound and the implementation is generally clean. I have a few concerns to raise before merging.


1. Correctness concern: _readyPartitions.Count snapshot is not atomic with the subsequent dequeues

var count = _readyPartitions.Count;

for (var i = 0; i < count; i++)
{
    if (!_readyPartitions.TryDequeue(out var tp))
        break;
    ...
}

ConcurrentQueue<T>.Count is O(n) and not atomic with the dequeue operations that follow. Because Ready() is documented as "only called from the sender thread" (single consumer), the count snapshot correctly prevents re-enqueued items from causing infinite loops — that part is fine. However, it is worth noting that the break on a failed TryDequeue is effectively dead code in the single-consumer case: no other thread can dequeue, so if count items were present when .Count was sampled, all count TryDequeue calls will succeed. The break gives a false sense of safety and could mask a logic bug if the single-consumer invariant is ever relaxed. Consider asserting the dequeue succeeds rather than silently breaking.


2. Missing notification: muted partitions are silently discarded from the queue without re-enqueue

if (_mutedPartitions.ContainsKey(tp))
    continue;   // ← notification is consumed but not re-queued

When a muted partition is dequeued and skipped, the notification is discarded. The PR addresses this in UnmutePartition() by re-enqueueing if the deque has a batch at that time. But there is a subtle race:

  1. Thread A: Ready() dequeues tp and checks _mutedPartitions; tp is not muted yet, so it proceeds normally.
  2. Thread B: MutePartition(tp) runs between the mute check and Drain().
  3. Drain() then skips the partition anyway via its own mute check.

This is pre-existing and unavoidable without heavier synchronization, so it is not a regression. But the inverse race is new and worth noting:

  1. Thread B: UnmutePartition(tp) runs, peeks the deque → has batches → enqueues notification.
  2. Thread A: Ready() dequeues the notification but still sees the partition as muted (the TryRemove in UnmutePartition executed before the Enqueue, but its effect is not yet visible to Ready() due to CPU reordering).

ConcurrentDictionary.TryRemove uses Volatile.Write-strength ordering on the internal array, and ContainsKey reads with Volatile.Read-strength, so in practice this race is extremely unlikely on x86. Nevertheless, the comment in UnmutePartition should document the invariant that TryRemove always precedes the Enqueue.


3. Backoff re-enqueue loop will fire every sender cycle until backoff expires

// Re-enqueue so the sender loop picks this partition up on the next cycle
// after the backoff expires.
_readyPartitions.Enqueue(tp);

When a batch is in backoff, the partition is re-enqueued every sender cycle and then immediately re-skipped. This is O(1) per cycle but it means the sender loop wakes and runs Ready() continuously for the duration of the backoff window. The old O(n) scan had the same problem structurally (it would re-check every cycle), so this is not a regression. But now that the queue is the canonical source of truth, it would be cleaner to defer the re-enqueue until the backoff expires — for example, via a Timer callback or by storing pending-backoff partitions in a separate min-heap. This is a future enhancement, not a blocker, but worth a TODO comment.


4. HasPendingWork() still uses an O(n) scan and is now inconsistent with Ready()

// Still need O(n) scan for sealed-but-not-yet-sent batches (pd.Count > 0)
foreach (var kvp in _partitionDeques)
{
    if (kvp.Value.Count > 0)
        return true;
}

This is called in the sender loop shutdown path. After this PR, _readyPartitions is the authoritative list of partitions with sealed batches, so HasPendingWork() could check !_readyPartitions.IsEmpty as a fast-path before (or instead of) the full scan. This would make the shutdown path consistent with the new design. Again, not a regression — the comment already acknowledges the O(n) — but a natural follow-up.


5. Duplicate _readyPartitions.Enqueue calls on the same partition are harmless but accumulate

With multiple append workers producing to the same partition, each sealed batch triggers one enqueue. Because Ready() peeks the head of the deque (which represents only the oldest ready batch per partition), a partition with N sealed batches will appear N times in _readyPartitions but Ready() will only add the broker ID to readyNodes once per cycle. The extra dequeues are O(1) and cheap, but the queue can grow proportionally to the number of sealed batches rather than the number of partitions. Under sustained high-throughput load with many small batches, this queue could hold thousands of entries before the sender thread drains them.

This is a bounded-by-design issue (the queue can have at most one entry per sealed batch, and the total number of sealed batches is bounded by buffer memory), so it does not introduce unbounded growth. A de-duplication step (e.g., a ConcurrentDictionary<TopicPartition, byte> set instead of a queue) would keep the queue size bounded by partition count, but at higher constant cost per enqueue. Given the throughput profile, the current approach is probably the right trade-off — just worth making explicit in the comment.


6. Test quality: good coverage, one missing case

The 8 unit tests cover the main scenarios well. One gap: there is no test verifying that a partition in backoff is re-enqueued and eventually becomes ready after the backoff window. The Ready_UnknownLeader_SetsUnknownLeadersExist test covers the unknown-leader re-enqueue path, which is structurally identical, so this is low priority. Still worth adding for completeness given how critical retry correctness is.


Summary

The design is correct and the performance improvement is real: for a producer writing to a handful of active partitions out of thousands, this eliminates the O(n_all) scan entirely. The implementation is clean, well-commented, and the tests cover the important behavioral properties.

The muted-partition handling in UnmutePartition is the subtlest part of the patch and deserves a documentation comment about the ordering invariant (TryRemove before Enqueue). Items 3 and 4 are natural follow-ups. No blocking issues found.


@claude claude bot left a comment


Follow-up Review (after second commit)

The second commit (911055e49b) addressed some surface-level feedback: comment text was deduplicated across the three _readyPartitions.Enqueue call sites, a stale comment was removed from the test, and the BootstrapServers array initializer was updated to a collection expression. These are clean-ups.

However, the substantive issues raised in the previous review were not addressed. Summarising their current status:


Still open: break on failed TryDequeue is dead code in the single-consumer model

var count = _readyPartitions.Count;
for (var i = 0; i < count; i++)
{
    if (!_readyPartitions.TryDequeue(out var tp))
        break;   // ← can never trigger while Ready() is the sole consumer
    ...
}

TryDequeue can only return false if another consumer has raced ahead and emptied the queue. Because Ready() is documented as exclusively called from the sender thread, no such race exists. The break silently swallows a logic error that would otherwise surface as missing partitions. Replacing it with a Debug.Assert or throw would make the invariant explicit and fail loudly if the single-consumer assumption is ever violated:

var dequeued = _readyPartitions.TryDequeue(out var tp);
Debug.Assert(dequeued, "TryDequeue failed despite being the sole consumer of _readyPartitions");

Still open: UnmutePartition ordering invariant not documented

The TryRemove must precede the Enqueue to ensure Ready() does not see the notification but then observe the partition as still muted. This invariant is upheld by the current code but is not called out in a comment, making it fragile under future refactoring.
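A comment along these lines would pin the invariant down. This is an illustrative sketch only: the field names _mutedPartitions, _partitionDeques, and _readyPartitions come from the snippets in this review, but the method body is hypothetical, not the PR's actual code:

```csharp
public void UnmutePartition(TopicPartition tp)
{
    // INVARIANT: TryRemove must precede the Enqueue. A consumer that
    // dequeues this notification must no longer observe tp as muted,
    // otherwise Ready() would discard the notification as still-muted.
    _mutedPartitions.TryRemove(tp, out _);

    // Re-notify only if sealed batches accumulated while muted.
    if (_partitionDeques.TryGetValue(tp, out var pd) && pd.Count > 0)
        _readyPartitions.Enqueue(tp);
}
```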


Still open: Backoff re-enqueue busy-loops for the duration of the backoff window

When a batch is in backoff, the partition is re-enqueued at the end of every Ready() cycle. On the next sender wakeup the loop immediately re-skips it. This is functionally correct but wastes CPU during high-retry scenarios. A // TODO: defer re-enqueue until backoff expires comment would signal the known trade-off and prevent reviewers from thinking this was unintentional.


Still open: No test for partition re-enqueue after backoff expiry

The 8 tests cover unknown-leader re-enqueue (structurally identical to backoff), mute/unmute, drain-once, and linger seal. The backoff-specific path (the _readyPartitions.Enqueue(tp) at line ~924) has no direct test. Given that retry correctness is critical for producer reliability, a test simulating a batch in backoff that becomes ready after the window would be valuable.


Not blocking, but still worth a follow-up: HasPendingWork() O(n) inconsistency

After this PR, _readyPartitions.IsEmpty is a cheap O(1) proxy for "no sealed batches ready to send." The existing O(n) scan in HasPendingWork() could use this as a fast-path. Not a regression, but the asymmetry between the two code paths will likely confuse the next reader.


Overall

The core algorithmic change is correct and the performance improvement is real. The second commit shows the author is responsive to feedback. The remaining items are mostly hardening (the dead-code break, missing ordering comment, missing backoff test) rather than correctness bugs. Addressing at least the break/assert and the ordering comment before merge would make the invariants explicit and the code more maintainable.


@claude claude bot left a comment


Code Review: perf: replace O(n) partition scan in Ready() with push-based notification

This is a well-motivated and well-scoped performance optimization. The approach is sound and the implementation is generally clean. Below are my observations, ranging from correctness concerns to style suggestions.


Correctness: Duplicate notifications in _readyPartitions queue

File: src/Dekaf/Producer/RecordAccumulator.cs (SealCurrentBatchUnderLock, SealBatchesAsync, Reenqueue)

The _readyPartitions queue can accumulate multiple entries for the same TopicPartition across calls. For example, if a partition has 5 batches sealed in sequence, there will be 5 entries for that TP in the queue. Each pass through Ready() dequeues one, peeks the head batch (which may already have been sent and replaced), and adds the node. This isn't incorrect — the sender will just call Ready() multiple times and Drain() will drain one batch per call, which is exactly the existing per-call contract. However, it means the queue's length is not bounded by the number of partitions; it's bounded by the number of sealed batches across all partitions.

Two consequences worth noting:

  1. The count snapshot at the top of Ready() reads _readyPartitions.Count. In a high-throughput scenario where batches are being sealed continuously by producer threads while Ready() runs (it's single-consumer, so no concurrent drain, but producers are concurrent enqueues), the snapshot is a point-in-time read on a ConcurrentQueue. This is fine for the single-consumer/multi-producer pattern because Count is Volatile.Read internally, but it's worth ensuring the intent is documented — the snapshot is preventing re-enqueued items (muted/backoff/unknown-leader) from inflating future counts unboundedly.

  2. Queue size under sustained load: with N partitions each producing M batches, the queue can hold up to N*M entries instead of N. The practical impact is low because Ready() is called on the sender thread frequently, but a comment noting this could help future readers understand the amortized behavior.


Correctness: Re-enqueue for muted partitions creates FIFO stacking

File: src/Dekaf/Producer/RecordAccumulator.cs, lines ~905–910

When a partition is muted, every Ready() cycle re-enqueues it at the tail and continues. If a partition is muted for a long time while batches keep sealing for it, the queue accumulates many entries for that partition (the re-enqueued ones plus newly pushed ones). When it is eventually unmuted, UnmutePartition enqueues one more notification, but all the stale ones are already in the queue. This is safe because the head is null check handles the empty-deque case, but it could cause a burst of redundant readyNodes.Add(leader.NodeId) calls for the same node on the unmute cycle. The sender is resilient to this (readyNodes is a HashSet<int>), so it's not a bug; just a note that the queue can "pile up" for long-muted partitions.


Potential issue: pd is null drop without re-enqueue

File: src/Dekaf/Producer/RecordAccumulator.cs, lines ~913–915

var pd = _partitionDeques.GetValueOrDefault(tp);
if (pd is null)
    continue;

If pd is null (which should never happen in practice — GetOrCreateDeque is called at append time, so the deque exists before any notification is pushed), the notification is silently dropped. Since the deque cannot be null at notification time (the notification is pushed inside SealCurrentBatchUnderLock which operates on a pd already in the dictionary), this is dead-defensive code. A Debug.Assert(pd is not null) or a log warning here would make any future regression immediately visible rather than silently losing batches.


Minor: nowMs parameter is unused

File: src/Dekaf/Producer/RecordAccumulator.cs, line 890

internal (int NextCheckDelayMs, bool UnknownLeadersExist) Ready(
    MetadataManager metadataManager, long nowMs, HashSet<int> readyNodes)

The nowMs parameter is accepted but never read in the new implementation (the old scan used it for linger checks, but the new implementation defers linger decisions to the sealing site). If the callers still pass a meaningful timestamp, it might be worth either removing the parameter or noting in the doc comment that it's intentionally unused as a no-op legacy parameter kept for call-site compatibility. Unused parameters in hot-path signatures can be misleading.


Minor: SealBatchesAsync still enqueues inside a foreach over _partitionDeques

File: src/Dekaf/Producer/RecordAccumulator.cs, lines ~2222–2254

SealBatchesAsync still iterates over _partitionDeques (O(n_all_partitions)) when sealing for linger/flush. This is correct and expected — sealing itself must visit all partitions to decide which ones are due. The push notification added inside that loop is the right place. This is just a note to confirm that SealBatchesAsync is not on the steady-state hot path (it runs on the linger timer or on FlushAsync), so the O(n) enumeration there is acceptable.


Test: Ready_AfterSealingBatch_ReturnsReadyNode — fragile trigger assumption

File: tests/Dekaf.Tests.Unit/Producer/RecordAccumulatorReadyTests.cs, lines ~93–113

The test appends 10 records with batchSize: 50 and expects a batch to seal. This relies on the encoding overhead of 10 empty-key/empty-value records exceeding 50 bytes. This is plausible but the test would be more robust if it used a smaller batch size (e.g., batchSize: 1) or appended records with a non-empty value that provably exceeds the threshold. As written, if record encoding overhead changes, the test may silently start testing the "no sealed batch" path instead.

Similarly for Ready_MutedPartition_SkipsNotification, Ready_UnknownLeader_SetsUnknownLeadersExist, Ready_CalledTwice_SecondCallReturnsEmpty, and Ready_UnmutePartition_ReNotifiesReadyPartition — all use the same 10 records / batchSize: 50 pattern. A shared helper like AppendUntilSealed(accumulator, topic, partition, pool) that returns only after a batch is confirmed sealed (e.g., by checking _readyPartitions indirectly via Ready() returning Contains(nodeId)) would eliminate this fragility and centralize the trigger logic.


Test: Ready_MultiplePartitions_OnlyReportsReadyOnes — asserts correct outcome but not the key invariant

File: tests/Dekaf.Tests.Unit/Producer/RecordAccumulatorReadyTests.cs, lines ~306–345

The test verifies that readyNodes.Contains(1) — i.e., the ready node is reported. The more interesting invariant for this test (given that it's explicitly testing "only ready ones") is that readyNodes.Count == 1, meaning the partitions that did NOT seal (partition 7 and partitions 0–2, 4–6, 8–9) are not included. Adding await Assert.That(readyNodes).HasCount().EqualTo(1) would make the test definitively validate the "only reports ready ones" claim.


Nit: Comment drift in Ready() XML doc

File: src/Dekaf/Producer/RecordAccumulator.cs, line 882

"...whose head batch is sendable (batch is complete, linger expired, or flush in progress)"

The parenthetical linger expired, or flush in progress describes the old sealing criteria that were checked at Ready() time in the O(n) version. In the new model, Ready() never checks linger or flush state — those are checked at seal time. The summary could be simplified to: "...whose head batch is sendable (sealed by append, linger expiry, or flush)."


Overall Assessment

The approach is correct, well-commented, and the test coverage is meaningful. The count-snapshot loop to avoid infinite re-enqueue cycles is a clean solution. The UnmutePartition re-notification is complete. The main items worth addressing before merge are:

  1. Consider a Debug.Assert for the pd is null path to surface any future regression.
  2. Clarify or remove the unused nowMs parameter to avoid reader confusion.
  3. Strengthen the Ready_MultiplePartitions_OnlyReportsReadyOnes test to assert readyNodes.Count == 1.
  4. Document that the queue is not partitions-deduplicated — its length is proportional to sealed batches, not distinct partitions.

None of the above are blockers; items 1–3 are low-effort improvements and item 4 is purely documentation.

thomhurst added a commit that referenced this pull request Mar 16, 2026
…ackoff test

- Replace dead-code `break` in Ready() with Debug.Assert for TryDequeue invariant
- Document UnmutePartition ordering invariant (TryRemove before Enqueue)
- Add TODO comment for deferring backoff re-enqueue to avoid per-cycle churn
- Add fast-path check (!_readyPartitions.IsEmpty) in HasPendingWork() to skip O(n) scan
- Document that duplicate enqueue entries are harmless and bounded by buffer memory
- Add unit test verifying batch re-enqueue during backoff and readiness after expiry

@claude claude bot left a comment


Code Review

This is a solid, well-scoped performance improvement. The core idea — replacing O(n_all_partitions) dictionary enumeration with a push-based ConcurrentQueue<TopicPartition> — is correct and the implementation is careful about the edge cases. The comments are thorough and the test coverage is good. A few observations below.

Correctness concerns

HasPendingWork still has a partial gap (minor)

The comment at line 2192 says "Still need O(n) scan for sealed-but-not-yet-sent batches (pd.Count > 0)". The explanation in the comment is that the _readyPartitions queue can be empty while a partition deque still has sealed batches — specifically the case where Ready() already dequeued the notification but Drain() has not yet consumed the batch. That's a legitimate case, but it's worth noting that this makes HasPendingWork subtly depend on the caller calling Ready() before Drain() in a tight loop. If Ready() and Drain() are always paired (sender thread only), this is fine; just worth confirming no path calls HasPendingWork in a context where Ready() was already called but a partition notification was successfully dequeued without the corresponding batch being taken.

Muted partition re-enqueue creates unbounded queue growth during a long mute window

When a partition is muted, every Ready() call re-enqueues it:

_readyPartitions.Enqueue(tp); // muted path
continue;

The comment says "The total duplicate count is bounded by the number of sealed batches", but a muted partition keeps getting re-enqueued on every Ready() call regardless of whether any new batches were sealed. If the sender loop runs at high frequency (e.g. during a retry storm) and a partition is muted for a long time, the queue can grow unboundedly. Consider tracking whether the muted partition is already in the queue (e.g. a ConcurrentDictionary<TopicPartition, byte> of "pending notification" entries) to bound duplicates to one-per-partition rather than one-per-sender-cycle.

The count-snapshot loop is a subtle contract

var count = _readyPartitions.Count;
for (var i = 0; i < count; i++)

This works correctly given the single-consumer constraint, but ConcurrentQueue.Count takes a full fence and is O(n) internally (it enumerates segments). For a queue with many entries this may be non-trivial. An alternative that avoids the Count call is to mark a sentinel before entering the loop, or to swap with an empty queue atomically. The sentinel approach is not compatible with ConcurrentQueue, but a simple alternative is to drain into a local List<TopicPartition> (which the caller already owns as readyNodes context) and re-enqueue the deferred ones afterward.

Minor observations

TODO left in production code

// TODO: defer re-enqueue until backoff expires to avoid per-cycle churn
_readyPartitions.Enqueue(tp);

The TODO is fine to leave as a known limitation, but it directly implies the muted-partition unbounded growth concern above. If this is a known open issue, a tracking issue reference would be more durable than an inline TODO.

Debug.Assert on single-consumer guarantee is good

The Debug.Assert(dequeued, ...) on the TryDequeue result is a nice safety net. It will surface any violation of the single-consumer invariant in debug builds.

Test uses Task.Delay for timing (fragile on slow CI)

The Ready_BackoffBatch_ReenqueuedThenBecomesReadyAfterExpiry test uses await Task.Delay(150) to wait for a 100ms backoff to expire. Per the project guidelines, timing-dependent tests are fragile on slow CI runners. Consider making the test deterministic by setting RetryNotBefore to a timestamp in the past (i.e., already expired) and testing the "backoff not yet expired" branch with a timestamp far in the future, verifying the re-enqueue semantics without relying on wall-clock expiry.

HasPendingWork O(n) scan can be eliminated entirely

Now that every sealed batch pushes a notification to _readyPartitions, the O(n) fallback scan in HasPendingWork could be replaced by a simple atomic sealed-batch counter (similar to _unsealedBatchCount / _inFlightBatchCount). A _sealedBatchCount incremented in SealCurrentBatchUnderLock / SealBatchesAsync and decremented in Drain / PollFirst would make HasPendingWork fully O(1) and remove the remaining O(n) scan.
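The counter approach might look like the following minimal sketch; the field name _sealedBatchCount is hypothetical, and the sealing/draining sites are those named in this review:

```csharp
private int _sealedBatchCount;

// At each sealing site (SealCurrentBatchUnderLock, SealBatchesAsync, Reenqueue):
Interlocked.Increment(ref _sealedBatchCount);

// When Drain() hands a sealed batch to the sender:
Interlocked.Decrement(ref _sealedBatchCount);

public bool HasPendingWork()
{
    // O(1): no enumeration of _partitionDeques needed for the sealed-batch case.
    return Volatile.Read(ref _sealedBatchCount) > 0;
}
```

The real HasPendingWork would presumably still combine this with the existing unsealed/in-flight checks; the sketch shows only the sealed-batch term.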

Summary

The algorithmic improvement is sound and the implementation is careful. The main issue worth addressing before merge is the unbounded queue growth for muted partitions under sustained re-enqueue cycles. The Task.Delay in the backoff test is a secondary concern per project guidelines on flaky tests.


@claude claude bot left a comment


Code Review (follow-up after commits 9fce07d and 998a3a5 + 662d20e)

This is a thorough follow-up that closed out nearly all of the prior review's concerns. Below is a status pass on every previously raised item plus fresh observations on the newest code.


Previously raised — now resolved

  • Dead-code break on failed TryDequeue: fixed, replaced with Debug.Assert(dequeued, ...)
  • UnmutePartition ordering invariant not documented: fixed, comment added explaining TryRemove must precede Enqueue
  • HasPendingWork O(n) inconsistency: fixed, !_readyPartitions.IsEmpty fast-path added
  • Backoff re-enqueue acknowledged: fixed, // TODO: defer re-enqueue until backoff expires comment added
  • Queue duplicates not documented: fixed, comment at field declaration explains harmlessness and bounded count
  • nowMs parameter unused: fixed, removed from signature and all call sites
  • Missing backoff test: fixed, Ready_BackoffBatch_ReenqueuedThenBecomesReadyAfterExpiry added

The review feedback has been addressed diligently across multiple commits. The remaining observations below are either minor or future work.


Remaining concern: muted-partition re-enqueue can grow unboundedly under sustained load

File: src/Dekaf/Producer/RecordAccumulator.cs, lines 905–910

if (_mutedPartitions.ContainsKey(tp))
{
    _readyPartitions.Enqueue(tp);
    continue;
}

The field comment states:

"The total duplicate count is bounded by the number of sealed batches, which is itself bounded by buffer memory."

This is true for the sealing sites (SealCurrentBatchUnderLock, SealBatchesAsync, Reenqueue). However, the muted-partition re-enqueue path breaks this bound: every Ready() call re-enqueues a muted partition, independent of whether any new batches have been sealed. If the sender loop runs at high frequency (e.g., during a retry storm or when nextCheckDelayMs returns a small value) and a partition stays muted for many seconds, the queue grows proportionally to (senderCycleFrequency × muteDurationSeconds) for that partition, which has no relationship to BufferMemory / BatchSize.

Concretely: if the sender cycles every 10ms and a partition is muted for 10 seconds, that one partition contributes ~1000 spurious entries to _readyPartitions. Each entry is a TopicPartition struct (~16 bytes), so the absolute size is small, but the queue's length is no longer bounded by a design invariant. The field comment's claim is therefore inaccurate for the muted path.

Suggested fix: Use a ConcurrentDictionary<TopicPartition, byte> (a "pending notification set") instead of a ConcurrentQueue. Before enqueueing in any path, check TryAdd; if the partition is already present, skip the enqueue. This bounds the queue to at most one entry per partition at all times, eliminates all duplicate entries, and makes the muted-path bounded by partition count rather than sender frequency.
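A sketch of that pending-notification set, with hypothetical field and method names; the gate goes at every notification site, and the removal happens on dequeue in Ready():

```csharp
private readonly ConcurrentDictionary<TopicPartition, byte> _pendingNotifications = new();
private readonly ConcurrentQueue<TopicPartition> _readyPartitions = new();

private void NotifyReady(TopicPartition tp)
{
    // TryAdd succeeds only if no notification for tp is already pending,
    // bounding the queue to at most one entry per partition.
    if (_pendingNotifications.TryAdd(tp, 0))
        _readyPartitions.Enqueue(tp);
}

// In Ready(), after a successful TryDequeue(out var tp):
//     _pendingNotifications.TryRemove(tp, out _);
// The muted/backoff re-enqueue paths would then go back through NotifyReady(),
// so a long-muted partition contributes one entry, not one per sender cycle.
```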

Alternatively, a lower-effort fix is to correct the field comment so it explicitly excludes the muted re-enqueue path from the boundedness claim, and note the practical size remains small in any realistic scenario.


Minor: Ready_MultiplePartitions_OnlyReportsReadyOnes doesn't assert the "only" part

File: tests/Dekaf.Tests.Unit/Producer/RecordAccumulatorReadyTests.cs, line 343

The test name claims it verifies only ready partitions are reported, but the assertion only checks Contains(1). With all 10 partitions on node 1, a broken implementation that reported all nodes would still pass. The key invariant — that the notification queue was not populated by the 9 non-sealing partitions — is not verified.

Add one line:

await Assert.That(readyNodes).HasCount().EqualTo(1);

This confirms the push-based model didn't accidentally enqueue the unsealed partitions.


Minor: Backoff test still uses Task.Delay for timing (fragile on slow CI)

File: tests/Dekaf.Tests.Unit/Producer/RecordAccumulatorReadyTests.cs, lines 400–405

// Act 2: Wait for backoff to expire, then Ready() should report the node.
await Task.Delay(150);

Per the project guidelines, timing-dependent tests are fragile on slow CI runners. The test can be made deterministic by splitting into two independent tests:

  1. Backoff active: set RetryNotBefore = Stopwatch.GetTimestamp() + Stopwatch.Frequency * 10 (10s in the future), call Ready(), assert empty and nextCheckDelayMs ≤ 10000.
  2. Backoff expired: set RetryNotBefore = Stopwatch.GetTimestamp() - 1 (already past), reenqueue, call Ready(), assert ready node is returned.

This avoids any real-time wait, is instantaneous, and is not susceptible to CI scheduler delays.
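The "backoff active" half of that split might look like the following; this is a sketch only, since the test helper, the accumulator fields, and the post-PR Ready() signature are assumed from the snippets in this review rather than taken from the actual test file:

```csharp
[Test]
public async Task Ready_BackoffActive_ReturnsEmptyAndBoundedDelay()
{
    // Arrange: a sealed batch whose backoff lies far in the future (10 s),
    // so the test never waits on the wall clock.
    var batch = await AppendAndSealBatchAsync(_accumulator); // hypothetical helper
    batch.RetryNotBefore = Stopwatch.GetTimestamp() + Stopwatch.Frequency * 10;
    _accumulator.Reenqueue(batch);

    var readyNodes = new HashSet<int>();
    var (nextCheckDelayMs, _) = _accumulator.Ready(_metadataManager, readyNodes);

    // Assert: nothing is ready, and the sender is told to re-check soon.
    await Assert.That(readyNodes).HasCount().EqualTo(0);
    await Assert.That(nextCheckDelayMs).IsLessThanOrEqualTo(10_000);
}
```

The "backoff expired" twin would set RetryNotBefore to Stopwatch.GetTimestamp() - 1 and assert that the leader's node ID appears in readyNodes.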


Nit: XML doc comment still describes the old linger-check behavior

File: src/Dekaf/Producer/RecordAccumulator.cs, line 883

"...whose head batch is sendable (batch is complete, linger expired, or flush in progress)"

linger expired and flush in progress are no longer checked inside Ready() — those decisions happen at seal time. This phrasing from the original O(n) implementation persists. The description could be simplified to:

"...whose head batch is sendable (sealed by append overflow, linger expiry, or flush)"

This is purely a documentation fix with no correctness impact.


Summary

The implementation is correct and the performance improvement is real. All blocking concerns from prior reviews have been resolved. The three remaining items are:

  1. Muted-partition re-enqueue unbounded growth — the field comment's correctness claim is violated by this path; worth either fixing with a deduplication set or correcting the comment.
  2. Ready_MultiplePartitions_OnlyReportsReadyOnes missing count assertion — a one-line addition to make the test actually verify its stated invariant.
  3. Backoff test Task.Delay — low-effort split into two deterministic tests per project guidelines.

None of these are blockers for the core optimization, which is sound and well-implemented.


@claude claude bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review (final state — commit 719ac93)

This review covers the full PR including the sixth commit (719ac939) which adds DeferReenqueue and was not covered by the previous review round. Items previously raised and resolved are not repeated here.


New code: DeferReenqueue — no cancellation on disposal

private void DeferReenqueue(TopicPartition tp, int delayMs)
{
    _ = Task.Delay(Math.Max(delayMs, 1)).ContinueWith(static (_, state) =>
    {
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}

The CancellationToken.None means this timer fires and attempts _readyPartitions.Enqueue + SignalWakeup even after the RecordAccumulator has been disposed. If the producer is disposed during a retry backoff window, the ContinueWith callback runs on the thread pool, calls SignalWakeup on a disposed semaphore, and enqueues into a queue that nobody is draining. SignalWakeup calls TryReleaseSemaphore, which already guards against disposal-related exceptions (its doc comment mentions "ignoring disposal and max-count races"), so this is unlikely to throw — but it is still a reference leak: the captured (this, tp) tuple keeps the RecordAccumulator instance alive on the finalizer queue until the timer fires.

Suggested fix: Pass the CancellationToken from the sender loop's CancellationTokenSource through to DeferReenqueue and use it in Task.Delay. This is consistent with how other background operations in the producer handle cancellation:

private void DeferReenqueue(TopicPartition tp, int delayMs, CancellationToken cancellationToken)
{
    _ = Task.Delay(Math.Max(delayMs, 1), cancellationToken).ContinueWith(static (t, state) =>
    {
        if (t.IsCanceled) return;
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}

If that threading is complex to wire through, at minimum checking IsDisposed (if such a flag exists) at the start of the callback would prevent the post-disposal work.


Still open: muted-partition re-enqueue grows without bound under sustained load

This was raised in the prior review at commit 662d20e1 and is not fixed in the current HEAD. The field comment states:

"The total duplicate count is bounded by the number of sealed batches, which is itself bounded by buffer memory (BufferMemory / BatchSize)."

The muted re-enqueue path:

if (_mutedPartitions.ContainsKey(tp))
{
    _readyPartitions.Enqueue(tp);  // re-enqueued every Ready() call
    continue;
}

...breaks this invariant. Every sender cycle re-enqueues the muted partition regardless of whether new batches were sealed. The field comment's correctness claim is inaccurate for this path. This should be either fixed or the comment corrected to acknowledge the exception. A minimally invasive comment fix:

"Entries from sealing sites are bounded by BufferMemory / BatchSize. Muted-partition re-enqueue entries add one additional entry per sender cycle per muted partition; in practice this is small but unbounded by design invariant."


Still open: Ready_MultiplePartitions_OnlyReportsReadyOnes does not assert the key invariant

The test verifies readyNodes.Contains(1) but the test's name claims it verifies "only ready ones". The test does not assert that non-sealing partitions were excluded. Adding await Assert.That(readyNodes).HasCount().EqualTo(1) would make the test actually verify its stated invariant.


Still open: Ready_BackoffBatch_DeferredReenqueueBecomesReadyAfterExpiry uses Task.Delay(200) for timing

Per project guidelines: "Fix timing-dependent tests by using deterministic synchronization instead of arbitrary delays." The test waits 200ms for a 100ms backoff to expire. On a loaded CI runner this can flake. The test can be split into two deterministic cases without any wall-clock wait:

  1. Backoff not yet expired: set RetryNotBefore = Stopwatch.GetTimestamp() + Stopwatch.Frequency * 10 (10s in the future), call Reenqueue, then Ready(), assert empty.
  2. Backoff already expired: set RetryNotBefore = Stopwatch.GetTimestamp() - 1 (already past), call Reenqueue, then Ready(), assert the node is reported.

This avoids the DeferReenqueue timer entirely in the expired case (since backoffRemaining <= 0 bypasses the DeferReenqueue call) and tests both branches without sleeping.
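Under those assumptions the two tests could look roughly like this. A sketch only: CreateAccumulatorWithSealedBatch is a hypothetical helper, the Reenqueue call shape is assumed, node id 1 follows the other tests quoted in this review, and the assertion style mirrors the project's TUnit usage:

```csharp
[Test]
public async Task Ready_BackoffBatch_ActiveBackoff_DoesNotReport()
{
    // Hypothetical helper: an accumulator with one sealed batch for a single partition.
    var (accumulator, batch) = CreateAccumulatorWithSealedBatch();

    // Backoff 10 s in the future: Reenqueue notifies, but Ready() must filter it out.
    batch.RetryNotBefore = Stopwatch.GetTimestamp() + Stopwatch.Frequency * 10;
    accumulator.Reenqueue(batch);

    await Assert.That(accumulator.Ready()).IsEmpty();
}

[Test]
public async Task Ready_BackoffBatch_ExpiredBackoff_ReportsReady()
{
    var (accumulator, batch) = CreateAccumulatorWithSealedBatch();

    // Backoff already in the past: backoffRemaining <= 0, so no DeferReenqueue timer fires.
    batch.RetryNotBefore = Stopwatch.GetTimestamp() - 1;
    accumulator.Reenqueue(batch);

    await Assert.That(accumulator.Ready()).Contains(1);
}
```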


Nit: XML doc still describes old linger-check behavior

/// whose head batch is sendable (batch is complete, linger expired, or flush in progress)

"linger expired" and "flush in progress" are no longer evaluated inside Ready() — those decisions happen at seal time. The parenthetical should read: (sealed by append overflow, linger expiry, or flush).


Summary

The sixth commit (DeferReenqueue) correctly eliminates the per-cycle backoff churn that was the primary concern of the previous review. The core algorithmic change is sound and the implementation is careful. The remaining items are:

  1. DeferReenqueue with CancellationToken.None — the timer callback fires and touches the accumulator post-disposal; should respect the sender cancellation token. This is the only new concern introduced in the sixth commit and is worth addressing before merge.
  2. Muted-partition field comment — inaccurate boundedness claim; fix or qualify the comment.
  3. Ready_MultiplePartitions_OnlyReportsReadyOnes — add one assertion to validate the "only" part of the test name.
  4. Ready_BackoffBatch test — split into two deterministic cases to eliminate Task.Delay(200) per project guidelines.

@claude claude bot left a comment

Code Review (follow-up — commit 6873307)

This review covers only what changed in the latest commit. The core algorithmic change is sound and the implementation is clean.


Previous review items: all resolved except one

| Item | Status |
| --- | --- |
| DeferReenqueue with CancellationToken.None — timer fires post-disposal | Still open (see below) |
| Muted-partition queue growth — re-enqueued every cycle | ✅ Fixed: muted path now drops notification; UnmutePartition re-notifies |
| _readyPartitions field comment — inaccurate boundedness claim | ✅ Fixed |
| Ready() XML doc — stale "linger expired" language | ✅ Fixed |
| Ready_MultiplePartitions_OnlyReportsReadyOnes — missing count assertion | ✅ Fixed: Count().IsEqualTo(1) added |
| Backoff test using Task.Delay(200) — timing-dependent | ✅ Fixed: split into two deterministic cases |

Remaining open: DeferReenqueue uses CancellationToken.None

private void DeferReenqueue(TopicPartition tp, int delayMs)
{
    _ = Task.Delay(Math.Max(delayMs, 1)).ContinueWith(static (_, state) =>
    {
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}

Task.Delay is given no cancellation token here, so if the accumulator is disposed during a backoff window the timer still fires, enqueues into a now-orphaned _readyPartitions, and calls SignalWakeup() on a disposed semaphore. SignalWakeup already tolerates disposal exceptions (its comment says "ignoring disposal and max-count races"), so this will not throw — but the RecordAccumulator instance is kept alive by the captured state tuple until every in-flight timer fires. Under a long retry backoff (e.g. 30 s) this is a meaningful reference leak.

Suggested fix:

private void DeferReenqueue(TopicPartition tp, int delayMs, CancellationToken cancellationToken)
{
    _ = Task.Delay(Math.Max(delayMs, 1), cancellationToken).ContinueWith(static (t, state) =>
    {
        if (t.IsCanceled) return;
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}

The sender loop's CancellationToken is already available at the call site in Ready(), so wiring it through is straightforward. When the producer is disposed, the token is cancelled, Task.Delay terminates early, and the continuation returns immediately — releasing the captured reference promptly.


Summary

Everything from the previous round is addressed. One item remains: passing the cancellation token to DeferReenqueue to prevent reference leaks on disposal. Once that is addressed this PR is ready to merge.

…tion queue

Ready() previously iterated all _partitionDeques entries on every sender
loop cycle, acquiring a SpinLock per partition. With hundreds of partitions,
this is O(n_partitions) per cycle on the sender hot path.

Now, when a batch is sealed (SealCurrentBatchUnderLock, SealBatchesAsync) or
reenqueued for retry, the partition's TopicPartition is pushed onto a
ConcurrentQueue. Ready() drains only this queue, converting the cost to
O(n_ready_partitions). UnmutePartition re-enqueues if sealed batches exist
so they are not lost.

Closes #520
When Ready() dequeued a muted partition, the notification was dropped
with `continue`. If UnmutePartition raced with Ready(), the notification
could be permanently lost, causing one partition's batches to never be
sent. Re-enqueue muted partitions so they're retried on the next cycle.
…ackoff test

- Replace dead-code `break` in Ready() with Debug.Assert for TryDequeue invariant
- Document UnmutePartition ordering invariant (TryRemove before Enqueue)
- Add TODO comment for deferring backoff re-enqueue to avoid per-cycle churn
- Add fast-path check (!_readyPartitions.IsEmpty) in HasPendingWork() to skip O(n) scan
- Document that duplicate enqueue entries are harmless and bounded by buffer memory
- Add unit test verifying batch re-enqueue during backoff and readiness after expiry
The push-based notification queue eliminated all uses of nowMs
inside Ready(). Remove the dead parameter from the signature and
all call sites.
Replace immediate re-enqueue of backoff partitions with Task.Delay +
ContinueWith that fires once after backoff expires. This eliminates
per-cycle dequeue/re-enqueue churn during retry backoff windows.

One Task.Delay allocation per retry batch (not per message) is
acceptable per allocation guidelines.
…, docs

- Remove re-enqueue of muted partitions in Ready() to prevent unbounded
  queue growth; rely solely on UnmutePartition() for re-notification.
- Update _readyPartitions field comment to reflect boundedness invariant.
- Add count assertion to Ready_MultiplePartitions_OnlyReportsReadyOnes.
- Replace Task.Delay-based backoff test with two deterministic tests:
  one for active backoff (far future) and one for expired backoff (past).
- Fix XML doc on Ready() to say "sealed by append overflow, linger
  expiry, or flush" instead of old linger-check wording.
…gnature

After rebasing onto main (which merged #526's headerCount change),
test Append calls had null instead of 0 for the int headerCount parameter.
@thomhurst thomhurst force-pushed the perf/push-based-ready-notification branch from 6873307 to ad63565 on March 16, 2026 00:49
…erence leaks

Pass _disposalCts.Token to Task.Delay so the timer cancels promptly
on disposal. Without this, the captured (this, tp) tuple keeps the
RecordAccumulator alive until the backoff timer fires, which could
be up to 30 seconds for long retry backoffs.
@claude claude bot left a comment

Code Review (follow-up — commit ad63565)

This is a rebase-conflict fix commit only (updating Append call sites in tests after #526 merged). No production code changed since the previous review round.

Status of all previously raised items

| Item | Status |
| --- | --- |
| Dead-code break → Debug.Assert | ✅ Resolved |
| UnmutePartition ordering invariant documented | ✅ Resolved |
| Muted-partition queue grows unboundedly | ✅ Resolved — muted path now drops notification; UnmutePartition re-notifies |
| _readyPartitions field comment — inaccurate boundedness claim | ✅ Resolved |
| nowMs parameter unused | ✅ Resolved — removed from signature and call sites |
| Backoff per-cycle churn | ✅ Resolved — DeferReenqueue defers re-enqueue via timer |
| HasPendingWork O(n) inconsistency | ✅ Resolved — !_readyPartitions.IsEmpty fast-path added |
| Ready() XML doc — stale linger-check wording | ✅ Resolved |
| Ready_MultiplePartitions_OnlyReportsReadyOnes missing count assertion | ✅ Resolved |
| Backoff test using Task.Delay(200) — timing-dependent | ✅ Resolved — split into two deterministic tests |
| DeferReenqueue uses CancellationToken.None — reference leak on disposal | Still open |

Remaining issue: DeferReenqueue captures this with no cancellation on disposal

private void DeferReenqueue(TopicPartition tp, int delayMs)
{
    _ = Task.Delay(Math.Max(delayMs, 1)).ContinueWith(static (_, state) =>
    {
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}

Task.Delay is passed no cancellation token here. If the producer is disposed during a retry backoff window (which can be up to 30 seconds in default config), the timer fires on the thread pool after DisposeAsync completes and:

  1. Calls _readyPartitions.Enqueue on an orphaned queue that no sender thread is draining.
  2. Calls SignalWakeup on a disposed SemaphoreSlim. SignalWakeup already tolerates this gracefully (as documented), so no exception is thrown.
  3. Keeps the (RecordAccumulator, TopicPartition) state tuple — and therefore a strong reference to this — alive for the full duration of the backoff window after disposal.

The fix is a one-line change at the Task.Delay call site. The sender loop's CancellationToken is already available in SenderLoopAsync at the DeferReenqueue call site — thread it through to the private method:

private void DeferReenqueue(TopicPartition tp, int delayMs, CancellationToken cancellationToken)
{
    _ = Task.Delay(Math.Max(delayMs, 1), cancellationToken).ContinueWith(static (t, state) =>
    {
        if (t.IsCanceled) return;
        var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
        self._readyPartitions.Enqueue(partition);
        self.SignalWakeup();
    }, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}

When the producer is disposed, the sender's CancellationToken is cancelled, Task.Delay terminates early, the continuation returns immediately, and the captured reference is released promptly. This is consistent with how other background operations in the producer handle disposal-time cancellation.


Overall

The PR is in excellent shape. The algorithmic change is correct, the muted-partition handling is now bounded and sound, the tests are deterministic, and all prior review concerns have been addressed. The one remaining item — passing the cancellation token to DeferReenqueue — is a low-effort fix that eliminates a reference leak on disposal. Once addressed, this PR is ready to merge.

@claude claude bot left a comment

Code Review (follow-up — commit f6c138b)

This commit addresses the one remaining open issue from the previous review round.


Previously open: DeferReenqueue reference leak on disposal — ✅ Resolved

The fix correctly passes _disposalCts.Token to Task.Delay and adds an early-return guard in the ContinueWith callback:

_ = Task.Delay(Math.Max(delayMs, 1), _disposalCts.Token).ContinueWith(static (t, state) =>
{
    if (t.IsCanceled) return;
    var (self, partition) = ((RecordAccumulator, TopicPartition))state!;
    self._readyPartitions.Enqueue(partition);
    self.SignalWakeup();
}, (this, tp), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

Using _disposalCts.Token (the accumulator's own disposal token) rather than the sender loop's CancellationToken is actually slightly preferable: the accumulator can be cancelled independently of the sender loop's token, so this ensures the delay is cancelled at the earliest possible signal. The early-return on t.IsCanceled prevents the post-disposal enqueue and wakeup, releasing the captured (this, tp) reference promptly.

The XML doc comment update ("Respects _disposalCts so the timer cancels promptly on disposal, preventing reference leaks from captured state keeping the accumulator alive") correctly explains the intent.


Full status of all previously raised items

| Item | Status |
| --- | --- |
| Dead-code break → Debug.Assert | ✅ Resolved |
| UnmutePartition ordering invariant documented | ✅ Resolved |
| Muted-partition queue grows unboundedly | ✅ Resolved |
| _readyPartitions field comment — inaccurate boundedness claim | ✅ Resolved |
| nowMs parameter unused | ✅ Resolved |
| Backoff per-cycle churn → DeferReenqueue timer | ✅ Resolved |
| HasPendingWork O(n) inconsistency — fast-path added | ✅ Resolved |
| Ready() XML doc — stale linger-check wording | ✅ Resolved |
| Ready_MultiplePartitions missing count assertion | ✅ Resolved |
| Backoff test using Task.Delay(200) — timing-dependent | ✅ Resolved |
| DeferReenqueue uses CancellationToken.None — reference leak | ✅ Resolved in this commit |

One minor observation (non-blocking)

DeferReenqueue captures this via the state tuple. If _disposalCts itself is disposed before Task.Delay registers its cancellation callback (possible under adversarial timing during DisposeAsync), Task.Delay can throw ObjectDisposedException. In practice, CancellationTokenSource.Dispose after Cancel is safe because Task.Delay has already observed the cancellation at that point, but if Dispose races with the registration, the exception would surface as an unobserved task exception (the _ = discard means the task is never awaited). This is the same pattern used elsewhere in the codebase for fire-and-forget tasks, and it is an accepted trade-off, not a regression introduced here.
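For reference, the usual way to sidestep that race is to order disposal as cancel, drain, then dispose. A sketch only, assuming a hypothetical _senderTask field tracking the sender loop:

```csharp
public async ValueTask DisposeAsync()
{
    // Cancel first: any in-flight Task.Delay observes the token and completes as canceled.
    _disposalCts.Cancel();

    // Wait for token consumers to finish before disposing the source (_senderTask is hypothetical).
    await _senderTask.ConfigureAwait(false);

    // Safe now: a cancelled token means Task.Delay no longer registers new callbacks,
    // so Dispose cannot race with a registration.
    _disposalCts.Dispose();
}
```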


Summary

All review items are now resolved. The implementation is correct, well-tested (9 unit tests covering the main behavioral invariants), and consistent with the project's allocation and threading guidelines. The DeferReenqueue disposal-safety fix is the right approach. This PR is ready to merge.

@thomhurst thomhurst enabled auto-merge March 16, 2026 00:54
@thomhurst thomhurst disabled auto-merge March 16, 2026 00:59
With push-based notifications, the sender wakes via SignalWakeup()
from batch sealing or DeferReenqueue timer expiry. The old dynamic
nextCheckDelayMs was never modified after initialization, always
falling through to the 100ms safety-net poll. Remove the dead
variable and document the 100ms fallback rationale.
@thomhurst thomhurst enabled auto-merge March 16, 2026 01:03
@claude claude bot left a comment

Follow-up Review — Round 3

All items from the previous two review rounds have been resolved. Here's what I verified against the current diff:

| Previous Item | Status |
| --- | --- |
| break on failed dequeue → Debug.Assert | ✅ Resolved |
| UnmutePartition ordering invariant comment | ✅ Resolved |
| Backoff per-cycle churn → DeferReenqueue timer | ✅ Resolved |
| HasPendingWork fast-path with !_readyPartitions.IsEmpty | ✅ Resolved |
| DeferReenqueue CancellationToken.None reference leak | ✅ Resolved — _disposalCts.Token cancels the delay, t.IsCanceled guard in continuation |
| Stale XML doc / nowMs parameter | ✅ Resolved |

Observations on the current diff

DeferReenqueue looks correct. The value-tuple state object (this, tp) avoids a closure allocation. Math.Max(delayMs, 1) prevents a zero-delay degenerate case. TaskContinuationOptions.ExecuteSynchronously is appropriate since the continuation only enqueues and signals. _disposalCts.Token cancels the timer on disposal and the t.IsCanceled guard prevents a post-disposal enqueue.

Unknown-leader re-enqueue is immediate, not deferred. When the leader is unknown, the partition is re-enqueued immediately (_readyPartitions.Enqueue(tp)) rather than via DeferReenqueue. This means it re-appears on the next sender cycle. This is intentional — the sender triggers a metadata refresh on unknownLeadersExist = true, so the loop naturally backs off via the refresh round-trip latency. The count-snapshot loop prevents this from infinite-cycling within a single Ready() call. Correct as-is.

HasPendingWork fast-path may give a false positive if stale duplicate notifications remain in _readyPartitions after the associated batches have been drained. In that case, HasPendingWork returns true, Ready() runs, drains the stale entries (each finds pd.PeekFirst() == null and skips), then HasPendingWork falls through to the O(n) scan which correctly returns false. Net effect is at most one extra idle sender cycle. Not a bug — worth knowing the false-positive is possible.
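As a sketch, the fast path has this shape (the scan body is elided; ScanPartitionsForSealedBatches is a hypothetical stand-in for the existing O(n) scan):

```csharp
public bool HasPendingWork()
{
    // Fast path: a pending notification implies sealed batches likely exist.
    // Stale duplicate entries can make this a false positive; the cost is one
    // extra Ready() cycle that drains them and finds nothing to send.
    if (!_readyPartitions.IsEmpty)
        return true;

    // Fallback: the original O(n) scan over partition deques (name hypothetical).
    return ScanPartitionsForSealedBatches();
}
```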

One minor test gap (non-blocking): Neither backoff test exercises the actual DeferReenqueue timer path end-to-end (i.e., backoff > 0 → timer fires → partition becomes ready). The Ready_BackoffBatch_ActiveBackoff_DoesNotReport test verifies no report during backoff, and Ready_BackoffBatch_ExpiredBackoff_ReportsReady tests the already-expired case via Reenqueue directly. Testing the timer itself would require a small await Task.Delay and would be timing-sensitive — arguably not worth the fragility. The DeferReenqueue logic is simple enough to trust without it.


Summary

The implementation is correct, complete, and consistent with the project's allocation and threading guidelines. All previous feedback has been addressed. No blocking issues. Ready to merge.

…questSize

Ready() consumes all notifications from _readyPartitions, but
DrainBatchesForOneNode breaks early when maxRequestSize is exceeded.
The skipped partitions' notifications were permanently lost — they
aren't muted (so UnmutePartition never fires), and no new seals
create notifications for their existing batches. This caused
orphaned batch delivery timeouts in backpressure integration tests.

Add ReenqueueSkippedPartitions to blindly re-enqueue remaining
partitions when the size break triggers. No locking or filtering
needed: Ready()/Drain re-validate on the next cycle, and duplicate
entries are harmless per the ConcurrentQueue invariant.
@claude claude bot left a comment

Round 4 Review — Final Commit

The last commit (c6fd8e48) was added after the Round 3 sign-off. It fixes a correctness bug in the push-based notification design where DrainBatchesForOneNode breaks early on maxRequestSize but Ready() had already consumed all the notifications. Without this fix, the skipped partitions' sealed batches would be permanently orphaned.


ReenqueueSkippedPartitions — analysis

private void ReenqueueSkippedPartitions(
    IReadOnlyList<TopicPartition> partitions, int startIndex, int fromOffset, int count)
{
    for (var j = fromOffset; j < count; j++)
        _readyPartitions.Enqueue(partitions[(startIndex + j) % count]);
}

Correctness: The circular index (startIndex + j) % count matches the rotation logic in DrainBatchesForOneNode, so the right partitions are re-enqueued. fromOffset is the rotation-order index of the first partition that did not fit, so the first fromOffset partitions in rotation order were drained and notifications are correctly NOT re-enqueued for them.

Edge cases:

  • If fromOffset == count (all partitions fit), the loop is a no-op. ✓
  • If ready.Count == 0 (first batch, condition ready.Count > 0 is false), the break is never reached regardless of size — so the first batch always makes progress and ReenqueueSkippedPartitions is never called with fromOffset == 0. ✓
  • The ready.Count > 0 guard in the break condition means this path is only hit on second+ partition, so the fix is both necessary and correctly scoped. ✓

Thread safety: DrainBatchesForOneNode is called from the sender thread, the same thread that is the sole consumer of _readyPartitions. Enqueueing from the sender thread is safe (multi-producer MPSC semantics are maintained). ✓
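A worked example of the rotation arithmetic (the values here are hypothetical, not taken from the PR):

```csharp
// 4 ready partitions; the drain rotation started at index 2, and the
// maxRequestSize break fired after two partitions were drained (fromOffset = 2).
int startIndex = 2, fromOffset = 2, count = 4;
for (var j = fromOffset; j < count; j++)
{
    // Drained in rotation order: (2+0)%4 = 2, then (2+1)%4 = 3.
    // Re-enqueued here:          (2+2)%4 = 0, then (2+3)%4 = 1.
    Console.WriteLine((startIndex + j) % count);
}
// Output: 0, then 1 (exactly the slots the size break skipped).
```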


Test

The regression test Drain_MaxRequestSizeExceeded_ReenqueuesSkippedPartitions is deterministic and correctly asserts the observable behavior: after a size-limited drain, a subsequent Ready() call must still find the skipped partitions. The test uses maxRequestSize: 1 which is an extreme case but effective for forcing the break path. ✓


Summary

This commit closes a real correctness hole that the push-based design introduced. The fix is minimal, correct, and well-tested. No issues found.

All feedback across all four rounds is now resolved. Ready to merge.

…ning batches

After DrainBatchesForOneNode removes one batch via PollFirst(), any
remaining sealed batches in the same partition's deque had no pending
notification in _readyPartitions. The code relied on the mute/unmute
cycle (UnmutePartition) to re-enqueue, but there is a timing gap
between drain and mute where the sender could sleep with no
notifications — orphaning the remaining batches.

Now PeekFirst() is checked immediately after PollFirst() (still under
the spinlock), and if more batches exist the partition is re-enqueued.
Duplicates are harmless per the ConcurrentQueue invariant.
@claude claude bot left a comment

Code Review — Final Round (latest commits)

This review covers the two most recent commits: 08885779 (cleanup: remove dead nextCheckDelayMs variable) and 1ad24c06 (fix: re-enqueue notification after PollFirst when partition has remaining batches), plus verifying the overall state of the PR.


Status of all previously raised items

| Item | Status |
| --- | --- |
| Dead-code break → Debug.Assert | ✅ Resolved |
| UnmutePartition ordering invariant documented | ✅ Resolved |
| Muted-partition queue grows unboundedly per sender cycle | ✅ Resolved — muted path drops notification; UnmutePartition re-notifies |
| _readyPartitions field comment — inaccurate boundedness claim | ✅ Resolved |
| nowMs parameter unused | ✅ Resolved — removed from signature |
| Backoff per-cycle churn | ✅ Resolved — DeferReenqueue defers re-enqueue via timer |
| DeferReenqueue CancellationToken.None reference leak on disposal | ✅ Resolved — _disposalCts.Token used |
| HasPendingWork O(n) inconsistency — fast-path added | ✅ Resolved |
| Ready() XML doc — stale linger-check wording | ✅ Resolved |
| Ready_MultiplePartitions missing count assertion | ✅ Resolved |
| Backoff test using Task.Delay(200) — timing-dependent | ✅ Resolved — split into two deterministic tests |
| Orphaned partitions when Drain hits maxRequestSize | ✅ Resolved — ReenqueueSkippedPartitions added |
| Remaining batches orphaned after PollFirst | ✅ Resolved — this commit |

Review of 1ad24c06: re-enqueue after PollFirst

batch = pd.PollFirst();

if (pd.PeekFirst() is not null)
    _readyPartitions.Enqueue(tp);

Correctness: PollFirst removes the head batch for delivery; PeekFirst then checks if more sealed batches remain in the same partition's deque. If so, a notification is enqueued so the sender does not miss the remaining batches. The comment correctly identifies the timing gap — the mute/unmute cycle would eventually re-enqueue, but there is a window between drain and mute where the sender could sleep with no pending notification.

Thread safety: Both PollFirst and PeekFirst are called under the existing spinlock inside DrainBatchesForOneNode. The _readyPartitions.Enqueue call is from the sender thread (sole consumer but concurrent enqueuer), which is safe. ✓

Regression test: Drain_MultipleBatchesSamePartition_ReenqueuesRemaining validates the fix end-to-end: 30 records seal 3 batches on one partition, Drain takes one, and a subsequent Ready() must still find the remaining batches. ✓


Review of 08885779: dead nextCheckDelayMs variable removed

The old implementation accumulated a minimum check delay across all partitions during Ready() and returned it as NextCheckDelayMs. The new push-based implementation always returns 100 as a safety-net fallback, making the accumulation logic dead code. The cleanup removes the dead variable and adds a comment documenting the 100ms rationale. Clean and correct.


One minor observation (non-blocking)

DrainBatchesForOneNode now has two re-enqueue paths that fire from the sender thread in the same drain cycle:

  1. ReenqueueSkippedPartitions — for partitions not reached due to maxRequestSize
  2. _readyPartitions.Enqueue(tp) after PollFirst — for the partition that was drained but has more batches

Both correctly re-enqueue for the next sender cycle. The two paths are complementary and there is no overlap: the ReenqueueSkippedPartitions path fires on break (before PollFirst), while the PeekFirst path fires after a successful drain. No double-enqueue for the same partition within a single drain call. ✓


Summary

All review items across all rounds have been addressed. The implementation is correct, well-commented, and consistent with the project's allocation and threading guidelines. The test suite covers all the key behavioral invariants including the two new regression tests added in recent commits.

Ready to merge.

@thomhurst thomhurst merged commit f36d859 into main Mar 16, 2026
19 checks passed


Development

Successfully merging this pull request may close these issues.

Replace O(n) partition enumeration in Ready() with push-based notification
