Skip to content

Commit a25c26d

Browse files
wangxianghuyuzhaojing
authored and committed
[HUDI-3523] Introduce AddColumnSchemaPostProcessor to support add columns to the end of a schema (#5031)
1 parent a91b03f commit a25c26d

2 files changed

Lines changed: 229 additions & 5 deletions

File tree

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.utilities.schema;
20+
21+
import org.apache.hudi.common.config.ConfigProperty;
22+
import org.apache.hudi.common.config.TypedProperties;
23+
import org.apache.hudi.common.model.HoodieRecord;
24+
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
25+
26+
import org.apache.avro.LogicalTypes;
27+
import org.apache.avro.Schema;
28+
import org.apache.log4j.LogManager;
29+
import org.apache.log4j.Logger;
30+
import org.apache.spark.api.java.JavaSparkContext;
31+
32+
import java.util.ArrayList;
33+
import java.util.List;
34+
import java.util.Locale;
35+
36+
/**
37+
* A {@link SchemaPostProcessor} use to add column to given schema. Currently. only supports adding one column at a time.
38+
* Users can specify the position of new column by config {@link Config#SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP},
39+
* the new column will be added before this column.
40+
* <p>
41+
* Currently supported types : bytes, string, int, long, float, double, boolean, decimal
42+
*/
43+
public class AddColumnSchemaPostProcessor extends SchemaPostProcessor {
44+
45+
private static final Logger LOG = LogManager.getLogger(AddColumnSchemaPostProcessor.class);
46+
47+
public AddColumnSchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {
48+
super(props, jssc);
49+
}
50+
51+
/**
52+
* Configs supported.
53+
*/
54+
public static class Config {
55+
public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
56+
.key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.name")
57+
.noDefaultValue()
58+
.withDocumentation("New column's name");
59+
60+
public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
61+
.key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.type")
62+
.noDefaultValue()
63+
.withDocumentation("New column's type");
64+
65+
public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
66+
.key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.doc")
67+
.noDefaultValue()
68+
.withDocumentation("New column's doc");
69+
70+
public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
71+
.key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.default")
72+
.noDefaultValue()
73+
.withDocumentation("New column's default value");
74+
75+
public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_SIZE_PROP = ConfigProperty
76+
.key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.size")
77+
.noDefaultValue()
78+
.withDocumentation("New column's size, used in decimal type");
79+
80+
public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_PRECISION_PROP = ConfigProperty
81+
.key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.precision")
82+
.noDefaultValue()
83+
.withDocumentation("New column's precision, used in decimal type");
84+
85+
public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_SCALE_PROP = ConfigProperty
86+
.key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.scale")
87+
.noDefaultValue()
88+
.withDocumentation("New column's precision, used in decimal type");
89+
90+
public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP = ConfigProperty
91+
.key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.next")
92+
.defaultValue(HoodieRecord.HOODIE_IS_DELETED)
93+
.withDocumentation("Column name which locate next to new column, `_hoodie_is_deleted` by default.");
94+
}
95+
96+
public static final String BYTES = "BYTES";
97+
public static final String STRING = "STRING";
98+
public static final String INT = "INT";
99+
public static final String LONG = "LONG";
100+
public static final String FLOAT = "FLOAT";
101+
public static final String DOUBLE = "DOUBLE";
102+
public static final String BOOLEAN = "BOOLEAN";
103+
public static final String DECIMAL = "DECIMAL";
104+
105+
@Override
106+
public Schema processSchema(Schema schema) {
107+
String newColumnName = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
108+
109+
if (schema.getField(newColumnName) != null) {
110+
LOG.warn(String.format("Column %s already exist!", newColumnName));
111+
return schema;
112+
}
113+
114+
List<Schema.Field> sourceFields = schema.getFields();
115+
List<Schema.Field> targetFields = new ArrayList<>(sourceFields.size() + 1);
116+
117+
String nextColumnName = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.key(),
118+
Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.defaultValue());
119+
120+
// mark whether the new column is added
121+
boolean isAdded = false;
122+
for (Schema.Field sourceField : sourceFields) {
123+
if (sourceField.name().equals(nextColumnName)) {
124+
targetFields.add(buildNewColumn());
125+
isAdded = true;
126+
}
127+
targetFields.add(new Schema.Field(sourceField.name(), sourceField.schema(), sourceField.doc(), sourceField.defaultVal()));
128+
}
129+
130+
// this would happen when `nextColumn` does not exist. just append the new column to the end
131+
if (!isAdded) {
132+
targetFields.add(buildNewColumn());
133+
}
134+
135+
return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields);
136+
}
137+
138+
private Schema.Field buildNewColumn() {
139+
Schema.Field result;
140+
141+
String columnName = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
142+
String type = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key()).toUpperCase(Locale.ROOT);
143+
String doc = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), null);
144+
Object defaultValue = this.config.getOrDefault(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(),
145+
null);
146+
147+
switch (type) {
148+
case STRING:
149+
case BYTES:
150+
case INT:
151+
case LONG:
152+
case FLOAT:
153+
case DOUBLE:
154+
case BOOLEAN:
155+
result = new Schema.Field(columnName, Schema.create(Schema.Type.valueOf(type)), doc, defaultValue);
156+
break;
157+
case DECIMAL:
158+
int size = this.config.getInteger(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SIZE_PROP.key(), 10);
159+
int precision = this.config.getInteger(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_PRECISION_PROP.key());
160+
int scale = this.config.getInteger(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SCALE_PROP.key());
161+
162+
Schema decimalSchema = Schema.createFixed(null, null, null, size);
163+
LogicalTypes.decimal(precision, scale).addToSchema(decimalSchema);
164+
165+
result = new Schema.Field(columnName, decimalSchema, doc, defaultValue);
166+
break;
167+
default:
168+
throw new HoodieSchemaPostProcessException(String.format("Type %s is not supported", type));
169+
}
170+
return result;
171+
}
172+
}

hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.hudi.common.config.TypedProperties;
2222
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
23+
import org.apache.hudi.utilities.schema.AddColumnSchemaPostProcessor;
2324
import org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor;
2425
import org.apache.hudi.utilities.schema.DropColumnSchemaPostProcessor;
2526
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
@@ -33,10 +34,14 @@
3334
import org.apache.avro.Schema.Type;
3435
import org.junit.jupiter.api.Assertions;
3536
import org.junit.jupiter.api.Test;
37+
import org.junit.jupiter.params.ParameterizedTest;
38+
import org.junit.jupiter.params.provider.Arguments;
39+
import org.junit.jupiter.params.provider.MethodSource;
3640

3741
import java.io.IOException;
3842
import java.util.ArrayList;
3943
import java.util.List;
44+
import java.util.stream.Stream;
4045

4146
import static org.junit.jupiter.api.Assertions.assertEquals;
4247
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -55,13 +60,18 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
5560
+ "{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\","
5661
+ "\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"}]}";
5762

63+
private static Stream<Arguments> configParams() {
64+
String[] types = {"bytes", "string", "int", "long", "float", "double", "boolean"};
65+
return Stream.of(types).map(Arguments::of);
66+
}
67+
5868
@Test
5969
public void testPostProcessor() throws IOException {
6070
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
6171
SchemaProvider provider =
6272
UtilHelpers.wrapSchemaProviderWithPostProcessor(
63-
UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
64-
properties, jsc,null);
73+
UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
74+
properties, jsc, null);
6575

6676
Schema schema = provider.getSourceSchema();
6777
assertEquals(schema.getType(), Type.RECORD);
@@ -76,9 +86,9 @@ public void testSparkAvro() throws IOException {
7686
transformerClassNames.add(FlatteningTransformer.class.getName());
7787

7888
SchemaProvider provider =
79-
UtilHelpers.wrapSchemaProviderWithPostProcessor(
80-
UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
81-
properties, jsc, transformerClassNames);
89+
UtilHelpers.wrapSchemaProviderWithPostProcessor(
90+
UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
91+
properties, jsc, transformerClassNames);
8292

8393
Schema schema = provider.getSourceSchema();
8494
assertEquals(schema.getType(), Type.RECORD);
@@ -144,6 +154,48 @@ public void testDeleteColumnThrows() {
144154
Assertions.assertThrows(HoodieSchemaPostProcessException.class, () -> processor.processSchema(schema));
145155
}
146156

157+
@ParameterizedTest
158+
@MethodSource("configParams")
159+
public void testAddPrimitiveTypeColumn(String type) {
160+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "primitive_column");
161+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), type);
162+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.key(), "fare");
163+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "primitive column test");
164+
165+
AddColumnSchemaPostProcessor processor = new AddColumnSchemaPostProcessor(properties, null);
166+
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
167+
Schema targetSchema = processor.processSchema(schema);
168+
169+
Schema.Field newColumn = targetSchema.getField("primitive_column");
170+
Schema.Field nextColumn = targetSchema.getField("fare");
171+
172+
assertNotNull(newColumn);
173+
assertEquals("primitive column test", newColumn.doc());
174+
assertEquals(type, newColumn.schema().getType().getName());
175+
assertEquals(nextColumn.pos(), newColumn.pos() + 1);
176+
}
177+
178+
@Test
179+
public void testAddDecimalColumn() {
180+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "decimal_column");
181+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), "decimal");
182+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "decimal column test");
183+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(), "0.75");
184+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_PRECISION_PROP.key(), "8");
185+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SCALE_PROP.key(), "6");
186+
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SIZE_PROP.key(), "8");
187+
188+
AddColumnSchemaPostProcessor processor = new AddColumnSchemaPostProcessor(properties, null);
189+
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
190+
Schema targetSchema = processor.processSchema(schema);
191+
192+
Schema.Field newColumn = targetSchema.getField("decimal_column");
193+
194+
assertNotNull(newColumn);
195+
assertEquals("decimal", newColumn.schema().getLogicalType().getName());
196+
assertEquals(5, newColumn.pos());
197+
}
198+
147199
@Test
148200
public void testSparkAvroSchema() throws IOException {
149201
SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);

0 commit comments

Comments
 (0)