
Fix materialize commit #1331

Merged
bohou-aryn merged 1 commit into main from materializecommit on Jun 10, 2025
Conversation

@bohou-aryn (Collaborator) commented Jun 4, 2025

Replace the dataset operators with docset operators to handle
materialize success correctly.

@bohou-aryn bohou-aryn force-pushed the materializecommit branch 2 times, most recently from c2acb45 to c1322c7 Compare June 4, 2025 23:56
@dhruvkaliraman7 (Contributor) left a comment

Can we have some tests, please? It's not obvious what the code is doing.

@bohou-aryn bohou-aryn force-pushed the materializecommit branch from c1322c7 to a087595 Compare June 5, 2025 20:45

count = values.count()
samples = values if count < 1000 else values.random_sample(1000.0 / count)
count = self.count()
Collaborator

I think this code is going to force execution twice.
You don't know the count unless you execute the entire pipeline. I think self.count() will run through the standard Executor, so it will materialize (if that's enabled) and at least not re-do the full effort.

A more efficient implementation would be a streaming read using the choose-k-out-of-n algorithm from https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle: store the 1000 sampled documents and process every input exactly once. I'd probably just put this in as a TODO.
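The streaming choose-k-of-n idea described above is classic reservoir sampling. A minimal sketch, independent of sycamore (the function name is illustrative, not part of any API):

```python
import random

def reservoir_sample(stream, k=1000, seed=None):
    """Pick k items uniformly from a stream of unknown length in one pass.

    Stores at most k items and touches each input exactly once, so no
    up-front count() is needed.
    """
    rng = random.Random(seed)
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)
        else:
            j = rng.randint(0, i)  # uniform over all items seen so far
            if j < k:
                reservoir[j] = item
    return reservoir
```

If the stream ends with fewer than k items, the reservoir simply holds all of them, which matches the `count < 1000` branch above without ever computing the count.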

Collaborator

Remember to add the TODO.

return f"DatasetScan({self._dataset})"


class DatasetTransplant(UnaryNode):
Collaborator

I don't understand what this is for. I think what is happening is that it assumes some other work already happened to produce the dataset from the node, and we return that; but I don't think this will interact correctly with our plan-rewriting machinery. The dataset is computed earlier (I can't quite tell exactly when it is forced to be computed), and if plan rewriting inserts some nodes, those inserts won't be respected.

Probably just need to chat.

Collaborator

docs = sycamore.init().read().map().map().filter().llm_generate_group().aggregate().take_all()
# where take_all() is roughly: actual_node = rewrite(node); return [ Document(d) for d in actual_node.execute() ]

Now add a node rewriter that inserts a filter step right after read (for security purposes). The filter will be ignored, because llm_generate_group will already have calculated the pipeline at a time when the filter step didn't exist.

def aggregate(self, child):
    ds = child.execute()  # computes a ray dataset, binding the execution tree to child's state at the time of the call
    return DataSetTransplant(ds, child)

The sequence of events:
  1. Build the node tree up to aggregate.
  2. Transform that node tree into a ray dataset.
  3. Add a node after the initial read.
  4. Call take_all, which ignores the node added in step 3 -- security is broken here because the filter doesn't happen.
child = [1,2,3,4]
dataset = map(lambda x: x + 1, child)
child[3] = 17
# dataset is unchanged
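The snapshot analogy above, made runnable. Note Python's builtin map is lazy, so an eager list comprehension is used here to make the snapshot behavior explicit:

```python
child = [1, 2, 3, 4]
dataset = [x + 1 for x in child]  # eagerly snapshots child at build time
child[3] = 17                     # later mutation of the input
print(dataset)                    # [2, 3, 4, 5] -- dataset is unchanged
```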

execute() is the function that turns a node into a ray_dataset.

If the node -> ray conversion were effectively
  ray_dataset = ray_dataset.ray_apply(lambda: node_as_function(node))
this would let you update node and have the update take effect right up until ray runs,
but you still couldn't insert a node.

But the actual code looks more like:
  node = Filter(...)
  ray_dataset = ray_dataset.filter(fn=node.fn)
Now updating node.fn has no effect.
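The early- vs late-binding difference can be demonstrated directly. A toy Node class (not the sycamore one):

```python
class Node:
    def __init__(self, fn):
        self.fn = fn

node = Node(lambda x: x + 1)

# Late binding: node.fn is looked up each time the pipeline actually runs.
lazy = map(lambda x: node.fn(x), [1, 2, 3])

# Early binding: the function object is captured at build time.
captured_fn = node.fn
eager = map(captured_fn, [1, 2, 3])

node.fn = lambda x: x * 10  # update node after both pipelines were built

lazy_result = list(lazy)    # [10, 20, 30] -- the update took effect
eager_result = list(eager)  # [2, 3, 4]    -- the update was ignored
```

Even the late-binding form only tolerates updating an existing node's function; inserting a new node into the plan still wouldn't be seen.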

nodes = Scan Node -> Entity Extraction -> GroupBy [not a docset here] -> Aggregate.
ds = agg() # DocSet
class DocSet:
     def agg(self):
           return DocSet(Node(...))

ctx = sycamore.init()
ds1 = ctx.read()
# here is where the rewrite rule would insert the filter
ds2 = ds1.ee()
# Eric guesses adding rule here will work
gb = ds2.gb()
# Here might work
ds3 = gb.collect()
# Here won't work because collect has already called execute on ds2, so the rule is ignored.
docs = ds.take_all()

It is hard to figure out what this code is doing.
Write a test with a rewrite rule that inserts a filter to remove one of the groups, and verify that the group vanishes when the rule is enabled.
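A toy version of the requested test, modeling a pipeline as a list of stage functions and a rewrite rule as a function over that list. All names here are hypothetical, not the sycamore API; the point is only that rewriting must happen before any stage executes:

```python
def run(pipeline, docs, rules=()):
    # Rewrite rules are applied before any stage executes.
    for rule in rules:
        pipeline = rule(pipeline)
    for stage in pipeline:
        docs = stage(docs)
    return docs

def group_count(docs):
    counts = {}
    for d in docs:
        counts[d["group"]] = counts.get(d["group"], 0) + 1
    return counts

def drop_group_b(pipeline):
    # Rule: insert a filter stage right after the read (index 0).
    fltr = lambda ds: [d for d in ds if d["group"] != "b"]
    return [pipeline[0], fltr] + pipeline[1:]

docs = [{"group": "a"}, {"group": "b"}, {"group": "a"}]
pipeline = [lambda ds: list(ds), group_count]  # read -> aggregate

assert run(pipeline, docs) == {"a": 2, "b": 1}
assert run(pipeline, docs, rules=[drop_group_b]) == {"a": 2}  # group "b" vanished
```

In the DataSetTransplant design above, the equivalent of run() would have already executed part of the pipeline before the rule was applied, and the second assertion would fail.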

@bohou-aryn bohou-aryn force-pushed the materializecommit branch 3 times, most recently from 5c36743 to 7b586c9 Compare June 6, 2025 21:10
1. Replace the dataset operators with docset operators to handle
materialize success handling.
2. Rewrite groupby count and collect in terms of DocSet instead of
using DatasetScan.
@bohou-aryn bohou-aryn force-pushed the materializecommit branch from 7b586c9 to 3abd525 Compare June 6, 2025 21:24

count = values.count()
samples = values if count < 1000 else values.random_sample(1000.0 / count)
count = self.count()
Collaborator

Remember to add the TODO.


def group_udf(batch):
import numpy as np
class AggregateCount(NonCPUUser, NonGPUUser, Transform):
Collaborator

# TODO: share a base class between AggregateCount and AggregateCollect. The respective execute functions are identical.
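A sketch of the refactor the TODO suggests: one base class holds the identical execute() logic, and each subclass supplies only its per-group UDF. Class and method names here are illustrative, not the actual sycamore signatures:

```python
class AggregateBase:
    """Shared skeleton: the identical execute() logic lives here once."""

    def group_udf(self, batch):
        raise NotImplementedError

    def execute(self, batches):
        # Apply the subclass-specific UDF to each group batch.
        return [self.group_udf(b) for b in batches]

class AggregateCount(AggregateBase):
    def group_udf(self, batch):
        return len(batch)

class AggregateCollect(AggregateBase):
    def group_udf(self, batch):
        return list(batch)
```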

@bohou-aryn bohou-aryn merged commit 1554cbc into main Jun 10, 2025
12 of 15 checks passed