What happened?
Description
When writing to Iceberg tables with Managed.write(Managed.ICEBERG) in a streaming pipeline, I consistently see the following error in Dataflow workers:
java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 1 data writer(s) still open
    at org.apache.beam.sdk.io.iceberg.RecordWriterManager.close(RecordWriterManager.java:395)
    at org.apache.beam.sdk.io.iceberg.WriteGroupedRowsToFiles$WriteGroupedRowsToFilesDoFn.processElement(WriteGroupedRowsToFiles.java:110)
The error typically first appears 20-30 minutes after launch (the exact moment varies) and then repeats continuously; the activeIcebergWriters metric keeps climbing (e.g., 50+) and the job stops writing new data. There is no consistent user-code exception preceding it. It occurs even after we force the sink onto a single lane and reduce parallelism to the minimum (see the sketch just below).
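For reference, this is roughly what "single lane" means in our case. This is only a hedged sketch: the single-key reshuffle and worker caps shown here are illustrative, are not part of the minimal repro further down, and reuse names (options, counted, Schemas) from that repro.

    // Sketch only: funnel every output row through a single key before the
    // Iceberg sink and cap Dataflow parallelism. Illustrative, not part of
    // the minimal repro below.
    DataflowPipelineOptions dfOptions = options.as(DataflowPipelineOptions.class);
    dfOptions.setNumWorkers(1);
    dfOptions.setMaxNumWorkers(1);

    PCollection<Row> singleLane = counted
        .apply("SingleKey", WithKeys.of("all"))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), SchemaCoder.of(Schemas.CC_COUNT_SCHEMA)))
        .apply("FuseBreak", Reshuffle.of())
        .apply("DropKeys", Values.create())
        .setRowSchema(Schemas.CC_COUNT_SCHEMA);
    // singleLane then feeds Managed.write(Managed.ICEBERG) exactly as in the repro.

Even with this shape (and with --numWorkers=1 --maxNumWorkers=1 on the command line instead of the programmatic cast), the same IllegalArgumentException eventually appears.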
Steps to reproduce
Minimal pipeline (simplified for reproducibility):
public class MainSimpleCount {

  private static final Logger LOG = LoggerFactory.getLogger(MainSimpleCount.class);

  public static void main(String[] args) throws Exception {
    IcebergPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(IcebergPipelineOptions.class);
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    Pipeline pipeline = Pipeline.create(options);

    // Parse JSON into Rows (note: the real schema fields are UPPERCASE; simplified to "a"/"b" here)
    PCollection<Row> input = pipeline
        .apply("ReadFromPubSub", PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
        .apply("ParseJsonToRow", ParDo.of(new ParseJsonToRow()))
        .setRowSchema(Schemas.INPUT_SCHEMA);

    // Simple 1-minute fixed window (no custom triggers, no accumulation)
    PCollection<Row> windowed = input.apply("Window1m",
        Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));

    // Key by an ID derived from the two fields: a + '|' + b
    PCollection<KV<String, Row>> keyed = windowed
        .apply("KeyById", WithKeys.of((Row r) -> {
          String a = r.getString("a");
          String b = r.getString("b");
          return a + "|" + b;
        }))
        .setCoder(org.apache.beam.sdk.coders.KvCoder.of(
            org.apache.beam.sdk.coders.StringUtf8Coder.of(),
            org.apache.beam.sdk.schemas.SchemaCoder.of(Schemas.INPUT_SCHEMA)));

    // Group by ID
    PCollection<KV<String, Iterable<Row>>> grouped =
        keyed.apply("GroupById", GroupByKey.create());

    // Count per key -> build output Row(id, count_id)
    PCollection<Row> counted = grouped.apply("CountRows",
        ParDo.of(new DoFn<KV<String, Iterable<Row>>, Row>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String id = c.element().getKey();
            long count = 0;
            for (Row r : c.element().getValue()) {
              count++;
            }
            Row outRow = Row.withSchema(Schemas.CC_COUNT_SCHEMA)
                .addValues(id, count)
                .build();
            c.output(outRow);
          }
        }))
        .setRowSchema(Schemas.CC_COUNT_SCHEMA);

    // Write to Iceberg
    counted.apply("WriteToIceberg", Managed.write(Managed.ICEBERG)
        .withConfig(ImmutableMap.<String, Object>builder()
            .put("table", "test_namespace.prices_iceberg_table_1m_id_count")
            .put("catalog_name", "snowflake_open_catalog_internal")
            .put("catalog_properties", ImmutableMap.<String, String>builder()
                .put("type", "rest")
                .put("uri", "https://XXXX.XXXXX.gcp.snowflakecomputing.com/polaris/api/catalog/")
                .put("oauth2-server-uri", "https://XXXX.XXXX.gcp.snowflakecomputing.com/polaris/api/catalog/v1/oauth/tokens")
                .put("header.X-Iceberg-Access-Delegation", "vended-credentials")
                .put("credential", options.getCredential())
                .put("warehouse", "snowflake_open_catalog_internal")
                .put("scope", "PRINCIPAL_ROLE:principal_role")
                .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
                .put("token-refresh-enabled", "false")
                .build())
            // only keep the two table columns
            .put("keep", java.util.List.of("id", "count_id"))
            .put("triggering_frequency_seconds", 120)
            .build()));

    pipeline.run();
  }
}
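For completeness, here is a minimal sketch of the helper types the repro refers to (IcebergPipelineOptions, Schemas, ParseJsonToRow). The field names, JSON shape, and options interface below are placeholders matching the simplified repro, not our production code, and the three types live in separate files in practice.

    // Sketch only: placeholder versions of the helpers referenced above.
    public interface IcebergPipelineOptions extends DataflowPipelineOptions {
      String getInputSubscription();
      void setInputSubscription(String value);

      String getCredential();
      void setCredential(String value);
    }

    public class Schemas {
      public static final Schema INPUT_SCHEMA = Schema.builder()
          .addStringField("a")
          .addStringField("b")
          .build();

      public static final Schema CC_COUNT_SCHEMA = Schema.builder()
          .addStringField("id")
          .addInt64Field("count_id")
          .build();
    }

    public class ParseJsonToRow extends DoFn<String, Row> {
      // Static so the non-serializable mapper is not captured in the DoFn's serialized state.
      private static final ObjectMapper MAPPER = new ObjectMapper();

      @ProcessElement
      public void processElement(@Element String json, OutputReceiver<Row> out) throws Exception {
        // Build a Row with INPUT_SCHEMA from the incoming JSON payload.
        JsonNode node = MAPPER.readTree(json);
        out.output(Row.withSchema(Schemas.INPUT_SCHEMA)
            .addValues(node.get("a").asText(), node.get("b").asText())
            .build());
      }
    }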
Environment:
- Apache Beam: 2.67.0
- Runner: Google Cloud Dataflow (streaming)
- Input source: Pub/Sub → JSON → Beam Row
- Iceberg catalog: Snowflake Open Catalog (REST)
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
- Component: Java SDK
- Component: IO connector
- Component: Google Cloud Dataflow Runner