Add Iceberg CDC source plugin #6554
Conversation
**Discussion: PeerForwarder extension for Source-layer shuffle**

The current implementation groups DELETED and ADDED files together at the Source layer so that carryover removal can happen locally on each node. This works well when bounds-based pairing succeeds, but falls back to single-node processing when it fails (e.g., when an UPDATE changes a column's min/max bounds).

A more scalable approach would be to shuffle rows between ChangelogWorkers by a hash of all data columns (similar to Spark's repartition). Data Prepper's PeerForwarder infrastructure provides useful building blocks for this (consistent hashing via HashRing, node discovery, Armeria HTTP transport). However, the PeerForwarder's orchestration layer is tightly coupled to the Processor execution model (synchronous send/receive within a batch processing cycle, ReceiveBuffer integration with the Pipeline, pluginId-based routing), so Source-layer shuffling would likely need to build its own send/receive flow on top of these lower-level components, or the PeerForwarder architecture would need to be generalized to support non-Processor use cases.

RFC #700 (Core Peer Forwarding) notes that the initial implementation targets Processor plugins only, and suggests opening a new issue to expand the functionality to Source or Sink plugins. I plan to open a separate issue proposing this extension, with Iceberg CDC as the first use case. I'd like to hear the maintainers' thoughts on the best approach for inter-node data exchange at the Source layer.

**2026-03-11 UPDATE: Additional benefit of Source-layer shuffle**

Source-layer shuffle would also enable processing multiple snapshots in a single batch. Currently, the LeaderScheduler processes each snapshot sequentially: plan one snapshot, then wait for all partitions to complete (including sink acknowledgement, when acknowledgements are enabled) before planning the next. Batch processing multiple snapshots would improve performance. This is particularly relevant for tables with frequent commits (e.g., Spark Streaming committing every few minutes), where dozens of snapshots can accumulate between polling intervals. Note that batch processing also requires ordering writes by …
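The hash-based routing described above can be sketched as follows. This is a minimal illustration under assumed names (`RowShuffleRouter` and `targetNode` are not part of the plugin or Data Prepper); a production version would use the PeerForwarder's consistent HashRing rather than a plain modulo so that nodes joining or leaving move fewer keys.

```java
import java.util.List;

// Hypothetical sketch: route each changelog row to a worker node by hashing
// all of its data columns. Carryover DELETE/INSERT pairs have identical data
// column values, so they always land on the same worker, which lets carryover
// removal run locally, mirroring the effect of Spark's repartition.
public class RowShuffleRouter {
    private final int nodeCount;

    public RowShuffleRouter(final int nodeCount) {
        this.nodeCount = nodeCount;
    }

    // List.hashCode() is content-based, so rows with identical data column
    // values always map to the same node index.
    public int targetNode(final List<Object> dataColumnValues) {
        return Math.floorMod(dataColumnValues.hashCode(), nodeCount);
    }
}
```

The key property is determinism: the two halves of a carryover pair may come from different files on different nodes, but they hash to the same destination.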
Force-pushed from bf1d396 to 345ca7f
dlvenable left a comment:
Thank you @lawofcycles for this great contribution!
I left a few initial comments. I want to look deeper at the logic as well.
One thing we will need before making this official is a set of integration tests that run against an actual Iceberg Docker image. We do this for our Kafka buffer and our OpenSearch sink.
```java
private String tableName;

@JsonProperty("catalog")
private Map<String, String> catalog = Collections.emptyMap();
```
Is there a model that we can use here instead of a map? The models provide the best experience for the community because they promote configuration consistency and help with documentation.
I looked into this. Iceberg does not provide a typed model for catalog configuration. All official implementations pass catalog properties as a string map:

- Java (reference): `CatalogUtil.buildIcebergCatalog(String name, Map<String, String> options, Object conf)`
- Python (PyIceberg): `Properties = dict[str, Any]`
- Rust: `HashMap<String, String>`
- Go: `Properties = map[string]string`
Kafka Connect's Iceberg integration also uses Map<String, String> in the same way.
I think this is an intentional design choice, because each catalog type (Glue, REST, Hive, Nessie, JDBC, etc.) accepts a completely different set of properties. A typed model would either couple the plugin to specific catalog types or still need a map fallback for catalog-specific properties.
That said, the catalog properties will need thorough documentation with examples for each supported catalog type so that users have clear guidance on what to configure.
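For illustration, here is what the catalog map might look like in pipeline YAML for a REST catalog. The `type`, `uri`, and `warehouse` keys are standard Iceberg catalog properties; the surrounding `source`/`iceberg` structure and the values are assumptions for this sketch, not the plugin's final schema.

```yaml
source:
  iceberg:
    # Passed through as Map<String, String> to CatalogUtil.buildIcebergCatalog().
    # Property names are standard Iceberg catalog properties; values are examples.
    catalog:
      type: "rest"
      uri: "http://localhost:8181"
      warehouse: "s3://example-bucket/warehouse"
```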
```java
@JsonProperty("initial_load")
private boolean initialLoad = true;
```

Suggested change:

```java
@JsonProperty("disable_export")
private boolean disableExport = false;
```
Our other CDC sources have both export and stream configurations. And we allow the following combinations:
- Export and stream
- Export only
- Stream only
Unless "initial load" is a specialized term in Iceberg, we should keep our existing conventions.
Also, we prefer to default boolean values to false.
We may eventually want to add disable_stream but that is not necessary now.
Changed from `initialLoad` to `disableExport`.
```java
@@ -0,0 +1,49 @@
/*
```
Please use the updated license headers.
https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers
```java
public class ChangelogTaskProgressState {

    @JsonProperty("snapshot_id")
```
These are saved to DynamoDB for source coordination. We use camelCase for these keys. Please update accordingly.
Made them camelCase.
```java
@@ -0,0 +1,28 @@
plugins {
```
You don't need these three lines. You can remove them.
```java
import java.util.Objects;
import java.util.function.Function;

@DataPrepperPlugin(name = "iceberg", pluginType = Source.class, pluginConfigurationType = IcebergSourceConfig.class)
```
Let's add the @Experimental annotation here. I'd like to get this PR merged, but we do have a few other items to address before making this fully supported.
```java
    this.state = state;
}

public ChangelogTaskPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
```
This code should have unit tests. Verify the getter results.
Added a unit test.
```java
}

public GlobalState(final SourcePartitionStoreItem sourcePartitionStoreItem) {
    setSourcePartitionStoreItem(sourcePartitionStoreItem);
```
This code should have unit tests. Verify the getter results.
Added a unit test.
```java
}

public InitialLoadTaskPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
    setSourcePartitionStoreItem(sourcePartitionStoreItem);
```
This code should have unit tests. Verify the getter results.
```java
}

public LeaderPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
    setSourcePartitionStoreItem(sourcePartitionStoreItem);
```
This code should have unit tests. Verify the getter results.
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Force-pushed from ba2cd6f to 8ee1b65
Force-pushed from 3e8ae17 to 3530318
…ecordConverter Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@dlvenable Thank you for the review. Here is a summary of the updates.

**Review feedback**

**Integration tests**

```shell
docker-compose -f data-prepper-plugins/iceberg-source/docker/docker-compose.yml up -d
./gradlew :data-prepper-plugins:iceberg-source:integrationTest
docker-compose -f data-prepper-plugins/iceberg-source/docker/docker-compose.yml down
```

**Code improvements**

**Manual verification**

I am changing this PR from Draft to Ready for Review.
There are a couple of checkstyle errors. Build results:
When a CoW UPDATE produces a DELETE + INSERT pair with the same document_id after carryover removal, emit only the INSERT as INDEX. Since OpenSearch INDEX is an upsert, the DELETE is unnecessary. This also eliminates a potential issue where multiple ProcessWorker threads consuming from the buffer in parallel could reorder DELETE and INDEX operations for the same document, causing data loss. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Found and fixed a correctness issue with UPDATE event handling. Previously, a CoW UPDATE produced two separate events after carryover removal: a DELETE (old row) followed by an INDEX (new row) for the same document_id. The fix merges UPDATE pairs into a single event: after carryover removal, when a DELETE and an INSERT share the same document_id, only the INSERT is emitted, as an INDEX action.
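The pair-merge logic described in this fix can be sketched roughly as follows. The `Row`/`Action` types, the string operation names, and the grouping approach are illustrative assumptions, not the plugin's actual classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: after carryover removal, collapse a DELETE + INSERT
// pair that shares a document_id into a single INDEX action. Because
// OpenSearch INDEX is an upsert, the preceding DELETE is unnecessary, and
// dropping it avoids DELETE/INDEX reordering across ProcessWorker threads.
public class UpdateMerger {
    public record Row(String documentId, String operation) {}
    public record Action(String documentId, String bulkAction) {}

    public static List<Action> merge(final List<Row> rows) {
        final Map<String, List<Row>> byDoc = new LinkedHashMap<>();
        for (final Row row : rows) {
            byDoc.computeIfAbsent(row.documentId(), k -> new ArrayList<>()).add(row);
        }
        final List<Action> actions = new ArrayList<>();
        for (final List<Row> group : byDoc.values()) {
            final boolean hasInsert = group.stream()
                    .anyMatch(r -> r.operation().equals("INSERT"));
            if (hasInsert) {
                // A lone INSERT, or a DELETE + INSERT update pair, becomes one
                // INDEX upsert carrying the new row.
                actions.add(new Action(group.get(0).documentId(), "INDEX"));
            } else {
                // Only DELETEs remain: the document was genuinely removed.
                actions.add(new Action(group.get(0).documentId(), "DELETE"));
            }
        }
        return actions;
    }
}
```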
Fixed in 3dc4a7d
@lawofcycles, I agree that this would be a great approach. The processor approach was what we needed at the time. We actually have some hard-coded, non-peer-forwarder support for source forwarding: it is in our OTel traces pipeline, only when used with the Kafka buffer. In that case, we put the traces on different partitions. The scenarios you outlined, as well as tracing, are both good examples of how we could make use of source-based peer-forwarding. Note that we do not generally recommend using …
```java
taskState.setDataFilePath(task.file().location());
taskState.setTotalRecords(task.file().recordCount());

final String partitionKey = tableName + "|initial|" + UUID.randomUUID();
```
Why do you use a UUID here? Could this cause duplicates?
The UUID was used for simplicity, but you are right that it can cause duplicate partitions if the leader crashes after creating partitions but before updating lastProcessedSnapshotId. Since tryCreatePartitionItem uses a conditional put that returns false for existing keys, switching to deterministic keys would prevent this.
For initial load: tableName|initial|snapshotId|filePath (one file per task, so the file path is unique).
For CDC: tableName|snapshotId|sha256(sorted file paths) (the hash of sorted file paths within each task group).
I will submit a follow-up PR for this change.
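The proposed deterministic keys could look roughly like this. This is a hedged sketch: the class and method names are hypothetical, and the exact separator/encoding choices are illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.List;

// Hypothetical sketch of deterministic partition keys. Because the keys are
// derived from stable inputs, a leader that crashes and replans the same
// snapshot regenerates identical keys, and the coordinator's conditional put
// (tryCreatePartitionItem) rejects the duplicates.
public class PartitionKeys {
    // Initial load: one file per task, so the file path is unique.
    public static String initialLoadKey(final String tableName, final long snapshotId,
                                        final String filePath) {
        return tableName + "|initial|" + snapshotId + "|" + filePath;
    }

    // CDC: hash the sorted file paths of the task group so the key is
    // independent of planning order.
    public static String cdcKey(final String tableName, final long snapshotId,
                                final List<String> filePaths) {
        final String joined = filePaths.stream().sorted()
                .reduce("", (a, b) -> a + "\n" + b);
        try {
            final byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(joined.getBytes(StandardCharsets.UTF_8));
            return tableName + "|" + snapshotId + "|" + HexFormat.of().formatHex(digest);
        } catch (final NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```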
```java
for (final ChangelogTaskProgressState taskState : taskGroups) {
    final String partitionKey = tableName + "|" + snapshot.snapshotId()
            + "|" + UUID.randomUUID();
```
Why do you use a UUID here? Could this cause duplicates?
Same as above; this will be addressed in the same follow-up PR.
```java
    data.put(field.name(), convertValue(value, field.type()));
}

final Event event = JacksonEvent.builder()
```
We should move toward using the EventFactory.
Here is a simple example of usage:
```java
    }
}

LOG.info("Planned {} task group(s) for table {} (snapshot {} -> {})",
```
This could become noisy quickly. We should make this a DEBUG log.
Another option here is to use a DistributionSummary metric for this. That would allow you to analyze the maximum and average task group sizes.
```java
        tableName, tableConfig.getIdentifierColumns());
final CarryoverRemover carryoverRemover = new CarryoverRemover();

LOG.info("Processing partition for table {} snapshot {} with {} file(s)",
```
This should become a debug-level log.
```java
    changelogRows.add(new CarryoverRemover.ChangelogRow(dataColumns, row.operation, i));
}
survivingIndices = carryoverRemover.removeCarryover(changelogRows);
LOG.info("Carryover removal: {} rows -> {} rows", allRows.size(), survivingIndices.size());
```
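For context on what carryover removal does: with copy-on-write, rewriting a data file re-emits every unchanged row as a DELETE from the old file plus an INSERT into the new file with identical data columns. Cancelling one DELETE against one INSERT per identical row leaves only genuine changes. A hedged sketch of that cancellation, using an illustrative `ChangelogRow` shape rather than the plugin's actual type:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of carryover removal: pair up DELETEs and INSERTs with
// identical data column values and return the indices of rows that survive.
public class CarryoverRemovalSketch {
    public record ChangelogRow(List<Object> dataColumns, String operation, int index) {}

    public static List<Integer> survivingIndices(final List<ChangelogRow> rows) {
        // Count DELETEs and INSERTs per distinct row content.
        final Map<List<Object>, Integer> deletes = new HashMap<>();
        final Map<List<Object>, Integer> inserts = new HashMap<>();
        for (final ChangelogRow row : rows) {
            final Map<List<Object>, Integer> counts =
                    row.operation().equals("DELETE") ? deletes : inserts;
            counts.merge(row.dataColumns(), 1, Integer::sum);
        }
        // A DELETE/INSERT pair with identical content is carryover: skip
        // min(deletes, inserts) occurrences of each operation per content key.
        final Map<List<Object>, Integer> toSkipDeletes = new HashMap<>();
        final Map<List<Object>, Integer> toSkipInserts = new HashMap<>();
        for (final List<Object> key : deletes.keySet()) {
            final int pairs = Math.min(deletes.get(key), inserts.getOrDefault(key, 0));
            toSkipDeletes.put(key, pairs);
            toSkipInserts.put(key, pairs);
        }
        final List<Integer> surviving = new ArrayList<>();
        for (final ChangelogRow row : rows) {
            final Map<List<Object>, Integer> skip =
                    row.operation().equals("DELETE") ? toSkipDeletes : toSkipInserts;
            final int remaining = skip.getOrDefault(row.dataColumns(), 0);
            if (remaining > 0) {
                skip.put(row.dataColumns(), remaining - 1);  // cancelled carryover
            } else {
                surviving.add(row.index());                  // genuine change
            }
        }
        return surviving;
    }
}
```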
Great work @lawofcycles, nice to see it merged!
@dlvenable Thank you for the feedback and for sharing the OTel traces precedent. That is very helpful context. I have been thinking about the implementation approach and would like to propose a two-track plan.

**Track 1: Framework (RFC)**

**Track 2: Iceberg Source (plugin-level implementation)**

**Convergence**

This approach keeps the Iceberg CDC work moving forward while ensuring the framework design benefits from a concrete, working implementation as a reference. Does this plan sound reasonable?
@dlvenable I need to correct my earlier proposal. After further analysis, I found that the PeerForwarder extension (Track 1) is not necessary. I am now thinking the shuffle should use a pull-based approach (similar to Spark's shuffle architecture), where writers partition data to local disk by hash key and readers pull their partitions from each node via HTTP. This would not require extending the PeerForwarder framework, which is push-based.

I also identified a correctness bug in the current implementation: when a partition column is updated, the DELETE (old partition) and INSERT (new partition) for the same document end up in separate tasks. Since the UPDATE merge only operates within a single task, it cannot detect this cross-partition update. The DELETE and INSERT are sent to the sink independently with no ordering guarantee, which can cause data loss if the DELETE arrives after the INSERT. I will open a detailed issue with the revised design shortly.
Description
Adds a new `iceberg` source plugin that captures row-level changes (CDC) from Apache Iceberg tables and ingests them into the Data Prepper pipeline. The plugin uses Iceberg's `IncrementalChangelogScan` API directly, without requiring Spark or Flink.

Key features

- `EnhancedSourceCoordinator`

Current limitations

- `identifier_columns` must be configured by the user for CDC correctness

Catalog support: any catalog supported by Iceberg's `CatalogUtil.buildIcebergCatalog()` (Glue, REST, Hive, Hadoop, Nessie, JDBC, etc.)
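To make the configuration surface concrete, here is a hedged sketch of a full pipeline configuration. The key names are inferred from this conversation (`catalog` map, `identifier_columns`, `disable_export`); `table_name`, the sink block, and all values are illustrative assumptions, and the final plugin schema may differ.

```yaml
iceberg-pipeline:
  source:
    iceberg:
      catalog:
        # Standard Iceberg catalog properties, passed through as a string map.
        type: "glue"
        warehouse: "s3://example-bucket/warehouse"
      table_name: "analytics.events"      # assumed key name for the table
      identifier_columns: ["event_id"]    # required for CDC correctness
      disable_export: false               # export (initial load) runs by default
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        index: "events"
```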
Resolves #6552
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.