Skip to content

Commit de7a095

Browse files
author
XuQianJin-Stars
committed
[HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback
1 parent a0af660 commit de7a095

3 files changed

Lines changed: 30 additions & 6 deletions

File tree

hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ public HoodieWriteCommitPulsarCallback(HoodieWriteConfig config) throws PulsarCl
7878
this.topic = config.getString(TOPIC);
7979
this.client = createClient(config);
8080
this.producer = createProducer(config);
81-
validatePulsarConfig();
8281
}
8382

8483
@Override
@@ -121,8 +120,9 @@ public Producer<String> createProducer(HoodieConfig hoodieConfig) throws PulsarC
121120
.create();
122121
}
123122

124-
public static PulsarClient createClient(HoodieConfig hoodieConfig) throws PulsarClientException {
125-
String serviceUrl = hoodieConfig.getString(BROKER_SERVICE_URL);
123+
public PulsarClient createClient(HoodieConfig hoodieConfig) throws PulsarClientException {
124+
validatePulsarConfig();
125+
126126
Duration operationTimeout =
127127
parseDuration(hoodieConfig.getString(OPERATION_TIMEOUT));
128128
Duration connectionTimeout =

hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallbackConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,19 @@ public class HoodieWriteCommitPulsarCallbackConfig extends HoodieConfig {
100100
.sinceVersion("0.11.0")
101101
.withDocumentation("Duration of keeping alive interval for each "
102102
+ "client broker connection.");
103+
104+
/**
105+
* Set default value for {@link HoodieWriteCommitPulsarCallbackConfig} if needed.
106+
*/
107+
public static void setCallbackPulsarConfigIfNeeded(HoodieConfig config) {
108+
config.setDefaultValue(PRODUCER_ROUTE_MODE);
109+
config.setDefaultValue(OPERATION_TIMEOUT);
110+
config.setDefaultValue(CONNECTION_TIMEOUT);
111+
config.setDefaultValue(REQUEST_TIMEOUT);
112+
config.setDefaultValue(KEEPALIVE_INTERVAL);
113+
config.setDefaultValue(PRODUCER_SEND_TIMEOUT);
114+
config.setDefaultValue(PRODUCER_PENDING_QUEUE_SIZE);
115+
config.setDefaultValue(PRODUCER_PENDING_SIZE);
116+
config.setDefaultValue(PRODUCER_BLOCK_QUEUE_FULL);
117+
}
103118
}

hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060
import org.apache.hudi.utilities.UtilHelpers;
6161
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
6262
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
63+
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback;
64+
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig;
6365
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
6466
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
6567
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
@@ -729,9 +731,16 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
729731

730732
HoodieWriteConfig config = builder.build();
731733

732-
// set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.
733-
if (config.writeCommitCallbackOn() && HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) {
734-
HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
734+
if (config.writeCommitCallbackOn()) {
735+
// set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.
736+
if (HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) {
737+
HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
738+
}
739+
740+
// set default value for {@link HoodieWriteCommitPulsarCallbackConfig} if needed.
741+
if (HoodieWriteCommitPulsarCallback.class.getName().equals(config.getCallbackClass())) {
742+
HoodieWriteCommitPulsarCallbackConfig.setCallbackPulsarConfigIfNeeded(config);
743+
}
735744
}
736745

737746
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);

0 commit comments

Comments
 (0)