-
Notifications
You must be signed in to change notification settings - Fork 7.3k
[Data] Fix streaming executor to drain upstream output queue(s) #56941
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -7,6 +7,8 @@ | |||||
|
|
||||||
| import numpy as np | ||||||
| import pandas as pd | ||||||
| import pyarrow as pa | ||||||
| import pyarrow.parquet as pq | ||||||
| import pytest | ||||||
|
|
||||||
| import ray | ||||||
|
|
@@ -40,7 +42,9 @@ | |||||
| from ray.data._internal.execution.operators.task_pool_map_operator import ( | ||||||
| TaskPoolMapOperator, | ||||||
| ) | ||||||
| from ray.data._internal.execution.streaming_executor import StreamingExecutor | ||||||
| from ray.data._internal.execution.util import make_ref_bundles | ||||||
| from ray.data._internal.logical.optimizers import get_execution_plan | ||||||
| from ray.data._internal.output_buffer import OutputBlockSizeOption | ||||||
| from ray.data._internal.stats import Timer | ||||||
| from ray.data.block import Block, BlockAccessor | ||||||
|
|
@@ -826,6 +830,58 @@ def test_limit_operator(ray_start_regular_shared): | |||||
| assert limit_op.completed(), limit | ||||||
|
|
||||||
|
|
||||||
def test_limit_operator_memory_leak_fix(ray_start_regular_shared, tmp_path):
    """Test that LimitOperator properly drains upstream output queues.

    This test verifies the memory leak fix by directly using StreamingExecutor
    to access the actual topology and check queued blocks after execution.

    The pipeline reads 100 small Parquet files (5 rows each), applies
    ``limit(5)`` and an identity ``map``. After the output iterator is
    exhausted, the read operator's output queue must be empty.
    """
    num_files = 100
    rows_per_file = 5
    limit = 5

    # Write the input files, recording each path as it is written so the list
    # passed to read_parquet is exactly the set of files that exist.
    parquet_files = []
    for i in range(num_files):
        ids = [i * rows_per_file + j for j in range(rows_per_file)]
        table = pa.Table.from_pydict(
            {"id": ids, "value": [f"row_{row_id}" for row_id in ids]}
        )
        parquet_file = str(tmp_path / f"test_data_{i}.parquet")
        pq.write_table(table, parquet_file)
        parquet_files.append(parquet_file)

    ds = (
        ray.data.read_parquet(parquet_files, override_num_blocks=num_files)
        .limit(limit)
        .map(lambda x: x)
    )

    execution_plan = ds._plan
    physical_plan = get_execution_plan(execution_plan._logical_plan)

    # Use StreamingExecutor directly to have access to the actual topology
    executor = StreamingExecutor(DataContext.get_current())
    output_iterator = executor.execute(physical_plan.dag)

    # Collect all results and count rows
    total_rows = 0
    for bundle in output_iterator:
        for block_ref in bundle.block_refs:
            block = ray.get(block_ref)
            total_rows += block.num_rows
    assert (
        total_rows == limit
    ), f"Expected exactly 5 rows after limit(5), but got {total_rows}"

    # Find the ReadParquet operator's OpState
    # NOTE(review): this is a substring match on the operator name; confirm it
    # stays in sync with how the read operator is named in the physical plan.
    topology = executor._topology
    read_parquet_op_state = next(
        (
            op_state
            for op, op_state in topology.items()
            if "ReadParquet" in op.name
        ),
        None,
    )
    # Fail with a readable message instead of an AttributeError on None when
    # the operator lookup above matches nothing.
    assert read_parquet_op_state is not None, (
        "No operator with 'ReadParquet' in its name found in topology; "
        f"operators: {[op.name for op in topology]}"
    )

    # Check the output queue size
    output_queue_size = len(read_parquet_op_state.output_queue)
    assert output_queue_size == 0, f"Expected 0 items, but got {output_queue_size}."
|
|
||||||
|
|
||||||
| def _get_bundles(bundle: RefBundle): | ||||||
| output = [] | ||||||
| for block_ref in bundle.block_refs: | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Downstream Completion Triggers Premature Queue Clearing
Unconditionally clearing an upstream operator's output queue when any downstream operator finishes execution can lead to data loss. In fan-out scenarios, this starves other active downstream operators that still depend on that data.