[Core] Adding the node id to the base event (#59242)
edoakes merged 35 commits into ray-project:master
Conversation
Signed-off-by: machichima <nary12321@gmail.com>
Code Review
This pull request adds the node_id to the base event, which is a valuable addition for observability, especially in scenarios involving unexpected process termination. The changes are extensive, plumbing the node_id through various layers of the system, from Python service startup scripts to the core C++ components and protobuf definitions. The implementation appears solid and consistent. I've made a few minor suggestions to improve code robustness by using const for member variables that are initialized once and never modified. Overall, this is a well-executed and important change.
For most of the test changes, I just pass in the
```diff
      std::make_unique<rpc::EventAggregatorClientImpl>(0, *client_call_manager_),
-     "test_session");
+     "test_session",
+     NodeID::Nil());
```
Hi, @machichima
why do some tests use NodeID::Nil() but others use NodeID::FromRandom()?
Can we unify them?
Yes, sure. I will make tests use NodeID::Nil() and only use NodeID::FromRandom() in src/ray/gcs/tests/gcs_node_manager_test.cc, which has the assertion check for the node_id.
cc @MengjinYan to review, or ping others who have expertise if you have time; this is related to the history server's log collector.
MengjinYan left a comment
All nit comments. Thanks for the effort!
To ensure correctness, you might also want to update the following test files to verify the node id as well:
- test_ray_event_export_task_events.py
- test_ray_actor_events.py
- test_ray_node_events.py
- test_ray_job_event.py
```diff
      std::make_unique<MockEventAggregatorClient>(),
-     "test_session_name");
+     "test_session_name",
+     NodeID::Nil());
```
Nit: probably we should use a valid node id to test that the node id is populated correctly in the task event.
I think this test is testing the Export API, which writes rpc::ExportEvent to log files; it's separate from the RayEvent that I modified. I put NodeID::Nil() here because TaskEventBuffer requires a node_id parameter in its constructor.
```diff
  RayNodeDefinitionEvent::RayNodeDefinitionEvent(const rpc::GcsNodeInfo &data,
-                                                const std::string &session_name)
+                                                const std::string &session_name,
+                                                const NodeID &node_id)
```
To avoid confusion, we should add a comment saying that this is the GCS node id, not the node id from the GcsNodeInfo data, or rename the parameter to local_node_id.
Similar comment for the lifecycle event as well.
src/ray/observability/ray_event.h (outdated)
```diff
    ray::rpc::events::RayEvent::Severity severity_;
    std::string message_;
    std::string session_name_;
+   const std::string node_id_;
```
Minor: I think it's better to make node_id_ of type NodeID and call node_id_.Binary() during serialization; it's less confusing and it also moves the conversion to the non-critical path. Or is there any reason why we should make it a string?
Thank you for pointing this out! This was a mistake on my side; changed it to the NodeID type in d44563a.
```diff
    (*data.mutable_label_selector())["tier"] = "prod";

-   auto event = std::make_unique<RayActorDefinitionEvent>(data, "test_session_name");
+   auto event =
```
I think we should use a real node id for the tests related to events and make sure the node ids are set correctly.
Thank you! Just to confirm: should I modify all tests related to events to use NodeID::FromRandom() and assert that it's set correctly? Or is it just needed for this test file?
I think maybe you can hardcode a node id for testing?
I think anything that is not Nil should be fine. Can either be NodeID::FromRandom() or a hardcoded value.
Modified tests in 06220f8. Also modified:
- src/ray/observability/tests/ray_actor_lifecycle_event_test.cc
- src/ray/observability/tests/ray_driver_job_lifecycle_event_test.cc
src/ray/gcs/gcs_server.cc (outdated)
```cpp
  // Init GCS table storage. Note this is on the default io context, not the one with
  // GcsInternalKVManager, to avoid congestion on the latter.
  RAY_LOG(INFO) << "GCS storage type is " << storage_type_;
  if (!gcs_node_id_.IsNil()) {
```
nit: I think we should log this line regardless of whether the gcs node id is Nil.
Hi @machichima, can you resolve the merge conflict?
```cpp
  }
  for (auto &event : grouped_events) {
    rpc::events::RayEvent ray_event = std::move(*event).Serialize();
    // Set node_id centrally for all events
```
Nice! I think it's better to put the logic in the AddEvents function, so the node_id is set for all events when we add them to the buffer.
If we want to move ray_event.set_node_id(node_id_.Binary()) to RayEventRecorder::AddEvents, we need to add node_id to the RayEvent constructor. This requires many more changes to the classes that inherit from RayEvent (e.g. RayActorLifecycleEvent, RayActorDefinitionEvent, ...), like what we did before.
The current approach sets the node_id in RayEventRecorder::ExportEvents. It adds node_id to the protobuf directly, so we do not need to change the constructors, which keeps the code much cleaner.
I think it's better to keep the current approach. WDYT?
Thank you!
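The trade-off being discussed can be sketched with simplified stand-ins (these are illustrative types, not Ray's actual classes): the recorder stamps the node id once at export time, so individual event classes never need a node_id constructor parameter.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for rpc::events::RayEvent (illustrative, not Ray's class).
struct FakeRayEvent {
  std::string body;
  std::string node_id;  // stamped centrally by the recorder at export time
};

// Simplified recorder mirroring the pattern: AddEvent stays unchanged, and
// ExportEvents sets node_id on every buffered event in one place.
class FakeRecorder {
 public:
  explicit FakeRecorder(std::string node_id) : node_id_(std::move(node_id)) {}

  // Event types never see the node id, so their constructors stay small.
  void AddEvent(FakeRayEvent event) { buffer_.push_back(std::move(event)); }

  // The node id is applied centrally, on the non-critical export path.
  std::vector<FakeRayEvent> ExportEvents() {
    std::vector<FakeRayEvent> out = std::move(buffer_);
    buffer_.clear();
    for (auto &event : out) {
      event.node_id = node_id_;
    }
    return out;
  }

 private:
  const std::string node_id_;
  std::vector<FakeRayEvent> buffer_;
};
```

The alternative (stamping in AddEvent) would work too, but every event subclass would then need the node id threaded through its constructor, which is the churn the comment thread is weighing against centralizing the write at export time.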
Please let me know when the comment is fixed. I'll do a final review on the PR. Thanks!
cc @dayshah to merge if you are not on vacation, thank you! <3
```cpp
  explicit TaskEvent(TaskID task_id,
                     JobID job_id,
                     int32_t attempt_number,
                     const NodeID &node_id = NodeID::Nil());
```
It looks like this is always set by its subclasses, does this need a default?
```diff
-                    const NodeID &node_id = NodeID::Nil());
+                    const NodeID &node_id);
```
I think @edoakes needs to merge this one because he's a codeowner on one of these files.
## Description
When the Ray process is unexpectedly terminated and the node id changes, we cannot tell the previous and current node ids apart through events, because we did not include `node_id` in the base event.
This feature is needed by the history server for the Ray cluster. When the Ray process is unexpectedly terminated, we have to flush the events generated by the previous node. If the Ray process restarts quickly, it is difficult to know which events were generated by the previous node.
This PR adds the `node_id` to the base event, indicating where the event is emitted.
### Main changes
- `src/ray/protobuf/public/events_base_event.proto`
  - Add the node id to the base event proto (`RayEvent`)

For GCS:
- `src/ray/gcs/gcs_server_main.cc`
  - Add `--node_id` as a CLI argument
- `src/ray/observability/` and `src/ray/gcs/` (some files)
  - Add `node_id` as an argument and pass it to `RayEvent`

For CoreWorker:
- `src/ray/core_worker/`
  - Pass the `node_id` to the `RayEvent`

Python side:
- `python/ray/_private/node.py`
  - Pass `node_id` when starting the GCS server
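As a rough sketch of the proto change (the field number and comment wording here are illustrative, not necessarily what the PR actually uses), the base event gains a bytes field carrying the emitting node's id:

```proto
// Illustrative sketch of src/ray/protobuf/public/events_base_event.proto.
message RayEvent {
  // ... existing fields (event_id, source_type, event_type, timestamp,
  // severity, session_name, message, ...) ...

  // Binary NodeID of the node where the event was emitted. Empty (Nil)
  // when the emitting component does not know its node id.
  bytes node_id = 100;  // field number is a placeholder
}
```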
## Related issues
Closes ray-project#58879
## Additional information
### Testing process
1. Export environment variables:
   - `RAY_enable_core_worker_ray_event_to_aggregator=1`
   - `RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR="http://localhost:8000"`
2. `ray start --head --system-config='{"enable_ray_event":true}'`
3. Submit a simple job: `ray job submit -- python rayjob.py`. E.g.
```py
import ray
@ray.remote
def hello_world():
    return "hello world"
ray.init()
print(ray.get(hello_world.remote()))
```
4. Run the event listener (script below) to listen for exported events: `python event_listener.py`
```py
import http.server
import json
import socketserver

PORT = 8000

class EventReceiver(http.server.SimpleHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        print(json.loads(post_data.decode('utf-8')))
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({"status": "success", "message": "Event received"}).encode('utf-8'))

if __name__ == "__main__":
    with socketserver.TCPServer(("", PORT), EventReceiver) as httpd:
        print(f"Serving event listener on http://localhost:{PORT}")
        print("Set RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR to this address.")
        httpd.serve_forever()
```
You will get events like the following:
- GCS event:
```json
[
{
"eventId":"A+yrzknbyDjQALBvaimTPcyZWll9Te3Tw+FEnQ==",
"sourceType":"GCS",
"eventType":"DRIVER_JOB_LIFECYCLE_EVENT",
"timestamp":"2025-12-07T10: 54: 12.621560Z",
"severity":"INFO",
"sessionName":"session_2025-12-07_17-33-33_853835_27993",
"driverJobLifecycleEvent":{
"jobId":"BAAAAA==",
"stateTransitions":[
{
"state":"FINISHED",
"timestamp":"2025-12-07T10: 54: 12.621562Z"
}
]
},
"nodeId":"k4hj3FDLYStB38nSSRZQRwOjEV32EoAjQe3KPw==", // <- nodeId set
"message":""
}
]
```
- CoreWorker event:
```json
[
{
"eventId":"TIAp8D4NwN/ne3VhPHQ0QnsBCYSkZmOUWoe6zQ==",
"sourceType":"CORE_WORKER",
"eventType":"TASK_DEFINITION_EVENT",
"timestamp":"2025-12-07T10:54:12.025967Z",
"severity":"INFO",
"sessionName":"session_2025-12-07_17-33-33_853835_27993",
"taskDefinitionEvent":{
"taskId":"yoDzqOi6LlD///////////////8EAAAA",
"taskFunc":{
"pythonFunctionDescriptor":{
"moduleName":"rayjob",
"functionName":"hello_world",
"functionHash":"a37aacc3b7884c2da4aec32db6151d65",
"className":""
}
},
"taskName":"hello_world",
"requiredResources":{
"CPU":1.0
},
"jobId":"BAAAAA==",
"parentTaskId":"//////////////////////////8EAAAA",
"placementGroupId":"////////////////////////",
"serializedRuntimeEnv":"{}",
"taskAttempt":0,
"taskType":"NORMAL_TASK",
"language":"PYTHON",
"refIds":{
}
},
"nodeId":"k4hj3FDLYStB38nSSRZQRwOjEV32EoAjQe3KPw==", // <- nodeId set here
"message":""
}
]
```
---------
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Co-authored-by: Future-Outlier <eric901201@gmail.com>
Co-authored-by: Mengjin Yan <mengjinyan3@gmail.com>
…stead of just executor node (#61478)

## Description
PR #59242 incorrectly set `TaskLifecycleEvent.node_id` to the submitter node_id in all cases except when `task_status=SUBMITTED_TO_WORKER` (in that case the node_id gets overwritten to the executor node_id). This PR fixes the bug and adds a test to verify the expected behaviour.

## Related issues
Fixes #61474

---------
Signed-off-by: sampan <sampan@anyscale.com>
Co-authored-by: sampan <sampan@anyscale.com>