diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 543a21f07..c99c222ed 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -19,6 +19,7 @@ endif::[] === Fixes * fix: keep instantiated OffsetMapCodecManager so that metrics will not be recreated every commit (#859) +* feature: add batchStrategy to allow taking multiple records from the same shard in single batch construction == 0.5.3.3 diff --git a/README.adoc b/README.adoc index 39f4db1be..10e253e69 100644 --- a/README.adoc +++ b/README.adoc @@ -1539,6 +1539,7 @@ endif::[] === Fixes * fix: keep instantiated OffsetMapCodecManager so that metrics will not be recreated every commit (#859) +* feature: add batchStrategy to allow taking multiple records from the same shard in single batch construction == 0.5.3.3 diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 05cae8240..04639d5ac 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -570,4 +570,49 @@ public boolean isProducerSupplied() { */ @Builder.Default public final boolean ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck = false; + + /** + * Defines the strategy for batching records. + *
+     * <p>
+ * Behavior by ordering mode: + *
+     * <p>
+ */ + public enum BatchStrategy { + /** + * Strict sequential processing for ordered modes. This is the default. + *
+     * <p>
+ * Only one record is processed at a time for the same Shard (Key or Partition). + * Even if batchSize is set, batches will be filled with records of different Shards. + *
+     * <p>
+ * In {@link ProcessingOrder#UNORDERED}, this behaves as size-based batching and does not limit shard taking. + */ + SEQUENTIAL, + + /** + * Allows batching for the same Shard, but a batch can contain multiple Shards mixed together. + * Retrieves multiple records within the Shard-level lock to increase throughput in ordered modes. + *
+     * <p>
+ * In {@link ProcessingOrder#UNORDERED}, this is equivalent to {@link #SEQUENTIAL}. + */ + BATCH_MULTIPLEX, + + /** + * Allows batching for the same Shard, and enforces that a single batch contains only one Shard. + * Use this when atomic batch processing per Shard (Key or Partition) is required. + *
+     * <p>
+ * In {@link ProcessingOrder#UNORDERED}, this means one topic-partition per batch. + */ + BATCH_BY_SHARD + } + + /** + * The batch strategy to use. + */ + @Builder.Default + private final BatchStrategy batchStrategy = BatchStrategy.SEQUENTIAL; } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index fc716650e..06b94373f 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -9,6 +9,7 @@ import io.confluent.parallelconsumer.*; import io.confluent.parallelconsumer.metrics.PCMetrics; import io.confluent.parallelconsumer.metrics.PCMetricsDef; +import io.confluent.parallelconsumer.state.ShardKey; import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; import io.micrometer.core.instrument.Gauge; @@ -1043,9 +1044,48 @@ private void submitWorkToPoolInner(final Function, private List>> makeBatches(List> workToProcess) { int maxBatchSize = options.getBatchSize(); + ParallelConsumerOptions.BatchStrategy strategy = options.getBatchStrategy(); + + if (strategy == ParallelConsumerOptions.BatchStrategy.BATCH_BY_SHARD) { + // "Shard" depends on ordering: + // KEY => topic+key, PARTITION/UNORDERED => topic-partition. 
+ return partitionByShardAndSize(workToProcess, maxBatchSize); + } return partition(workToProcess, maxBatchSize); } + private List>> partitionByShardAndSize(List> sourceCollection, int maxBatchSize) { + List>> listOfBatches = new ArrayList<>(); + List> batchInConstruction = new ArrayList<>(); + + Object lastKey = null; + + for (WorkContainer item : sourceCollection) { + Object currentKey = ShardKey.of(item, options.getOrdering()); + + // Check if we need to start a new batch + // 1. If the current batch is full + // 2. If the key has changed (and the batch is not empty) + boolean isBatchFull = batchInConstruction.size() >= maxBatchSize; + boolean isKeyChanged = !batchInConstruction.isEmpty() && !Objects.equals(lastKey, currentKey); + + if (isBatchFull || isKeyChanged) { + listOfBatches.add(batchInConstruction); + batchInConstruction = new ArrayList<>(); + } + + batchInConstruction.add(item); + lastKey = currentKey; + } + + // Add the last batch if it has any items + if (!batchInConstruction.isEmpty()) { + listOfBatches.add(batchInConstruction); + } + + return listOfBatches; + } + private static List> partition(Collection sourceCollection, int maxBatchSize) { List> listOfBatches = new ArrayList<>(); List batchInConstruction = new ArrayList<>(); @@ -1531,4 +1571,4 @@ private void clearCommitCommand() { } } -} \ No newline at end of file +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java index 6dc4d6890..45cb7057f 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java @@ -136,21 +136,39 @@ ArrayList> getWorkIfAvailable(int workToGetDelta, RetryQueue var workContainer = iterator.next().getValue(); if (pm.couldBeTakenAsWork(workContainer)) { + boolean taken = false; + 
if (workContainer.isAvailableToTakeAsWork()) { log.trace("Taking {} as work", workContainer); workContainer.onQueueingForExecution(); workTaken.add(workContainer); + taken = true; } else { log.trace("Skipping {} as work, not available to take as work", workContainer); addToSlowWorkMaybe(slowWork, workContainer); } + // Strategy-based take limits apply only when ordering is KEY/PARTITION. if (isOrderRestricted()) { - // can't take any more work from this shard, due to ordering restrictions - // processing blocked on this shard, continue to next shard - log.trace("Processing by {}, so have cannot get more messages on this ({}) shardEntry.", this.options.getOrdering(), getKey()); - break; + ParallelConsumerOptions.BatchStrategy strategy = options.getBatchStrategy(); + + // Allow continuing to gather work if both conditions are met: + // 1. Strategy is NOT strictly SEQUENTIAL + // 2. The current batch size hasn't reached the configured limit + boolean batchingAllowed = (strategy != ParallelConsumerOptions.BatchStrategy.SEQUENTIAL); + boolean batchIsNotFull = workTaken.size() < options.getBatchSize(); + + if (taken && batchingAllowed && batchIsNotFull) { + log.trace("Batching strategy {} enabled. 
Batch not full ({}/{}), continuing to gather work for shard {}", + strategy, workTaken.size(), options.getBatchSize(), getKey()); + // Loop continues to fetch the next available record in this shard + } else { + // Stop gathering for this shard if strictly sequential or batch is full + log.trace("Stopping work gather for ordered shard {} (Strategy: {}, Count: {})", + getKey(), strategy, workTaken.size()); + break; + } } } else { // break, assuming all work in this shard, is for the same ShardKey, which is always on the same diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorConfigurationTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorConfigurationTest.java index abe2444a8..34ae65af3 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorConfigurationTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorConfigurationTest.java @@ -23,11 +23,20 @@ import pl.tlinkowski.unij.api.UniLists; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.BATCH_BY_SHARD; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.BATCH_MULTIPLEX; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.SEQUENTIAL; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.PARTITION; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.UNORDERED; +import static org.assertj.core.api.Assertions.assertThat; + /** * Tests to 
verify the protected and internal methods of * {@link io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor} work as expected. @@ -123,4 +132,166 @@ void testHandleStaleWorkNoSplit() { Assertions.assertEquals(testInstance.getMailBoxFailedCnt(), 0); } } + + /** Confirms the default batch strategy remains sequential. */ + @Test + void defaultBatchStrategyIsSequential() { + assertThat(testOptions.getBatchStrategy()).isEqualTo(SEQUENTIAL); + } + + /** Ensures batch size one always produces singleton batches. */ + @Test + void batchSizeOneMakesAllStrategiesProduceSingletonBatches() { + List> work = Arrays.asList( + newWorkContainer(0, 0, "key-a"), + newWorkContainer(1, 1, "key-b"), + newWorkContainer(0, 2, "key-c") + ); + + for (ParallelConsumerOptions.BatchStrategy strategy : ParallelConsumerOptions.BatchStrategy.values()) { + try (TestParallelEoSStreamProcessor processor = newProcessor(UNORDERED, strategy, 1)) { + var batches = processor.makeBatchesForTest(work); + assertThat(batchSizes(batches)).as("batch sizes for %s", strategy).containsExactly(1, 1, 1); + } + } + } + + /** Verifies unordered sequential and multiplex batching stay size-based. */ + @Test + void unorderedSequentialAndMultiplexUseSizeBasedBatching() { + List> work = Arrays.asList( + newWorkContainer(0, 0, "key-a"), + newWorkContainer(0, 1, "key-a"), + newWorkContainer(1, 2, "key-b"), + newWorkContainer(1, 3, "key-b") + ); + + for (ParallelConsumerOptions.BatchStrategy strategy : Arrays.asList(SEQUENTIAL, BATCH_MULTIPLEX)) { + try (TestParallelEoSStreamProcessor processor = newProcessor(UNORDERED, strategy, 3)) { + var batches = processor.makeBatchesForTest(work); + + assertThat(batchSizes(batches)).as("batch sizes for %s", strategy).containsExactly(3, 1); + assertThat(offsetsForBatch(batches.get(0))).containsExactly(0L, 1L, 2L); + assertThat(partitionsForBatch(batches.get(0))).containsExactly(0, 0, 1); + } + } + } + + /** Verifies unordered shard batching splits by topic-partition. 
*/ + @Test + void unorderedBatchByShardBatchesPerTopicPartition() { + List> work = Arrays.asList( + newWorkContainer(0, 0, "key-a"), + newWorkContainer(0, 1, "key-b"), + newWorkContainer(1, 2, "key-a"), + newWorkContainer(1, 3, "key-b") + ); + + try (TestParallelEoSStreamProcessor processor = newProcessor(UNORDERED, BATCH_BY_SHARD, 10)) { + var batches = processor.makeBatchesForTest(work); + + assertThat(batchSizes(batches)).containsExactly(2, 2); + assertThat(offsetsForBatch(batches.get(0))).containsExactly(0L, 1L); + assertThat(partitionsForBatch(batches.get(0))).containsExactly(0, 0); + assertThat(offsetsForBatch(batches.get(1))).containsExactly(2L, 3L); + assertThat(partitionsForBatch(batches.get(1))).containsExactly(1, 1); + } + } + + /** Verifies key-ordered shard batching splits by key. */ + @Test + void keyOrderedBatchByShardBatchesPerKey() { + List> work = Arrays.asList( + newWorkContainer(0, 0, "key-a"), + newWorkContainer(0, 1, "key-a"), + newWorkContainer(0, 2, "key-b"), + newWorkContainer(0, 3, "key-b") + ); + + try (TestParallelEoSStreamProcessor processor = newProcessor(KEY, BATCH_BY_SHARD, 10)) { + var batches = processor.makeBatchesForTest(work); + + assertThat(batchSizes(batches)).containsExactly(2, 2); + assertThat(offsetsForBatch(batches.get(0))).containsExactly(0L, 1L); + assertThat(keysForBatch(batches.get(0))).containsExactly("key-a", "key-a"); + assertThat(offsetsForBatch(batches.get(1))).containsExactly(2L, 3L); + assertThat(keysForBatch(batches.get(1))).containsExactly("key-b", "key-b"); + } + } + + /** Verifies partition-ordered multiplex batching can mix partitions. 
*/ + @Test + void partitionOrderedMultiplexCanMixPartitionsInOneBatch() { + List> work = Arrays.asList( + newWorkContainer(0, 0, "key-a"), + newWorkContainer(1, 1, "key-b"), + newWorkContainer(2, 2, "key-c") + ); + + try (TestParallelEoSStreamProcessor processor = newProcessor(PARTITION, BATCH_MULTIPLEX, 3)) { + var batches = processor.makeBatchesForTest(work); + + assertThat(batchSizes(batches)).containsExactly(3); + assertThat(partitionsForBatch(batches.get(0))).containsExactly(0, 1, 2); + } + } + + /** Verifies partition-ordered shard batching keeps partitions separate. */ + @Test + void partitionOrderedBatchByShardSeparatesPartitions() { + List> work = Arrays.asList( + newWorkContainer(0, 0, "key-a"), + newWorkContainer(0, 1, "key-b"), + newWorkContainer(1, 2, "key-c"), + newWorkContainer(1, 3, "key-d") + ); + + try (TestParallelEoSStreamProcessor processor = newProcessor(PARTITION, BATCH_BY_SHARD, 10)) { + var batches = processor.makeBatchesForTest(work); + + assertThat(batchSizes(batches)).containsExactly(2, 2); + assertThat(partitionsForBatch(batches.get(0))).containsExactly(0, 0); + assertThat(partitionsForBatch(batches.get(1))).containsExactly(1, 1); + } + } + + private TestParallelEoSStreamProcessor newProcessor(ParallelConsumerOptions.ProcessingOrder order, + ParallelConsumerOptions.BatchStrategy strategy, + int batchSize) { + var options = ParallelConsumerOptions.builder() + .consumer(new MockConsumer<>(OffsetResetStrategy.LATEST)) + .ordering(order) + .batchStrategy(strategy) + .batchSize(batchSize) + .build(); + return new TestParallelEoSStreamProcessor<>(options); + } + + private WorkContainer newWorkContainer(int partition, long offset, String key) { + return new WorkContainer<>(0, new ConsumerRecord<>(topic, partition, offset, key, "value-" + offset), module); + } + + private List batchSizes(List>> batches) { + List sizes = new ArrayList<>(); + batches.forEach(batch -> sizes.add(batch.size())); + return sizes; + } + + private List 
offsetsForBatch(List> batch) { + List offsets = new ArrayList<>(); + batch.forEach(workContainer -> offsets.add(workContainer.offset())); + return offsets; + } + + private List partitionsForBatch(List> batch) { + List partitions = new ArrayList<>(); + batch.forEach(workContainer -> partitions.add(workContainer.getTopicPartition().partition())); + return partitions; + } + + private List keysForBatch(List> batch) { + List keys = new ArrayList<>(); + batch.forEach(workContainer -> keys.add(workContainer.getCr().key())); + return keys; + } } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/TestParallelEoSStreamProcessor.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/TestParallelEoSStreamProcessor.java index 8aa59d3c1..0b5d24f32 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/TestParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/TestParallelEoSStreamProcessor.java @@ -10,6 +10,7 @@ import io.confluent.parallelconsumer.state.WorkManager; import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -35,6 +36,17 @@ public List, R>> runUserFunc( return super.runUserFunction(dummyFunction, callback , activeWorkContainers); } + @SuppressWarnings("unchecked") + public List>> makeBatchesForTest(List> workToProcess) { + try { + Method method = AbstractParallelEoSStreamProcessor.class.getDeclaredMethod("makeBatches", List.class); + method.setAccessible(true); + return (List>>) method.invoke(this, workToProcess); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Unable to invoke makeBatches", e); + } + } + public void setWm(WorkManager wm) { super.wm = wm; } diff --git 
a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index a0f3ccdcc..42148596f 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -30,7 +30,9 @@ import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.threeten.extra.MutableClock; import pl.tlinkowski.unij.api.UniLists; @@ -39,13 +41,18 @@ import java.time.Duration; import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.google.common.truth.Truth.assertWithMessage; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.BATCH_BY_SHARD; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.BATCH_MULTIPLEX; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.SEQUENTIAL; import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.*; import static java.time.Duration.ofSeconds; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static pl.tlinkowski.unij.api.UniLists.of; +import static org.junit.jupiter.params.provider.Arguments.arguments; /** * Needs to run in {@link ExecutionMode#SAME_THREAD} because it manipulates the static state in @@ -169,6 +176,38 @@ void basic(ParallelConsumerOptions.ProcessingOrder order) { assertThat(gottenWork).isEmpty(); } + /** Verifies work selection count across 
ordering and batch strategy combinations. */ + @ParameterizedTest(name = "{index}: {0} / {1} -> {2}") + @MethodSource("batchStrategyWorkSelectionCases") + void workSelectionFollowsBatchStrategy(ParallelConsumerOptions.ProcessingOrder order, + ParallelConsumerOptions.BatchStrategy strategy, + List expectedOffsets) { + setupWorkManager(ParallelConsumerOptions.builder() + .ordering(order) + .batchStrategy(strategy) + .batchSize(2) + .build()); + + registerSomeWork(); + + var gottenWork = wm.getWorkIfAvailable(10); + assertOffsets(gottenWork, expectedOffsets); + } + + private static Stream batchStrategyWorkSelectionCases() { + return Stream.of( + arguments(KEY, SEQUENTIAL, of(0)), + arguments(KEY, BATCH_MULTIPLEX, of(0, 1)), + arguments(KEY, BATCH_BY_SHARD, of(0, 1)), + arguments(PARTITION, SEQUENTIAL, of(0)), + arguments(PARTITION, BATCH_MULTIPLEX, of(0, 1)), + arguments(PARTITION, BATCH_BY_SHARD, of(0, 1)), + arguments(UNORDERED, SEQUENTIAL, of(0, 1, 2)), + arguments(UNORDERED, BATCH_MULTIPLEX, of(0, 1, 2)), + arguments(UNORDERED, BATCH_BY_SHARD, of(0, 1, 2)) + ); + } + @Test void testUnorderedAndDelayed() { setupWorkManager(ParallelConsumerOptions.builder()