[WIP] Track column statistics in cuDF-Polars#19130
rjzamora wants to merge 44 commits into rapidsai:branch-25.08
Conversation
lifo.append(child)
...
def post_traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]:
NOTE: This was copied from wence-:more-tablestat-doodles.
/ok to test
TomAugspurger left a comment
Happy to see this coming together. Gave a quick pass and will look more closely later.
_SOURCE_STATS_CACHE_MAX_ITEMS: int = 10
...
def _update_source_stats_cache(
I haven't looked closely, but would functools.lru_cache(maxsize=10) work? https://docs.python.org/3/library/functools.html#functools.lru_cache
Looked through this some more. I think the tl;dr is that we could maybe force this into using lru_cache. It's unclear to me whether we should.
The idea here is to cache tuple[path, ...] -> dict[str, ColumnSourceStats], i.e. the column statistics for a given tuple of paths. Note that the paths in the key are for everything in the IR, not just the ones sampled.
Some of the complexity seems to come from potentially hitting this cache with the same paths, but a different subset of columns of interest. We might have something like

a = pl.scan_parquet("data.parquet", columns=["a", "b"])
b = pl.scan_parquet("data.parquet", columns=["b", "c"])

Note that column "b" is in both, and the full table might have many columns. We want to avoid computing the stats for "b" twice, and we want to avoid computing the stats for columns that we'll never use. This means a simple lru_cache with an entry per ir.paths isn't going to give us what we want.
This seems doable with a functools.lru_cache on the tuple[path, ...] key by having it return a (mutable) dict[str, ColumnSourceStats]. Then any callers asking for the stats of a given set of paths will get a view on the same dict, which they can mutate in place. Something like

@functools.lru_cache
def get_source_stats(paths: tuple[str, ...]) -> dict[str, ColumnSourceStats]:
    return {}

source_stats_cached = get_source_stats(paths)
for column in need_columns - set(source_stats_cached):
    # compute for that column
    source_stats_cached[column] = ColumnSourceStats(...)

Maybe this isn't much better. Mutating cached values is dangerous. But it does give us an LRU cache rather than a FIFO cache, along with all the nice things from lru_cache like cache_info().
You could indirect things by having a two-level scheme:

@functools.lru_cache
def stats_getter(paths: tuple[str, ...]) -> Callable[[str], ColumnSourceStats]:
    @functools.lru_cache
    def colstats(column: str) -> ColumnSourceStats:
        return ...
    ...

colstats = stats_getter(paths)("column")

?
Ah I was trying to figure out how to get the two layer thing to work, but couldn't do it in a way that didn't break the maxsize of the outer cache. But I see now that that should be equivalent: set a maxsize of 10 or whatever on the stats_getter and an unlimited size on the colstats. I think this should work out nicely.
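For concreteness, here's a minimal runnable sketch of that two-level scheme: a bounded outer cache keyed by path-tuple, and an unbounded inner per-column cache. The names `make_stats_getter` and `fake_loader` are illustrative, not part of the PR.

```python
import functools
from typing import Callable

def make_stats_getter(load_column_stats: Callable[[tuple[str, ...], str], float]):
    # Outer cache: bounded number of distinct path-tuples (datasets).
    @functools.lru_cache(maxsize=10)
    def stats_getter(paths: tuple[str, ...]) -> Callable[[str], float]:
        # Inner cache: unbounded per-column stats for this path-tuple.
        @functools.lru_cache(maxsize=None)
        def colstats(column: str) -> float:
            return load_column_stats(paths, column)
        return colstats
    return stats_getter

# Count how often the (pretend-expensive) loader actually runs.
calls = []
def fake_loader(paths, column):
    calls.append((paths, column))
    return 1.0

stats_getter = make_stats_getter(fake_loader)
stats_getter(("a.parquet",))("x")
stats_getter(("a.parquet",))("x")  # outer and inner caches both hit
```

Asking for the same (paths, column) twice only runs the loader once, and evicting a path-tuple from the outer cache drops all of its per-column entries at once.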
I think this is the only review item I haven't tackled yet. I'm trying to work out a way to use the two-level scheme, but I'm struggling to find a way to make it "efficient".
If we cache the statistics for each column individually, then we presumably need to define the logic to "get" the statistics for an individual column within colstats. However, it would be very inefficient to read a row-group for a single column in that function if we are also interested in the statistics for 10+ other columns in the same file (we definitely want to read all 10+ column-chunks at once).
I suppose one possibility is to just collect statistics for all columns the first time, and avoid the need for any further collection when the same dataset is read in again. This would be fine for PDS, but could be an issue for "wide" datasets in the wild.
However, it would be very inefficient to read a row-group for a single column in that function if we are also interested in the statistics for 10+ other columns in the same file (we definitely want to read all 10+ column-chunks at once).
IIUC, the problem is that we don't want to do something like this?
stats = [stats_getter(path)(col) for col in columns]

Since each of those would trigger a separate parquet metadata read. There are probably ways to work around that, but I'm not coming up with a way that retains the convenience provided by lru_cache (aside from it returning a dict that everyone is expected to mutate).
Maybe it's worth stepping back to remember that we have the full query plan, and so we should be able to make the optimal number of read_parquet calls (one per parquet source across the entire plan, reading only the columns used, I think). This would span multiple IR nodes, though, so I don't know how easy it'd be to integrate into our current translation process.
Maybe it's worth stepping back to remember that we have the full query plan
I think Polars should do subquery optimization so that we only read the data once (with all necessary columns). The reason we cache is to improve performance between distinct calls to collect.
Lazy question: when does this code run? Via _sample_pq_stats, which runs during add_source_stats, which runs during collect_source_stats, which runs during lower_ir_graph, on the lowered IR nodes?
import polars as pl
import rich.pretty
from cudf_polars.utils.config import ConfigOptions
from cudf_polars.dsl.translate import Translator
from cudf_polars.experimental.parallel import lower_ir_graph, task_graph
a = pl.scan_parquet("/datasets/rzamora/data/tpch-data/scale-10.0/lineitem.parquet").select(["l_orderkey", "l_linenumber"])
b = pl.scan_parquet("/datasets/rzamora/data/tpch-data/scale-10.0/lineitem.parquet").select(["l_orderkey", "l_quantity"])
c = a.join(b, on="l_orderkey", how="left")
q = c.group_by("l_orderkey").agg(pl.col("l_quantity").sum())
engine = pl.GPUEngine(executor="streaming")
config_options = ConfigOptions.from_polars_engine(engine)
ir = Translator(q._ldf.visit(), engine=engine).translate_ir()
ir, partition_info = lower_ir_graph(ir, config_options)
# graph, key = task_graph(ir, partition_info, config_options)
rich.pretty.pprint(ir)

shows
Select(
│ schema={'l_orderkey': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_quantity': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
│ exprs=(NamedExpr(l_orderkey, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_orderkey')), NamedExpr(l_quantity, UnaryFunction(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'fill_null', (), Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_quantity'), Literal(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 0)))),
│ should_broadcast=True,
│ GroupBy(
│ │ schema={'l_orderkey': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_quantity': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
│ │ keys=(NamedExpr(l_orderkey, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_orderkey')),),
│ │ agg_requests=(NamedExpr(l_quantity, Agg(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'sum', None, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_quantity'))),),
│ │ maintain_order=False,
│ │ zlice=None,
│ │ Join(
│ │ │ schema={'l_orderkey': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_quantity': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
│ │ │ left_on=(NamedExpr(l_orderkey, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_orderkey')),),
│ │ │ right_on=(NamedExpr(l_orderkey, Col(<DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_orderkey')),),
│ │ │ options=('Left', False, None, '_right', True, 'none'),
│ │ │ Union(
│ │ │ │ schema={'l_orderkey': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
│ │ │ │ zlice=None,
│ │ │ │ Scan(
│ │ │ │ │ schema={'l_orderkey': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
│ │ │ │ │ typ='parquet',
│ │ │ │ │ reader_options={'schema': None, 'parallel': 'Auto', 'low_memory': False, 'use_statistics': True},
│ │ │ │ │ cloud_options={'max_retries': 2, 'file_cache_ttl': 3600, 'config': None, 'credential_provider': None},
│ │ │ │ │ paths=[PosixPath('/datasets/rzamora/data/tpch-data/scale-10.0/lineitem.parquet')],
│ │ │ │ │ with_columns=['l_orderkey'],
│ │ │ │ │ skip_rows=0,
│ │ │ │ │ n_rows=-1,
│ │ │ │ │ row_index=None,
│ │ │ │ │ include_file_paths=None,
│ │ │ │ │ predicate=None,
│ │ │ │ │ parquet_options=ParquetOptions(chunked=True, chunk_read_limit=0, pass_read_limit=0)
│ │ │ │ )
│ │ │ ),
│ │ │ Union(
│ │ │ │ schema={'l_orderkey': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_quantity': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
│ │ │ │ zlice=None,
│ │ │ │ Scan(
│ │ │ │ │ schema={'l_orderkey': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>, 'l_quantity': <DataType(polars=Int64, plc=<type_id.INT64: 4>)>},
│ │ │ │ │ typ='parquet',
│ │ │ │ │ reader_options={'schema': None, 'parallel': 'Auto', 'low_memory': False, 'use_statistics': True},
│ │ │ │ │ cloud_options={'max_retries': 2, 'file_cache_ttl': 3600, 'config': None, 'credential_provider': None},
│ │ │ │ │ paths=[PosixPath('/datasets/rzamora/data/tpch-data/scale-10.0/lineitem.parquet')],
│ │ │ │ │ with_columns=['l_orderkey', 'l_quantity'],
│ │ │ │ │ skip_rows=0,
│ │ │ │ │ n_rows=-1,
│ │ │ │ │ row_index=None,
│ │ │ │ │ include_file_paths=None,
│ │ │ │ │ predicate=None,
│ │ │ │ │ parquet_options=ParquetOptions(chunked=True, chunk_read_limit=0, pass_read_limit=0)
│ │ │ │ )
│ │ │ )
│ │ )
│ )
Do the two scans going into the Join look like we'll do two read_parquets from the same file to you? (I can pull this branch down and add some print statements to check).
Oh my earlier example was faulty since I didn't end up using l_linenumber from a anywhere and polars eliminated it. With this example
import polars as pl
import rich.pretty
from cudf_polars.utils.config import ConfigOptions
from cudf_polars.dsl.translate import Translator
from cudf_polars.experimental.parallel import lower_ir_graph, task_graph
a = pl.scan_parquet("/datasets/rzamora/data/tpch-data/scale-10.0/lineitem.parquet").select(["l_orderkey", "l_partkey"])
b = pl.scan_parquet("/datasets/rzamora/data/tpch-data/scale-10.0/lineitem.parquet").select(["l_orderkey", "l_quantity"])
c = a.join(b, on="l_orderkey", how="left")
q = c.group_by("l_orderkey", "l_partkey").agg(pl.col("l_quantity").sum())
engine = pl.GPUEngine(executor="streaming")
config_options = ConfigOptions.from_polars_engine(engine)
ir = Translator(q._ldf.visit(), engine=engine).translate_ir()
ir, partition_info = lower_ir_graph(ir, config_options)
# graph, key = task_graph(ir, partition_info, config_options)
rich.pretty.pprint(ir)

And this diff for debugging:
Looks like for this example at least we do read all the stats we need the first time through:
diff --git a/python/cudf_polars/cudf_polars/experimental/io.py b/python/cudf_polars/cudf_polars/experimental/io.py
index 5ad9cde9d5..9ca7fe1b58 100644
--- a/python/cudf_polars/cudf_polars/experimental/io.py
+++ b/python/cudf_polars/cudf_polars/experimental/io.py
@@ -325,8 +325,10 @@ def _sample_pq_stats(
finally:
source_stats = source_stats_cached
+ print("sample pq stats", ir.paths, ir.with_columns)
if need_columns := (set(ir.schema) - source_stats_cached.keys()):
# Still need columns missing from the cache
+ print("New read", ir.paths, ir.with_columns, need_columns)
sample_metadata = plc.io.parquet_metadata.read_parquet_metadata(
plc.io.SourceInfo(sample_paths)
)

We do seem to make two stats reads from the source, one to get {'l_orderkey', 'l_quantity'} and a second to get the new key {'l_partkey'}:
sample pq stats [PosixPath('/datasets/rzamora/data/tpch-data/scale-10.0/lineitem.parquet')] ['l_orderkey', 'l_quantity']
New read [PosixPath('/datasets/rzamora/data/tpch-data/scale-10.0/lineitem.parquet')] ['l_orderkey', 'l_quantity'] {'l_orderkey', 'l_quantity'}
sample pq stats [PosixPath('/datasets/rzamora/data/tpch-data/scale-10.0/lineitem.parquet')] ['l_orderkey', 'l_partkey']
New read [PosixPath('/datasets/rzamora/data/tpch-data/scale-10.0/lineitem.parquet')] ['l_orderkey', 'l_partkey'] {'l_partkey'}
I also worry a bit about caching these across calls to .collect(), at least without any kind of cache invalidation if the source changes... Though this should only affect optimizations and not correctness so maybe that's not a big deal.
Aha - Looks like your example shows that we do indeed read from the same files more than once. Interesting.
when do this code run? Via _sample_pq_stats, which runs during add_source_stats, which runs during collect_source_stats, which runs during lower_ir_graph, on the lowered IR nodes?
_sample_pq_stats is called in add_source_stats(Scan), which is called in lower_ir_graph. However, we are collecting statistics for the UNlowered IR nodes. I was trying to avoid changes to the lower_ir_graph API in this PR, but when we add "full" statistics collection, we will probably want evaluate_streaming to look something like:
def evaluate_streaming(ir: IR, config_options: ConfigOptions) -> DataFrame:
    # `collect_source_stats` would be called within `collect_column_stats`
    stats = collect_column_stats(ir, config_options)
    ir, partition_info = lower_ir_graph(ir, config_options, stats)
    graph, key = task_graph(ir, partition_info, config_options)
    return get_scheduler(config_options)(graph, key)
TomAugspurger left a comment
Went through building the ColumnSourceStats in parquet. I haven't gone through the caching yet.
rowgroup_offsets_per_file = np.insert(
    np.cumsum(num_row_groups_per_file_samples), 0, 0

Should be the same & probably faster:

rowgroup_offsets_per_file = np.cumsum([0] + num_row_groups_per_file_samples)
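For what it's worth, a quick sanity check (with made-up row-group counts) that the two formulations agree:

```python
import numpy as np

# Made-up sample: row-group counts for three sampled files.
num_row_groups_per_file_samples = [3, 1, 4]

a = np.insert(np.cumsum(num_row_groups_per_file_samples), 0, 0)
b = np.cumsum([0] + num_row_groups_per_file_samples)
# Both yield the offsets [0, 3, 4, 8]
```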
column_sizes = {}
for name, uncompressed_sizes in metadata.columnchunk_metadata().items():
    if name in need_columns:
        column_sizes[name] = np.array(
Why do we need an array here, rather than an aggregated value? I see we derive total_uncompressed_size by taking a mean later. Does something else use the unaggregated values?
Note to self: the length of this array should exactly match the number of files sampled.
# We have un-cached column metadata to process
...
# Calculate the mean per-file `total_uncompressed_size` for each column
total_uncompressed_size = {
Is total correct here, or should this be avg? IIUC this represents our estimate of the in-memory size of a given column for any given source file.
for path, num_rgs in zip(
    paths, num_row_groups_per_file_samples, strict=True
):
    for rg_id in range(num_rgs):
        n += 1
        samples[path].append(rg_id)
        if n == num_rg_samples:
            break
    if n == num_rg_samples:
        break
IIUC, this is building up a list of specific row groups to sample. It will be biased to sample row groups from early files.
Maybe it'd be better to build up the full list of (file, rowgroup_id) and then slice through that, like we do with the files above? Roughly:

samples = [
    (file, i)
    for file, num_rgs in zip(paths, num_row_groups_per_file_samples, strict=True)
    for i in range(num_rgs)
]
stride = max(1, int(len(samples) / num_rg_samples))
samples = samples[::stride]

Maybe not though. The number of files will be limited by the config option, but the number of row groups per file could be very large (depends on the parquet file), so that list could be pretty large. There's probably a smart way to do this with some math.
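One way to do the "math" version: walk evenly-spaced global row-group indices and map each back to its (file, local row-group) pair via the cumulative offsets, without materializing every pair. Function and variable names here are illustrative, not from the PR.

```python
from bisect import bisect_right
from itertools import accumulate

def strided_rg_samples(paths, num_rgs_per_file, num_rg_samples):
    # Cumulative row-group offsets per file, e.g. [3, 5] -> [0, 3, 8].
    offsets = list(accumulate(num_rgs_per_file, initial=0))
    total = offsets[-1]
    stride = max(1, total // num_rg_samples)
    samples = []
    for global_id in range(0, total, stride):
        # Which file owns this global row-group index?
        file_idx = bisect_right(offsets, global_id) - 1
        samples.append((paths[file_idx], global_id - offsets[file_idx]))
    return samples

pairs = strided_rg_samples(["a.pq", "b.pq"], [3, 5], 4)
# Every 2nd global row group: [('a.pq', 0), ('a.pq', 2), ('b.pq', 1), ('b.pq', 3)]
```

This keeps memory proportional to the number of samples (plus one offset per file), regardless of how many row groups each file has.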
unique_fraction_estimates[name] = max(
    min(1.0, row_group_unique_count / row_group_num_rows),
    0.00001,
What's the motivation behind this max? I suppose that with very large row groups, this fraction can get arbitrarily close to zero. And that's something we don't want callers having to think about?
Do we lose anything by truncating to 0.00001?
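To make the clamping behavior concrete, a small illustration (the 0.00001 floor is taken from the snippet above; `clamp_fraction` is a hypothetical name):

```python
def clamp_fraction(unique_count: int, num_rows: int) -> float:
    # Cap at 1.0 (over-counting across row groups) and floor at 1e-5
    # (so downstream cardinality estimates never multiply by zero).
    return max(min(1.0, unique_count / num_rows), 0.00001)

print(clamp_fraction(5, 10))           # 0.5: ordinary case, untouched
print(clamp_fraction(1, 10_000_000))   # 1e-5: floor kicks in
print(clamp_fraction(20, 10))          # 1.0: cap kicks in
```

So the only information lost is how far below 1e-5 the true fraction is, which mainly matters for extremely low-cardinality columns in very large row groups.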
# Leave out unique stats if they were defined by the
# user. This allows us to avoid collecting stats for
# columns that are known to be problematic.
user_fractions = ir.config_options.executor.unique_fraction
Thinking through how I feel about this design. IIUC, the intent is to always give preference to user-provided statistics over stats from the source, which makes sense. _get_unique_fractions merges these two here.
Initially I wondered why we had to worry about this in two places: here and _get_unique_fractions. I think the answers are:
- We worry about them here as an optimization: we avoid computing stats for things that'll just be overridden later.
- We worry about them in _get_unique_fractions since we can (in principle) have other places generating these statistics (like a DataFrameScan).

So we could maybe cut out _get_unique_fractions having to merge these by requiring whoever produces these ColumnSourceStats to do the merging, with preference for user-provided stats. Dunno if that's worth it, but I wrote this up to understand things so I'll submit it for discussion :)
/ok to test
Can I suggest we split this into (at least) two further pieces.
Yeah, part of the reason this is still marked "WIP" is that I'm trying to (roughly) nail down how (1) and (2) fit together before I break pieces off.
self.cardinality = cardinality
self.storage_size_per_file = storage_size_per_file
self.exact = exact
self._unique_stats: Callable[..., UniqueSourceStats] | UniqueSourceStats
What's the motivation for making unique_stats a callable? Delayed evaluation of something?
If possible, I'd prefer the simplicity of just having a plain value here.
So this seems to be from functools.partial(UniquePqSampler.__call__, name). If I'm reading this correctly, we build one of these per column in the file. And perhaps we don't want to collect these types of statistics, which seems to involve reading some row groups and doing a distinct_count, if we don't actually use it. Hence the delayed collection. Makes sense.
I wonder if we have enough information here to know what columns we actually want statistics for (based on which columns we do operations on that would make use of those statistics). I see _get_unique_fractions uses unique_fraction.
Yeah, I think your understanding is correct. To be completely honest, I've wasted quite a bit of time struggling to find an "elegant" way to do everything I want:
- Whenever we need to estimate the number/fraction of unique values in a column, we should sample the metadata/data to make an estimate.
- Right now we rely on the user-provided "cardinality_factor" config value (renamed to "unique_fraction" here). In my opinion, this config value is a "hack".
- It is rare for unique-value statistics to be in the parquet metadata, so this PR focuses on reading a small number of real row-groups.
- This PR does centralize unique-value queries in one location: _get_unique_fractions
- It is relatively common to query unique-value statistics for multiple columns in the same dataset. Therefore, we definitely want to collect these statistics lazily (especially important for remote data).
- It would be nice to avoid traversing the IR graph multiple times to collect source data, because we will ultimately need to traverse the graph again multiple times (to apply join heuristics and "lower" the graph).

The current implementation "works" and largely handles caching pretty well. However, I'm still pretty unhappy with the design.
I wonder if the "key" to cleaning this up is to roll back the decision to store the column-specific source statistics under ColumnStats.source_stats instead of storing a reference to the general table-source statistics. For example, we probably want something like:
class UniqueStats:
    # Simple class to hold materialized unique-value stats
    count: int | None
    fraction: float | None

class SourceSampler:
    # Lazy source sampler.
    # - We would use a `PqSourceSampler` for parquet.
    # - These samplers can be generated by a helper function with lru caching.
    def file_storage_size(self, column: str) -> int: ...
    def cardinality(self) -> int: ...
    def add_shuffle_keys(self, columns: Sequence[str]) -> None: ...
    def unique_stats(self, column: str) -> UniqueStats: ...

class ColumnStats:
    name: str
    unique_count: int | None
    source_name: str | None
    source_sampler: SourceSampler | None

With this (or something like it), we can completely avoid sampling "real" source data until we are actually "using" it. As the source-stats are propagated through the graph, we could use SourceSampler.add_shuffle_keys to populate the set of source statistics that may require unique-value sampling.
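A runnable toy version of this idea, assuming an lru_cache-backed helper hands out one sampler per path-tuple (the sampling body itself is stubbed out, and all names beyond the sketch above are assumptions):

```python
from __future__ import annotations

import functools
from dataclasses import dataclass
from typing import Iterable

@dataclass
class UniqueStats:
    # Simple class to hold materialized unique-value stats
    count: int | None
    fraction: float | None

class SourceSampler:
    """Lazy sampler: per-column stats are computed at most once."""

    def __init__(self, paths: tuple[str, ...]) -> None:
        self.paths = paths
        self.shuffle_keys: set[str] = set()
        self._cache: dict[str, UniqueStats] = {}

    def add_shuffle_keys(self, columns: Iterable[str]) -> None:
        # Record columns that may need unique-value sampling later.
        self.shuffle_keys.update(columns)

    def unique_stats(self, column: str) -> UniqueStats:
        if column not in self._cache:
            # Stub: real code would sample row groups here.
            self._cache[column] = UniqueStats(count=None, fraction=1.0)
        return self._cache[column]

@functools.lru_cache(maxsize=10)
def get_sampler(paths: tuple[str, ...]) -> SourceSampler:
    # One sampler per distinct path-tuple, shared across IR nodes.
    return SourceSampler(paths)

s1 = get_sampler(("lineitem.parquet",))
s2 = get_sampler(("lineitem.parquet",))
s1.add_shuffle_keys(["l_orderkey"])
# s1 and s2 are the same object, so the shuffle keys are shared.
```

Because both scans of the same source get the same sampler, shuffle keys registered by one IR node are visible when the other (or a later collect) decides which columns to sample.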
I'll just note that I too don't have an elegant design for this in mind :/ It's a big feature and the requirements we need out of it are still a bit fuzzy (to me).
I think I mostly agree with your list of desires, maybe other than the "avoid traversing the IR graph multiple times" one, just since the information we gather will affect the IR graph we produce. Maybe that can all be done in one shot, since the source stats will be gathered early on.
I do think it's worth spending the time to get the design right (easy for me to say, I know). But I'm also fine with getting something in sooner rather than later and iterating on it, especially if the "right" design isn't clear.
I have a working design in another branch that is closer to making me happy. I'm going to start breaking off pieces and opening separate PRs now.
cardinality: int | None = None,
storage_size_per_file: int | None = None,
exact: tuple[str, ...] = (),
unique_stats: Any = None,
Suggested change:
- unique_stats: Any = None,
+ unique_stats: Callable[[], UniqueSourceStats] | UniqueSourceStats | None = None,
    column_statistics,
)
...
unique_fraction = (
Similar question at https://github.com/rapidsai/cudf/pull/19130/files#diff-c93704a29e1c3263ea7267bf2028f187f43d40727bda1b8ffbcdd87076bac77bR187 in expressions.py.
The output here is float | None, which does have a very different meaning in lower_distinct. But the input max_fraction_dict seems to always be a dict (possibly empty).
So how about unique_fraction = max(unique_fraction_dict.values(), default=None)? Not a big deal either way.
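A quick illustration of the suggested one-liner (the sample dict is made up):

```python
unique_fraction_dict = {"a": 0.2, "b": 0.9}
# max() with default=None handles the empty-dict case without raising ValueError.
print(max(unique_fraction_dict.values(), default=None))  # 0.9
print(max({}.values(), default=None))                    # None
```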
When the number of rows exceeds this value, the query will be split into
multiple partitions and executed in parallel.
- cardinality_factor
+ unique_fraction
Do we care about backwards compatibility here? Well, not here specifically since this isn't part of the public API, but in ConfigOptions.from_polars_engine should we translate from the old names to the new ones?
(My vote: this is currently experimental so fine to break. But after the next release when streaming becomes the default we should be careful not to change these without a deprecation).
# Calculate the `mean_uncompressed_size_per_file` for each column
mean_uncompressed_size_per_file = {
    name: statistics.mean(sizes) for name, sizes in column_sizes_per_file.items()
Does the mean need to be rounded to an integer? ColumnSourceStats expects an int there I think.
I've started breaking this feature into smaller pieces. Already open PRs:
Once those are in, I will open a third PR with the basic classes I need to extract/store statistics. This PR will not include any dispatching/traversal yet. It will be focused on lazy collection of statistics from Scan/DataFrameScan nodes. Once we have those three pieces in place, we can add the dispatching/traversal bits.
- Adds `post_traversal` API.
- Copied from @wence-'s [wence-:more-tablestat-doodles](https://github.com/wence-/cudf/tree/wence/fea/more-tablestat-doodles) branch.
- Used in #19130 (creating this separate API to simplify code review).

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)
- Tom Augspurger (https://github.com/TomAugspurger)

Approvers:
- Tom Augspurger (https://github.com/TomAugspurger)
- Matthew Murray (https://github.com/Matt711)
- Lawrence Mitchell (https://github.com/wence-)
- Matthew Roeschke (https://github.com/mroeschke)

URL: #19258
This PR splits off some of the changes used by the ongoing column-statistics work (e.g. #19130).
- Renames `"cardinality_factor"` to `"unique_fraction"`, because the original name doesn't really make any sense.
  - I've been trying to rename this config for a while, and would really like to get it done in 25.08.
  - I don't think we *need* backwards compatibility for `"cardinality_factor"`, but this PR adds it (just to be safe).
- Adds a central `_get_unique_fractions` utility to extract the unique-value statistics for a specific subset of columns. This logic is currently repeated several times, and it will be much easier to incorporate sampled statistics (in a follow-up) if the logic is all in one place.

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
- Tom Augspurger (https://github.com/TomAugspurger)

URL: #19273
Probably supersedes #19130

The goal of this PR is to define the classes needed to store column statistics for an `IR` node. Some criteria:
- We need the statistics for a column to contain a reference to the underlying datasource information (e.g. unique-value statistics, row-count, and average storage/file size).
- We want caching for each datasource and column.
- We want the option to perform metadata/data sampling lazily on the datasource.
- We want our Parquet partitioning logic to use the same infrastructure (to avoid redundant sampling).
- We want to record when a specific statistic is "exact" (rather than estimated).

Also related:
- #19258
- #19273

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
- Tom Augspurger (https://github.com/TomAugspurger)
- Matthew Murray (https://github.com/Matt711)

URL: #19276
Description
Supersedes #18865
An important goal of this PR is to lay the foundation for the kind of Join-based statistics gathering prototyped by @wence- in wence-:more-tablestat-doodles. This PR does NOT implement most of the logic in that branch. However, it does implement a compatible foundation for that work.
In wence-:more-tablestat-doodles, statistics are collected in two passes over the original logical plan. The first pass essentially collects statistics originating only from Scan/DataFrameScan IR nodes. The second pass updates these "base" statistics to account for Join/Filter/GroupBy/etc. This PR only implements the first pass. However, we now collect the "base" statistics (now referred to as "source" statistics) in a format that can also be used for partitioning, and to choose between shuffle- and reduction-based aggregations.

Checklist