feat: scale ValueTaskSourcePool cap with concurrency level#524
Conversation
Code Review
Overall, this is a clean and well-motivated change. The static companion class design is idiomatic, the auto-calculation logic is well-encapsulated, and the tests cover the important edge cases. A few concerns worth discussing:
Semantic mismatch: pool measures per-batch, but usage is per-ProduceAsync call
The core assumption behind BufferMemory / BatchSize is that the number of concurrent in-flight batches is a good proxy for pool pressure. But PooledValueTaskSource<T> instances are rented per ProduceAsync call (one per message awaiting delivery), not per batch.
A batch groups many messages together. In a steady-state high-throughput scenario where messages from thousands of concurrent ProduceAsync callers are coalesced into batches, the number of concurrent PooledValueTaskSource instances alive at any moment equals the number of in-flight messages, not the number of in-flight batches.
For default settings (BufferMemory = 128MB, BatchSize = 1MB): 128MB / 1MB = 128 → clamped to 256. But a single 1MB batch at 1KB/msg contains ~1000 messages, each holding a rented source. So actual pool pressure under async high-throughput could be 128 * 1000 = 128,000 concurrent sources — not 256.
The 4096 fixed default was actually much closer to reality for this scenario. A better proxy for pool demand would be BufferMemory / AverageMessageSize (which is hard to know) or simply NumBatches * EstimatedMessagesPerBatch.
Suggestion: Either document this assumption explicitly (that the calculation sizes for batch-level concurrency, not message-level), or consider a multiplier like (BufferMemory / BatchSize) * InitialBatchRecordCapacity to approximate message-level concurrency. The issue's profiling context mentions ProduceAllAsync hitting pool pressure — that usage pattern has far more concurrent sources than batches.
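To make the gap concrete, here is a back-of-envelope sketch of the two proxies using the example numbers above (128 MB buffer, 1 MB batches, ~1 KB messages; all review assumptions, not the library's constants):

```csharp
// Back-of-envelope comparison of the two sizing proxies.
// All numbers are this review's example figures, not library constants.
const ulong bufferMemory = 128UL * 1024 * 1024; // 128 MB
const ulong batchSize = 1024UL * 1024;          // 1 MB
const ulong avgMessageSize = 1024;              // assumed ~1 KB per message

ulong inFlightBatches = bufferMemory / batchSize;           // 128
ulong messagesPerBatch = batchSize / avgMessageSize;        // 1024
ulong inFlightSources = inFlightBatches * messagesPerBatch; // 131,072 (the review rounds to ~128,000)

Console.WriteLine($"batch-level proxy:   {inFlightBatches}");
Console.WriteLine($"message-level proxy: {inFlightSources}");
```

Sized to batches, the pool would be clamped up to 256; sized to messages, demand is roughly three orders of magnitude higher.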
DefaultMaxPoolSize is now a misleading constant
DefaultMaxPoolSize = 4096 is moved to the non-generic ValueTaskSourcePool class and is used by the parameterless ValueTaskSourcePool<T>() constructor. But through ProducerOptions, the actual default is now 0 (auto-calculate), meaning the producer never uses DefaultMaxPoolSize by default.
This creates confusion: a user reading the API docs might conclude that 4096 is the operational default, when in practice the auto-calculated value (256 in the example above) will be used. Consider either renaming it to FallbackMaxPoolSize or ExplicitDefaultPoolSize, or removing it as a public constant altogether.
Assert.ThrowsAsync wrapping synchronous throws
In CalculatePoolSize_ThrowsForInvalidBatchSize, CalculatePoolSize throws synchronously but is wrapped in an async lambda:
```csharp
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(() =>
{
    ValueTaskSourcePool.CalculatePoolSize(1073741824UL, 0); // synchronous throw
    return Task.CompletedTask;
});
```

This works (the sync exception propagates through the `Func<Task>`), but `Assert.Throws<ArgumentOutOfRangeException>(() => ValueTaskSourcePool.CalculatePoolSize(..., 0))` is more direct for a synchronous method. Not a correctness issue, just a clarity one.
What's working well
- The static companion class (`ValueTaskSourcePool`) is the right design — it avoids the awkward `ValueTaskSourcePool<RecordMetadata>.DefaultMaxPoolSize` generic access syntax
- Clamping to `[256, 65536]` is a sensible safety net regardless of whether the formula is exactly right
- Tests cover scaling, both clamp boundaries, zero-division (buffer < batch size), and invalid input — thorough
- The opt-out mechanism (`> 0` overrides auto-calc) is a good escape hatch for users who know their workload
- Moving `DefaultMaxPoolSize` out of the generic type is a solid cleanup
Follow-up Code Review
The second commit addressed all three issues from the previous review. Here's an assessment of what was resolved and what remains worth discussing.
Previous issues: resolution status
Semantic mismatch (per-batch vs per-message) — Addressed. The EstimatedMessagesPerBatch = 1024 multiplier and the updated formula (BufferMemory / BatchSize) * EstimatedMessagesPerBatch correctly shift the sizing to per-message granularity. The docs are also updated to explain the reasoning. This is the right fix.
Misleading DefaultMaxPoolSize constant — Addressed. Renamed to FallbackMaxPoolSize, which accurately describes when it is used (only by the parameterless ValueTaskSourcePool<T>() constructor, not through ProducerOptions).
Assert.ThrowsAsync wrapping synchronous throws — Partially addressed. The tests were split into separate methods for zero and negative batch sizes, but both still wrap the synchronous throw inside a Func<Task>. The clarity concern from the prior review stands: Assert.Throws<ArgumentOutOfRangeException>(() => ValueTaskSourcePool.CalculatePoolSize(...)) is the idiomatic TUnit pattern for synchronous throws. Using the async form for a method that never returns a Task is misleading and could mask issues if CalculatePoolSize is ever made async in the future.
New issues in the current revision
The formula almost always produces MaxAutoPoolSize for real configurations
With the actual defaults (BufferMemory = 1 GB, BatchSize = 1 MB):
- `maxBatches = 1 GB / 1 MB = 1024`
- `estimatedSources = 1024 * 1024 = 1,048,576`
- clamped to `MaxAutoPoolSize = 65,536`
The formula only produces a value below MaxAutoPoolSize when BufferMemory / BatchSize < 64 (at exactly 64 batches the product is 65,536, the cap itself). With the default BatchSize of 1 MB, that requires a BufferMemory under 64 MB. With the default BufferMemory of 1 GB, that requires a BatchSize above 16 MB.
In practice, the auto-calculated pool size is 65,536 for the vast majority of users. The PR's primary goal — "scaling the pool to match the maximum number of concurrent in-flight batches" — achieves a fixed 65,536 cap for almost everyone, which is a meaningful improvement over the old fixed 4,096, but the "scaling" behavior only kicks in for low-memory or oversized-batch configurations.
This isn't necessarily wrong, but it's worth documenting in ProducerOptions.ValueTaskSourcePoolSize that for default settings the auto-calculated value will be 65,536. Otherwise users may think they are getting a precisely tuned value when they are simply getting the upper bound.
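A standalone re-implementation of the formula makes the saturation point easy to see. Constant names and values mirror this review's description of the PR, but this is an illustration, not the actual source:

```csharp
// Illustrative re-implementation of the PR's auto-sizing formula.
// Constant values are taken from this review, not from the library's source.
const int MinAutoPoolSize = 256;
const int MaxAutoPoolSize = 65_536;
const int EstimatedMessagesPerBatch = 1024;
const int MiB = 1024 * 1024;

static int CalculatePoolSize(ulong bufferMemory, int batchSize)
{
    ulong maxBatches = bufferMemory / (ulong)batchSize;
    ulong estimated = maxBatches * (ulong)EstimatedMessagesPerBatch;
    return (int)Math.Clamp(estimated, (ulong)MinAutoPoolSize, (ulong)MaxAutoPoolSize);
}

Console.WriteLine(CalculatePoolSize(1024UL * MiB, MiB)); // 1 GB buffer -> 65536 (clamped to cap)
Console.WriteLine(CalculatePoolSize(64UL * MiB, MiB));   // 64 batches  -> 65536 (exactly the cap)
Console.WriteLine(CalculatePoolSize(32UL * MiB, MiB));   // 32 batches  -> 32768 (sub-cap)
```

Only configurations below 64 in-flight batches ever see a value other than the 65,536 ceiling.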
Potential silent overflow in estimatedSources = maxBatches * EstimatedMessagesPerBatch
maxBatches is ulong and EstimatedMessagesPerBatch is int (promoted to ulong for the multiply). The multiplication overflows when maxBatches > ulong.MaxValue / 1024 (≈1.8 × 10^16, i.e. a bufferMemory of roughly 18 petabytes even at a 1-byte batchSize). While this is unlikely in practice, bufferMemory is declared as ulong with no documented upper bound, so an adversarial or misconfigured value could cause silent wrap-around and produce a completely wrong pool size (likely clamped to MinAutoPoolSize instead of MaxAutoPoolSize).
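The wrap-around itself is easy to reproduce in isolation; this is standalone unchecked ulong arithmetic, not the library's code:

```csharp
// Demonstrates the silent ulong wrap-around described above.
// A maxBatches one past ulong.MaxValue / 1024 makes the product wrap to 0.
const ulong EstimatedMessagesPerBatch = 1024;

ulong maxBatches = ulong.MaxValue / EstimatedMessagesPerBatch + 1; // 2^54
ulong estimatedSources = maxBatches * EstimatedMessagesPerBatch;   // unchecked runtime multiply: wraps

Console.WriteLine(estimatedSources); // 0 — a clamp would then floor this to the minimum, not the cap
```

Because C# integer arithmetic is unchecked by default at runtime, no exception is thrown; the bad value flows straight into the clamp.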
A defensive fix requires no runtime cost since MaxAutoPoolSize / EstimatedMessagesPerBatch = 64 — you can simply clamp maxBatches before multiplying:
```csharp
public static int CalculatePoolSize(ulong bufferMemory, int batchSize)
{
    if (batchSize <= 0)
        throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be positive.");

    // Clamp maxBatches before multiplying to prevent ulong overflow.
    // Any value above MaxAutoPoolSize / EstimatedMessagesPerBatch will be clamped anyway.
    const ulong maxUsefulBatches = MaxAutoPoolSize / EstimatedMessagesPerBatch;
    var maxBatches = Math.Min(bufferMemory / (ulong)batchSize, maxUsefulBatches);
    var estimatedSources = (int)(maxBatches * EstimatedMessagesPerBatch);
    return Math.Clamp(estimatedSources, MinAutoPoolSize, MaxAutoPoolSize);
}
```

This also removes the need for the `Math.Clamp` call to operate on `ulong` values (avoiding implicit conversions from `int` constants) and makes the `(int)` cast guaranteed-safe at the source.
FallbackMaxPoolSize = 4096 is now a large inconsistency
The parameterless ValueTaskSourcePool<T>() constructor uses FallbackMaxPoolSize = 4096, but any producer constructed via KafkaProducer with default options will use a pool of 65,536. These are now 16x apart for identical-seeming usage, which could cause confusion for library consumers who create ValueTaskSourcePool<T> directly. Consider either: raising FallbackMaxPoolSize to match MaxAutoPoolSize, or making it internal so library users are nudged toward providing an explicit size.
What is working well
- The overall design is sound and the second commit demonstrates responsiveness to feedback.
- The
EstimatedMessagesPerBatchconstant is well-documented and grounded in the project's established estimates. - The
[256, 65536]clamp range is a reasonable safety net. - The opt-out mechanism (
ValueTaskSourcePoolSize > 0overrides) is clean. - Test coverage for the calculation logic is thorough.
Follow-up Review (third commit)
The third commit addresses only the test deduplication concern: CalculatePoolSize_SmallBufferLargeBatch_ClampsToMin was replaced with CalculatePoolSize_SingleBatchBuffer_ClampsToMin, which exercises a distinct scenario (1 batch × 1024 msgs = 1024, above the floor). This is a good cleanup.
However, the three issues raised in the second review remain unaddressed in the current revision. Summarising their current status:
1. Potential silent overflow in the multiply (unaddressed — highest priority)
estimatedSources = maxBatches * EstimatedMessagesPerBatch is a ulong * int multiply (the int is promoted to ulong). When bufferMemory is very large relative to batchSize, the product can exceed the ulong range, wrap around to a small value, and then be clamped to MinAutoPoolSize instead of MaxAutoPoolSize. Because bufferMemory is declared as ulong with no documented ceiling, this is a realistic misconfiguration scenario.
The fix is cheap — clamp maxBatches before multiplying, since any value above MaxAutoPoolSize / EstimatedMessagesPerBatch = 64 will saturate the result anyway:
```csharp
const ulong maxUsefulBatches = MaxAutoPoolSize / EstimatedMessagesPerBatch; // 64
var maxBatches = Math.Min(bufferMemory / (ulong)batchSize, maxUsefulBatches);
var estimatedSources = (int)(maxBatches * EstimatedMessagesPerBatch);
return Math.Clamp(estimatedSources, MinAutoPoolSize, MaxAutoPoolSize);
```

This also makes the `(int)` cast statically safe (the maximum value is 65,536, well within `int` range), and removes the implicit conversion from `int` constants in the `Math.Clamp` call.
2. Formula almost always produces MaxAutoPoolSize for real configurations (unaddressed — documentation gap)
With the actual defaults (BufferMemory = 1 GB, BatchSize = 1 MB):
- `maxBatches = 1024`
- `estimatedSources = 1024 × 1024 = 1,048,576`
- clamped to `MaxAutoPoolSize = 65,536`
The formula only produces a sub-maximum value when BufferMemory / BatchSize < 64 (at exactly 64 the product equals the cap); with a 1 MB batch size that means BufferMemory under 64 MB. The PR's goal of "scaling to match the maximum number of concurrent in-flight batches" effectively becomes a fixed 65,536 ceiling for nearly all users with default settings.
This is not wrong — 65,536 is a meaningful improvement over the old 4,096. But the ProducerOptions.ValueTaskSourcePoolSize XML doc comment should say so explicitly. Otherwise users reading the docs will believe they are getting a precisely tuned per-configuration value when they are almost certainly getting the upper bound. A sentence like: "For the default BufferMemory and BatchSize settings, the auto-calculated value is 65,536." would set accurate expectations.
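One possible shape for that doc addition (illustrative wording only; the actual property lives in the project's ProducerOptions and its surrounding members are omitted here):

```csharp
public sealed class ProducerOptions
{
    /// <summary>
    /// Maximum size of the producer's pooled ValueTask source pool.
    /// 0 (the default) auto-calculates the size from BufferMemory and
    /// BatchSize, clamped to [256, 65536]. For the default BufferMemory
    /// and BatchSize settings, the auto-calculated value is 65,536
    /// (the upper bound). Explicit positive values override auto-calculation.
    /// </summary>
    public int ValueTaskSourcePoolSize { get; set; } = 0;
}
```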
3. FallbackMaxPoolSize = 4096 is inconsistent with the auto-calculated default (unaddressed — lower priority)
Any KafkaProducer constructed via ProducerOptions defaults will get a pool of 65,536. The parameterless ValueTaskSourcePool<T>() constructor (which library users can call directly) gets 4,096 — a 16× difference for what appear to be equivalent default usages. If FallbackMaxPoolSize is intended only for internal use by the parameterless constructor, making it internal would prevent confusion. If it should remain public, raising it to match MaxAutoPoolSize (or at least documenting the discrepancy) is worth considering.
Assert.ThrowsAsync wrapping synchronous throws (still not addressed)
From the second review: both CalculatePoolSize_ThrowsForZeroBatchSize and CalculatePoolSize_ThrowsForNegativeBatchSize still wrap a synchronous throw inside Func<Task>. This works, but Assert.Throws<ArgumentOutOfRangeException>(() => ValueTaskSourcePool.CalculatePoolSize(...)) is the idiomatic pattern for synchronous throws. Using the async form signals to readers that the method is async or that the exception is thrown asynchronously, which is misleading here.
Summary
| Issue | Status |
|---|---|
| Test deduplication (`CalculatePoolSize_SmallBufferLargeBatch`) | Resolved in commit 3 |
| Per-message vs per-batch sizing | Resolved in commit 2 |
| `DefaultMaxPoolSize` renamed to `FallbackMaxPoolSize` | Resolved in commit 2 |
| Overflow in `maxBatches * EstimatedMessagesPerBatch` | Still open |
| Formula always hits cap for real configs (doc gap) | Still open |
| `FallbackMaxPoolSize` inconsistency | Still open |
| `Assert.ThrowsAsync` for synchronous throws | Still open |
The overflow fix (issue 1) is the only one with a correctness risk and is a one-liner. The others are documentation and style concerns that can be addressed before merge.
Follow-up Review (fourth commit)
All four outstanding issues from the third review have been addressed. Here's the final status:
All previously open issues: resolved
| Issue | Status |
|---|---|
| Overflow in `maxBatches * EstimatedMessagesPerBatch` | ✅ Resolved — pre-clamp to `maxUsefulBatches = 64` before multiplying, making the `(int)` cast statically safe |
| Formula always hits cap for real configs (doc gap) | ✅ Resolved — doc comment now explicitly states the auto-calculated value is 65,536 for default settings |
| `FallbackMaxPoolSize` inconsistency | ✅ Resolved — marked `internal`, preventing confusion for library consumers who can't directly access it |
| `Assert.ThrowsAsync` wrapping synchronous throws | ✅ Resolved — all sync throws now use `Assert.Throws<T>` directly |
Overall assessment
This is clean, well-motivated code. The iteration across four commits was disciplined — each commit addressed specific feedback without scope creep. The final state is solid:
- The static companion class design is idiomatic and avoids the awkward generic type accessor
- The pre-clamping before multiplication is both overflow-safe and makes the cast provably correct
- `FallbackMaxPoolSize` being `internal` is the right call given the 16× gap with the auto-calculated default
- Test coverage is comprehensive: scaling behavior, both clamp boundaries, zero-division (buffer < batch), invalid inputs, and the parameterless constructor
This is ready to merge.
Code Review — fifth pass (current state)
This PR has been through four previous review iterations and all issues raised in those rounds have been resolved. The current revision is clean and ready to merge.
What was addressed in prior rounds (all resolved)
- Per-message vs per-batch sizing: the `EstimatedMessagesPerBatch = 1024` multiplier correctly shifts pool sizing to per-message granularity.
- Overflow in `maxBatches * EstimatedMessagesPerBatch`: pre-clamping `maxBatches` to `maxUsefulBatches = 64` before the multiply makes the `(int)` cast statically safe with no runtime cost.
- `FallbackMaxPoolSize` visibility: marked `internal`, preventing library consumers from drawing false equivalence between the parameterless constructor default (4096) and the auto-calculated producer default (65536).
- Documentation gap: `ProducerOptions.ValueTaskSourcePoolSize` now explicitly states the auto-calculated value is 65536 for default settings.
- `Assert.ThrowsAsync` wrapping synchronous throws: all sync throws now use `Assert.Throws<T>` directly.
Current code is correct
The CalculatePoolSize implementation in the merged state:
```csharp
const ulong maxUsefulBatches = MaxAutoPoolSize / EstimatedMessagesPerBatch; // 64
var maxBatches = Math.Min(bufferMemory / (ulong)batchSize, maxUsefulBatches);
var estimatedSources = (int)(maxBatches * EstimatedMessagesPerBatch);
return Math.Clamp(estimatedSources, MinAutoPoolSize, MaxAutoPoolSize);
```

This is overflow-safe, the cast is provably in range, and the clamp correctly floors edge cases (buffer smaller than one batch).
Integration test change
Moving producer initialization and topic warmup before host.RunAsync() is the correct fix for the flaky CI test. It serializes resource acquisition so the consumer's background polling doesn't race with producer connection establishment on constrained runners. The Task.Delay(2000) for consumer group join still relies on timing, but this is an inherent constraint of the Kafka group coordination protocol — there is no deterministic signal available from the hosted service abstraction to replace it.
Approval
This is a meaningful improvement: replacing a fixed 4096 default with a formula that produces 65536 for typical production configurations reduces allocation pressure under sustained high-throughput async produce. The code is well-documented, tested, and free of correctness issues. Approving.
Code Review — sixth pass (current state)
This is a follow-up review after five prior rounds of feedback. All issues raised across those rounds have been addressed, and the fifth review already approved the PR. This pass confirms there is nothing new to flag.
Commit 6 (most recent): test rename
The final commit renames CalculatePoolSize_SingleBatchBuffer_ClampsToMin to CalculatePoolSize_SingleBatchBuffer_Returns1024 with an updated comment. This is correct: the test result (1024) is above MinAutoPoolSize (256), so no clamping occurs and the old name was misleading. The rename is accurate.
Current code state: correct
All issues from prior rounds are resolved in the final state:
| Issue | Status |
|---|---|
| Per-message vs per-batch pool sizing | Resolved (commit 2: `EstimatedMessagesPerBatch = 1024` multiplier) |
| `DefaultMaxPoolSize` renamed to `FallbackMaxPoolSize` | Resolved (commit 2) |
| Overflow in `maxBatches * EstimatedMessagesPerBatch` | Resolved (commit 4: pre-clamp to `maxUsefulBatches = 64`) |
| Formula always hits cap for default config (doc gap) | Resolved (commit 4: XML doc now states 65,536 for defaults) |
| `FallbackMaxPoolSize` visibility inconsistency | Resolved (commit 4: marked `internal`) |
| `Assert.ThrowsAsync` wrapping synchronous throws | Resolved (commit 4: all sync throws use `Assert.Throws<T>`) |
| HostedService test flakiness from producer/consumer resource contention | Resolved (commit 5: producer initialized before host starts) |
| Duplicate pool size clamping test | Resolved (commits 3 and 6: replaced with distinct single-batch scenario, renamed for accuracy) |
No new issues
The CalculatePoolSize implementation is overflow-safe, the (int) cast is statically bounded to [0, 65536], and test coverage is comprehensive across scaling behavior, both clamp boundaries, invalid inputs, and the parameterless constructor fallback.
This PR is ready to merge.
Auto-calculate pool size from BufferMemory / BatchSize instead of using a fixed 4096 default. This ensures the pool covers the expected number of concurrent in-flight batches, reducing allocations under sustained high-throughput async produce.

- Add non-generic ValueTaskSourcePool static class with CalculatePoolSize
- Change ProducerOptions.ValueTaskSourcePoolSize default to 0 (auto)
- KafkaProducer resolves 0 to calculated size at construction time
- Pool size clamped to [256, 65536] for safety

Closes #522
…row tests

- Multiply batch count by EstimatedMessagesPerBatch (1024) in CalculatePoolSize since the pool is rented per ProduceAsync call, not per batch
- Rename DefaultMaxPoolSize to FallbackMaxPoolSize to clarify its role
- Split CalculatePoolSize_ThrowsForInvalidBatchSize into two focused tests
- Update test expectations for the new formula
CalculatePoolSize_SmallBufferLargeBatch_ClampsToMin was identical to CalculatePoolSize_ClampsToMinimum (same inputs, same assertion). Replace it with a distinct single-batch scenario that exercises a different code path.
- Clamp maxBatches before multiplying by EstimatedMessagesPerBatch to prevent overflow for large bufferMemory values
- Make FallbackMaxPoolSize internal since it is only used by the parameterless constructor
- Use synchronous Assert.Throws for synchronous throw tests, matching existing codebase patterns (e.g. VarintEncoderTests)
- Add XML doc note that default config auto-calculates to 65,536
…Tests

Move producer creation and warmup before the host starts so that the producer's connection and metadata are fully established before the consumer's background polling begins competing for thread pool and connection resources. This prevents ProduceAsync timeouts on resource-constrained CI runners.

Also replace the manual polling loop for message receipt with the existing WaitForConditionAsync helper, and increase the consumer startup delay from 1s to 2s to give the group join more headroom.
…sToMin

The test result (1024) is above MinAutoPoolSize (256), so no clamping occurs. Renamed to _Returns1024 with an updated comment for clarity.
35e6285 to a51a397
Code Review — seventh pass (current state)
This pass covers the single new commit added since the sixth review: a51a3978ca ("fix: resolve rebase conflict in BufferMemoryTests Append signature").
New commit: trivially correct
The change updates a BufferMemoryTests.cs call site to match a modified Append method signature:
```diff
- pooledKey, pooledValue, null, null, null, null);
+ pooledKey, pooledValue, null, 0, null, null);
```

This is a mechanical fix to align the test with a changed parameter type (the previously nullable argument is now an int with a default of 0). No logic changes, no new concerns.
All prior issues remain resolved
| Issue | Status |
|---|---|
| Per-message vs per-batch pool sizing | ✅ Resolved |
| `DefaultMaxPoolSize` renamed to `FallbackMaxPoolSize` | ✅ Resolved |
| Overflow in `maxBatches * EstimatedMessagesPerBatch` | ✅ Resolved |
| Formula always hits cap for default config (doc gap) | ✅ Resolved |
| `FallbackMaxPoolSize` visibility inconsistency | ✅ Resolved |
| `Assert.ThrowsAsync` wrapping synchronous throws | ✅ Resolved |
| HostedService test flakiness | ✅ Resolved |
| Duplicate/misleading clamping test | ✅ Resolved |
Ready to merge
No new issues. The rebase fix is correct and the PR remains in the approved state from the fifth review.
Summary
- Auto-calculate `ValueTaskSourcePool` size from `BufferMemory / BatchSize` instead of a fixed default of 4096. This scales the pool to match the maximum number of concurrent in-flight batches, reducing allocations under sustained high-throughput async produce.
- Add a non-generic `ValueTaskSourcePool` static class with `CalculatePoolSize(bufferMemory, batchSize)`, clamped to `[256, 65536]`.
- `ProducerOptions.ValueTaskSourcePoolSize` now defaults to `0` (auto-calculate). Explicit positive values still override.

Test plan
- `CalculatePoolSize` tests covering scaling, min/max clamping, edge cases, and invalid input
- `ProducerOptionsDefaultsTests` updated for the new default value (0)
- `ValueTaskSourcePoolTests` and 29 `ProducerOptionsDefaultsTests` pass

Closes #522