Skip to content

Commit 54ff252

Browse files
committed
Merge branch 'master' into HUDI-2150
2 parents 584bff3 + 26c967b commit 54ff252

9 files changed

Lines changed: 796 additions & 5 deletions

File tree

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.cli;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.FileSystem;
23+
import org.apache.hadoop.fs.Path;
24+
import org.apache.hudi.DataSourceWriteOptions;
25+
import org.apache.hudi.client.SparkRDDWriteClient;
26+
import org.apache.hudi.client.common.HoodieSparkEngineContext;
27+
import org.apache.hudi.common.config.TypedProperties;
28+
import org.apache.hudi.common.table.HoodieTableConfig;
29+
import org.apache.hudi.common.table.HoodieTableMetaClient;
30+
import org.apache.hudi.common.util.Option;
31+
import org.apache.hudi.common.util.ReflectionUtils;
32+
import org.apache.hudi.common.util.StringUtils;
33+
import org.apache.hudi.common.util.ValidationUtils;
34+
import org.apache.hudi.config.HoodieCompactionConfig;
35+
import org.apache.hudi.config.HoodieIndexConfig;
36+
import org.apache.hudi.config.HoodieWriteConfig;
37+
import org.apache.hudi.exception.HoodieException;
38+
import org.apache.hudi.hive.HiveSyncConfig;
39+
import org.apache.hudi.hive.HiveSyncTool;
40+
import org.apache.hudi.index.HoodieIndex;
41+
import org.apache.hudi.sync.common.HoodieSyncConfig;
42+
import org.apache.log4j.LogManager;
43+
import org.apache.log4j.Logger;
44+
import org.apache.spark.api.java.JavaSparkContext;
45+
46+
import java.io.IOException;
47+
import java.io.Serializable;
48+
import java.util.HashMap;
49+
50+
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
51+
52+
/**
53+
* Performs bootstrap from a non-hudi source.
54+
*/
55+
public class BootstrapExecutorUtils implements Serializable {
56+
57+
private static final Logger LOG = LogManager.getLogger(BootstrapExecutorUtils.class);
58+
59+
/**
60+
* Config.
61+
*/
62+
private final Config cfg;
63+
64+
/**
65+
* Spark context.
66+
*/
67+
private final transient JavaSparkContext jssc;
68+
69+
/**
70+
* Bag of properties with source, hoodie client, key generator etc.
71+
*/
72+
private final TypedProperties props;
73+
74+
/**
75+
* Hadoop Configuration.
76+
*/
77+
private final Configuration configuration;
78+
79+
/**
80+
* Bootstrap Configuration.
81+
*/
82+
private final HoodieWriteConfig bootstrapConfig;
83+
84+
/**
85+
* FileSystem instance.
86+
*/
87+
private final transient FileSystem fs;
88+
89+
private final String bootstrapBasePath;
90+
91+
public static final String CHECKPOINT_KEY = HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY;
92+
93+
/**
94+
* Bootstrap Executor.
95+
*
96+
* @param cfg DeltaStreamer Config
97+
* @param jssc Java Spark Context
98+
* @param fs File System
99+
* @param properties Bootstrap Writer Properties
100+
* @throws IOException
101+
*/
102+
public BootstrapExecutorUtils(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
103+
TypedProperties properties) throws IOException {
104+
this.cfg = cfg;
105+
this.jssc = jssc;
106+
this.fs = fs;
107+
this.configuration = conf;
108+
this.props = properties;
109+
110+
ValidationUtils.checkArgument(properties.containsKey(HoodieTableConfig.BOOTSTRAP_BASE_PATH
111+
.key()),
112+
HoodieTableConfig.BOOTSTRAP_BASE_PATH.key() + " must be specified.");
113+
this.bootstrapBasePath = properties.getString(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key());
114+
115+
// Add more defaults if full bootstrap requested
116+
this.props.putIfAbsent(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(),
117+
DataSourceWriteOptions.PAYLOAD_CLASS_NAME().defaultValue());
118+
/**
119+
* Schema provider that supplies the command for reading the input and writing out the target table.
120+
*/
121+
SchemaProvider schemaProvider = createSchemaProvider(cfg.schemaProviderClass, props, jssc);
122+
HoodieWriteConfig.Builder builder =
123+
HoodieWriteConfig.newBuilder().withPath(cfg.basePath)
124+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
125+
.forTable(cfg.tableName)
126+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
127+
.withAutoCommit(true)
128+
.withProps(props);
129+
130+
if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
131+
builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
132+
}
133+
this.bootstrapConfig = builder.build();
134+
LOG.info("Created bootstrap executor with configs : " + bootstrapConfig.getProps());
135+
}
136+
137+
public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg,
138+
JavaSparkContext jssc) throws IOException {
139+
try {
140+
return StringUtils.isNullOrEmpty(schemaProviderClass) ? null
141+
: (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
142+
} catch (Throwable e) {
143+
throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
144+
}
145+
}
146+
147+
/**
148+
* Executes Bootstrap.
149+
*/
150+
public void execute() throws IOException {
151+
initializeTable();
152+
153+
try (SparkRDDWriteClient bootstrapClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jssc), bootstrapConfig)) {
154+
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
155+
checkpointCommitMetadata.put(CHECKPOINT_KEY, Config.checkpoint);
156+
bootstrapClient.bootstrap(Option.of(checkpointCommitMetadata));
157+
syncHive();
158+
}
159+
}
160+
161+
/**
162+
* Sync to Hive.
163+
*/
164+
private void syncHive() {
165+
if (cfg.enableHiveSync) {
166+
TypedProperties metaProps = new TypedProperties();
167+
metaProps.putAll(props);
168+
metaProps.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), cfg.basePath);
169+
metaProps.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat);
170+
if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key(), HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue())) {
171+
metaProps.put(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC_SPEC.key(), HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
172+
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
173+
}
174+
175+
new HiveSyncTool(metaProps, configuration, fs).syncHoodieTable();
176+
}
177+
}
178+
179+
private void initializeTable() throws IOException {
180+
Path basePath = new Path(cfg.basePath);
181+
if (fs.exists(basePath)) {
182+
if (cfg.bootstrapOverwrite) {
183+
LOG.warn("Target base path already exists, overwrite it");
184+
fs.delete(basePath, true);
185+
} else {
186+
throw new HoodieException("target base path already exists at " + cfg.basePath
187+
+ ". Cannot bootstrap data on top of an existing table");
188+
}
189+
}
190+
HoodieTableMetaClient.withPropertyBuilder()
191+
.fromProperties(props)
192+
.setTableType(cfg.tableType)
193+
.setTableName(cfg.tableName)
194+
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
195+
.setPayloadClassName(cfg.payloadClass)
196+
.setBaseFileFormat(cfg.baseFileFormat)
197+
.setBootstrapIndexClass(cfg.bootstrapIndexClass)
198+
.setBootstrapBasePath(bootstrapBasePath)
199+
.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.basePath);
200+
}
201+
202+
public static class Config {
203+
private String tableName;
204+
private String tableType;
205+
206+
private String basePath;
207+
208+
private String baseFileFormat;
209+
private String bootstrapIndexClass;
210+
private String schemaProviderClass;
211+
private String payloadClass;
212+
private Boolean enableHiveSync;
213+
214+
private Boolean bootstrapOverwrite;
215+
216+
public static String checkpoint = null;
217+
218+
public void setTableName(String tableName) {
219+
this.tableName = tableName;
220+
}
221+
222+
public void setTableType(String tableType) {
223+
this.tableType = tableType;
224+
}
225+
226+
public void setBasePath(String basePath) {
227+
this.basePath = basePath;
228+
}
229+
230+
public void setBaseFileFormat(String baseFileFormat) {
231+
this.baseFileFormat = baseFileFormat;
232+
}
233+
234+
public void setBootstrapIndexClass(String bootstrapIndexClass) {
235+
this.bootstrapIndexClass = bootstrapIndexClass;
236+
}
237+
238+
public void setSchemaProviderClass(String schemaProviderClass) {
239+
this.schemaProviderClass = schemaProviderClass;
240+
}
241+
242+
public void setPayloadClass(String payloadClass) {
243+
this.payloadClass = payloadClass;
244+
}
245+
246+
public void setEnableHiveSync(Boolean enableHiveSync) {
247+
this.enableHiveSync = enableHiveSync;
248+
}
249+
250+
public void setBootstrapOverwrite(Boolean bootstrapOverwrite) {
251+
this.bootstrapOverwrite = bootstrapOverwrite;
252+
}
253+
}
254+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.cli;
20+
21+
import org.apache.avro.Schema;
22+
import org.apache.hudi.ApiMaturityLevel;
23+
import org.apache.hudi.PublicAPIClass;
24+
import org.apache.hudi.PublicAPIMethod;
25+
import org.apache.hudi.common.config.TypedProperties;
26+
import org.apache.spark.api.java.JavaSparkContext;
27+
28+
import java.io.Serializable;
29+
30+
/**
31+
* Class to provide schema for reading data and also writing into a Hoodie table,
32+
* used by deltastreamer (runs over Spark).
33+
*/
34+
@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
35+
public abstract class SchemaProvider implements Serializable {
36+
37+
protected TypedProperties config;
38+
39+
protected JavaSparkContext jssc;
40+
41+
public SchemaProvider(TypedProperties props) {
42+
this(props, null);
43+
}
44+
45+
protected SchemaProvider(TypedProperties props, JavaSparkContext jssc) {
46+
this.config = props;
47+
this.jssc = jssc;
48+
}
49+
50+
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
51+
public abstract Schema getSourceSchema();
52+
53+
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
54+
public Schema getTargetSchema() {
55+
// by default, use source schema as target for hoodie table as well
56+
return getSourceSchema();
57+
}
58+
}

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ object HoodieProcedures {
5555
mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
5656
mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
5757
mapBuilder.put(HdfsParquetImportProcedure.NAME, HdfsParquetImportProcedure.builder)
58+
mapBuilder.put(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
59+
mapBuilder.put(ShowBootstrapMappingProcedure.NAME, ShowBootstrapMappingProcedure.builder)
60+
mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
5861
mapBuilder.build
5962
}
6063
}

0 commit comments

Comments
 (0)