[FEATURE] Add source-layer shuffle to iceberg-source for correct and scalable CDC processing #6666
Description
Is your feature request related to a problem? Please describe.
The current iceberg-source plugin has two problems when processing snapshots that contain DELETE operations (UPDATE/DELETE in Copy-on-Write tables).
- Scalability: The current `TaskGrouper` uses bounds-based pairing to match DELETED and ADDED files. When pairing fails (e.g., an UPDATE changes a column's min/max bounds), all files fall back to a single task processed by one node. This is especially problematic for unpartitioned tables, where all files belong to the same group, but it also affects large partitions in partitioned tables.
- Correctness: When a partition column is updated (e.g., `region` changes from `US` to `EU` for a row with `id=1`), Iceberg produces a `DeletedDataFileScanTask` in the old partition and an `AddedRowsScanTask` in the new partition. The current `TaskGrouper` groups tasks by Iceberg partition value, so the DELETE and INSERT for the same document end up in separate tasks. The UPDATE merge logic in `ChangelogWorker` (which detects matching DELETE+INSERT pairs by `identifier_columns` and drops the DELETE) only operates within a single task, so it cannot detect this cross-partition update. Both the DELETE and the INSERT are sent to the sink independently with no ordering guarantee; if the DELETE arrives after the INSERT, the document is deleted, causing data loss.
Describe the solution you'd like
Introduce a source-layer shuffle that repartitions records by identifier_columns hash, similar to Spark's shuffle architecture.
The core idea is to redistribute all records in a snapshot so that records sharing the same identifier_columns value always land on the same node. Once co-located, each node can independently perform carryover removal and UPDATE merge on its subset of records, because it is guaranteed to have both the DELETE and INSERT for any given document. This is the same approach Spark uses in its CreateChangelogViewProcedure, where it repartitions changelog rows by identifier columns before computing updates.
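The routing invariant can be sketched in a few lines. This is illustrative only: the partition count, class name, and use of Java's default hashing are assumptions, not the plugin's actual API.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical sketch of the shuffle key: hash only the identifier_columns
// values, never the Iceberg partition values, so a document's DELETE and
// INSERT always map to the same shuffle partition.
public class ShuffleKeySketch {
    static final int NUM_PARTITIONS = 64; // assumed fixed hash-partition count

    static int partitionFor(List<Object> identifierValues) {
        int hash = Objects.hash(identifierValues.toArray());
        return Math.floorMod(hash, NUM_PARTITIONS); // non-negative partition id
    }

    public static void main(String[] args) {
        // An UPDATE that moves id=1 from region US to EU emits a DELETE in the
        // old Iceberg partition and an INSERT in the new one; both land in the
        // same shuffle partition because only the identifier is hashed.
        int deleteSide = partitionFor(List.of(1));
        int insertSide = partitionFor(List.of(1));
        System.out.println(deleteSide == insertSide); // true
    }
}
```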
This redistribution requires two phases with a barrier in between.
In Phase 1, every worker reads its assigned files and writes records to local disk, partitioned by identifier_columns hash. All workers must complete Phase 1 before Phase 2 can begin, because a reader needs the complete set of records for its hash partitions across all nodes.
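A Phase 1 writer might look like the following sketch. The per-partition file layout and the shape of the index (partition id to bytes written) are assumptions for illustration.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

// Hypothetical Phase 1 writer: append each record to a local spill file
// chosen by identifier-column hash, and track per-partition byte counts in
// an index that is later shipped to the leader at the barrier.
public class PhaseOneWriterSketch {
    private final Path shuffleDir;
    private final int numPartitions;
    private final Map<Integer, Long> index = new TreeMap<>(); // partition -> bytes

    public PhaseOneWriterSketch(Path shuffleDir, int numPartitions) {
        this.shuffleDir = shuffleDir;
        this.numPartitions = numPartitions;
    }

    public void write(List<Object> identifierValues, byte[] serializedRecord) throws IOException {
        int partition = Math.floorMod(Objects.hash(identifierValues.toArray()), numPartitions);
        Path file = shuffleDir.resolve("partition-" + partition + ".bin");
        // Purely local I/O: no network traffic during the write phase.
        Files.write(file, serializedRecord,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        index.merge(partition, (long) serializedRecord.length, Long::sum);
    }

    public Map<Integer, Long> index() {
        return index;
    }
}
```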
At the barrier point between phases, the leader collects the index metadata from all shuffle files to determine how much data each hash partition contains, then coalesces adjacent small partitions into larger groups to form the Phase 2 tasks. This is the same approach as Spark's Adaptive Query Execution (AQE), where the number of post-shuffle tasks is determined dynamically based on actual data sizes rather than fixed upfront.
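The coalescing step itself can be a simple greedy pass over the summed partition sizes. This is a sketch under the assumption that adjacent hash partitions are grouped until a target task size is reached; the target is a hypothetical tuning parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the barrier step: given total bytes per hash partition (summed
// across all workers' indexes), greedily pack adjacent partitions into
// Phase 2 tasks, in the spirit of Spark AQE's post-shuffle coalescing.
public class CoalesceSketch {
    static List<List<Integer>> coalesce(long[] partitionSizes, long targetBytes) {
        List<List<Integer>> tasks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0;
        for (int p = 0; p < partitionSizes.length; p++) {
            // Start a new task when adding this partition would exceed the target.
            if (!current.isEmpty() && currentBytes + partitionSizes[p] > targetBytes) {
                tasks.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(p);
            currentBytes += partitionSizes[p];
        }
        if (!current.isEmpty()) tasks.add(current);
        return tasks;
    }

    public static void main(String[] args) {
        // Two tiny partitions merge into one task; the oversized one stays alone.
        System.out.println(coalesce(new long[]{10, 10, 100, 10}, 30));
        // prints [[0, 1], [2], [3]]
    }
}
```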
In Phase 2, each worker pulls its assigned hash partition data from all nodes via HTTP, then performs carryover removal and UPDATE merge.
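Once a worker holds every record for its hash partitions, the carryover removal and UPDATE merge become a purely local pass. The `ChangeRecord` type and op names below are illustrative stand-ins, not the plugin's actual types.

```java
import java.util.*;

// Sketch of the Phase 2 merge over one co-located hash partition:
// - DELETE+INSERT with identical rows  -> carryover, drop both
// - DELETE+INSERT with differing rows  -> merge into a single UPDATE
// - unmatched INSERTs and DELETEs      -> pass through unchanged
public class MergeSketch {
    record ChangeRecord(String op, Object id, Map<String, Object> row) {}

    static List<ChangeRecord> merge(List<ChangeRecord> records) {
        Map<Object, ChangeRecord> deletes = new LinkedHashMap<>();
        List<ChangeRecord> out = new ArrayList<>();
        for (ChangeRecord r : records) {
            if (r.op().equals("DELETE")) deletes.put(r.id(), r);
        }
        for (ChangeRecord r : records) {
            if (r.op().equals("INSERT")) {
                ChangeRecord d = deletes.remove(r.id());
                if (d == null) {
                    out.add(r);                                            // plain insert
                } else if (!d.row().equals(r.row())) {
                    out.add(new ChangeRecord("UPDATE", r.id(), r.row()));  // merged update
                }                                                          // equal rows: carryover
            }
        }
        out.addAll(deletes.values());                                      // true deletes
        return out;
    }
}
```

Because the shuffle guarantees co-location, this pass never misses a cross-partition pair, and the sink receives a single ordered operation per document.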
For INSERT-only snapshots (no DeletedDataFileScanTask), the shuffle is skipped entirely and each file is processed as an independent task, same as today.
Describe alternatives you've considered (Optional)
The shuffle implementation involves two key design decisions.
Data transfer model: Push vs Pull
| | Push | Pull |
|---|---|---|
| Mechanism | Writer sends each record to the destination node in real time | Writer partitions data to local storage, reader fetches from each node |
| Network during write | Required. Slow destination nodes cause backpressure | Not required. Write is purely local I/O |
| Failure resilience | If destination node fails mid-transfer, sender must track and resend | Data stays on writer's disk, reader can retry independently |
| Implementation complexity | Connection management, backpressure handling, receive buffer management | HTTP endpoint for serving partitioned files |
I initially considered extending Data Prepper's PeerForwarder framework (#6554 discussion), which is push-based. However, the pull-based model is simpler and more resilient in the Iceberg source use case. This is the same architecture used by Spark's SortShuffleManager, where map tasks write partitioned data to local disk and reduce tasks pull their partitions from each node via BlockStoreShuffleReader.
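The serving side of the pull model needs little more than a static-file HTTP endpoint. The sketch below uses the JDK's built-in `com.sun.net.httpserver.HttpServer`; the `/shuffle/` URL scheme and file layout are assumptions for illustration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.*;

// Hypothetical pull endpoint: each worker exposes its local Phase 1 spill
// files, and Phase 2 readers GET only the partitions assigned to them.
public class ShuffleServerSketch {
    public static HttpServer serve(Path shuffleDir) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0); // ephemeral port
        server.createContext("/shuffle/", exchange -> {
            String name = exchange.getRequestURI().getPath().substring("/shuffle/".length());
            Path file = shuffleDir.resolve(name).normalize();
            if (!file.startsWith(shuffleDir) || !Files.isRegularFile(file)) {
                exchange.sendResponseHeaders(404, -1); // reject traversal / missing files
            } else {
                byte[] body = Files.readAllBytes(file);
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(body);
                }
            }
            exchange.close();
        });
        server.start();
        return server;
    }
}
```

A reader fetches `http://<node>:<port>/shuffle/partition-<n>.bin` for each partition it owns; because the data stays on the writer's disk, a failed fetch can simply be retried with no sender-side tracking.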
I am leaning toward the pull-based model for its simplicity and failure resilience.
Intermediate storage: Local disk vs S3
| | Local disk | S3 |
|---|---|---|
| Latency | Low (local I/O + node-to-node HTTP) | Higher (S3 PUT + S3 GET) |
| Infrastructure dependency | None | Requires S3 or S3-compatible storage |
| Node failure between phases | Data lost if writer node goes down before reader pulls | Data persisted, reader can retry from any node |
| Disk capacity | Constrained by local disk | Virtually unlimited |
| Node-to-node communication | Required (HTTP pull) | Not required (S3 acts as intermediary) |
I am leaning toward local disk for the initial implementation. The data volume for shuffle is limited to the changed records within a single snapshot, so disk capacity and node failure risk are manageable. S3 could be added as a future option for environments with ephemeral storage or very large change sets.
Additional context
The identifier_columns hash is sufficient as the shuffle key for both carryover removal and UPDATE merge. Carryover pairs (DELETE-INSERT with all data columns identical) necessarily have the same identifier_columns values, so they are routed to the same node. This is why a single shuffle pass is sufficient, unlike Spark's CreateChangelogViewProcedure which uses two separate repartition steps (one by all data columns for carryover removal, one by identifier_columns for update computation). Spark uses two steps because identifier_columns is optional in its API.
When identifier_columns is not configured and a snapshot contains DeletedDataFileScanTasks (UPDATE/DELETE operations), processing fails with an error. identifier_columns is required for correct CDC processing of UPDATE/DELETE because it is needed to compute the shuffle key, perform UPDATE merge, and generate OpenSearch document IDs. Without identifier_columns, there is no way to identify which records belong to the same document on the OpenSearch side. For INSERT-only tables, identifier_columns is not required and the shuffle is not activated.
Related PR: #6554 (original iceberg-source implementation)
Related issue: #6552