[WIP] Core: Prototype for DVs in V3 #11302
Types.NestedField SORT_ORDER_ID =
    optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
Types.NestedField REFERENCED_DATA_FILE =
Follows the proposed spec, reserving 142 for row lineage.
CounterResult addedPositionalDeleteFiles();

@Nullable
CounterResult addedDVs();
- CounterResult addedDVs();
+ @Value.Default
+ default CounterResult addedDVs() {
+   return null;
+ }
Same for the others, so that we don't break APIs (and you'll be able to remove the RevAPI failures).
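For context, here is a minimal, self-contained illustration (hypothetical names, plain long counters instead of CounterResult) of why a default method avoids the API break that RevAPI's java.method.addedToInterface check flags:

public class DefaultMethodCompatDemo {

  interface MetricsResult {
    long addedDeleteFiles(); // existing abstract accessor

    // new accessor added with a default body instead of as an abstract method,
    // so classes compiled against the old interface still satisfy it
    default long addedDVs() {
      return 0L;
    }
  }

  // an "old" implementation that knows nothing about addedDVs()
  static final class LegacyResult implements MetricsResult {
    @Override
    public long addedDeleteFiles() {
      return 3L;
    }
  }

  public static void main(String[] args) {
    MetricsResult result = new LegacyResult();
    System.out.println(result.addedDeleteFiles()); // 3
    System.out.println(result.addedDVs()); // 0, via the default body
  }
}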
.palantir/revapi.yml
- code: "java.class.removed"
  old: "enum org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus"
  justification: "Removing deprecated code"
- code: "java.method.addedToInterface"
I've commented further below on how to avoid these
DeleteFile existingDV = dvByPath.putIfAbsent(path, dv);
if (existingDV != null) {
  throw new ValidationException(
      "Can't index multiple DVs for %s: %s and %s",
Would it make sense to add "existing" to the wording here, so that readers of this failure know which one points to the existing DV?
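For illustration only, one possible rewording, continuing the snippet above (the argument accessors and ordering are assumptions, not the author's actual change):

throw new ValidationException(
    "Can't index multiple DVs for %s: %s (existing) and %s (new)",
    path, existingDV.location(), dv.location());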
  Preconditions.checkArgument(contentOffset != null, "Content offset is required for DV");
  Preconditions.checkArgument(contentSizeInBytes != null, "Content size is required for DV");
} else {
  Preconditions.checkArgument(contentOffset == null, "Content offset is only for DV");
nit: maybe "Content offset must only be set for DVs"? Same for content size.
}

@Override
public void write(CharSequence path, long pos, PartitionSpec spec, StructLike partition) {
I wonder whether path should rather be a String now, since we changed other APIs and moved them from CharSequence to String (e.g. ContentFile now has location(), which returns a String)
I did some minor updates and this should work, I believe:
private final OutputFileFactory fileFactory;
- private final Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes;
- private final CharSequenceMap<Deletes> deletesByPath = CharSequenceMap.create();
- private final CharSequenceMap<BlobMetadata> blobsByPath = CharSequenceMap.create();
+ private final Function<String, PositionDeleteIndex> loadPreviousDeletes;
+ private final Map<String, Deletes> deletesByPath = Maps.newHashMap();
+ private final Map<String, BlobMetadata> blobsByPath = Maps.newHashMap();
private DeleteWriteResult result = null;
public BaseDVFileWriter(
- OutputFileFactory fileFactory,
- Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
+ OutputFileFactory fileFactory, Function<String, PositionDeleteIndex> loadPreviousDeletes) {
this.fileFactory = fileFactory;
this.loadPreviousDeletes = loadPreviousDeletes;
}
@Override
- public void write(CharSequence path, long pos, PartitionSpec spec, StructLike partition) {
- Deletes deletes = deletesByPath.computeIfAbsent(path, () -> new Deletes(path, spec, partition));
+ public void write(String path, long pos, PartitionSpec spec, StructLike partition) {
+ Deletes deletes = deletesByPath.computeIfAbsent(path, k -> new Deletes(path, spec, partition));
PositionDeleteIndex positions = deletes.positions();
positions.delete(pos);
}
@@ -87,7 +87,7 @@ public class BaseDVFileWriter implements DVFileWriter {
try (PuffinWriter closeableWriter = writer) {
for (Deletes deletes : deletesByPath.values()) {
- CharSequence path = deletes.path();
+ String path = deletes.path();
PositionDeleteIndex positions = deletes.positions();
PositionDeleteIndex previousPositions = loadPreviousDeletes.apply(path);
if (previousPositions != null) {
@@ -108,7 +108,7 @@ public class BaseDVFileWriter implements DVFileWriter {
String puffinPath = writer.location();
long puffinFileSize = writer.fileSize();
- for (CharSequence path : deletesByPath.keySet()) {
+ for (String path : deletesByPath.keySet()) {
DeleteFile deleteFile = createDeleteFile(puffinPath, puffinFileSize, path);
deleteFiles.add(deleteFile);
}
@@ -117,8 +117,7 @@ public class BaseDVFileWriter implements DVFileWriter {
}
}
- @SuppressWarnings("CollectionUndefinedEquality")
- private DeleteFile createDeleteFile(String path, long size, CharSequence referencedDataFile) {
+ private DeleteFile createDeleteFile(String path, long size, String referencedDataFile) {
Deletes deletes = deletesByPath.get(referencedDataFile);
BlobMetadata blobMetadata = blobsByPath.get(referencedDataFile);
return FileMetadata.deleteFileBuilder(deletes.spec())
@@ -135,7 +134,7 @@ public class BaseDVFileWriter implements DVFileWriter {
}
private void write(PuffinWriter writer, Deletes deletes) {
- CharSequence path = deletes.path();
+ String path = deletes.path();
PositionDeleteIndex positions = deletes.positions();
BlobMetadata blobMetadata = writer.write(toBlob(positions, path));
blobsByPath.put(path, blobMetadata);
@@ -147,7 +146,7 @@ public class BaseDVFileWriter implements DVFileWriter {
return Puffin.write(outputFile).createdBy(ident).build();
}
- private Blob toBlob(PositionDeleteIndex positions, CharSequence path) {
+ private Blob toBlob(PositionDeleteIndex positions, String path) {
return new Blob(
StandardBlobTypes.DV_V1,
ImmutableList.of(MetadataColumns.ROW_POSITION.fieldId()),
@@ -156,26 +155,23 @@ public class BaseDVFileWriter implements DVFileWriter {
positions.serialize(),
null /* uncompressed */,
ImmutableMap.of(
- REF_DATA_FILE_KEY,
- path.toString(),
- CARDINALITY_KEY,
- String.valueOf(positions.cardinality())));
+ REF_DATA_FILE_KEY, path, CARDINALITY_KEY, String.valueOf(positions.cardinality())));
}
private static class Deletes {
- private final CharSequence path;
+ private final String path;
private final PositionDeleteIndex positions;
private final PartitionSpec spec;
private final StructLike partition;
- private Deletes(CharSequence path, PartitionSpec spec, StructLike partition) {
+ private Deletes(String path, PartitionSpec spec, StructLike partition) {
this.path = path;
this.positions = new BitmapPositionDeleteIndex();
this.spec = spec;
this.partition = StructCopy.copy(partition);
}
- public CharSequence path() {
+ public String path() {
return path;
}
long expectedLength = deleteFile.contentSizeInBytes() - LENGTH_SIZE_BYTES - CRC_SIZE_BYTES;
Preconditions.checkArgument(
    length == expectedLength,
    "Invalid bitmap data length %s, expected %s",
| "Invalid bitmap data length %s, expected %s", | |
| "Invalid bitmap data length: %s, expected %s", |
 * @param spec the data file partition spec
 * @param partition the data file partition
 */
void write(CharSequence path, long pos, PartitionSpec spec, StructLike partition);
- void write(CharSequence path, long pos, PartitionSpec spec, StructLike partition);
+ void write(String path, long pos, PartitionSpec spec, StructLike partition);
CounterResult positionalDeleteFiles();

@Nullable
CounterResult dvs();
same as I mentioned in the CommitMetricsResult
long totalPosDeletes1 = dv1.recordCount() + dv2.recordCount();
long totalFileSize1 = dv1.contentSizeInBytes() + dv2.contentSizeInBytes();
assertThat(summary1)
    .doesNotContainKey(SnapshotSummary.ADD_POS_DELETE_FILES_PROP)
minor: other tests in this class do a hasSize() check and verify all props (except for the iceberg-version), mainly to make sure the summary contains all the expected info. I think it would be good here to add the hasSize(12) check and verify that all 11 props (not counting iceberg-version) have the right values.
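A rough sketch of the suggested assertion style, continuing the snippet above (the size, the property constants beyond the one already shown, and the expected values are placeholders, not this test's actual expectations):

assertThat(summary1)
    .hasSize(12)
    .doesNotContainKey(SnapshotSummary.ADD_POS_DELETE_FILES_PROP)
    .containsEntry(SnapshotSummary.ADDED_DELETE_FILES_PROP, "2")
    .containsEntry(SnapshotSummary.ADDED_POS_DELETES_PROP, String.valueOf(totalPosDeletes1))
    .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, String.valueOf(totalFileSize1));
    // ...and similarly for the remaining summary properties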
public PositionDeleteIndex loadPositionDeletes(
    Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
  if (containsDVs(deleteFiles)) {
    DeleteFile deleteFile = Iterables.getOnlyElement(deleteFiles);
minor: should we add some validation to make sure deleteFiles indeed only has a single entry?
I am using Iterables.getOnlyElement(), which will complain if there is more than one.
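For reference, Guava's Iterables.getOnlyElement already enforces this, as a small standalone demo (not code from the PR) shows:

import com.google.common.collect.Iterables;
import java.util.List;

public class GetOnlyElementDemo {
  public static void main(String[] args) {
    // returns the single element when there is exactly one
    System.out.println(Iterables.getOnlyElement(List.of("dv-1.puffin")));
    // an empty iterable throws NoSuchElementException,
    // and more than one element throws IllegalArgumentException
  }
}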
}

private void validateDV(DeleteFile deleteFile, CharSequence filePath) {
  Preconditions.checkArgument(deleteFile.contentOffset() != null, "DV offset is missing");
nit: maybe "Invalid DV: offset missing"?
try (SeekableInputStream stream = inputFile.newStream()) {
  byte[] bytes = new byte[length];

  if (stream instanceof RangeReadable) {
looks like this is re-using the same functionality that PuffinReader has internally, so not sure if it's worth extracting that into a common method
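For context, the pattern in question looks roughly like this, continuing the snippet above (the offset variable and the fallback branch are assumptions about the surrounding code, not an exact quote):

if (stream instanceof RangeReadable) {
  // read exactly length bytes starting at offset, without an explicit seek
  ((RangeReadable) stream).readFully(offset, bytes, 0, length);
} else {
  // fall back to seeking and filling the buffer from the stream
  stream.seek(offset);
  ByteStreams.readFully(stream, bytes);
}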
deleteFilesRecordCount.addAndGet(deleteFile.recordCount());
deleteFilesByteCount.addAndGet(deleteFile.fileSizeInBytes());
long contentSizeInBytes = ContentFileUtil.contentSizeInBytes(deleteFile);
deleteFilesByteCount.addAndGet(contentSizeInBytes);
do we need to fix other Flink versions too?
.forEach(
    deleteFile -> {
      deleteFilesSizeHistogram.update(deleteFile.fileSizeInBytes());
      deleteFilesSizeHistogram.update(ContentFileUtil.contentSizeInBytes(deleteFile));
do we need to fix/update other Flink versions too?
Closing this in favor of other smaller PRs.
This PR is a prototype for DVs in V3. It serves as a reference and is NOT meant to be merged.