Add dynamic_distinct streaming node#21382

Closed
rjzamora wants to merge 24 commits into rapidsai:main from rjzamora:dynamic-distinct

Conversation

@rjzamora
Member

@rjzamora rjzamora commented Feb 9, 2026

Description

Dynamic Distinct Node Procedure

  1. Receive metadata - Get input channel metadata (partitioning, local_count, duplicated)

  2. Check partitioning - Call is_partitioned_on_keys() to determine:

    • partitioned_inter_rank: Data already partitioned between ranks on distinct keys
    • partitioned_local: Data also partitioned within rank (per chunk)
  3. Sample and reduce chunks - Receive up to sample_chunk_count_distinct chunks:

    • Apply local distinct to each incoming chunk
    • When accumulated size exceeds target_partition_size, merge chunks incrementally
    • Stop early if merged size still exceeds threshold (indicates poor compression)
    • Track chunks_sampled for output size estimation
  4. Estimate output size - Calculate global_size for strategy selection:

    • Use allgather_reduce to aggregate total_size, local_count, and chunks_sampled across ranks
    • Estimate: global_size = (total_size / global_chunks_sampled) * global_chunk_count
  5. Select strategy - Choose based on partitioning and whether global_size < target_partition_size:

| Condition | Strategy |
| --- | --- |
| fully_partitioned | chunkwise - apply distinct per-chunk |
| can_skip_global_comm + tree | tree_local - local tree reduction |
| can_skip_global_comm + shuffle | shuffle_local - local hash shuffle |
| Global + tree | tree_allgather - tree reduction + allgather |
| Global + shuffle | shuffle - global hash shuffle |
  6. Execute strategy - Delegate to the appropriate function, passing already-evaluated chunks and target_partition_size for incremental reduction
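The estimate-and-dispatch steps above (4-6) can be sketched roughly as follows. This is an illustrative sketch, not the PR's code: `estimate_global_size`, `select_strategy`, and the `prefer_tree` flag are invented names standing in for the node's actual helpers and heuristics.

```python
def estimate_global_size(total_size: int, chunks_sampled: int, chunk_count: int) -> float:
    # Step 4: extrapolate the distinct size of the sampled chunks to all chunks.
    return (total_size / max(chunks_sampled, 1)) * chunk_count


def select_strategy(
    partitioned_inter_rank: bool,
    partitioned_local: bool,
    global_size: float,
    target_partition_size: int,
    prefer_tree: bool,
) -> str:
    # Step 5: dispatch based on partitioning and the size estimate.
    small = global_size < target_partition_size
    if partitioned_inter_rank and partitioned_local:
        return "chunkwise"  # fully partitioned: distinct per-chunk is safe
    if partitioned_inter_rank:  # can skip global communication
        return "tree_local" if small or prefer_tree else "shuffle_local"
    return "tree_allgather" if small or prefer_tree else "shuffle"
```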

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@rjzamora rjzamora self-assigned this Feb 9, 2026
@rjzamora rjzamora requested a review from a team as a code owner February 9, 2026 15:52
@rjzamora rjzamora requested review from bdice and vyasr February 9, 2026 15:52
@rjzamora rjzamora added feature request New feature or request 3 - Ready for Review Ready for review by team non-breaking Non-breaking change labels Feb 9, 2026
@github-actions github-actions bot added Python Affects Python cuDF API. cudf-polars Issues specific to cudf-polars labels Feb 9, 2026
@GPUtester GPUtester moved this to In Progress in cuDF Python Feb 9, 2026
return TableChunk.from_pylibcudf_table(df.table, chunk.stream, exclusive_view=True)


async def chunkwise_evaluate(
Member Author


This utility can be used by default_node_single, distinct_node (this PR), and groupby_node (next PR).

@rjzamora rjzamora marked this pull request as draft February 9, 2026 17:45
@copy-pr-bot

copy-pr-bot bot commented Feb 9, 2026

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.


@rjzamora rjzamora added 2 - In Progress Currently a work in progress and removed 3 - Ready for Review Ready for review by team labels Feb 9, 2026
@rjzamora rjzamora marked this pull request as ready for review February 9, 2026 18:52
@rjzamora
Member Author

rjzamora commented Feb 9, 2026

/ok to test

@rjzamora rjzamora added 3 - Ready for Review Ready for review by team and removed 2 - In Progress Currently a work in progress labels Feb 10, 2026
Contributor

@wence- wence- left a comment


First pass

Comment on lines +291 to +293
initial_chunks
Optional chunks already received (e.g., during sampling) that should
be forwarded without re-evaluation.
Contributor


"initial" feels like a bad name here. I was confused below seeing that these must have already been evaluated the same as the processing of remaining chunks.

But thinking about it, this imposes the restriction that these chunks must be processed by the caller identically to the "bulk".

I do not think this is necessarily always going to be the case. For example, even for this PR, we're just trying to determine if we should shuffle-distinct or reduce-distinct. For that, we need to have an estimate of the cardinality of the result. We could do that with the new cudf "approx_distinct_count" API and merge across ranks. In that case, the sampled chunks will not have been processed with actual distinct.

Member Author


Yeah, I suppose we could have both processed_chunks (already evaluated) and sampled_chunks (already pulled out of the channel, but not yet evaluated). We probably want to keep these buffered chunks in a SpillableMessages container as well.

Comment on lines +247 to +248
if input_schema is None:
input_schema = ir.children[0].schema
Contributor


In what scenarios must we override the schema of the input?

Member Author


I was doing this for groupby at one point, but it shouldn't be necessary anymore. I can remove it.

if initial_chunks:
for chunk in initial_chunks:
if tracer is not None:
tracer.add_chunk(table=chunk.table_view())
Contributor


This implicitly requires that the initial chunks are in device memory...

Member Author


That's true. Right now it will be in memory, but if we stage the data in a SpillableMessages container then we would need to log these kinds of chunks earlier/differently.

net_memory_delta=0,
)
with opaque_memory_usage(extra):
result = await asyncio.to_thread(evaluate_chunk, chunk, ir, ir_context)
Contributor


It seems like the input_schema argument to evaluate_chunk is never used. Do we need it?

Member Author


I'll remove it.

net_memory_delta=0,
)
with opaque_memory_usage(extra):
result = await asyncio.to_thread(evaluate_chunk, chunk, ir, ir_context)
Contributor


Aside (we should fix this separately), asyncio.to_thread creates and uses an internal ThreadPoolExecutor. But when we run with a rapidsmpf context, we have used a ThreadPoolExecutor with some max number of threads. I suspect we should be using that executor to do the threaded offload.

That would be replacing all the asyncio.to_thread calls with:

loop = asyncio.get_running_loop()
await loop.run_in_executor(ctx.py_executor, partial(func, *args, **kwargs))
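A minimal runnable sketch of that replacement, with a locally created ThreadPoolExecutor standing in for the rapidsmpf context's py_executor. Note that, unlike asyncio.to_thread, loop.run_in_executor does not copy contextvars into the worker thread.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def blocking_work(x: int, scale: int = 2) -> int:
    # Stand-in for a blocking chunk-evaluation call.
    return x * scale


async def main() -> int:
    loop = asyncio.get_running_loop()
    # In the PR's setting this executor would be ctx.py_executor, shared
    # across all offloaded calls instead of created per call.
    with ThreadPoolExecutor(max_workers=2) as executor:
        result = await loop.run_in_executor(
            executor, partial(blocking_work, 21, scale=2)
        )
    return result


print(asyncio.run(main()))  # 42
```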

Contributor

@TomAugspurger TomAugspurger Feb 11, 2026


We might be relying on asyncio.to_thread copying over contextvars too.

IIRC, we (or rapidsmpf?) create a new event loop, maybe implicitly via asyncio.run here. We could ensure the desired executor is used there: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.set_default_executor. I'll make an issue in rapidsmpf.

Edit: rapidsai/rapidsmpf#855


# Get collective IDs for this Distinct (may be empty if not reserved)
collective_ids = list(rec.state["collective_id_map"].get(ir, []))

Contributor


If there are no collective_ids, we're hosed right?

# Get collective IDs for this Distinct (may be empty if not reserved)
collective_ids = list(rec.state["collective_id_map"].get(ir, []))

# Create the dynamic unique node
Contributor


Suggested change
# Create the dynamic unique node

channels[ir.children[0]].reserve_output_slot(),
dynamic_planning.sample_chunk_count,
executor.target_partition_size,
executor.groupby_n_ary, # Reuse groupby n_ary for now
Contributor


I still don't understand why we care to specify the width of the fan-in for the tree-reduce.


if can_skip_global_comm:
# No global communication needed
if local_estimate < target_partition_size or require_tree:
Contributor


Yeah, I think this local_estimate value is not the right one.

Imagine that you have 1000 chunks and after chunkwise distinct each chunk is 10MiB; then local_estimate will be 10GiB. But what if there are only 1000 distinct values across all the chunks (i.e. the keys of all chunks turn out to overlap), so the actual final output is only 10MiB in size? Then we'll have decided to do a shuffle (if target_partition_size is smaller than 10GiB) for no good reason.
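A toy illustration of the overestimate in pure Python (no cudf): summing per-chunk distinct sizes implicitly assumes the chunks' keys are disjoint, so fully overlapping chunks inflate the estimate by the chunk count.

```python
# 1000 chunks that all contain the same 1000 keys.
chunks = [list(range(1000)) for _ in range(1000)]

# Summing per-chunk distinct sizes treats keys as independent across chunks.
local_estimate = sum(len(set(chunk)) for chunk in chunks)  # 1_000_000

# The true global distinct count is three orders of magnitude smaller.
actual = len(set().union(*map(set, chunks)))  # 1_000

assert local_estimate == 1_000_000
assert actual == 1_000
```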

key_indices,
shuffle_context=context,
tracer=tracer,
)
Contributor


Since we want to minimise communication (whichever way we do things). I wonder if the strategy should be something like:

  • always first do chunkwise_distinct
  • Dispatch to appropriate case if we're already partitioned
  • Otherwise: start by doing the first level of the "tree-reduce" case, keeping track of the growth of the output chunks.
  • After we've seen some number of chunks, do an allreduce of some kind to decide if we're going to continue with the tree-reduce strategy or flip to a shuffle strategy

This is close to what you're doing here, but you've made an assumption of independence of values between chunks, which feels like (especially for groupby which will presumably follow a similar pattern) a very pessimistic assumption.
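One way to sketch the adaptive scheme suggested above (all names illustrative, not the PR's API; the allreduce is mocked as a local computation, and an even chunk count is assumed for simplicity): do one level of pairwise merging, measure how much the data shrank, then pick tree vs. shuffle.

```python
def merge_distinct(a: list, b: list) -> list:
    # Stand-in for a pairwise distinct-merge of two chunks.
    return sorted(set(a) | set(b))


def adaptive_plan(chunks: list[list], target_partition_size: int):
    # First level of the tree reduction, tracking input vs. output size.
    merged, in_size = [], 0
    for left, right in zip(chunks[::2], chunks[1::2]):
        in_size += len(left) + len(right)
        merged.append(merge_distinct(left, right))
    out_size = sum(len(m) for m in merged)
    # Stand-in for an allreduce of (out_size, in_size) across ranks.
    shrink = out_size / max(in_size, 1)
    # Keep reducing if merging compresses well or the output is small;
    # otherwise flip to a shuffle. The 0.5 cutoff is arbitrary here.
    strategy = (
        "tree" if shrink < 0.5 or out_size < target_partition_size else "shuffle"
    )
    return strategy, merged
```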

Member Author


My latest changes moved in this direction. Thanks for the suggestion.

@rjzamora
Member Author

Thank you for the great review @wence- - Just a note that I'm making heavy revisions, and will likely push the changes today. Note that I'm experimenting with the actual changes in a branch that also has dynamic join and groupby to make sure I still get reasonable perf. At this point, I do suspect that distinct and groupby can use most of the same code. However, I might leave that question for a follow-up. For now, I'm just trying to make them look as similar as possible.

Comment on lines +462 to +463
Use when data is already partitioned on the relevant keys and each
chunk can be processed independently.
Contributor


I wonder: does the requirement that the "data is already partitioned on the relevant keys" matter? I guess there are two kinds of operations:

  1. Operations where it's always safe to process the data chunk-by-chunk (like + 1) which I think we call piecewise here
  2. Operations where it's safe to process the data chunk-by-chunk if the data has already been partitioned on the relevant keys (like a distinct on the partitioned keys)

Is that right?

Member Author


Yes, exactly right. For distinct and groupby, we are safe to process chunks independently if we know that chunk has all unique values for the "key" columns. Otherwise, we must reduce or shuffle to get the correct result.
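A small self-contained illustration of point 2 (plain-Python stand-ins, not the PR's code): once rows are hash-partitioned on the key, every key value lands in exactly one chunk, so running distinct per chunk and concatenating matches a global distinct.

```python
def hash_partition(rows: list[dict], key: str, n: int) -> list[list[dict]]:
    # Route each row to a chunk by hashing its key column.
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[hash(row[key]) % n].append(row)
    return parts


def distinct(rows: list[dict], key: str) -> list[dict]:
    # Keep the first row seen for each key value.
    seen, out = set(), []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            out.append(row)
    return out


rows = [{"k": k, "v": i} for i, k in enumerate([1, 2, 1, 3, 2, 4])]
parts = hash_partition(rows, "k", 2)
per_chunk = [r for p in parts for r in distinct(p, "k")]

# Same keys and same row count as a global distinct over all rows.
assert {r["k"] for r in per_chunk} == {r["k"] for r in distinct(rows, "k")}
assert len(per_chunk) == len(distinct(rows, "k"))
```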

async def evaluate_chunk(
context: Context,
chunk: TableChunk,
ir: IR | list[IR],
Contributor


I think it'd be simpler to just list[IR] or *IR, and then you can remove the isinstance below (and update the callers).
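A sketch of the suggested simplification, with illustrative names (plain callables stand in for IR evaluation): accepting *irs keeps a single code path and drops the isinstance branch.

```python
def evaluate_chunk(chunk, *irs):
    # Apply each IR node in order; no isinstance normalisation needed
    # since the caller always passes a flat sequence.
    for ir in irs:
        chunk = ir(chunk)
    return chunk


# Toy "IR nodes" as plain callables for illustration.
add_one = lambda xs: [x + 1 for x in xs]
double = lambda xs: [x * 2 for x in xs]
assert evaluate_chunk([1, 2], add_one, double) == [4, 6]
```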

Contributor


Similar pattern in evaluate_batch.

Member Author


Agree - Making this change in #21433


def test_dynamic_distinct_tree_strategy(df):
"""Test that small output uses tree reduction (high target_partition_size)."""
# With a very high target_partition_size, tree reduction should be chosen
Contributor


Is there anything in this test that actually asserts we used a tree reduction? I suppose that's unobservable from the outside.

Comment on lines +292 to +296
if shuffle_context is None:
# Create a temporary local context
options = Options(get_environment_variables())
local_comm = single_comm(options)
shuffle_context = Context(local_comm, context.br(), options)
Contributor


Similar to https://github.com/rapidsai/cudf/pull/21126/changes, I think this will fail to propagate the (stateful) statistics object.

I'll write up an issue on the rapidsmpf side proposing something like this.

@rjzamora
Member Author

Closing this in favor of #21433

@rjzamora rjzamora closed this Feb 17, 2026
@github-project-automation github-project-automation bot moved this from In Progress to Done in cuDF Python Feb 17, 2026
rapids-bot bot pushed a commit that referenced this pull request Feb 26, 2026
- Closes #21246
- Closes #21248
- **ALTERNATIVE** to #21382
- Part of #20482

This PR implements the same changes as #21382, but it handles both `GroupBy` and `Distinct` within the same code path.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #21433

Development

Successfully merging this pull request may close these issues.

[FEA] Add dynamic Distinct node to cudf-polars
