[data] Remove stats update thread#57971
Conversation
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
…calls + create an interval for updating stats per dataset Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
…/remove-stats-thread
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
| def before_epoch_start(self): | ||
| self._yielded_first_batch = False | ||
| @contextmanager | ||
| def _epoch_context(self): | ||
| """Context manager for epoch lifecycle: setup and cleanup. | ||
|
|
||
| def after_epoch_end(self): | ||
| StatsManager.clear_iteration_metrics(self._dataset_tag) |
There was a problem hiding this comment.
Actually find this callback methods (before/after) a bit easier to understand, let's keep them
There was a problem hiding this comment.
For handling stateful stuff, if an exception is occurred, then it doesn't get cleaned up. If we go for 1 : 1 streaming executor -> StatsManager, then I can remove clear_iteration_metrics, and go back to the original version safelly
python/ray/data/_internal/stats.py
Outdated
| # NOTE: This must be thread-safe because multiple datasets can | ||
| # be running at the same time. Decreasing the size of the dictionary | ||
| # is not thread-safe | ||
| with self._update_last_updated_lock: |
There was a problem hiding this comment.
Let's get rid of that:
- Instantiate StatsManager in StreamingExecutor (so that these are 1:1)
- Remove all locks
There was a problem hiding this comment.
I decided to remove the lock by making StatsManager stateless, and push that information to the batch iterator and executor.
5366997 to
90ce0d6
Compare
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
bveeramani
left a comment
There was a problem hiding this comment.
Overall LGTM.
ty. This is a useful refactor
| # Creating/getting an actor from multiple threads is not safe. | ||
| # https://github.com/ray-project/ray/issues/41324 | ||
| _stats_actor_lock: threading.RLock = threading.RLock() |
There was a problem hiding this comment.
Why don't we need this anymore?
There was a problem hiding this comment.
Two reasons:
- [Core] getting/creating an actor from multiple thread errors #41324 is resolved
- This follows the pattern of
get_or_create_actor_location_trackerwhich also creates an actor on the driver process. As far as I know, we haven't had issues with ^ so I think it's safe to remove the lock
|
|
||
| self._metrics_last_updated: float = 0.0 |
There was a problem hiding this comment.
Nit: Here and in iterator -- wasn't totally obvious to me what this variable represents based on the name alone. Would recommend either choosing a more descriptive name or adding a comment
python/ray/data/_internal/stats.py
Outdated
| if force_update: | ||
| ray.wait([ref], timeout=1) |
There was a problem hiding this comment.
What's the motivation for waiting here? I don't think we waited in the previous implementation, and blocking calls in the scheduling loop (even with 1s timeout) makes me feel nervous.
There was a problem hiding this comment.
To keep the previous force_update. Honestly, it's not required and have removed it
| _StatsManager.update_iteration_metrics( | ||
| self._stats, self._dataset_tag, force_update=True | ||
| ) |
There was a problem hiding this comment.
My understanding is that, in the previous implementation, we don't update any metrics in after_epoch_end.
Don't think it's an issue, but what's the motivation for performing a force update here?
There was a problem hiding this comment.
Hmm, I think this was a subtle bug in the previous implementation.
Essentially, the streaming executor force updates the metrics on shutdown to zero out the metrics. A similar story lies here as well -- If we don't update the metrics after_epoch_end, then the last iteration metrics aren't updated. I don't think "clearing" the metrics was the right move because then we wouldn't finalize the last iteration metrics.
lmk if that makes sense
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
| now = time.time() | ||
| if (now - self._metrics_last_updated) > self.UPDATE_METRICS_INTERVAL_S: | ||
| _StatsManager.update_iteration_metrics(self._stats, self._dataset_tag) | ||
| self._metrics_last_updated = now |
There was a problem hiding this comment.
Bug: Incorrect timing skews metrics update intervals.
The metrics update interval check in yield_batch_context measures time incorrectly by calling time.time() after user code execution completes. This includes user processing time in the interval calculation, causing metrics to update at unpredictable intervals rather than the intended 5-second cadence. The timestamp should be captured before the yield statement or at the start of the context manager.
…/remove-stats-thread
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
| def yield_batch_context(self, batch: Batch): | ||
| with self._stats.iter_user_s.timer() if self._stats else nullcontext(): | ||
| if self._stats is None: | ||
| return |
There was a problem hiding this comment.
Bug: Context Manager Yielding Error Breaks Context
The yield_batch_context context manager returns early without yielding when self._stats is None. Context managers decorated with @contextmanager must yield exactly once, otherwise a RuntimeError is raised when entering the context. The function should yield in both branches, similar to how get_next_batch_context handles the None case by yielding in the else branch, or use nullcontext() as the old implementation did.
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description Before this PR, the metrics would follow this path 1. `StreamingExecutor` collects metrics per operator 2. `_StatsManager` creates a thread to export metrics 3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`. 4. Stats Thread reads the metrics sent from 2) 5. Stats Thread sleeps every 5-10 seconds before exporting metrics to `_StatsActor`. These metrics can come in 2 forms: iteration and execution metrics. I believe the purpose of the stats thread created in 2) was 2-fold - Don't export stats very frequently - Don't export Iteration and Execution stats separately (have them sent in the same rpc call) However, this creates a lot of complexity (handling idle threads, etc...) and also makes it harder to perform histogram metrics, which need to copy an entire list of values. See ray-project#57851 for more details. By removing the stats thread in 2), we can reduce complexity of management, and also avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the # of rpc calls. I don't think this is a concern, because the async updates to the `_StatsActor` were happening previously, and we can also tweak the update interval. ~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~ Now the new flow is: 1. `StreamingExecutor` collects metrics per operator 2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since last updated, we send metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we update the last updated timestamp. See * below for caveat. ~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset last updated timestamp. `_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~ 1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons: Much more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which will make this more challenging.~~ 2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to implement and it's stateless. Cons: Breaks down for slower streaming executors.~~ 3. I can removed the lock by keeping the state in the 2 areas - BatchIterator - StreamingExecutor I also verified that ray-project#55163 still solves the original issue ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description Before this PR, the metrics would follow this path 1. `StreamingExecutor` collects metrics per operator 2. `_StatsManager` creates a thread to export metrics 3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`. 4. Stats Thread reads the metrics sent from 2) 5. Stats Thread sleeps every 5-10 seconds before exporting metrics to `_StatsActor`. These metrics can come in 2 forms: iteration and execution metrics. I believe the purpose of the stats thread created in 2) was 2-fold - Don't export stats very frequently - Don't export Iteration and Execution stats separately (have them sent in the same rpc call) However, this creates a lot of complexity (handling idle threads, etc...) and also makes it harder to perform histogram metrics, which need to copy an entire list of values. See ray-project#57851 for more details. By removing the stats thread in 2), we can reduce complexity of management, and also avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the # of rpc calls. I don't think this is a concern, because the async updates to the `_StatsActor` were happening previously, and we can also tweak the update interval. ~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~ Now the new flow is: 1. `StreamingExecutor` collects metrics per operator 2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since last updated, we send metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we update the last updated timestamp. See * below for caveat. ~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset last updated timestamp. `_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~ 1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons: Much more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which will make this more challenging.~~ 2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to implement and it's stateless. Cons: Breaks down for slower streaming executors.~~ 3. I can removed the lock by keeping the state in the 2 areas - BatchIterator - StreamingExecutor I also verified that ray-project#55163 still solves the original issue ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: Aydin Abiar <aydin@anyscale.com>
## Description Before this PR, the metrics would follow this path 1. `StreamingExecutor` collects metrics per operator 2. `_StatsManager` creates a thread to export metrics 3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`. 4. Stats Thread reads the metrics sent from 2) 5. Stats Thread sleeps every 5-10 seconds before exporting metrics to `_StatsActor`. These metrics can come in 2 forms: iteration and execution metrics. I believe the purpose of the stats thread created in 2) was 2-fold - Don't export stats very frequently - Don't export Iteration and Execution stats separately (have them sent in the same rpc call) However, this creates a lot of complexity (handling idle threads, etc...) and also makes it harder to perform histogram metrics, which need to copy an entire list of values. See ray-project#57851 for more details. By removing the stats thread in 2), we can reduce complexity of management, and also avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the # of rpc calls. I don't think this is a concern, because the async updates to the `_StatsActor` were happening previously, and we can also tweak the update interval. ~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~ Now the new flow is: 1. `StreamingExecutor` collects metrics per operator 2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since last updated, we send metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we update the last updated timestamp. See * below for caveat. ~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset last updated timestamp. `_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~ 1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons: Much more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which will make this more challenging.~~ 2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to implement and it's stateless. Cons: Breaks down for slower streaming executors.~~ 3. I can removed the lock by keeping the state in the 2 areas - BatchIterator - StreamingExecutor I also verified that ray-project#55163 still solves the original issue ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
## Description Before this PR, the metrics would follow this path 1. `StreamingExecutor` collects metrics per operator 2. `_StatsManager` creates a thread to export metrics 3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`. 4. Stats Thread reads the metrics sent from 2) 5. Stats Thread sleeps every 5-10 seconds before exporting metrics to `_StatsActor`. These metrics can come in 2 forms: iteration and execution metrics. I believe the purpose of the stats thread created in 2) was 2-fold - Don't export stats very frequently - Don't export Iteration and Execution stats separately (have them sent in the same rpc call) However, this creates a lot of complexity (handling idle threads, etc...) and also makes it harder to perform histogram metrics, which need to copy an entire list of values. See ray-project#57851 for more details. By removing the stats thread in 2), we can reduce complexity of management, and also avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the # of rpc calls. I don't think this is a concern, because the async updates to the `_StatsActor` were happening previously, and we can also tweak the update interval. ~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~ Now the new flow is: 1. `StreamingExecutor` collects metrics per operator 2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since last updated, we send metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we update the last updated timestamp. See * below for caveat. ~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset last updated timestamp. `_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~ 1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons: Much more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which will make this more challenging.~~ 2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to implement and it's stateless. Cons: Breaks down for slower streaming executors.~~ 3. I can removed the lock by keeping the state in the 2 areas - BatchIterator - StreamingExecutor I also verified that ray-project#55163 still solves the original issue ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description Before this PR, the metrics would follow this path 1. `StreamingExecutor` collects metrics per operator 2. `_StatsManager` creates a thread to export metrics 3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`. 4. Stats Thread reads the metrics sent from 2) 5. Stats Thread sleeps every 5-10 seconds before exporting metrics to `_StatsActor`. These metrics can come in 2 forms: iteration and execution metrics. I believe the purpose of the stats thread created in 2) was 2-fold - Don't export stats very frequently - Don't export Iteration and Execution stats separately (have them sent in the same rpc call) However, this creates a lot of complexity (handling idle threads, etc...) and also makes it harder to perform histogram metrics, which need to copy an entire list of values. See ray-project#57851 for more details. By removing the stats thread in 2), we can reduce complexity of management, and also avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the # of rpc calls. I don't think this is a concern, because the async updates to the `_StatsActor` were happening previously, and we can also tweak the update interval. ~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~ Now the new flow is: 1. `StreamingExecutor` collects metrics per operator 2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since last updated, we send metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we update the last updated timestamp. See * below for caveat. ~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset last updated timestamp. `_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~ 1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons: Much more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which will make this more challenging.~~ 2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to implement and it's stateless. Cons: Breaks down for slower streaming executors.~~ 3. I can removed the lock by keeping the state in the 2 areas - BatchIterator - StreamingExecutor I also verified that ray-project#55163 still solves the original issue ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: Future-Outlier <eric901201@gmail.com>
## Description Before this PR, the metrics would follow this path 1. `StreamingExecutor` collects metrics per operator 2. `_StatsManager` creates a thread to export metrics 3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`. 4. Stats Thread reads the metrics sent from 2) 5. Stats Thread sleeps every 5-10 seconds before exporting metrics to `_StatsActor`. These metrics can come in 2 forms: iteration and execution metrics. I believe the purpose of the stats thread created in 2) was 2-fold - Don't export stats very frequently - Don't export Iteration and Execution stats separately (have them sent in the same rpc call) However, this creates a lot of complexity (handling idle threads, etc...) and also makes it harder to perform histogram metrics, which need to copy an entire list of values. See ray-project#57851 for more details. By removing the stats thread in 2), we can reduce complexity of management, and also avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the # of rpc calls. I don't think this is a concern, because the async updates to the `_StatsActor` were happening previously, and we can also tweak the update interval. ~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~ Now the new flow is: 1. `StreamingExecutor` collects metrics per operator 2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since last updated, we send metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we update the last updated timestamp. See * below for caveat. ~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset last updated timestamp. `_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~ 1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons: Much more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which will make this more challenging.~~ 2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to implement and it's stateless. Cons: Breaks down for slower streaming executors.~~ 3. I can removed the lock by keeping the state in the 2 areas - BatchIterator - StreamingExecutor I also verified that ray-project#55163 still solves the original issue ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
Description
Before this PR, the metrics would follow this path
StreamingExecutorcollects metrics per operator_StatsManagercreates a thread to export metricsStreamingExecutorsends metrics to_StatsManager, which performs a copy and holds a_stats_lock._StatsActor. These metrics can come in 2 forms: iteration and execution metrics.I believe the purpose of the stats thread created in 2) was 2-fold
However, this creates a lot of complexity (handling idle threads, etc...) and also makes it harder to perform histogram metrics, which need to copy an entire list of values. See #57851 for more details.
By removing the stats thread in 2), we can reduce complexity of management, and also avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the # of rpc calls. I don't think this is a concern, because the async updates to the
_StatsActorwere happening previously, and we can also tweak the update interval.It's important to note that_stats_lockstill lives on to update the last timestamps of each dataset. See * below for more details.Now the new flow is:
StreamingExecutorcollects metrics per operatorStreamingExecutorchecks the last time_StatsActorwas updated. If more than a default 5 seconds has passed since last updated, we send metrics to_StatsActorthrough the_StatsManager. Afterwards, we update the last updated timestamp. See * below for caveat.*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset last updated timestamp._stats_lockis used to update that dictionary[dataset, last_updated] safely onregister_datasetand onshutdown. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.Create a per datasetStatsManager. Pros: no thread lock. Cons: Much more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which will make this more challenging.Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to implement and it's stateless. Cons: Breaks down for slower streaming executors.I also verified that #55163 still solves the original issue
Related issues
Additional information