
[FEATURE] Add source-layer shuffle to iceberg-source for correct and scalable CDC processing #6666

@lawofcycles

Description

Is your feature request related to a problem? Please describe.

The current iceberg-source plugin has two problems when processing snapshots that contain DELETE operations (UPDATE/DELETE in Copy-on-Write tables).

  1. Scalability: The current TaskGrouper uses bounds-based pairing to match DELETED and ADDED files. When pairing fails (e.g., an UPDATE changes a column's min/max bounds), all files fall back to a single task processed by one node. This is especially problematic for unpartitioned tables where all files belong to the same group, but also affects large partitions in partitioned tables.

  2. Correctness: When a partition column is updated (e.g., region changes from US to EU for a row with id=1), Iceberg produces a DeletedDataFileScanTask in the old partition and an AddedRowsScanTask in the new partition. The current TaskGrouper groups tasks by Iceberg partition value, so the DELETE and INSERT for the same document end up in separate tasks. The UPDATE merge logic in ChangelogWorker (which detects matching DELETE+INSERT pairs by identifier_columns and drops the DELETE) only operates within a single task, so it cannot detect this cross-partition update. Both the DELETE and INSERT are sent to the sink independently with no ordering guarantee. If the DELETE arrives after the INSERT, the document is deleted, causing data loss.

Describe the solution you'd like

Introduce a source-layer shuffle that repartitions records by identifier_columns hash, similar to Spark's shuffle architecture.

The core idea is to redistribute all records in a snapshot so that records sharing the same identifier_columns value always land on the same node. Once co-located, each node can independently perform carryover removal and UPDATE merge on its subset of records, because it is guaranteed to have both the DELETE and INSERT for any given document. This is the same approach Spark uses in its CreateChangelogViewProcedure, where it repartitions changelog rows by identifier columns before computing updates.
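For illustration, the routing step could be sketched as follows. The class and method names here are hypothetical, not the plugin's actual API:

```java
import java.util.List;

// Hypothetical sketch (not the actual plugin code): route a record to a
// shuffle partition by hashing its identifier_columns values. Records with
// equal identifier values always map to the same partition, so the DELETE
// and INSERT for one document co-locate even when an UPDATE moved the row
// to a different Iceberg partition.
public class ShuffleKeyRouter {
    private final int numPartitions;

    public ShuffleKeyRouter(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    public int partitionFor(List<Object> identifierValues) {
        // List.hashCode is defined by the List contract; a production
        // implementation would want a hash that is stable for all value types.
        // floorMod keeps the result non-negative for negative hash codes.
        return Math.floorMod(identifierValues.hashCode(), numPartitions);
    }
}
```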

This redistribution requires two phases with a barrier in between.

In Phase 1, every worker reads its assigned files and writes records to local disk, partitioned by identifier_columns hash. All workers must complete Phase 1 before Phase 2 can begin, because a reader needs the complete set of records for its hash partitions across all nodes.

At the barrier point between phases, the leader collects the index metadata from all shuffle files to determine how much data each hash partition contains, then coalesces adjacent small partitions into larger groups to form the Phase 2 tasks. This is the same approach as Spark's Adaptive Query Execution (AQE), where the number of post-shuffle tasks is determined dynamically based on actual data sizes rather than fixed upfront.
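A minimal sketch of the coalescing step, assuming the leader has already summed each hash partition's size in bytes across all workers' shuffle-file indexes (names and the greedy grouping strategy are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the barrier-point planning step: coalesce adjacent
// small hash partitions into Phase 2 task groups whose combined size stays
// near a target, similar in spirit to Spark AQE's post-shuffle coalescing.
public class PartitionCoalescer {
    /** Returns groups of contiguous partition indices; a new group starts
     *  whenever adding the next partition would exceed targetBytes. */
    public static List<List<Integer>> coalesce(long[] partitionBytes, long targetBytes) {
        List<List<Integer>> groups = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentSize = 0;
        for (int p = 0; p < partitionBytes.length; p++) {
            if (!current.isEmpty() && currentSize + partitionBytes[p] > targetBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(p);
            currentSize += partitionBytes[p];
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}
```

Each resulting group becomes one Phase 2 task, so the task count adapts to the actual shuffle data volume rather than being fixed upfront.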

In Phase 2, each worker pulls its assigned hash partition data from all nodes via HTTP, then performs carryover removal and UPDATE merge.
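The per-partition merge logic could look roughly like this. The record shape and operation names are illustrative, not the ChangelogWorker's actual types; the point is that co-location makes both checks purely local:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of Phase 2 processing on one hash partition. Because
// all records for a given identifier_columns value are co-located, carryover
// removal (a DELETE+INSERT pair whose full rows are identical) and UPDATE
// merge (drop the DELETE of a matching pair) need no cross-node coordination.
record ChangeRecord(String op, List<Object> identifierValues, Map<String, Object> row) {}

class ChangelogMerger {
    static List<ChangeRecord> merge(List<ChangeRecord> records) {
        Map<List<Object>, List<ChangeRecord>> byId = new LinkedHashMap<>();
        for (ChangeRecord r : records) {
            byId.computeIfAbsent(r.identifierValues(), k -> new ArrayList<>()).add(r);
        }
        List<ChangeRecord> out = new ArrayList<>();
        for (List<ChangeRecord> group : byId.values()) {
            ChangeRecord delete = null, insert = null;
            for (ChangeRecord r : group) {
                if (r.op().equals("DELETE")) delete = r; else insert = r;
            }
            if (delete != null && insert != null) {
                if (delete.row().equals(insert.row())) {
                    continue; // carryover: row unchanged, emit nothing
                }
                out.add(insert); // UPDATE merge: keep the INSERT, drop the DELETE
            } else {
                out.addAll(group); // plain INSERT or plain DELETE passes through
            }
        }
        return out;
    }
}
```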

For INSERT-only snapshots (no DeletedDataFileScanTask), the shuffle is skipped entirely and each file is processed as an independent task, same as today.

Describe alternatives you've considered (Optional)

The shuffle implementation involves two key design decisions.

Data transfer model: Push vs Pull

|  | Push | Pull |
| --- | --- | --- |
| Mechanism | Writer sends each record to the destination node in real time | Writer partitions data to local storage; reader fetches from each node |
| Network during write | Required; slow destination nodes cause backpressure | Not required; write is purely local I/O |
| Failure resilience | If the destination node fails mid-transfer, the sender must track and resend | Data stays on the writer's disk; the reader can retry independently |
| Implementation complexity | Connection management, backpressure handling, receive buffer management | HTTP endpoint for serving partitioned files |

I initially considered extending Data Prepper's PeerForwarder framework (#6554 discussion), which is push-based. However, the pull-based model is simpler and more resilient for the Iceberg source use case. This is the same architecture used by Spark's SortShuffleManager, where map tasks write partitioned data to local disk and reduce tasks pull their partitions from each node via BlockStoreShuffleReader.

I am leaning toward the pull-based model for its simplicity and failure resilience.
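In the pull model, the serving side reduces to a small HTTP endpoint over local shuffle files. The sketch below uses the JDK's built-in `com.sun.net.httpserver`; the URL scheme, file layout, and class names are assumptions for illustration:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of a worker's pull endpoint: Phase 2 readers fetch
// the hash partitions assigned to them with GET /shuffle/<partitionId>.
public class ShuffleFileServer {
    private final HttpServer server;

    public ShuffleFileServer(Path shuffleDir, int port) throws IOException {
        this.server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/shuffle/", exchange -> {
            String id = exchange.getRequestURI().getPath().substring("/shuffle/".length());
            Path file = shuffleDir.resolve(id + ".bin"); // one file per hash partition
            if (!Files.exists(file)) {
                exchange.sendResponseHeaders(404, -1);
                return;
            }
            byte[] bytes = Files.readAllBytes(file);
            exchange.sendResponseHeaders(200, bytes.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(bytes);
            }
        });
    }

    public void start() { server.start(); }
    public int port() { return server.getAddress().getPort(); }
    public void stop() { server.stop(0); }
}
```

A failed pull can simply be retried against the same worker, since the data remains on the writer's disk until the snapshot is fully processed.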

Intermediate storage: Local disk vs S3

|  | Local disk | S3 |
| --- | --- | --- |
| Latency | Low (local I/O + node-to-node HTTP) | Higher (S3 PUT + S3 GET) |
| Infrastructure dependency | None | Requires S3 or S3-compatible storage |
| Node failure between phases | Data lost if the writer node goes down before the reader pulls | Data persisted; the reader can retry from any node |
| Disk capacity | Constrained by local disk | Virtually unlimited |
| Node-to-node communication | Required (HTTP pull) | Not required (S3 acts as intermediary) |

I am leaning toward local disk for the initial implementation. The data volume for shuffle is limited to the changed records within a single snapshot, so disk capacity and node failure risk are manageable. S3 could be added as a future option for environments with ephemeral storage or very large change sets.

Additional context

The identifier_columns hash is sufficient as the shuffle key for both carryover removal and UPDATE merge. Carryover pairs (DELETE-INSERT with all data columns identical) necessarily have the same identifier_columns values, so they are routed to the same node. This is why a single shuffle pass is sufficient, unlike Spark's CreateChangelogViewProcedure which uses two separate repartition steps (one by all data columns for carryover removal, one by identifier_columns for update computation). Spark uses two steps because identifier_columns is optional in its API.

When identifier_columns is not configured and a snapshot contains DeletedDataFileScanTasks (UPDATE/DELETE operations), processing fails with an error. identifier_columns is required for correct CDC processing of UPDATE/DELETE because it is needed to compute the shuffle key, perform UPDATE merge, and generate OpenSearch document IDs. Without identifier_columns, there is no way to identify which records belong to the same document on the OpenSearch side. For INSERT-only tables, identifier_columns is not required and the shuffle is not activated.
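For illustration only, a pipeline configuration might declare the identifier columns on the source like this; the option name follows this issue's terminology, and the final key name and shape may differ:

```yaml
source:
  iceberg:
    # ...existing catalog/table settings from the current iceberg-source config...
    identifier_columns: ["id"]  # required when snapshots contain UPDATE/DELETE
```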

Related PR: #6554 (original iceberg-source implementation)
Related issue: #6552
