Conversation

@aokolnychyi (Contributor) commented Oct 11, 2024

This PR is a prototype for DVs in V3. It serves as a reference and is NOT meant to be merged.

@aokolnychyi force-pushed the portable-roaring-bitmap branch 3 times, most recently from 507c1f9 to 4ea48f0 on October 13, 2024 at 22:55
github-actions bot added the data label on Oct 13, 2024
@aokolnychyi force-pushed the portable-roaring-bitmap branch from 4ea48f0 to 3b3600d on October 13, 2024 at 23:08
Types.NestedField SORT_ORDER_ID =
    optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
Types.NestedField REFERENCED_DATA_FILE =
@aokolnychyi (author):
Follows the proposed spec, reserving 142 for row lineage.
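
For context, a sketch of how the new DV-related fields might continue this sequence if 142 stays reserved for row lineage; the IDs, names, and descriptions below are assumptions based on the proposed spec, not the final definitions:

    // illustrative only: IDs, names, and descriptions assumed from the proposed V3 spec
    Types.NestedField REFERENCED_DATA_FILE =
        optional(143, "referenced_data_file", StringType.get(), "Location of a data file that all deletes reference");
    Types.NestedField CONTENT_OFFSET =
        optional(144, "content_offset", LongType.get(), "Offset within the delete file where the content starts");
    Types.NestedField CONTENT_SIZE =
        optional(145, "content_size_in_bytes", LongType.get(), "Length of the referenced content stored in the file");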

@aokolnychyi force-pushed the portable-roaring-bitmap branch from 3b3600d to e5ddcd3 on October 15, 2024 at 19:17
github-actions bot added the spark label on Oct 15, 2024
@aokolnychyi force-pushed the portable-roaring-bitmap branch from e5ddcd3 to daeb847 on October 18, 2024 at 00:28
github-actions bot added the flink label on Oct 18, 2024
@aokolnychyi force-pushed the portable-roaring-bitmap branch 4 times, most recently from 78f0c2b to 149b55a on October 21, 2024 at 20:10
@aokolnychyi force-pushed the portable-roaring-bitmap branch 6 times, most recently from c73257f to 9b4570b on October 30, 2024 at 16:07
CounterResult addedPositionalDeleteFiles();

@Nullable
CounterResult addedDVs();
@nastra (Contributor), Oct 31, 2024:
Suggested change:
-  CounterResult addedDVs();
+  @Value.Default
+  default CounterResult addedDVs() {
+    return null;
+  }

Same for the others, so that we don't break APIs (and you'll be able to remove the RevAPI failures).
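
For illustration, the same shape applied to another newly added counter (removedDVs() here is a stand-in; any new interface method would follow the same pattern):

    // keeps the interface compatible for existing implementations
    @Nullable
    @Value.Default
    default CounterResult removedDVs() {
      return null;
    }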

  - code: "java.class.removed"
    old: "enum org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus"
    justification: "Removing deprecated code"
  - code: "java.method.addedToInterface"
Contributor:
I've commented further below on how to avoid these

DeleteFile existingDV = dvByPath.putIfAbsent(path, dv);
if (existingDV != null) {
  throw new ValidationException(
      "Can't index multiple DVs for %s: %s and %s",
Contributor:
Would it make sense to add "existing" to the wording here, so that readers of this failure know which one points to the existing DV?
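
For example, the message could read as follows (illustrative; the format arguments are assumed from the surrounding snippet):

    throw new ValidationException(
        "Can't index multiple DVs for %s: %s (existing) and %s (new)", path, existingDV, dv);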

  Preconditions.checkArgument(contentOffset != null, "Content offset is required for DV");
  Preconditions.checkArgument(contentSizeInBytes != null, "Content size is required for DV");
} else {
  Preconditions.checkArgument(contentOffset == null, "Content offset is only for DV");
Contributor:
nit: maybe "Content offset must only be set for DVs"? Same for content size.
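
Applied to the snippet above, the reworded checks might look like this (illustrative only):

    Preconditions.checkArgument(contentOffset == null, "Content offset must only be set for DVs");
    Preconditions.checkArgument(contentSizeInBytes == null, "Content size must only be set for DVs");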

}

@Override
public void write(CharSequence path, long pos, PartitionSpec spec, StructLike partition) {
Contributor:
I wonder whether path should rather be a String now, since we changed other APIs and moved them from CharSequence to String (e.g. ContentFile now has location(), which returns a String)

Contributor:
I did some minor updates, and I believe this should work:

   private final OutputFileFactory fileFactory;
-  private final Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes;
-  private final CharSequenceMap<Deletes> deletesByPath = CharSequenceMap.create();
-  private final CharSequenceMap<BlobMetadata> blobsByPath = CharSequenceMap.create();
+  private final Function<String, PositionDeleteIndex> loadPreviousDeletes;
+  private final Map<String, Deletes> deletesByPath = Maps.newHashMap();
+  private final Map<String, BlobMetadata> blobsByPath = Maps.newHashMap();
   private DeleteWriteResult result = null;

   public BaseDVFileWriter(
-      OutputFileFactory fileFactory,
-      Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
+      OutputFileFactory fileFactory, Function<String, PositionDeleteIndex> loadPreviousDeletes) {
     this.fileFactory = fileFactory;
     this.loadPreviousDeletes = loadPreviousDeletes;
   }

   @Override
-  public void write(CharSequence path, long pos, PartitionSpec spec, StructLike partition) {
-    Deletes deletes = deletesByPath.computeIfAbsent(path, () -> new Deletes(path, spec, partition));
+  public void write(String path, long pos, PartitionSpec spec, StructLike partition) {
+    Deletes deletes = deletesByPath.computeIfAbsent(path, k -> new Deletes(path, spec, partition));
     PositionDeleteIndex positions = deletes.positions();
     positions.delete(pos);
   }
@@ -87,7 +87,7 @@ public class BaseDVFileWriter implements DVFileWriter {

       try (PuffinWriter closeableWriter = writer) {
         for (Deletes deletes : deletesByPath.values()) {
-          CharSequence path = deletes.path();
+          String path = deletes.path();
           PositionDeleteIndex positions = deletes.positions();
           PositionDeleteIndex previousPositions = loadPreviousDeletes.apply(path);
           if (previousPositions != null) {
@@ -108,7 +108,7 @@ public class BaseDVFileWriter implements DVFileWriter {
       String puffinPath = writer.location();
       long puffinFileSize = writer.fileSize();

-      for (CharSequence path : deletesByPath.keySet()) {
+      for (String path : deletesByPath.keySet()) {
         DeleteFile deleteFile = createDeleteFile(puffinPath, puffinFileSize, path);
         deleteFiles.add(deleteFile);
       }
@@ -117,8 +117,7 @@ public class BaseDVFileWriter implements DVFileWriter {
     }
   }

-  @SuppressWarnings("CollectionUndefinedEquality")
-  private DeleteFile createDeleteFile(String path, long size, CharSequence referencedDataFile) {
+  private DeleteFile createDeleteFile(String path, long size, String referencedDataFile) {
     Deletes deletes = deletesByPath.get(referencedDataFile);
     BlobMetadata blobMetadata = blobsByPath.get(referencedDataFile);
     return FileMetadata.deleteFileBuilder(deletes.spec())
@@ -135,7 +134,7 @@ public class BaseDVFileWriter implements DVFileWriter {
   }

   private void write(PuffinWriter writer, Deletes deletes) {
-    CharSequence path = deletes.path();
+    String path = deletes.path();
     PositionDeleteIndex positions = deletes.positions();
     BlobMetadata blobMetadata = writer.write(toBlob(positions, path));
     blobsByPath.put(path, blobMetadata);
@@ -147,7 +146,7 @@ public class BaseDVFileWriter implements DVFileWriter {
     return Puffin.write(outputFile).createdBy(ident).build();
   }

-  private Blob toBlob(PositionDeleteIndex positions, CharSequence path) {
+  private Blob toBlob(PositionDeleteIndex positions, String path) {
     return new Blob(
         StandardBlobTypes.DV_V1,
         ImmutableList.of(MetadataColumns.ROW_POSITION.fieldId()),
@@ -156,26 +155,23 @@ public class BaseDVFileWriter implements DVFileWriter {
         positions.serialize(),
         null /* uncompressed */,
         ImmutableMap.of(
-            REF_DATA_FILE_KEY,
-            path.toString(),
-            CARDINALITY_KEY,
-            String.valueOf(positions.cardinality())));
+            REF_DATA_FILE_KEY, path, CARDINALITY_KEY, String.valueOf(positions.cardinality())));
   }

   private static class Deletes {
-    private final CharSequence path;
+    private final String path;
     private final PositionDeleteIndex positions;
     private final PartitionSpec spec;
     private final StructLike partition;

-    private Deletes(CharSequence path, PartitionSpec spec, StructLike partition) {
+    private Deletes(String path, PartitionSpec spec, StructLike partition) {
       this.path = path;
       this.positions = new BitmapPositionDeleteIndex();
       this.spec = spec;
       this.partition = StructCopy.copy(partition);
     }

-    public CharSequence path() {
+    public String path() {
       return path;
     }

long expectedLength = deleteFile.contentSizeInBytes() - LENGTH_SIZE_BYTES - CRC_SIZE_BYTES;
Preconditions.checkArgument(
    length == expectedLength,
    "Invalid bitmap data length %s, expected %s",
Contributor:
Suggested change:
-  "Invalid bitmap data length %s, expected %s",
+  "Invalid bitmap data length: %s, expected %s",

* @param spec the data file partition spec
* @param partition the data file partition
*/
void write(CharSequence path, long pos, PartitionSpec spec, StructLike partition);
Contributor:
Suggested change:
-  void write(CharSequence path, long pos, PartitionSpec spec, StructLike partition);
+  void write(String path, long pos, PartitionSpec spec, StructLike partition);

CounterResult positionalDeleteFiles();

@Nullable
CounterResult dvs();
Contributor:
Same as I mentioned in CommitMetricsResult.

long totalPosDeletes1 = dv1.recordCount() + dv2.recordCount();
long totalFileSize1 = dv1.contentSizeInBytes() + dv2.contentSizeInBytes();
assertThat(summary1)
    .doesNotContainKey(SnapshotSummary.ADD_POS_DELETE_FILES_PROP)
Contributor:
minor: other tests in this class do a hasSize() check and verify all props (except for iceberg-version), mainly to make sure the summary contains all the expected info.
I think it would be good to add the hasSize(12) check here too and verify that all 11 properties (not counting iceberg-version) have the right values.
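
A rough sketch of the fuller assertion style being suggested; the added property names and expected values below are assumptions for illustration, not the test's actual expectations:

    assertThat(summary1)
        .hasSize(12)
        .doesNotContainKey(SnapshotSummary.ADD_POS_DELETE_FILES_PROP)
        .containsEntry(SnapshotSummary.ADDED_DVS_PROP, "2") // property name assumed
        .containsEntry(SnapshotSummary.ADDED_POS_DELETES_PROP, String.valueOf(totalPosDeletes1)) // property name assumed
        .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, String.valueOf(totalFileSize1));
        // ... remaining properties checked similarly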

public PositionDeleteIndex loadPositionDeletes(
    Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
  if (containsDVs(deleteFiles)) {
    DeleteFile deleteFile = Iterables.getOnlyElement(deleteFiles);
Contributor:
minor: should we add some validation to make sure deleteFiles indeed only has a single entry?

@aokolnychyi (author):
I am using Iterables.getOnlyElement(), which will complain if there is more than one.
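
For reference, the Guava behavior being relied on (not code from this PR):

    // getOnlyElement throws NoSuchElementException for an empty iterable and
    // IllegalArgumentException when there is more than one element, so no
    // separate size check is needed here.
    DeleteFile deleteFile = Iterables.getOnlyElement(deleteFiles);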

}

private void validateDV(DeleteFile deleteFile, CharSequence filePath) {
  Preconditions.checkArgument(deleteFile.contentOffset() != null, "DV offset is missing");
Contributor:
nit: maybe "Invalid DV: offset missing"?
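
Applied to the check above, that could read (illustrative):

    Preconditions.checkArgument(deleteFile.contentOffset() != null, "Invalid DV: offset is missing");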

try (SeekableInputStream stream = inputFile.newStream()) {
  byte[] bytes = new byte[length];

  if (stream instanceof RangeReadable) {
Contributor:
looks like this is re-using the same functionality that PuffinReader has internally, so not sure if it's worth extracting that into a common method
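
For context, the range-read pattern under discussion looks roughly like this (a simplified sketch assuming an offset variable and Guava's ByteStreams; not the exact code in the PR):

    if (stream instanceof RangeReadable) {
      // read [offset, offset + length) directly, without a separate seek
      ((RangeReadable) stream).readFully(offset, bytes, 0, length);
    } else {
      stream.seek(offset);
      ByteStreams.readFully(stream, bytes);
    }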

   deleteFilesRecordCount.addAndGet(deleteFile.recordCount());
-  deleteFilesByteCount.addAndGet(deleteFile.fileSizeInBytes());
+  long contentSizeInBytes = ContentFileUtil.contentSizeInBytes(deleteFile);
+  deleteFilesByteCount.addAndGet(contentSizeInBytes);
Contributor:
do we need to fix other Flink versions too?

   .forEach(
       deleteFile -> {
-        deleteFilesSizeHistogram.update(deleteFile.fileSizeInBytes());
+        deleteFilesSizeHistogram.update(ContentFileUtil.contentSizeInBytes(deleteFile));
Contributor:
do we need to fix/update other Flink versions too?

@aokolnychyi force-pushed the portable-roaring-bitmap branch from 9b4570b to d325075 on November 1, 2024 at 15:34
@aokolnychyi (author):
Closing this in favor of other smaller PRs.
