# [data] Add node_id, pid, attempt # for hanging tasks (#59793)

alexeykudinkin merged 11 commits into `ray-project:master`
Conversation
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Code Review
This pull request aims to enhance the debuggability of hanging tasks by incorporating node_id, pid, and attempt # into the hanging task detector's output. This is achieved by passing the task_id through the operator pipeline to OpRuntimeMetrics, which is then used by the HangingExecutionIssueDetector to fetch detailed task information. The implementation is generally sound, with a beneficial refactoring in physical_operator.py. However, I've identified a critical issue in hash_shuffle.py where arguments to on_task_submitted are incorrectly ordered, which would result in a runtime error.
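The flow the review describes (operator reports each submitted task's `task_id` to `OpRuntimeMetrics`, keyed by the Ray Data task index) can be sketched as follows. This is a hypothetical illustration with invented names, not the real Ray Data API; it only shows why the positional argument order of `on_task_submitted` matters:

```python
from dataclasses import dataclass, field
from typing import Dict

# Hypothetical sketch modeled on the review summary: the operator records
# each submitted task's Ray Core TaskID, keyed by the Ray Data task index,
# so the hanging detector can later fetch node_id / pid / attempt number.
@dataclass
class OpRuntimeMetricsSketch:
    running_tasks: Dict[int, str] = field(default_factory=dict)

    def on_task_submitted(self, task_index: int, task_id: str) -> None:
        self.running_tasks[task_index] = task_id


metrics = OpRuntimeMetricsSketch()
# Correct argument order: (task_index, task_id). Swapping the positional
# arguments, as the review flags for hash_shuffle.py, would key the map by
# the TaskID string and break later lookups by integer index.
metrics.on_task_submitted(0, "16310a0f0a45af5c")
```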
python/ray/data/_internal/issue_detection/detectors/hanging_detector.py
```python
@dataclass
class RunningTaskInfo:
```
I assume this state is not serialized and persisted anywhere, correct?
```python
task_state = ray.util.state.get_task(
    task_info.task_id.hex(),
    timeout=1.0,
)
```
Can we pass in `_explain=True` and log the explanation in the event of a failure?
## Description
Currently, when displaying hanging tasks, we show the Ray Data level task
index, which is not useful for Ray Core debugging. This PR adds more info
to long-running tasks, namely:
- node_id
- pid
- attempt #

I considered adding this to the high-memory detector as well, but avoided
it for two reasons:
- it requires a larger refactor of `RunningTaskInfo`
- as far as I know, it would not help with debugging, since high memory is
reported _after the task completes_
## Example script to trigger hanging issues
```python
import time

import ray
from ray.data._internal.issue_detection.detectors import (
    HangingExecutionIssueDetectorConfig,
)

# Lower the detection interval so the hanging task is reported quickly.
ctx = ray.data.DataContext.get_current()
ctx.issue_detectors_config.hanging_detector_config = HangingExecutionIssueDetectorConfig(
    detection_time_interval_s=1.0,
)


def sleep(x):
    # Make a single task hang far longer than the detection interval.
    if x["id"] == 0:
        time.sleep(100)
    return x


ray.data.range(100, override_num_blocks=100).map_batches(sleep).materialize()
```
## Related issues
None
## Additional information
None
---------
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <140442892+iamjustinhsu@users.noreply.github.com>
Co-authored-by: Alexey Kudinkin <alexey.kudinkin@gmail.com>
## Description
Previously, I added `task_id`, `node_id`, and `attempt_number` for hanging tasks in #59793. However, this introduced a race condition when querying for task state:
1. Task is submitted.
2. Issue detector immediately fires.
3. `get_task` returns `None` because the task state is not ready yet (https://github.com/iamjustinhsu/ray/blob/75f9731f69f4b9c7b973f53b74d0580adb3c4ab9/python/ray/data/_internal/issue_detection/detectors/hanging_detector.py#L161).

For step 2, we only query the state API when the task wasn't hanging before, or when the task has produced bytes since the last check. My fix is to _also_ re-query when `previous_state.task_state` is `None`. I ran this many times, and the race condition stopped. Open to ideas on testing this too.

## Related issues
None

## Additional information
None

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
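The re-query condition described above can be sketched as a small predicate. This is a minimal, hypothetical illustration (invented names, not the real `hanging_detector.py` code): refresh when the task is newly hanging, when it has produced bytes since the last check, or, per the fix, when the previously cached task state is still `None` because the earlier `get_task` call raced the submission:

```python
from dataclasses import dataclass
from typing import Optional


# Hypothetical cached per-task state, modeled on the PR description.
@dataclass
class PreviousState:
    bytes_produced: int
    task_state: Optional[dict]  # None if the earlier get_task lookup raced


def should_refresh(prev: Optional[PreviousState], bytes_produced: int) -> bool:
    if prev is None:
        # Task wasn't flagged as hanging before.
        return True
    if bytes_produced > prev.bytes_produced:
        # Task has produced bytes since the last check.
        return True
    # The fix: also re-query when the cached task state never resolved.
    return prev.task_state is None


# Newly hanging task: refresh.
assert should_refresh(None, 0)
# Earlier get_task returned None: refresh again (the fix).
assert should_refresh(PreviousState(0, None), 0)
# State already resolved and no new bytes: skip the query.
assert not should_refresh(PreviousState(0, {"node_id": "n1"}), 0)
```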