
Add source-layer shuffle to iceberg-source for correct and scalable C… #6682

Open
lawofcycles wants to merge 8 commits into opensearch-project:main from lawofcycles:feature/iceberg-source-shuffle

Conversation


@lawofcycles lawofcycles commented Mar 29, 2026

Description

This PR adds source-layer shuffle to the iceberg-source plugin for correct and scalable CDC processing of snapshots containing DELETE operations (UPDATE/DELETE in Copy-on-Write tables).

Problems solved

  1. Correctness: When a partition column is updated (e.g. region changes from US to EU), Iceberg produces a DELETE in the old partition and an INSERT in the new partition. The current implementation groups tasks by Iceberg partition, so these end up in separate tasks and the UPDATE merge cannot detect the cross-partition update. If the DELETE arrives at the sink after the INSERT, the document is lost. Shuffle routes all records with the same identifier_columns to the same node, enabling correct cross-partition UPDATE detection.

  2. Scalability: The previous bounds-based pairing heuristic attempts to match DELETED and ADDED files by column statistics. When pairing fails (e.g. an UPDATE changes a column's min/max bounds), all files fall back to a single task processed by one node. This is especially problematic for unpartitioned tables or large partitions where the fallback can include hundreds of files. Shuffle distributes the work evenly across all nodes by hash partitioning, regardless of Iceberg partition structure or file bounds.

Shuffle overview

When a snapshot contains DeletedDataFileScanTasks, all records in the snapshot are redistributed by identifier_columns hash so that records sharing the same identifier_columns value are guaranteed to land on the same node. Once co-located, each node can independently perform carryover removal and UPDATE merge, because it has both the DELETE and INSERT for any given document.

This redistribution uses a pull-based two-phase approach based on Spark's shuffle architecture (SortShuffleManager, IndexShuffleBlockResolver). All workers must finish Phase 1 before Phase 2 can begin, because a reader needs the complete set of records for its hash partitions across all nodes. Between the two phases, the leader collects index metadata from all shuffle files and coalesces adjacent small hash partitions into SHUFFLE_READ tasks (same approach as Spark's Adaptive Query Execution), so the number of Phase 2 tasks is determined by actual data distribution rather than the fixed partition count.

Phase 1 (SHUFFLE_WRITE): Each worker reads its assigned data files, computes hash(identifier_columns) % P for each record (P defaults to 64, configurable via shuffle.partitions), and writes records to local disk sorted by hash partition number. Each task produces one data file and one index file.
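As a rough sketch of the write-side routing (the class and method names here are hypothetical, not the plugin's actual API), the partition for a record is derived from its identifier_columns values so that equal values always map to the same hash partition:

```java
import java.util.List;

// Hypothetical sketch of hash(identifier_columns) % P routing.
// Records with equal identifier_columns values always get the same partition.
public class ShufflePartitioner {
    private final int numPartitions; // "P", shuffle.partitions, default 64

    public ShufflePartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // identifierValues holds the record's values for the configured identifier_columns.
    // List.hashCode() is deterministic and order-sensitive, which is what we want here.
    public int partitionFor(List<Object> identifierValues) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(identifierValues.hashCode(), numPartitions);
    }
}
```

Since the same hash function runs on every writer, co-location of DELETE and INSERT for one document follows with no coordination between nodes.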

Phase 2 (SHUFFLE_READ): Each worker pulls its assigned hash partition range from all nodes, then performs carryover removal and UPDATE merge on the collected records. Since all records with the same identifier_columns value are guaranteed to be on the same node, cross-partition updates are correctly detected.

Shuffle storage is abstracted behind a ShuffleStorage interface. The current implementation uses local disk (LocalDiskShuffleStorage), but the interface allows alternative storage backends (e.g. S3) to be plugged in without changes to the shuffle orchestration or worker logic.

The pull-based design decouples writing from network transfer. Phase 1 is purely local I/O with no network dependency, so it is unaffected by slow or failing remote nodes. Phase 2 readers pull data independently and can retry failed transfers without requiring the writer to resend.

Write-side partitioning uses a fixed count (P=64 by default) because the number of nodes may vary and a sufficiently large P ensures even distribution regardless of cluster size. When the actual data volume is small relative to P, many partitions will be empty or contain very little data. After all writes complete, the leader reads the index files to learn the actual size of each partition, then coalesces adjacent small or empty partitions into larger SHUFFLE_READ tasks targeting a configurable size (default 64MB), avoiding unnecessary tasks.
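The coalescing step can be sketched as a greedy merge over per-partition sizes (hypothetical helper; the actual implementation may differ in details such as how trailing partitions are handled):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the barrier-step coalesce. Input: total bytes per hash
// partition (summed across all writers' index files). Adjacent partitions are
// greedily merged into SHUFFLE_READ ranges until the target size is reached;
// empty partitions contribute nothing and are absorbed into neighboring ranges.
public class PartitionCoalescer {
    // Returns [start, endExclusive) ranges over partition numbers.
    public static List<int[]> coalesce(long[] partitionSizes, long targetBytes) {
        List<int[]> ranges = new ArrayList<>();
        int start = 0;
        long acc = 0;
        for (int i = 0; i < partitionSizes.length; i++) {
            acc += partitionSizes[i];
            if (acc >= targetBytes) {
                ranges.add(new int[]{start, i + 1});
                start = i + 1;
                acc = 0;
            }
        }
        // Emit the undersized tail only if it actually contains data
        if (start < partitionSizes.length && acc > 0) {
            ranges.add(new int[]{start, partitionSizes.length});
        }
        return ranges;
    }
}
```

With this shape, the number of Phase 2 tasks tracks the actual data volume: small snapshots collapse to a handful of ranges while large ones fan out.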

For INSERT-only snapshots (no DeletedDataFileScanTask), the shuffle is skipped entirely and each file is processed as an independent task.

flowchart TD
    subgraph LeaderScheduler
        A[Detect snapshot] --> B{DELETED files?}
        B -->|No| C[INSERT-only: create one<br/>CHANGELOG_TASK per file]
        B -->|Yes| D[Phase 1: create SHUFFLE_WRITE tasks<br/>1 data file = 1 task]
        D --> E[Wait for all SHUFFLE_WRITE<br/>to complete]
        E --> F{shuffle-failed?}
        F -->|Yes| G[Clean up shuffle files<br/>on all nodes]
        G --> A
        F -->|No| H[Collect index files from all nodes<br/>via local disk and HTTP]
        H --> I[Coalesce:<br/>skip empty partitions<br/>merge small partitions<br/>target 64MB per task]
        I --> J[Phase 2: create SHUFFLE_READ tasks<br/>1 task = 1 partition range]
        J --> K[Wait for all SHUFFLE_READ<br/>to complete]
        K --> K2[Clean up shuffle files<br/>on all nodes]
        K2 --> L[Update lastProcessedSnapshotId]
        L --> A
        C --> M[Wait for all CHANGELOG_TASKs<br/>to complete]
        M --> L
    end
sequenceDiagram
    participant LS as LeaderScheduler
    participant SC as SourceCoordinator
    participant W1 as Worker Node 1
    participant W2 as Worker Node 2
    participant H1 as Node 1 HTTP Server
    participant H2 as Node 2 HTTP Server

    Note over LS: Snapshot S5 detected (contains DELETED files)

    rect rgb(230, 245, 255)
    Note over LS,H2: Phase 1: SHUFFLE_WRITE
    LS->>SC: createPartition(SHUFFLE_WRITE, file-A)
    LS->>SC: createPartition(SHUFFLE_WRITE, file-B)
    LS->>SC: createPartition(SHUFFLE_WRITE, file-C)

    W1->>SC: acquirePartition(SHUFFLE_WRITE)
    SC-->>W1: file-A task
    W2->>SC: acquirePartition(SHUFFLE_WRITE)
    SC-->>W2: file-B task

    W1->>W1: Read file-A, hash(id_cols) % 64, sort by partition#
    W1->>W1: Write data + index files to local disk
    W1->>SC: Register nodeAddress in GlobalState
    W1->>SC: completePartition(file-A)

    W2->>W2: Read file-B, hash(id_cols) % 64, sort by partition#
    W2->>W2: Write data + index files to local disk
    W2->>SC: Register nodeAddress in GlobalState
    W2->>SC: completePartition(file-B)

    W1->>SC: acquirePartition(SHUFFLE_WRITE)
    SC-->>W1: file-C task
    W1->>W1: Write data + index files to local disk
    W1->>SC: Register nodeAddress in GlobalState
    W1->>SC: completePartition(file-C)
    end

    Note over LS: All SHUFFLE_WRITE complete

    rect rgb(255, 245, 230)
    Note over LS,H2: Barrier: index collection + coalesce
    LS->>SC: Read shuffle-locations GlobalState
    LS->>LS: Read local index files (taskA, taskC)
    LS->>H2: GET /shuffle/{snapshotId}/{taskId}/index (taskB)
    H2-->>LS: index offsets
    LS->>LS: Compute per-partition sizes, coalesce (target 64MB)
    end

    rect rgb(230, 255, 230)
    Note over LS,H2: Phase 2: SHUFFLE_READ
    LS->>SC: createPartition(SHUFFLE_READ, partitions 0-20)
    LS->>SC: createPartition(SHUFFLE_READ, partitions 21-63)

    W1->>SC: acquirePartition(SHUFFLE_READ)
    SC-->>W1: partitions 0-20
    W2->>SC: acquirePartition(SHUFFLE_READ)
    SC-->>W2: partitions 21-63

    W1->>W1: Read partitions 0-20 from local disk (taskA, taskC)
    W1->>H2: GET /shuffle/.../data (taskB, partitions 0-20)
    H2-->>W1: compressed blocks
    W1->>W1: Carryover removal + UPDATE merge
    W1->>W1: Write to Buffer
    W1->>SC: completePartition

    W2->>W2: Read partitions 21-63 from local disk (taskB)
    W2->>H1: GET /shuffle/.../data (taskA, taskC, partitions 21-63)
    H1-->>W2: compressed blocks
    W2->>W2: Carryover removal + UPDATE merge
    W2->>W2: Write to Buffer
    W2->>SC: completePartition
    end

    Note over LS: All SHUFFLE_READ complete
    LS->>LS: Delete local shuffle files
    LS->>H1: DELETE /shuffle/{snapshotId}
    LS->>H2: DELETE /shuffle/{snapshotId}
    LS->>SC: Update lastProcessedSnapshotId

A SHUFFLE_READ worker retries HTTP pulls up to 3 times with exponential backoff. If all retries fail, it writes a shuffle-failed GlobalState entry. LeaderScheduler detects this during its completion polling, cleans up shuffle files, and aborts without updating lastProcessedSnapshotId. The same snapshot is retried from Phase 1 on the next polling cycle.
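The retry policy described above can be sketched as follows (hypothetical helper; in the real worker, exhausting the retries results in a shuffle-failed GlobalState entry rather than a propagated exception):

```java
import java.util.function.Supplier;

// Hypothetical sketch of the pull retry policy: up to maxRetries attempts with
// exponential backoff before the failure is surfaced to the caller, which would
// then record the shuffle-failed state for the leader to detect.
public class RetryingFetch {
    public static <T> T withRetries(Supplier<T> fetch, int maxRetries, long baseBackoffMs) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            try {
                return fetch.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxRetries - 1) {
                    try {
                        // backoff doubles each attempt: 1x, 2x, 4x ...
                        Thread.sleep(baseBackoffMs << attempt);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(ie);
                    }
                }
            }
        }
        throw last; // caller records shuffle-failed state
    }
}
```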

Shuffle data format

Each SHUFFLE_WRITE task produces one data file and one index file.

Index file: (numPartitions + 1) long offset values (8 bytes each). Partition i's data occupies the byte range offset[i] to offset[i+1] in the data file. Empty partitions have offset[i] == offset[i+1].

Example (4 partitions, partition 1 is empty):
offset[0] = 0      <- partition 0 start
offset[1] = 608    <- partition 1 start (= partition 0 end)
offset[2] = 608    <- partition 2 start (partition 1 is empty, same offset)
offset[3] = 916    <- partition 3 start
offset[4] = 996    <- end of file
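Given such an index, a reader computes any partition's byte range by simple subtraction. A minimal sketch (hypothetical helper), tested against the offsets above:

```java
// Hypothetical sketch of index lookup. The index holds numPartitions + 1 longs;
// partition i occupies bytes offsets[i] .. offsets[i+1] of the data file.
public class IndexLookup {
    // Returns {startOffset, length} for the given partition.
    public static long[] byteRange(long[] offsets, int partition) {
        long start = offsets[partition];
        long length = offsets[partition + 1] - offsets[partition];
        return new long[]{start, length};
    }
}
```

An empty partition yields length 0, which lets readers skip it without issuing a request.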

Data file: LZ4-compressed blocks in partition order. Each block contains serialized records.

Per-partition block: [uncompressed size: 4B][compressed size: 4B][LZ4 compressed data]
Per-record format:   [recordLength: 4B][operation: 1B][changeOrdinal: 4B][avroSerializedRecord]

Records are serialized using Avro binary encoding derived from the Iceberg table schema. The schema is not stored in the shuffle files since both write and read sides derive it from the same table.

The writer buffers all records in memory, sorts them by partition number, compresses each partition, and writes the file in a single pass. Memory usage per SHUFFLE_WRITE task is therefore bounded by the size of one Iceberg data file (default 512MB).
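The per-partition block framing can be sketched as follows (hypothetical helper; the payload is LZ4-compressed in the actual implementation but is treated as opaque bytes here):

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the block framing:
// [uncompressed size: 4B][compressed size: 4B][compressed payload].
// ByteBuffer defaults to big-endian, used here for both directions.
public class BlockFraming {
    public static byte[] frame(int uncompressedSize, byte[] compressedPayload) {
        ByteBuffer buf = ByteBuffer.allocate(8 + compressedPayload.length);
        buf.putInt(uncompressedSize);
        buf.putInt(compressedPayload.length);
        buf.put(compressedPayload);
        return buf.array();
    }

    // Returns {uncompressedSize, compressedSize} read from the block header.
    public static int[] readHeader(byte[] block) {
        ByteBuffer buf = ByteBuffer.wrap(block);
        return new int[]{buf.getInt(), buf.getInt()};
    }
}
```

The uncompressed size in the header lets the reader allocate the decompression buffer up front before decoding the LZ4 payload.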

Shuffle orchestration

LeaderScheduler coordinates the phases using barrier synchronization via SourceCoordinator.

  1. LeaderScheduler scans the snapshot once via IncrementalChangelogScan and checks for DELETED files
  2. Creates SHUFFLE_WRITE tasks (one per data file). Each worker records its node address in GlobalState on completion
  3. After all SHUFFLE_WRITE tasks complete, reads index files from all tasks and computes per-partition data sizes
  4. Runs coalesce to merge adjacent small hash partitions into SHUFFLE_READ tasks targeting 64MB each (configurable via target_partition_size; similar to Spark's Adaptive Query Execution). This avoids creating excessive tasks when most hash partitions are small
  5. Creates SHUFFLE_READ tasks. Each task includes the coalesced partition range and the list of (taskId, nodeAddress) pairs to pull data from
  6. After all SHUFFLE_READ tasks complete, sends async cleanup requests to all nodes via DELETE /shuffle/{snapshotId} and cleans up local shuffle files. Each node's cleanupAll() at startup serves as a safety net for any missed cleanups

Partition keys are deterministic (based on file paths and partition ranges) to ensure idempotency if LeaderScheduler crashes and replans the same snapshot.
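The deterministic-key idea can be sketched like this (hypothetical helper; the exact key composition in the plugin may differ):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of deterministic partition-key derivation. Keying a task
// by SHA-256 of (snapshotId, file path) rather than a random UUID means that
// replanning the same snapshot after a leader crash regenerates identical keys,
// so the source coordinator does not create duplicate tasks.
public class DeterministicKeys {
    public static String shuffleWriteKey(String snapshotId, String dataFilePath) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(
                    (snapshotId + "|" + dataFilePath).getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required by the JCA spec", e);
        }
    }
}
```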

Node-to-node data transfer

Each Data Prepper node runs an Armeria HTTP server (default port 4995) to serve shuffle data.

| Endpoint | Response |
| --- | --- |
| GET /shuffle/{snapshotId}/{taskId}/index | Index file as binary (long[] offsets) |
| GET /shuffle/{snapshotId}/{taskId}/data?offset={offset}&length={length} | Specified byte range from the data file (LZ4-compressed block) |
| DELETE /shuffle/{snapshotId} | Deletes shuffle files for the snapshot on this node |

SHUFFLE_READ workers pull data from each SHUFFLE_WRITE task. For same-node tasks, data is read directly from disk without HTTP. For remote tasks, the worker first fetches the index to compute offsets, then fetches each partition's compressed block individually.

The shuffle HTTP server supports TLS for encryption in transit. Request-level authentication (e.g. mutual TLS) is not included in this PR but can be added as a follow-up.

Shuffle key and identifier_columns

The identifier_columns hash is sufficient as the shuffle key for both carryover removal and UPDATE merge: carryover pairs (a DELETE and an INSERT with all data columns identical) necessarily share the same identifier_columns values, so they are routed to the same node.

identifier_columns is now required when processing snapshots that contain DELETE operations. Without it, OpenSearch document IDs cannot be determined and correct UPDATE/DELETE processing is impossible. The plugin throws IllegalStateException if DELETED files are detected without identifier_columns configured. For INSERT-only tables, identifier_columns is not required.

Configuration

| Parameter | Default | Description |
| --- | --- | --- |
| shuffle.partitions | 64 | Number of hash partitions |
| shuffle.target_partition_size | 64mb | Target size for coalesced SHUFFLE_READ tasks |
| shuffle.server_port | 4995 | HTTP server port for shuffle data transfer |
| shuffle.ssl | true | Enable TLS for the shuffle HTTP server |
| shuffle.ssl_certificate_file | (required when ssl=true) | TLS certificate file path |
| shuffle.ssl_key_file | (required when ssl=true) | TLS private key file path |
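A pipeline fragment using these parameters might look like the following. Only the shuffle.* keys and their defaults come from the table above; the surrounding source structure and the certificate paths are illustrative assumptions:

```yaml
# Hypothetical pipeline fragment; the shuffle.* keys reflect the table above,
# everything else (including the cert paths) is illustrative.
source:
  iceberg:
    # ... catalog/table settings omitted ...
    shuffle:
      partitions: 64
      target_partition_size: 64mb
      server_port: 4995
      ssl: true
      ssl_certificate_file: /etc/data-prepper/certs/shuffle.crt
      ssl_key_file: /etc/data-prepper/certs/shuffle.key
```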

Other changes

  • Removed bounds-based pairing from TaskGrouper (replaced entirely by shuffle)
  • Moved changelog scan from TaskGrouper to LeaderScheduler to avoid redundant IncrementalChangelogScan calls for INSERT-only snapshots
  • Partition keys use deterministic hashes (SHA-256 of file paths) instead of UUIDs to ensure idempotency if LeaderScheduler replans the same snapshot after a crash (#6554 review comment)

Issues Resolved

Resolves #6666

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…DC processing

Introduce a pull-based shuffle mechanism for processing snapshots that contain
DELETE operations (UPDATE/DELETE in Copy-on-Write tables). When a snapshot contains
DeletedDataFileScanTasks, records are shuffled by identifier_columns hash across nodes
so that carryover removal and UPDATE merge operate on complete data, including cross-partition updates.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles

@dlvenable I would appreciate your review on this PR. This implements the source-layer shuffle discussed in #6554 (comment), addressing both the correctness bug (cross-partition UPDATE data loss) and the scalability limitation (bounds-based pairing fallback to single-node processing) described in #6666.

Once this is merged, I plan to update the CDC RFC (#6552) to reflect the shuffle design.
With this PR and a few remaining items such as metrics (#6668), I believe the iceberg-source plugin would be feature-complete as an initial version.

… to shuffle server

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles

I have opened a documentation PR for Iceberg source plugin: opensearch-project/documentation-website#12164. It reflects the current state of the implementation and would be useful as a reference during review.

@lawofcycles

The shuffle HTTP server supports TLS for encryption in transit. Request-level authentication (e.g. mutual TLS) is not included in this PR but can be added as a follow-up. If authentication should be included in the initial implementation, please let me know.

…lient utility

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles

Fixed a bug where LeaderScheduler only read index files from its own local disk for coalescing. In multi-node environments, this could miss data from other nodes, resulting in incorrect partition size estimates or no SHUFFLE_READ tasks being created. The fix fetches index metadata from all nodes via the shuffle HTTP endpoint. Also extracted ShuffleNodeClient as a shared utility for HTTP pull and retry logic.
cfdeb8f

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles

Fixed another issue: Previously, shuffleStorage.cleanup() only deleted shuffle files on the leader node's local disk. Remote nodes' shuffle files were never cleaned up until process restart. Added a DELETE /{snapshotId} endpoint to ShuffleHttpService and requestCleanup to ShuffleNodeClient. LeaderScheduler now sends async cleanup requests to all nodes after shuffle completion or failure, matching Spark's pattern where the Driver sends RemoveShuffle to all Executors. Startup cleanupAll() remains as a safety net.
70c3cc8

…te before partitions

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles

During multi-node performance testing, I found a race condition where incrementSnapshotCompletionCount can be called before the Leader creates the completion GlobalState. I fixed this for the shuffle write path in this PR by moving the completion GlobalState creation before the partition creation loop in processShuffleSnapshot, matching the order already used in processInsertOnlySnapshot.
a8bb484
The same issue exists in the initial load path (predates this PR), tracked in #6686.


github-actions bot commented Mar 30, 2026

✅ License Header Check Passed

All newly added files have proper license headers. Great work! 🎉

…eader for correct type handling

The shuffle record serialization used GenericDatumWriter/GenericDatumReader which only
handle Avro native types. Iceberg Records contain Java types like OffsetDateTime for
timestamptz columns, causing AvroRuntimeException during SHUFFLE_WRITE.

Replace with Iceberg's DataWriter and PlannedDataReader which handle the Iceberg-to-Avro
type conversion internally. Extract serialization logic into RecordAvroSerializer utility
class with roundtrip tests covering temporal types.

Also fix shuffle write completion key race condition by creating GlobalState before
partitions in processShuffleSnapshot, matching the order used in processInsertOnlySnapshot.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles force-pushed the feature/iceberg-source-shuffle branch from 2d1d6f1 to bd4e2c5 on March 31, 2026 at 00:05
@lawofcycles

I addressed bugs and improvement points that I found during the e2e performance test.


@dlvenable dlvenable left a comment


Thanks @lawofcycles for this improvement!

/**
 * Standalone Armeria HTTP server for serving shuffle data.
 * Runs independently from PeerForwarder to avoid core dependencies.
 */
public class ShuffleHttpServer {

We should use the common HTTP server code. This is important for providing consistent experience with ports that Data Prepper may open. You can see how this is used in the http source for example.

@JsonProperty("target_partition_size")
private ByteCount targetPartitionSize = ByteCount.parse(DEFAULT_TARGET_PARTITION_SIZE);

@JsonProperty("server_port")

The approach of using S3 versus local disk should probably be handled via a plugin. This would allow us to extend it more. This is how we support different source coordination mechanisms or different authentication mechanisms in plugins like the http source.

Since this source is @Experimental we can break the config between versions so we don't need to do this now.

private static final Logger LOG = LoggerFactory.getLogger(ShuffleNodeClient.class);
private static final int MAX_RETRIES = 3;

private final HttpClient httpClient;

Let's use Armeria's web client for web requests.
