Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ endif::[]
=== Fixes

* fix: keep instantiated OffsetMapCodecManager so that metrics will not be recreated every commit (#859)
* feature: add batchStrategy to allow taking multiple records from the same shard in a single batch construction

== 0.5.3.3

Expand Down
1 change: 1 addition & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1539,6 +1539,7 @@ endif::[]
=== Fixes

* fix: keep instantiated OffsetMapCodecManager so that metrics will not be recreated every commit (#859)
* feature: add batchStrategy to allow taking multiple records from the same shard in a single batch construction

== 0.5.3.3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,4 +570,49 @@ public boolean isProducerSupplied() {
*/
@Builder.Default
public final boolean ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck = false;

/**
 * Defines the strategy for batching records.
 * <p>
 * Behavior by ordering mode:
 * <ul>
 * <li>{@link ProcessingOrder#KEY} / {@link ProcessingOrder#PARTITION}: the strategy controls how many records
 * may be taken from a single shard in one scheduling cycle, and whether a batch may mix shards.</li>
 * <li>{@link ProcessingOrder#UNORDERED}: the take-from-shard flow is not restricted by the strategy. Batch
 * grouping still applies; with {@link BatchStrategy#BATCH_BY_SHARD} a shard maps to a topic-partition.</li>
 * </ul>
 */
public enum BatchStrategy {
    /**
     * Strict sequential processing for ordered modes. This is the default.
     * <p>
     * Only one record is processed at a time for the same Shard (Key or Partition). Even if batchSize is
     * set, batches are filled with records from different Shards — never multiple records of one Shard.
     * <p>
     * In {@link ProcessingOrder#UNORDERED}, this behaves as size-based batching and does not limit shard taking.
     */
    SEQUENTIAL,

    /**
     * Allows batching multiple records of the same Shard, and a single batch may contain several Shards
     * mixed together. Retrieves multiple records within the Shard-level lock to increase throughput in
     * ordered modes.
     * <p>
     * In {@link ProcessingOrder#UNORDERED}, this is equivalent to {@link #SEQUENTIAL}.
     */
    BATCH_MULTIPLEX,

    /**
     * Allows batching multiple records of the same Shard, and enforces that a single batch contains only
     * one Shard. Use this when atomic batch processing per Shard (Key or Partition) is required.
     * <p>
     * In {@link ProcessingOrder#UNORDERED}, this means one topic-partition per batch.
     */
    BATCH_BY_SHARD
}

/**
 * The batch strategy to use. Defaults to {@link BatchStrategy#SEQUENTIAL}.
 */
@Builder.Default
private final BatchStrategy batchStrategy = BatchStrategy.SEQUENTIAL;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.confluent.parallelconsumer.*;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.confluent.parallelconsumer.state.ShardKey;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import io.micrometer.core.instrument.Gauge;
Expand Down Expand Up @@ -1043,9 +1044,48 @@ private <R> void submitWorkToPoolInner(final Function<PollContextInternal<K, V>,

/**
 * Splits the work list into batches according to the configured {@link ParallelConsumerOptions.BatchStrategy}.
 * <p>
 * BATCH_BY_SHARD keeps each batch homogeneous by shard key — "shard" resolves to topic+key under KEY
 * ordering and to topic-partition under PARTITION/UNORDERED. All other strategies use plain size-based
 * partitioning.
 */
private List<List<WorkContainer<K, V>>> makeBatches(List<WorkContainer<K, V>> workToProcess) {
    final int maxBatchSize = options.getBatchSize();
    final ParallelConsumerOptions.BatchStrategy strategy = options.getBatchStrategy();

    return strategy == ParallelConsumerOptions.BatchStrategy.BATCH_BY_SHARD
            ? partitionByShardAndSize(workToProcess, maxBatchSize)
            : partition(workToProcess, maxBatchSize);
}

/**
 * Partitions the work list into batches such that each batch contains records of only one shard key,
 * and no batch exceeds {@code maxBatchSize}.
 * <p>
 * Grouping is by adjacent runs of equal shard keys: a new batch starts whenever the size cap is hit or
 * the shard key of the next record differs from the batch under construction.
 */
private List<List<WorkContainer<K, V>>> partitionByShardAndSize(List<WorkContainer<K, V>> sourceCollection, int maxBatchSize) {
    List<List<WorkContainer<K, V>>> completedBatches = new ArrayList<>();
    List<WorkContainer<K, V>> currentBatch = new ArrayList<>();
    Object currentBatchShardKey = null;

    for (WorkContainer<K, V> workContainer : sourceCollection) {
        Object shardKey = ShardKey.of(workContainer, options.getOrdering());

        // Seal the batch under construction when it has hit the size cap, or when the shard key
        // changes (a batch must never contain more than one shard)
        boolean sizeCapReached = currentBatch.size() >= maxBatchSize;
        boolean shardChanged = !currentBatch.isEmpty() && !Objects.equals(currentBatchShardKey, shardKey);
        if (sizeCapReached || shardChanged) {
            completedBatches.add(currentBatch);
            currentBatch = new ArrayList<>();
        }

        currentBatch.add(workContainer);
        currentBatchShardKey = shardKey;
    }

    // Flush the trailing partial batch, if any
    if (!currentBatch.isEmpty()) {
        completedBatches.add(currentBatch);
    }

    return completedBatches;
}

private static <T> List<List<T>> partition(Collection<T> sourceCollection, int maxBatchSize) {
List<List<T>> listOfBatches = new ArrayList<>();
List<T> batchInConstruction = new ArrayList<>();
Expand Down Expand Up @@ -1531,4 +1571,4 @@ private void clearCommitCommand() {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,39 @@ ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta, RetryQueue
var workContainer = iterator.next().getValue();

if (pm.couldBeTakenAsWork(workContainer)) {
boolean taken = false;

if (workContainer.isAvailableToTakeAsWork()) {
log.trace("Taking {} as work", workContainer);

workContainer.onQueueingForExecution();
workTaken.add(workContainer);
taken = true;
} else {
log.trace("Skipping {} as work, not available to take as work", workContainer);
addToSlowWorkMaybe(slowWork, workContainer);
}

// Strategy-based take limits apply only when ordering is KEY/PARTITION.
if (isOrderRestricted()) {
// can't take any more work from this shard, due to ordering restrictions
// processing blocked on this shard, continue to next shard
log.trace("Processing by {}, so have cannot get more messages on this ({}) shardEntry.", this.options.getOrdering(), getKey());
break;
ParallelConsumerOptions.BatchStrategy strategy = options.getBatchStrategy();

// Allow continuing to gather work if both conditions are met:
// 1. Strategy is NOT strictly SEQUENTIAL
// 2. The current batch size hasn't reached the configured limit
boolean batchingAllowed = (strategy != ParallelConsumerOptions.BatchStrategy.SEQUENTIAL);
boolean batchIsNotFull = workTaken.size() < options.getBatchSize();

if (taken && batchingAllowed && batchIsNotFull) {
log.trace("Batching strategy {} enabled. Batch not full ({}/{}), continuing to gather work for shard {}",
strategy, workTaken.size(), options.getBatchSize(), getKey());
// Loop continues to fetch the next available record in this shard
} else {
// Stop gathering for this shard if strictly sequential or batch is full
log.trace("Stopping work gather for ordered shard {} (Strategy: {}, Count: {})",
getKey(), strategy, workTaken.size());
break;
}
}
} else {
// break, assuming all work in this shard, is for the same ShardKey, which is always on the same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,20 @@
import pl.tlinkowski.unij.api.UniLists;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.BATCH_BY_SHARD;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.BATCH_MULTIPLEX;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.BatchStrategy.SEQUENTIAL;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.PARTITION;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.UNORDERED;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests to verify the protected and internal methods of
* {@link io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor} work as expected.
Expand Down Expand Up @@ -123,4 +132,166 @@ void testHandleStaleWorkNoSplit() {
Assertions.assertEquals(testInstance.getMailBoxFailedCnt(), 0);
}
}

/** Confirms the default batch strategy remains sequential. */
/** Confirms that batchStrategy defaults to SEQUENTIAL when not set explicitly. */
@Test
void defaultBatchStrategyIsSequential() {
    var defaultStrategy = testOptions.getBatchStrategy();
    assertThat(defaultStrategy).isEqualTo(SEQUENTIAL);
}

/** Ensures batch size one always produces singleton batches. */
/** Ensures that, with batch size one, every strategy yields only singleton batches. */
@Test
void batchSizeOneMakesAllStrategiesProduceSingletonBatches() {
    var work = Arrays.asList(
            newWorkContainer(0, 0, "key-a"),
            newWorkContainer(1, 1, "key-b"),
            newWorkContainer(0, 2, "key-c")
    );

    // Every strategy must respect the hard size cap of one
    for (var strategy : ParallelConsumerOptions.BatchStrategy.values()) {
        try (var processor = newProcessor(UNORDERED, strategy, 1)) {
            var batches = processor.makeBatchesForTest(work);
            assertThat(batchSizes(batches)).as("batch sizes for %s", strategy).containsExactly(1, 1, 1);
        }
    }
}

/** Verifies unordered sequential and multiplex batching stay size-based. */
/** Verifies that, unordered, SEQUENTIAL and BATCH_MULTIPLEX both fall back to plain size-based batching. */
@Test
void unorderedSequentialAndMultiplexUseSizeBasedBatching() {
    var work = Arrays.asList(
            newWorkContainer(0, 0, "key-a"),
            newWorkContainer(0, 1, "key-a"),
            newWorkContainer(1, 2, "key-b"),
            newWorkContainer(1, 3, "key-b")
    );

    for (var strategy : Arrays.asList(SEQUENTIAL, BATCH_MULTIPLEX)) {
        try (var processor = newProcessor(UNORDERED, strategy, 3)) {
            var batches = processor.makeBatchesForTest(work);

            assertThat(batchSizes(batches)).as("batch sizes for %s", strategy).containsExactly(3, 1);
            // First batch fills to the cap regardless of key or partition boundaries
            var firstBatch = batches.get(0);
            assertThat(offsetsForBatch(firstBatch)).containsExactly(0L, 1L, 2L);
            assertThat(partitionsForBatch(firstBatch)).containsExactly(0, 0, 1);
        }
    }
}

/** Verifies unordered shard batching splits by topic-partition. */
/** Verifies that, unordered, BATCH_BY_SHARD splits batches along topic-partition boundaries. */
@Test
void unorderedBatchByShardBatchesPerTopicPartition() {
    var work = Arrays.asList(
            newWorkContainer(0, 0, "key-a"),
            newWorkContainer(0, 1, "key-b"),
            newWorkContainer(1, 2, "key-a"),
            newWorkContainer(1, 3, "key-b")
    );

    try (var processor = newProcessor(UNORDERED, BATCH_BY_SHARD, 10)) {
        var batches = processor.makeBatchesForTest(work);

        assertThat(batchSizes(batches)).containsExactly(2, 2);
        // Keys are irrelevant here — only the topic-partition defines the shard
        var partitionZeroBatch = batches.get(0);
        assertThat(offsetsForBatch(partitionZeroBatch)).containsExactly(0L, 1L);
        assertThat(partitionsForBatch(partitionZeroBatch)).containsExactly(0, 0);
        var partitionOneBatch = batches.get(1);
        assertThat(offsetsForBatch(partitionOneBatch)).containsExactly(2L, 3L);
        assertThat(partitionsForBatch(partitionOneBatch)).containsExactly(1, 1);
    }
}

/** Verifies key-ordered shard batching splits by key. */
/** Verifies that, key-ordered, BATCH_BY_SHARD splits batches along key boundaries. */
@Test
void keyOrderedBatchByShardBatchesPerKey() {
    var work = Arrays.asList(
            newWorkContainer(0, 0, "key-a"),
            newWorkContainer(0, 1, "key-a"),
            newWorkContainer(0, 2, "key-b"),
            newWorkContainer(0, 3, "key-b")
    );

    try (var processor = newProcessor(KEY, BATCH_BY_SHARD, 10)) {
        var batches = processor.makeBatchesForTest(work);

        assertThat(batchSizes(batches)).containsExactly(2, 2);
        // Under KEY ordering the shard is topic+key, so each batch holds exactly one key
        var keyABatch = batches.get(0);
        assertThat(offsetsForBatch(keyABatch)).containsExactly(0L, 1L);
        assertThat(keysForBatch(keyABatch)).containsExactly("key-a", "key-a");
        var keyBBatch = batches.get(1);
        assertThat(offsetsForBatch(keyBBatch)).containsExactly(2L, 3L);
        assertThat(keysForBatch(keyBBatch)).containsExactly("key-b", "key-b");
    }
}

/** Verifies partition-ordered multiplex batching can mix partitions. */
/** Verifies that, partition-ordered, BATCH_MULTIPLEX may combine several partitions in a single batch. */
@Test
void partitionOrderedMultiplexCanMixPartitionsInOneBatch() {
    var work = Arrays.asList(
            newWorkContainer(0, 0, "key-a"),
            newWorkContainer(1, 1, "key-b"),
            newWorkContainer(2, 2, "key-c")
    );

    try (var processor = newProcessor(PARTITION, BATCH_MULTIPLEX, 3)) {
        var batches = processor.makeBatchesForTest(work);

        // One full batch containing all three partitions mixed together
        assertThat(batchSizes(batches)).containsExactly(3);
        assertThat(partitionsForBatch(batches.get(0))).containsExactly(0, 1, 2);
    }
}

/** Verifies partition-ordered shard batching keeps partitions separate. */
/** Verifies that, partition-ordered, BATCH_BY_SHARD never mixes partitions in one batch. */
@Test
void partitionOrderedBatchByShardSeparatesPartitions() {
    var work = Arrays.asList(
            newWorkContainer(0, 0, "key-a"),
            newWorkContainer(0, 1, "key-b"),
            newWorkContainer(1, 2, "key-c"),
            newWorkContainer(1, 3, "key-d")
    );

    try (var processor = newProcessor(PARTITION, BATCH_BY_SHARD, 10)) {
        var batches = processor.makeBatchesForTest(work);

        assertThat(batchSizes(batches)).containsExactly(2, 2);
        // One partition per batch; keys do not matter under PARTITION ordering
        assertThat(partitionsForBatch(batches.get(0))).containsExactly(0, 0);
        assertThat(partitionsForBatch(batches.get(1))).containsExactly(1, 1);
    }
}

/**
 * Builds a test processor around a mock consumer. Only the ordering, strategy and batch size vary;
 * everything else uses option defaults.
 */
private TestParallelEoSStreamProcessor<String, String> newProcessor(ParallelConsumerOptions.ProcessingOrder order,
                                                                    ParallelConsumerOptions.BatchStrategy strategy,
                                                                    int batchSize) {
    ParallelConsumerOptions<String, String> processorOptions = ParallelConsumerOptions.<String, String>builder()
            .consumer(new MockConsumer<>(OffsetResetStrategy.LATEST))
            .ordering(order)
            .batchStrategy(strategy)
            .batchSize(batchSize)
            .build();
    return new TestParallelEoSStreamProcessor<>(processorOptions);
}

/** Wraps a synthetic consumer record (value derived from offset) in a work container at epoch 0. */
private WorkContainer<String, String> newWorkContainer(int partition, long offset, String key) {
    ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>(topic, partition, offset, key, "value-" + offset);
    return new WorkContainer<>(0, consumerRecord, module);
}

/** Extracts the size of each batch, in order. */
private List<Integer> batchSizes(List<List<WorkContainer<String, String>>> batches) {
    List<Integer> sizes = new ArrayList<>(batches.size());
    for (List<WorkContainer<String, String>> batch : batches) {
        sizes.add(batch.size());
    }
    return sizes;
}

/** Extracts the record offsets of a batch, in order. */
private List<Long> offsetsForBatch(List<WorkContainer<String, String>> batch) {
    List<Long> offsets = new ArrayList<>(batch.size());
    for (WorkContainer<String, String> workContainer : batch) {
        offsets.add(workContainer.offset());
    }
    return offsets;
}

/** Extracts the partition number of each record in a batch, in order. */
private List<Integer> partitionsForBatch(List<WorkContainer<String, String>> batch) {
    List<Integer> partitions = new ArrayList<>(batch.size());
    for (WorkContainer<String, String> workContainer : batch) {
        partitions.add(workContainer.getTopicPartition().partition());
    }
    return partitions;
}

/** Extracts the record key of each record in a batch, in order. */
private List<String> keysForBatch(List<WorkContainer<String, String>> batch) {
    List<String> keys = new ArrayList<>(batch.size());
    for (WorkContainer<String, String> workContainer : batch) {
        keys.add(workContainer.getCr().key());
    }
    return keys;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.confluent.parallelconsumer.state.WorkManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
Expand All @@ -35,6 +36,17 @@ public <R> List<Tuple<ConsumerRecord<K, V>, R>> runUserFunc(
return super.runUserFunction(dummyFunction, callback , activeWorkContainers);
}

/**
 * Test-only accessor that invokes the private {@code makeBatches} method via reflection.
 * <p>
 * Any exception thrown by {@code makeBatches} itself arrives wrapped in an
 * {@code InvocationTargetException}; we unwrap it so test failures surface the real cause instead of
 * a reflection wrapper.
 *
 * @param workToProcess the work containers to partition into batches
 * @return the batches produced by the processor's batching strategy
 */
@SuppressWarnings("unchecked")
public List<List<WorkContainer<K, V>>> makeBatchesForTest(List<WorkContainer<K, V>> workToProcess) {
    try {
        Method method = AbstractParallelEoSStreamProcessor.class.getDeclaredMethod("makeBatches", List.class);
        method.setAccessible(true);
        return (List<List<WorkContainer<K, V>>>) method.invoke(this, workToProcess);
    } catch (ReflectiveOperationException e) {
        // InvocationTargetException carries the real failure in getCause(); for lookup/access
        // failures the cause is null, so fall back to the reflection exception itself
        Throwable cause = e.getCause() != null ? e.getCause() : e;
        throw new RuntimeException("Unable to invoke makeBatches", cause);
    }
}

/**
 * Test hook: replaces the processor's {@link WorkManager} with the supplied instance so tests can
 * inject a stub or mock state manager.
 */
public void setWm(WorkManager wm) {
    super.wm = wm;
}
Expand Down
Loading