feat: scale ValueTaskSourcePool cap with concurrency level #524

Merged
thomhurst merged 7 commits into main from scale-vtsp-pool-with-concurrency on Mar 16, 2026

Conversation

@thomhurst (Owner)

Summary

  • Auto-calculate ValueTaskSourcePool size from BufferMemory / BatchSize instead of a fixed default of 4096. This scales the pool to match the maximum number of concurrent in-flight batches, reducing allocations under sustained high-throughput async produce.
  • Introduced a non-generic ValueTaskSourcePool static class with CalculatePoolSize(bufferMemory, batchSize), clamped to [256, 65536].
  • ProducerOptions.ValueTaskSourcePoolSize now defaults to 0 (auto-calculate). Explicit positive values still override.
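Concretely, the auto-calculation described above amounts to the following. This is an illustrative reimplementation of the initial commit's formula, not the project's source; later review rounds refine it with a per-message multiplier.

```csharp
using System;

// Sketch of the initial auto-calculation: pool size ~= maximum number of
// concurrent in-flight batches (BufferMemory / BatchSize), clamped to
// [256, 65536]. Illustrative only.
static class PoolSizeSketch
{
    public static int CalculatePoolSize(ulong bufferMemory, int batchSize)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be positive.");

        var maxBatches = bufferMemory / (ulong)batchSize;
        return (int)Math.Min(Math.Max(maxBatches, 256UL), 65536UL);
    }

    static void Main()
    {
        const ulong MiB = 1024UL * 1024UL;
        Console.WriteLine(CalculatePoolSize(128 * MiB, (int)(1 * MiB)));  // 128 batches, clamped up to 256
        Console.WriteLine(CalculatePoolSize(1024 * MiB, (int)(1 * MiB))); // 1024
    }
}
```

An explicit positive ValueTaskSourcePoolSize bypasses this calculation entirely; 0 opts in to it.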

Test plan

  • New unit tests for CalculatePoolSize covering scaling, min/max clamping, edge cases, and invalid input
  • Updated ProducerOptionsDefaultsTests for new default value (0)
  • All 27 ValueTaskSourcePoolTests and 29 ProducerOptionsDefaultsTests pass
  • CI passes

Closes #522

@claude bot left a comment

Code Review

Overall, this is a clean and well-motivated change. The static companion class design is idiomatic, the auto-calculation logic is well-encapsulated, and the tests cover the important edge cases. A few concerns worth discussing:


Semantic mismatch: pool measures per-batch, but usage is per-ProduceAsync call

The core assumption behind BufferMemory / BatchSize is that the number of concurrent in-flight batches is a good proxy for pool pressure. But PooledValueTaskSource<T> instances are rented per ProduceAsync call (one per message awaiting delivery), not per batch.

A batch groups many messages together. In a steady-state high-throughput scenario where messages from thousands of concurrent ProduceAsync callers are coalesced into batches, the number of concurrent PooledValueTaskSource instances alive at any moment equals the number of in-flight messages, not the number of in-flight batches.

For default settings (BufferMemory = 128MB, BatchSize = 1MB): 128MB / 1MB = 128 → clamped to 256. But a single 1MB batch at 1KB/msg contains ~1000 messages, each holding a rented source. So actual pool pressure under async high-throughput could be 128 * 1000 = 128,000 concurrent sources — not 256.

The 4096 fixed default was actually much closer to reality for this scenario. A better proxy for pool demand would be BufferMemory / AverageMessageSize (which is hard to know) or simply NumBatches * EstimatedMessagesPerBatch.

Suggestion: Either document this assumption explicitly (that the calculation sizes for batch-level concurrency, not message-level), or consider a multiplier like (BufferMemory / BatchSize) * InitialBatchRecordCapacity to approximate message-level concurrency. The issue's profiling context mentions ProduceAllAsync hitting pool pressure — that usage pattern has far more concurrent sources than batches.
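To put numbers on the two proxies, here is a throwaway sketch using the figures quoted above. The helper names are illustrative, not project code, and exact binary arithmetic gives 131,072 rather than the rounded 128,000 in the prose.

```csharp
using System;

// Compares batch-level vs message-level concurrency estimates for the
// scenario above: 128 MB buffer, 1 MB batches, ~1 KB messages.
class ConcurrencyEstimate
{
    // Proxy used by the PR's initial formula: concurrent in-flight batches.
    static ulong BatchLevel(ulong bufferMemory, ulong batchSize) =>
        bufferMemory / batchSize;

    // Proxy matching actual pool pressure: one rented source per in-flight
    // message awaiting delivery.
    static ulong MessageLevel(ulong bufferMemory, ulong avgMessageSize) =>
        bufferMemory / avgMessageSize;

    static void Main()
    {
        const ulong MiB = 1024UL * 1024UL;
        Console.WriteLine(BatchLevel(128 * MiB, 1 * MiB)); // 128 batches
        Console.WriteLine(MessageLevel(128 * MiB, 1024UL)); // 131072 messages
    }
}
```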


DefaultMaxPoolSize is now a misleading constant

DefaultMaxPoolSize = 4096 is moved to the non-generic ValueTaskSourcePool class and is used by the parameterless ValueTaskSourcePool<T>() constructor. But through ProducerOptions, the actual default is now 0 (auto-calculate), meaning the producer never uses DefaultMaxPoolSize by default.

This creates confusion: a user reading the API docs might conclude that 4096 is the operational default, when in practice the auto-calculated value (256 in the example above) will be used. Consider either renaming it to FallbackMaxPoolSize or ExplicitDefaultPoolSize, or removing it as a public constant altogether.


Assert.ThrowsAsync wrapping synchronous throws

In CalculatePoolSize_ThrowsForInvalidBatchSize, CalculatePoolSize throws synchronously but is wrapped in an async lambda:

await Assert.ThrowsAsync<ArgumentOutOfRangeException>(() =>
{
    ValueTaskSourcePool.CalculatePoolSize(1073741824UL, 0); // synchronous throw
    return Task.CompletedTask;
});

This works (the sync exception propagates through the Func<Task>), but Assert.Throws<ArgumentOutOfRangeException>(() => ValueTaskSourcePool.CalculatePoolSize(..., 0)) is more direct for a synchronous method. Not a correctness issue, just a clarity one.


What's working well

  • The static companion class (ValueTaskSourcePool) is the right design — it avoids the awkward ValueTaskSourcePool<RecordMetadata>.DefaultMaxPoolSize generic access syntax
  • Clamping to [256, 65536] is a sensible safety net regardless of whether the formula is exactly right
  • Tests cover scaling, both clamp boundaries, zero-division (buffer < batch size), and invalid input — thorough
  • The opt-out mechanism (> 0 overrides auto-calc) is a good escape hatch for users who know their workload
  • Moving DefaultMaxPoolSize out of the generic type is a solid cleanup

@claude bot left a comment

Follow-up Code Review

The second commit addressed all three issues from the previous review. Here's an assessment of what was resolved and what remains worth discussing.


Previous issues: resolution status

Semantic mismatch (per-batch vs per-message) — Addressed. The EstimatedMessagesPerBatch = 1024 multiplier and the updated formula (BufferMemory / BatchSize) * EstimatedMessagesPerBatch correctly shifts the sizing to per-message granularity. The docs are also updated to explain the reasoning. This is the right fix.

Misleading DefaultMaxPoolSize constant — Addressed. Renamed to FallbackMaxPoolSize, which accurately describes when it is used (only by the parameterless ValueTaskSourcePool<T>() constructor, not through ProducerOptions).

Assert.ThrowsAsync wrapping synchronous throws — Partially addressed. The tests were split into separate methods for zero and negative batch sizes, but both still wrap the synchronous throw inside a Func<Task>. The clarity concern from the prior review stands: Assert.Throws<ArgumentOutOfRangeException>(() => ValueTaskSourcePool.CalculatePoolSize(...)) is the idiomatic TUnit pattern for synchronous throws. Using the async form for a method that never returns a Task is misleading and could mask issues if CalculatePoolSize is ever made async in the future.


New issues in the current revision

The formula almost always produces MaxAutoPoolSize for real configurations

With the actual defaults (BufferMemory = 1 GB, BatchSize = 1 MB):

maxBatches = 1GB / 1MB = 1024
estimatedSources = 1024 * 1024 = 1,048,576
clamped to MaxAutoPoolSize = 65,536

The formula only produces a value below MaxAutoPoolSize when BufferMemory / BatchSize < 64 (at exactly 64, the product lands exactly on the cap). With the default BatchSize of 1 MB, that requires a BufferMemory under 64 MB. With the default BufferMemory of 1 GB, that requires a BatchSize above 16 MB.

In practice, the auto-calculated pool size is 65,536 for the vast majority of users. The PR's primary goal — "scaling the pool to match the maximum number of concurrent in-flight batches" — achieves a fixed 65,536 cap for almost everyone, which is a meaningful improvement over the old fixed 4,096, but the "scaling" behavior only kicks in for low-memory or oversized-batch configurations.

This isn't necessarily wrong, but it's worth documenting in ProducerOptions.ValueTaskSourcePoolSize that for default settings the auto-calculated value will be 65,536. Otherwise users may think they are getting a precisely tuned value when they are simply getting the upper bound.

Potential silent overflow in estimatedSources = maxBatches * EstimatedMessagesPerBatch

maxBatches is ulong and EstimatedMessagesPerBatch is int (promoted to ulong for the multiply). The multiplication overflows when maxBatches > ulong.MaxValue / 1024, i.e. roughly 1.8 × 10^16 batches (a bufferMemory of about 18 PB at a 1-byte batchSize, scaling up with batchSize). While this is unlikely in practice, bufferMemory is declared as ulong with no documented upper bound, so an adversarial or misconfigured value could cause silent wrap-around and produce a completely wrong pool size (likely clamped to MinAutoPoolSize instead of MaxAutoPoolSize).

A defensive fix requires no runtime cost since MaxAutoPoolSize / EstimatedMessagesPerBatch = 64 — you can simply clamp maxBatches before multiplying:

public static int CalculatePoolSize(ulong bufferMemory, int batchSize)
{
    if (batchSize <= 0)
        throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be positive.");

    // Clamp maxBatches before multiplying to prevent ulong overflow.
    // Any value above MaxAutoPoolSize/EstimatedMessagesPerBatch will be clamped anyway.
    const ulong maxUsefulBatches = MaxAutoPoolSize / EstimatedMessagesPerBatch;
    var maxBatches = Math.Min(bufferMemory / (ulong)batchSize, maxUsefulBatches);
    var estimatedSources = (int)(maxBatches * EstimatedMessagesPerBatch);

    return Math.Clamp(estimatedSources, MinAutoPoolSize, MaxAutoPoolSize);
}

This also removes the need for the Math.Clamp call to operate on ulong values (avoiding implicit conversions from int constants) and makes the (int) cast guaranteed-safe at the source.

FallbackMaxPoolSize = 4096 is now a large inconsistency

The parameterless ValueTaskSourcePool<T>() constructor uses FallbackMaxPoolSize = 4096, but any producer constructed via KafkaProducer with default options will use a pool of 65,536. These are now 16x apart for identical-seeming usage, which could cause confusion for library consumers who create ValueTaskSourcePool<T> directly. Consider either: raising FallbackMaxPoolSize to match MaxAutoPoolSize, or making it internal so library users are nudged toward providing an explicit size.


What is working well

  • The overall design is sound and the second commit demonstrates responsiveness to feedback.
  • The EstimatedMessagesPerBatch constant is well-documented and grounded in the project's established estimates.
  • The [256, 65536] clamp range is a reasonable safety net.
  • The opt-out mechanism (ValueTaskSourcePoolSize > 0 overrides) is clean.
  • Test coverage for the calculation logic is thorough.

@claude bot left a comment

Follow-up Review (third commit)

The third commit addresses only the test deduplication concern: CalculatePoolSize_SmallBufferLargeBatch_ClampsToMin was replaced with CalculatePoolSize_SingleBatchBuffer_ClampsToMin, which exercises a distinct scenario (1 batch × 1024 msgs = 1024, above the floor). This is a good cleanup.

However, the three issues raised in the second review remain unaddressed in the current revision. Summarising their current status:


1. Potential silent overflow in the multiply (unaddressed — highest priority)

estimatedSources = maxBatches * EstimatedMessagesPerBatch is a ulong * int multiply (the int is promoted to ulong). When bufferMemory is very large relative to batchSize, the product can overflow the ulong range, wrap around to a small value, and then be clamped to MinAutoPoolSize instead of MaxAutoPoolSize. Because bufferMemory is declared as ulong with no documented ceiling, this is a realistic misconfiguration scenario.

The fix is cheap — clamp maxBatches before multiplying, since any value above MaxAutoPoolSize / EstimatedMessagesPerBatch = 64 will saturate the result anyway:

const ulong maxUsefulBatches = MaxAutoPoolSize / EstimatedMessagesPerBatch; // 64
var maxBatches = Math.Min(bufferMemory / (ulong)batchSize, maxUsefulBatches);
var estimatedSources = (int)(maxBatches * EstimatedMessagesPerBatch);
return Math.Clamp(estimatedSources, MinAutoPoolSize, MaxAutoPoolSize);

This also makes the (int) cast statically safe (max value is 65536, well within int range), and removes the implicit conversion from int constants in the Math.Clamp call.


2. Formula almost always produces MaxAutoPoolSize for real configurations (unaddressed — documentation gap)

With the actual defaults (BufferMemory = 1 GB, BatchSize = 1 MB):

  • maxBatches = 1024
  • estimatedSources = 1024 × 1024 = 1,048,576
  • clamped to MaxAutoPoolSize = 65,536

The formula only produces a sub-maximum value when BufferMemory / BatchSize < 64; with a 1 MB batch size that means BufferMemory under 64 MB. The PR's goal of "scaling to match the maximum number of concurrent in-flight batches" effectively becomes a fixed 65,536 ceiling for nearly all users with default settings.

This is not wrong — 65,536 is a meaningful improvement over the old 4,096. But the ProducerOptions.ValueTaskSourcePoolSize XML doc comment should say so explicitly. Otherwise users reading the docs will believe they are getting a precisely tuned per-configuration value when they are almost certainly getting the upper bound. A sentence like: "For the default BufferMemory and BatchSize settings, the auto-calculated value is 65,536." would set accurate expectations.
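A sketch of what that doc addition might look like. The property name matches the PR; the XML comment wording is illustrative, not the merged text.

```csharp
using System;

// Hypothetical ProducerOptions fragment showing the suggested doc wording.
public sealed class ProducerOptionsSketch
{
    /// <summary>
    /// Maximum size of the pooled ValueTask source pool. The default of 0
    /// auto-calculates from BufferMemory and BatchSize, clamped to
    /// [256, 65536]. For the default BufferMemory and BatchSize settings,
    /// the auto-calculated value is 65,536. Explicit positive values
    /// override the calculation.
    /// </summary>
    public int ValueTaskSourcePoolSize { get; set; } = 0;

    public static void Main() =>
        Console.WriteLine(new ProducerOptionsSketch().ValueTaskSourcePoolSize); // 0
}
```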


3. FallbackMaxPoolSize = 4096 is inconsistent with the auto-calculated default (unaddressed — lower priority)

Any KafkaProducer constructed via ProducerOptions defaults will get a pool of 65,536. The parameterless ValueTaskSourcePool<T>() constructor (which library users can call directly) gets 4,096 — a 16× difference for what appear to be equivalent default usages. If FallbackMaxPoolSize is intended only for internal use by the parameterless constructor, making it internal would prevent confusion. If it should remain public, raising it to match MaxAutoPoolSize (or at least documenting the discrepancy) is worth considering.


Assert.ThrowsAsync wrapping synchronous throws (still not addressed)

From the second review: both CalculatePoolSize_ThrowsForZeroBatchSize and CalculatePoolSize_ThrowsForNegativeBatchSize still wrap a synchronous throw inside Func<Task>. This works, but Assert.Throws<ArgumentOutOfRangeException>(() => ValueTaskSourcePool.CalculatePoolSize(...)) is the idiomatic pattern for synchronous throws. Using the async form signals to readers that the method is async or that the exception is thrown asynchronously, which is misleading here.


Summary

  • Test deduplication (CalculatePoolSize_SmallBufferLargeBatch): resolved in commit 3
  • Per-message vs per-batch sizing: resolved in commit 2
  • DefaultMaxPoolSize renamed to FallbackMaxPoolSize: resolved in commit 2
  • Overflow in maxBatches * EstimatedMessagesPerBatch: still open
  • Formula always hits cap for real configs (doc gap): still open
  • FallbackMaxPoolSize inconsistency: still open
  • Assert.ThrowsAsync for synchronous throws: still open

The overflow fix (issue 1) is the only one with a correctness risk and is a one-liner. The others are documentation and style concerns that can be addressed before merge.

@claude bot left a comment

Follow-up Review (fourth commit)

All four outstanding issues from the third review have been addressed. Here's the final status:

All previously open issues: resolved

  • Overflow in maxBatches * EstimatedMessagesPerBatch: ✅ resolved (pre-clamp to maxUsefulBatches = 64 before multiplying, making the (int) cast statically safe)
  • Formula always hits cap for real configs (doc gap): ✅ resolved (doc comment now explicitly states the auto-calculated value is 65,536 for default settings)
  • FallbackMaxPoolSize inconsistency: ✅ resolved (marked internal, preventing confusion for library consumers who can't directly access it)
  • Assert.ThrowsAsync wrapping synchronous throws: ✅ resolved (all sync throws now use Assert.Throws<T> directly)

Overall assessment

This is clean, well-motivated code. The iteration across four commits was disciplined — each commit addressed specific feedback without scope creep. The final state is solid:

  • The static companion class design is idiomatic and avoids the awkward generic type accessor
  • The pre-clamping before multiplication is both overflow-safe and makes the cast provably correct
  • FallbackMaxPoolSize being internal is the right call given the 16× gap with the auto-calculated default
  • Test coverage is comprehensive: scaling behavior, both clamp boundaries, zero-division (buffer < batch), invalid inputs, and the parameterless constructor

This is ready to merge.

@thomhurst enabled auto-merge March 16, 2026 00:02
@claude bot left a comment

Code Review — fifth pass (current state)

This PR has been through four previous review iterations and all issues raised in those rounds have been resolved. The current revision is clean and ready to merge.

What was addressed in prior rounds (all resolved)

  • Per-message vs per-batch sizing: the EstimatedMessagesPerBatch = 1024 multiplier correctly shifts pool sizing to per-message granularity.
  • Overflow in maxBatches * EstimatedMessagesPerBatch: pre-clamping maxBatches to maxUsefulBatches = 64 before the multiply makes the (int) cast statically safe with no runtime cost.
  • FallbackMaxPoolSize visibility: marked internal, preventing library consumers from drawing false equivalence between the parameterless constructor default (4096) and the auto-calculated producer default (65536).
  • Documentation gap: ProducerOptions.ValueTaskSourcePoolSize now explicitly states the auto-calculated value is 65536 for default settings.
  • Assert.ThrowsAsync wrapping synchronous throws: all sync throws now use Assert.Throws<T> directly.

Current code is correct

The CalculatePoolSize implementation in the merged state:

const ulong maxUsefulBatches = MaxAutoPoolSize / EstimatedMessagesPerBatch; // 64
var maxBatches = Math.Min(bufferMemory / (ulong)batchSize, maxUsefulBatches);
var estimatedSources = (int)(maxBatches * EstimatedMessagesPerBatch);
return Math.Clamp(estimatedSources, MinAutoPoolSize, MaxAutoPoolSize);

This is overflow-safe, the cast is provably in range, and the clamp correctly floors edge cases (buffer smaller than one batch).
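As a sanity check on the overflow claim, here is a standalone sketch contrasting the naive multiply-then-clamp shape with the merged pre-clamped shape at an extreme bufferMemory. Both functions are reimplemented here for illustration, with constants per the review.

```csharp
using System;

class OverflowCheck
{
    const int MinAutoPoolSize = 256;
    const int MaxAutoPoolSize = 65536;
    const ulong EstimatedMessagesPerBatch = 1024;

    // Commit-2 shape: multiply first. The product can wrap around ulong
    // for extreme inputs (unchecked arithmetic), yielding a tiny value.
    static ulong Naive(ulong bufferMemory, ulong batchSize) =>
        (bufferMemory / batchSize) * EstimatedMessagesPerBatch;

    // Commit-4 shape: clamp the batch count first, so the product is at
    // most 64 * 1024 = 65536 and the int cast is always in range.
    static int Safe(ulong bufferMemory, ulong batchSize)
    {
        const ulong maxUsefulBatches = MaxAutoPoolSize / 1024; // 64
        var maxBatches = Math.Min(bufferMemory / batchSize, maxUsefulBatches);
        return Math.Clamp((int)(maxBatches * EstimatedMessagesPerBatch),
                          MinAutoPoolSize, MaxAutoPoolSize);
    }

    static void Main()
    {
        // 2^54 batches: 2^54 * 1024 == 2^64, which wraps to exactly 0...
        Console.WriteLine(Naive(1UL << 54, 1)); // 0
        // ...while the pre-clamped version saturates at the cap.
        Console.WriteLine(Safe(1UL << 54, 1));  // 65536
    }
}
```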

Integration test change

Moving producer initialization and topic warmup before host.RunAsync() is the correct fix for the flaky CI test. It serializes resource acquisition so the consumer's background polling doesn't race with producer connection establishment on constrained runners. The Task.Delay(2000) for consumer group join still relies on timing, but this is an inherent constraint of the Kafka group coordination protocol — there is no deterministic signal available from the hosted service abstraction to replace it.

Approval

This is a meaningful improvement: replacing a fixed 4096 default with a formula that produces 65536 for typical production configurations reduces allocation pressure under sustained high-throughput async produce. The code is well-documented, tested, and free of correctness issues. Approving.

@claude bot left a comment

Code Review — sixth pass (current state)

This is a follow-up review after five prior rounds of feedback. All issues raised across those rounds have been addressed, and the fifth review already approved the PR. This pass confirms there is nothing new to flag.

Commit 6 (most recent): test rename

The final commit renames CalculatePoolSize_SingleBatchBuffer_ClampsToMin to CalculatePoolSize_SingleBatchBuffer_Returns1024 with an updated comment. This is correct: the test result (1024) is above MinAutoPoolSize (256), so no clamping occurs and the old name was misleading. The rename is accurate.

Current code state: correct

All issues from prior rounds are resolved in the final state:

  • Per-message vs per-batch pool sizing: resolved (commit 2: EstimatedMessagesPerBatch = 1024 multiplier)
  • DefaultMaxPoolSize renamed to FallbackMaxPoolSize: resolved (commit 2)
  • Overflow in maxBatches * EstimatedMessagesPerBatch: resolved (commit 4: pre-clamp to maxUsefulBatches = 64)
  • Formula always hits cap for default config (doc gap): resolved (commit 4: XML doc now states 65,536 for defaults)
  • FallbackMaxPoolSize visibility inconsistency: resolved (commit 4: marked internal)
  • Assert.ThrowsAsync wrapping synchronous throws: resolved (commit 4: all sync throws use Assert.Throws<T>)
  • HostedService test flakiness from producer/consumer resource contention: resolved (commit 5: producer initialized before host starts)
  • Duplicate pool size clamping test: resolved (commits 3 and 6: replaced with distinct single-batch scenario, renamed for accuracy)

No new issues

The CalculatePoolSize implementation is overflow-safe, the (int) cast is statically bounded to [0, 65536], and test coverage is comprehensive across scaling behavior, both clamp boundaries, invalid inputs, and the parameterless constructor fallback.

This PR is ready to merge.

Auto-calculate pool size from BufferMemory / BatchSize instead of using
a fixed 4096 default. This ensures the pool covers the expected number
of concurrent in-flight batches, reducing allocations under sustained
high-throughput async produce.

- Add non-generic ValueTaskSourcePool static class with CalculatePoolSize
- Change ProducerOptions.ValueTaskSourcePoolSize default to 0 (auto)
- KafkaProducer resolves 0 to calculated size at construction time
- Pool size clamped to [256, 65536] for safety

Closes #522
…row tests

- Multiply batch count by EstimatedMessagesPerBatch (1024) in CalculatePoolSize
  since the pool is rented per ProduceAsync call, not per batch
- Rename DefaultMaxPoolSize to FallbackMaxPoolSize to clarify its role
- Split CalculatePoolSize_ThrowsForInvalidBatchSize into two focused tests
- Update test expectations for the new formula

CalculatePoolSize_SmallBufferLargeBatch_ClampsToMin was identical to
CalculatePoolSize_ClampsToMinimum (same inputs, same assertion).
Replace it with a distinct single-batch scenario that exercises
a different code path.

- Clamp maxBatches before multiplying by EstimatedMessagesPerBatch to
  prevent overflow for large bufferMemory values
- Make FallbackMaxPoolSize internal since it is only used by the
  parameterless constructor
- Use synchronous Assert.Throws for synchronous throw tests, matching
  existing codebase patterns (e.g. VarintEncoderTests)
- Add XML doc note that default config auto-calculates to 65,536
…Tests

Move producer creation and warmup before the host starts so that the
producer's connection and metadata are fully established before the
consumer's background polling begins competing for thread pool and
connection resources. This prevents ProduceAsync timeouts on
resource-constrained CI runners.

Also replace the manual polling loop for message receipt with the
existing WaitForConditionAsync helper, and increase the consumer
startup delay from 1s to 2s to give the group join more headroom.
…sToMin

The test result (1024) is above MinAutoPoolSize (256), so no clamping
occurs. Renamed to _Returns1024 with an updated comment for clarity.

After rebasing onto main (which merged #526's headerCount change),
the new test from #525 had null instead of 0 for the int headerCount
parameter.
@thomhurst force-pushed the scale-vtsp-pool-with-concurrency branch from 35e6285 to a51a397 on March 16, 2026 00:40
@claude bot left a comment

Code Review — seventh pass (current state)

This pass covers the single new commit added since the sixth review: a51a3978ca ("fix: resolve rebase conflict in BufferMemoryTests Append signature").

New commit: trivially correct

The change updates a BufferMemoryTests.cs call site to match a modified Append method signature:

-                pooledKey, pooledValue, null, null, null, null);
+                pooledKey, pooledValue, null, 0, null, null);

This is a mechanical fix to align the test with a changed parameter type (the previously nullable argument is now an int with a default of 0). No logic changes, no new concerns.

All prior issues remain resolved

  • Per-message vs per-batch pool sizing: ✅ resolved
  • DefaultMaxPoolSize renamed to FallbackMaxPoolSize: ✅ resolved
  • Overflow in maxBatches * EstimatedMessagesPerBatch: ✅ resolved
  • Formula always hits cap for default config (doc gap): ✅ resolved
  • FallbackMaxPoolSize visibility inconsistency: ✅ resolved
  • Assert.ThrowsAsync wrapping synchronous throws: ✅ resolved
  • HostedService test flakiness: ✅ resolved
  • Duplicate/misleading clamping test: ✅ resolved

Ready to merge

No new issues. The rebase fix is correct and the PR remains in the approved state from the fifth review.

@thomhurst merged commit 8126aa3 into main on Mar 16, 2026
19 checks passed