Flink: put everything together for range distribution in Flink sink #10859
    .defaultValue(StatisticsType.Auto.name());

public static final ConfigOption<Double> CLOSE_FILE_COST_WEIGHT_PERCENTAGE =
    ConfigOptions.key("close-file-cost-weight-percentage").doubleType().defaultValue(0.02d);
Open to feedback on the config name, type, and default value. 0.02 means the close-file cost weight is 2% of the target weight per task. It avoids placing more than 50 files in one writer task.
That also keeps 50 writers open (which means a high memory footprint).
Does 0.02d mean 2 percent? In that case we can use close-file-cost-weight.
Alternatively, if percentage is in the name, then the value should be 2.
Yes, 0.02d means 2%. I didn't go with percentage in the name and an integer value, in order to keep a bit more flexibility, e.g. 0.005d for 0.5%.
Agree with the comment that the naming is probably not the best. Maybe close-file-cost-weight. In the doc, we already explain that 0.02d means 2%.
I renamed this config to RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT as I think it is more accurate. I also added more extensive Javadoc and an explanation in the doc. Hopefully it is clearer to users.
I will follow up with a separate PR to rename closeFileCost to sortKeyBaseWeight in the internal code. It will touch a bunch of internal files and lines.
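To make the 2% figure in this thread concrete, here is a minimal sketch of the arithmetic. It assumes that the base weight added per sort key is the configured fraction of the target weight per writer task, as described above. The class and method names are hypothetical and are not Iceberg's internal code.

```java
/** Illustrative only: hypothetical names, not the Iceberg implementation. */
final class SortKeyBaseWeightSketch {

  /** Weight each writer task would receive if traffic were split evenly. */
  static double targetWeightPerTask(long totalWeight, int writerParallelism) {
    return (double) totalWeight / writerParallelism;
  }

  /** Fixed weight charged to every sort key, as a fraction of the per-task target. */
  static double baseWeightPerKey(double targetWeightPerTask, double baseWeightFraction) {
    return targetWeightPerTask * baseWeightFraction;
  }

  /**
   * Upper bound on the number of sort keys one task can hold when every key
   * costs at least the base weight. The small epsilon guards against
   * floating-point results such as 49.999999999.
   */
  static long maxKeysPerTask(double baseWeightFraction) {
    return (long) Math.floor(1.0 / baseWeightFraction + 1e-9);
  }

  public static void main(String[] args) {
    double target = targetWeightPerTask(1_000_000L, 4);
    System.out.println("target weight per task: " + target);
    System.out.println("base weight per key:    " + baseWeightPerKey(target, 0.02d));
    System.out.println("max keys per task:      " + maxKeysPerTask(0.02d));
  }
}
```

With a fraction of 0.02d the bound is 50 keys per task, which matches the "no more than 50 files in one writer task" remark. A fraction such as 0.005d raises the bound to 200, which is the flexibility a double value gives over an integer percentage.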
// Convert the requested flink table schema to flink row type.
RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
int writerParallelism =
Writer parallelism is also needed by the distributeDataStream method, as the downstream operator parallelism for the range partitioner.
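As a rough illustration of why a range partitioner needs the downstream parallelism, the sketch below maps a key to one of `writerParallelism` subtasks using `writerParallelism - 1` sorted range bounds. It assumes plain integer sort keys and precomputed bounds. `RangePartitionSketch` and its method are hypothetical and do not reflect the partitioner in this PR.

```java
import java.util.Arrays;

/** Illustrative only: hypothetical names, integer keys, precomputed bounds. */
final class RangePartitionSketch {

  /**
   * Returns a subtask index in [0, writerParallelism) for the given key.
   * sortedUpperBounds holds the inclusive upper bound of every range except
   * the last one, in ascending order.
   */
  static int partition(int key, int[] sortedUpperBounds, int writerParallelism) {
    if (sortedUpperBounds.length != writerParallelism - 1) {
      throw new IllegalArgumentException(
          "expected " + (writerParallelism - 1) + " bounds, got " + sortedUpperBounds.length);
    }
    int pos = Arrays.binarySearch(sortedUpperBounds, key);
    // An exact match belongs to the range it closes; otherwise binarySearch
    // returns -(insertionPoint) - 1, and the insertion point is the range index.
    return pos >= 0 ? pos : -(pos + 1);
  }

  public static void main(String[] args) {
    int[] bounds = {10, 20, 30}; // 3 bounds define 4 ranges for 4 writer subtasks
    for (int key : new int[] {5, 10, 15, 31}) {
      System.out.println("key " + key + " goes to subtask " + partition(key, bounds, 4));
    }
  }
}
```

The number of ranges is fixed by the writer parallelism, so the value has to be known when the distribution is set up, not only when the writer operator is created.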
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
.../flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
@rodmeneses: This will affect your PR as well. Please sync with @stevenzwu about the order of the commits.
I don't think we should worry about the order. We can integrate the range distribution with the v2 sink separately after the v2 sink is merged.
65edd7e to 2aaa30c
2aaa30c to 46825f9
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
pvary left a comment
A few nits, but LGTM
6e1cbc9 to 1c612da
docs/docs/flink-configuration.md
Config value is a enum type: `Map`, `Sketch`, `Auto`.
<ul>
<li>Map: collect accurate sampling count for every single key.
collect -> collects
docs/docs/flink-writes.md
### Range distribution (experimental)

RANGE distribution shuffle data by partition key or sort order via a custom range partitioner.
shuffle -> shuffles ?
docs/docs/flink-writes.md
#### Use cases

RANGE distribution can be applied an Iceberg table that either is partitioned or
can be applied -> can be applied to
    .withDescription("Type of statistics collection: Auto, Map, Sketch");

public static final ConfigOption<Double> RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT =
    ConfigOptions.key("ange-distribution-sort-key-base-weight")
ange-distribution-sort-key-base-weight -> range-distribution-sort-key-base-weight
docs/docs/flink-writes.md
<li>For low cardinality scenario (like hundreds or thousands),
HashMap is used to track traffic distribution for every key.
If a new sort key value shows up, range partitioner would just
round-robin it to the writer tasks before traffic distribution has been learned.
learned. -> learned, let's remove the extra .
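The low-cardinality behavior quoted above can be sketched as follows. This is a simplified illustration under stated assumptions: sort keys are strings, the learned assignment is a plain key-to-task map, and unseen keys are spread round-robin per record. `MapAssignmentSketch` is a hypothetical name, not a class from this PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: hypothetical names, string keys, single-threaded use. */
final class MapAssignmentSketch {
  private final Map<String, Integer> learnedAssignment;
  private final int numWriterTasks;
  private int nextRoundRobin = 0;

  MapAssignmentSketch(int numWriterTasks, Map<String, Integer> learnedAssignment) {
    this.numWriterTasks = numWriterTasks;
    this.learnedAssignment = new HashMap<>(learnedAssignment);
  }

  /** Returns the writer task index for one record with the given sort key. */
  int select(String sortKey) {
    Integer task = learnedAssignment.get(sortKey);
    if (task != null) {
      return task; // traffic distribution for this key has already been learned
    }
    // Unseen key: spread its records round-robin until statistics catch up.
    int chosen = nextRoundRobin;
    nextRoundRobin = (nextRoundRobin + 1) % numWriterTasks;
    return chosen;
  }

  public static void main(String[] args) {
    MapAssignmentSketch sketch = new MapAssignmentSketch(3, Map.of("known", 2));
    for (String key : new String[] {"known", "new", "new", "other"}) {
      System.out.println(key + " goes to writer task " + sketch.select(key));
    }
  }
}
```

Note that the sketch does not remember where an unseen key went, so consecutive records with the same new key land on different tasks. That is the trade-off the doc text describes for the window before the distribution has been learned.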
…ight as it is more accurate and probably more user friendly.
1c612da to f8d559f
…pache#10859) (cherry picked from commit ed07fd1)
(cherry picked from commit ce772a6)
This is the last PR, putting everything together, from the project: [Priority 2] Flink: support range distribution