Description
After updating from Trino v444 to v479 we found that MERGE INTO queries that do a lot of updates in the target table create thousands of position delete files.
To reproduce,
- Create an Iceberg table with more than one data file:
```sql
create table faker.default.source
(
    uid uuid not null,
    age integer not null,
    name varchar not null with (generator = '#{Name.first_name} #{Name.last_name}'),
    address1 varchar not null with (generator = '#{Address.fullAddress}'),
    address2 varchar not null with (generator = '#{Address.fullAddress}'),
    address3 varchar not null with (generator = '#{Address.fullAddress}'),
    enrolled boolean not null
)
with (default_limit = 10000000);
```
```sql
create table iceberg.default.merge_target
as
select *
from faker.default.source;
```

I was testing on a local dev server with just two nodes (one coordinator with `node-scheduler.include-coordinator=true` and one worker), and in my case I got 14 data files:
```sql
select content, count(*) as count
from iceberg.default."merge_target$files"
group by 1;
```

| content | count |
|---|---|
| 0 | 14 |
- Create a delta table with ~1/10-th of the original UUIDs + a few new ones:
```sql
create table iceberg.default.merge_delta
as
select *
from iceberg.default.merge_target
order by rand()
limit 1000000;

insert into iceberg.default.merge_delta
select *
from faker.default.source
limit 100000;
```
- Now, executing the following MERGE INTO:
```sql
merge into iceberg.default.merge_target target
using iceberg.default.merge_delta delta
on target.uid = delta.uid
when matched then update
    set uid = delta.uid,
        age = delta.age,
        name = delta.name,
        address1 = delta.address1,
        address2 = delta.address2,
        address3 = delta.address3,
        enrolled = delta.enrolled
when not matched then insert
    (uid, age, name, address1, address2, address3, enrolled)
    values (delta.uid, delta.age, delta.name, delta.address1,
            delta.address2, delta.address3, delta.enrolled);
```
ends up writing 2 new data files and 28 position delete files:
```sql
select content, count(*) as count
from iceberg.default."merge_target$files"
group by 1;
```

| content | count |
|---|---|
| 0 | 16 |
| 1 | 28 |
An upper limit of min(number of workers, 100) × number of data files seems to apply on our production cluster as well. For example, today a MERGE INTO query updating a table with ~540M rows across 1367 data files, running on 160 workers, added
- 3.39M rows in 100 data files, and
- 3.36M position delete rows in 115,923 delete files, fairly close to the 100 × 1367 = 136,700 limit.
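The apparent bound can be sanity-checked with some toy arithmetic (this is just my reading of the observed numbers, not Trino code): every one of min(workers, 100) round-robin writer partitions can emit up to one position delete file per source data file it touches.

```python
# Toy arithmetic for the apparent upper bound on position delete files:
# each of min(workers, max_partitions) writers may emit one delete file
# per source data file it receives deleted rows for.
def delete_file_bound(workers: int, data_files: int, max_partitions: int = 100) -> int:
    return min(workers, max_partitions) * data_files

# Two-node repro: min(2, 100) * 14 data files = 28 (28 delete files observed)
print(delete_file_bound(workers=2, data_files=14))      # 28
# Production query: bound of 136700 vs 115923 delete files observed
print(delete_file_bound(workers=160, data_files=1367))  # 136700
```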
I was able to bisect the git history and find the culprit commit: 3359c28 from #23432, released with Trino v468. The same test query running on its parent commit creates 4 data files and, as expected, 14 delete files.
The difference, as far as I can tell, is that the output of MergeProcessor in stage 2 used to be partitioned by the operation code and the row's location in the original data files (the `field` column):
```
...
Fragment 2 [HASH]
    Output layout: [..., operation, case_number_26, field, insert_from_update]
    Output partitioning: MERGE [update = iceberg:INSTANCE] [operation, field]
    MergeProcessor[]
...
```
and afterwards it became round robin with 100 partitions:
```
...
Fragment 2 [HASH]
    Output layout: [..., operation, case_number_26, field, insert_from_update]
    Output partitioning: ROUND_ROBIN []
    Output partition count: 100
    MergeProcessor[]
...
```
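To illustrate why the partitioning change matters, here is a toy model (plain Python, not Trino code) under the assumption that each (writer partition, source data file) pair receiving at least one deleted row produces one position delete file: hashing on the row's source file keeps each file's deletes with a single writer, while round robin sprays every file's deletes across all writers.

```python
import random
from collections import defaultdict

def count_delete_files(source_files, partition_of):
    """Count distinct (writer partition, source data file) pairs, i.e. the
    number of position delete files under the given partitioning function."""
    writers = defaultdict(set)
    for row, src in enumerate(source_files):
        writers[partition_of(row, src)].add(src)
    return sum(len(srcs) for srcs in writers.values())

random.seed(42)
DATA_FILES = 14
# Each deleted row lives in one of the 14 source data files:
deleted_rows = [random.randrange(DATA_FILES) for _ in range(100_000)]

# Partition by source file (akin to the old MERGE [operation, field] scheme):
by_file = count_delete_files(deleted_rows, lambda row, src: src)
# Round robin over 100 partitions (the new scheme):
round_robin = count_delete_files(deleted_rows, lambda row, src: row % 100)

print(by_file)      # 14 -> one delete file per data file
print(round_robin)  # close to 100 * 14 = 1400
```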
After Trino v468, at some point MergeProcessor started being merged with MergeWriter via a LocalExchange in the same stage, so the relevant part of the plan looks like this on v479:
```
...
Fragment 1 [HASH]
    Output layout: [partialrows, fragment]
    Output partitioning: SINGLE []
    MergeWriter[table = iceberg:default.merge_target$data@2949827968648840322]
    │   Layout: [partialrows:bigint, fragment:varbinary]
    └─ LocalExchange[partitioning = SINGLE]
       │   Layout: [..., operation:tinyint, case_number_27:integer, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), insert_from_update:tinyint]
       │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
       └─ MergeProcessor[]
          │   Layout: [..., operation:tinyint, case_number_27:integer, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), insert_from_update:tinyint]
          │   target: iceberg:default.merge_target$data@2949827968648840322
          │   merge row column: merge_row
          │   row id column: field
          │   redistribution columns: []
...
```
Disabling the feature introduced in #23432 by setting `iceberg.bucket_execution_enabled` to false doesn't help, which is, I think, expected.
Of course, delete vectors cannot come soon enough, but is there anything that can be done to help in this case with v2?