
Add Iceberg CDC source plugin#6554

Merged
dlvenable merged 24 commits into opensearch-project:main from lawofcycles:feature/iceberg-cdc-source on Mar 11, 2026

Conversation

@lawofcycles
Contributor

Description

Adds a new iceberg source plugin that captures row-level changes (CDC) from Apache Iceberg tables and ingests them into the Data Prepper pipeline. The plugin uses Iceberg's IncrementalChangelogScan API directly, without requiring Spark or Flink.

Key features

  • Polls Iceberg tables for new snapshots and processes row-level changes (INSERT, UPDATE, DELETE) incrementally
  • Supports initial load (full table scan) with automatic transition to CDC
  • Handles CoW carryover removal for correct change detection
  • Distributes work across Data Prepper nodes via EnhancedSourceCoordinator
  • Works with any Iceberg catalog (Glue, REST, Hive, Hadoop, Nessie, JDBC, etc.)

Current limitations

  • Only Copy-on-Write (CoW) tables are supported. MoR tables are rejected at startup.
  • identifier_columns must be configured by the user for CDC correctness
  • When multiple DELETED and ADDED files exist within the same Iceberg partition and bounds-based pairing fails, work for that partition cannot be distributed across nodes.

Catalog support: Any catalog supported by Iceberg's CatalogUtil.buildIcebergCatalog() (Glue, REST, Hive, Hadoop, Nessie, JDBC, etc.)

Issues Resolved

Resolves #6552

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
  • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@lawofcycles
Contributor Author

lawofcycles commented Feb 23, 2026

Discussion: PeerForwarder extension for Source-layer shuffle

The current implementation groups DELETED and ADDED files together at the Source layer so that carryover removal can happen locally on each node. This works well when bounds-based pairing succeeds, but falls back to single-node processing when it fails (e.g., when an UPDATE changes a column's min/max bounds).

A more scalable approach would be to shuffle rows between ChangelogWorkers by a hash of all data columns (similar to Spark's repartition). Data Prepper's PeerForwarder infrastructure provides useful building blocks for this (consistent hashing via HashRing, node discovery, Armeria HTTP transport). However, the PeerForwarder's orchestration layer is tightly coupled to the Processor execution model (synchronous send/receive within a batch processing cycle, ReceiveBuffer integration with the Pipeline, pluginId-based routing), so Source-layer shuffling would likely need to build its own send/receive flow on top of these lower-level components, or the PeerForwarder architecture would need to be generalized to support non-Processor use cases.

RFC #700 (Core Peer Forwarding) notes that the initial implementation targets Processor plugins only, and suggests opening a new issue to expand the functionality to Source or Sink plugins. I plan to open a separate issue proposing this extension, with Iceberg CDC as the first use case.

I'd like to hear the maintainers' thoughts on the best approach for inter-node data exchange at the Source layer.

20260311 UPDATE: Additional benefit of Source-layer shuffle

Source-layer shuffle would also enable processing multiple snapshots in a single IncrementalChangelogScan call, rather than the current one-snapshot-at-a-time serial approach.

Currently, the LeaderScheduler processes each snapshot sequentially: plan one snapshot, wait for all partitions to complete (including sink acknowledgement when acknowledgments: true), then move to the next. This is necessary because without shuffle, carryover removal relies on bounds-based file pairing within each snapshot. With shuffle (repartition by hash of all data columns), carryover removal works correctly across snapshot boundaries, just as Spark's RemoveCarryoverIterator does after repartition.
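As a rough illustration of the carryover-removal rule being discussed (class and method names here are hypothetical, not the plugin's actual code): a DELETE and an INSERT whose data columns are identical cancel out, because a Copy-on-Write rewrite copies unchanged rows into new files without logically changing them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of carryover removal: a DELETE/INSERT pair with
// identical data columns is a row that a CoW rewrite merely copied, not a
// real change, so both sides are dropped.
public class CarryoverSketch {
    public record Row(List<Object> columns, String op) {}

    public static List<Row> removeCarryover(final List<Row> rows) {
        // Count DELETEs per column-value tuple.
        final Map<List<Object>, Integer> pendingDeletes = new HashMap<>();
        for (final Row row : rows) {
            if (row.op().equals("DELETE")) {
                pendingDeletes.merge(row.columns(), 1, Integer::sum);
            }
        }
        final List<Row> surviving = new ArrayList<>();
        // Cancel INSERTs against matching DELETEs; keep true INSERTs.
        for (final Row row : rows) {
            if (row.op().equals("INSERT")) {
                if (pendingDeletes.getOrDefault(row.columns(), 0) > 0) {
                    pendingDeletes.merge(row.columns(), -1, Integer::sum);
                } else {
                    surviving.add(row);
                }
            }
        }
        // DELETEs left uncancelled are true deletes.
        for (final Row row : rows) {
            if (row.op().equals("DELETE") && pendingDeletes.getOrDefault(row.columns(), 0) > 0) {
                pendingDeletes.merge(row.columns(), -1, Integer::sum);
                surviving.add(row);
            }
        }
        return surviving;
    }
}
```

With shuffle in place, this cancellation can run over rows from many snapshots at once, since matching pairs are guaranteed to be co-located on the same node.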

The performance benefits of batch processing multiple snapshots include the following points.

  1. Eliminating per-snapshot synchronization overhead. With acknowledgments enabled, the leader must wait for the sink to finish writing and return acknowledgments before planning the next snapshot. Batch processing reduces N sequential wait cycles to one.
  2. Reducing scan planning overhead. One IncrementalChangelogScan call reads manifests once instead of N times.

This is particularly relevant for tables with frequent commits (e.g., Spark Streaming committing every few minutes), where dozens of snapshots can accumulate between polling intervals.

Note that batch processing also requires ordering writes by changeOrdinal (the snapshot sequence number that IncrementalChangelogScan assigns to each task) to ensure correct operation order in OpenSearch when the same document is modified across multiple snapshots.
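The ordering requirement can be sketched as follows (illustrative types, not the plugin's actual classes): before emitting a multi-snapshot batch, changes are sorted by changeOrdinal so that, for any given document, the operation from a later snapshot always reaches the sink after the earlier one.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: ordering a multi-snapshot batch by changeOrdinal so
// that operations on the same document are applied in snapshot order.
public class ChangeOrdering {
    public record Change(String documentId, int changeOrdinal, String op) {}

    public static List<Change> inApplyOrder(final List<Change> changes) {
        // Stream sorting is stable, so within-snapshot order is preserved.
        return changes.stream()
                .sorted(Comparator.comparingInt(Change::changeOrdinal))
                .toList();
    }
}
```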

@lawofcycles lawofcycles changed the title from "Add Iceberg CDC source plugin (#6552)" to "Add Iceberg CDC source plugin" on Feb 23, 2026
@lawofcycles lawofcycles force-pushed the feature/iceberg-cdc-source branch from bf1d396 to 345ca7f on March 1, 2026 12:30
Member

@dlvenable left a comment

Thank you @lawofcycles for this great contribution!

I left a few initial comments. I want to look deeper at the logic as well.

One thing we will need before making this official is a set of integration tests that run against an actual Iceberg Docker image. We do this for our Kafka buffer and our OpenSearch sink.

private String tableName;

@JsonProperty("catalog")
private Map<String, String> catalog = Collections.emptyMap();
Member

Is there a model that we can use here instead of a map? The models provide the best experience for the community because they promote configuration consistency and help with documentation.

Contributor Author

@lawofcycles Mar 5, 2026

I looked into this. Iceberg does not provide a typed model for catalog configuration; all official implementations pass catalog properties as a plain string map (Map<String, String>).

Kafka Connect's Iceberg integration also uses Map<String, String> in the same way.

I think this is an intentional design choice, because each catalog type (Glue, REST, Hive, Nessie, JDBC, etc.) accepts a completely different set of properties. A typed model would either couple the plugin to specific catalog types or still need a map fallback for catalog-specific properties.

That said, the catalog properties will need thorough documentation with examples for each supported catalog type so that users have clear guidance on what to configure.

Comment on lines +27 to +28
@JsonProperty("initial_load")
private boolean initialLoad = true;
Member

Suggested change
@JsonProperty("initial_load")
private boolean initialLoad = true;
@JsonProperty("disable_export")
private boolean disableExport = false;

Our other CDC sources have both export and stream configurations. And we allow the following combinations:

  • Export and stream
  • Export only
  • Stream only

Unless "initial load" is a specialized term in Iceberg we should keep our existing conventions.

Also, we prefer to default boolean values to false.

We may eventually want to add disable_stream but that is not necessary now.

Contributor Author

Changed initialLoad to disableExport.

@@ -0,0 +1,49 @@
/*
Member

Contributor Author

fixed!


public class ChangelogTaskProgressState {

@JsonProperty("snapshot_id")
Member

These save to DynamoDB for source coordination. We use camelCase for these. Please update accordingly.

@JsonProperty("exportArn")
private String exportArn;
@JsonProperty("status")
private String status;
@JsonProperty("bucket")
private String bucket;
@JsonProperty("prefix")
private String prefix;
@JsonProperty("kmsKeyId")
private String kmsKeyId;
@JsonProperty("exportTime")
private String exportTime;

Contributor Author

Made them camelCase.

@@ -0,0 +1,28 @@
plugins {
Member

You don't need these three lines. You can remove them.

Contributor Author

removed.

import java.util.Objects;
import java.util.function.Function;

@DataPrepperPlugin(name = "iceberg", pluginType = Source.class, pluginConfigurationType = IcebergSourceConfig.class)
Member

Let's add the @Experimental annotation here. I'd like to get this PR merged, but we do have a few other items to address before making this fully supported.

Contributor Author

Added the @Experimental annotation!

this.state = state;
}

public ChangelogTaskPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
Member

This code should have unit tests. Verify the getter results.

Contributor Author

added unit test.

}

public GlobalState(final SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
Member

This code should have unit tests. Verify the getter results.

Contributor Author

added unit test.

}

public InitialLoadTaskPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
Member

This code should have unit tests. Verify the getter results.

Contributor Author

added unit test

}

public LeaderPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
Member

This code should have unit tests. Verify the getter results.

Contributor Author

added unit test

@lawofcycles lawofcycles force-pushed the feature/iceberg-cdc-source branch from ba2cd6f to 8ee1b65 on March 5, 2026 23:18
@lawofcycles lawofcycles force-pushed the feature/iceberg-cdc-source branch from 3e8ae17 to 3530318 on March 6, 2026 06:23
@lawofcycles lawofcycles requested a review from dlvenable March 8, 2026 08:37
@lawofcycles
Contributor Author

lawofcycles commented Mar 8, 2026

@dlvenable Thank you for the review. Here is a summary of the updates.

Review feedback
All review comments have been addressed.

Integration tests
Added Docker-based integration tests following the existing patterns. The tests use SeaweedFS (S3-compatible storage) and the Iceberg REST Catalog.

docker-compose -f data-prepper-plugins/iceberg-source/docker/docker-compose.yml up -d
./gradlew :data-prepper-plugins:iceberg-source:integrationTest
docker-compose -f data-prepper-plugins/iceberg-source/docker/docker-compose.yml down

Code improvements
Reviewed the code in detail and fixed several issues found during my own review.

Manual verification
Verified end-to-end with Iceberg tables on S3 (Glue Catalog + EMR Spark) writing CDC events to OpenSearch, covering multiple scenarios including INSERT, UPDATE, DELETE, and carryover removal.

I am changing this PR from Draft to Ready for Review.

@dlvenable
Member

There are a couple of checkstyle errors:

[checkstyle] [ERROR] /home/runner/work/data-prepper/data-prepper/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverterTest.java:22:8: Unused import - org.apache.iceberg.variants.VariantValue. [UnusedImports]
[checkstyle] [ERROR] /home/runner/work/data-prepper/data-prepper/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverterTest.java:27:8: Unused import - java.nio.ByteBuffer. [UnusedImports]

Build results:
https://github.com/opensearch-project/data-prepper/actions/runs/22817460047/job/66184697244?pr=6554

When a CoW UPDATE produces a DELETE + INSERT pair with the same
document_id after carryover removal, emit only the INSERT as INDEX.
Since OpenSearch INDEX is an upsert, the DELETE is unnecessary.

This also eliminates a potential issue where multiple ProcessWorker
threads consuming from the buffer in parallel could reorder DELETE
and INDEX operations for the same document, causing data loss.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles
Contributor Author

Found and fixed a correctness issue with UPDATE event handling.
094e5a2

Previously, a CoW UPDATE produced two separate events after carryover removal: DELETE (old row) followed by INDEX (new row) for the same document_id. This relied on the DELETE arriving at OpenSearch before the INDEX. However, Data Prepper's pipeline uses multiple ProcessWorker threads that consume from the buffer in parallel, so these two events could end up in different bulk requests with no ordering guarantee. If the INDEX arrived first and the DELETE second, the document would be deleted and lost.

The fix merges UPDATE pairs into a single event. After carryover removal, when identifier_columns is configured, if a DELETE and INSERT produce the same document_id, only the INSERT is emitted as INDEX. Since OpenSearch INDEX is an upsert, the DELETE is unnecessary. Pure deletes (DELETE with no matching INSERT) are unaffected. When identifier_columns is not configured, no document_id is generated and the behavior is unchanged.
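The merge described above can be sketched roughly as follows (illustrative types and names, not the plugin's actual classes): within one task's post-carryover output, any DELETE whose document_id also appears as an INSERT is dropped, and the INSERT is emitted as an INDEX upsert.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the UPDATE merge: a DELETE + INSERT pair sharing a
// document id collapses to a single INDEX, since OpenSearch INDEX is an upsert.
public class UpdateMergeSketch {
    public record ChangeEvent(String documentId, String op) {}

    public static List<ChangeEvent> mergeUpdates(final List<ChangeEvent> events) {
        final Set<String> insertedIds = new HashSet<>();
        for (final ChangeEvent event : events) {
            if (event.op().equals("INSERT")) {
                insertedIds.add(event.documentId());
            }
        }
        final List<ChangeEvent> out = new ArrayList<>();
        for (final ChangeEvent event : events) {
            if (event.op().equals("DELETE") && insertedIds.contains(event.documentId())) {
                continue; // UPDATE pair: the upsert makes the DELETE redundant.
            }
            out.add(event.op().equals("INSERT")
                    ? new ChangeEvent(event.documentId(), "INDEX")
                    : event);
        }
        return out;
    }
}
```

Because only one event per document leaves the source, parallel ProcessWorker threads can no longer reorder a DELETE ahead of or behind its matching INDEX.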

@lawofcycles
Contributor Author

There are a couple of checkstyle errors:

Fixed in 3dc4a7d.

@dlvenable dlvenable merged commit 34a63e9 into opensearch-project:main Mar 11, 2026
100 of 108 checks passed
@dlvenable
Member

Discussion: PeerForwarder extension for Source-layer shuffle

@lawofcycles , I agree that this would be a great approach. The processor approach was what we needed at the time.

We actually have some hard-coded, non-PeerForwarder support for source forwarding. It is in our OTel traces pipeline, and only when used with the Kafka buffer. In that case, we put the traces on different partitions.

The scenarios you outlined, as well as tracing, are good examples of how we could make use of source-based peer-forwarding. Note that we do not generally recommend using the Kafka buffer with pull-based pipelines, so peer-forwarding would probably be the best solution here.

taskState.setDataFilePath(task.file().location());
taskState.setTotalRecords(task.file().recordCount());

final String partitionKey = tableName + "|initial|" + UUID.randomUUID();
Member

Why do you use a UUID here? Could this cause duplicates?

Contributor Author

The UUID was used for simplicity, but you are right that it can cause duplicate partitions if the leader crashes after creating partitions but before updating lastProcessedSnapshotId. Since tryCreatePartitionItem uses a conditional put that returns false for existing keys, switching to deterministic keys would prevent this.

For initial load: tableName|initial|snapshotId|filePath (one file per task, so the file path is unique).
For CDC: tableName|snapshotId|sha256(sorted file paths) (the hash of sorted file paths within each task group).

I will submit a follow-up PR for this change.
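A sketch of the deterministic key scheme proposed here (method and class names are illustrative, not the plugin's code):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: a deterministic CDC partition key built from the table
// name, snapshot id, and a SHA-256 over the sorted file paths, so re-planning
// the same task group after a leader crash produces the same key and the
// coordinator's conditional put rejects the duplicate.
public class PartitionKeys {
    public static String cdcPartitionKey(final String tableName,
                                         final long snapshotId,
                                         final List<String> filePaths) {
        final String joined = filePaths.stream().sorted().collect(Collectors.joining("\n"));
        try {
            final byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(joined.getBytes(StandardCharsets.UTF_8));
            final StringBuilder hex = new StringBuilder();
            for (final byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return tableName + "|" + snapshotId + "|" + hex;
        } catch (final NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }
}
```

Sorting the paths before hashing makes the key independent of planning order, which is what makes a retried plan collide with the original.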


for (final ChangelogTaskProgressState taskState : taskGroups) {
final String partitionKey = tableName + "|" + snapshot.snapshotId()
+ "|" + UUID.randomUUID();
Member

Why do you use a UUID here? Could this cause duplicates?

Contributor Author

Same as above. Will be addressed in the same follow-up PR.

data.put(field.name(), convertValue(value, field.type()));
}

final Event event = JacksonEvent.builder()
Member

We should move toward using the EventFactory.

Here is a simple example of usage:

eventFactory.eventBuilder(EventBuilder.class)
        .withEventType(fileSourceConfig.getRecordType())
        .withData(structuredLine)
        .build();

Contributor Author

Got it, raised follow-up PR #6641.

}
}

LOG.info("Planned {} task group(s) for table {} (snapshot {} -> {})",
Member

This could become noisy quickly. We should make this a DEBUG log.

Another option here is to use a DistributionSummary metric for this. This would allow you to analyze maximum task group sizes and average.

Contributor Author

I opened PR #6648 to address this point.

tableName, tableConfig.getIdentifierColumns());
final CarryoverRemover carryoverRemover = new CarryoverRemover();

LOG.info("Processing partition for table {} snapshot {} with {} file(s)",
Member

This should become a debug level log.

changelogRows.add(new CarryoverRemover.ChangelogRow(dataColumns, row.operation, i));
}
survivingIndices = carryoverRemover.removeCarryover(changelogRows);
LOG.info("Carryover removal: {} rows -> {} rows", allRows.size(), survivingIndices.size());
Member

This should be a debug log.

@horovits

Great work @lawofcycles, nice to see it merged

@lawofcycles
Contributor Author

@dlvenable Thank you for the feedback and for sharing the OTel traces precedent. That is very helpful context.

I have been thinking about the implementation approach and would like to propose a two-track plan.

Track 1: Framework (RFC)
I will open a separate RFC issue proposing a generalized Source-layer peer-forwarding extension to the existing PeerForwarder infrastructure. The design would extract the reusable building blocks (HashRing, PeerForwarderClient, PeerListProvider, Armeria transport) into a shared layer that both Processor and Source plugins can use, while keeping the orchestration logic (send/receive flow, buffering) pluggable per use case. This would also provide a path to replace the hard-coded OTel traces source forwarding with the generalized mechanism.

Track 2: Iceberg Source (plugin level implementation)
In parallel, I will implement the shuffle logic directly within the iceberg source plugin using the same lower-level components (HashRing for consistent hashing, Armeria HTTP for transport). This unblocks the Iceberg CDC improvements (fallback partition distribution, multi-snapshot batch processing) without waiting for the framework RFC to be reviewed and merged.
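The routing step in Track 2 could look roughly like this (a minimal modulo placement for illustration only; the real implementation would use HashRing-style consistent hashing so that adding or removing a node moves fewer keys):

```java
import java.util.List;

// Hypothetical sketch: route each changelog row to a worker by a hash of its
// data columns, so the DELETE and INSERT halves of the same logical row always
// land on the same node and can be paired there for carryover removal.
public class ShuffleRouting {
    public static int workerFor(final List<Object> dataColumns, final int workerCount) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(dataColumns.hashCode(), workerCount);
    }
}
```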

Convergence
Once the framework lands, the Iceberg source plugin's shuffle implementation would be migrated to use the framework API. The plugin-level code is designed with this migration in mind: the hashing and transport logic will be isolated behind an internal interface so that swapping to the framework API is straightforward.

This approach keeps the Iceberg CDC work moving forward while ensuring the framework design benefits from a concrete, working implementation as a reference.

Does this plan sound reasonable?

@lawofcycles
Contributor Author

@dlvenable I need to correct my earlier proposal. After further analysis, I found that the PeerForwarder extension (Track 1) is not necessary.

I am now thinking the shuffle should use a pull-based approach (similar to Spark's shuffle architecture), where writers partition data to local disk by hash key and readers pull their partitions from each node via HTTP. This would not require extending the PeerForwarder framework, which is push-based.

I also identified a correctness bug in the current implementation: when a partition column is updated, the DELETE (old partition) and INSERT (new partition) for the same document end up in separate tasks. Since the UPDATE merge only operates within a single task, it cannot detect this cross partition update. The DELETE and INSERT are sent to the sink independently with no ordering guarantee, which can cause data loss if the DELETE arrives after the INSERT.

I will open a detailed issue with the revised design shortly.

@lawofcycles
Contributor Author

lawofcycles commented Mar 23, 2026

I opened two follow-up issues.

#6666
#6667

I believe #6666 is more critical.


Development

Successfully merging this pull request may close these issues.

[RFC] Iceberg CDC Source Plugin

4 participants