Add dynamic_distinct streaming node #21382
Conversation
```python
return TableChunk.from_pylibcudf_table(df.table, chunk.stream, exclusive_view=True)
...
async def chunkwise_evaluate(
```
This utility can be used by `default_node_single`, `distinct_node` (this PR), and `groupby_node` (next PR).
/ok to test
python/cudf_polars/cudf_polars/experimental/rapidsmpf/distinct.py (resolved review thread)
```
initial_chunks
    Optional chunks already received (e.g., during sampling) that should
    be forwarded without re-evaluation.
```
"initial" feels like a bad name here. I was confused below seeing that these must have already been evaluated the same as the processing of remaining chunks.
But thinking about it, this imposes the restriction that these chunks must be processed by the caller identically to the "bulk".
I do not think this is necessarily always going to be the case. For example, even for this PR, we're just trying to determine if we should shuffle-distinct or reduce-distinct. For that, we need to have an estimate of the cardinality of the result. We could do that with the new cudf "approx_distinct_count" API and merge across ranks. In that case, the sampled chunks will not have been processed with actual distinct.
Yeah, I suppose we could have both processed_chunks (already evaluated) and sampled_chunks (already pulled out of the channel, but not yet evaluated). We probably want to keep these buffered chunks in a `SpillableMessages` container as well.
```python
if input_schema is None:
    input_schema = ir.children[0].schema
```
In what scenarios must we override the schema of the input?
There was a problem hiding this comment.
I was doing this for groupby at one point, but it shouldn't be necessary anymore. I can remove it.
```python
if initial_chunks:
    for chunk in initial_chunks:
        if tracer is not None:
            tracer.add_chunk(table=chunk.table_view())
```
This implicitly requires that the initial chunks are in device memory...
That's true. Right now it will be in memory, but if we stage the data in a SpillableMessages container then we would need to log these kinds of chunks earlier/differently.
```python
    net_memory_delta=0,
)
with opaque_memory_usage(extra):
    result = await asyncio.to_thread(evaluate_chunk, chunk, ir, ir_context)
```
It seems like the `input_schema` argument to `evaluate_chunk` is never used. Do we need it?
```python
    net_memory_delta=0,
)
with opaque_memory_usage(extra):
    result = await asyncio.to_thread(evaluate_chunk, chunk, ir, ir_context)
```
Aside (we should fix this separately): asyncio.to_thread creates and uses an internal ThreadPoolExecutor. But when we run with a rapidsmpf context, we have already created a ThreadPoolExecutor with some max number of threads. I suspect we should be using that executor to do the threaded offload.
That would be replacing all the asyncio.to_thread calls with:

```python
loop = asyncio.get_running_loop()
await loop.run_in_executor(ctx.py_executor, partial(func, *args, **kwargs))
```
We might be relying on asyncio.to_thread copying over contextvars too.
IIRC, we (or rapidsmpf?) create a new event loop, maybe implicitly via asyncio.run here. We could ensure the desired executor is used there: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.set_default_executor. I'll make an issue in rapidsmpf.
Edit: rapidsai/rapidsmpf#855
```python
# Get collective IDs for this Distinct (may be empty if not reserved)
collective_ids = list(rec.state["collective_id_map"].get(ir, []))
```
If there are no collective_ids, we're hosed right?
```python
channels[ir.children[0]].reserve_output_slot(),
dynamic_planning.sample_chunk_count,
executor.target_partition_size,
executor.groupby_n_ary,  # Reuse groupby n_ary for now
```
I still don't understand why we care to specify the width of the fan-in for the tree-reduce.
```python
if can_skip_global_comm:
    # No global communication needed
    if local_estimate < target_partition_size or require_tree:
```
Yeah, I think this local_estimate value is not the right one.
Imagine that you have 1000 chunks and after chunkwise distinct each chunk is 10MiB; then local_estimate will be 10GiB. But what if there are only 1000 distinct values across all the chunks (i.e. the keys of all chunks turn out to overlap), so the actual final output is only 10MiB in size? Then we'll have decided to do a shuffle (if target_partition_size is smaller than 10GiB) for no good reason.
```python
    key_indices,
    shuffle_context=context,
    tracer=tracer,
)
```
Since we want to minimise communication (whichever way we do things), I wonder if the strategy should be something like:

- Always first do `chunkwise_distinct`.
- Dispatch to the appropriate case if we're already partitioned.
- Otherwise: start by doing the first level of the "tree-reduce" case, keeping track of the growth of the output chunks.
- After we've seen some number of chunks, do an allreduce of some kind to decide if we're going to continue with the tree-reduce strategy or flip to a shuffle strategy.

This is close to what you're doing here, but you've made an assumption of independence of values between chunks, which feels like (especially for groupby, which will presumably follow a similar pattern) a very pessimistic assumption.
My latest changes moved in this direction. Thanks for the suggestion.
Thank you for the great review @wence- - Just a note that I'm making heavy revisions, and will likely push the changes today. Note that I'm experimenting with the actual changes in a branch that also has dynamic join and groupby to make sure I still get reasonable perf. At this point, I do suspect that distinct and groupby can use most of the same code. However, I might leave that question for a follow-up. For now, I'm just trying to make them look as similar as possible.
```
Use when data is already partitioned on the relevant keys and each
chunk can be processed independently.
```
I wonder: does the requirement that the "data is already partitioned on the relevant keys" matter? I guess there are two kinds of operations:

- Operations where it's always safe to process the data chunk-by-chunk (like `+ 1`), which I think we call piecewise here
- Operations where it's safe to process the data chunk-by-chunk only if the data has already been partitioned on the relevant keys (like a `distinct` on the partitioned keys)

Is that right?
Yes, exactly right. For distinct and groupby, we are safe to process chunks independently if we know that each chunk has all unique values for the "key" columns. Otherwise, we must reduce or shuffle to get the correct result.
```python
async def evaluate_chunk(
    context: Context,
    chunk: TableChunk,
    ir: IR | list[IR],
```
I think it'd be simpler to just use `list[IR]` or `*IR`, and then you can remove the `isinstance` below (and update the callers).
Similar pattern in `evaluate_batch`.
```python
def test_dynamic_distinct_tree_strategy(df):
    """Test that small output uses tree reduction (high target_partition_size)."""
    # With a very high target_partition_size, tree reduction should be chosen
```
Is there anything in this test that actually asserts we used a tree reduction? I suppose that's unobservable from the outside.
```python
if shuffle_context is None:
    # Create a temporary local context
    options = Options(get_environment_variables())
    local_comm = single_comm(options)
    shuffle_context = Context(local_comm, context.br(), options)
```
Similar to https://github.com/rapidsai/cudf/pull/21126/changes, I think this will fail to propagate the (stateful) statistics object.
I'll write up an issue on the rapidsmpf side proposing something like this.
Closing this in favor of #21433
- Closes #21246
- Closes #21248
- **ALTERNATIVE** to #21382
- Part of #20482

This PR implements the same changes as #21382, but it handles both `GroupBy` and `Distinct` within the same code path.

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)
- Matthew Murray (https://github.com/Matt711)

Approvers:
- Lawrence Mitchell (https://github.com/wence-)

URL: #21433
Description

Adds a dynamic `Distinct` node to cudf-polars (#21248).

Dynamic Distinct Node Procedure

1. **Receive metadata** - Get input channel metadata (partitioning, `local_count`, `duplicated`)
2. **Check partitioning** - Call `is_partitioned_on_keys()` to determine:
   - `partitioned_inter_rank`: Data already partitioned between ranks on distinct keys
   - `partitioned_local`: Data also partitioned within rank (per chunk)
3. **Sample and reduce chunks** - Receive up to `sample_chunk_count_distinct` chunks:
   - While below `target_partition_size`, merge chunks incrementally
   - Track `chunks_sampled` for output size estimation
4. **Estimate output size** - Calculate `global_size` for strategy selection:
   - Use `allgather_reduce` to aggregate `total_size`, `local_count`, and `chunks_sampled` across ranks
   - `global_size = (total_size / global_chunks_sampled) * global_chunk_count`
global_size < target_partition_size:fully_partitionedchunkwise- apply distinct per-chunkcan_skip_global_comm+ treetree_local- local tree reductioncan_skip_global_comm+ shuffleshuffle_local- local hash shuffletree_allgather- tree reduction + allgathershuffle- global hash shuffletarget_partition_sizefor incremental reductionChecklist