Skip to content

Commit 7544fa5

Browse files
jonvex and yihua
committed
fix(ingest): Fix Timestamp Conversions, Add legacy api support (#14076)
Co-authored-by: Jonathan Vexler <=> Co-authored-by: Y Ethan Guo <yihua@apache.org>
1 parent 4ecd272 commit 7544fa5

11 files changed

Lines changed: 308 additions & 59 deletions

File tree

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/stats/SparkValueMetadataUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public static Comparable convertSparkToJava(ValueMetadata valueMetadata, Object
144144
* we need to return java.sql.Timestamp and java.sql.Date
145145
*
146146
*/
147-
public static Object convertJavaTypeToSparkType(Comparable<?> javaVal, boolean useJava8api) {
147+
public static Object convertJavaTypeToSparkType(Object javaVal, boolean useJava8api) {
148148
if (!useJava8api) {
149149
if (javaVal instanceof Instant) {
150150
return Timestamp.from((Instant) javaVal);

hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,12 @@ public class HoodieTestDataGenerator implements AutoCloseable {
179179
+ "{\"name\":\"dec_fixed_small\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedSmall\",\"size\":3,\"logicalType\":\"decimal\",\"precision\":5,\"scale\":2}},"
180180
+ "{\"name\":\"dec_fixed_large\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedLarge\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":18,\"scale\":9}},";
181181

182+
public static final String EXTENDED_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 = "{\"name\":\"ts_millis\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
183+
+ "{\"name\":\"ts_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},"
184+
+ "{\"name\":\"event_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
185+
+ "{\"name\":\"dec_fixed_small\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedSmall\",\"size\":3,\"logicalType\":\"decimal\",\"precision\":5,\"scale\":2}},"
186+
+ "{\"name\":\"dec_fixed_large\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedLarge\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":18,\"scale\":9}},";
187+
182188
public static final String EXTRA_COL_SCHEMA1 = "{\"name\": \"extra_column1\", \"type\": [\"null\", \"string\"], \"default\": null },";
183189
public static final String EXTRA_COL_SCHEMA2 = "{\"name\": \"extra_column2\", \"type\": [\"null\", \"string\"], \"default\": null},";
184190
public static final String EXTRA_COL_SCHEMA_FOR_AWS_DMS_PAYLOAD = "{\"name\": \"Op\", \"type\": [\"null\", \"string\"], \"default\": null},";
@@ -202,6 +208,9 @@ public class HoodieTestDataGenerator implements AutoCloseable {
202208
public static final String TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS =
203209
TRIP_SCHEMA_PREFIX + EXTENDED_LOGICAL_TYPES_SCHEMA_NO_LTS + TRIP_SCHEMA_SUFFIX;
204210

211+
public static final String TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 =
212+
TRIP_SCHEMA_PREFIX + EXTENDED_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 + TRIP_SCHEMA_SUFFIX;
213+
205214

206215
public static final String TRIP_NESTED_EXAMPLE_SCHEMA =
207216
TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
@@ -233,6 +242,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
233242
public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA = new Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA);
234243
public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6 = new Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_V6);
235244
public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS = new Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
245+
public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6 = new Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6);
236246
public static final Schema AVRO_TRIP_SCHEMA = new Schema.Parser().parse(TRIP_SCHEMA);
237247
public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
238248

@@ -380,6 +390,8 @@ public IndexedRecord generateRandomValueAsPerSchema(String schemaStr, HoodieKey
380390
return generatePayloadForLogicalTypesSchemaV6(key, commitTime, false, timestamp);
381391
} else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS.equals(schemaStr)) {
382392
return generatePayloadForLogicalTypesSchemaNoLTS(key, commitTime, false, timestamp);
393+
} else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6.equals(schemaStr)) {
394+
return generatePayloadForLogicalTypesSchemaNoLTSV6(key, commitTime, false, timestamp);
383395
}
384396
} else {
385397
if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
@@ -390,6 +402,8 @@ public IndexedRecord generateRandomValueAsPerSchema(String schemaStr, HoodieKey
390402
return generatePayloadForLogicalTypesSchemaV6(key, commitTime, true, timestamp);
391403
} else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS.equals(schemaStr)) {
392404
return generatePayloadForLogicalTypesSchemaNoLTS(key, commitTime, true, timestamp);
405+
} else if (TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6.equals(schemaStr)) {
406+
return generatePayloadForLogicalTypesSchemaNoLTSV6(key, commitTime, true, timestamp);
393407
}
394408
}
395409

@@ -451,6 +465,10 @@ public IndexedRecord generatePayloadForTripEncodedDecimalSchema(HoodieKey key, S
451465
public IndexedRecord generatePayloadForLogicalTypesSchemaNoLTS(HoodieKey key, String commitTime, boolean isDelete, long timestamp) {
452466
return generateRecordForTripLogicalTypesSchema(key, "rider-" + commitTime, "driver-" + commitTime, timestamp, isDelete, false, false);
453467
}
468+
469+
public IndexedRecord generatePayloadForLogicalTypesSchemaNoLTSV6(HoodieKey key, String commitTime, boolean isDelete, long timestamp) {
470+
return generateRecordForTripLogicalTypesSchema(key, "rider-" + commitTime, "driver-" + commitTime, timestamp, isDelete, true, false);
471+
}
454472

455473
public IndexedRecord generatePayloadForLogicalTypesSchema(HoodieKey key, String commitTime, boolean isDelete, long timestamp) {
456474
return generateRecordForTripLogicalTypesSchema(key, "rider-" + commitTime, "driver-" + commitTime, timestamp, isDelete, false, true);
@@ -679,8 +697,11 @@ public GenericRecord generateRecordForTripLogicalTypesSchema(HoodieKey key, Stri
679697
long timestamp, boolean isDeleteRecord, boolean v6, boolean hasLTS) {
680698
GenericRecord rec;
681699
if (!hasLTS) {
682-
// LTS = Local Timestamp
683-
rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
700+
if (v6) {
701+
rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS_V6);
702+
} else {
703+
rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS);
704+
}
684705
} else if (v6) {
685706
rec = new GenericData.Record(AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6);
686707
} else {

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import org.apache.hudi.avro.processors.Parser;
3131
import org.apache.hudi.avro.processors.TimestampMicroLogicalTypeProcessor;
3232
import org.apache.hudi.avro.processors.TimestampMilliLogicalTypeProcessor;
33+
import org.apache.hudi.common.util.DateTimeUtils;
3334
import org.apache.hudi.common.util.collection.Pair;
3435
import org.apache.hudi.exception.HoodieException;
36+
import org.apache.hudi.stats.SparkValueMetadataUtils;
3537
import org.apache.hudi.stats.ValueType;
3638
import org.apache.hudi.utilities.exception.HoodieJsonToRowConversionException;
3739

@@ -47,7 +49,6 @@
4749

4850
import java.io.IOException;
4951
import java.math.BigDecimal;
50-
import java.sql.Timestamp;
5152
import java.time.Instant;
5253
import java.time.LocalDateTime;
5354
import java.time.temporal.ChronoField;
@@ -63,19 +64,21 @@
6364
* Converts Json record to Row Record.
6465
*/
6566
public class MercifulJsonToRowConverter extends MercifulJsonConverter {
67+
private final boolean useJava8api;
6668

6769
/**
6870
* Allows enabling sanitization and allows choice of invalidCharMask for sanitization
6971
*/
70-
public MercifulJsonToRowConverter(boolean shouldSanitize, String invalidCharMask) {
71-
this(new ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS), shouldSanitize, invalidCharMask);
72+
public MercifulJsonToRowConverter(boolean shouldSanitize, String invalidCharMask, boolean useJava8api) {
73+
this(new ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS), shouldSanitize, invalidCharMask, useJava8api);
7274
}
7375

7476
/**
7577
* Allows a configured ObjectMapper to be passed for converting json records to row.
7678
*/
77-
public MercifulJsonToRowConverter(ObjectMapper mapper, boolean shouldSanitize, String invalidCharMask) {
79+
public MercifulJsonToRowConverter(ObjectMapper mapper, boolean shouldSanitize, String invalidCharMask, boolean useJava8api) {
7880
super(mapper, shouldSanitize, invalidCharMask);
81+
this.useJava8api = useJava8api;
7982
}
8083

8184
/**
@@ -102,7 +105,7 @@ private Row convertJsonToRow(Map<String, Object> inputJson, Schema schema) {
102105
for (Schema.Field f : fields) {
103106
Object val = shouldSanitize ? getFieldFromJson(f, inputJson, schema.getFullName(), invalidCharMask) : inputJson.get(f.name());
104107
if (val != null) {
105-
values.set(f.pos(), convertJsonField(val, f.name(), f.schema()));
108+
values.set(f.pos(), SparkValueMetadataUtils.convertJavaTypeToSparkType(convertJsonField(val, f.name(), f.schema()), useJava8api));
106109
}
107110
}
108111
return RowFactory.create(values.toArray());
@@ -238,7 +241,7 @@ public Pair<Boolean, Object> handleStringValue(String value) {
238241
},
239242
value, schema);
240243
if (result.getLeft()) {
241-
return Pair.of(true, new Timestamp((Long) result.getRight()));
244+
return Pair.of(true, Instant.ofEpochMilli((Long) result.getRight()));
242245
}
243246
return Pair.of(false, null);
244247
}
@@ -275,8 +278,7 @@ public Pair<Boolean, Object> handleStringValue(String value) {
275278
},
276279
value, schema);
277280
if (result.getLeft()) {
278-
// timestamp in spark sql doesn't support precision to the micro.
279-
return Pair.of(true, new Timestamp(((Long) result.getRight()) / 1000));
281+
return Pair.of(true, DateTimeUtils.microsToInstant((Long) result.getRight()));
280282
}
281283
return Pair.of(false, null);
282284
}

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,19 @@ public class RowConverter implements Serializable {
4343
private final String schemaStr;
4444
private final String invalidCharMask;
4545
private final boolean shouldSanitize;
46+
private final boolean useJava8api;
4647

4748
/**
4849
* To be lazily initialized on executors.
4950
*/
5051
private transient MercifulJsonToRowConverter jsonConverter;
5152

52-
public RowConverter(Schema schema, boolean shouldSanitize, String invalidCharMask) {
53+
public RowConverter(Schema schema, boolean shouldSanitize, String invalidCharMask, boolean useJava8api) {
5354
this.schemaStr = schema.toString();
5455
this.schema = schema;
5556
this.shouldSanitize = shouldSanitize;
5657
this.invalidCharMask = invalidCharMask;
58+
this.useJava8api = useJava8api;
5759
}
5860

5961
private void initSchema() {
@@ -65,7 +67,7 @@ private void initSchema() {
6567

6668
private void initJsonConvertor() {
6769
if (jsonConverter == null) {
68-
jsonConverter = new MercifulJsonToRowConverter(this.shouldSanitize, this.invalidCharMask);
70+
jsonConverter = new MercifulJsonToRowConverter(this.shouldSanitize, this.invalidCharMask, this.useJava8api);
6971
}
7072
}
7173

hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.spark.sql.Column;
4848
import org.apache.spark.sql.Dataset;
4949
import org.apache.spark.sql.Row;
50+
import org.apache.spark.sql.internal.SQLConf;
5051
import org.apache.spark.sql.types.DataTypes;
5152
import org.apache.spark.sql.types.Metadata;
5253
import org.apache.spark.sql.types.StructField;
@@ -77,6 +78,9 @@ public class SourceFormatAdapter implements Closeable {
7778
private boolean wrapWithException = ROW_THROW_EXPLICIT_EXCEPTIONS.defaultValue();
7879
private String invalidCharMask = SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue();
7980

81+
private boolean useJava8api = (boolean) SQLConf.DATETIME_JAVA8API_ENABLED().defaultValue().get();
82+
83+
8084
private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
8185

8286
public SourceFormatAdapter(Source source) {
@@ -90,6 +94,7 @@ public SourceFormatAdapter(Source source, Option<BaseErrorTableWriter> errorTabl
9094
this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get());
9195
this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get());
9296
this.wrapWithException = ConfigUtils.getBooleanWithAltKeys(props.get(), ROW_THROW_EXPLICIT_EXCEPTIONS);
97+
this.useJava8api = (boolean) getSource().getSparkSession().conf().get(SQLConf.DATETIME_JAVA8API_ENABLED());
9398
}
9499
if (this.shouldSanitize && source.getSourceType() == Source.SourceType.PROTO) {
95100
throw new IllegalArgumentException("PROTO cannot be sanitized");
@@ -112,6 +117,10 @@ private String getInvalidCharMask() {
112117
return invalidCharMask;
113118
}
114119

120+
private boolean getUseJava8api() {
121+
return useJava8api;
122+
}
123+
115124
/**
116125
* transform input rdd of json string to generic records with support for adding error events to error table
117126
* @param inputBatch
@@ -134,7 +143,7 @@ private JavaRDD<GenericRecord> transformJsonToGenericRdd(InputBatch<JavaRDD<Stri
134143

135144
private JavaRDD<Row> transformJsonToRowRdd(InputBatch<JavaRDD<String>> inputBatch) {
136145
MercifulJsonConverter.clearCache(inputBatch.getSchemaProvider().getSourceSchema().getFullName());
137-
RowConverter convertor = new RowConverter(inputBatch.getSchemaProvider().getSourceSchema(), isFieldNameSanitizingEnabled(), getInvalidCharMask());
146+
RowConverter convertor = new RowConverter(inputBatch.getSchemaProvider().getSourceSchema(), isFieldNameSanitizingEnabled(), getInvalidCharMask(), getUseJava8api());
138147
return inputBatch.getBatch().map(rdd -> {
139148
if (errorTableWriter.isPresent()) {
140149
JavaRDD<Either<Row, String>> javaRDD = rdd.map(convertor::fromJsonToRowWithError);

0 commit comments

Comments (0)