Add source-layer shuffle to iceberg-source for correct and scalable C…#6682
Add source-layer shuffle to iceberg-source for correct and scalable C…#6682lawofcycles wants to merge 8 commits intoopensearch-project:mainfrom
Conversation
…DC processing Introduce a pull-based shuffle mechanism for processing snapshots that contain DELETE operations (UPDATE/DELETE in Copy-on-Write tables). When a snapshot contains DeletedDataFileScanTasks, records are shuffled by identifier_columns hash across nodes so that carryover removal and UPDATE merge operate on complete data, including cross-partition updates. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
|
@dlvenable I would appreciate your review on this PR. This implements the source-layer shuffle discussed in #6554 (comment), addressing both the correctness bug (cross-partition UPDATE data loss) and the scalability limitation (bounds-based pairing fallback to single-node processing) described in #6666. Once this is merged, I plan to update the CDC RFC (#6552) to reflect the shuffle design. |
… to shuffle server Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
|
I have opened a documentation PR for Iceberg source plugin: opensearch-project/documentation-website#12164. It reflects the current state of the implementation and would be useful as a reference during review. |
|
The shuffle HTTP server supports TLS for encryption in transit. Request-level authentication (e.g. mutual TLS) is not included in this PR but can be added as a follow-up. If authentication should be included in the initial implementation, please let me know. |
…lient utility Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
|
Fixed a bug where |
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
|
Fixed another issue: Previously, |
…te before partitions Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
|
During multi-node performance testing, I found a race condition where |
✅ License Header Check PassedAll newly added files have proper license headers. Great work! 🎉 |
…eader for correct type handling The shuffle record serialization used GenericDatumWriter/GenericDatumReader which only handle Avro native types. Iceberg Records contain Java types like OffsetDateTime for timestamptz columns, causing AvroRuntimeException during SHUFFLE_WRITE. Replace with Iceberg's DataWriter and PlannedDataReader which handle the Iceberg-to-Avro type conversion internally. Extract serialization logic into RecordAvroSerializer utility class with roundtrip tests covering temporal types. Also fix shuffle write completion key race condition by creating GlobalState before partitions in processShuffleSnapshot, matching the order used in processInsertOnlySnapshot. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
2d1d6f1 to
bd4e2c5
Compare
|
I addressed bugs and improvement points that I found during the e2e performance test. |
dlvenable
left a comment
There was a problem hiding this comment.
Thanks @lawofcycles for this improvement!
| * Standalone Armeria HTTP server for serving shuffle data. | ||
| * Runs independently from PeerForwarder to avoid core dependencies. | ||
| */ | ||
| public class ShuffleHttpServer { |
There was a problem hiding this comment.
We should use the common HTTP server code. This is important for providing consistent experience with ports that Data Prepper may open. You can see how this is used in the http source for example.
| @JsonProperty("target_partition_size") | ||
| private ByteCount targetPartitionSize = ByteCount.parse(DEFAULT_TARGET_PARTITION_SIZE); | ||
|
|
||
| @JsonProperty("server_port") |
There was a problem hiding this comment.
The approach of using S3 versus local disk should probably be handled via a plugin. This would allow us to extend it more. This is how we support different source coordination mechanisms or different authentication mechanisms in plugins like the http source.
Since this source is @Experimental we can break the config between versions so we don't need to do this now.
| private static final Logger LOG = LoggerFactory.getLogger(ShuffleNodeClient.class); | ||
| private static final int MAX_RETRIES = 3; | ||
|
|
||
| private final HttpClient httpClient; |
There was a problem hiding this comment.
Let's use Armeria's web client for web requests.
Description
This PR adds source-layer shuffle to the iceberg-source plugin for correct and scalable CDC processing of snapshots containing DELETE operations (UPDATE/DELETE in Copy-on-Write tables).
Problems solved
Correctness: When a partition column is updated (e.g.
regionchanges fromUStoEU), Iceberg produces a DELETE in the old partition and an INSERT in the new partition. The current implementation groups tasks by Iceberg partition, so these end up in separate tasks and the UPDATE merge cannot detect the cross-partition update. If the DELETE arrives at the sink after the INSERT, the document is lost. Shuffle routes all records with the sameidentifier_columnsto the same node, enabling correct cross-partition UPDATE detection.Scalability: The previous bounds-based pairing heuristic attempts to match DELETED and ADDED files by column statistics. When pairing fails (e.g. an UPDATE changes a column's min/max bounds), all files fall back to a single task processed by one node. This is especially problematic for unpartitioned tables or large partitions where the fallback can include hundreds of files. Shuffle distributes the work evenly across all nodes by hash partitioning, regardless of Iceberg partition structure or file bounds.
Shuffle overview
When a snapshot contains
DeletedDataFileScanTasks, all records in the snapshot are redistributed byidentifier_columnshash so that records sharing the sameidentifier_columnsvalue are guaranteed to land on the same node. Once co-located, each node can independently perform carryover removal and UPDATE merge, because it has both the DELETE and INSERT for any given document.This redistribution uses a pull-based two-phase approach based on Spark's shuffle architecture (
SortShuffleManager,IndexShuffleBlockResolver). All workers must finish Phase 1 before Phase 2 can begin, because a reader needs the complete set of records for its hash partitions across all nodes. Between the two phases, the leader collects index metadata from all shuffle files and coalesces adjacent small hash partitions into SHUFFLE_READ tasks (same approach as Spark's Adaptive Query Execution), so the number of Phase 2 tasks is determined by actual data distribution rather than the fixed partition count.Phase 1 (SHUFFLE_WRITE): Each worker reads its assigned data files, computes
hash(identifier_columns) % Pfor each record (P defaults to 64, configurable viashuffle.partitions), and writes records to local disk sorted by hash partition number. Each task produces one data file and one index file.Phase 2 (SHUFFLE_READ): Each worker pulls its assigned hash partition range from all nodes, then performs carryover removal and UPDATE merge on the collected records. Since all records with the same
identifier_columnsvalue are guaranteed to be on the same node, cross-partition updates are correctly detected.Shuffle storage is abstracted behind a
ShuffleStorageinterface. The current implementation uses local disk (LocalDiskShuffleStorage), but the interface allows alternative storage backends (e.g. S3) to be plugged in without changes to the shuffle orchestration or worker logic.The pull-based design decouples writing from network transfer. Phase 1 is purely local I/O with no network dependency, so it is unaffected by slow or failing remote nodes. Phase 2 readers pull data independently and can retry failed transfers without requiring the writer to resend.
Write-side partitioning uses a fixed count (P=64 by default) because the number of nodes may vary and a sufficiently large P ensures even distribution regardless of cluster size. When the actual data volume is small relative to P, many partitions will be empty or contain very little data. After all writes complete, the leader reads the index files to learn the actual size of each partition, then coalesces adjacent small or empty partitions into larger SHUFFLE_READ tasks targeting a configurable size (default 64MB), avoiding unnecessary tasks.
For INSERT-only snapshots (no
DeletedDataFileScanTask), the shuffle is skipped entirely and each file is processed as an independent task.flowchart TD subgraph LeaderScheduler A[Detect snapshot] --> B{DELETED files?} B -->|No| C[INSERT-only: create one<br/>CHANGELOG_TASK per file] B -->|Yes| D[Phase 1: create SHUFFLE_WRITE tasks<br/>1 data file = 1 task] D --> E[Wait for all SHUFFLE_WRITE<br/>to complete] E --> F{shuffle-failed?} F -->|Yes| G[Clean up shuffle files<br/>on all nodes] G --> A F -->|No| H[Collect index files from all nodes<br/>via local disk and HTTP] H --> I[Coalesce:<br/>skip empty partitions<br/>merge small partitions<br/>target 64MB per task] I --> J[Phase 2: create SHUFFLE_READ tasks<br/>1 task = 1 partition range] J --> K[Wait for all SHUFFLE_READ<br/>to complete] K --> K2[Clean up shuffle files<br/>on all nodes] K2 --> L[Update lastProcessedSnapshotId] L --> A C --> M[Wait for all CHANGELOG_TASKs<br/>to complete] M --> L endsequenceDiagram participant LS as LeaderScheduler participant SC as SourceCoordinator participant W1 as Worker Node 1 participant W2 as Worker Node 2 participant H1 as Node 1 HTTP Server participant H2 as Node 2 HTTP Server Note over LS: Snapshot S5 detected (contains DELETED files) rect rgb(230, 245, 255) Note over LS,H2: Phase 1: SHUFFLE_WRITE LS->>SC: createPartition(SHUFFLE_WRITE, file-A) LS->>SC: createPartition(SHUFFLE_WRITE, file-B) LS->>SC: createPartition(SHUFFLE_WRITE, file-C) W1->>SC: acquirePartition(SHUFFLE_WRITE) SC-->>W1: file-A task W2->>SC: acquirePartition(SHUFFLE_WRITE) SC-->>W2: file-B task W1->>W1: Read file-A, hash(id_cols) % 64, sort by partition# W1->>W1: Write data + index files to local disk W1->>SC: Register nodeAddress in GlobalState W1->>SC: completePartition(file-A) W2->>W2: Read file-B, hash(id_cols) % 64, sort by partition# W2->>W2: Write data + index files to local disk W2->>SC: Register nodeAddress in GlobalState W2->>SC: completePartition(file-B) W1->>SC: acquirePartition(SHUFFLE_WRITE) SC-->>W1: file-C task W1->>W1: Write data + index files to local disk W1->>SC: Register nodeAddress in GlobalState W1->>SC: completePartition(file-C) end Note over LS: All SHUFFLE_WRITE complete rect rgb(255, 245, 230) Note over LS,H2: Barrier: index collection + coalesce LS->>SC: Read shuffle-locations GlobalState LS->>LS: Read local index files (taskA, taskC) LS->>H2: GET /shuffle/{snapshotId}/{taskId}/index (taskB) H2-->>LS: index offsets LS->>LS: Compute per-partition sizes, coalesce (target 64MB) end rect rgb(230, 255, 230) Note over LS,H2: Phase 2: SHUFFLE_READ LS->>SC: createPartition(SHUFFLE_READ, partitions 0-20) LS->>SC: createPartition(SHUFFLE_READ, partitions 21-63) W1->>SC: acquirePartition(SHUFFLE_READ) SC-->>W1: partitions 0-20 W2->>SC: acquirePartition(SHUFFLE_READ) SC-->>W2: partitions 21-63 W1->>W1: Read partitions 0-20 from local disk (taskA, taskC) W1->>H2: GET /shuffle/.../data (taskB, partitions 0-20) H2-->>W1: compressed blocks W1->>W1: Carryover removal + UPDATE merge W1->>W1: Write to Buffer W1->>SC: completePartition W2->>W2: Read partitions 21-63 from local disk (taskB) W2->>H1: GET /shuffle/.../data (taskA, taskC, partitions 21-63) H1-->>W2: compressed blocks W2->>W2: Carryover removal + UPDATE merge W2->>W2: Write to Buffer W2->>SC: completePartition end Note over LS: All SHUFFLE_READ complete LS->>LS: Delete local shuffle files LS->>H1: DELETE /shuffle/{snapshotId} LS->>H2: DELETE /shuffle/{snapshotId} LS->>SC: Update lastProcessedSnapshotIdA SHUFFLE_READ worker retries HTTP pulls up to 3 times with exponential backoff. If all retries fail, it writes a
shuffle-failedGlobalState entry. LeaderScheduler detects this during its completion polling, cleans up shuffle files, and aborts without updatinglastProcessedSnapshotId. The same snapshot is retried from Phase 1 on the next polling cycle.Shuffle data format
Each SHUFFLE_WRITE task produces one data file and one index file.
Index file:
(numPartitions + 1)long offset values (8 bytes each). Partitioni's data occupies the byte rangeoffset[i]tooffset[i+1]in the data file. Empty partitions haveoffset[i] == offset[i+1].Data file: LZ4-compressed blocks in partition order. Each block contains serialized records.
Records are serialized using Avro binary encoding derived from the Iceberg table schema. The schema is not stored in the shuffle files since both write and read sides derive it from the same table.
Writer buffers all records in memory, sorts by partition number, compresses per partition, and writes in a single pass. Memory usage per SHUFFLE_WRITE task is bounded by one Iceberg data file (default 512MB).
Shuffle orchestration
LeaderScheduler coordinates the phases using barrier synchronization via SourceCoordinator.
IncrementalChangelogScanand checks for DELETED filestarget_partition_size; similar to Spark's Adaptive Query Execution). This avoids creating excessive tasks when most hash partitions are smallDELETE /shuffle/{snapshotId}and cleans up local shuffle files. Each node'scleanupAll()at startup serves as a safety net for any missed cleanupsPartition keys are deterministic (based on file paths and partition ranges) to ensure idempotency if LeaderScheduler crashes and replans the same snapshot.
Node-to-node data transfer
Each Data Prepper node runs an Armeria HTTP server (default port 4995) to serve shuffle data.
GET /shuffle/{snapshotId}/{taskId}/indexGET /shuffle/{snapshotId}/{taskId}/data?offset={offset}&length={length}DELETE /shuffle/{snapshotId}SHUFFLE_READ workers pull data from each SHUFFLE_WRITE task. For same-node tasks, data is read directly from disk without HTTP. For remote tasks, the worker first fetches the index to compute offsets, then fetches each partition's compressed block individually.
The shuffle HTTP server supports TLS for encryption in transit. Request-level authentication (e.g. mutual TLS) is not included in this PR but can be added as a follow-up.
Shuffle key and identifier_columns
identifier_columnshash is sufficient as the shuffle key for both carryover removal and UPDATE merge. Carryover pairs (DELETE+INSERT with all data columns identical) necessarily have the sameidentifier_columnsvalues, so they are routed to the same node.identifier_columnsis now required when processing snapshots that contain DELETE operations. Without it, OpenSearch document IDs cannot be determined and correct UPDATE/DELETE processing is impossible. The plugin throwsIllegalStateExceptionif DELETED files are detected withoutidentifier_columnsconfigured. For INSERT-only tables,identifier_columnsis not required.Configuration
shuffle.partitionsshuffle.target_partition_sizeshuffle.server_portshuffle.sslshuffle.ssl_certificate_fileshuffle.ssl_key_fileOther changes
TaskGrouper(replaced entirely by shuffle)TaskGroupertoLeaderSchedulerto avoid redundantIncrementalChangelogScancalls for INSERT-only snapshotsIssues Resolved
Resolves #6666
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.