diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc
index 543a21f07..c99c222ed 100644
--- a/CHANGELOG.adoc
+++ b/CHANGELOG.adoc
@@ -19,6 +19,7 @@ endif::[]
=== Fixes
* fix: keep instantiated OffsetMapCodecManager so that metrics will not be recreated every commit (#859)
+* feature: add batchStrategy to allow taking multiple records from the same shard in a single batch construction
== 0.5.3.3
diff --git a/README.adoc b/README.adoc
index 39f4db1be..10e253e69 100644
--- a/README.adoc
+++ b/README.adoc
@@ -1539,6 +1539,7 @@ endif::[]
=== Fixes
* fix: keep instantiated OffsetMapCodecManager so that metrics will not be recreated every commit (#859)
+* feature: add batchStrategy to allow taking multiple records from the same shard in a single batch construction
== 0.5.3.3
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
index 05cae8240..04639d5ac 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
@@ -570,4 +570,49 @@ public boolean isProducerSupplied() {
*/
@Builder.Default
public final boolean ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck = false;
+
+ /**
+ * Defines the strategy for batching records.
+ *
+ * Behavior by ordering mode:
+ *
+ * - {@link ProcessingOrder#KEY} / {@link ProcessingOrder#PARTITION}: strategy controls how many records can
+ * be taken from a shard in a single scheduling cycle and whether batches can mix shards.
+ * - {@link ProcessingOrder#UNORDERED}: take-from-shard flow is not restricted by strategy. Batch grouping
+ * still applies; with {@link BatchStrategy#BATCH_BY_SHARD}, a shard maps to topic-partition.
+ *
+ */
+ public enum BatchStrategy {
+ /**
+ * Strict sequential processing for ordered modes. This is the default.
+ *
+ * Only one record is processed at a time for the same Shard (Key or Partition).
+ * Even if batchSize is set, batches will be filled with records of different Shards.
+ *
+ * In {@link ProcessingOrder#UNORDERED}, this behaves as size-based batching and does not limit shard taking.
+ */
+ SEQUENTIAL,
+
+ /**
+ * Allows batching for the same Shard, but a batch can contain multiple Shards mixed together.
+ * Retrieves multiple records within the Shard-level lock to increase throughput in ordered modes.
+ *
+ * In {@link ProcessingOrder#UNORDERED}, this is equivalent to {@link #SEQUENTIAL}.
+ */
+ BATCH_MULTIPLEX,
+
+ /**
+ * Allows batching for the same Shard, and enforces that a single batch contains only one Shard.
+ * Use this when atomic batch processing per Shard (Key or Partition) is required.
+ *
+ * In {@link ProcessingOrder#UNORDERED}, this means one topic-partition per batch.
+ */
+ BATCH_BY_SHARD
+ }
+
+ /**
+ * The batch strategy to use.
+ */
+ @Builder.Default
+ private final BatchStrategy batchStrategy = BatchStrategy.SEQUENTIAL;
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
index fc716650e..06b94373f 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
@@ -9,6 +9,7 @@
import io.confluent.parallelconsumer.*;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
+import io.confluent.parallelconsumer.state.ShardKey;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import io.micrometer.core.instrument.Gauge;
@@ -1043,9 +1044,48 @@ private void submitWorkToPoolInner(final Function,
private List<List<WorkContainer<K, V>>> makeBatches(List<WorkContainer<K, V>> workToProcess) {
int maxBatchSize = options.getBatchSize();
+ ParallelConsumerOptions.BatchStrategy strategy = options.getBatchStrategy();
+
+ if (strategy == ParallelConsumerOptions.BatchStrategy.BATCH_BY_SHARD) {
+ // "Shard" depends on ordering:
+ // KEY => topic+key, PARTITION/UNORDERED => topic-partition.
+ return partitionByShardAndSize(workToProcess, maxBatchSize);
+ }
return partition(workToProcess, maxBatchSize);
}
+ private List<List<WorkContainer<K, V>>> partitionByShardAndSize(List<WorkContainer<K, V>> sourceCollection, int maxBatchSize) {
+ List<List<WorkContainer<K, V>>> listOfBatches = new ArrayList<>();
+ List<WorkContainer<K, V>> batchInConstruction = new ArrayList<>();
+
+ Object lastKey = null;
+
+ for (WorkContainer<K, V> item : sourceCollection) {
+ Object currentKey = ShardKey.of(item, options.getOrdering());
+
+ // Check if we need to start a new batch
+ // 1. If the current batch is full
+ // 2. If the key has changed (and the batch is not empty)
+ boolean isBatchFull = batchInConstruction.size() >= maxBatchSize;
+ boolean isKeyChanged = !batchInConstruction.isEmpty() && !Objects.equals(lastKey, currentKey);
+
+ if (isBatchFull || isKeyChanged) {
+ listOfBatches.add(batchInConstruction);
+ batchInConstruction = new ArrayList<>();
+ }
+
+ batchInConstruction.add(item);
+ lastKey = currentKey;
+ }
+
+ // Add the last batch if it has any items
+ if (!batchInConstruction.isEmpty()) {
+ listOfBatches.add(batchInConstruction);
+ }
+
+ return listOfBatches;
+ }
+
private static <T> List<List<T>> partition(Collection<T> sourceCollection, int maxBatchSize) {
List<List<T>> listOfBatches = new ArrayList<>();
List<T> batchInConstruction = new ArrayList<>();
@@ -1531,4 +1571,4 @@ private void clearCommitCommand() {
}
}
-}
\ No newline at end of file
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
index 6dc4d6890..45cb7057f 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
@@ -136,21 +136,39 @@ ArrayList> getWorkIfAvailable(int workToGetDelta, RetryQueue
var workContainer = iterator.next().getValue();
if (pm.couldBeTakenAsWork(workContainer)) {
+ boolean taken = false;
+
if (workContainer.isAvailableToTakeAsWork()) {
log.trace("Taking {} as work", workContainer);
workContainer.onQueueingForExecution();
workTaken.add(workContainer);
+ taken = true;
} else {
log.trace("Skipping {} as work, not available to take as work", workContainer);
addToSlowWorkMaybe(slowWork, workContainer);
}
+ // Strategy-based take limits apply only when ordering is KEY/PARTITION.
if (isOrderRestricted()) {
- // can't take any more work from this shard, due to ordering restrictions
- // processing blocked on this shard, continue to next shard
- log.trace("Processing by {}, so have cannot get more messages on this ({}) shardEntry.", this.options.getOrdering(), getKey());
- break;
+ ParallelConsumerOptions.BatchStrategy strategy = options.getBatchStrategy();
+
+ // Allow continuing to gather work if both conditions are met:
+ // 1. Strategy is NOT strictly SEQUENTIAL
+ // 2. The current batch size hasn't reached the configured limit
+ boolean batchingAllowed = (strategy != ParallelConsumerOptions.BatchStrategy.SEQUENTIAL);
+ boolean batchIsNotFull = workTaken.size() < options.getBatchSize();
+
+ if (taken && batchingAllowed && batchIsNotFull) {
+ log.trace("Batching strategy {} enabled. Batch not full ({}/{}), continuing to gather work for shard {}",
+ strategy, workTaken.size(), options.getBatchSize(), getKey());
+ // Loop continues to fetch the next available record in this shard
+ } else {
+ // Stop gathering for this shard if strictly sequential or batch is full
+ log.trace("Stopping work gather for ordered shard {} (Strategy: {}, Count: {})",
+ getKey(), strategy, workTaken.size());
+ break;
+ }
}
} else {
// break, assuming all work in this shard, is for the same ShardKey, which is always on the same
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorConfigurationTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorConfigurationTest.java
index abe2444a8..34ae65af3 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorConfigurationTest.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorConfigurationTest.java
@@ -23,11 +23,20 @@
import pl.tlinkowski.unij.api.UniLists;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.BATCH_BY_SHARD;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.BATCH_MULTIPLEX;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.SEQUENTIAL;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.PARTITION;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.UNORDERED;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
* Tests to verify the protected and internal methods of
* {@link io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor} work as expected.
@@ -123,4 +132,166 @@ void testHandleStaleWorkNoSplit() {
Assertions.assertEquals(testInstance.getMailBoxFailedCnt(), 0);
}
}
+
+ /** Confirms the default batch strategy remains sequential. */
+ @Test
+ void defaultBatchStrategyIsSequential() {
+ assertThat(testOptions.getBatchStrategy()).isEqualTo(SEQUENTIAL);
+ }
+
+ /** Ensures batch size one always produces singleton batches. */
+ @Test
+ void batchSizeOneMakesAllStrategiesProduceSingletonBatches() {
+ List<WorkContainer<String, String>> work = Arrays.asList(
+ newWorkContainer(0, 0, "key-a"),
+ newWorkContainer(1, 1, "key-b"),
+ newWorkContainer(0, 2, "key-c")
+ );
+
+ for (ParallelConsumerOptions.BatchStrategy strategy : ParallelConsumerOptions.BatchStrategy.values()) {
+ try (TestParallelEoSStreamProcessor<String, String> processor = newProcessor(UNORDERED, strategy, 1)) {
+ var batches = processor.makeBatchesForTest(work);
+ assertThat(batchSizes(batches)).as("batch sizes for %s", strategy).containsExactly(1, 1, 1);
+ }
+ }
+ }
+
+ /** Verifies unordered sequential and multiplex batching stay size-based. */
+ @Test
+ void unorderedSequentialAndMultiplexUseSizeBasedBatching() {
+ List<WorkContainer<String, String>> work = Arrays.asList(
+ newWorkContainer(0, 0, "key-a"),
+ newWorkContainer(0, 1, "key-a"),
+ newWorkContainer(1, 2, "key-b"),
+ newWorkContainer(1, 3, "key-b")
+ );
+
+ for (ParallelConsumerOptions.BatchStrategy strategy : Arrays.asList(SEQUENTIAL, BATCH_MULTIPLEX)) {
+ try (TestParallelEoSStreamProcessor<String, String> processor = newProcessor(UNORDERED, strategy, 3)) {
+ var batches = processor.makeBatchesForTest(work);
+
+ assertThat(batchSizes(batches)).as("batch sizes for %s", strategy).containsExactly(3, 1);
+ assertThat(offsetsForBatch(batches.get(0))).containsExactly(0L, 1L, 2L);
+ assertThat(partitionsForBatch(batches.get(0))).containsExactly(0, 0, 1);
+ }
+ }
+ }
+
+ /** Verifies unordered shard batching splits by topic-partition. */
+ @Test
+ void unorderedBatchByShardBatchesPerTopicPartition() {
+ List<WorkContainer<String, String>> work = Arrays.asList(
+ newWorkContainer(0, 0, "key-a"),
+ newWorkContainer(0, 1, "key-b"),
+ newWorkContainer(1, 2, "key-a"),
+ newWorkContainer(1, 3, "key-b")
+ );
+
+ try (TestParallelEoSStreamProcessor<String, String> processor = newProcessor(UNORDERED, BATCH_BY_SHARD, 10)) {
+ var batches = processor.makeBatchesForTest(work);
+
+ assertThat(batchSizes(batches)).containsExactly(2, 2);
+ assertThat(offsetsForBatch(batches.get(0))).containsExactly(0L, 1L);
+ assertThat(partitionsForBatch(batches.get(0))).containsExactly(0, 0);
+ assertThat(offsetsForBatch(batches.get(1))).containsExactly(2L, 3L);
+ assertThat(partitionsForBatch(batches.get(1))).containsExactly(1, 1);
+ }
+ }
+
+ /** Verifies key-ordered shard batching splits by key. */
+ @Test
+ void keyOrderedBatchByShardBatchesPerKey() {
+ List<WorkContainer<String, String>> work = Arrays.asList(
+ newWorkContainer(0, 0, "key-a"),
+ newWorkContainer(0, 1, "key-a"),
+ newWorkContainer(0, 2, "key-b"),
+ newWorkContainer(0, 3, "key-b")
+ );
+
+ try (TestParallelEoSStreamProcessor<String, String> processor = newProcessor(KEY, BATCH_BY_SHARD, 10)) {
+ var batches = processor.makeBatchesForTest(work);
+
+ assertThat(batchSizes(batches)).containsExactly(2, 2);
+ assertThat(offsetsForBatch(batches.get(0))).containsExactly(0L, 1L);
+ assertThat(keysForBatch(batches.get(0))).containsExactly("key-a", "key-a");
+ assertThat(offsetsForBatch(batches.get(1))).containsExactly(2L, 3L);
+ assertThat(keysForBatch(batches.get(1))).containsExactly("key-b", "key-b");
+ }
+ }
+
+ /** Verifies partition-ordered multiplex batching can mix partitions. */
+ @Test
+ void partitionOrderedMultiplexCanMixPartitionsInOneBatch() {
+ List<WorkContainer<String, String>> work = Arrays.asList(
+ newWorkContainer(0, 0, "key-a"),
+ newWorkContainer(1, 1, "key-b"),
+ newWorkContainer(2, 2, "key-c")
+ );
+
+ try (TestParallelEoSStreamProcessor<String, String> processor = newProcessor(PARTITION, BATCH_MULTIPLEX, 3)) {
+ var batches = processor.makeBatchesForTest(work);
+
+ assertThat(batchSizes(batches)).containsExactly(3);
+ assertThat(partitionsForBatch(batches.get(0))).containsExactly(0, 1, 2);
+ }
+ }
+
+ /** Verifies partition-ordered shard batching keeps partitions separate. */
+ @Test
+ void partitionOrderedBatchByShardSeparatesPartitions() {
+ List<WorkContainer<String, String>> work = Arrays.asList(
+ newWorkContainer(0, 0, "key-a"),
+ newWorkContainer(0, 1, "key-b"),
+ newWorkContainer(1, 2, "key-c"),
+ newWorkContainer(1, 3, "key-d")
+ );
+
+ try (TestParallelEoSStreamProcessor<String, String> processor = newProcessor(PARTITION, BATCH_BY_SHARD, 10)) {
+ var batches = processor.makeBatchesForTest(work);
+
+ assertThat(batchSizes(batches)).containsExactly(2, 2);
+ assertThat(partitionsForBatch(batches.get(0))).containsExactly(0, 0);
+ assertThat(partitionsForBatch(batches.get(1))).containsExactly(1, 1);
+ }
+ }
+
+ private TestParallelEoSStreamProcessor<String, String> newProcessor(ParallelConsumerOptions.ProcessingOrder order,
+ ParallelConsumerOptions.BatchStrategy strategy,
+ int batchSize) {
+ var options = ParallelConsumerOptions.<String, String>builder()
+ .consumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
+ .ordering(order)
+ .batchStrategy(strategy)
+ .batchSize(batchSize)
+ .build();
+ return new TestParallelEoSStreamProcessor<>(options);
+ }
+
+ private WorkContainer<String, String> newWorkContainer(int partition, long offset, String key) {
+ return new WorkContainer<>(0, new ConsumerRecord<>(topic, partition, offset, key, "value-" + offset), module);
+ }
+
+ private List<Integer> batchSizes(List<List<WorkContainer<String, String>>> batches) {
+ List<Integer> sizes = new ArrayList<>();
+ batches.forEach(batch -> sizes.add(batch.size()));
+ return sizes;
+ }
+
+ private List<Long> offsetsForBatch(List<WorkContainer<String, String>> batch) {
+ List<Long> offsets = new ArrayList<>();
+ batch.forEach(workContainer -> offsets.add(workContainer.offset()));
+ return offsets;
+ }
+
+ private List<Integer> partitionsForBatch(List<WorkContainer<String, String>> batch) {
+ List<Integer> partitions = new ArrayList<>();
+ batch.forEach(workContainer -> partitions.add(workContainer.getTopicPartition().partition()));
+ return partitions;
+ }
+
+ private List<String> keysForBatch(List<WorkContainer<String, String>> batch) {
+ List<String> keys = new ArrayList<>();
+ batch.forEach(workContainer -> keys.add(workContainer.getCr().key()));
+ return keys;
+ }
}
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/TestParallelEoSStreamProcessor.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/TestParallelEoSStreamProcessor.java
index 8aa59d3c1..0b5d24f32 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/TestParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/TestParallelEoSStreamProcessor.java
@@ -10,6 +10,7 @@
import io.confluent.parallelconsumer.state.WorkManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -35,6 +36,17 @@ public List, R>> runUserFunc(
return super.runUserFunction(dummyFunction, callback , activeWorkContainers);
}
+ @SuppressWarnings("unchecked")
+ public List<List<WorkContainer<K, V>>> makeBatchesForTest(List<WorkContainer<K, V>> workToProcess) {
+ try {
+ Method method = AbstractParallelEoSStreamProcessor.class.getDeclaredMethod("makeBatches", List.class);
+ method.setAccessible(true);
+ return (List<List<WorkContainer<K, V>>>) method.invoke(this, workToProcess);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Unable to invoke makeBatches", e);
+ }
+ }
+
public void setWm(WorkManager wm) {
super.wm = wm;
}
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java
index a0f3ccdcc..42148596f 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java
@@ -30,7 +30,9 @@
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.threeten.extra.MutableClock;
import pl.tlinkowski.unij.api.UniLists;
@@ -39,13 +41,18 @@
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static com.google.common.truth.Truth.assertWithMessage;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.BATCH_BY_SHARD;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.BATCH_MULTIPLEX;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.SEQUENTIAL;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.*;
import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static pl.tlinkowski.unij.api.UniLists.of;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
/**
* Needs to run in {@link ExecutionMode#SAME_THREAD} because it manipulates the static state in
@@ -169,6 +176,38 @@ void basic(ParallelConsumerOptions.ProcessingOrder order) {
assertThat(gottenWork).isEmpty();
}
+ /** Verifies work selection count across ordering and batch strategy combinations. */
+ @ParameterizedTest(name = "{index}: {0} / {1} -> {2}")
+ @MethodSource("batchStrategyWorkSelectionCases")
+ void workSelectionFollowsBatchStrategy(ParallelConsumerOptions.ProcessingOrder order,
+ ParallelConsumerOptions.BatchStrategy strategy,
+ List<Integer> expectedOffsets) {
+ setupWorkManager(ParallelConsumerOptions.<String, String>builder()
+ .ordering(order)
+ .batchStrategy(strategy)
+ .batchSize(2)
+ .build());
+
+ registerSomeWork();
+
+ var gottenWork = wm.getWorkIfAvailable(10);
+ assertOffsets(gottenWork, expectedOffsets);
+ }
+
+ private static Stream<Arguments> batchStrategyWorkSelectionCases() {
+ return Stream.of(
+ arguments(KEY, SEQUENTIAL, of(0)),
+ arguments(KEY, BATCH_MULTIPLEX, of(0, 1)),
+ arguments(KEY, BATCH_BY_SHARD, of(0, 1)),
+ arguments(PARTITION, SEQUENTIAL, of(0)),
+ arguments(PARTITION, BATCH_MULTIPLEX, of(0, 1)),
+ arguments(PARTITION, BATCH_BY_SHARD, of(0, 1)),
+ arguments(UNORDERED, SEQUENTIAL, of(0, 1, 2)),
+ arguments(UNORDERED, BATCH_MULTIPLEX, of(0, 1, 2)),
+ arguments(UNORDERED, BATCH_BY_SHARD, of(0, 1, 2))
+ );
+ }
+
@Test
void testUnorderedAndDelayed() {
setupWorkManager(ParallelConsumerOptions.builder()