Fix materialize #1331
Force-pushed from c2acb45 to c1322c7.
dhruvkaliraman7 left a comment:
Can we have some tests, please? It's not obvious what the code is doing.
Force-pushed from c1322c7 to a087595.
```python
count = values.count()
samples = values if count < 1000 else values.random_sample(1000.0 / count)
count = self.count()
```
I think this code is going to force execution twice.
You don't know the count unless you execute the entire pipeline. I think that self.count() will run through the standard Executor, so it will materialize (if that's enabled) and at least not redo the full effort.
A more efficient implementation would be to do a streaming read and use the choose-k-out-of-n algorithm from https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle: you store the 1000 sampled documents and process every document exactly once. I'd probably just put this in as a TODO.
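A minimal sketch of the streaming idea the comment describes, using classic reservoir sampling (Algorithm R); `docs` and the sample size are placeholders, not Sycamore API:

```python
import random

def reservoir_sample(docs, k=1000):
    """Keep a uniform random sample of k items from a stream, touching each item once."""
    reservoir = []
    for i, doc in enumerate(docs):
        if i < k:
            reservoir.append(doc)
        else:
            # Keep the new item with probability k / (i + 1).
            j = random.randint(0, i)
            if j < k:
                reservoir[j] = doc
    return reservoir
```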
Remember to add the TODO.
```python
    return f"DatasetScan({self._dataset})"

class DatasetTransplant(UnaryNode):
```
I don't understand what this is for. I think what is happening is that it assumes some other work already happened to produce dataset from node, and that we just return that, but I don't think this will interact correctly with our plan-rewriting machinery. The dataset is computed earlier (and I can't quite tell exactly when it is forced to be computed), so if plan rewriting inserts some nodes, those insertions wouldn't be respected.
Probably just need to chat.
```python
docs = sycamore.init().read().map().map().filter().llm_generate_group().aggregate().take_all()
```

where take_all() is effectively:

```python
actual_node = rewrite(node)
return [Document(d) for d in actual_node.execute()]
```
Now add a node rewriter that inserts a filter step right after read (for security purposes). The filter will be ignored, because llm_generate_group will have computed the pipeline back when the filter step didn't exist.
```python
def aggregate(self, child):
    # This computes a Ray dataset, which binds the execution tree to
    # whatever child looks like at the time of the call.
    ds = child.execute()
    return DatasetTransplant(ds, child)
```
In steps:

1. Build the node tree up to aggregate.
2. Transform that node tree into a Ray dataset.
3. Add a node after the initial read.
4. Call take_all, which ignores the node added in step 3 -- security is broken here because the filter doesn't happen.
By analogy:

```python
child = [1, 2, 3, 4]
dataset = list(map(lambda x: x + 1, child))  # dataset is computed here
child[3] = 17
# dataset is unchanged
```
execute() is the function that turns a node into a ray_dataset. If the node -> Ray translation were effectively

```python
ray_dataset = ray_dataset.ray_apply(lambda: node_as_function(node))
```

then you could update node and have it take effect right up until Ray runs, but you still couldn't insert a node. The actual code looks more like:

```python
node = Filter(...)
ray_dataset = ray_dataset.filter(fn=node.fn)
```

so updating node.fn afterwards has no effect.
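A minimal pure-Python illustration of that binding behavior (Node here is a stand-in class, not the Sycamore one):

```python
class Node:
    def __init__(self, fn):
        self.fn = fn

node = Node(lambda x: x > 1)
captured = node.fn             # the function object is captured here
node.fn = lambda x: x > 100    # rebinding node.fn afterwards...
print(captured(2))             # ...has no effect: prints True, the old predicate
```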
The node tree is Scan -> Entity Extraction -> GroupBy [not a docset here] -> Aggregate:

```python
ds = agg()  # DocSet

class DocSet:
    def agg(self):
        return DocSet(Node(...))
```
```python
ctx = sycamore.init()
ds1 = ctx.read()
# Here is where the rewrite rule would insert the filter.
ds2 = ds1.ee()
# Eric guesses adding the rule here will work.
gb = ds2.gb()
# Here might work.
ds3 = gb.collect()
# Here won't work, because collect has already called execute on ds2,
# so the rule is ignored.
docs = ds3.take_all()
```
It is hard to figure out what this code is doing.
Write a test with a rewrite rule that inserts a filter to remove one of the groups, and verify that the group vanishes when the rule is enabled (sketched below).
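A hedged sketch of such a test; the Rule base class, the rewrite_rules registration hook, read.document, groupby_count, and the field names are all assumptions about the Sycamore interfaces, not verified API:

```python
# Hypothetical sketch -- class names, hooks, and methods below are assumed, not real API.
def test_rewrite_rule_filter_removes_group():
    class DropGroupB(Rule):  # assumed base class for plan-rewrite rules
        def __call__(self, node):
            # Insert a filter right after the read node.
            if isinstance(node, ReadNode):  # assumed node type
                return Filter(node, f=lambda doc: doc.properties["group"] != "b")
            return node

    ctx = sycamore.init()
    ctx.rewrite_rules.append(DropGroupB())  # assumed registration hook

    groups = (
        ctx.read.document(docs_with_groups_a_and_b)  # assumed in-memory fixture
        .groupby_count("properties.group")           # assumed DocSet method
        .take_all()
    )
    # With the rule enabled, group "b" should have vanished.
    assert all(g.properties["key"] != "b" for g in groups)
```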
Force-pushed from 5c36743 to 7b586c9.
1. Replace the dataset operators with docset operators to handle materialize success.
2. Rewrite groupby count and collect in terms of DocSet instead of using DatasetScan.
Force-pushed from 7b586c9 to 3abd525.
```python
count = values.count()
samples = values if count < 1000 else values.random_sample(1000.0 / count)
count = self.count()
```
Remember to add the TODO.
```python
def group_udf(batch):
    import numpy as np

class AggregateCount(NonCPUUser, NonGPUUser, Transform):
```
# TODO: share a base class between AggregateCount and AggregateCollect. The respective execute functions are identical.
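A minimal sketch of that refactor; AggregateBase and the method bodies are illustrative, not the actual Sycamore classes:

```python
# Illustrative only: names and signatures are assumptions, not Sycamore's API.
class AggregateBase(NonCPUUser, NonGPUUser, Transform):
    """Shared home for the identical execute() implementations."""

    def execute(self, **kwargs):
        dataset = self.child().execute(**kwargs)    # assumed UnaryNode child accessor
        return dataset.map_batches(self.group_udf)  # each subclass supplies group_udf

class AggregateCount(AggregateBase):
    def group_udf(self, batch):
        ...  # count documents per group

class AggregateCollect(AggregateBase):
    def group_udf(self, batch):
        ...  # collect documents per group
```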
Replace the dataset operators with docset operators to handle materialize success.