Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,45 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer,
return sb.build();
}

/**
 * Creates a lightweight HTTP server with TLS support and optional authentication.
 * Intended for internal node-to-node communication (e.g. shuffle data transfer)
 * where throttling, health checks, and buffer integration are not needed.
 *
 * @param certificateProvider TLS certificate provider; must be non-null when SSL is enabled
 * @param authenticationProvider authentication decorator, or null to skip authentication
 * @param annotatedService Armeria annotated service to register
 * @param path base path for the annotated service
 * @return configured Armeria Server
 * @throws NullPointerException if SSL is enabled but certificateProvider is null
 */
public Server createHTTPServer(
final CertificateProvider certificateProvider,
final ArmeriaHttpAuthenticationProvider authenticationProvider,
final Object annotatedService,
final String path) {
final ServerBuilder sb = Server.builder();
// Avoid advertising the server implementation in response headers.
sb.disableServerHeader();

if (serverConfiguration.isSsl()) {
// Fail fast with a clear message instead of an anonymous NPE from getCertificate().
if (certificateProvider == null) {
throw new NullPointerException("certificateProvider must not be null when SSL/TLS is enabled");
}
LOG.info("Creating {} with SSL/TLS enabled.", sourceName);
final Certificate certificate = certificateProvider.getCertificate();
// Certificate and key are held as PEM strings; feed them to Armeria as streams.
sb.https(serverConfiguration.getPort()).tls(
new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)));
} else {
LOG.warn("Creating {} without SSL/TLS. This is not secure.", sourceName);
sb.http(serverConfiguration.getPort());
}

// Authentication is optional for this internal endpoint.
if (authenticationProvider != null) {
authenticationProvider.getAuthenticationDecorator().ifPresent(sb::decorator);
}

sb.annotatedService(path, annotatedService);

return sb.build();
}

private GrpcExceptionHandlerFunction createGrpExceptionHandler() {
RetryInfoConfig retryInfo = serverConfiguration.getRetryInfo() != null
? serverConfiguration.getRetryInfo()
Expand Down
8 changes: 8 additions & 0 deletions data-prepper-plugins/iceberg-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:armeria-common')
implementation project(':data-prepper-plugins:http-common')
implementation project(':data-prepper-plugins:http-source-common')

implementation 'org.apache.iceberg:iceberg-core:1.10.1'
implementation 'org.apache.iceberg:iceberg-data:1.10.1'
Expand All @@ -40,6 +44,10 @@ dependencies {
implementation libs.hadoop.common
implementation 'org.apache.orc:orc-core:1.9.5'

implementation libs.armeria.core

implementation 'org.lz4:lz4-java:1.8.0'

implementation 'software.amazon.awssdk:glue'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,85 @@ void cdc_delete_produces_delete_event() throws Exception {
}
}

/**
 * Updating a partition column (region US -> EU) makes Iceberg emit a DELETE in the
 * old partition and an INSERT in the new one. Because the shuffle hashes on
 * identifier_columns, both land on the same node, so the UPDATE merge can pair
 * them correctly even though they cross partitions.
 */
@Test
void cdc_partition_column_update_correctly_handled_by_shuffle() throws Exception {
final Schema schema = new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.required(2, "name", Types.StringType.get()),
Types.NestedField.required(3, "region", Types.StringType.get())
);

final String tableName = "partitioned_users";
helper.dropTable(TEST_NAMESPACE, tableName);
final org.apache.iceberg.PartitionSpec spec =
org.apache.iceberg.PartitionSpec.builderFor(schema).identity("region").build();
final Table table = helper.createPartitionedTable(TEST_NAMESPACE, tableName, schema, spec);

// Seed one row per partition: id=1 lives in US, id=2 lives in EU.
final DataFile usFile = helper.appendRows(table, List.of(
helper.newRecord(schema, 1, "Alice", "US")
));
final DataFile euFile = helper.appendRows(table, List.of(
helper.newRecord(schema, 2, "Bob", "EU")
));

final IcebergService service =
createServiceForTable(TEST_NAMESPACE + "." + tableName, List.of("id"), false);
final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer = createMockBuffer();
service.start(buffer);

try {
// Two separate appends -> initial load should surface at least 2 records.
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2))));
final int initialCount = receivedRecords.size();

// Copy-on-write style UPDATE that moves id=1 from US to EU: remove both old
// files and add a single rewritten EU file in one overwrite commit.
table.refresh();
final DataFile rewrittenEuFile = helper.writeDataFile(table, List.of(
helper.newRecord(schema, 1, "Alice", "EU"),
helper.newRecord(schema, 2, "Bob", "EU")
));
table.newOverwrite()
.deleteFile(usFile)
.deleteFile(euFile)
.addFile(rewrittenEuFile)
.commit();

// Wait until at least one CDC event arrives past the initial load.
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(initialCount + 1))));

final List<org.opensearch.dataprepper.model.record.Record<Event>> cdcEvents =
receivedRecords.subList(initialCount, receivedRecords.size());

// Shuffle routes DELETE(id=1, US) and INSERT(id=1, EU) to the same node via the
// identifier_columns hash; UPDATE merge drops the DELETE, so only the INSERT
// (INDEX action) should remain for Alice in EU.
boolean foundAliceEU = false;
for (final org.opensearch.dataprepper.model.record.Record<Event> cdcRecord : cdcEvents) {
final Event event = cdcRecord.getData();
final boolean isAlice = "Alice".equals(event.get("name", String.class));
final boolean inEU = "EU".equals(event.get("region", String.class));
if (isAlice && inEU) {
assertThat(event.getMetadata().getAttribute("bulk_action"), equalTo("index"));
foundAliceEU = true;
}
}
assertThat("Expected INSERT event for Alice in EU after partition column update",
foundAliceEU, equalTo(true));

} finally {
service.shutdown();
helper.dropTable(TEST_NAMESPACE, tableName);
}
}

/**
* Common setup for CDC tests: creates a table with 3 rows (Alice, Bob, Carol),
* starts IcebergService, and waits for initial load to complete.
Expand Down Expand Up @@ -284,20 +363,26 @@ private Buffer<org.opensearch.dataprepper.model.record.Record<Event>> createMock
}

/**
 * Builds an {@link IcebergService} for the default test table using identifier
 * column "id".
 *
 * @param disableExport whether initial export should be disabled for the table
 */
private IcebergService createService(final boolean disableExport) throws Exception {
// Reuse the local instead of duplicating the namespace/table concatenation.
final String fullTableName = TEST_NAMESPACE + "." + TEST_TABLE;
return createServiceForTable(fullTableName, List.of("id"), disableExport);
}

private IcebergService createServiceForTable(final String fullTableName,
final List<String> identifierColumns,
final boolean disableExport) throws Exception {

// Build config via reflection since fields are private
final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class);
final TableConfig tableConfig = mock(TableConfig.class);

when(tableConfig.getTableName()).thenReturn(fullTableName);
when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties());
when(tableConfig.getIdentifierColumns()).thenReturn(List.of("id"));
when(tableConfig.getIdentifierColumns()).thenReturn(identifierColumns);
when(tableConfig.isDisableExport()).thenReturn(disableExport);

when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
when(sourceConfig.getShuffleConfig()).thenReturn(createTestShuffleConfig());

final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator();
coordinator.createPartition(new LeaderPartition());
Expand All @@ -306,6 +391,16 @@ private IcebergService createService(final boolean disableExport) throws Excepti
org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory());
}

/**
 * Builds a minimal shuffle configuration for tests (SSL disabled, port 4995).
 * Jackson deserialization is used because ShuffleConfig's fields are private
 * and have no setters.
 *
 * @return a ShuffleConfig with ssl=false and port=4995
 */
private org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig createTestShuffleConfig() {
try {
final com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
return mapper.readValue("{\"ssl\": false, \"port\": 4995}",
org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig.class);
} catch (final com.fasterxml.jackson.core.JsonProcessingException e) {
// Narrowest checked exception declared by readValue(String, Class); keep it as the cause.
throw new RuntimeException(e);
}
}

private EnhancedSourceCoordinator createInMemoryCoordinator() {
final InMemorySourceCoordinationStore store = new InMemorySourceCoordinationStore(
new org.opensearch.dataprepper.model.configuration.PluginSetting("in_memory", Collections.emptyMap()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public Table createTable(final String namespace, final String tableName, final S
return catalog.createTable(TableIdentifier.of(namespace, tableName), schema, PartitionSpec.unpartitioned());
}

/**
 * Creates a table in the given namespace with the supplied schema and
 * partition spec.
 *
 * @param namespace namespace to create the table in
 * @param tableName simple table name
 * @param schema table schema
 * @param spec partition spec to apply
 * @return the newly created Iceberg table
 */
public Table createPartitionedTable(final String namespace, final String tableName,
final Schema schema, final PartitionSpec spec) {
final TableIdentifier identifier = TableIdentifier.of(namespace, tableName);
return catalog.createTable(identifier, schema, spec);
}

public void dropTable(final String namespace, final String tableName) {
try {
catalog.dropTable(TableIdentifier.of(namespace, tableName), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.iceberg.leader.LeaderScheduler;
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.LocalDiskShuffleStorage;
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleHttpServer;
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleHttpService;
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleStorage;
import org.opensearch.dataprepper.plugins.source.iceberg.worker.ChangelogWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -43,6 +48,8 @@ public class IcebergService {
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
private final EventFactory eventFactory;
private final ShuffleStorage shuffleStorage;
private ShuffleHttpServer shuffleHttpServer;
private ExecutorService executor;

public IcebergService(final EnhancedSourceCoordinator sourceCoordinator,
Expand All @@ -55,11 +62,19 @@ public IcebergService(final EnhancedSourceCoordinator sourceCoordinator,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.eventFactory = eventFactory;
final Path shuffleBaseDir = Path.of(System.getProperty("java.io.tmpdir"), "data-prepper-shuffle");
this.shuffleStorage = new LocalDiskShuffleStorage(shuffleBaseDir);
this.shuffleStorage.cleanupAll();
}

public void start(final Buffer<Record<Event>> buffer) {
LOG.info("Starting Iceberg service");

// Start shuffle HTTP server
final ShuffleHttpService shuffleHttpService = new ShuffleHttpService(shuffleStorage);
shuffleHttpServer = new ShuffleHttpServer(sourceConfig.getShuffleConfig(), shuffleHttpService);
shuffleHttpServer.start();

// Load all tables upfront. Single point of Table lifecycle management.
final Map<String, Table> tables = new HashMap<>();
final Map<String, TableConfig> tableConfigs = new HashMap<>();
Expand Down Expand Up @@ -100,9 +115,10 @@ public void start(final Buffer<Record<Event>> buffer) {
// Start schedulers with shared table references
final List<Runnable> runnableList = new ArrayList<>();

runnableList.add(new LeaderScheduler(sourceCoordinator, tableConfigs, sourceConfig.getPollingInterval(), tables));
runnableList.add(new LeaderScheduler(sourceCoordinator, tableConfigs,
sourceConfig.getPollingInterval(), tables, shuffleStorage, sourceConfig.getShuffleConfig()));
runnableList.add(new ChangelogWorker(
sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager, eventFactory));
sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager, eventFactory, shuffleStorage));

executor = Executors.newFixedThreadPool(runnableList.size());
runnableList.forEach(executor::submit);
Expand All @@ -113,6 +129,9 @@ public void shutdown() {
if (executor != null) {
executor.shutdownNow();
}
if (shuffleHttpServer != null) {
shuffleHttpServer.stop();
}
}

private void validateCoWTable(final Table table, final String tableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig;

import java.time.Duration;
import java.util.List;
Expand All @@ -32,6 +33,10 @@ public class IcebergSourceConfig {
@JsonProperty("acknowledgments")
private boolean acknowledgments = true;

@JsonProperty("shuffle")
@Valid
private ShuffleConfig shuffleConfig = new ShuffleConfig();

/**
 * @return the configured Iceberg table definitions
 */
public List<TableConfig> getTables() {
return tables;
}
Expand All @@ -43,4 +48,8 @@ public Duration getPollingInterval() {
/**
 * @return whether end-to-end acknowledgments are enabled (defaults to true)
 */
public boolean isAcknowledgmentsEnabled() {
return acknowledgments;
}

/**
 * @return the shuffle configuration; never null (a default instance is used
 *         when no "shuffle" section is present in the pipeline config)
 */
public ShuffleConfig getShuffleConfig() {
return shuffleConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.InitialLoadTaskPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ShuffleReadPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ShuffleWritePartition;

import java.util.function.Function;

Expand All @@ -26,14 +28,19 @@ public EnhancedSourcePartition apply(final SourcePartitionStoreItem partitionSto
final String sourceIdentifier = partitionStoreItem.getSourceIdentifier();
final String partitionType = sourceIdentifier.substring(sourceIdentifier.lastIndexOf('|') + 1);

if (LeaderPartition.PARTITION_TYPE.equals(partitionType)) {
return new LeaderPartition(partitionStoreItem);
} else if (ChangelogTaskPartition.PARTITION_TYPE.equals(partitionType)) {
return new ChangelogTaskPartition(partitionStoreItem);
} else if (InitialLoadTaskPartition.PARTITION_TYPE.equals(partitionType)) {
return new InitialLoadTaskPartition(partitionStoreItem);
} else {
return new GlobalState(partitionStoreItem);
switch (partitionType) {
case LeaderPartition.PARTITION_TYPE:
return new LeaderPartition(partitionStoreItem);
case ChangelogTaskPartition.PARTITION_TYPE:
return new ChangelogTaskPartition(partitionStoreItem);
case InitialLoadTaskPartition.PARTITION_TYPE:
return new InitialLoadTaskPartition(partitionStoreItem);
case ShuffleWritePartition.PARTITION_TYPE:
return new ShuffleWritePartition(partitionStoreItem);
case ShuffleReadPartition.PARTITION_TYPE:
return new ShuffleReadPartition(partitionStoreItem);
default:
return new GlobalState(partitionStoreItem);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition;

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ShuffleReadProgressState;

import java.util.Optional;

/**
 * Coordination partition representing a shuffle-read task, tracked by the
 * enhanced source coordinator under partition type {@code SHUFFLE_READ}.
 */
public class ShuffleReadPartition extends EnhancedSourcePartition<ShuffleReadProgressState> {

    public static final String PARTITION_TYPE = "SHUFFLE_READ";

    private final String partitionKey;
    private final ShuffleReadProgressState state;

    /**
     * Creates a fresh partition with the given key and progress state.
     */
    public ShuffleReadPartition(final String partitionKey, final ShuffleReadProgressState state) {
        this.partitionKey = partitionKey;
        this.state = state;
    }

    /**
     * Rehydrates a partition from a coordinator store item.
     */
    public ShuffleReadPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
        setSourcePartitionStoreItem(sourcePartitionStoreItem);
        this.partitionKey = sourcePartitionStoreItem.getSourcePartitionKey();
        this.state = convertStringToPartitionProgressState(ShuffleReadProgressState.class,
                sourcePartitionStoreItem.getPartitionProgressState());
    }

    @Override
    public String getPartitionType() {
        return PARTITION_TYPE;
    }

    @Override
    public String getPartitionKey() {
        return partitionKey;
    }

    @Override
    public Optional<ShuffleReadProgressState> getProgressState() {
        // ofNullable instead of of: state can be null when the store item carried no
        // progress state string — presumably convertStringToPartitionProgressState
        // returns null in that case; Optional.of would throw NPE. TODO confirm.
        return Optional.ofNullable(state);
    }
}
Loading
Loading