
[data] Remove stats update thread #57971

Merged
bveeramani merged 35 commits into ray-project:master from iamjustinhsu:jhsu/remove-stats-thread on Nov 14, 2025

Conversation

@iamjustinhsu (Contributor) commented Oct 21, 2025

Description

Before this PR, the metrics followed this path:

  1. StreamingExecutor collects metrics per operator.
  2. _StatsManager creates a thread to export metrics.
  3. StreamingExecutor sends metrics to _StatsManager, which performs a copy while holding a _stats_lock.
  4. The stats thread reads the metrics sent in step 3.
  5. The stats thread sleeps 5-10 seconds between exports of metrics to _StatsActor. These metrics come in two forms: iteration and execution metrics.
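The old flow above can be sketched roughly as follows. This is an illustrative model only, not the actual Ray Data internals: `ThreadedStatsManager` and `export_fn` are hypothetical names, with `export_fn` standing in for the RPC to `_StatsActor`.

```python
import threading

class ThreadedStatsManager:
    """Sketch of the OLD flow: a background thread drains metrics handed
    off by the executor under a lock (illustrative names only)."""

    def __init__(self, export_fn, interval_s=5.0):
        self._export_fn = export_fn            # stands in for the _StatsActor RPC
        self._interval_s = interval_s
        self._stats_lock = threading.Lock()
        self._pending = {}                     # dataset_tag -> latest metrics
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def update(self, dataset_tag, metrics):
        # Executor side: copy metrics under the lock (the copy this PR removes).
        with self._stats_lock:
            self._pending[dataset_tag] = dict(metrics)

    def _run(self):
        # Thread side: sleep for the interval, then export everything
        # pending for all datasets in a single call.
        while not self._stop.wait(self._interval_s):
            with self._stats_lock:
                snapshot, self._pending = self._pending, {}
            if snapshot:
                self._export_fn(snapshot)

    def shutdown(self):
        self._stop.set()
        self._thread.join()
```

Even in this toy form, the thread lifecycle (start, idle handling, shutdown) and the copy-under-lock are visible, which is the complexity the PR removes.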

I believe the purpose of the stats thread created in step 2 was two-fold:

  • Avoid exporting stats too frequently.
  • Avoid exporting iteration and execution stats separately (send them in the same RPC call).

However, this creates a lot of complexity (handling idle threads, etc.) and also makes it harder to support histogram metrics, which need to copy an entire list of values. See #57851 for more details.

By removing the stats thread created in step 2, we reduce management complexity and avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the number of RPC calls. I don't think this is a concern, because the updates to the _StatsActor were already asynchronous, and we can also tweak the update interval.

It's important to note that _stats_lock still lives on to update the last timestamps of each dataset. See * below for more details.

The new flow is:

  1. StreamingExecutor collects metrics per operator.
  2. StreamingExecutor checks the last time _StatsActor was updated. If more than a default of 5 seconds has passed, it sends metrics to _StatsActor through the _StatsManager, then updates the last-updated timestamp. See * below for a caveat.
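The new flow amounts to an inline rate limiter inside the scheduling loop. A minimal sketch, using the `_metrics_last_updated` / `UPDATE_METRICS_INTERVAL_S` names that appear in the PR's diffs; `ExecutorMetricsReporter` and `export_fn` are hypothetical stand-ins, not the actual classes:

```python
import time

UPDATE_METRICS_INTERVAL_S = 5.0  # default interval mentioned in the PR

class ExecutorMetricsReporter:
    """Sketch of the NEW flow: the caller itself rate-limits exports,
    so no background thread or shared lock is needed."""

    def __init__(self, export_fn, interval_s=UPDATE_METRICS_INTERVAL_S):
        self._export_fn = export_fn            # stands in for _StatsManager -> _StatsActor
        self._interval_s = interval_s
        self._metrics_last_updated = 0.0

    def maybe_export(self, metrics, force_update=False):
        # Export only if the interval has elapsed (or on force_update,
        # e.g. at shutdown), then advance the timestamp.
        now = time.time()
        if force_update or (now - self._metrics_last_updated) > self._interval_s:
            self._export_fn(metrics)
            self._metrics_last_updated = now
            return True
        return False
```

In practice the export is a fire-and-forget actor call, so the check adds no blocking to the scheduling loop.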

*[important] Ray Data supports running multiple datasets concurrently, so I must track each dataset's last-updated timestamp. _stats_lock is used to update that dictionary[dataset, last_updated] safely on register_dataset and on shutdown. On update, we don't require the lock because the update does not change the dictionary's size. If we want to remove the lock entirely, I can think of a few workarounds:

  1. Create a per-dataset StatsManager. Pros: no thread lock. Cons: many more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which makes this more challenging.
  2. Update whenever unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc. the updates land on that interval. Pros: easy to implement and stateless. Cons: breaks down for slower streaming executors.
  3. Remove the lock by keeping the state in two places:
  • BatchIterator
  • StreamingExecutor
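Workarounds 2 and 3 can be contrasted in a few lines. These are illustrative sketches, not the PR's actual code; `should_update_modulo` and `PerComponentState` are hypothetical names:

```python
import time

INTERVAL_S = 5

def should_update_modulo(now: float) -> bool:
    # Workaround 2: stateless clock-boundary check. If a slow executor
    # loop iterates less than once per second, it can step right over
    # the boundary second and never fire -- the "breaks down for slower
    # streaming executors" caveat above.
    return int(now) % INTERVAL_S == 0

class PerComponentState:
    # Workaround 3 (the one taken): each component (BatchIterator,
    # StreamingExecutor) owns its own last-updated timestamp, so there is
    # no shared dictionary to resize and therefore no lock.
    def __init__(self, interval_s: float = INTERVAL_S):
        self._interval_s = interval_s
        self._last_updated = 0.0

    def should_update(self, now: float) -> bool:
        if (now - self._last_updated) > self._interval_s:
            self._last_updated = now
            return True
        return False
```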

I also verified that #55163 still solves the original issue.

Related issues

Additional information

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu changed the title [data] Remote stats update thread [data] Remove stats update thread Oct 24, 2025
@iamjustinhsu iamjustinhsu marked this pull request as ready for review October 29, 2025 16:29
@iamjustinhsu iamjustinhsu requested a review from a team as a code owner October 29, 2025 16:30

…calls + create an interval for updating stats per dataset



@ray-gardener bot added the performance and data (Ray Data-related issues) labels Oct 29, 2025



Comment on lines -238 to -242 (old callbacks removed, replaced by a context manager):

    -    def before_epoch_start(self):
    -        self._yielded_first_batch = False
    +    @contextmanager
    +    def _epoch_context(self):
    +        """Context manager for epoch lifecycle: setup and cleanup."""
    -    def after_epoch_end(self):
    -        StatsManager.clear_iteration_metrics(self._dataset_tag)
Contributor:

Actually, I find these callback methods (before/after) a bit easier to understand; let's keep them.

Contributor Author:

For handling stateful stuff: if an exception occurs, the state doesn't get cleaned up. If we go for a 1:1 streaming executor -> StatsManager, then I can remove clear_iteration_metrics and go back to the original version safely.
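The cleanup-on-exception point can be sketched in a few lines. Hypothetical names: `epoch_context` mirrors the `_epoch_context` in the diff, and `cleanup_fn` stands in for `StatsManager.clear_iteration_metrics`.

```python
from contextlib import contextmanager

@contextmanager
def epoch_context(cleanup_fn):
    # A context manager guarantees cleanup via finally even if the body
    # raises, whereas a bare after_epoch_end() callback is simply skipped
    # when an exception interrupts the iteration loop.
    try:
        yield
    finally:
        cleanup_fn()
```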

# NOTE: This must be thread-safe because multiple datasets can
# be running at the same time. Decreasing the size of the dictionary
# is not thread-safe
with self._update_last_updated_lock:
Contributor:

Let's get rid of that:

  • Instantiate StatsManager in StreamingExecutor (so that these are 1:1)
  • Remove all locks

Contributor Author (@iamjustinhsu, Nov 6, 2025):

I decided to remove the lock by making StatsManager stateless, and push that information to the batch iterator and executor.

@iamjustinhsu force-pushed the jhsu/remove-stats-thread branch from 5366997 to 90ce0d6 on November 1, 2025 20:03
@bveeramani (Member) left a comment:

Overall LGTM.

ty. This is a useful refactor

Comment on lines -770 to -772
# Creating/getting an actor from multiple threads is not safe.
# https://github.com/ray-project/ray/issues/41324
_stats_actor_lock: threading.RLock = threading.RLock()
Member:

Why don't we need this anymore?

Contributor Author (@iamjustinhsu, Nov 11, 2025):

Two reasons:

Comment on lines 117 to +120

self._metrics_last_updated: float = 0.0
Member:

Nit: Here and in iterator -- wasn't totally obvious to me what this variable represents based on the name alone. Would recommend either choosing a more descriptive name or adding a comment

Comment on lines +859 to +860

    if force_update:
        ray.wait([ref], timeout=1)
Member:

What's the motivation for waiting here? I don't think we waited in the previous implementation, and blocking calls in the scheduling loop (even with 1s timeout) makes me feel nervous.

Contributor Author:

To keep parity with the previous force_update behavior. Honestly, it's not required, and I have removed it.

Comment on lines +246 to +248

    _StatsManager.update_iteration_metrics(
        self._stats, self._dataset_tag, force_update=True
    )
Member:

My understanding is that, in the previous implementation, we don't update any metrics in after_epoch_end.

Don't think it's an issue, but what's the motivation for performing a force update here?

Contributor Author:

Hmm, I think this was a subtle bug in the previous implementation.

Essentially, the streaming executor force-updates the metrics on shutdown to zero them out. A similar story applies here: if we don't update the metrics in after_epoch_end, the last iteration's metrics are never updated. I don't think "clearing" the metrics was the right move, because then we wouldn't finalize the last iteration's metrics.

lmk if that makes sense

    now = time.time()
    if (now - self._metrics_last_updated) > self.UPDATE_METRICS_INTERVAL_S:
        _StatsManager.update_iteration_metrics(self._stats, self._dataset_tag)
        self._metrics_last_updated = now
cursor[bot]:

Bug: Incorrect timing skews metrics update intervals.

The metrics update interval check in yield_batch_context measures time incorrectly by calling time.time() after user code execution completes. This includes user processing time in the interval calculation, causing metrics to update at unpredictable intervals rather than the intended 5-second cadence. The timestamp should be captured before the yield statement or at the start of the context manager.
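A minimal sketch of the suggested fix, sampling `time.time()` before yielding to user code so that user processing time does not skew the cadence. `BatchMetricsSketch` and `export_fn` are illustrative stand-ins, not the actual Ray Data classes; only `yield_batch_context`, `_metrics_last_updated`, and `UPDATE_METRICS_INTERVAL_S` come from the PR.

```python
import time
from contextlib import contextmanager

UPDATE_METRICS_INTERVAL_S = 5.0

class BatchMetricsSketch:
    def __init__(self, export_fn):
        self._export_fn = export_fn        # stands in for the metrics export
        self._metrics_last_updated = 0.0

    @contextmanager
    def yield_batch_context(self):
        # Capture the timestamp BEFORE yielding, so time spent in user
        # code does not get counted toward the update interval.
        now = time.time()
        try:
            yield
        finally:
            if (now - self._metrics_last_updated) > UPDATE_METRICS_INTERVAL_S:
                self._export_fn()
                self._metrics_last_updated = now
```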


    def yield_batch_context(self, batch: Batch):
        with self._stats.iter_user_s.timer() if self._stats else nullcontext():
            if self._stats is None:
                return
cursor[bot]:

Bug: Context Manager Yielding Error Breaks Context

The yield_batch_context context manager returns early without yielding when self._stats is None. Context managers decorated with @contextmanager must yield exactly once, otherwise a RuntimeError is raised when entering the context. The function should yield in both branches, similar to how get_next_batch_context handles the None case by yielding in the else branch, or use nullcontext() as the old implementation did.
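The failure mode and fix can be shown in isolation. These are standalone sketches with hypothetical names (`broken_ctx`, `fixed_ctx`); the real code wraps `self._stats.iter_user_s.timer()` rather than a plain `nullcontext()` check:

```python
from contextlib import contextmanager, nullcontext

@contextmanager
def broken_ctx(stats):
    # Returning before `yield` means the generator never yields, so
    # entering the `with` block raises RuntimeError("generator didn't yield").
    if stats is None:
        return
    yield

@contextmanager
def fixed_ctx(stats):
    # Yield exactly once on every path, falling back to nullcontext()
    # when there are no stats, as the old implementation did.
    with stats.timer() if stats is not None else nullcontext():
        yield
```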


@iamjustinhsu iamjustinhsu added the go add ONLY when ready to merge, run all tests label Nov 12, 2025
@bveeramani bveeramani merged commit 698b614 into ray-project:master Nov 14, 2025
6 checks passed
@iamjustinhsu iamjustinhsu deleted the jhsu/remove-stats-thread branch November 14, 2025 22:48
ArturNiederfahrenhorst pushed a commit to ArturNiederfahrenhorst/ray that referenced this pull request Nov 16, 2025
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026

Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests), performance

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants