[RFC] Iceberg Sink Plugin #6664
This RFC proposes a new iceberg sink plugin for Data Prepper that writes events into Apache Iceberg tables. The plugin supports INSERT, UPDATE, and DELETE operations, enabling use cases from log ingestion to change data replication.
Related: #6652 (S3 Tables Iceberg proposal), #6552 (Iceberg CDC Source Plugin)
1. Context
Apache Iceberg is an open table format widely used for lakehouse architectures on storage like S3, GCS, and HDFS. Iceberg tables are written by engines like Spark, Flink, and Trino, and are commonly used as the central store for analytical data.
Data Prepper already has an Iceberg CDC Source plugin (#6552) that reads changes from Iceberg tables. An Iceberg Sink plugin completes the picture by enabling Data Prepper to write data into Iceberg tables from any source, including HTTP, S3, Kafka, RDS, and the Iceberg Source itself.
2. Motivation
Data Prepper collects data from diverse sources (logs, metrics, traces, change streams) and delivers them to sinks such as OpenSearch, S3, and Kafka. Adding an Iceberg sink enables the following use cases.
Log and event analytics in the lakehouse. Organizations using Iceberg-based lakehouses can ingest logs, metrics, and traces directly from Data Prepper into Iceberg tables, making them queryable by Spark, Trino, Athena, and other engines without an intermediate ETL step.
CDC replication to Iceberg. Data Prepper's RDS Source captures row-level changes from relational databases. An Iceberg sink enables replicating these changes into Iceberg tables for analytical processing, replacing custom Spark/Flink jobs.
Iceberg-to-Iceberg synchronization. Combined with the Iceberg CDC Source (#6552), the sink enables replicating changes between Iceberg tables across different catalogs or environments (e.g., production to analytics).
3. Proposal
A new iceberg sink plugin that writes Data Prepper events into Iceberg tables using the Iceberg Java API.
Features
- Streaming ingestion from any Data Prepper source to Iceberg tables
- Dynamic multi-table routing using Data Prepper expression language
- INSERT, UPDATE, and DELETE support from RDS and Iceberg sources
- Automatic table creation from pipeline configuration
- Coordinated commits across multi-node Data Prepper clusters
- At-least-once delivery with end-to-end acknowledgements
- Automatic schema evolution
- Deletion Vector support for in-batch deduplication on v3 tables
Pipeline examples
Append-only: HTTP source to Iceberg
Events received via HTTP are written as new rows to an existing Iceberg table.
```yaml
iceberg-pipeline:
  source:
    http:
      path: "/logs"
  sink:
    - iceberg:
        catalog:
          type: "rest"
          uri: "https://polaris.example.com/api/catalog"
          warehouse: "my_warehouse"
          io-impl: "org.apache.iceberg.aws.s3.S3FileIO"
          client.region: "us-east-1"
        table_identifier: "my_namespace.my_table"
```
Row-level changes: RDS Source to Iceberg (multi-table)
Replicate row-level changes from a relational database to Iceberg tables. The table_identifier uses the expression language to route each event to the corresponding Iceberg table based on the source table name. The operation setting reads the operation type from the event metadata set by the RDS Source.
```yaml
rds-to-iceberg-pipeline:
  source:
    rds:
      # ...
  sink:
    - iceberg:
        catalog:
          type: "rest"
          uri: "https://polaris.example.com/api/catalog"
          warehouse: "my_warehouse"
        table_identifier: "${getMetadata(\"table_name\")}"
        operation: "${getMetadata(\"opensearch_action\")}"
        identifier_columns: ["id"]
        dlq:
          s3:
            bucket: "my-dlq-bucket"
            key_path_prefix: "iceberg-failures/"
```
Multiple static tables
To write different events to different tables with static table names, use multiple sink entries. Combined with Data Prepper's route feature, each sink receives only the events matching its route condition.
```yaml
multi-table-pipeline:
  source:
    http:
      path: "/events"
  route:
    - orders: '/type == "order"'
    - users: '/type == "user"'
  sink:
    - iceberg:
        routes: ["orders"]
        catalog:
          type: "rest"
          uri: "https://polaris.example.com/api/catalog"
          warehouse: "my_warehouse"
        table_identifier: "my_namespace.orders"
    - iceberg:
        routes: ["users"]
        catalog:
          type: "rest"
          uri: "https://polaris.example.com/api/catalog"
          warehouse: "my_warehouse"
        table_identifier: "my_namespace.users"
```
Auto-create with schema definition
When the destination table does not exist yet, the plugin can create it from a schema definition in the pipeline YAML. Column types use Iceberg type names directly. Event fields are matched to table columns by name.
```yaml
iceberg-pipeline:
  source:
    http:
      path: "/logs"
  sink:
    - iceberg:
        catalog:
          type: "glue"
          warehouse: "s3://my-bucket/warehouse"
          io-impl: "org.apache.iceberg.aws.s3.S3FileIO"
        table_identifier: "my_namespace.my_table"
        auto_create: true
        schema:
          columns:
            - name: "user_id"
              type: "long"
              required: true
            - name: "event_time"
              type: "timestamp"
              required: true
            - name: "message"
              type: "string"
          partition_spec:
            - column: "event_time"
              transform: "day"
```
Auto-create with schema inference
When auto_create is true and no schema definition is provided, the plugin infers the schema from the first batch of events. Type inference is based on the Java types of event values (e.g., Number to LongType, String to StringType), the same approach used by the Kafka Connect Iceberg Sink for schemaless records.
```yaml
iceberg-pipeline:
  source:
    http:
      path: "/logs"
  sink:
    - iceberg:
        catalog:
          type: "glue"
          warehouse: "s3://my-bucket/warehouse"
          io-impl: "org.apache.iceberg.aws.s3.S3FileIO"
        table_identifier: "my_namespace.my_table"
        auto_create: true
```
Schema inference has limitations: it cannot distinguish between int and long, float and double, or string and timestamp. For precise type control, use a schema definition or create the table in advance.
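To make the inference rule concrete, here is a minimal sketch (a hypothetical helper, not the plugin's actual code) mapping Java runtime types of event values to Iceberg type names, with the ambiguities noted above built in:

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of schema inference from event values.
// It mirrors the limitations noted above: all integral numbers map to
// "long", all floating-point numbers to "double", and timestamps that
// arrive as strings are indistinguishable from plain strings.
public class TypeInference {
    public static String inferIcebergType(Object value) {
        if (value instanceof Integer || value instanceof Long) {
            return "long";     // cannot distinguish int from long
        }
        if (value instanceof Float || value instanceof Double) {
            return "double";   // cannot distinguish float from double
        }
        if (value instanceof Boolean) {
            return "boolean";
        }
        if (value instanceof Map) {
            return "struct";   // nested object
        }
        if (value instanceof List) {
            return "list";
        }
        return "string";       // includes ISO 8601 timestamp strings
    }
}
```

For example, `inferIcebergType(42)` yields `"long"` even if the column is conceptually an int, which is exactly why a schema definition is preferable when precise types matter.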
The full configuration reference is at the end of this document.
4. Background: Iceberg write model
This section explains the Iceberg concepts relevant to the sink's design.
4.1 Table structure
An Iceberg table stores its actual data in data files (Parquet, ORC, or Avro) on object storage such as S3. Separately from the data files, Iceberg maintains a metadata layer that tracks which data files belong to the current table state. This separation of data and metadata is a key design principle of Iceberg.
The metadata layer consists of a metadata file, manifest lists, and manifest files, forming a tree structure that ultimately points to the data files. A catalog (such as REST, Glue, or Hive Metastore) stores the location of the current metadata file for each table. The catalog is the entry point for all table operations.
Every write operation creates a new immutable snapshot. A snapshot does not copy or move data files. Instead, it records a new set of manifest files that reference the data files making up the table at that point in time. Each snapshot knows which data files were added and which were removed compared to the previous snapshot.
A commit is the atomic operation that publishes a new snapshot. The writer creates new metadata and manifest files on storage, then asks the catalog to atomically update the metadata pointer. Iceberg uses Optimistic Concurrency Control (OCC) to handle concurrent commits: if another writer committed in the meantime, the operation is retried against the updated metadata.
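The OCC commit loop can be illustrated with a generic compare-and-swap sketch (simplified; Iceberg's real implementation lives in its catalog and `Table` APIs, and the names here are illustrative):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Illustrative sketch of optimistic concurrency control as applied by
// Iceberg catalogs: read the current metadata pointer, build new
// metadata from it, then atomically swap the pointer only if no other
// writer committed in the meantime; otherwise re-read and retry.
public class OccCommit {
    public static <T> T commitWithRetry(AtomicReference<T> metadataPointer,
                                        UnaryOperator<T> buildNext,
                                        int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            T current = metadataPointer.get();    // current metadata location
            T next = buildNext.apply(current);    // write new metadata/manifests
            if (metadataPointer.compareAndSet(current, next)) {
                return next;                      // atomic pointer swap succeeded
            }
            // Another writer won the race: loop and rebuild against the
            // refreshed state instead of failing the whole write.
        }
        throw new IllegalStateException("commit retries exhausted");
    }
}
```

The key property is that data and manifest files written by the losing attempt are not discarded; only the (cheap) metadata step is redone.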
4.2 Write modes for DELETE and UPDATE
Iceberg supports two approaches for handling DELETE and UPDATE operations. The approach is not fixed per table. Different engines, or the same engine at different times, can write to the same table using different approaches.
Copy-on-Write (CoW). When a row in a data file is updated or deleted, the entire data file is rewritten with the modification applied. The snapshot records the old file as removed and the new file as added. This produces the best read performance because there are no delete files to reconcile at query time, but write cost is high because unchanged rows must also be rewritten.
Merge-on-Read (MoR). Instead of rewriting data files, a separate delete file records which rows should be considered deleted. Readers merge the data files and delete files at query time. This is cheaper to write because existing data files are not touched, but adds read-time overhead. Periodic compaction merges delete files into data files, eliminating the read-time cost and reclaiming storage.
MoR uses delete files, which come in three forms:
| Delete file type | Mechanism | Write cost | Read cost |
|---|---|---|---|
| Equality delete | Specifies rows to delete by column values (e.g., user_id = 123) | Low. No existing data read needed | High. Every read must match against delete files |
| Positional delete | Specifies rows to delete by data file path + row position | Low if the writer knows the row position. Otherwise requires a table scan to locate the row | Low. Direct row lookup |
| Deletion Vector (v3) | A bitmap of deleted row positions within a data file, stored in Puffin format. New in Iceberg v3, intended to replace positional deletes over the long term | Generally lower than positional delete, thanks to the dedicated file format. Writers must merge existing DVs to maintain at most one DV per data file | Low. Direct row lookup with compact representation |
Note: some query engines do not support reading equality deletes (e.g., Snowflake, PyIceberg). Tables containing equality delete files may not be readable by these engines until the delete files are compacted away.
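Conceptually, a deletion vector is one bitmap of deleted row positions per data file. The sketch below uses `java.util.BitSet` as a stand-in (real DVs are roaring bitmaps serialized into Puffin files) to illustrate the merge rule that a writer must keep at most one DV per data file:

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Conceptual model of deletion vectors: one bitmap of deleted row
// positions per data file. BitSet here only illustrates the semantics;
// it is not the on-disk representation.
public class DeletionVectors {
    private final Map<String, BitSet> vectors = new HashMap<>();

    public void delete(String dataFile, int rowPosition) {
        // Merge into the existing vector for this file rather than
        // writing a second one alongside it.
        vectors.computeIfAbsent(dataFile, f -> new BitSet()).set(rowPosition);
    }

    public boolean isDeleted(String dataFile, int rowPosition) {
        BitSet dv = vectors.get(dataFile);
        return dv != null && dv.get(rowPosition);
    }

    public int vectorCount() {
        return vectors.size();  // never more than one per data file
    }
}
```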
5. Architecture
5.1 Overview
The plugin's processing is split into two paths.
Write path. Data Prepper's pipeline framework calls Sink.output(records) concurrently from multiple threads on each node. Since Iceberg's TaskWriter is not thread-safe, each thread maintains its own instance. When table_identifier is dynamic, the plugin resolves the target table for each event and maintains a separate TaskWriter per table (similar to the OpenSearch Sink's dynamic index routing). In a multi-node cluster, every node writes data files independently to storage. The TaskWriter handles file size management automatically (rolling to a new file when targetFileSize is reached).
Commit path. A CommitScheduler runs on a single leader node. Leader election uses Data Prepper's existing lease-based coordination infrastructure (EnhancedSourceCoordinator), which is currently used by Source plugins such as the Iceberg CDC Source. This RFC proposes extending it to support Sink plugins as well. At each commit_interval (default 5 minutes), the scheduler collects all pending write results from all nodes and commits them to Iceberg in a single transaction.
```mermaid
flowchart TB
    subgraph WritePath["Write path (all nodes)"]
        direction TB
        Output["Sink.output(records)"] --> Convert["Event → GenericRecord"]
        Convert --> TW["TaskWriter.write()"]
    end
    TW --> Storage[("Storage\n(S3 / GCS / HDFS)")]
    TW --> CS[("Coordination Store")]
    subgraph CommitPath["Commit path (leader node only)"]
        direction TB
        Scheduler["CommitScheduler\n(every commit_interval)"] --> Collect["Collect pending\nWriteResultPartitions"]
        Collect --> Commit["Iceberg commit"]
        Commit --> Ack["EventHandle.release(true)"]
    end
    CS --> Collect
    Commit --> Catalog[("Iceberg Catalog\n(REST / Glue / Hive / ...)")]
```

```mermaid
sequenceDiagram
    participant N1 as Node 1
    participant N2 as Node 2
    participant ST as Storage (S3/GCS)
    participant CS as Coordination Store
    participant L as Leader Node
    participant CAT as Iceberg Catalog
    Note over N1,N2: Write path
    N1->>ST: TaskWriter writes data/delete files
    N2->>ST: TaskWriter writes data/delete files
    N1->>CS: Register WriteResultPartition
    N2->>CS: Register WriteResultPartition
    Note over L: Commit path (every commit_interval)
    L->>CS: Collect pending WriteResultPartitions
    CS-->>L: WriteResults from all nodes
    L->>CAT: Commit snapshot (newAppend / newRowDelta)
    CAT-->>L: Commit success
    L->>CS: Mark partitions complete
    L->>L: EventHandle.release(true) for all committed events
```
5.2 Write mode
When handling DELETE and UPDATE operations, the plugin uses equality delete with in-batch positional delete optimization, the same approach as the Flink Iceberg sink. Equality delete writes a small delete file instead of rewriting existing data files, making it suitable for high-frequency streaming writes. For rows INSERTed and then DELETEd within the same batch, the sink already knows the file path and row position, so it uses the cheaper positional delete (or Deletion Vectors on v3 tables) instead of equality delete.
This is a pragmatic choice for streaming. Depending on the query engines used to read the table, different write strategies may be preferable (e.g., CoW for engines that do not support equality deletes, or positional delete only for better read performance). The plugin is designed with extensibility in mind so that alternative strategies can be offered as configuration options in the future.
5.3 Commit coordination
```mermaid
sequenceDiagram
    participant N1 as Node 1
    participant N2 as Node 2
    participant CS as Coordination Store
    participant L as Leader Node
    participant CAT as Iceberg Catalog
    Note over N1,N2: output() completes, files written to storage
    N1->>CS: Register WriteResultPartition (file paths, EventHandles)
    N2->>CS: Register WriteResultPartition (file paths, EventHandles)
    Note over L: CommitScheduler wakes (every commit_interval)
    L->>CS: Acquire pending WriteResultPartitions
    CS-->>L: WriteResults from Node 1 and Node 2
    alt Append-only (no delete files)
        L->>CAT: table.newAppend() with all data files
    else With delete files
        L->>CAT: table.newRowDelta() per batch in order
    end
    CAT-->>L: Commit success
    L->>CS: Mark partitions complete
    L->>L: EventHandle.release(true)
    Note over L: On commit failure
    L->>L: Iceberg auto-retry (OCC)
    L-->>L: If exhausted, retry next cycle
```
Each Iceberg commit creates a new snapshot with associated metadata and manifest files. If every node committed independently at high frequency, the resulting volume of snapshots and small data files would degrade read performance and increase OCC conflicts. Centralizing commits into a single leader at a controlled interval (default 5 minutes) keeps snapshot and file counts manageable. The Flink Iceberg sink and the Kafka Connect Iceberg sink both use this single-committer pattern.
The plugin follows the same approach by extending Data Prepper's EnhancedSourceCoordinator to support Sink plugins.
Framework changes required. The EnhancedSourceCoordinator itself is a general-purpose lease-based partition manager with no Source-specific logic. Extending it to Sink plugins requires adding a new interface (analogous to UsesEnhancedSourceCoordination) and a coordinator injection loop for Sinks in Pipeline.execute(). The existing Source coordinator logic is not affected.
The coordinator is a general-purpose lease-based partition manager backed by Data Prepper's coordination store.
Partition types used by the plugin:
| Partition type | Purpose |
|---|---|
| `LeaderPartition` | Leader election. One per table. |
| `WriteResultPartition` | One per output() batch per table. Contains the table identifier, data file paths, and delete file paths. EventHandles are held in memory on the originating node, keyed by partition ID. |
| `GlobalState` | Table metadata (last commit time, commit ID). |
Commit behavior depends on the write mode:
| Write mode | Commit strategy |
|---|---|
| Append-only (no delete files) | Multiple WriteResults merged into a single table.newAppend() |
| With delete files | WriteResults committed individually in order via table.newRowDelta(). Merging would break equality delete semantics because Iceberg applies equality deletes only to data files with a strictly lower sequence number. |
When table_identifier is dynamic, each WriteResultPartition includes the resolved table identifier. The CommitScheduler groups pending partitions by table and commits each table independently.
5.4 Leader failover
When the leader node fails, the coordinator's lease expires and another node becomes leader. Uncommitted WriteResultPartition entries remain in the coordinator for the new leader to process.
To prevent duplicate commits when the leader fails mid-commit, each commit records a unique commit ID in the Iceberg snapshot's summary properties. The new leader checks the latest snapshot's commit ID and skips already-committed partitions. Flink uses the same approach (flink.job-id + flink.max-committed-checkpoint-id in snapshot summary).
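A minimal sketch of that failover check (the property key and method names are illustrative, not the plugin's actual API): before committing a pending partition, the new leader compares the partition's commit ID against the one recorded in the latest snapshot's summary.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of duplicate-commit detection during leader
// failover. The summary property key is an assumption for illustration.
public class CommitDedup {
    static final String COMMIT_ID_PROP = "data-prepper.commit-id";

    public static boolean alreadyCommitted(Map<String, String> latestSnapshotSummary,
                                           String pendingCommitId) {
        return pendingCommitId.equals(latestSnapshotSummary.get(COMMIT_ID_PROP));
    }

    // Filter out partitions whose commit ID already landed in a snapshot.
    public static List<String> pendingOnly(Map<String, String> latestSnapshotSummary,
                                           List<String> pendingCommitIds) {
        return pendingCommitIds.stream()
                .filter(id -> !alreadyCommitted(latestSnapshotSummary, id))
                .collect(Collectors.toList());
    }
}
```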
6. Processing flow
6.1 Append-only flow
By default, the plugin writes all events as new rows (INSERT). This is the behavior when the operation setting (described in the next section) is not configured.
Each event received in output() is converted to an Iceberg GenericRecord using the table schema (see Event conversion below). The record is then written to the TaskWriter, which handles partitioning and file management internally: it computes the partition value from the record using the table's partition spec, routes the record to the appropriate per-partition file writer, and rolls to a new file when targetFileSize is reached.
Each thread maintains a long-lived TaskWriter across multiple output() calls. When commit_interval has elapsed since the last flush, the thread closes the TaskWriter, registers the resulting WriteResult in the coordination store, and creates a new TaskWriter. This keeps the number of WriteResults per commit cycle to approximately (number of nodes) x (number of threads), rather than one per output() call. Each thread flushes independently; the CommitScheduler commits whatever WriteResults are available in the coordinator at that point. WriteResults from threads that have not yet flushed are committed in the next cycle.
After writing, the TaskWriter's WriteResult (containing the paths of data files written) is registered in the coordination store. The EventHandles for the written events are not stored in the coordinator (they are not serializable). Instead, each node holds them in memory, keyed by the WriteResultPartition ID. When the CommitScheduler marks a partition as COMPLETED in the coordinator, the originating node detects this via polling and calls EventHandle.release(true) for the corresponding events.
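The acknowledgement handoff described above can be sketched as follows (types are simplified placeholders; the real plugin would hold Data Prepper `EventHandle` objects):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Sketch of per-node acknowledgement bookkeeping: EventHandles are not
// serializable, so each node keeps them in memory keyed by
// WriteResultPartition ID and releases them once its polling loop sees
// the partition marked COMPLETED by the CommitScheduler.
public class PendingAcks<H> {
    private final Map<String, List<H>> handlesByPartition = new ConcurrentHashMap<>();

    public void register(String partitionId, List<H> handles) {
        handlesByPartition.put(partitionId, handles);
    }

    // Called when a partition transitions to COMPLETED. Returns the
    // number of handles released on this node.
    public int releaseCompleted(String partitionId, Consumer<H> release) {
        List<H> handles = handlesByPartition.remove(partitionId);
        if (handles == null) {
            return 0;  // partition originated on another node
        }
        handles.forEach(release);  // EventHandle.release(true) in the plugin
        return handles.size();
    }
}
```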
6.2 DELETE and UPDATE flow
When operation is configured, the plugin resolves the operation type for each event using the configured expression (e.g., ${getMetadata("opensearch_action")}). The resolved value is mapped to an Iceberg operation (case-insensitive):
| Value | Operation | Notes |
|---|---|---|
| `index`, `insert`, `create`, `c`, `i`, `r` | INSERT | `r` is Debezium's snapshot (read) operation |
| `update`, `u` | UPDATE (equality DELETE + INSERT) | |
| `delete`, `d` | DELETE (equality delete) | |
| Unrecognized | DLQ | |
This mapping supports RDS Source ("index" / "delete") and Iceberg Source ("INSERT" / "DELETE") without additional configuration.
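The mapping above is straightforward to express in code; this sketch is illustrative (an `OTHER` result stands in for the "send to DLQ" outcome):

```java
import java.util.Locale;

// Sketch of the case-insensitive operation mapping table above.
public class OperationMapping {
    public enum Op { INSERT, UPDATE, DELETE, OTHER }

    public static Op resolve(String value) {
        if (value == null) {
            return Op.OTHER;
        }
        switch (value.toLowerCase(Locale.ROOT)) {
            case "index": case "insert": case "create":
            case "c": case "i":
            case "r":  // Debezium snapshot (read) records become INSERTs
                return Op.INSERT;
            case "update": case "u":
                return Op.UPDATE;
            case "delete": case "d":
                return Op.DELETE;
            default:
                return Op.OTHER;  // unrecognized -> DLQ
        }
    }
}
```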
The write path uses BaseEqualityDeltaWriter from Iceberg core. For each event:
- INSERT: The record is written to a data file. The writer records the record's key and file position in an in-memory map (`insertedRowMap`) for potential in-batch deduplication.
- DELETE: The writer first checks `insertedRowMap`. If the key was inserted earlier in the same batch, the writer emits a positional delete (or Deletion Vector on v3 tables), which is cheaper to read than an equality delete. If the key is not found (the row was written in a previous commit), the writer emits an equality delete file using the `identifier_columns`.
- UPDATE: Processed as a DELETE followed by an INSERT.
identifier_columns specifies which columns uniquely identify a row in the table (e.g., a primary key). The equality delete file contains only these columns' values, and at read time, any row matching those values is treated as deleted. This setting is required when operation is configured, because without it the plugin cannot construct equality delete files. If the table already has identifier-field-ids set in its schema, those are used by default. The YAML identifier_columns setting overrides the table definition, which is useful when the table does not have identifier-field-ids set (the more common case in practice).
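The read-time semantics of an equality delete can be sketched in a few lines (rows simplified to maps; illustrative only): a delete entry carries only the identifier columns' values, and any row matching all of them is treated as deleted.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of equality delete semantics at read time. A delete "file"
// is reduced here to a single key of identifier-column values.
public class EqualityDelete {
    public static List<Map<String, Object>> apply(List<Map<String, Object>> rows,
                                                  List<String> identifierColumns,
                                                  Map<String, Object> deleteKey) {
        return rows.stream()
                .filter(row -> !identifierColumns.stream()
                        .allMatch(col -> deleteKey.get(col).equals(row.get(col))))
                .collect(Collectors.toList());
    }
}
```

Note that deleting by `id = 1` removes every row with that id, which is why a duplicate equality delete is harmless (idempotent), as discussed in the data guarantees section.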
For partitioned tables, the plugin maintains a separate BaseEqualityDeltaWriter per partition, so that each partition has its own insertedRowMap.
The plugin selects the TaskWriter implementation automatically based on the table's partition spec and whether identifier_columns is configured:
| Table state | Writer |
|---|---|
| Unpartitioned, append-only | UnpartitionedWriter |
| Partitioned, append-only | PartitionedFanoutWriter |
| Unpartitioned, with DELETE/UPDATE | Custom writer with single BaseEqualityDeltaWriter |
| Partitioned, with DELETE/UPDATE | Custom writer with per-partition BaseEqualityDeltaWriter |
6.3 Event conversion
Events (JacksonEvent) are schema-less JSON. The plugin converts them to Iceberg GenericRecord using the table schema as the reference, applying type coercion where possible:
| Iceberg type | Accepted event values |
|---|---|
| int, long | Number (intValue()/longValue()) or String (parseInt()/parseLong()) |
| float, double | Number or String |
| decimal(P,S) | Number or String -> BigDecimal |
| boolean | Boolean or String (parseBoolean()) |
| string | String, Number (toString()), Boolean (toString()) |
| timestamp, timestamptz | String (ISO 8601) or Number (epoch millis) |
| date | String (ISO 8601) or Number (epoch days) |
| time | String (ISO 8601) or Number (millis) |
| binary | String (Base64 decode) |
| uuid | String (UUID.fromString()) |
| struct | Map (recursive conversion) |
| list | List (recursive conversion) |
| map | Map |
Mismatch handling is per-event. One bad event does not fail the entire batch.
| Scenario | Behavior |
|---|---|
| Exact match | Write |
| Extra fields in event | Ignore |
| Missing optional field | NULL |
| Missing required field | DLQ |
| Wrong type, coercible | Coerce and write |
| Wrong type, not coercible, optional column | NULL |
| Wrong type, not coercible, required column | DLQ |
| No fields match schema | DLQ |
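The per-event decision tree above, reduced to the long-column case, can be sketched as follows (illustrative names; `DLQ` and `NULL_VALUE` model the routing outcomes rather than the plugin's actual types):

```java
// Sketch of per-event coercion for a "long" column, combining the
// coercion table and the mismatch-handling table above.
public class Coercion {
    public enum Outcome { VALUE, NULL_VALUE, DLQ }

    public record CoercionResult(Outcome outcome, Long value) {}

    public static CoercionResult coerceToLong(Object fieldValue, boolean required) {
        if (fieldValue == null) {
            // Missing field: NULL for optional columns, DLQ for required ones.
            return required ? new CoercionResult(Outcome.DLQ, null)
                            : new CoercionResult(Outcome.NULL_VALUE, null);
        }
        if (fieldValue instanceof Number n) {
            return new CoercionResult(Outcome.VALUE, n.longValue());
        }
        if (fieldValue instanceof String s) {
            try {
                return new CoercionResult(Outcome.VALUE, Long.parseLong(s));
            } catch (NumberFormatException e) {
                // Not coercible: fall through to the mismatch rules below.
            }
        }
        return required ? new CoercionResult(Outcome.DLQ, null)
                        : new CoercionResult(Outcome.NULL_VALUE, null);
    }
}
```

Because the decision is made per field within a per-event conversion, one bad field sends only that event to the DLQ (or writes NULL) without failing the batch.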
6.4 Schema management
Table exists (default). The plugin loads the table and uses table.schema() for conversion. No schema configuration needed.
Auto-create. When auto_create is true and the table does not exist, the plugin creates it. If schema is provided, the table is created from the definition. If schema is not provided, the schema is inferred from the first batch of events. After creation, the table schema is used for conversion. Race conditions during creation are handled by catching AlreadyExistsException and falling back to loadTable().
When table_identifier is dynamic, auto-create applies independently to each resolved table. In this case, schema cannot be specified (the plugin rejects this combination at startup) because different tables typically have different schemas. Each table's schema is inferred from the first batch of events routed to that table.
Schema evolution. When schema_evolution is true and an event contains fields not present in the current table schema, the plugin adds the missing columns automatically. The process follows the same approach as the Kafka Connect Iceberg Sink: flush the current file, call table.updateSchema().addColumn() to commit the new columns, reinitialize the TaskWriter with the updated schema, and re-convert the event. Column types are inferred from the event value using the same logic as schema inference in auto-create. Concurrent schema updates from multiple nodes are handled by refreshing the table metadata and skipping columns that have already been added by another node.
Schema evolution supports column addition only. Column deletion, type changes (e.g., int to long), and renames require precise type information that is not available from schemaless JSON events. Supporting these operations would require an external schema source such as a schema registry.
When multiple nodes infer different types for the same new field, the first committed type wins. Subsequent nodes refresh the table metadata, discover the existing column, and convert values using the established type. If conversion fails, the standard mismatch handling applies (NULL for optional columns, DLQ for required columns).
If auto_create is false (the default) and the table does not exist, the plugin fails at startup. When table_identifier is dynamic, the table name is resolved at runtime. If a resolved table does not exist, the event is sent to DLQ.
7. Data guarantees
7.1 At-least-once delivery
With end-to-end acknowledgements enabled. Events are acknowledged back to the source only after the Iceberg commit succeeds. If the commit fails, events are not acknowledged and the source re-sends them. This provides at-least-once delivery. Duplicate events can occur in a narrow window: when the Iceberg commit succeeds but the acknowledgement does not reach the source before a crash. On recovery, the source re-sends the same events, resulting in duplicate writes.
Without end-to-end acknowledgements. The source does not wait for confirmation from the sink. Events may be lost if the sink fails before committing. This provides at-most-once delivery. Throughput is higher because the source does not block on acknowledgements.
Choosing between the two. Whether acknowledgements are enabled depends on the source configuration (e.g., RDS Source enables them by default, HTTP Source does not). Use at-least-once (acknowledgements enabled) when data completeness matters more than avoiding duplicates, such as CDC replication where missing a DELETE would leave stale rows. Use at-most-once (acknowledgements disabled) when occasional data loss is acceptable and throughput is the priority, such as high-volume log ingestion where duplicates or gaps do not affect analytical results.
7.2 Duplicate impact by write mode
| Scenario | Impact |
|---|---|
| Append-only, duplicate INSERT | Duplicate rows exist. Addressable with aggregation queries (DISTINCT, GROUP BY). |
| Duplicate DELETE | No impact on data correctness. Equality delete is idempotent. Extra delete files accumulate until compaction. |
| Duplicate INSERT + subsequent DELETE | No impact. Equality delete removes all matching rows. |
| Duplicate UPDATE (DELETE + INSERT pair) | Temporary duplicate rows until the next UPDATE/DELETE. Addressable with ROW_NUMBER() deduplication in queries. |
7.3 Internal deduplication
Duplicate commits from the same WriteResult (e.g., during leader failover) are prevented by the commit ID recorded in snapshot properties (see Leader failover above).
8. Error handling
Errors are handled differently based on their cause.
Event conversion errors (caused by event content):
| Error | Scope | Action |
|---|---|---|
| Type conversion failure, required column | Per-event | DLQ |
| Required field missing | Per-event | DLQ |
| No fields match schema | Per-event | DLQ |
| Unrecognized operation type | Per-event | DLQ |
| Type conversion failure, optional column | Per-event | Write NULL, continue |
File write errors (caused by infrastructure):
| Error | Scope | Action |
|---|---|---|
| Transient storage failure (network, S3 503) | Per-batch | Retry, then throw exception. Source re-sends. |
| Permanent storage failure (permissions, capacity) | Per-batch | Retry, then throw exception. Source re-sends. |
| OOM (excessive partition count) | Per-node | Crash. Source re-sends after restart. |
Commit errors (caused by infrastructure):
| Error | Scope | Action |
|---|---|---|
| OCC conflict | Per-commit | Iceberg auto-retry (configurable via commit.retry.* table properties) |
| Retry limit exceeded | Per-commit | Retry in next commit cycle |
| Persistent commit failure | Per-commit | EventHandle.release(false). Source re-sends. |
Startup errors:
| Error | Scope | Action |
|---|---|---|
| Table does not exist (`auto_create` is false) | Pipeline | Pipeline fails to start |
| Catalog unreachable | Pipeline | Pipeline fails to start |
DLQ entries include the original event and the error reason. The plugin uses Data Prepper's standard DlqProvider.
Orphan data files (written but not committed) do not affect data correctness. Cleanup is handled by external maintenance operations (removeOrphanFiles), which is the standard practice across the Iceberg ecosystem.
9. Operational considerations
Compaction. Streaming writes produce many small data files and, when DELETE/UPDATE operations are used, equality delete files. Both degrade read performance over time. Periodic compaction is required. Iceberg provides the rewriteDataFiles action for this purpose, and it can be run from Spark or any engine that supports Iceberg's maintenance operations.
Graceful shutdown. On shutdown, each node closes its TaskWriters and registers the resulting WriteResults in the coordinator. The CommitScheduler attempts a final commit. WriteResults that are not committed remain in the coordinator and are processed after restart. Events whose EventHandles are not released are re-sent by the source.
10. Configuration reference
| Setting | Type | Required | Default | Description |
|---|---|---|---|---|
| `catalog` | Map<String, String> | Yes | | Iceberg catalog properties, passed to CatalogUtil.buildIcebergCatalog(). The `type` property determines the catalog implementation (rest, glue, hive, jdbc, nessie, hadoop). Any catalog supported by Iceberg can be used. Authentication and storage configuration are part of these properties. |
| `table_identifier` | String | Yes | | Table identifier, resolved per event. Supports Data Prepper expression language for dynamic routing (e.g., `${getMetadata("table_name")}`). When the value is static, all events are written to a single table. |
| `operation` | String | No | None (all INSERT) | Operation type for each event. Supports Data Prepper expression language (e.g., `${getMetadata("opensearch_action")}`). See the DELETE and UPDATE flow section for the default value mapping. |
| `identifier_columns` | List | No | Table's identifier-field-ids | Columns used for equality delete. YAML setting takes precedence over table definition. Required when `operation` is configured and the table does not have identifier-field-ids. |
| `auto_create` | Boolean | No | false | Create the table if it does not exist. If `schema` is provided, uses the definition. Otherwise, infers the schema from the first batch of events. |
| `schema_evolution` | Boolean | No | false | When true, automatically add new columns to the table schema when events contain fields not in the current schema. Type is inferred from the event value. |
| `schema.columns` | List | No | | Column definitions for auto-create. Each entry has `name` (String), `type` (Iceberg type name), and optional `required` (Boolean, default false). |
| `schema.partition_spec` | List | No | | Partition spec for auto-create. Each entry has `column` (String) and `transform` (String, e.g., day, hour, bucket[16], truncate[4]). |
| `commit_interval` | Duration | No | 5m | Interval between Iceberg commits. |
| `dlq` | Object | No | | Dead letter queue configuration. Uses Data Prepper's standard DlqProvider (e.g., `dlq.s3.bucket`, `dlq.s3.key_path_prefix`). |
11. Limitations and future work
Limitations
- CoW write mode is not supported. The plugin uses equality delete (MoR) only.
- Exactly-once delivery is not supported. The plugin provides at-least-once delivery.
- Schema evolution supports column addition only. Column deletion, type changes, and renames are not supported.
- Writing to many partitions simultaneously increases memory consumption, because `PartitionedFanoutWriter` keeps a file writer open for each active partition.
Future enhancements
| Feature | Description |
|---|---|
| Schema registry | Obtain schema from an external schema registry. |
| Write properties passthrough | Override Iceberg writer properties from sink configuration. |
| Commit branch | Write to a specific Iceberg branch for staging/validation workflows. |
| Case-insensitive field matching | Match event fields to table columns ignoring case. |
| CoW write mode | Copy-on-Write mode for maximum read performance. |
| Exactly-once delivery | Record source offsets in snapshot properties for duplicate detection on recovery. |
12. References