Conversation

@stevenzwu (Contributor) commented on Aug 2, 2024:

Last PR to put everything together from the project: [Priority 2] Flink: support range distribution.

.defaultValue(StatisticsType.Auto.name());

public static final ConfigOption<Double> CLOSE_FILE_COST_WEIGHT_PERCENTAGE =
ConfigOptions.key("close-file-cost-weight-percentage").doubleType().defaultValue(0.02d);
@stevenzwu (Contributor, Author) commented:
Open to feedback on the config name, type, and default value. 0.02 means the close-file cost is weighted at 2% of the target weight per task, which avoids placing more than 50 files in one writer task.
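A minimal arithmetic sketch, with made-up numbers rather than this PR's code, of why a 2% close-file cost caps the files per writer task at 50:

public class CloseFileCostMath {
  public static void main(String[] args) {
    double closeFileCostWeightPercentage = 0.02d; // the proposed default: 2%
    long targetWeightPerTask = 1_000_000L; // hypothetical: total traffic weight / writer parallelism

    // Every file assigned to a task contributes at least this much weight,
    // no matter how few rows it actually receives.
    double closeFileCost = closeFileCostWeightPercentage * targetWeightPerTask;

    // Worst case: the task's target weight is consumed by close costs alone,
    // so at most 1 / 0.02 = 50 files land on a single writer task.
    double maxFilesPerTask = targetWeightPerTask / closeFileCost;
    System.out.printf("close cost %.0f, max files per task %.0f%n", closeFileCost, maxFilesPerTask);
  }
}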

A reviewer (Contributor) replied:

It also keeps 50 writers open (which means a high memory footprint).

Does 0.02d mean 2 percent? In that case we could use close-file-cost-weight. Alternatively, if percentage is in the name, then the value should be 2.

@stevenzwu (Contributor, Author) replied:
Yes, 0.02d means 2%. I didn't go with percentage in the name and an integer value, in order to keep a bit more flexibility, e.g. 0.005d for 0.5%.

I agree the naming is probably not the best; maybe close-file-cost-weight. The doc already explains that 0.02d means 2%.

@stevenzwu (Contributor, Author) commented on Aug 16, 2024:
I renamed this config to RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT, as I think it is more accurate, and added more extensive Javadoc and an explanation in the doc. I hope it is clearer to users.

I will follow up with a separate PR to rename the internal code from closeFileCost to sortKeyBaseWeight; it will touch a bunch of internal files and lines.
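For reference, a hedged usage sketch of the renamed option through the sink builder's generic write-option hook; the key spelling follows this PR, while dataStream and tableLoader are placeholders for the application's own objects:

import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.flink.sink.FlinkSink;

// Sketch only: assumes the builder's set(String, String) accepts the renamed key.
FlinkSink.forRowData(dataStream)
    .tableLoader(tableLoader)
    .distributionMode(DistributionMode.RANGE)
    .set("range-distribution-sort-key-base-weight", "0.02")
    .append();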


// Convert the requested flink table schema to flink row type.
RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
int writerParallelism =
@stevenzwu (Contributor, Author) commented:

Writer parallelism is also needed by the distributeDataStream method, as the downstream operator parallelism for the range partitioner.
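A sketch of that resolution, assuming an unset write parallelism falls back to the input stream's parallelism; flinkWriteConf and rowDataInput stand in for the surrounding method's state:

// Assumption: explicit write parallelism wins, otherwise follow the input.
Integer configured = flinkWriteConf.writeParallelism();
int writerParallelism =
    configured == null ? rowDataInput.getParallelism() : configured;
// The same value is passed to distributeDataStream(...) so the range
// partitioner knows the parallelism of the downstream writer operator.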

@pvary (Contributor) commented on Aug 6, 2024:

@rodmeneses: This will affect your PR as well. Please sync with @stevenzwu about the order of the commits

@stevenzwu (Contributor, Author) replied:

> @rodmeneses: This will affect your PR as well. Please sync with @stevenzwu about the order of the commits

I don't think we should worry about the order. We can integrate range distribution with the v2 sink separately, after the v2 sink is merged.

@stevenzwu force-pushed the flink-range-distribution branch from 65edd7e to 2aaa30c on August 7, 2024 06:08
@stevenzwu force-pushed the flink-range-distribution branch from 2aaa30c to 46825f9 on August 16, 2024 05:25
@pvary (Contributor) left a review:

A few nits, but LGTM

@stevenzwu force-pushed the flink-range-distribution branch from 6e1cbc9 to 1c612da on August 16, 2024 16:46

Config value is a enum type: `Map`, `Sketch`, `Auto`.
<ul>
<li>Map: collect accurate sampling count for every single key.
A reviewer (Contributor) commented:
collect -> collects


### Range distribution (experimental)

RANGE distribution shuffle data by partition key or sort order via a custom range partitioner.
A reviewer (Contributor) commented:
shuffle -> shuffles ?
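For context, a hedged sketch of requesting RANGE distribution through Iceberg's write.distribution-mode table property, assuming the usual precedence of explicit write options over table properties:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

// Sketch only: sets the table-level default; an explicit distributionMode(...)
// on the sink builder would still take precedence.
table.updateProperties()
    .set(TableProperties.WRITE_DISTRIBUTION_MODE, "range")
    .commit();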


#### Use cases

RANGE distribution can be applied an Iceberg table that either is partitioned or
A reviewer (Contributor) commented:
can be applied -> can be applied to

.withDescription("Type of statistics collection: Auto, Map, Sketch");

public static final ConfigOption<Double> RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT =
ConfigOptions.key("ange-distribution-sort-key-base-weight")
A reviewer (Contributor) commented:
ange-distribution-sort-key-base-weight -> range-distribution-sort-key-base-weight

<li>For low cardinality scenario (like hundreds or thousands),
HashMap is used to track traffic distribution for every key.
If a new sort key value shows up, range partitioner would just
round-robin it to the writer tasks before traffic distribution has been learned.
A reviewer (Contributor) commented:
learned. -> learned, let's remove the extra .
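To make the quoted paragraph concrete, a minimal self-contained sketch, with hypothetical names rather than this PR's implementation, of the round-robin fallback for keys whose traffic has not been learned yet:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the low-cardinality (Map) strategy: learned keys
// go to their assigned writer task; unseen keys are round-robined until the
// next statistics refresh assigns them.
class MapStatsPartitionerSketch {
  private final Map<String, Integer> learnedAssignment = new ConcurrentHashMap<>();
  private final AtomicInteger roundRobin = new AtomicInteger();
  private final int numWriterTasks;

  MapStatsPartitionerSketch(int numWriterTasks) {
    this.numWriterTasks = numWriterTasks;
  }

  int partition(String sortKey) {
    Integer task = learnedAssignment.get(sortKey);
    if (task != null) {
      return task; // traffic distribution for this key has been learned
    }
    // New key: spread evenly across writer tasks until statistics catch up.
    return Math.floorMod(roundRobin.getAndIncrement(), numWriterTasks);
  }

  void refresh(Map<String, Integer> newAssignment) {
    learnedAssignment.clear();
    learnedAssignment.putAll(newAssignment);
  }
}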

@stevenzwu force-pushed the flink-range-distribution branch from 1c612da to f8d559f on August 19, 2024 17:26
@stevenzwu merged commit ed07fd1 into apache:main on Aug 19, 2024
@stevenzwu deleted the flink-range-distribution branch on August 19, 2024 22:00
stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Aug 19, 2024
stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Aug 22, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
czy006 pushed commits to czy006/iceberg that referenced this pull request Apr 2, 2025