Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
788dab5
Add Iceberg CDC source plugin (#6552)
lawofcycles Feb 23, 2026
8ee1b65
simplify table name parse
lawofcycles Mar 1, 2026
dc5b930
change initialLoad config to disableExport
lawofcycles Mar 6, 2026
46ebd49
updte license header
lawofcycles Mar 6, 2026
5238bf2
remove accidentally committed log files
lawofcycles Mar 6, 2026
39f5d88
add unit test LeaderPartitionTest
lawofcycles Mar 6, 2026
4c85711
add unit test for ChangelogTaskPartitionTest
lawofcycles Mar 6, 2026
d136999
add unit test for InitialLoadTaskPartition
lawofcycles Mar 6, 2026
5bc32e6
add GlobalState unit test
lawofcycles Mar 6, 2026
3530318
make properties camel case
lawofcycles Mar 6, 2026
1c54bdf
remove unnecessary property
lawofcycles Mar 6, 2026
67fd0cf
refactor LeaderScheduler
lawofcycles Mar 6, 2026
329ae63
make Iceberg source plugin experimental
lawofcycles Mar 6, 2026
13700ff
refactor TaskGrouper
lawofcycles Mar 6, 2026
689392e
enhance ChangelogWorker
lawofcycles Mar 6, 2026
39bae56
add incrementSnapshotCompletionCount even when ackEnabled = true
lawofcycles Mar 7, 2026
b65deb7
enhance CarryoverRemover to handle duplicate insert, delete pairs
lawofcycles Mar 7, 2026
fc05840
Fix struct field name loss and add Variant type support in ChangelogR…
lawofcycles Mar 7, 2026
e74fb01
Validate identifier_columns against table schema at startup
lawofcycles Mar 7, 2026
368320f
remove unnecessary java plugin definition
lawofcycles Mar 7, 2026
35c4170
fix ack mechanism
lawofcycles Mar 8, 2026
3343cd6
add docker based IntegTest
lawofcycles Mar 8, 2026
094e5a2
Merge UPDATE pairs into single INDEX for CDC events
lawofcycles Mar 11, 2026
3dc4a7d
Remove unused imports in ChangelogRecordConverterTest
lawofcycles Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions data-prepper-plugins/iceberg-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:aws-plugin-api')

implementation 'org.apache.iceberg:iceberg-core:1.10.1'
implementation 'org.apache.iceberg:iceberg-data:1.10.1'
implementation 'org.apache.iceberg:iceberg-parquet:1.10.1'
implementation 'org.apache.iceberg:iceberg-orc:1.10.1'
implementation 'org.apache.iceberg:iceberg-aws:1.10.1'

implementation libs.parquet.hadoop
implementation libs.avro.core
implementation libs.hadoop.common
implementation 'org.apache.orc:orc-core:1.9.5'

implementation 'software.amazon.awssdk:glue'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'

implementation 'com.fasterxml.jackson.core:jackson-databind'

testImplementation project(':data-prepper-test:test-common')

integrationTestImplementation project(':data-prepper-plugins:in-memory-source-coordination-store')
integrationTestImplementation project(':data-prepper-core')
integrationTestImplementation project(':data-prepper-event')
integrationTestImplementation 'org.awaitility:awaitility:4.2.2'
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.iceberg.rest.uri', System.getProperty('tests.iceberg.rest.uri', 'http://localhost:8181')
systemProperty 'tests.iceberg.s3.endpoint', System.getProperty('tests.iceberg.s3.endpoint', 'http://localhost:8333')
systemProperty 'tests.iceberg.s3.accessKey', System.getProperty('tests.iceberg.s3.accessKey', 'admin')
systemProperty 'tests.iceberg.s3.secretKey', System.getProperty('tests.iceberg.s3.secretKey', 'password')
systemProperty 'tests.iceberg.s3.region', System.getProperty('tests.iceberg.s3.region', 'us-east-1')

filter {
includeTestsMatching '*IT'
}
}
59 changes: 59 additions & 0 deletions data-prepper-plugins/iceberg-source/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

services:
seaweedfs:
image: chrislusf/seaweedfs:4.06
ports:
- "8333:8333"
command: server -s3 -s3.port=8333 -s3.config=/etc/seaweedfs/s3.json -master.port=9333 -volume.port=8080 -filer -filer.port=8888
configs:
- source: s3-config
target: /etc/seaweedfs/s3.json
healthcheck:
test: ["CMD-SHELL", "curl -sf http://localhost:8333 || exit 1"]
interval: 2s
timeout: 5s
retries: 15
start_period: 10s

create-bucket:
image: chrislusf/seaweedfs:4.06
depends_on:
seaweedfs:
condition: service_healthy
entrypoint: /bin/sh
command: >
-c "
until wget -q --spider http://seaweedfs:8333 2>/dev/null; do sleep 1; done;
/usr/bin/weed shell -master=seaweedfs:9333 -filer=seaweedfs:8888 <<EOF
s3.bucket.create -name warehouse
EOF
"

iceberg-rest:
image: apache/iceberg-rest-fixture
depends_on:
create-bucket:
condition: service_completed_successfully
ports:
- "8181:8181"
environment:
AWS_REGION: us-east-1
AWS_ACCESS_KEY_ID: admin
AWS_SECRET_ACCESS_KEY: password
CATALOG_WAREHOUSE: s3://warehouse/
CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO
CATALOG_S3_ENDPOINT: http://seaweedfs:8333
CATALOG_S3_ACCESS__KEY__ID: admin
CATALOG_S3_SECRET__ACCESS__KEY: password
CATALOG_S3_PATH__STYLE__ACCESS: "true"

configs:
s3-config:
content: |
{ "identities": [ { "name": "anonymous", "credentials": [ { "accessKey": "admin", "secretKey": "password" } ], "actions": [ "Admin", "Read", "Write" ] } ] }
Loading
Loading