Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
throw new HoodieException(s"Specify the begin instant time to pull from using " +
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}")
}

if (!metaClient.getTableConfig.populateMetaFields()) {
throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
}
Expand Down Expand Up @@ -188,71 +189,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
case HoodieFileFormat.ORC => "orc"
}
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")

// Fallback to full table scan if any of the following conditions matches:
// 1. the start commit is archived
// 2. the end commit is archived
// 3. some files referenced in the metadata have been deleted
val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean

val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))

var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
val startInstantTime = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
val startInstantArchived = commitTimeline.isBeforeTimelineStarts(startInstantTime)
val endInstantTime = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp)
val endInstantArchived = commitTimeline.isBeforeTimelineStarts(endInstantTime)

val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
val scanDf = if (fallbackToFullTableScan && (startInstantArchived || endInstantArchived)) {
log.info(s"Falling back to full table scan as startInstantArchived: $startInstantArchived, endInstantArchived: $endInstantArchived")
fullTableScanDataFrame(startInstantTime, endInstantTime)
} else {
if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
} else {
log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))

var doFullTableScan = false
var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)

if (fallbackToFullTableScan) {
val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
val timer = new HoodieTimer().startTimer();
var doFullTableScan = false

val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
val timeTaken = timer.endTimer()
log.info("Checking if paths exists took " + timeTaken + "ms")
if (fallbackToFullTableScan) {
val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
val timer = new HoodieTimer().startTimer();

val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
val isInstantArchived = optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // True if optStartTs < activeTimeline.first
val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
val timeTaken = timer.endTimer()
log.info("Checking if paths exists took " + timeTaken + "ms")

if (isInstantArchived || firstNotFoundPath.isDefined) {
doFullTableScan = true
log.info("Falling back to full table scan")
if (firstNotFoundPath.isDefined) {
doFullTableScan = true
log.info("Falling back to full table scan as some files cannot be found.")
}
}
}

if (doFullTableScan) {
val hudiDF = sqlContext.read
.format("hudi_v1")
.schema(usedSchema)
.load(basePath.toString)
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because we are working with optParam instead of first commit > optParam
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp))
// schema enforcement does not happen in above spark.read with hudi. hence selecting explicitly w/ right column order
val fieldNames : Array[String] = df.schema.fields.map(field => field.name)
df = df.union(hudiDF.select(fieldNames.head, fieldNames.tail: _*))
} else {
if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
.format("hudi_v1")
.schema(usedSchema)
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
.load()
}
if (doFullTableScan) {
fullTableScanDataFrame(startInstantTime, endInstantTime)
} else {
if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
.format("hudi_v1")
.schema(usedSchema)
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
.load()
}

if (regularFileIdToFullPath.nonEmpty) {
df = df.union(sqlContext.read.options(sOpts)
.schema(usedSchema).format(formatClassName)
.load(filteredRegularFullPaths.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.head.getTimestamp))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp)))
if (regularFileIdToFullPath.nonEmpty) {
df = df.union(sqlContext.read.options(sOpts)
.schema(usedSchema).format(formatClassName)
.load(filteredRegularFullPaths.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.head.getTimestamp))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp)))
}
df
}
}

filters.foldLeft(df)((e, f) => e.filter(f)).rdd
}

filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
}
}

/**
 * Builds the DataFrame used when the incremental query falls back to reading the whole table.
 *
 * The table at `basePath` is loaded through the "hudi_v1" format with `usedSchema`, and rows are
 * kept only when their commit time is strictly greater than `startInstantTime` and less than or
 * equal to `endInstantTime`.
 *
 * @param startInstantTime exclusive lower bound on the record commit time
 * @param endInstantTime   inclusive upper bound on the record commit time
 * @return the filtered rows, projected onto the field names of `usedSchema` in schema order
 */
private def fullTableScanDataFrame(startInstantTime: String, endInstantTime: String): DataFrame = {
  val commitTimeField = HoodieRecord.COMMIT_TIME_METADATA_FIELD

  // The lower bound is strict (">" rather than ">=") because it is compared against the
  // user-supplied begin instant itself, not against the first commit after that instant.
  val afterStartPredicate = String.format("%s > '%s'", commitTimeField, startInstantTime)
  val uptoEndPredicate = String.format("%s <= '%s'", commitTimeField, endInstantTime)

  val wholeTable = sqlContext.read
    .format("hudi_v1")
    .schema(usedSchema)
    .load(basePath.toString)
  val withinRange = wholeTable.filter(afterStartPredicate).filter(uptoEndPredicate)

  // The read above does not enforce the schema, so project the columns explicitly to get the
  // expected column order.
  val orderedColumns = usedSchema.fieldNames
  withinRange.select(orderedColumns.head, orderedColumns.tail: _*)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.hudi

import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
Expand Down Expand Up @@ -53,9 +53,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
}

override protected def timeline: HoodieTimeline = {
val startTimestamp = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
val endTimestamp = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)
super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
if (fullTableScan) {
super.timeline
} else {
super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
}
}

protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
Expand Down Expand Up @@ -87,17 +89,19 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
if (includedCommits.isEmpty) {
List()
} else {
val latestCommit = includedCommits.last.getTimestamp
val commitsMetadata = includedCommits.map(getCommitMetadata(_, timeline)).asJava
val fileSlices = if (fullTableScan) {
listLatestFileSlices(Seq(), partitionFilters, dataFilters)
} else {
val latestCommit = includedCommits.last.getTimestamp

val modifiedFiles = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
val fsView = new HoodieTableFileSystemView(metaClient, timeline, modifiedFiles)
val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)

val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
val modifiedPartitions = getWritePartitionPaths(commitsMetadata)

val fileSlices = modifiedPartitions.asScala.flatMap { relativePartitionPath =>
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
}.toSeq
modifiedPartitions.asScala.flatMap { relativePartitionPath =>
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
}.toSeq
}

buildSplits(filterFileSlices(fileSlices, globPattern))
}
Expand All @@ -124,14 +128,48 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
// Validate this Incremental implementation is properly configured
validate()

protected lazy val includedCommits: immutable.Seq[HoodieInstant] = timeline.getInstants.iterator().asScala.toList
// Begin instant of the incremental query, read from the BEGIN_INSTANTTIME option.
// NOTE(review): the option is looked up without a default; presumably validate() guarantees it is set — confirm.
protected def startTimestamp: String = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
// End instant of the incremental query: the END_INSTANTTIME option when supplied, otherwise the
// timestamp of the last instant on super.timeline.
protected def endTimestamp: String = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)

// Whether the begin instant lies before the start of super.timeline (treated as "archived" by the fallback logic).
protected def startInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(startTimestamp)
// Whether the end instant lies before the start of super.timeline (treated as "archived" by the fallback logic).
protected def endInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(endTimestamp)

// Fallback to full table scan if any of the following conditions matches:
// 1. the start commit is archived
// 2. the end commit is archived
// 3. some files referenced in the metadata have been deleted
protected lazy val fullTableScan: Boolean = {
val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we align with the Flink side here, which does not introduce a new param to control whether fullTableScan is enabled?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main change seems to be adding an out-of-range check for the end commit, so maybe we should add a constraint that the
end commit must be greater than the start commit.

And even if end commit is out of range, the case that
the end commit is greater than the latest commit
is a valid case.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And even if end commit is out of range, the case that the end commit is greater than the latest commit is a valid case

Yeah, it looks like IncrementalRelation doesn't support this; I'll fix it as well...

DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean

fallbackToFullTableScan && (startInstantArchived || endInstantArchived || affectedFilesInCommits.exists(fileStatus => !metaClient.getFs.exists(fileStatus.getPath)))
}

protected lazy val includedCommits: immutable.Seq[HoodieInstant] = {
if (!startInstantArchived || !endInstantArchived) {
// If endTimestamp commit is not archived, will filter instants
// before endTimestamp.
super.timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.iterator().asScala.toList
} else {
super.timeline.getInstants.iterator().asScala.toList
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this right? Why not filter the instants by range?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If endInstantArchived is true, we get all instants from the timeline; this is used to find the latest commit, which is the same behavior as on the Flink side:

// Step3: decides the read end commit
    final String endInstant = fullTableScan
        ? commitTimeline.lastInstant().get().getTimestamp()
        : instants.get(instants.size() - 1).getTimestamp();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we should also take the start instant into consideration.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there could be a situation where startInstant is not archived while endInstant is archived; in that case we should return empty commits.

}

protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava

// Result of listAffectedFilesForCommits for `commitsMetadata`, resolved against the table base
// path taken from metaClient. Computed lazily on first access.
protected lazy val affectedFilesInCommits: Array[FileStatus] = {
  val tableBasePath = new Path(metaClient.getBasePath)
  listAffectedFilesForCommits(conf, tableBasePath, commitsMetadata)
}

// Record filters making sure that only records w/in the requested bounds are being fetched as part of the
// scan collected by this relation
protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp)
val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.last.getTimestamp)

val largerThanFilter = GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)

val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp)

Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,89 +859,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
}

@Test def testFailEarlyForIncrViewQueryForNonExistingFiles(): Unit = {
// Create 10 commits
for (i <- 1 to 10) {
val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
inputDF.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.cleaner.commits.retained", "3")
.option("hoodie.keep.min.commits", "4")
.option("hoodie.keep.max.commits", "5")
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move the test around?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to write the same code (generating data) in both TestCOWDataSource and TestMORDataSource, so I moved all of it to the new test TestIncrementalReadWithFullTableScan.

val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
/**
* State of timeline after 10 commits
* +------------------+--------------------------------------+
* | Archived | Active Timeline |
* +------------------+--------------+-----------------------+
* | C0 C1 C2 C3 | C4 C5 | C6 C7 C8 C9 |
* +------------------+--------------+-----------------------+
* | Data cleaned | Data exists in table |
* +---------------------------------+-----------------------+
*/

val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
//Anything less than 2 is a valid commit in the sense no cleanup has been done for those commit files
var startTs = completedCommits.nthInstant(0).get().getTimestamp //C4
var endTs = completedCommits.nthInstant(1).get().getTimestamp //C5

//Calling without the fallback should result in Path does not exist
var hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.load(basePath)

val msg = "Should fail with Path does not exist"
assertThrows(classOf[AnalysisException], new Executable {
override def execute(): Unit = {
hoodieIncViewDF.count()
}
}, msg)

//Should work with fallback enabled
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
.load(basePath)
assertEquals(100, hoodieIncViewDF.count())

//Test out for archived commits
val archivedInstants = hoodieMetaClient.getArchivedTimeline.getInstants.distinct().toArray
startTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0
endTs = completedCommits.nthInstant(1).get().getTimestamp //C5

//Calling without the fallback should result in Path does not exist
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.load(basePath)

assertThrows(classOf[AnalysisException], new Executable {
override def execute(): Unit = {
hoodieIncViewDF.count()
}
}, msg)

//Should work with fallback enabled
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
.load(basePath)
assertEquals(500, hoodieIncViewDF.count())
}

@Test
def testWriteSmallPrecisionDecimalTable(): Unit = {
val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
Expand Down
Loading