Skip to content

Commit d9815ae

Browse files
authored
Merge branch 'master' into HUDI-2737-compaction-clustering-instant
2 parents 301a278 + fe57e9b commit d9815ae

20 files changed

Lines changed: 133 additions & 67 deletions

File tree

hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public static long countNewRecords(HoodieTableMetaClient target, List<String> co
5151

5252
public static String getTimeDaysAgo(int numberOfDays) {
5353
Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
54-
return HoodieActiveTimeline.formatInstantTime(date);
54+
return HoodieActiveTimeline.formatDate(date);
5555
}
5656

5757
/**
@@ -61,8 +61,8 @@ public static String getTimeDaysAgo(int numberOfDays) {
6161
* b) hours: -1, returns 20200202010000
6262
*/
6363
public static String addHours(String compactionCommitTime, int hours) throws ParseException {
64-
Instant instant = HoodieActiveTimeline.parseInstantTime(compactionCommitTime).toInstant();
64+
Instant instant = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).toInstant();
6565
ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
66-
return HoodieActiveTimeline.formatInstantTime(Date.from(commitDateTime.plusHours(hours).toInstant()));
66+
return HoodieActiveTimeline.formatDate(Date.from(commitDateTime.plusHours(hours).toInstant()));
6767
}
6868
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
233233

234234
if (writeTimer != null) {
235235
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
236-
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(instantTime).getTime(), durationInMs,
236+
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(), durationInMs,
237237
metadata, actionType);
238238
writeTimer = null;
239239
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy)
184184
private Long parsedToSeconds(String time) {
185185
long timestamp;
186186
try {
187-
timestamp = HoodieActiveTimeline.parseInstantTime(time).getTime() / 1000;
187+
timestamp = HoodieActiveTimeline.parseDateFromInstantTime(time).getTime() / 1000;
188188
} catch (ParseException e) {
189189
throw new HoodieCompactionException(e.getMessage(), e);
190190
}

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ public void completeCompaction(
371371
if (compactionTimer != null) {
372372
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
373373
try {
374-
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(),
374+
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
375375
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
376376
} catch (ParseException e) {
377377
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
324324
if (compactionTimer != null) {
325325
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
326326
try {
327-
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(),
327+
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
328328
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
329329
} catch (ParseException e) {
330330
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
@@ -405,7 +405,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
405405
if (clusteringTimer != null) {
406406
long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
407407
try {
408-
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(clusteringCommitTime).getTime(),
408+
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
409409
durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
410410
} catch (ParseException e) {
411411
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ public boolean rollbackCommit(String instantTime) {
567567
BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
568568
final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS);
569569

570-
Long rollbackTime = HoodieActiveTimeline.parseInstantTime(instantTime).getTime();
570+
Long rollbackTime = HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime();
571571
Long currentTime = new Date().getTime();
572572
Scan scan = new Scan();
573573
scan.addFamily(SYSTEM_COLUMN_FAMILY);

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
7272
protected HoodieTableMetaClient metaClient;
7373

7474
/**
75-
* Parse the timestamp of an Instant and return a {@code SimpleDateFormat}.
75+
* Parse the timestamp of an Instant and return a {@code Date}.
7676
*/
77-
public static Date parseInstantTime(String timestamp) throws ParseException {
78-
return HoodieInstantTimeGenerator.parseInstantTime(timestamp);
77+
public static Date parseDateFromInstantTime(String timestamp) throws ParseException {
78+
return HoodieInstantTimeGenerator.parseDateFromInstantTime(timestamp);
7979
}
8080

8181
/**
@@ -88,8 +88,8 @@ public static String formatInstantTime(Instant timestamp) {
8888
/**
8989
* Format the Date to a String representing the timestamp of a Hoodie Instant.
9090
*/
91-
public static String formatInstantTime(Date timestamp) {
92-
return HoodieInstantTimeGenerator.formatInstantTime(timestamp);
91+
public static String formatDate(Date timestamp) {
92+
return HoodieInstantTimeGenerator.formatDate(timestamp);
9393
}
9494

9595
/**
@@ -100,6 +100,7 @@ public static String createNewInstantTime() {
100100
return HoodieInstantTimeGenerator.createNewInstantTime(0);
101101
}
102102

103+
103104
/**
104105
* Returns next instant time that adds N milliseconds to current time.
105106
* Ensures each instant time is at least 1 second apart since we create instant times at second granularity

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.time.LocalDateTime;
2424
import java.time.ZoneId;
2525
import java.time.format.DateTimeFormatter;
26+
import java.time.format.DateTimeFormatterBuilder;
2627
import java.time.format.DateTimeParseException;
28+
import java.time.temporal.ChronoField;
2729
import java.time.temporal.TemporalAccessor;
2830
import java.util.Date;
2931
import java.util.concurrent.atomic.AtomicReference;
@@ -33,14 +35,27 @@
3335
*/
3436
public class HoodieInstantTimeGenerator {
3537
// Format of the timestamp used for an Instant
36-
private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss";
37-
private static final int INSTANT_TIMESTAMP_FORMAT_LENGTH = INSTANT_TIMESTAMP_FORMAT.length();
38+
public static final String SECS_INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss";
39+
public static final int SECS_INSTANT_ID_LENGTH = SECS_INSTANT_TIMESTAMP_FORMAT.length();
40+
public static final String MILLIS_INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmssSSS";
41+
public static final int MILLIS_INSTANT_ID_LENGTH = MILLIS_INSTANT_TIMESTAMP_FORMAT.length();
42+
public static final int MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH = MILLIS_INSTANT_TIMESTAMP_FORMAT.length();
3843
// Formatter to generate Instant timestamps
39-
private static DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT);
44+
// Unfortunately, the millisecond format is not parsable as-is (see https://bugs.openjdk.java.net/browse/JDK-8031085), hence we have to use appendValue()
45+
private static DateTimeFormatter MILLIS_INSTANT_TIME_FORMATTER = new DateTimeFormatterBuilder().appendPattern(SECS_INSTANT_TIMESTAMP_FORMAT)
46+
.appendValue(ChronoField.MILLI_OF_SECOND, 3).toFormatter();
47+
private static final String MILLIS_GRANULARITY_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
48+
private static DateTimeFormatter MILLIS_GRANULARITY_DATE_FORMATTER = DateTimeFormatter.ofPattern(MILLIS_GRANULARITY_DATE_FORMAT);
49+
4050
// The last Instant timestamp generated
4151
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
4252
private static final String ALL_ZERO_TIMESTAMP = "00000000000000";
4353

54+
// The default number of milliseconds that we add if they are not present
55+
// We prefer the max timestamp as it mimics the current behavior with second granularity
56+
// when performing comparisons such as LESS_THAN_OR_EQUAL_TO
57+
private static final String DEFAULT_MILLIS_EXT = "999";
58+
4459
/**
4560
* Returns next instant time that adds N milliseconds to the current time.
4661
* Ensures each instant time is at least 1 second apart since we create instant times at second granularity
@@ -52,36 +67,65 @@ public static String createNewInstantTime(long milliseconds) {
5267
String newCommitTime;
5368
do {
5469
Date d = new Date(System.currentTimeMillis() + milliseconds);
55-
newCommitTime = INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
70+
newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
5671
} while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal));
5772
return newCommitTime;
5873
});
5974
}
6075

61-
public static Date parseInstantTime(String timestamp) throws ParseException {
76+
public static Date parseDateFromInstantTime(String timestamp) throws ParseException {
6277
try {
63-
LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER);
78+
// Enables backwards compatibility with non-millisecond granularity instants
79+
String timestampInMillis = timestamp;
80+
if (isSecondGranularity(timestamp)) {
81+
// Add milliseconds to the instant in order to parse successfully
82+
timestampInMillis = timestamp + DEFAULT_MILLIS_EXT;
83+
} else if (timestamp.length() > MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) {
84+
// Compaction and cleaning in metadata have a special format. Handle it by trimming the extra chars and treating it with ms granularity
85+
timestampInMillis = timestamp.substring(0, MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH);
86+
}
87+
88+
LocalDateTime dt = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER);
6489
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
6590
} catch (DateTimeParseException e) {
6691
// Special handling for all zero timestamp which is not parsable by DateTimeFormatter
6792
if (timestamp.equals(ALL_ZERO_TIMESTAMP)) {
6893
return new Date(0);
6994
}
70-
// compaction and cleaning in metadata has special format. handling it by trimming extra chars and treating it with secs granularity
71-
if (timestamp.length() > INSTANT_TIMESTAMP_FORMAT_LENGTH) {
72-
LocalDateTime dt = LocalDateTime.parse(timestamp.substring(0, INSTANT_TIMESTAMP_FORMAT_LENGTH), INSTANT_TIME_FORMATTER);
73-
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
74-
}
7595
throw e;
7696
}
7797
}
7898

99+
private static boolean isSecondGranularity(String instant) {
100+
return instant.length() == SECS_INSTANT_ID_LENGTH;
101+
}
102+
79103
public static String formatInstantTime(Instant timestamp) {
80-
return INSTANT_TIME_FORMATTER.format(timestamp);
104+
return MILLIS_INSTANT_TIME_FORMATTER.format(timestamp);
81105
}
82106

83-
public static String formatInstantTime(Date timestamp) {
84-
return INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(timestamp));
107+
public static String formatDate(Date timestamp) {
108+
return getInstantFromTemporalAccessor(convertDateToTemporalAccessor(timestamp));
109+
}
110+
111+
public static String getInstantFromTemporalAccessor(TemporalAccessor temporalAccessor) {
112+
return MILLIS_INSTANT_TIME_FORMATTER.format(temporalAccessor);
113+
}
114+
115+
/**
116+
* Creates an instant string given a valid date-time string.
117+
* @param dateString A date-time string in the format yyyy-MM-dd HH:mm:ss.SSS (milliseconds are separated by '.', per MILLIS_GRANULARITY_DATE_FORMAT)
118+
* @return A timeline instant
119+
* @throws ParseException If we cannot parse the date string
120+
*/
121+
public static String getInstantForDateString(String dateString) {
122+
try {
123+
return getInstantFromTemporalAccessor(LocalDateTime.parse(dateString, MILLIS_GRANULARITY_DATE_FORMATTER));
124+
} catch (Exception e) {
125+
// Attempt to add the milliseconds in order to complete parsing
126+
return getInstantFromTemporalAccessor(LocalDateTime.parse(
127+
String.format("%s:%s", dateString, DEFAULT_MILLIS_EXT), MILLIS_GRANULARITY_DATE_FORMATTER));
128+
}
85129
}
86130

87131
private static TemporalAccessor convertDateToTemporalAccessor(Date d) {

hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
125125
/**
126126
* Adds the provided statuses into the file system view, and also caches it inside this object.
127127
*/
128-
protected List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
128+
public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
129129
HoodieTimer timer = new HoodieTimer().startTimer();
130130
List<HoodieFileGroup> fileGroups = buildFileGroups(statuses, visibleCommitsAndCompactionTimeline, true);
131131
long fgBuildTimeTakenMs = timer.endTimer();
@@ -925,7 +925,7 @@ Stream<FileSlice> fetchAllFileSlices(String partitionPath) {
925925
/**
926926
* Default implementation for fetching latest base-files for the partition-path.
927927
*/
928-
Stream<HoodieBaseFile> fetchLatestBaseFiles(final String partitionPath) {
928+
public Stream<HoodieBaseFile> fetchLatestBaseFiles(final String partitionPath) {
929929
return fetchAllStoredFileGroups(partitionPath)
930930
.map(fg -> Pair.of(fg.getFileGroupId(), getLatestBaseFile(fg)))
931931
.filter(p -> p.getValue().isPresent())

hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ public void setUp() throws IOException {
7979

8080
@Test
8181
public void testMakeDataFileName() {
82-
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
82+
String instantTime = HoodieActiveTimeline.formatDate(new Date());
8383
String fileName = UUID.randomUUID().toString();
8484
assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION);
8585
}
8686

8787
@Test
8888
public void testMaskFileName() {
89-
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
89+
String instantTime = HoodieActiveTimeline.formatDate(new Date());
9090
int taskPartitionId = 2;
9191
assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION);
9292
}
@@ -154,7 +154,7 @@ public void testProcessFiles() throws Exception {
154154

155155
@Test
156156
public void testGetCommitTime() {
157-
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
157+
String instantTime = HoodieActiveTimeline.formatDate(new Date());
158158
String fileName = UUID.randomUUID().toString();
159159
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
160160
assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
@@ -165,7 +165,7 @@ public void testGetCommitTime() {
165165

166166
@Test
167167
public void testGetFileNameWithoutMeta() {
168-
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
168+
String instantTime = HoodieActiveTimeline.formatDate(new Date());
169169
String fileName = UUID.randomUUID().toString();
170170
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
171171
assertEquals(fileName, FSUtils.getFileId(fullFileName));

0 commit comments

Comments
 (0)