diff --git a/data-prepper-plugins/iceberg-source/build.gradle b/data-prepper-plugins/iceberg-source/build.gradle new file mode 100644 index 0000000000..33a96fd42e --- /dev/null +++ b/data-prepper-plugins/iceberg-source/build.gradle @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:buffer-common') + implementation project(':data-prepper-plugins:aws-plugin-api') + + implementation 'org.apache.iceberg:iceberg-core:1.10.1' + implementation 'org.apache.iceberg:iceberg-data:1.10.1' + implementation 'org.apache.iceberg:iceberg-parquet:1.10.1' + implementation 'org.apache.iceberg:iceberg-orc:1.10.1' + implementation 'org.apache.iceberg:iceberg-aws:1.10.1' + + implementation libs.parquet.hadoop + implementation libs.avro.core + implementation libs.hadoop.common + implementation 'org.apache.orc:orc-core:1.9.5' + + implementation 'software.amazon.awssdk:glue' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:sts' + + implementation 'com.fasterxml.jackson.core:jackson-databind' + + testImplementation project(':data-prepper-test:test-common') + + integrationTestImplementation project(':data-prepper-plugins:in-memory-source-coordination-store') + integrationTestImplementation project(':data-prepper-core') + integrationTestImplementation 
project(':data-prepper-event') + integrationTestImplementation 'org.awaitility:awaitility:4.2.2' +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'tests.iceberg.rest.uri', System.getProperty('tests.iceberg.rest.uri', 'http://localhost:8181') + systemProperty 'tests.iceberg.s3.endpoint', System.getProperty('tests.iceberg.s3.endpoint', 'http://localhost:8333') + systemProperty 'tests.iceberg.s3.accessKey', System.getProperty('tests.iceberg.s3.accessKey', 'admin') + systemProperty 'tests.iceberg.s3.secretKey', System.getProperty('tests.iceberg.s3.secretKey', 'password') + systemProperty 'tests.iceberg.s3.region', System.getProperty('tests.iceberg.s3.region', 'us-east-1') + + filter { + includeTestsMatching '*IT' + } +} diff --git a/data-prepper-plugins/iceberg-source/docker/docker-compose.yml b/data-prepper-plugins/iceberg-source/docker/docker-compose.yml new file mode 100644 index 0000000000..8445f157e0 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/docker/docker-compose.yml @@ -0,0 +1,59 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+ +services: + seaweedfs: + image: chrislusf/seaweedfs:4.06 + ports: + - "8333:8333" + command: server -s3 -s3.port=8333 -s3.config=/etc/seaweedfs/s3.json -master.port=9333 -volume.port=8080 -filer -filer.port=8888 + configs: + - source: s3-config + target: /etc/seaweedfs/s3.json + healthcheck: + test: ["CMD-SHELL", "curl -sf http://localhost:8333 || exit 1"] + interval: 2s + timeout: 5s + retries: 15 + start_period: 10s + + create-bucket: + image: chrislusf/seaweedfs:4.06 + depends_on: + seaweedfs: + condition: service_healthy + entrypoint: /bin/sh + command: > + -c " + until wget -q --spider http://seaweedfs:8333 2>/dev/null; do sleep 1; done; + /usr/bin/weed shell -master=seaweedfs:9333 -filer=seaweedfs:8888 < + * Requires Docker containers running (see docker/docker-compose.yml): + * docker compose -f docker/docker-compose.yml up -d + *

+ * Run with: + * ./gradlew :data-prepper-plugins:iceberg-source:integrationTest + */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class IcebergSourceIT { + + private static final Schema TEST_SCHEMA = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.optional(3, "age", Types.IntegerType.get()) + ); + + private static final String TEST_NAMESPACE = "test_" + UUID.randomUUID().toString().replace("-", "").substring(0, 8); + private static final String TEST_TABLE = "users"; + + private IcebergTestHelper helper; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + private final List> receivedRecords = + Collections.synchronizedList(new ArrayList<>()); + + @BeforeEach + void setUp() { + final String restUri = System.getProperty("tests.iceberg.rest.uri", "http://localhost:8181"); + final String s3Endpoint = System.getProperty("tests.iceberg.s3.endpoint", "http://localhost:8333"); + final String accessKey = System.getProperty("tests.iceberg.s3.accessKey", "admin"); + final String secretKey = System.getProperty("tests.iceberg.s3.secretKey", "password"); + final String region = System.getProperty("tests.iceberg.s3.region", "us-east-1"); + + helper = new IcebergTestHelper(restUri, s3Endpoint, accessKey, secretKey, region); + helper.createNamespace(TEST_NAMESPACE); + receivedRecords.clear(); + } + + @AfterEach + void tearDown() { + helper.dropTable(TEST_NAMESPACE, TEST_TABLE); + helper.dropNamespace(TEST_NAMESPACE); + helper.close(); + } + + @Test + void export_writes_all_rows_to_buffer() throws Exception { + // Create table and insert 3 rows + final Table table = helper.createTable(TEST_NAMESPACE, TEST_TABLE, TEST_SCHEMA); + final List rows = List.of( + helper.newRecord(TEST_SCHEMA, 1, "Alice", 30), + helper.newRecord(TEST_SCHEMA, 2, "Bob", 25), 
+ helper.newRecord(TEST_SCHEMA, 3, "Carol", 35) + ); + helper.appendRows(table, rows); + + // Start IcebergSource and wait for events + final IcebergService service = createService(false); + final Buffer> buffer = createMockBuffer(); + service.start(buffer); + + try { + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(3)))); + + // Verify all 3 rows are INSERT events + assertThat(receivedRecords, hasSize(3)); + for (final org.opensearch.dataprepper.model.record.Record record : receivedRecords) { + final Event event = record.getData(); + assertThat(event.getMetadata().getAttribute("iceberg_operation"), equalTo("INSERT")); + assertThat(event.getMetadata().getAttribute("bulk_action"), equalTo("index")); + } + } finally { + service.shutdown(); + } + } + + @Test + void cdc_insert_produces_insert_events() throws Exception { + final CdcTestFixture fixture = createCdcTestFixture(); + + try { + // Insert new row (new snapshot) + fixture.table.refresh(); + helper.appendRows(fixture.table, List.of( + helper.newRecord(TEST_SCHEMA, 4, "Dave", 40) + )); + + // Wait for CDC event + await().atMost(60, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(fixture.afterInitialLoad + 1)))); + + final Event lastEvent = receivedRecords.get(receivedRecords.size() - 1).getData(); + assertThat(lastEvent.getMetadata().getAttribute("iceberg_operation"), equalTo("INSERT")); + assertThat(lastEvent.get("name", String.class), equalTo("Dave")); + } finally { + fixture.service.shutdown(); + } + } + + @Test + void cdc_update_removes_carryover_and_produces_correct_events() throws Exception { + final CdcTestFixture fixture = createCdcTestFixture(); + + try { + // UPDATE: change Bob's age from 25 to 26 (CoW rewrites entire file) + fixture.table.refresh(); + helper.overwriteRows(fixture.table, fixture.initialFile, List.of( + helper.newRecord(TEST_SCHEMA, 1, "Alice", 30), + 
helper.newRecord(TEST_SCHEMA, 2, "Bob", 26), // changed + helper.newRecord(TEST_SCHEMA, 3, "Carol", 35) + )); + + // Wait for CDC event: carryover for Alice and Carol removed, + // UPDATE pair (DELETE old Bob + INSERT new Bob) merged into single INDEX + await().atMost(60, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(fixture.afterInitialLoad + 1)))); + + final List> cdcEvents = + receivedRecords.subList(fixture.afterInitialLoad, receivedRecords.size()); + + assertThat(cdcEvents, hasSize(1)); + + final Event insertEvent = cdcEvents.get(0).getData(); + assertThat(insertEvent.getMetadata().getAttribute("iceberg_operation"), equalTo("INSERT")); + assertThat(insertEvent.getMetadata().getAttribute("bulk_action"), equalTo("index")); + assertThat(insertEvent.get("name", String.class), equalTo("Bob")); + assertThat(insertEvent.get("age", Integer.class), equalTo(26)); + } finally { + fixture.service.shutdown(); + } + } + + @Test + void cdc_delete_produces_delete_event() throws Exception { + final CdcTestFixture fixture = createCdcTestFixture(); + + try { + // DELETE: remove Bob (CoW rewrites file without Bob) + fixture.table.refresh(); + helper.overwriteRows(fixture.table, fixture.initialFile, List.of( + helper.newRecord(TEST_SCHEMA, 1, "Alice", 30), + helper.newRecord(TEST_SCHEMA, 3, "Carol", 35) + )); + + // Wait for CDC event: should be DELETE(Bob) only + // Carryover for Alice and Carol should be removed + await().atMost(60, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(fixture.afterInitialLoad + 1)))); + + final List> cdcEvents = + receivedRecords.subList(fixture.afterInitialLoad, receivedRecords.size()); + + assertThat(cdcEvents, hasSize(1)); + + final Event deleteEvent = cdcEvents.get(0).getData(); + assertThat(deleteEvent.getMetadata().getAttribute("iceberg_operation"), equalTo("DELETE")); + assertThat(deleteEvent.getMetadata().getAttribute("bulk_action"), 
equalTo("delete")); + assertThat(deleteEvent.get("name", String.class), equalTo("Bob")); + } finally { + fixture.service.shutdown(); + } + } + + /** + * Common setup for CDC tests: creates a table with 3 rows (Alice, Bob, Carol), + * starts IcebergService, and waits for initial load to complete. + */ + private CdcTestFixture createCdcTestFixture() throws Exception { + final Table table = helper.createTable(TEST_NAMESPACE, TEST_TABLE, TEST_SCHEMA); + final DataFile initialFile = helper.appendRows(table, List.of( + helper.newRecord(TEST_SCHEMA, 1, "Alice", 30), + helper.newRecord(TEST_SCHEMA, 2, "Bob", 25), + helper.newRecord(TEST_SCHEMA, 3, "Carol", 35) + )); + + final IcebergService service = createService(false); + final Buffer> buffer = createMockBuffer(); + service.start(buffer); + + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(3)))); + + return new CdcTestFixture(table, initialFile, service, receivedRecords.size()); + } + + private static class CdcTestFixture { + final Table table; + final DataFile initialFile; + final IcebergService service; + final int afterInitialLoad; + + CdcTestFixture(final Table table, final DataFile initialFile, + final IcebergService service, final int afterInitialLoad) { + this.table = table; + this.initialFile = initialFile; + this.service = service; + this.afterInitialLoad = afterInitialLoad; + } + } + + @SuppressWarnings("unchecked") + private Buffer> createMockBuffer() throws Exception { + final Buffer> buffer = mock(Buffer.class); + doAnswer(invocation -> { + final java.util.Collection> records = + invocation.getArgument(0); + receivedRecords.addAll(records); + return null; + }).when(buffer).writeAll(anyCollection(), anyInt()); + + doAnswer(invocation -> { + final org.opensearch.dataprepper.model.record.Record record = invocation.getArgument(0); + receivedRecords.add(record); + return null; + }).when(buffer).write(any(), anyInt()); + + return buffer; + } + + 
private IcebergService createService(final boolean disableExport) throws Exception { + final String fullTableName = TEST_NAMESPACE + "." + TEST_TABLE; + + // Build config via reflection since fields are private + final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class); + final TableConfig tableConfig = mock(TableConfig.class); + + when(tableConfig.getTableName()).thenReturn(fullTableName); + when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties()); + when(tableConfig.getIdentifierColumns()).thenReturn(List.of("id")); + when(tableConfig.isDisableExport()).thenReturn(disableExport); + + when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); + when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); + lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + + final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator(); + coordinator.createPartition(new LeaderPartition()); + + return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager); + } + + private EnhancedSourceCoordinator createInMemoryCoordinator() { + final InMemorySourceCoordinationStore store = new InMemorySourceCoordinationStore( + new org.opensearch.dataprepper.model.configuration.PluginSetting("in_memory", Collections.emptyMap())); + final SourceCoordinationConfig coordinationConfig = new SourceCoordinationConfig( + new PluginModel("in_memory", Collections.emptyMap()), null); + final PluginMetrics coordinatorMetrics = PluginMetrics.fromNames("source-coordinator", "iceberg-it"); + final EnhancedLeaseBasedSourceCoordinator coordinator = new EnhancedLeaseBasedSourceCoordinator( + store, coordinationConfig, coordinatorMetrics, "iceberg-it", new PartitionFactory()); + coordinator.initialize(); + return coordinator; + } +} diff --git a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergTestHelper.java 
b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergTestHelper.java new file mode 100644 index 0000000000..000e0e20ed --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergTestHelper.java @@ -0,0 +1,170 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg; + +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.rest.RESTCatalog; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Helper for integration tests. Uses Iceberg Java API to create tables and write + * actual Parquet files, following the same pattern as Iceberg's own iceberg-data + * test suite (TestWriterMetrics, TestPartitioningWriters, etc.). 
+ */ +public class IcebergTestHelper { + + private final RESTCatalog catalog; + private final String restUri; + private final String s3Endpoint; + private final String accessKey; + private final String secretKey; + private final String region; + + public IcebergTestHelper(final String restUri, + final String s3Endpoint, + final String accessKey, + final String secretKey, + final String region) { + this.restUri = restUri; + this.s3Endpoint = s3Endpoint; + this.accessKey = accessKey; + this.secretKey = secretKey; + this.region = region; + + this.catalog = new RESTCatalog(); + final Map props = new HashMap<>(); + props.put(CatalogProperties.URI, restUri); + props.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO"); + props.put("s3.endpoint", s3Endpoint); + props.put("s3.access-key-id", accessKey); + props.put("s3.secret-access-key", secretKey); + props.put("s3.path-style-access", "true"); + props.put("client.region", region); + this.catalog.initialize("integration-test", props); + } + + public RESTCatalog catalog() { + return catalog; + } + + public Map catalogProperties() { + final Map props = new HashMap<>(); + props.put("type", "rest"); + props.put(CatalogProperties.URI, restUri); + props.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO"); + props.put("s3.endpoint", s3Endpoint); + props.put("s3.access-key-id", accessKey); + props.put("s3.secret-access-key", secretKey); + props.put("s3.path-style-access", "true"); + props.put("client.region", region); + return props; + } + + public void createNamespace(final String namespace) { + catalog.createNamespace(Namespace.of(namespace)); + } + + public void dropNamespace(final String namespace) { + try { + catalog.dropNamespace(Namespace.of(namespace)); + } catch (final Exception e) { + // ignore + } + } + + public Table createTable(final String namespace, final String tableName, final Schema schema) { + return catalog.createTable(TableIdentifier.of(namespace, tableName), schema, 
PartitionSpec.unpartitioned()); + } + + public void dropTable(final String namespace, final String tableName) { + try { + catalog.dropTable(TableIdentifier.of(namespace, tableName), true); + } catch (final Exception e) { + // ignore + } + } + + /** + * Write records to a Parquet file and return the DataFile metadata. + */ + public DataFile writeDataFile(final Table table, final List records) throws IOException { + final GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema()); + final String filePath = table.location() + "/data/" + UUID.randomUUID() + ".parquet"; + final OutputFile outputFile = table.io().newOutputFile(filePath); + + final DataWriter writer = appenderFactory.newDataWriter( + PlaintextEncryptionManager.instance().encrypt(outputFile), FileFormat.PARQUET, null); + try (writer) { + for (final Record record : records) { + writer.write(record); + } + } + return writer.toDataFile(); + } + + /** + * Append records as a new snapshot. + */ + public DataFile appendRows(final Table table, final List records) throws IOException { + final DataFile dataFile = writeDataFile(table, records); + table.newAppend().appendFile(dataFile).commit(); + return dataFile; + } + + /** + * Simulate a CoW UPDATE or DELETE: remove old data file, add new data file. + * This is how CoW works: the entire data file is rewritten. + */ + public DataFile overwriteRows(final Table table, final DataFile oldDataFile, + final List newRecords) throws IOException { + final DataFile newDataFile = writeDataFile(table, newRecords); + table.newOverwrite() + .deleteFile(oldDataFile) + .addFile(newDataFile) + .commit(); + return newDataFile; + } + + public GenericRecord newRecord(final Schema schema, final Object... 
values) { + final GenericRecord record = GenericRecord.create(schema); + final List fields = schema.columns(); + for (int i = 0; i < values.length; i++) { + record.set(i, values[i]); + } + return record; + } + + public void close() { + try { + catalog.close(); + } catch (final Exception e) { + // ignore + } + } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java new file mode 100644 index 0000000000..968d52eec9 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java @@ -0,0 +1,126 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg; + +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.iceberg.leader.LeaderScheduler; +import org.opensearch.dataprepper.plugins.source.iceberg.worker.ChangelogWorker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class IcebergService { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergService.class); + private static final String COW_MODE = "copy-on-write"; + + private final EnhancedSourceCoordinator sourceCoordinator; + private final IcebergSourceConfig sourceConfig; + private final PluginMetrics pluginMetrics; + private final AcknowledgementSetManager acknowledgementSetManager; + private ExecutorService executor; + + public IcebergService(final EnhancedSourceCoordinator sourceCoordinator, + final IcebergSourceConfig sourceConfig, + final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager) { + this.sourceCoordinator = sourceCoordinator; + this.sourceConfig = sourceConfig; + this.pluginMetrics = pluginMetrics; + this.acknowledgementSetManager = acknowledgementSetManager; + } + + public void start(final Buffer> buffer) { + LOG.info("Starting Iceberg service"); + + // 
Load all tables upfront. Single point of Table lifecycle management. + final Map tables = new HashMap<>(); + final Map tableConfigs = new HashMap<>(); + + for (final TableConfig tableConfig : sourceConfig.getTables()) { + final String tableName = tableConfig.getTableName(); + LOG.info("Loading catalog and table for {}", tableName); + + final Map catalogProps = new HashMap<>(tableConfig.getCatalog()); + final Catalog catalog = CatalogUtil.buildIcebergCatalog(tableName, catalogProps, null); + + final TableIdentifier tableId = TableIdentifier.parse(tableName); + + final Table table = catalog.loadTable(tableId); + validateCoWTable(table, tableName); + + if (tableConfig.getIdentifierColumns().isEmpty()) { + LOG.warn("No identifier_columns configured for table {}. " + + "CDC correctness requires identifier_columns for UPDATE/DELETE support " + + "and idempotent writes.", tableName); + } else { + for (final String col : tableConfig.getIdentifierColumns()) { + if (table.schema().findField(col) == null) { + throw new IllegalArgumentException( + "identifier_columns contains '" + col + "' which does not exist in table " + tableName); + } + } + } + + tables.put(tableName, table); + tableConfigs.put(tableName, tableConfig); + + LOG.info("Loaded table {} (current snapshot: {})", + tableName, + table.currentSnapshot() != null ? 
table.currentSnapshot().snapshotId() : "none"); + } + + // Start schedulers with shared table references + final List runnableList = new ArrayList<>(); + + runnableList.add(new LeaderScheduler(sourceCoordinator, tableConfigs, sourceConfig.getPollingInterval(), tables)); + runnableList.add(new ChangelogWorker( + sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager)); + + executor = Executors.newFixedThreadPool(runnableList.size()); + runnableList.forEach(executor::submit); + } + + public void shutdown() { + LOG.info("Shutting down Iceberg service"); + if (executor != null) { + executor.shutdownNow(); + } + } + + private void validateCoWTable(final Table table, final String tableName) { + final String deleteMode = table.properties().getOrDefault("write.delete.mode", COW_MODE); + final String updateMode = table.properties().getOrDefault("write.update.mode", COW_MODE); + final String mergeMode = table.properties().getOrDefault("write.merge.mode", COW_MODE); + + if (!COW_MODE.equals(deleteMode) || !COW_MODE.equals(updateMode) || !COW_MODE.equals(mergeMode)) { + throw new IllegalArgumentException( + "Table " + tableName + " uses Merge-on-Read (delete.mode=" + deleteMode + + ", update.mode=" + updateMode + ", merge.mode=" + mergeMode + + "). 
Only Copy-on-Write tables are supported."); + } + } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java new file mode 100644 index 0000000000..2f25770503 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java @@ -0,0 +1,90 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.annotations.Experimental; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.PartitionFactory; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition; +import 
org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.function.Function;

/**
 * Data Prepper source plugin entry point for Apache Iceberg tables.
 * Delegates table loading and worker threads to {@link IcebergService}.
 *
 * NOTE(review): generic type arguments in this chunk were stripped in transit;
 * reconstructed from the Source/Buffer usage — confirm against upstream.
 */
@Experimental
@DataPrepperPlugin(name = "iceberg", pluginType = Source.class, pluginConfigurationType = IcebergSourceConfig.class)
public class IcebergSource implements Source<Record<Event>>, UsesEnhancedSourceCoordination {

    private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);

    private final IcebergSourceConfig sourceConfig;
    private final PluginMetrics pluginMetrics;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private EnhancedSourceCoordinator sourceCoordinator;
    private IcebergService icebergService;

    @DataPrepperPluginConstructor
    public IcebergSource(final IcebergSourceConfig sourceConfig,
                         final PluginMetrics pluginMetrics,
                         final AcknowledgementSetManager acknowledgementSetManager) {
        this.sourceConfig = sourceConfig;
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        LOG.info("Creating Iceberg Source for {} table(s)", sourceConfig.getTables().size());
    }

    @Override
    public void start(final Buffer<Record<Event>> buffer) {
        LOG.info("Starting Iceberg Source");
        // setEnhancedSourceCoordinator must have been called before start.
        Objects.requireNonNull(sourceCoordinator);

        sourceCoordinator.createPartition(new LeaderPartition());

        icebergService = new IcebergService(sourceCoordinator, sourceConfig, pluginMetrics, acknowledgementSetManager);
        icebergService.start(buffer);
    }

    @Override
    public void stop() {
        LOG.info("Stopping Iceberg Source");
        if (Objects.nonNull(icebergService)) {
            icebergService.shutdown();
        }
    }

    @Override
    public boolean areAcknowledgementsEnabled() {
        return sourceConfig.isAcknowledgmentsEnabled();
    }

    @Override
    public void setEnhancedSourceCoordinator(final EnhancedSourceCoordinator sourceCoordinator) {
        this.sourceCoordinator = sourceCoordinator;
        this.sourceCoordinator.initialize();
    }

    @Override
    public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() {
        return new PartitionFactory();
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;

import java.time.Duration;
import java.util.List;

/**
 * Top-level YAML configuration for the iceberg source plugin.
 *
 * NOTE(review): the element type of {@code tables} was stripped in transit;
 * reconstructed as {@code List<TableConfig>} from how IcebergService iterates it.
 */
public class IcebergSourceConfig {

    static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(30);

    // At least one table must be configured; each entry is validated recursively.
    @JsonProperty("tables")
    @NotEmpty
    @Valid
    private List<TableConfig> tables;

    // How often the leader polls each table for new snapshots.
    @JsonProperty("polling_interval")
    private Duration pollingInterval = DEFAULT_POLLING_INTERVAL;

    // End-to-end acknowledgments are on by default.
    @JsonProperty("acknowledgments")
    private boolean acknowledgments = true;

    public List<TableConfig> getTables() {
        return tables;
    }

    public Duration getPollingInterval() {
        return pollingInterval;
    }

    public boolean isAcknowledgmentsEnabled() {
        return acknowledgments;
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotBlank;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * Per-table configuration: fully-qualified table name, catalog properties,
 * identifier columns used to key CDC events, and an initial-export toggle.
 *
 * NOTE(review): generic type arguments were stripped in transit; reconstructed
 * as Map&lt;String, String&gt; / List&lt;String&gt; from how IcebergService consumes them.
 */
public class TableConfig {

    // Fully-qualified "namespace.table" name; parsed with TableIdentifier.parse.
    @JsonProperty("table_name")
    @NotBlank
    private String tableName;

    // Iceberg catalog properties, passed through to CatalogUtil.buildIcebergCatalog.
    @JsonProperty("catalog")
    private Map<String, String> catalog = Collections.emptyMap();

    // Columns that uniquely identify a row; required for correct UPDATE/DELETE CDC.
    @JsonProperty("identifier_columns")
    private List<String> identifierColumns = Collections.emptyList();

    // When true, skip the initial full-table export and stream changes only.
    @JsonProperty("disable_export")
    private boolean disableExport = false;

    public String getTableName() {
        return tableName;
    }

    public Map<String, String> getCatalog() {
        return catalog;
    }

    public List<String> getIdentifierColumns() {
        return identifierColumns;
    }

    public boolean isDisableExport() {
        return disableExport;
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.coordination;

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ChangelogTaskPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.InitialLoadTaskPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition;

import java.util.function.Function;

/**
 * Rehydrates {@link EnhancedSourcePartition} instances from coordination-store
 * items. The partition type is encoded as the suffix of the source identifier
 * (after the last '|'); anything without a known type suffix is treated as
 * {@link GlobalState}.
 */
public class PartitionFactory implements Function<SourcePartitionStoreItem, EnhancedSourcePartition> {

    @Override
    public EnhancedSourcePartition apply(final SourcePartitionStoreItem partitionStoreItem) {
        final String sourceIdentifier = partitionStoreItem.getSourceIdentifier();
        // Type suffix follows the last '|' in "<pipeline-id>|<type>".
        final String partitionType = sourceIdentifier.substring(sourceIdentifier.lastIndexOf('|') + 1);

        if (LeaderPartition.PARTITION_TYPE.equals(partitionType)) {
            return new LeaderPartition(partitionStoreItem);
        } else if (ChangelogTaskPartition.PARTITION_TYPE.equals(partitionType)) {
            return new ChangelogTaskPartition(partitionStoreItem);
        } else if (InitialLoadTaskPartition.PARTITION_TYPE.equals(partitionType)) {
            return new InitialLoadTaskPartition(partitionStoreItem);
        } else {
            // GlobalState items carry no type suffix (getPartitionType() returns null).
            return new GlobalState(partitionStoreItem);
        }
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition;

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ChangelogTaskProgressState;

import java.util.Optional;

/**
 * Work partition representing one group of changelog scan tasks for a single
 * snapshot of a table. Created by the leader; processed by worker schedulers.
 */
public class ChangelogTaskPartition extends EnhancedSourcePartition<ChangelogTaskProgressState> {

    public static final String PARTITION_TYPE = "CHANGELOG_TASK";

    private final String partitionKey;
    private final ChangelogTaskProgressState state;

    /** Creates a fresh partition (leader side). */
    public ChangelogTaskPartition(final String partitionKey, final ChangelogTaskProgressState state) {
        this.partitionKey = partitionKey;
        this.state = state;
    }

    /** Rehydrates a partition from the coordination store (worker side). */
    public ChangelogTaskPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
        setSourcePartitionStoreItem(sourcePartitionStoreItem);
        this.partitionKey = sourcePartitionStoreItem.getSourcePartitionKey();
        this.state = convertStringToPartitionProgressState(ChangelogTaskProgressState.class, sourcePartitionStoreItem.getPartitionProgressState());
    }

    @Override
    public String getPartitionType() {
        return PARTITION_TYPE;
    }

    @Override
    public String getPartitionKey() {
        return partitionKey;
    }

    @Override
    public Optional<ChangelogTaskProgressState> getProgressState() {
        return Optional.of(state);
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition;

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;

import java.util.Map;
import java.util.Optional;

/**
 * Untyped shared state stored in the coordination store (e.g. snapshot
 * completion counters). Not a work partition: {@link #getPartitionType()}
 * deliberately returns {@code null} so the item carries no type suffix and
 * {@code PartitionFactory} maps suffix-less items back to this class.
 */
public class GlobalState extends EnhancedSourcePartition<Map<String, Object>> {

    private final String stateName;
    private Map<String, Object> state;

    public GlobalState(final String stateName, final Map<String, Object> state) {
        this.stateName = stateName;
        this.state = state;
    }

    public GlobalState(final SourcePartitionStoreItem sourcePartitionStoreItem) {
        setSourcePartitionStoreItem(sourcePartitionStoreItem);
        this.stateName = sourcePartitionStoreItem.getSourcePartitionKey();
        // Passing null as the class deserializes to a generic Map — TODO confirm
        // against EnhancedSourcePartition.convertStringToPartitionProgressState.
        this.state = convertStringToPartitionProgressState(null, sourcePartitionStoreItem.getPartitionProgressState());
    }

    @Override
    public String getPartitionType() {
        return null;
    }

    @Override
    public String getPartitionKey() {
        return stateName;
    }

    @Override
    public Optional<Map<String, Object>> getProgressState() {
        if (state != null) {
            return Optional.of(state);
        }
        return Optional.empty();
    }

    public void setProgressState(final Map<String, Object> state) {
        this.state = state;
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition;

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.InitialLoadTaskProgressState;

import java.util.Optional;

/**
 * Work partition for one data file of the initial full-table export.
 * Created by the leader during initial load; processed by worker schedulers.
 */
public class InitialLoadTaskPartition extends EnhancedSourcePartition<InitialLoadTaskProgressState> {

    public static final String PARTITION_TYPE = "INITIAL_LOAD_TASK";

    private final String partitionKey;
    private final InitialLoadTaskProgressState state;

    /** Creates a fresh partition (leader side). */
    public InitialLoadTaskPartition(final String partitionKey, final InitialLoadTaskProgressState state) {
        this.partitionKey = partitionKey;
        this.state = state;
    }

    /** Rehydrates a partition from the coordination store (worker side). */
    public InitialLoadTaskPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
        setSourcePartitionStoreItem(sourcePartitionStoreItem);
        this.partitionKey = sourcePartitionStoreItem.getSourcePartitionKey();
        this.state = convertStringToPartitionProgressState(
                InitialLoadTaskProgressState.class, sourcePartitionStoreItem.getPartitionProgressState());
    }

    @Override
    public String getPartitionType() {
        return PARTITION_TYPE;
    }

    @Override
    public String getPartitionKey() {
        return partitionKey;
    }

    @Override
    public Optional<InitialLoadTaskProgressState> getProgressState() {
        return Optional.of(state);
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition;

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.LeaderProgressState;

import java.util.Optional;

/**
 * Singleton partition used to elect a leader node. Whichever node acquires
 * this partition runs snapshot planning; the key is fixed so at most one
 * leader exists at a time.
 */
public class LeaderPartition extends EnhancedSourcePartition<LeaderProgressState> {

    public static final String PARTITION_TYPE = "LEADER";
    private static final String DEFAULT_PARTITION_KEY = "GLOBAL";

    private final LeaderProgressState state;

    /** Creates the initial, not-yet-initialized leader partition. */
    public LeaderPartition() {
        this.state = new LeaderProgressState();
        this.state.setInitialized(false);
    }

    /** Rehydrates the leader partition from the coordination store. */
    public LeaderPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
        setSourcePartitionStoreItem(sourcePartitionStoreItem);
        this.state = convertStringToPartitionProgressState(LeaderProgressState.class, sourcePartitionStoreItem.getPartitionProgressState());
    }

    @Override
    public String getPartitionType() {
        return PARTITION_TYPE;
    }

    @Override
    public String getPartitionKey() {
        return DEFAULT_PARTITION_KEY;
    }

    @Override
    public Optional<LeaderProgressState> getProgressState() {
        return Optional.of(state);
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

/**
 * Progress state for one changelog task group: the data files (and their
 * ADDED/DELETED task types, index-aligned with the file list) that a worker
 * must process together for a given snapshot.
 */
public class ChangelogTaskProgressState {

    @JsonProperty("snapshotId")
    private long snapshotId;

    @JsonProperty("tableName")
    private String tableName;

    @JsonProperty("totalRecords")
    private long totalRecords;

    // Generic element types restored; raw List lost Jackson element binding.
    @JsonProperty("dataFilePaths")
    private List<String> dataFilePaths;

    // Index-aligned with dataFilePaths; values are "ADDED" or "DELETED".
    @JsonProperty("taskTypes")
    private List<String> taskTypes;

    public long getSnapshotId() {
        return snapshotId;
    }

    public void setSnapshotId(final long snapshotId) {
        this.snapshotId = snapshotId;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(final String tableName) {
        this.tableName = tableName;
    }

    public long getTotalRecords() {
        return totalRecords;
    }

    public void setTotalRecords(final long totalRecords) {
        this.totalRecords = totalRecords;
    }

    public List<String> getDataFilePaths() {
        return dataFilePaths;
    }

    public void setDataFilePaths(final List<String> dataFilePaths) {
        this.dataFilePaths = dataFilePaths;
    }

    public List<String> getTaskTypes() {
        return taskTypes;
    }

    public void setTaskTypes(final List<String> taskTypes) {
        this.taskTypes = taskTypes;
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Progress state for one initial-load task: a single data file of the
 * snapshot chosen for the initial full-table export.
 */
public class InitialLoadTaskProgressState {

    // Snapshot the file belongs to.
    @JsonProperty("snapshotId")
    private long snapshotId;

    // Table the file belongs to.
    @JsonProperty("tableName")
    private String tableName;

    // Location of the data file to read.
    @JsonProperty("dataFilePath")
    private String dataFilePath;

    // Record count of the data file, as reported by file metadata.
    @JsonProperty("totalRecords")
    private long totalRecords;

    public long getSnapshotId() {
        return snapshotId;
    }

    public void setSnapshotId(final long snapshotId) {
        this.snapshotId = snapshotId;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(final String tableName) {
        this.tableName = tableName;
    }

    public String getDataFilePath() {
        return dataFilePath;
    }

    public void setDataFilePath(final String dataFilePath) {
        this.dataFilePath = dataFilePath;
    }

    public long getTotalRecords() {
        return totalRecords;
    }

    public void setTotalRecords(final long totalRecords) {
        this.totalRecords = totalRecords;
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Progress state for the leader partition: whether the initial load has
 * completed and the last snapshot already planned as changelog tasks.
 */
public class LeaderProgressState {

    // True once the leader has finished (or skipped) the initial load phase.
    @JsonProperty("initialized")
    private boolean initialized;

    // Nullable: null until any snapshot has been processed; boxed Long on purpose.
    @JsonProperty("lastProcessedSnapshotId")
    private Long lastProcessedSnapshotId;

    public boolean isInitialized() {
        return initialized;
    }

    public void setInitialized(final boolean initialized) {
        this.initialized = initialized;
    }

    public Long getLastProcessedSnapshotId() {
        return lastProcessedSnapshotId;
    }

    public void setLastProcessedSnapshotId(final Long lastProcessedSnapshotId) {
        this.lastProcessedSnapshotId = lastProcessedSnapshotId;
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.leader;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.iceberg.TableConfig;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ChangelogTaskPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.InitialLoadTaskPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ChangelogTaskProgressState;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.InitialLoadTaskProgressState;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.LeaderProgressState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/**
 * Leader-side scheduler. Exactly one node holds the {@link LeaderPartition};
 * that node performs the one-time initial load, then polls each table for new
 * snapshots and fans each snapshot out as work partitions, waiting for all
 * partitions of a snapshot to complete before advancing (snapshot ordering).
 */
public class LeaderScheduler implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);
    private static final Duration DEFAULT_EXTEND_LEASE_DURATION = Duration.ofMinutes(3);
    private static final Duration COMPLETION_CHECK_INTERVAL = Duration.ofSeconds(2);
    static final String SNAPSHOT_COMPLETION_PREFIX = "snapshot-completion-";

    private final EnhancedSourceCoordinator sourceCoordinator;
    // Generic types restored throughout this class; the raw Map/List/Optional
    // declarations were stripped of their type parameters.
    private final Map<String, TableConfig> tableConfigs;
    private final Duration pollingInterval;
    private final Map<String, Table> tables;
    private final TaskGrouper taskGrouper = new TaskGrouper();
    private LeaderPartition leaderPartition;

    public LeaderScheduler(final EnhancedSourceCoordinator sourceCoordinator,
                           final Map<String, TableConfig> tableConfigs,
                           final Duration pollingInterval,
                           final Map<String, Table> tables) {
        this.sourceCoordinator = sourceCoordinator;
        this.tableConfigs = tableConfigs;
        this.pollingInterval = pollingInterval;
        this.tables = tables;
    }

    @Override
    public void run() {
        LOG.info("Starting Iceberg Leader Scheduler");

        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (leaderPartition == null) {
                    // Try to become leader; only one node can hold the partition.
                    final Optional<EnhancedSourcePartition> sourcePartition =
                            sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE);
                    if (sourcePartition.isPresent()) {
                        LOG.info("Running as LEADER node");
                        leaderPartition = (LeaderPartition) sourcePartition.get();
                    }
                }

                if (leaderPartition != null) {
                    final LeaderProgressState progressState = leaderPartition.getProgressState().orElseThrow();
                    if (!progressState.isInitialized()) {
                        performInitialLoad();
                        progressState.setInitialized(true);
                        LOG.info("Leader initialized");
                    }
                    pollAndPlan();
                }
            } catch (final Exception e) {
                LOG.error("Exception in leader scheduling loop", e);
            } finally {
                if (leaderPartition != null) {
                    try {
                        // Extend the leader lease so another node does not steal it.
                        sourceCoordinator.saveProgressStateForPartition(
                                leaderPartition, DEFAULT_EXTEND_LEASE_DURATION);
                    } catch (final Exception e) {
                        // Include the cause so lease loss is diagnosable.
                        LOG.error("Failed to extend leader lease, will attempt to reacquire", e);
                        leaderPartition = null;
                    }
                }
                try {
                    Thread.sleep(pollingInterval.toMillis());
                } catch (final InterruptedException e) {
                    LOG.info("Leader scheduler interrupted");
                    break;
                }
            }
        }

        LOG.warn("Quitting Leader Scheduler");
        if (leaderPartition != null) {
            sourceCoordinator.giveUpPartition(leaderPartition);
        }
    }

    /**
     * One-time export: for each table (unless disabled), plans the current
     * snapshot's data files as {@link InitialLoadTaskPartition}s and blocks
     * until workers finish all of them, then records the snapshot id so the
     * changelog phase starts from there.
     */
    private void performInitialLoad() {
        final LeaderProgressState progressState = leaderPartition.getProgressState().orElseThrow();

        for (final Map.Entry<String, Table> entry : tables.entrySet()) {
            final String tableName = entry.getKey();
            final Table table = entry.getValue();
            final TableConfig tableConfig = tableConfigs.get(tableName);

            if (tableConfig.isDisableExport()) {
                LOG.info("Skipping initial Export for table {} (isDisableExport=true)", tableName);
                continue;
            }

            table.refresh();
            if (table.currentSnapshot() == null) {
                LOG.info("No snapshot for table {}, skipping initial load", tableName);
                continue;
            }

            final long snapshotId = table.currentSnapshot().snapshotId();
            LOG.info("Starting initial load for table {} at snapshot {}", tableName, snapshotId);

            final TableScan scan = table.newScan().useSnapshot(snapshotId);
            int taskCount = 0;

            try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
                for (final FileScanTask task : tasks) {
                    final InitialLoadTaskProgressState taskState = new InitialLoadTaskProgressState();
                    taskState.setSnapshotId(snapshotId);
                    taskState.setTableName(tableName);
                    taskState.setDataFilePath(task.file().location());
                    taskState.setTotalRecords(task.file().recordCount());

                    final String partitionKey = tableName + "|initial|" + UUID.randomUUID();
                    sourceCoordinator.createPartition(new InitialLoadTaskPartition(partitionKey, taskState));
                    taskCount++;
                }
            } catch (final IOException e) {
                throw new RuntimeException("Failed to plan initial load for " + tableName, e);
            }

            LOG.info("Created {} initial load partition(s) for table {} snapshot {}",
                    taskCount, tableName, snapshotId);

            // Wait for all initial load partitions to complete
            final String completionKey = SNAPSHOT_COMPLETION_PREFIX + "initial-" + snapshotId;
            sourceCoordinator.createPartition(new GlobalState(completionKey,
                    Map.of("total", taskCount, "completed", 0)));
            waitForSnapshotComplete(completionKey, taskCount);

            // Set lastProcessedSnapshotId so CDC starts from this snapshot
            progressState.setLastProcessedSnapshotId(snapshotId);
            sourceCoordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_DURATION);

            LOG.info("Initial load completed for table {} at snapshot {}", tableName, snapshotId);
        }
    }

    /**
     * Changelog phase: for each table, walks snapshots newer than the last
     * processed one in order, groups each snapshot's scan tasks, creates work
     * partitions plus a completion counter, and waits for completion before
     * advancing the leader's checkpoint.
     */
    private void pollAndPlan() {
        final LeaderProgressState progressState = leaderPartition.getProgressState().orElseThrow();

        for (final Map.Entry<String, Table> entry : tables.entrySet()) {
            final String tableName = entry.getKey();
            final Table table = entry.getValue();

            table.refresh();

            if (table.currentSnapshot() == null) {
                continue;
            }

            final long currentSnapshotId = table.currentSnapshot().snapshotId();
            final Long lastProcessedId = progressState.getLastProcessedSnapshotId();

            if (lastProcessedId != null && lastProcessedId.equals(currentSnapshotId)) {
                continue;
            }

            final List<Snapshot> snapshotsToProcess = getSnapshotsBetween(
                    table, lastProcessedId, currentSnapshotId);

            if (snapshotsToProcess.isEmpty()) {
                progressState.setLastProcessedSnapshotId(currentSnapshotId);
                continue;
            }

            LOG.info("Processing {} snapshot(s) for table {} ({} -> {})",
                    snapshotsToProcess.size(), tableName, lastProcessedId, currentSnapshotId);

            for (final Snapshot snapshot : snapshotsToProcess) {
                if (Thread.currentThread().isInterrupted()) {
                    break;
                }

                final long parentId = snapshot.parentId() != null ? snapshot.parentId() : -1;
                if (parentId == -1) {
                    // A snapshot with no parent cannot be diffed incrementally.
                    LOG.info("First snapshot {} for {}, skipping (use initial_load)",
                            snapshot.snapshotId(), tableName);
                    progressState.setLastProcessedSnapshotId(snapshot.snapshotId());
                    continue;
                }

                if ("replace".equals(snapshot.operation())) {
                    // Compactions rewrite files without logical row changes.
                    LOG.debug("Skipping REPLACE snapshot {} for {}",
                            snapshot.snapshotId(), tableName);
                    progressState.setLastProcessedSnapshotId(snapshot.snapshotId());
                    continue;
                }

                LOG.info("Planning snapshot {} for table {} (operation: {})",
                        snapshot.snapshotId(), tableName, snapshot.operation());

                final List<ChangelogTaskProgressState> taskGroups =
                        taskGrouper.planAndGroup(table, tableName, parentId, snapshot.snapshotId());

                if (taskGroups.isEmpty()) {
                    progressState.setLastProcessedSnapshotId(snapshot.snapshotId());
                    continue;
                }

                // Create a completion tracker in GlobalState
                final String completionKey = SNAPSHOT_COMPLETION_PREFIX + snapshot.snapshotId();
                sourceCoordinator.createPartition(new GlobalState(completionKey,
                        Map.of("total", taskGroups.size(), "completed", 0)));

                for (final ChangelogTaskProgressState taskState : taskGroups) {
                    final String partitionKey = tableName + "|" + snapshot.snapshotId()
                            + "|" + UUID.randomUUID();
                    sourceCoordinator.createPartition(
                            new ChangelogTaskPartition(partitionKey, taskState));
                }

                LOG.info("Created {} partition(s) for snapshot {}, waiting for completion",
                        taskGroups.size(), snapshot.snapshotId());

                waitForSnapshotComplete(completionKey, taskGroups.size());

                progressState.setLastProcessedSnapshotId(snapshot.snapshotId());
                sourceCoordinator.saveProgressStateForPartition(
                        leaderPartition, DEFAULT_EXTEND_LEASE_DURATION);

                LOG.info("Snapshot {} completed for table {}", snapshot.snapshotId(), tableName);
            }
        }
    }

    /**
     * Returns snapshots after {@code fromExclusive} up to and including
     * {@code toInclusive}, oldest first, by walking parent links backwards.
     */
    private List<Snapshot> getSnapshotsBetween(final Table table,
                                               final Long fromExclusive,
                                               final long toInclusive) {
        final List<Snapshot> result = new ArrayList<>();
        Snapshot current = table.snapshot(toInclusive);
        while (current != null) {
            if (fromExclusive != null && fromExclusive.equals(current.snapshotId())) {
                break;
            }
            result.add(0, current);
            if (current.parentId() == null) {
                break;
            }
            current = table.snapshot(current.parentId());
        }
        return result;
    }

    /**
     * Polls the completion counter stored under {@code completionKey} until
     * workers have marked all {@code totalPartitions} done, extending the
     * leader lease on every check so leadership is not lost while blocked.
     */
    private void waitForSnapshotComplete(final String completionKey, final int totalPartitions) {
        while (!Thread.currentThread().isInterrupted()) {
            final Optional<EnhancedSourcePartition> state =
                    sourceCoordinator.getPartition(completionKey);

            if (state.isPresent()) {
                final GlobalState gs = (GlobalState) state.get();
                final Map<String, Object> progress = gs.getProgressState().orElse(Map.of());
                final int completed = ((Number) progress.getOrDefault("completed", 0)).intValue();
                if (completed >= totalPartitions) {
                    return;
                }
                LOG.debug("Waiting for snapshot completion: {}/{}", completed, totalPartitions);
            }

            try {
                sourceCoordinator.saveProgressStateForPartition(
                        leaderPartition, DEFAULT_EXTEND_LEASE_DURATION);
            } catch (final Exception e) {
                LOG.warn("Failed to extend lease while waiting", e);
            }

            try {
                Thread.sleep(COMPLETION_CHECK_INTERVAL.toMillis());
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.leader;

import org.apache.iceberg.AddedRowsScanTask;
import org.apache.iceberg.ChangelogScanTask;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DeletedDataFileScanTask;
import org.apache.iceberg.IncrementalChangelogScan;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ChangelogTaskProgressState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Groups ChangelogScanTasks for distributed processing.
 * <p>
 * Grouping stages:
 * 1. Iceberg partition isolation: tasks grouped by partition value
 * 2. Bounds-based pairing: within a partition, DELETED-ADDED file pairs with matching
 *    bounds are split into individual groups for maximum distribution
 */
public class TaskGrouper {

    private static final Logger LOG = LoggerFactory.getLogger(TaskGrouper.class);
    private static final String UNPARTITIONED_KEY = "__unpartitioned__";

    /**
     * Plans the incremental changelog between two snapshots and groups the
     * resulting scan tasks into independently-processable work units.
     */
    public List<ChangelogTaskProgressState> planAndGroup(
            final Table table,
            final String tableName,
            final long fromSnapshotIdExclusive,
            final long toSnapshotId) {

        // includeColumnStats() is required so bounds are available for pairing.
        final IncrementalChangelogScan scan = table.newIncrementalChangelogScan()
                .fromSnapshotExclusive(fromSnapshotIdExclusive)
                .toSnapshot(toSnapshotId)
                .includeColumnStats();

        // Stage 1: Group by Iceberg partition
        final Map<String, List<TaskInfo>> tasksByPartition = new HashMap<>();

        try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
            for (final ChangelogScanTask task : tasks) {
                final String partitionKey = extractPartitionKey(task);
                tasksByPartition
                        .computeIfAbsent(partitionKey, k -> new ArrayList<>())
                        .add(TaskInfo.from(task));
            }
        } catch (final IOException e) {
            throw new RuntimeException("Failed to plan changelog scan for " + tableName, e);
        }

        // Stage 2: Within each partition, attempt bounds-based pairing
        final List<ChangelogTaskProgressState> result = new ArrayList<>();

        for (final Map.Entry<String, List<TaskInfo>> entry : tasksByPartition.entrySet()) {
            final List<TaskInfo> partitionTasks = entry.getValue();
            final List<List<TaskInfo>> groups = pairByBounds(partitionTasks);

            for (final List<TaskInfo> group : groups) {
                final ChangelogTaskProgressState state = new ChangelogTaskProgressState();
                state.setSnapshotId(toSnapshotId);
                state.setTableName(tableName);
                state.setDataFilePaths(group.stream().map(t -> t.filePath).collect(Collectors.toList()));
                state.setTaskTypes(group.stream().map(t -> t.taskType).collect(Collectors.toList()));
                state.setTotalRecords(group.stream().mapToLong(t -> t.recordCount).sum());
                result.add(state);
            }
        }

        LOG.info("Planned {} task group(s) for table {} (snapshot {} -> {})",
                result.size(), tableName, fromSnapshotIdExclusive, toSnapshotId);
        return result;
    }

    /**
     * Pairs DELETED and ADDED tasks by matching lower/upper bounds.
     * Returns a list of groups, where each group is a list of tasks to process together.
     */
    List<List<TaskInfo>> pairByBounds(final List<TaskInfo> tasks) {
        final List<TaskInfo> deleted = new ArrayList<>();
        final List<TaskInfo> added = new ArrayList<>();

        for (final TaskInfo task : tasks) {
            if ("DELETED".equals(task.taskType)) {
                deleted.add(task);
            } else {
                added.add(task);
            }
        }

        // If no deleted files, each added file is an independent group (INSERT only)
        if (deleted.isEmpty()) {
            final List<List<TaskInfo>> result = new ArrayList<>();
            for (final TaskInfo task : added) {
                result.add(List.of(task));
            }
            return result;
        }

        // If no added files, each deleted file is an independent group (full-file delete)
        if (added.isEmpty()) {
            final List<List<TaskInfo>> result = new ArrayList<>();
            for (final TaskInfo task : deleted) {
                result.add(List.of(task));
            }
            return result;
        }

        // Try to pair DELETED-ADDED by bounds
        final List<List<TaskInfo>> paired = new ArrayList<>();
        final List<TaskInfo> unpairedDeleted = new ArrayList<>(deleted);
        final List<TaskInfo> unpairedAdded = new ArrayList<>(added);

        final Iterator<TaskInfo> delIter = unpairedDeleted.iterator();
        while (delIter.hasNext()) {
            final TaskInfo del = delIter.next();
            TaskInfo matchedAdd = null;
            int matchCount = 0;

            for (final TaskInfo add : unpairedAdded) {
                if (boundsMatch(del, add)) {
                    matchedAdd = add;
                    matchCount++;
                }
            }

            // Only pair if exactly one match (unambiguous)
            if (matchCount == 1) {
                paired.add(List.of(del, matchedAdd));
                delIter.remove();
                unpairedAdded.remove(matchedAdd);
            }
        }

        // Unpaired leftovers: when only one side remains, each file is its own
        // group (condition hoisted out of the loops — it is loop-invariant).
        if (unpairedAdded.isEmpty()) {
            for (final TaskInfo task : unpairedDeleted) {
                paired.add(List.of(task));
            }
        } else if (unpairedDeleted.isEmpty()) {
            for (final TaskInfo task : unpairedAdded) {
                paired.add(List.of(task));
            }
        } else {
            // If both unpaired DELETED and ADDED remain -> single fallback group.
            // These must stay together so that carryover removal can match DELETE-INSERT
            // pairs from different files on the same worker node.
            // TODO: A Source-layer shuffle via PeerForwarder extension could distribute
            // these rows by hash of all columns, enabling parallel carryover removal.
            // See RFC #6552 Section 5.2.4 for details.
            final List<TaskInfo> fallback = new ArrayList<>(unpairedDeleted);
            fallback.addAll(unpairedAdded);
            paired.add(fallback);
        }

        return paired;
    }

    private boolean boundsMatch(final TaskInfo a, final TaskInfo b) {
        if (a.boundsKey == null || b.boundsKey == null) {
            return false;
        }
        return a.boundsKey.equals(b.boundsKey);
    }

    /** Builds a stable string key from the task's Iceberg partition tuple. */
    private String extractPartitionKey(final ChangelogScanTask task) {
        if (task instanceof ContentScanTask) {
            final StructLike partition = ((ContentScanTask<?>) task).file().partition();
            if (partition.size() == 0) {
                return UNPARTITIONED_KEY;
            }
            final StringBuilder sb = new StringBuilder();
            for (int i = 0; i < partition.size(); i++) {
                if (i > 0) {
                    sb.append("|");
                }
                sb.append(partition.get(i, Object.class));
            }
            return sb.toString();
        }
        return UNPARTITIONED_KEY;
    }

    /** Immutable summary of one scan task used for grouping decisions. */
    static class TaskInfo {
        final String filePath;
        final String taskType;
        final long recordCount;
        final String boundsKey; // serialized lower+upper bounds for pairing

        TaskInfo(final String filePath, final String taskType,
                 final long recordCount, final String boundsKey) {
            this.filePath = filePath;
            this.taskType = taskType;
            this.recordCount = recordCount;
            this.boundsKey = boundsKey;
        }

        static TaskInfo from(final ChangelogScanTask task) {
            if (task instanceof AddedRowsScanTask) {
                final AddedRowsScanTask t = (AddedRowsScanTask) task;
                return new TaskInfo(
                        t.file().location(), "ADDED",
                        t.file().recordCount(), extractBoundsKey(t));
            } else if (task instanceof DeletedDataFileScanTask) {
                final DeletedDataFileScanTask t = (DeletedDataFileScanTask) task;
                return new TaskInfo(
                        t.file().location(), "DELETED",
                        t.file().recordCount(), extractBoundsKey(t));
            }
            throw new IllegalArgumentException("Unsupported ChangelogScanTask type: " + task.getClass().getName());
        }

        /**
         * Serializes lower/upper column bounds into a comparable key.
         * BUG FIX: the previous implementation concatenated the maps'
         * toString(), but ByteBuffer.toString() reports pos/lim/cap — not
         * contents — so distinct bounds of equal length collided. Encode the
         * actual bytes, per field id in sorted order, instead.
         */
        private static String extractBoundsKey(final ContentScanTask<?> task) {
            final Map<Integer, ByteBuffer> lower = task.file().lowerBounds();
            final Map<Integer, ByteBuffer> upper = task.file().upperBounds();
            if (lower == null || upper == null || lower.isEmpty() || upper.isEmpty()) {
                return null;
            }
            return encodeBounds(lower) + "|" + encodeBounds(upper);
        }

        /** Hex-encodes bound buffers keyed by field id, sorted for determinism. */
        private static String encodeBounds(final Map<Integer, ByteBuffer> bounds) {
            final StringBuilder sb = new StringBuilder();
            bounds.keySet().stream().sorted().forEach(fieldId -> {
                sb.append(fieldId).append('=');
                // Read-only duplicate: never disturb the shared buffer's position.
                final ByteBuffer buf = bounds.get(fieldId).asReadOnlyBuffer();
                while (buf.hasRemaining()) {
                    sb.append(String.format("%02x", buf.get() & 0xff));
                }
                sb.append(';');
            });
            return sb.toString();
        }
    }
}
/**
 * Removes carryover rows from a list of changelog rows.
 * Carryover = identical DELETE + INSERT pairs produced by CoW file rewrites: a
 * row deleted from one data file and rewritten unchanged into another is not a
 * real change and must not be emitted downstream.
 *
 * Algorithm (aligned with Spark's RemoveCarryoverIterator):
 * 1. Sort all rows by data columns, with DELETE before INSERT for equal rows
 * 2. Walk the sorted list, track consecutive DELETE count, cancel with matching INSERTs
 */
public class CarryoverRemover {

    /** One changelog row: its data column values, operation, and position in the caller's list. */
    public static class ChangelogRow {
        private final List<Object> dataColumns;
        private final String operation;   // "DELETE" or "INSERT"
        private final int originalIndex;  // index into the caller's row list

        public ChangelogRow(final List<Object> dataColumns, final String operation, final int originalIndex) {
            this.dataColumns = dataColumns;
            this.operation = operation;
            this.originalIndex = originalIndex;
        }

        public List<Object> getDataColumns() {
            return dataColumns;
        }

        public String getOperation() {
            return operation;
        }

        public int getOriginalIndex() {
            return originalIndex;
        }
    }

    /**
     * Removes carryover pairs from the given rows.
     * Returns the indices (in the original list) of rows that are NOT carryover,
     * in the order the surviving rows appear after sorting.
     *
     * The input list is NOT modified: the previous implementation sorted the
     * caller's list in place (a surprising side effect that also failed on
     * unmodifiable lists); sorting now happens on a private copy.
     *
     * @param rows changelog rows to filter; may be empty
     * @return original-list indices of surviving rows
     */
    public List<Integer> removeCarryover(final List<ChangelogRow> rows) {
        if (rows.isEmpty()) {
            return List.of();
        }

        // Sort a copy: data columns ascending, with DELETE (0) before INSERT (1)
        // among rows whose data columns compare equal.
        final List<ChangelogRow> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator
                .comparing((ChangelogRow r) -> new ComparableColumns(r.getDataColumns()))
                .thenComparingInt(r -> "DELETE".equals(r.getOperation()) ? 0 : 1));

        final List<Integer> result = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            final ChangelogRow current = sorted.get(i);
            if (!"DELETE".equals(current.getOperation())) {
                result.add(current.getOriginalIndex());
                i++;
                continue;
            }

            // Current row is DELETE. Count consecutive identical DELETE rows.
            int deleteCount = 1;
            int j = i + 1;
            while (j < sorted.size()
                    && "DELETE".equals(sorted.get(j).getOperation())
                    && dataColumnsEqual(current.getDataColumns(), sorted.get(j).getDataColumns())) {
                deleteCount++;
                j++;
            }

            // Cancel DELETE rows with matching INSERT rows; each INSERT consumed
            // here is itself carryover and is skipped.
            while (j < sorted.size()
                    && "INSERT".equals(sorted.get(j).getOperation())
                    && dataColumnsEqual(current.getDataColumns(), sorted.get(j).getDataColumns())
                    && deleteCount > 0) {
                deleteCount--;
                j++;
            }

            // Emit remaining uncancelled DELETE rows (all identical, so which
            // original indices survive among them is immaterial).
            for (int k = 0; k < deleteCount; k++) {
                result.add(sorted.get(i + k).getOriginalIndex());
            }

            i = j;
        }
        return result;
    }

    /** Element-wise, null-safe equality over data column values. */
    private boolean dataColumnsEqual(final List<Object> a, final List<Object> b) {
        if (a.size() != b.size()) {
            return false;
        }
        for (int i = 0; i < a.size(); i++) {
            if (!Objects.equals(a.get(i), b.get(i))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Comparable wrapper for column-by-column comparison, used as sort key.
     * Nulls sort first; non-Comparable values fall back to toString comparison;
     * a shorter column list sorts before a longer one with the same prefix.
     */
    private static class ComparableColumns implements Comparable<ComparableColumns> {
        private final List<Object> columns;

        ComparableColumns(final List<Object> columns) {
            this.columns = columns;
        }

        @Override
        @SuppressWarnings("unchecked")
        public int compareTo(final ComparableColumns other) {
            final int len = Math.min(columns.size(), other.columns.size());
            for (int i = 0; i < len; i++) {
                final Object a = columns.get(i);
                final Object b = other.columns.get(i);
                if (a == null && b == null) {
                    continue;
                }
                if (a == null) {
                    return -1;
                }
                if (b == null) {
                    return 1;
                }
                final int cmp;
                if (a instanceof Comparable && b instanceof Comparable) {
                    cmp = ((Comparable<Object>) a).compareTo(b);
                } else {
                    cmp = a.toString().compareTo(b.toString());
                }
                if (cmp != 0) {
                    return cmp;
                }
            }
            return Integer.compare(columns.size(), other.columns.size());
        }
    }
}
b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverter.java new file mode 100644 index 0000000000..af966f2992 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverter.java @@ -0,0 +1,174 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.worker; + +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantArray; +import org.apache.iceberg.variants.VariantObject; +import org.apache.iceberg.variants.VariantValue; +import org.apache.iceberg.variants.PhysicalType; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Base64; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Converts an Iceberg Record to a Data Prepper Event. 
+ */ +public class ChangelogRecordConverter { + + private static final String EVENT_TYPE = "EVENT"; + + private final String tableName; + private final List identifierColumns; + + public ChangelogRecordConverter(final String tableName, final List identifierColumns) { + this.tableName = tableName; + this.identifierColumns = identifierColumns; + } + + public Event convert(final Record record, + final org.apache.iceberg.Schema schema, + final String operation, + final long snapshotId) { + final Map data = new LinkedHashMap<>(); + for (final Types.NestedField field : schema.columns()) { + final Object value = record.getField(field.name()); + data.put(field.name(), convertValue(value, field.type())); + } + + final Event event = JacksonEvent.builder() + .withEventType(EVENT_TYPE) + .withData(data) + .build(); + + event.getMetadata().setAttribute("iceberg_operation", operation); + event.getMetadata().setAttribute("iceberg_table_name", tableName); + event.getMetadata().setAttribute("iceberg_snapshot_id", snapshotId); + event.getMetadata().setAttribute("bulk_action", toBulkAction(operation)); + + if (!identifierColumns.isEmpty()) { + final String documentId = identifierColumns.stream() + .map(col -> String.valueOf(data.getOrDefault(col, ""))) + .collect(Collectors.joining("|")); + event.getMetadata().setAttribute("document_id", documentId); + } + + return event; + } + + private String toBulkAction(final String operation) { + if ("DELETE".equals(operation)) { + return "delete"; + } + return "index"; + } + + private Object convertValue(final Object value, final Type type) { + if (value == null) { + return null; + } + if (value instanceof BigDecimal) { + return ((BigDecimal) value).toPlainString(); + } + if (value instanceof ByteBuffer) { + final ByteBuffer buf = (ByteBuffer) value; + final byte[] bytes = new byte[buf.remaining()]; + buf.duplicate().get(bytes); + return Base64.getEncoder().encodeToString(bytes); + } + if (value instanceof byte[]) { + return 
Base64.getEncoder().encodeToString((byte[]) value); + } + if (value instanceof LocalDate) { + return value.toString(); + } + if (value instanceof LocalTime) { + return value.toString(); + } + if (value instanceof LocalDateTime) { + return ((LocalDateTime) value).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME); + } + if (value instanceof OffsetDateTime) { + return ((OffsetDateTime) value).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); + } + if (value instanceof UUID) { + return value.toString(); + } + if (value instanceof Record && type instanceof Types.StructType) { + final Record struct = (Record) value; + final Types.StructType structType = (Types.StructType) type; + final Map map = new LinkedHashMap<>(); + for (final Types.NestedField field : structType.fields()) { + map.put(field.name(), convertValue(struct.getField(field.name()), field.type())); + } + return map; + } + if (value instanceof List && type instanceof Types.ListType) { + final Types.ListType listType = (Types.ListType) type; + return ((List) value).stream() + .map(v -> convertValue(v, listType.elementType())) + .collect(Collectors.toList()); + } + if (value instanceof Map && type instanceof Types.MapType) { + final Types.MapType mapType = (Types.MapType) type; + final Map srcMap = (Map) value; + final Map result = new LinkedHashMap<>(); + srcMap.forEach((k, v) -> result.put(String.valueOf(k), convertValue(v, mapType.valueType()))); + return result; + } + if (value instanceof Variant) { + return convertVariant((Variant) value); + } + return value; + } + + private Object convertVariant(final Variant variant) { + return convertVariantValue(variant.value()); + } + + private Object convertVariantValue(final VariantValue value) { + final PhysicalType physicalType = value.type(); + if (physicalType == PhysicalType.OBJECT) { + final VariantObject obj = value.asObject(); + final Map map = new LinkedHashMap<>(); + for (final String fieldName : obj.fieldNames()) { + map.put(fieldName, 
convertVariantValue(obj.get(fieldName))); + } + return map; + } + if (physicalType == PhysicalType.ARRAY) { + final VariantArray arr = value.asArray(); + final List list = new ArrayList<>(); + for (int i = 0; i < arr.numElements(); i++) { + list.add(convertVariantValue(arr.get(i))); + } + return list; + } + return value.asPrimitive().get(); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java new file mode 100644 index 0000000000..1ec95b58bf --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java @@ -0,0 +1,412 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.worker; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.iceberg.IcebergSourceConfig; +import org.opensearch.dataprepper.plugins.source.iceberg.TableConfig; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ChangelogTaskPartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.InitialLoadTaskPartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ChangelogTaskProgressState; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.InitialLoadTaskProgressState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import 
java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class ChangelogWorker implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ChangelogWorker.class); + private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2000; + private static final int BUFFER_ACCUMULATOR_SIZE = 100; + private static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(10); + + private final EnhancedSourceCoordinator sourceCoordinator; + private final IcebergSourceConfig sourceConfig; + private final Map tables; + private final Map tableConfigs; + private final Buffer> buffer; + private final AcknowledgementSetManager acknowledgementSetManager; + + public ChangelogWorker(final EnhancedSourceCoordinator sourceCoordinator, + final IcebergSourceConfig sourceConfig, + final Map tables, + final Map tableConfigs, + final Buffer> buffer, + final AcknowledgementSetManager acknowledgementSetManager) { + this.sourceCoordinator = sourceCoordinator; + this.sourceConfig = sourceConfig; + this.tables = tables; + this.tableConfigs = tableConfigs; + this.buffer = buffer; + this.acknowledgementSetManager = acknowledgementSetManager; + } + + @Override + public void run() { + LOG.info("Starting Changelog Worker"); + + while (!Thread.currentThread().isInterrupted()) { + boolean processed = false; + try { + // Try changelog tasks first, then initial load tasks + Optional partition = + sourceCoordinator.acquireAvailablePartition(ChangelogTaskPartition.PARTITION_TYPE); + + if (partition.isPresent() && partition.get() instanceof ChangelogTaskPartition) { + processPartition((ChangelogTaskPartition) partition.get()); + processed = true; + } else { + partition.ifPresent(sourceCoordinator::giveUpPartition); + // Try initial load tasks + partition = sourceCoordinator.acquireAvailablePartition( + InitialLoadTaskPartition.PARTITION_TYPE); + if (partition.isPresent() && partition.get() instanceof 
InitialLoadTaskPartition) { + processInitialLoadPartition((InitialLoadTaskPartition) partition.get()); + processed = true; + } else { + partition.ifPresent(sourceCoordinator::giveUpPartition); + } + } + } catch (final Exception e) { + LOG.error("Error in changelog worker", e); + } + + if (!processed) { + try { + Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS); + } catch (final InterruptedException e) { + LOG.info("Changelog worker interrupted"); + break; + } + } + } + + LOG.warn("Quitting Changelog Worker"); + } + + private void processPartition(final ChangelogTaskPartition partition) throws Exception { + final ChangelogTaskProgressState state = partition.getProgressState().orElseThrow(); + final String tableName = state.getTableName(); + final Table table = tables.get(tableName); + final TableConfig tableConfig = tableConfigs.get(tableName); + + if (table == null || tableConfig == null) { + LOG.error("Table {} not found, giving up partition", tableName); + sourceCoordinator.giveUpPartition(partition); + return; + } + + final Schema schema = table.schema(); + final ChangelogRecordConverter converter = new ChangelogRecordConverter( + tableName, tableConfig.getIdentifierColumns()); + final CarryoverRemover carryoverRemover = new CarryoverRemover(); + + LOG.info("Processing partition for table {} snapshot {} with {} file(s)", + tableName, state.getSnapshotId(), state.getDataFilePaths().size()); + + // Step 1: Read all rows from all data files in this partition + final List allRows = new ArrayList<>(); + for (int i = 0; i < state.getDataFilePaths().size(); i++) { + final String filePath = state.getDataFilePaths().get(i); + final String taskType = state.getTaskTypes().get(i); + final String operation = "DELETED".equals(taskType) ? 
"DELETE" : "INSERT"; + + LOG.info("Reading file {} (type: {}, operation: {})", filePath, taskType, operation); + + final InputFile inputFile = table.io().newInputFile(filePath); + try (CloseableIterable reader = openDataFile(inputFile, schema, filePath)) { + for (final Record record : reader) { + allRows.add(new RowWithMeta(record, operation)); + LOG.debug(" Row: {} op={}", record, operation); + } + } + } + + // Step 2: Remove carryover + final boolean hasDeletedFiles = state.getTaskTypes().stream().anyMatch("DELETED"::equals); + final List survivingIndices; + if (hasDeletedFiles) { + final List changelogRows = new ArrayList<>(); + for (int i = 0; i < allRows.size(); i++) { + final RowWithMeta row = allRows.get(i); + final List dataColumns = new ArrayList<>(); + for (final Types.NestedField field : schema.columns()) { + dataColumns.add(row.record.getField(field.name())); + } + changelogRows.add(new CarryoverRemover.ChangelogRow(dataColumns, row.operation, i)); + } + survivingIndices = carryoverRemover.removeCarryover(changelogRows); + LOG.info("Carryover removal: {} rows -> {} rows", allRows.size(), survivingIndices.size()); + for (final int idx : survivingIndices) { + final RowWithMeta row = allRows.get(idx); + LOG.debug(" Surviving row: {} op={}", row.record, row.operation); + } + } else { + survivingIndices = new ArrayList<>(); + for (int i = 0; i < allRows.size(); i++) { + survivingIndices.add(i); + } + } + + // Step 3: Merge UPDATE pairs and write to buffer + // When identifier_columns is set, a DELETE + INSERT pair with the same document_id + // represents an UPDATE. Since OpenSearch INDEX is an upsert, the INSERT alone + // overwrites the existing document, making the DELETE unnecessary. 
+ final boolean ackEnabled = sourceConfig.isAcknowledgmentsEnabled(); + AcknowledgementSet acknowledgementSet = null; + if (ackEnabled) { + acknowledgementSet = acknowledgementSetManager.create((result) -> { + try { + if (result) { + LOG.info("Acknowledgement received for partition {}", partition.getPartitionKey()); + sourceCoordinator.completePartition(partition); + incrementSnapshotCompletionCount(state.getSnapshotId()); + } else { + LOG.warn("Negative acknowledgement for partition {}, giving up", partition.getPartitionKey()); + sourceCoordinator.giveUpPartition(partition); + } + } catch (final Exception e) { + LOG.error("Error in acknowledgement callback for partition {}", partition.getPartitionKey(), e); + } + }, Duration.ofMinutes(30)); + } + + final BufferAccumulator> accumulator = + BufferAccumulator.create(buffer, BUFFER_ACCUMULATOR_SIZE, BUFFER_TIMEOUT); + + // Identify DELETE indices whose document_id has a matching INSERT (i.e., UPDATE pairs) + final Set deletesToSkip = new HashSet<>(); + if (!tableConfig.getIdentifierColumns().isEmpty()) { + final Map deleteByDocId = new LinkedHashMap<>(); + for (final int idx : survivingIndices) { + final RowWithMeta row = allRows.get(idx); + if ("DELETE".equals(row.operation)) { + final String docId = buildDocumentId(row.record, schema, tableConfig.getIdentifierColumns()); + deleteByDocId.put(docId, idx); + } + } + for (final int idx : survivingIndices) { + final RowWithMeta row = allRows.get(idx); + if ("INSERT".equals(row.operation)) { + final String docId = buildDocumentId(row.record, schema, tableConfig.getIdentifierColumns()); + if (deleteByDocId.containsKey(docId)) { + deletesToSkip.add(deleteByDocId.remove(docId)); + } + } + } + if (!deletesToSkip.isEmpty()) { + LOG.info("Merged {} UPDATE pair(s) (DELETE + INSERT -> INDEX only)", deletesToSkip.size()); + } + } + + for (final int idx : survivingIndices) { + if (deletesToSkip.contains(idx)) { + continue; + } + final RowWithMeta row = allRows.get(idx); + final 
Event event = converter.convert(row.record, schema, row.operation, state.getSnapshotId()); + LOG.debug("Writing event: op={}, document_id={}, bulk_action={}", + row.operation, + event.getMetadata().getAttribute("document_id"), + event.getMetadata().getAttribute("bulk_action")); + if (acknowledgementSet != null) { + acknowledgementSet.add(event); + } + accumulator.add(new org.opensearch.dataprepper.model.record.Record<>(event)); + } + + accumulator.flush(); + + if (ackEnabled) { + if (survivingIndices.isEmpty()) { + sourceCoordinator.completePartition(partition); + incrementSnapshotCompletionCount(state.getSnapshotId()); + } else { + acknowledgementSet.complete(); + } + } else { + sourceCoordinator.completePartition(partition); + incrementSnapshotCompletionCount(state.getSnapshotId()); + } + + LOG.info("Completed processing partition for table {} snapshot {}: {} events written", + tableName, state.getSnapshotId(), survivingIndices.size()); + } + + private void processInitialLoadPartition(final InitialLoadTaskPartition partition) throws Exception { + final InitialLoadTaskProgressState state = partition.getProgressState().orElseThrow(); + final String tableName = state.getTableName(); + final Table table = tables.get(tableName); + final TableConfig tableConfig = tableConfigs.get(tableName); + + if (table == null || tableConfig == null) { + LOG.error("Table {} not found for initial load, giving up partition", tableName); + sourceCoordinator.giveUpPartition(partition); + return; + } + + final Schema schema = table.schema(); + final ChangelogRecordConverter converter = new ChangelogRecordConverter( + tableName, tableConfig.getIdentifierColumns()); + + LOG.info("Processing initial load partition for table {} file {}", + tableName, state.getDataFilePath()); + + final InputFile inputFile = table.io().newInputFile(state.getDataFilePath()); + + final boolean ackEnabled = sourceConfig.isAcknowledgmentsEnabled(); + AcknowledgementSet acknowledgementSet = null; + if (ackEnabled) 
{ + acknowledgementSet = acknowledgementSetManager.create((result) -> { + try { + if (result) { + LOG.info("Acknowledgement received for initial load partition {}", partition.getPartitionKey()); + sourceCoordinator.completePartition(partition); + incrementSnapshotCompletionCount("initial-" + state.getSnapshotId()); + } else { + LOG.warn("Negative acknowledgement for initial load partition {}, giving up", partition.getPartitionKey()); + sourceCoordinator.giveUpPartition(partition); + } + } catch (final Exception e) { + LOG.error("Error in acknowledgement callback for initial load partition {}", partition.getPartitionKey(), e); + } + }, Duration.ofMinutes(30)); + } + + final BufferAccumulator> accumulator = + BufferAccumulator.create(buffer, BUFFER_ACCUMULATOR_SIZE, BUFFER_TIMEOUT); + + int rowCount = 0; + try (CloseableIterable reader = openDataFile(inputFile, schema, state.getDataFilePath())) { + for (final Record record : reader) { + final Event event = converter.convert(record, schema, "INSERT", state.getSnapshotId()); + if (acknowledgementSet != null) { + acknowledgementSet.add(event); + } + accumulator.add(new org.opensearch.dataprepper.model.record.Record<>(event)); + rowCount++; + } + } + + accumulator.flush(); + + if (ackEnabled) { + acknowledgementSet.complete(); + } else { + sourceCoordinator.completePartition(partition); + incrementSnapshotCompletionCount("initial-" + state.getSnapshotId()); + } + + LOG.info("Completed initial load partition for table {}: {} rows", tableName, rowCount); + } + + private void incrementSnapshotCompletionCount(final long snapshotId) { + incrementSnapshotCompletionCount(String.valueOf(snapshotId)); + } + + private String buildDocumentId(final Record record, final Schema schema, final List identifierColumns) { + return identifierColumns.stream() + .map(col -> String.valueOf(record.getField(col))) + .collect(java.util.stream.Collectors.joining("|")); + } + + private synchronized void incrementSnapshotCompletionCount(final String 
snapshotKey) { + final String completionKey = "snapshot-completion-" + snapshotKey; + while (true) { + final Optional partitionOpt = sourceCoordinator.getPartition(completionKey); + if (partitionOpt.isEmpty()) { + LOG.error("Failed to get completion status for {}", completionKey); + return; + } + final GlobalState gs = (GlobalState) partitionOpt.get(); + final Map progress = new java.util.HashMap<>( + gs.getProgressState().orElse(Map.of())); + final int completed = ((Number) progress.getOrDefault("completed", 0)).intValue(); + progress.put("completed", completed + 1); + gs.setProgressState(progress); + try { + sourceCoordinator.saveProgressStateForPartition(gs, Duration.ZERO); + break; + } catch (final Exception e) { + LOG.warn("Completion count update conflict for {}, retrying", completionKey); + } + } + } + + // TODO: Replace format switch with FormatModelRegistry when available (Iceberg 1.11+). + // TODO: Add GenericDeleteFilter for MoR support. See GenericReader.open(FileScanTask) + // in iceberg-data for the reference pattern (delete file merge + format-agnostic reading). 
+ private CloseableIterable openDataFile(final InputFile inputFile, + final Schema schema, + final String filePath) { + final FileFormat format = FileFormat.fromFileName(filePath); + if (format == null) { + throw new IllegalArgumentException("Cannot determine file format for: " + filePath); + } + switch (format) { + case PARQUET: + return Parquet.read(inputFile) + .project(schema) + .createReaderFunc(fs -> GenericParquetReaders.buildReader(schema, fs)) + .build(); + case AVRO: + return Avro.read(inputFile) + .project(schema) + .createReaderFunc(DataReader::create) + .build(); + case ORC: + return ORC.read(inputFile) + .project(schema) + .createReaderFunc(fs -> GenericOrcReader.buildReader(schema, fs)) + .build(); + default: + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + } + + private static class RowWithMeta { + final Record record; + final String operation; + + RowWithMeta(final Record record, final String operation) { + this.record = record; + this.operation = operation; + } + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java new file mode 100644 index 0000000000..75d1f65394 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class IcebergSourceTest { + + @Mock + private IcebergSourceConfig sourceConfig; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + + @Mock + private TableConfig tableConfig; + + @Test + void areAcknowledgementsEnabled_returnsConfigValue_whenTrue() { + when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(true); + when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); + + final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager); + assertThat(source.areAcknowledgementsEnabled(), equalTo(true)); + } + + @Test + void areAcknowledgementsEnabled_returnsConfigValue_whenFalse() { + when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); + + final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager); + assertThat(source.areAcknowledgementsEnabled(), equalTo(false)); + } + + @Test + void getPartitionFactory_returnsNonNull() { + when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); + + final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, 
acknowledgementSetManager); + assertThat(source.getPartitionFactory() != null, equalTo(true)); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/PartitionFactoryTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/PartitionFactoryTest.java new file mode 100644 index 0000000000..9a74a9d306 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/PartitionFactoryTest.java @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.coordination; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ChangelogTaskPartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.InitialLoadTaskPartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class PartitionFactoryTest { + + @Mock + private SourcePartitionStoreItem sourcePartitionStoreItem; + + private 
final PartitionFactory partitionFactory = new PartitionFactory(); + + @Test + void apply_leaderPartition_returnsLeaderPartition() { + when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn("prefix|LEADER"); + when(sourcePartitionStoreItem.getPartitionProgressState()).thenReturn("{\"initialized\":false}"); + + final EnhancedSourcePartition result = partitionFactory.apply(sourcePartitionStoreItem); + assertThat(result, instanceOf(LeaderPartition.class)); + } + + @Test + void apply_changelogTaskPartition_returnsChangelogTaskPartition() { + when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn("prefix|CHANGELOG_TASK"); + when(sourcePartitionStoreItem.getSourcePartitionKey()).thenReturn("table|123|uuid"); + when(sourcePartitionStoreItem.getPartitionProgressState()) + .thenReturn("{\"snapshotId\":123,\"tableName\":\"test\",\"totalRecords\":10,\"dataFilePaths\":[\"path1\"],\"taskTypes\":[\"ADDED\"]}"); + + final EnhancedSourcePartition result = partitionFactory.apply(sourcePartitionStoreItem); + assertThat(result, instanceOf(ChangelogTaskPartition.class)); + } + + @Test + void apply_initialLoadTaskPartition_returnsInitialLoadTaskPartition() { + when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn("prefix|INITIAL_LOAD_TASK"); + when(sourcePartitionStoreItem.getSourcePartitionKey()).thenReturn("table|initial|uuid"); + when(sourcePartitionStoreItem.getPartitionProgressState()) + .thenReturn("{\"snapshotId\":456,\"tableName\":\"test\",\"dataFilePath\":\"path1\",\"totalRecords\":100}"); + + final EnhancedSourcePartition result = partitionFactory.apply(sourcePartitionStoreItem); + assertThat(result, instanceOf(InitialLoadTaskPartition.class)); + } + + @Test + void apply_unknownType_returnsGlobalState() { + when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn("prefix|UNKNOWN"); + when(sourcePartitionStoreItem.getSourcePartitionKey()).thenReturn("some-key"); + when(sourcePartitionStoreItem.getPartitionProgressState()).thenReturn(null); + + 
final EnhancedSourcePartition result = partitionFactory.apply(sourcePartitionStoreItem); + assertThat(result, instanceOf(GlobalState.class)); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/ChangelogTaskPartitionTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/ChangelogTaskPartitionTest.java new file mode 100644 index 0000000000..1996bfd724 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/ChangelogTaskPartitionTest.java @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ChangelogTaskProgressState; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class ChangelogTaskPartitionTest { + + @Test + void getPartitionType_returnsChangelogTask() { + final ChangelogTaskProgressState state = new ChangelogTaskProgressState(); + final ChangelogTaskPartition partition = new ChangelogTaskPartition("key", state); + assertThat(partition.getPartitionType(), equalTo("CHANGELOG_TASK")); + } + + @Test + void getPartitionKey_returnsProvidedKey() { + final ChangelogTaskProgressState state = new ChangelogTaskProgressState(); + final ChangelogTaskPartition 
partition = new ChangelogTaskPartition("test-key", state); + assertThat(partition.getPartitionKey(), equalTo("test-key")); + } + + @Test + void getProgressState_returnsProvidedState() { + final ChangelogTaskProgressState state = new ChangelogTaskProgressState(); + state.setSnapshotId(100L); + state.setTableName("db.table1"); + final ChangelogTaskPartition partition = new ChangelogTaskPartition("key", state); + assertThat(partition.getProgressState().isPresent(), equalTo(true)); + assertThat(partition.getProgressState().get().getSnapshotId(), equalTo(100L)); + assertThat(partition.getProgressState().get().getTableName(), equalTo("db.table1")); + } + + @Test + void getProgressState_fromStoreItem_returnsRestoredState() { + final SourcePartitionStoreItem item = mock(SourcePartitionStoreItem.class); + when(item.getSourceIdentifier()).thenReturn("prefix|CHANGELOG_TASK"); + when(item.getSourcePartitionKey()).thenReturn("db.table1|snap|uuid"); + when(item.getPartitionProgressState()).thenReturn( + "{\"snapshotId\":100,\"tableName\":\"db.table1\",\"totalRecords\":50," + + "\"dataFilePaths\":[\"/path/file1.parquet\"],\"taskTypes\":[\"ADDED\"]}"); + + final ChangelogTaskPartition partition = new ChangelogTaskPartition(item); + assertThat(partition.getPartitionKey(), equalTo("db.table1|snap|uuid")); + assertThat(partition.getProgressState().isPresent(), equalTo(true)); + + final ChangelogTaskProgressState state = partition.getProgressState().get(); + assertThat(state.getSnapshotId(), equalTo(100L)); + assertThat(state.getTableName(), equalTo("db.table1")); + assertThat(state.getTotalRecords(), equalTo(50L)); + assertThat(state.getDataFilePaths(), equalTo(List.of("/path/file1.parquet"))); + assertThat(state.getTaskTypes(), equalTo(List.of("ADDED"))); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/GlobalStateTest.java 
b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/GlobalStateTest.java new file mode 100644 index 0000000000..aa34bcf45c --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/GlobalStateTest.java @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; + +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class GlobalStateTest { + + @Test + void getPartitionType_returnsNull() { + final GlobalState globalState = new GlobalState("test-state", Map.of("key", "value")); + assertThat(globalState.getPartitionType(), nullValue()); + } + + @Test + void getPartitionKey_returnsProvidedStateName() { + final GlobalState globalState = new GlobalState("test-state", Map.of()); + assertThat(globalState.getPartitionKey(), equalTo("test-state")); + } + + @Test + void getProgressState_returnsProvidedState() { + final Map state = Map.of("snapshotId", 42, "completed", true); + final GlobalState globalState = new GlobalState("test-state", state); + assertThat(globalState.getProgressState().isPresent(), equalTo(true)); + assertThat(globalState.getProgressState().get(), equalTo(state)); + } + + @Test + void setProgressState_replacesState() { + final GlobalState globalState = new GlobalState("test-state", 
Map.of("old", "value")); + final Map newState = Map.of("new", "value"); + globalState.setProgressState(newState); + assertThat(globalState.getProgressState().get(), equalTo(newState)); + } + + @Test + void fromSourcePartitionStoreItem_returnsRestoredState() { + final SourcePartitionStoreItem item = mock(SourcePartitionStoreItem.class); + when(item.getSourceIdentifier()).thenReturn("prefix|unknown"); + when(item.getSourcePartitionKey()).thenReturn("snapshot-completion-db.table1"); + when(item.getPartitionProgressState()).thenReturn("{\"completed\":true,\"snapshotId\":42}"); + + final GlobalState globalState = new GlobalState(item); + assertThat(globalState.getPartitionKey(), equalTo("snapshot-completion-db.table1")); + assertThat(globalState.getProgressState().isPresent(), equalTo(true)); + assertThat(globalState.getProgressState().get().get("completed"), equalTo(true)); + assertThat(globalState.getProgressState().get().get("snapshotId"), equalTo(42)); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/InitialLoadTaskPartitionTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/InitialLoadTaskPartitionTest.java new file mode 100644 index 0000000000..bc31dd990f --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/InitialLoadTaskPartitionTest.java @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.InitialLoadTaskProgressState; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class InitialLoadTaskPartitionTest { + + @Test + void getPartitionType_returnsInitialLoadTask() { + final InitialLoadTaskProgressState state = new InitialLoadTaskProgressState(); + final InitialLoadTaskPartition partition = new InitialLoadTaskPartition("key", state); + assertThat(partition.getPartitionType(), equalTo("INITIAL_LOAD_TASK")); + } + + @Test + void getPartitionKey_returnsProvidedKey() { + final InitialLoadTaskProgressState state = new InitialLoadTaskProgressState(); + final InitialLoadTaskPartition partition = new InitialLoadTaskPartition("test-key", state); + assertThat(partition.getPartitionKey(), equalTo("test-key")); + } + + @Test + void getProgressState_returnsProvidedState() { + final InitialLoadTaskProgressState state = new InitialLoadTaskProgressState(); + state.setSnapshotId(200L); + state.setTableName("db.table1"); + state.setDataFilePath("/path/file.parquet"); + state.setTotalRecords(1000L); + final InitialLoadTaskPartition partition = new InitialLoadTaskPartition("key", state); + assertThat(partition.getProgressState().isPresent(), equalTo(true)); + assertThat(partition.getProgressState().get().getSnapshotId(), equalTo(200L)); + assertThat(partition.getProgressState().get().getTableName(), equalTo("db.table1")); + assertThat(partition.getProgressState().get().getDataFilePath(), equalTo("/path/file.parquet")); + assertThat(partition.getProgressState().get().getTotalRecords(), equalTo(1000L)); + } + + @Test + void 
getProgressState_fromStoreItem_returnsRestoredState() { + final SourcePartitionStoreItem item = mock(SourcePartitionStoreItem.class); + when(item.getSourceIdentifier()).thenReturn("prefix|INITIAL_LOAD_TASK"); + when(item.getSourcePartitionKey()).thenReturn("db.table1|initial|uuid"); + when(item.getPartitionProgressState()).thenReturn( + "{\"snapshotId\":200,\"tableName\":\"db.table1\",\"dataFilePath\":\"/path/file.parquet\",\"totalRecords\":1000}"); + + final InitialLoadTaskPartition partition = new InitialLoadTaskPartition(item); + assertThat(partition.getPartitionKey(), equalTo("db.table1|initial|uuid")); + assertThat(partition.getProgressState().isPresent(), equalTo(true)); + + final InitialLoadTaskProgressState state = partition.getProgressState().get(); + assertThat(state.getSnapshotId(), equalTo(200L)); + assertThat(state.getTableName(), equalTo("db.table1")); + assertThat(state.getDataFilePath(), equalTo("/path/file.parquet")); + assertThat(state.getTotalRecords(), equalTo(1000L)); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/LeaderPartitionTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/LeaderPartitionTest.java new file mode 100644 index 0000000000..d0948ac738 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/LeaderPartitionTest.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class LeaderPartitionTest { + + @Test + void getPartitionType_newInstance_returnsLeader() { + final LeaderPartition partition = new LeaderPartition(); + assertThat(partition.getPartitionType(), equalTo("LEADER")); + } + + @Test + void getPartitionKey_newInstance_returnsGlobal() { + final LeaderPartition partition = new LeaderPartition(); + assertThat(partition.getPartitionKey(), equalTo("GLOBAL")); + } + + @Test + void getProgressState_newInstance_returnsNotInitialized() { + final LeaderPartition partition = new LeaderPartition(); + assertThat(partition.getProgressState().isPresent(), equalTo(true)); + assertThat(partition.getProgressState().get().isInitialized(), equalTo(false)); + } + + @Test + void getProgressState_fromStoreItem_returnsRestoredState() { + final SourcePartitionStoreItem item = mock(SourcePartitionStoreItem.class); + when(item.getSourceIdentifier()).thenReturn("prefix|LEADER"); + when(item.getSourcePartitionKey()).thenReturn("GLOBAL"); + when(item.getPartitionProgressState()).thenReturn("{\"initialized\":true,\"lastProcessedSnapshotId\":42}"); + + final LeaderPartition partition = new LeaderPartition(item); + assertThat(partition.getProgressState(), notNullValue()); + assertThat(partition.getProgressState().get().isInitialized(), equalTo(true)); + assertThat(partition.getProgressState().get().getLastProcessedSnapshotId(), equalTo(42L)); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouperTest.java 
b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouperTest.java
new file mode 100644
index 0000000000..305954c602
--- /dev/null
+++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouperTest.java
@@ -0,0 +1,106 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.leader;

import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

/**
 * Unit tests for {@code TaskGrouper#pairByBounds}: DELETED/ADDED file tasks
 * with matching column bounds are paired; unmatched or ambiguous tasks fall
 * back to a single combined group.
 */
class TaskGrouperTest {

    private final TaskGrouper taskGrouper = new TaskGrouper();

    @Test
    void pairByBounds_insertOnly_eachFileIsIndependentGroup() {
        final List<TaskGrouper.TaskInfo> tasks = List.of(
                new TaskGrouper.TaskInfo("file1", "ADDED", 100, "bounds1"),
                new TaskGrouper.TaskInfo("file2", "ADDED", 200, "bounds2")
        );
        final List<List<TaskGrouper.TaskInfo>> groups = taskGrouper.pairByBounds(tasks);
        assertThat(groups, hasSize(2));
    }

    @Test
    void pairByBounds_deleteOnly_eachFileIsIndependentGroup() {
        final List<TaskGrouper.TaskInfo> tasks = List.of(
                new TaskGrouper.TaskInfo("file1", "DELETED", 100, "bounds1")
        );
        final List<List<TaskGrouper.TaskInfo>> groups = taskGrouper.pairByBounds(tasks);
        assertThat(groups, hasSize(1));
    }

    @Test
    void pairByBounds_matchingBounds_pairedTogether() {
        final List<TaskGrouper.TaskInfo> tasks = List.of(
                new TaskGrouper.TaskInfo("old_file", "DELETED", 100, "{1=abc}|{1=xyz}"),
                new TaskGrouper.TaskInfo("new_file", "ADDED", 100, "{1=abc}|{1=xyz}")
        );
        final List<List<TaskGrouper.TaskInfo>> groups = taskGrouper.pairByBounds(tasks);
        assertThat(groups, hasSize(1));
        assertThat(groups.get(0), hasSize(2));
    }

    @Test
    void pairByBounds_differentBounds_fallbackGroup() {
        final List<TaskGrouper.TaskInfo> tasks = List.of(
                new TaskGrouper.TaskInfo("old_file", "DELETED", 100, "{1=abc}|{1=xyz}"),
                new TaskGrouper.TaskInfo("new_file", "ADDED", 100, "{1=abc}|{1=zzz}")
        );
        final List<List<TaskGrouper.TaskInfo>> groups = taskGrouper.pairByBounds(tasks);
        // Bounds don't match -> fallback group with both
        assertThat(groups, hasSize(1));
        assertThat(groups.get(0), hasSize(2));
    }

    @Test
    void pairByBounds_multiplePairs_eachPairedSeparately() {
        final List<TaskGrouper.TaskInfo> tasks = List.of(
                new TaskGrouper.TaskInfo("old_us", "DELETED", 100, "bounds_us"),
                new TaskGrouper.TaskInfo("old_eu", "DELETED", 100, "bounds_eu"),
                new TaskGrouper.TaskInfo("new_us", "ADDED", 100, "bounds_us"),
                new TaskGrouper.TaskInfo("new_eu", "ADDED", 100, "bounds_eu")
        );
        final List<List<TaskGrouper.TaskInfo>> groups = taskGrouper.pairByBounds(tasks);
        assertThat(groups, hasSize(2));
        assertThat(groups.get(0), hasSize(2));
        assertThat(groups.get(1), hasSize(2));
    }

    @Test
    void pairByBounds_ambiguousBounds_fallbackGroup() {
        // Two DELETED and two ADDED with same bounds -> can't pair uniquely
        final List<TaskGrouper.TaskInfo> tasks = List.of(
                new TaskGrouper.TaskInfo("old1", "DELETED", 100, "same_bounds"),
                new TaskGrouper.TaskInfo("old2", "DELETED", 100, "same_bounds"),
                new TaskGrouper.TaskInfo("new1", "ADDED", 100, "same_bounds"),
                new TaskGrouper.TaskInfo("new2", "ADDED", 100, "same_bounds")
        );
        final List<List<TaskGrouper.TaskInfo>> groups = taskGrouper.pairByBounds(tasks);
        // Ambiguous -> all in one fallback group
        assertThat(groups, hasSize(1));
        assertThat(groups.get(0), hasSize(4));
    }

    @Test
    void pairByBounds_nullBounds_fallbackGroup() {
        final List<TaskGrouper.TaskInfo> tasks = List.of(
                new TaskGrouper.TaskInfo("old_file", "DELETED", 100, null),
                new TaskGrouper.TaskInfo("new_file", "ADDED", 100, null)
        );
        final List<List<TaskGrouper.TaskInfo>> groups = taskGrouper.pairByBounds(tasks);
        // Null bounds -> can't pair -> fallback
        assertThat(groups, hasSize(1));
        assertThat(groups.get(0), hasSize(2));
    }
}
diff --git
a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/CarryoverRemoverTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/CarryoverRemoverTest.java new file mode 100644 index 0000000000..c6dd57b27b --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/CarryoverRemoverTest.java @@ -0,0 +1,150 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.worker; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; + +class CarryoverRemoverTest { + + private final CarryoverRemover carryoverRemover = new CarryoverRemover(); + + @Test + void removeCarryover_emptyList_returnsEmpty() { + final List result = carryoverRemover.removeCarryover(new ArrayList<>()); + assertThat(result, hasSize(0)); + } + + @Test + void removeCarryover_insertOnly_returnsAll() { + final List rows = List.of( + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "INSERT", 0), + new CarryoverRemover.ChangelogRow(List.of(2, "Bob", 25), "INSERT", 1) + ); + final List result = carryoverRemover.removeCarryover(new ArrayList<>(rows)); + assertThat(result, hasSize(2)); + } + + @Test + void removeCarryover_deleteOnly_returnsAll() { + final List rows = List.of( + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "DELETE", 0) + ); + final List result = carryoverRemover.removeCarryover(new ArrayList<>(rows)); + assertThat(result, hasSize(1)); + } + + @Test + void 
removeCarryover_identicalDeleteInsertPair_removedAsCarryover() { + final List rows = List.of( + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "DELETE", 0), + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "INSERT", 1) + ); + final List result = carryoverRemover.removeCarryover(new ArrayList<>(rows)); + assertThat(result, hasSize(0)); + } + + @Test + void removeCarryover_updateProducesDeleteAndInsertWithDifferentValues() { + final List rows = List.of( + new CarryoverRemover.ChangelogRow(List.of(2, "Bob", 25), "DELETE", 0), + new CarryoverRemover.ChangelogRow(List.of(2, "Bobby", 25), "INSERT", 1) + ); + final List result = carryoverRemover.removeCarryover(new ArrayList<>(rows)); + assertThat(result, hasSize(2)); + } + + @Test + void removeCarryover_mixedCarryoverAndActualChanges() { + // CoW UPDATE: file with 3 rows rewritten, 1 row changed + final List rows = List.of( + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "DELETE", 0), + new CarryoverRemover.ChangelogRow(List.of(2, "Bob", 25), "DELETE", 1), + new CarryoverRemover.ChangelogRow(List.of(3, "Carol", 35), "DELETE", 2), + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "INSERT", 3), + new CarryoverRemover.ChangelogRow(List.of(2, "Bobby", 25), "INSERT", 4), + new CarryoverRemover.ChangelogRow(List.of(3, "Carol", 35), "INSERT", 5) + ); + final List result = carryoverRemover.removeCarryover(new ArrayList<>(rows)); + // Only id=2 (Bob->Bobby) should survive + assertThat(result, hasSize(2)); + } + + @Test + void removeCarryover_deleteWithNoMatchingInsert_survives() { + // DELETE id=4 with no corresponding INSERT (actual delete) + final List rows = List.of( + new CarryoverRemover.ChangelogRow(List.of(3, "Carol", 35), "DELETE", 0), + new CarryoverRemover.ChangelogRow(List.of(4, "Dave", 28), "DELETE", 1), + new CarryoverRemover.ChangelogRow(List.of(3, "Carol", 35), "INSERT", 2) + ); + final List result = carryoverRemover.removeCarryover(new ArrayList<>(rows)); + // Only 
id=4 DELETE should survive + assertThat(result, hasSize(1)); + } + + @Test + void removeCarryover_nullValues_handledCorrectly() { + final List rows = new ArrayList<>(List.of( + new CarryoverRemover.ChangelogRow(Arrays.asList(1, "Alice", null), "DELETE", 0), + new CarryoverRemover.ChangelogRow(Arrays.asList(1, "Alice", null), "INSERT", 1) + )); + final List result = carryoverRemover.removeCarryover(rows); + assertThat(result, hasSize(0)); + } + + @Test + void removeCarryover_nullToNonNull_notCarryover() { + // Schema evolution: null email -> non-null email + final List rows = new ArrayList<>(List.of( + new CarryoverRemover.ChangelogRow(Arrays.asList(3, "Carol", 35, null), "DELETE", 0), + new CarryoverRemover.ChangelogRow(Arrays.asList(3, "Carol", 35, "carol@example.com"), "INSERT", 1) + )); + final List result = carryoverRemover.removeCarryover(rows); + assertThat(result, hasSize(2)); + } + + @Test + void removeCarryover_duplicateIdenticalRows_allCancelledAsCarryover() { + // Two identical rows exist in both old and new files (legitimate duplicates). + // Sorted: D(Alice), D(Alice), I(Alice), I(Alice) + // deleteCount=2, then 2 INSERTs cancel both -> 0 remaining. + // All are carryover pairs. + final List rows = new ArrayList<>(List.of( + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "DELETE", 0), + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "DELETE", 1), + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "INSERT", 2), + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "INSERT", 3) + )); + final List result = carryoverRemover.removeCarryover(rows); + assertThat(result, hasSize(0)); + } + + @Test + void removeCarryover_asymmetricDeleteInsert_deleteSurvives() { + // Two files contain Alice, only one is rewritten with a change. + // Sorted: D(Alice), D(Alice), I(Alice) + // deleteCount=2, 1 INSERT cancels 1 -> 1 DELETE remains. 
+ final List rows = new ArrayList<>(List.of( + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "DELETE", 0), + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "DELETE", 1), + new CarryoverRemover.ChangelogRow(List.of(1, "Alice", 30), "INSERT", 2) + )); + final List result = carryoverRemover.removeCarryover(rows); + assertThat(result, hasSize(1)); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverterTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverterTest.java new file mode 100644 index 0000000000..b5572f695b --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverterTest.java @@ -0,0 +1,248 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.worker; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantArray; +import org.apache.iceberg.variants.VariantObject; +import org.apache.iceberg.variants.VariantPrimitive; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class ChangelogRecordConverterTest { + + private static final Schema TEST_SCHEMA = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()), + Types.NestedField.optional(3, "age", Types.IntegerType.get()) + ); + + @Test + void convert_insertOperation_setsCorrectMetadata() { + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final Record record = GenericRecord.create(TEST_SCHEMA); + record.setField("id", 1); + record.setField("name", "Alice"); + record.setField("age", 30); + + final Event event = converter.convert(record, TEST_SCHEMA, "INSERT", 12345L); + + assertThat(event.get("id", Integer.class), equalTo(1)); + assertThat(event.get("name", String.class), equalTo("Alice")); + assertThat(event.get("age", Integer.class), equalTo(30)); + assertThat(event.getMetadata().getAttribute("iceberg_operation"), equalTo("INSERT")); + 
assertThat(event.getMetadata().getAttribute("iceberg_table_name"), equalTo("test_table")); + assertThat(event.getMetadata().getAttribute("iceberg_snapshot_id"), equalTo(12345L)); + assertThat(event.getMetadata().getAttribute("bulk_action"), equalTo("index")); + assertThat(event.getMetadata().getAttribute("document_id"), equalTo("1")); + } + + @Test + void convert_deleteOperation_setsDeleteBulkAction() { + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final Record record = GenericRecord.create(TEST_SCHEMA); + record.setField("id", 1); + record.setField("name", "Alice"); + record.setField("age", 30); + + final Event event = converter.convert(record, TEST_SCHEMA, "DELETE", 12345L); + + assertThat(event.getMetadata().getAttribute("bulk_action"), equalTo("delete")); + assertThat(event.getMetadata().getAttribute("document_id"), equalTo("1")); + } + + @Test + void convert_multipleIdentifierColumns_concatenatedWithPipe() { + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id", "name")); + final Record record = GenericRecord.create(TEST_SCHEMA); + record.setField("id", 1); + record.setField("name", "Alice"); + record.setField("age", 30); + + final Event event = converter.convert(record, TEST_SCHEMA, "INSERT", 12345L); + + assertThat(event.getMetadata().getAttribute("document_id"), equalTo("1|Alice")); + } + + @Test + void convert_noIdentifierColumns_noDocumentId() { + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of()); + final Record record = GenericRecord.create(TEST_SCHEMA); + record.setField("id", 1); + record.setField("name", "Alice"); + record.setField("age", 30); + + final Event event = converter.convert(record, TEST_SCHEMA, "INSERT", 12345L); + + assertThat(event.getMetadata().getAttribute("document_id"), nullValue()); + } + + @Test + void convert_decimalType_convertedToString() { + final Schema schema = new Schema( + 
Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "price", Types.DecimalType.of(10, 2)) + ); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final Record record = GenericRecord.create(schema); + record.setField("id", 1); + record.setField("price", new BigDecimal("123.45")); + + final Event event = converter.convert(record, schema, "INSERT", 12345L); + + assertThat(event.get("price", String.class), equalTo("123.45")); + } + + @Test + void convert_dateType_convertedToIsoString() { + final Schema schema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "created", Types.DateType.get()) + ); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final Record record = GenericRecord.create(schema); + record.setField("id", 1); + record.setField("created", LocalDate.of(2024, 1, 15)); + + final Event event = converter.convert(record, schema, "INSERT", 12345L); + + assertThat(event.get("created", String.class), equalTo("2024-01-15")); + } + + @Test + void convert_nullField_preservedAsNull() { + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final Record record = GenericRecord.create(TEST_SCHEMA); + record.setField("id", 1); + record.setField("name", null); + record.setField("age", null); + + final Event event = converter.convert(record, TEST_SCHEMA, "INSERT", 12345L); + + assertThat(event.get("id", Integer.class), equalTo(1)); + assertThat(event.get("name", Object.class), nullValue()); + } + + @Test + void convert_structType_preservesFieldNames() { + final Types.StructType addressType = Types.StructType.of( + Types.NestedField.required(10, "city", Types.StringType.get()), + Types.NestedField.optional(11, "zip", Types.IntegerType.get()) + ); + final Schema schema = new Schema( + Types.NestedField.required(1, 
"id", Types.IntegerType.get()), + Types.NestedField.optional(2, "address", addressType) + ); + final Record addressRecord = GenericRecord.create(addressType); + addressRecord.setField("city", "Tokyo"); + addressRecord.setField("zip", 100); + final Record record = GenericRecord.create(schema); + record.setField("id", 1); + record.setField("address", addressRecord); + + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final Event event = converter.convert(record, schema, "INSERT", 12345L); + + @SuppressWarnings("unchecked") + final Map address = (Map) event.get("address", Object.class); + assertThat(address.get("city"), equalTo("Tokyo")); + assertThat(address.get("zip"), equalTo(100)); + } + + @Test + void convert_variantType_objectConvertedToMap() { + final Schema schema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.VariantType.get()) + ); + + final VariantPrimitive cityValue = mock(VariantPrimitive.class); + when(cityValue.type()).thenReturn(PhysicalType.STRING); + doReturn("Tokyo").when(cityValue).get(); + doReturn(cityValue).when(cityValue).asPrimitive(); + + final VariantObject variantObj = mock(VariantObject.class); + when(variantObj.type()).thenReturn(PhysicalType.OBJECT); + when(variantObj.fieldNames()).thenReturn(List.of("city")); + when(variantObj.get("city")).thenReturn(cityValue); + when(variantObj.asObject()).thenReturn(variantObj); + + final Variant variant = mock(Variant.class); + when(variant.value()).thenReturn(variantObj); + + final Record record = GenericRecord.create(schema); + record.setField("id", 1); + record.setField("data", variant); + + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final Event event = converter.convert(record, schema, "INSERT", 12345L); + + @SuppressWarnings("unchecked") + final Map data = (Map) event.get("data", Object.class); + 
assertThat(data.get("city"), equalTo("Tokyo")); + } + + @Test + void convert_variantType_arrayConvertedToList() { + final Schema schema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "tags", Types.VariantType.get()) + ); + + final VariantPrimitive elem0 = mock(VariantPrimitive.class); + when(elem0.type()).thenReturn(PhysicalType.STRING); + doReturn("a").when(elem0).get(); + doReturn(elem0).when(elem0).asPrimitive(); + final VariantPrimitive elem1 = mock(VariantPrimitive.class); + when(elem1.type()).thenReturn(PhysicalType.STRING); + doReturn("b").when(elem1).get(); + doReturn(elem1).when(elem1).asPrimitive(); + + final VariantArray variantArr = mock(VariantArray.class); + when(variantArr.type()).thenReturn(PhysicalType.ARRAY); + when(variantArr.numElements()).thenReturn(2); + when(variantArr.get(0)).thenReturn(elem0); + when(variantArr.get(1)).thenReturn(elem1); + when(variantArr.asArray()).thenReturn(variantArr); + + final Variant variant = mock(Variant.class); + when(variant.value()).thenReturn(variantArr); + + final Record record = GenericRecord.create(schema); + record.setField("id", 1); + record.setField("tags", variant); + + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final Event event = converter.convert(record, schema, "INSERT", 12345L); + + @SuppressWarnings("unchecked") + final List tags = (List) event.get("tags", Object.class); + assertThat(tags.get(0), equalTo("a")); + assertThat(tags.get(1), equalTo("b")); + } +} diff --git a/settings.gradle b/settings.gradle index a5a2a976a2..3824f3c9fe 100644 --- a/settings.gradle +++ b/settings.gradle @@ -192,6 +192,7 @@ include 'data-prepper-plugins:split-event-processor' include 'data-prepper-plugins:flatten-processor' include 'data-prepper-plugins:mongodb' include 'data-prepper-plugins:rds-source' +include 'data-prepper-plugins:iceberg-source' include 'data-prepper-plugins:http-source-common' 
include 'data-prepper-plugins:http-common' include 'data-prepper-plugins:aws-lambda'