
[Bug]: IcebergIO - RecordWriterManager fails with "Expected all data writers to be closed" when exceptions occur during write #35940

@masfworld

What happened?

Description

When writing to Iceberg tables with Managed.write(Managed.ICEBERG) in a streaming pipeline, I consistently see the following error in Dataflow workers:

java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 1 data writer(s) still open
    at org.apache.beam.sdk.io.iceberg.RecordWriterManager.close(RecordWriterManager.java:395)
    at org.apache.beam.sdk.io.iceberg.WriteGroupedRowsToFiles$WriteGroupedRowsToFilesDoFn.processElement(WriteGroupedRowsToFiles.java:110)

The error typically first appears after 20-30 minutes of running, though not always at the same point, and then repeats many times. The activeIcebergWriters metric keeps rising (e.g., 50+) and the job stops writing new data. There is no consistent user-code exception preceding it, and it occurs even after reducing the sink to a single lane and minimizing parallelism.
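
From the stack trace, the suspected mechanism is that a data writer opened inside processElement is left open when an exception interrupts the write, so the subsequent close() of the manager trips the "all writers closed" precondition and masks the original failure. Below is a minimal, self-contained Java sketch of that suspected pattern; it is not the actual Beam RecordWriterManager code, and the class and method names (WriterManagerSketch, DataWriter, WriterManager) are hypothetical stand-ins.

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only -- not Beam code.
public class WriterManagerSketch {

  // Stand-in for a per-destination data writer.
  static class DataWriter {
    boolean open = true;

    void write(String record) {
      if (record == null) {
        // Simulates a transient failure during write (I/O error, catalog error, ...).
        throw new RuntimeException("write failed");
      }
    }

    void close() {
      open = false;
    }
  }

  // Stand-in for a manager that opens writers lazily and verifies on close
  // that every writer was closed.
  static class WriterManager implements AutoCloseable {
    private final List<DataWriter> writers = new ArrayList<>();

    void write(String record) {
      DataWriter w = new DataWriter();
      writers.add(w);
      w.write(record); // if this throws, w is never closed below
      w.close();
    }

    @Override
    public void close() {
      long stillOpen = writers.stream().filter(w -> w.open).count();
      if (stillOpen > 0) {
        // Mirrors the observed message and hides the root-cause exception.
        throw new IllegalArgumentException(
            "Expected all data writers to be closed, but found "
                + stillOpen + " data writer(s) still open");
      }
    }
  }

  public static void main(String[] args) {
    WriterManager manager = new WriterManager();
    try {
      manager.write("ok");
      manager.write(null); // fails mid-write, leaving its writer open
    } catch (RuntimeException e) {
      System.out.println("original failure: " + e.getMessage());
    }
    manager.close(); // throws IllegalArgumentException instead of the original failure
  }
}

Running the sketch prints the original "write failed" message and then fails with the same IllegalArgumentException, which matches what we observe on the workers: the precondition error surfaces without any accompanying root cause.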

Steps to reproduce

Minimal pipeline (simplified for reproducibility):

public class MainSimpleCount {

  private static final Logger LOG = LoggerFactory.getLogger(MainSimpleCount.class);

  public static void main(String[] args) throws Exception {
    IcebergPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(IcebergPipelineOptions.class);
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    Pipeline pipeline = Pipeline.create(options);

    // Parse JSON into Beam Rows using the input schema
    PCollection<Row> input = pipeline
        .apply("ReadFromPubSub", PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
        .apply("ParseJsonToRow", ParDo.of(new ParseJsonToRow()))
        .setRowSchema(Schemas.INPUT_SCHEMA);

    // Simple 1-minute fixed window on processing time (no triggers, no accumulation)
    PCollection<Row> windowed = input.apply("Window1m",
        Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));

    // Key each row by a composite ID: a + '|' + b
    PCollection<KV<String, Row>> keyed = windowed.apply("KeyById",
        WithKeys.of((Row r) -> {
          String a = r.getString("a");
          String b = r.getString("b");
          return a + "|" + b;
        }))
        .setCoder(org.apache.beam.sdk.coders.KvCoder.of(
            org.apache.beam.sdk.coders.StringUtf8Coder.of(),
            org.apache.beam.sdk.schemas.SchemaCoder.of(Schemas.INPUT_SCHEMA)));

    // Group by ID
    PCollection<KV<String, Iterable<Row>>> grouped =
        keyed.apply("GroupById", GroupByKey.create());

    // Count per key -> build output Row(ID, count_id)
    PCollection<Row> counted = grouped.apply("CountRows",
        ParDo.of(new DoFn<KV<String, Iterable<Row>>, Row>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String id = c.element().getKey();
            long count = 0;
            for (Row ignored : c.element().getValue()) {
              count++;
            }

            Row outRow = Row.withSchema(Schemas.CC_COUNT_SCHEMA)
                .addValues(id, count)
                .build();
            c.output(outRow);
          }
        }))
        .setRowSchema(Schemas.CC_COUNT_SCHEMA);

    // Write to Iceberg
    counted.apply("WriteToIceberg", Managed.write(Managed.ICEBERG)
        .withConfig(ImmutableMap.<String, Object>builder()
            .put("table", "test_namespace.prices_iceberg_table_1m_id_count")
            .put("catalog_name", "snowflake_open_catalog_internal")
            .put("catalog_properties", ImmutableMap.<String, String>builder()
                .put("type", "rest")
                .put("uri", "https://XXXX.XXXXX.gcp.snowflakecomputing.com/polaris/api/catalog/")
                .put("oauth2-server-uri", "https://XXXX.XXXX.gcp.snowflakecomputing.com/polaris/api/catalog/v1/oauth/tokens")
                .put("header.X-Iceberg-Access-Delegation", "vended-credentials")
                .put("credential", options.getCredential())
                .put("warehouse", "snowflake_open_catalog_internal")
                .put("scope", "PRINCIPAL_ROLE:principal_role")
                .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
                .put("token-refresh-enabled", "false")
                .build())
            // only keep the two table columns
            .put("keep", java.util.List.of("id", "count_id"))
            .put("triggering_frequency_seconds", 120)
            .build()));

    pipeline.run();
  }
} 

Environment:

  • Apache Beam: 2.67.0
  • Runner: Google Cloud Dataflow (streaming)
  • Input source: Pub/Sub → JSON → Beam Row
  • Snowflake Open Catalog for Iceberg catalog

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
