Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Dekaf/Producer/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2018,7 +2018,7 @@ private async Task SenderLoopAsync(CancellationToken cancellationToken)
// 1. Check which brokers have sendable data
readyNodes.Clear();
var (nextCheckDelayMs, unknownLeadersExist) = _accumulator.Ready(
_metadataManager, Stopwatch.GetTimestamp(), readyNodes);
_metadataManager, readyNodes);

if (readyNodes.Count > 0)
{
Expand Down
139 changes: 130 additions & 9 deletions src/Dekaf/Producer/RecordAccumulator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,20 @@ public sealed partial class RecordAccumulator : IAsyncDisposable
// skip enumeration when this counter is non-zero.
private int _pendingAwaitedProduceCount;

// Push-based notification queue for partitions with sealed batches ready to send.
// Populated by SealCurrentBatchUnderLock, SealBatchesAsync, and Reenqueue when a batch
// enters a partition deque. Drained by Ready() instead of scanning all _partitionDeques,
// converting O(n_partitions) to O(n_ready_partitions) per sender cycle.
// ConcurrentQueue is lock-free and safe for multi-producer (append workers, linger timer)
// single-consumer (sender thread) usage.
//
// Duplicate entries for the same partition are harmless: Ready() dequeues and checks the
// partition deque, so extra notifications for an already-drained partition are no-ops.
// The queue is bounded: each partition appears at most once per seal event, muted
// partitions are dropped (not re-enqueued), and UnmutePartition re-enqueues only if
// sealed batches exist. Total size <= number of sealed batches <= BufferMemory / BatchSize.
private readonly ConcurrentQueue<TopicPartition> _readyPartitions = new();

// Coordination lock between FlushAsyncCore and ExpireLingerAsyncCore.
// Ensures linger and flush don't both seal batches simultaneously,
// which could cause ordering or double-seal issues.
Expand Down Expand Up @@ -870,23 +884,39 @@ private PartitionDeque GetOrCreateDeque(TopicPartition tp)
=> _partitionDeques.GetOrAdd(tp, static _ => new PartitionDeque());

/// <summary>
/// Checks all partition deques for sendable data, matching Java's RecordAccumulator.ready().
/// Drains the push-based notification queue to find partitions with sendable data.
/// Populates the caller-provided readyNodes set with broker IDs that have at least one
/// partition whose head batch is sendable (batch is complete, linger expired, or flush in progress).
/// partition whose head batch is sendable (sealed by append overflow, linger expiry, or flush).
/// Only called from the sender thread.
///
/// Complexity is O(n_ready_partitions) instead of O(n_all_partitions) because only partitions
/// that had a batch sealed or reenqueued are in the notification queue.
/// </summary>
internal (int NextCheckDelayMs, bool UnknownLeadersExist) Ready(
MetadataManager metadataManager, long nowMs, HashSet<int> readyNodes)
MetadataManager metadataManager, HashSet<int> readyNodes)
{
var nextCheckDelayMs = int.MaxValue;
var unknownLeadersExist = false;

foreach (var kvp in _partitionDeques)
// Snapshot the current queue length to avoid infinite loop: partitions that need
// re-enqueue (backoff, unknown leader) are added back during the loop, but we only
// process items that were present at the start of this call.
var count = _readyPartitions.Count;

for (var i = 0; i < count; i++)
{
var tp = kvp.Key;
var pd = kvp.Value;
var dequeued = _readyPartitions.TryDequeue(out var tp);
Debug.Assert(dequeued, "TryDequeue failed despite being the sole consumer of _readyPartitions");

if (_mutedPartitions.ContainsKey(tp))
{
// Drop the notification; UnmutePartition() will re-enqueue if the
// partition still has sealed batches. This avoids unbounded queue
// growth while a partition stays muted across many sender cycles.
continue;
}

var pd = _partitionDeques.GetValueOrDefault(tp);
if (pd is null)
continue;

ReadyBatch? head;
Expand All @@ -905,7 +935,11 @@ private PartitionDeque GetOrCreateDeque(TopicPartition tp)
if (backoffRemaining > 0)
{
var backoffMs = (int)(backoffRemaining * 1000 / Stopwatch.Frequency);
nextCheckDelayMs = Math.Min(nextCheckDelayMs, backoffMs);

// Defer re-enqueue until backoff expires to avoid per-cycle churn.
// The timer fires once, re-enqueues the partition, and wakes the sender.
// One Task.Delay allocation per retry batch (not per message) is acceptable.
DeferReenqueue(tp, backoffMs);
continue;
}
}
Expand All @@ -918,6 +952,9 @@ private PartitionDeque GetOrCreateDeque(TopicPartition tp)
// Signal the sender to trigger a metadata refresh, matching Java's
// RecordAccumulator.ready() unknownLeadersExist behavior.
unknownLeadersExist = true;

// Re-enqueue so the sender loop retries after metadata refresh.
_readyPartitions.Enqueue(tp);
continue;
}

Expand All @@ -929,7 +966,10 @@ private PartitionDeque GetOrCreateDeque(TopicPartition tp)
readyNodes.Add(leader.NodeId);
}

return (nextCheckDelayMs == int.MaxValue ? 100 : nextCheckDelayMs, unknownLeadersExist);
// With push-based notifications, the sender wakes on SignalWakeup() from batch
// sealing or DeferReenqueue timer expiry. The 100ms fallback is a safety-net poll
// in case a notification is missed (e.g., during disposal races).
return (100, unknownLeadersExist);
}

/// <summary>
Expand Down Expand Up @@ -1013,9 +1053,24 @@ private void DrainBatchesForOneNode(
continue;

if (ready.Count > 0 && size + batch.DataSize > maxRequestSize)
{
// Ready() already consumed notifications for all partitions on this node.
// Re-enqueue this and remaining partitions so they aren't orphaned.
// No filtering needed: Ready()/Drain will re-validate on the next cycle,
// and duplicate entries in _readyPartitions are harmless (documented invariant).
ReenqueueSkippedPartitions(partitions, startIndex, i, count);
break;
}

batch = pd.PollFirst();

// If more sealed batches remain, re-enqueue so they're drained on
// the next cycle. Ready() already consumed the notification for this
// partition. The mute/unmute cycle would eventually re-enqueue via
// UnmutePartition, but there is a timing gap between drain and mute
// where the sender could sleep with no pending notifications.
if (pd.PeekFirst() is not null)
_readyPartitions.Enqueue(tp);
}

if (batch is not null)
Expand All @@ -1029,6 +1084,21 @@ private void DrainBatchesForOneNode(
_drainIndex[nodeId] = lastDrainIndex;
}

/// <summary>
/// Re-enqueues ready notifications for the partitions that <see cref="DrainBatchesForOneNode"/>
/// skipped once the maxRequestSize limit was reached. Ready() has already consumed those
/// partitions' notifications, so without this step their sealed batches would be orphaned:
/// never drained and never re-notified (they are not muted, so UnmutePartition will not
/// fire for them). No locking is required — the next Ready()/Drain cycle re-validates every
/// condition, and duplicate entries in _readyPartitions are harmless by documented invariant.
/// </summary>
private void ReenqueueSkippedPartitions(
    IReadOnlyList<TopicPartition> partitions, int startIndex, int fromOffset, int count)
{
    var j = fromOffset;
    while (j < count)
    {
        // Same wrap-around visitation order as the drain loop.
        var index = (startIndex + j) % count;
        _readyPartitions.Enqueue(partitions[index]);
        j++;
    }
}

/// <summary>
/// Puts a failed batch back at the front of its partition deque for retry.
/// Matches Java's RecordAccumulator.reenqueue() with addFirst.
Expand All @@ -1045,14 +1115,35 @@ internal void Reenqueue(ReadyBatch batch, long nowMs)
else
pd.AddFirst(batch);
}

// Notify Ready() that this partition has a sendable batch.
_readyPartitions.Enqueue(batch.TopicPartition);
SignalWakeup();
}

internal void MutePartition(TopicPartition tp) => _mutedPartitions.TryAdd(tp, 0);

internal void UnmutePartition(TopicPartition tp)
{
    // Remove the mute flag before enqueueing: Ready() must not dequeue our notification
    // only to find the partition still muted (a wasted re-enqueue cycle). The ordering is
    // safe without further synchronization because Ready() is single-consumer on the sender
    // thread and this method is called from the sender thread as well.
    _mutedPartitions.TryRemove(tp, out _);

    // While muted, Ready() dropped this partition's notifications; re-notify now if any
    // sealed batches are still waiting in the deque.
    if (_partitionDeques.TryGetValue(tp, out var deque))
    {
        bool pendingSealed;
        using (new SpinLockGuard(ref deque.Lock))
        {
            pendingSealed = deque.PeekFirst() is not null;
        }

        if (pendingSealed)
            _readyPartitions.Enqueue(tp);
    }

    // Wake the sender loop so the newly-unmuted partition is drained promptly.
    SignalWakeup();
}
internal bool IsMuted(TopicPartition tp) => _mutedPartitions.ContainsKey(tp);
Expand All @@ -1062,6 +1153,25 @@ internal void SignalWakeup()
TryReleaseSemaphore(_wakeupSignal);
}

/// <summary>
/// Schedules <paramref name="tp"/> to be pushed back into <see cref="_readyPartitions"/>
/// after <paramref name="delayMs"/> milliseconds, so the sender loop does not spin on
/// partitions still in retry backoff. Implemented as a fire-and-forget
/// <see cref="Task.Delay(int, CancellationToken)"/> — one allocation per retry batch,
/// not per message. The delay observes <see cref="_disposalCts"/> so the timer is
/// cancelled promptly on disposal and the captured accumulator reference is released.
/// NOTE(review): assumes callers never race disposal of _disposalCts — reading .Token on
/// a disposed CTS would throw ObjectDisposedException; confirm the sender stops first.
/// </summary>
private void DeferReenqueue(TopicPartition tp, int delayMs)
{
    // Clamp to at least 1 ms so a zero/negative remainder still yields a real deferral.
    var clampedDelayMs = Math.Max(delayMs, 1);
    var delay = Task.Delay(clampedDelayMs, _disposalCts.Token);

    _ = delay.ContinueWith(static (completed, state) =>
        {
            // Disposal cancelled the delay: drop the notification silently.
            if (completed.IsCanceled)
                return;

            var (accumulator, partition) = ((RecordAccumulator, TopicPartition))state!;
            accumulator._readyPartitions.Enqueue(partition);
            accumulator.SignalWakeup();
        },
        (this, tp),
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
}

/// <summary>
/// Releases a semaphore if not already signaled, ignoring disposal and max-count races.
/// The CurrentCount check avoids throwing SemaphoreFullException on every call under
Expand Down Expand Up @@ -1710,6 +1820,9 @@ private bool TryAppendFromSpansToBatch(
OnBatchEntersPipeline(readyBatch);
pd.AddLast(readyBatch);
ProducerDebugCounters.RecordBatchQueuedToReady();

// Notify Ready() that this partition has a sendable batch.
_readyPartitions.Enqueue(readyBatch.TopicPartition);
}
_batchPool.Return(currentBatch);
pd.CurrentBatch = null;
Expand Down Expand Up @@ -2122,6 +2235,11 @@ internal bool HasPendingWork()
if (Volatile.Read(ref _unsealedBatchCount) > 0)
return true;

// Fast path: if the ready notification queue is non-empty, there are sealed batches
// waiting to be drained — skip the O(n) partition scan below.
if (!_readyPartitions.IsEmpty)
return true;

// Still need O(n) scan for sealed-but-not-yet-sent batches (pd.Count > 0)
foreach (var kvp in _partitionDeques)
{
Expand Down Expand Up @@ -2198,6 +2316,9 @@ private async ValueTask SealBatchesAsync(bool sealAll, CancellationToken cancell
OnBatchEntersPipeline(readyBatch);
pd.AddLast(readyBatch);
ProducerDebugCounters.RecordBatchQueuedToReady();

// Notify Ready() that this partition has a sendable batch.
_readyPartitions.Enqueue(readyBatch.TopicPartition);
sealedBatch = readyBatch;
}
_batchPool.Return(pd.CurrentBatch);
Expand Down
Loading
Loading