-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-3189] Fallback to full table scan with incremental query when files are cleaned up or archived for MOR table #6141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e883203
b038b03
d7da6e9
8b11849
f88b7e4
22ae196
5da17ee
60c68b0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,7 @@ | |
|
|
||
| package org.apache.hudi | ||
|
|
||
| import org.apache.hadoop.fs.{GlobPattern, Path} | ||
| import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path} | ||
| import org.apache.hudi.HoodieConversionUtils.toScalaOption | ||
| import org.apache.hudi.common.model.{FileSlice, HoodieRecord} | ||
| import org.apache.hudi.common.table.HoodieTableMetaClient | ||
|
|
@@ -53,9 +53,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, | |
| } | ||
|
|
||
| override protected def timeline: HoodieTimeline = { | ||
| val startTimestamp = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key) | ||
| val endTimestamp = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp) | ||
| super.timeline.findInstantsInRange(startTimestamp, endTimestamp) | ||
| if (fullTableScan) { | ||
| super.timeline | ||
| } else { | ||
| super.timeline.findInstantsInRange(startTimestamp, endTimestamp) | ||
| } | ||
| } | ||
|
|
||
| protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], | ||
|
|
@@ -87,17 +89,19 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, | |
| if (includedCommits.isEmpty) { | ||
| List() | ||
| } else { | ||
| val latestCommit = includedCommits.last.getTimestamp | ||
| val commitsMetadata = includedCommits.map(getCommitMetadata(_, timeline)).asJava | ||
| val fileSlices = if (fullTableScan) { | ||
| listLatestFileSlices(Seq(), partitionFilters, dataFilters) | ||
| } else { | ||
| val latestCommit = includedCommits.last.getTimestamp | ||
|
|
||
| val modifiedFiles = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata) | ||
| val fsView = new HoodieTableFileSystemView(metaClient, timeline, modifiedFiles) | ||
| val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits) | ||
|
|
||
| val modifiedPartitions = getWritePartitionPaths(commitsMetadata) | ||
| val modifiedPartitions = getWritePartitionPaths(commitsMetadata) | ||
|
|
||
| val fileSlices = modifiedPartitions.asScala.flatMap { relativePartitionPath => | ||
| fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala | ||
| }.toSeq | ||
| modifiedPartitions.asScala.flatMap { relativePartitionPath => | ||
| fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala | ||
| }.toSeq | ||
| } | ||
|
|
||
| buildSplits(filterFileSlices(fileSlices, globPattern)) | ||
| } | ||
|
|
@@ -124,14 +128,48 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation { | |
| // Validate this Incremental implementation is properly configured | ||
| validate() | ||
|
|
||
| protected lazy val includedCommits: immutable.Seq[HoodieInstant] = timeline.getInstants.iterator().asScala.toList | ||
| protected def startTimestamp: String = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key) | ||
| protected def endTimestamp: String = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp) | ||
|
|
||
| protected def startInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(startTimestamp) | ||
| protected def endInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(endTimestamp) | ||
|
|
||
| // Fallback to full table scan if any of the following conditions matches: | ||
| // 1. the start commit is archived | ||
| // 2. the end commit is archived | ||
| // 3. there are files in metadata that have been deleted | ||
| protected lazy val fullTableScan: Boolean = { | ||
| val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key, | ||
| DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean | ||
|
|
||
| fallbackToFullTableScan && (startInstantArchived || endInstantArchived || affectedFilesInCommits.exists(fileStatus => !metaClient.getFs.exists(fileStatus.getPath))) | ||
| } | ||
|
|
||
| protected lazy val includedCommits: immutable.Seq[HoodieInstant] = { | ||
| if (!startInstantArchived || !endInstantArchived) { | ||
| // If either the start or the end instant is not archived, filter the | ||
| // instants within the [startTimestamp, endTimestamp] range. | ||
| super.timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.iterator().asScala.toList | ||
| } else { | ||
| super.timeline.getInstants.iterator().asScala.toList | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this right? Why not filter the instants with a range?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If endInstantArchived, we'll get all instants from the timeline; this is used to get the latest commit — the same behavior as on the Flink side: // Step3: decides the read end commit
final String endInstant = fullTableScan
? commitTimeline.lastInstant().get().getTimestamp()
: instants.get(instants.size() - 1).getTimestamp();
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But we should also take the start instant into consideration.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, there could be a situation that startInstant is not archived, while endInstant is archived, should return empty commits |
||
| } | ||
|
|
||
| protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava | ||
|
|
||
| protected lazy val affectedFilesInCommits: Array[FileStatus] = { | ||
| listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata) | ||
| } | ||
|
|
||
| // Record filters making sure that only records w/in the requested bounds are being fetched as part of the | ||
| // scan collected by this relation | ||
| protected lazy val incrementalSpanRecordFilters: Seq[Filter] = { | ||
| val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD) | ||
| val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp) | ||
| val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.last.getTimestamp) | ||
|
|
||
| val largerThanFilter = GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp) | ||
|
|
||
| val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, | ||
| if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp) | ||
|
|
||
| Seq(isNotNullFilter, largerThanFilter, lessThanFilter) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -859,89 +859,6 @@ class TestCOWDataSource extends HoodieClientTestBase { | |
| assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count()) | ||
| } | ||
|
|
||
| @Test def testFailEarlyForIncrViewQueryForNonExistingFiles(): Unit = { | ||
| // Create 10 commits | ||
| for (i <- 1 to 10) { | ||
| val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList | ||
| val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) | ||
| inputDF.write.format("org.apache.hudi") | ||
| .options(commonOpts) | ||
| .option("hoodie.cleaner.commits.retained", "3") | ||
| .option("hoodie.keep.min.commits", "4") | ||
| .option("hoodie.keep.max.commits", "5") | ||
| .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) | ||
| .mode(SaveMode.Append) | ||
| .save(basePath) | ||
| } | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why moving the test around ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As we don't need to write same codes(generate data) both in the |
||
| val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build() | ||
| /** | ||
| * State of timeline after 10 commits | ||
| * +------------------+--------------------------------------+ | ||
| * | Archived | Active Timeline | | ||
| * +------------------+--------------+-----------------------+ | ||
| * | C0 C1 C2 C3 | C4 C5 | C6 C7 C8 C9 | | ||
| * +------------------+--------------+-----------------------+ | ||
| * | Data cleaned | Data exists in table | | ||
| * +---------------------------------+-----------------------+ | ||
| */ | ||
|
|
||
| val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9 | ||
| //Anything less than 2 is a valid commit in the sense that no cleanup has been done for those commit files | ||
| var startTs = completedCommits.nthInstant(0).get().getTimestamp //C4 | ||
| var endTs = completedCommits.nthInstant(1).get().getTimestamp //C5 | ||
|
|
||
| //Calling without the fallback should result in Path does not exist | ||
| var hoodieIncViewDF = spark.read.format("org.apache.hudi") | ||
| .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) | ||
| .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs) | ||
| .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs) | ||
| .load(basePath) | ||
|
|
||
| val msg = "Should fail with Path does not exist" | ||
| assertThrows(classOf[AnalysisException], new Executable { | ||
| override def execute(): Unit = { | ||
| hoodieIncViewDF.count() | ||
| } | ||
| }, msg) | ||
|
|
||
| //Should work with fallback enabled | ||
| hoodieIncViewDF = spark.read.format("org.apache.hudi") | ||
| .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) | ||
| .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs) | ||
| .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs) | ||
| .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true") | ||
| .load(basePath) | ||
| assertEquals(100, hoodieIncViewDF.count()) | ||
|
|
||
| //Test out for archived commits | ||
| val archivedInstants = hoodieMetaClient.getArchivedTimeline.getInstants.distinct().toArray | ||
| startTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0 | ||
| endTs = completedCommits.nthInstant(1).get().getTimestamp //C5 | ||
|
|
||
| //Calling without the fallback should result in Path does not exist | ||
| hoodieIncViewDF = spark.read.format("org.apache.hudi") | ||
| .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) | ||
| .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs) | ||
| .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs) | ||
| .load(basePath) | ||
|
|
||
| assertThrows(classOf[AnalysisException], new Executable { | ||
| override def execute(): Unit = { | ||
| hoodieIncViewDF.count() | ||
| } | ||
| }, msg) | ||
|
|
||
| //Should work with fallback enabled | ||
| hoodieIncViewDF = spark.read.format("org.apache.hudi") | ||
| .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) | ||
| .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs) | ||
| .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs) | ||
| .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true") | ||
| .load(basePath) | ||
| assertEquals(500, hoodieIncViewDF.count()) | ||
| } | ||
|
|
||
| @Test | ||
| def testWriteSmallPrecisionDecimalTable(): Unit = { | ||
| val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, should we align with the Flink side, which does not introduce a new param to control whether to enable fullTableScan?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main change seems to add an out-of-range check for the end commit, so maybe we should add a
constraint that the end commit must be greater than the start commit.
And even if end commit is out of range, the case that
the end commit is greater than the latest commit
is a valid case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it looks like the
IncrementalRelation doesn't support this; I'll fix it as well...