
[BUG] Iceberg source initial load completion detection fails due to race condition between Leader and Worker #6686

@lawofcycles

Description

Describe the bug
When running iceberg-source with multiple nodes, initial load completion tracking fails due to a race condition. The Leader creates the completion-tracking GlobalState only after it has created all task partitions, but Workers can process partitions and receive acknowledgements before the Leader finishes. When a Worker calls incrementSnapshotCompletionCount while the completion key does not yet exist, the method returns without incrementing the count. As a result, waitForSnapshotComplete never reaches the expected total, the Leader's lease expires, and a new Leader starts a redundant second initial load of the same snapshot.

To Reproduce

  1. Configure an iceberg-source pipeline with acknowledgments: true and multiple Data Prepper nodes (e.g. 8 nodes).
  2. Point it at an Iceberg table with many data files (e.g. 800 files).
  3. Start the pipeline.
  4. Observe the following error in Worker logs:
     ERROR ChangelogWorker - Failed to get completion status for snapshot-completion-initial-<snapshotId>
  5. After all partitions are processed, the Leader remains stuck in waitForSnapshotComplete because the completed count never reaches the total.
  6. The Leader's lease expires and a new Leader starts a second initial load for the same snapshot.

Expected behavior
The initial load should complete in a single pass: the Leader should detect that all partitions have been processed and log "Initial load completed".

Environment:

  • Data Prepper version: 2.15.0-SNAPSHOT
  • 8 Fargate tasks (2 vCPU / 16 GiB each)
  • Iceberg table: ~41M rows, 800 data files
  • Source coordination store: DynamoDB

Additional context
Root cause is in LeaderScheduler.java. The completion GlobalState is created after the partition creation loop:

// Partitions are created here (Workers can start processing immediately)
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
   for (final FileScanTask task : tasks) {
       sourceCoordinator.createPartition(...);
       taskCount++;
   }
}

// Completion key is created AFTER all partitions — too late
final String completionKey = SNAPSHOT_COMPLETION_PREFIX + "initial-" + snapshotId;
sourceCoordinator.createPartition(new GlobalState(completionKey,
       Map.of("total", taskCount, "completed", 0)));

In ChangelogWorker.incrementSnapshotCompletionCount, when the completion key is not found, the method returns without incrementing:

if (partitionOpt.isEmpty()) {
   LOG.error("Failed to get completion status for {}", completionKey);
   return;  // count is never incremented
}

Data integrity is not affected. All partitions are processed and written to the sink. The issue is that completion detection fails, causing a redundant second initial load and delaying the transition to CDC polling.
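The lost increment can be reproduced with a minimal, self-contained sketch (hypothetical names; the real store is DynamoDB behind sourceCoordinator). A Worker ack that arrives before the Leader has created the completion key is silently dropped, so "completed" can never reach "total":

```java
import java.util.HashMap;
import java.util.Map;

public class RaceSketch {
    // key -> {total, completed}, standing in for the coordination store
    static final Map<String, int[]> store = new HashMap<>();

    // Mirrors incrementSnapshotCompletionCount: missing key => no-op
    static void incrementCompletion(final String key) {
        final int[] state = store.get(key);
        if (state == null) {
            return; // the bug: the ack is lost, not retried
        }
        state[1]++;
    }

    static int[] simulate() {
        final String key = "snapshot-completion-initial-42";
        incrementCompletion(key);          // worker acks before the key exists
        store.put(key, new int[]{2, 0});   // leader creates the key too late
        incrementCompletion(key);          // a later ack lands normally
        return store.get(key);             // {2, 1}: completed stays below total
    }

    public static void main(final String[] args) {
        final int[] s = simulate();
        System.out.println(s[1] + "/" + s[0] + " partitions acknowledged");
        // prints 1/2 partitions acknowledged
    }
}
```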

Unlike the initial load path, processInsertOnlySnapshot (CDC INSERT path) correctly creates the completion GlobalState before the partitions and does not have this issue.

A possible fix is to add a bounded retry in incrementSnapshotCompletionCount for the case where the completion key is not yet available. Moving the completion key creation before the partition loop (as processInsertOnlySnapshot does) is not straightforward for the initial load, because planFiles() streams its tasks and the total count is not known upfront.
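The bounded retry could be factored as a small generic helper along these lines (a sketch only; the helper name, attempt bound, and backoff are assumptions, not the actual Data Prepper API). incrementSnapshotCompletionCount would call it instead of giving up on the first empty lookup:

```java
import java.util.Optional;
import java.util.function.Supplier;

public class BoundedRetry {
    static <T> Optional<T> retryLookup(final Supplier<Optional<T>> lookup,
                                       final int maxAttempts,
                                       final long backoffMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            final Optional<T> result = lookup.get();
            if (result.isPresent()) {
                return result;
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMillis); // give the Leader time to create the key
            }
        }
        return Optional.empty(); // still missing after the bound: log and fail as today
    }

    public static void main(final String[] args) throws InterruptedException {
        // Simulate a completion key that appears on the third lookup
        final int[] calls = {0};
        final Optional<String> result = retryLookup(
                () -> ++calls[0] >= 3 ? Optional.of("snapshot-completion") : Optional.empty(),
                5, 10L);
        System.out.println(result.orElse("missing") + " after " + calls[0] + " lookups");
        // prints snapshot-completion after 3 lookups
    }
}
```

With a bound of a few attempts and a short backoff, acks that race ahead of the Leader are absorbed, while a genuinely missing key still surfaces as an error once the bound is exhausted.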
