Skip to content

Commit 7dd9e22

Browse files
iamjustinhsuzma2
authored andcommitted
[data] Reset external queue metrics (ray-project#56604)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? <!-- Please give a short summary of the change and the problem this solves. --> before: we updated metrics when we added an output, but the queue size after the operator is done adding outputs. after: we update every time we take an input. This should rise and fall back to 0 naturally <img width="741" height="279" alt="image" src="https://github.com/user-attachments/assets/e1a48097-e1dd-4b7b-9ce2-a23ab65d590b" /> ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: Zhiqiang Ma <zhiqiang.ma@intel.com>
1 parent a941ac9 commit 7dd9e22

File tree

2 files changed

+14
-18
lines changed

2 files changed

+14
-18
lines changed

python/ray/data/_internal/execution/streaming_executor_state.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,8 @@ def add_output(self, ref: RefBundle) -> None:
307307
self.op.metrics.num_restarting_actors = actor_info.restarting
308308
self.op.metrics.num_pending_actors = actor_info.pending
309309
for next_op in self.op.output_dependencies:
310-
next_op.metrics.num_external_inqueue_blocks = self.output_queue.num_blocks
311-
next_op.metrics.num_external_inqueue_bytes = self.output_queue.memory_usage
310+
next_op.metrics.num_external_inqueue_blocks += len(ref.blocks)
311+
next_op.metrics.num_external_inqueue_bytes += ref.size_bytes()
312312

313313
def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
314314
"""Update the console with the latest operator progress."""
@@ -353,6 +353,8 @@ def dispatch_next_task(self) -> None:
353353
ref = inqueue.pop()
354354
if ref is not None:
355355
self.op.add_input(ref, input_index=i)
356+
self.op.metrics.num_external_inqueue_bytes -= ref.size_bytes()
357+
self.op.metrics.num_external_inqueue_blocks -= len(ref.blocks)
356358
return
357359

358360
assert False, "Nothing to dispatch"

python/ray/data/tests/test_stats.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ def gen_expected_metrics(
103103
"'num_outputs_of_finished_tasks': N",
104104
"'bytes_outputs_of_finished_tasks': N",
105105
"'rows_outputs_of_finished_tasks': N",
106-
"'num_external_inqueue_blocks': N",
107-
"'num_external_inqueue_bytes': N",
106+
"'num_external_inqueue_blocks': Z",
107+
"'num_external_inqueue_bytes': Z",
108108
"'num_tasks_submitted': N",
109109
"'num_tasks_running': Z",
110110
"'num_tasks_have_outputs': N",
@@ -166,8 +166,8 @@ def gen_expected_metrics(
166166
"'num_outputs_of_finished_tasks': Z",
167167
"'bytes_outputs_of_finished_tasks': Z",
168168
"'rows_outputs_of_finished_tasks': Z",
169-
"'num_external_inqueue_blocks': N",
170-
"'num_external_inqueue_bytes': N",
169+
"'num_external_inqueue_blocks': Z",
170+
"'num_external_inqueue_bytes': Z",
171171
"'num_tasks_submitted': Z",
172172
"'num_tasks_running': Z",
173173
"'num_tasks_have_outputs': Z",
@@ -710,8 +710,8 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context):
710710
" num_outputs_of_finished_tasks: N,\n"
711711
" bytes_outputs_of_finished_tasks: N,\n"
712712
" rows_outputs_of_finished_tasks: N,\n"
713-
" num_external_inqueue_blocks: N,\n"
714-
" num_external_inqueue_bytes: N,\n"
713+
" num_external_inqueue_blocks: Z,\n"
714+
" num_external_inqueue_bytes: Z,\n"
715715
" num_tasks_submitted: N,\n"
716716
" num_tasks_running: Z,\n"
717717
" num_tasks_have_outputs: N,\n"
@@ -842,8 +842,8 @@ def check_stats():
842842
" num_outputs_of_finished_tasks: N,\n"
843843
" bytes_outputs_of_finished_tasks: N,\n"
844844
" rows_outputs_of_finished_tasks: N,\n"
845-
" num_external_inqueue_blocks: N,\n"
846-
" num_external_inqueue_bytes: N,\n"
845+
" num_external_inqueue_blocks: Z,\n"
846+
" num_external_inqueue_bytes: Z,\n"
847847
" num_tasks_submitted: N,\n"
848848
" num_tasks_running: Z,\n"
849849
" num_tasks_have_outputs: N,\n"
@@ -929,8 +929,8 @@ def check_stats():
929929
" num_outputs_of_finished_tasks: N,\n"
930930
" bytes_outputs_of_finished_tasks: N,\n"
931931
" rows_outputs_of_finished_tasks: N,\n"
932-
" num_external_inqueue_blocks: N,\n"
933-
" num_external_inqueue_bytes: N,\n"
932+
" num_external_inqueue_blocks: Z,\n"
933+
" num_external_inqueue_bytes: Z,\n"
934934
" num_tasks_submitted: N,\n"
935935
" num_tasks_running: Z,\n"
936936
" num_tasks_have_outputs: N,\n"
@@ -1985,12 +1985,6 @@ def test_op_metrics_logging():
19851985
+ gen_expected_metrics(is_map=False)
19861986
) # .replace("'obj_store_mem_used': N", "'obj_store_mem_used': Z")
19871987
# InputDataBuffer has no inqueue, manually set to 0
1988-
input_str = input_str.replace(
1989-
"'num_external_inqueue_blocks': N", "'num_external_inqueue_blocks': Z"
1990-
)
1991-
input_str = input_str.replace(
1992-
"'num_external_inqueue_bytes': N", "'num_external_inqueue_bytes': Z"
1993-
)
19941988
map_str = (
19951989
"Operator TaskPoolMapOperator[ReadRange->MapBatches(<lambda>)] completed. "
19961990
"Operator Metrics:\n"

0 commit comments

Comments
 (0)