[Core] Adding the node id to the base event #59242

Merged
edoakes merged 35 commits into ray-project:master from machichima:add-node-id-to-base-event
Jan 4, 2026

Conversation

@machichima
Contributor

@machichima machichima commented Dec 7, 2025

Description

When a Ray process terminates unexpectedly and the node ID changes, we cannot distinguish the previous node's events from the current node's, because we did not include node_id in the base event.

This feature is needed by the history server for the Ray cluster. When a Ray process terminates unexpectedly, we have to flush the events generated by the previous node. If the Ray process restarts quickly, it is difficult to tell which events were generated by the previous node.

This PR adds the node_id to the base event to indicate where each event is emitted.

Main changes

  • src/ray/protobuf/public/events_base_event.proto
    • Add node id to the base event proto (RayEvent)
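The proto change can be sketched roughly as below. This is a hedged illustration only: the field number, field type, and comment are assumptions, not the exact contents of `events_base_event.proto`.

```proto
// Sketch of src/ray/protobuf/public/events_base_event.proto after this PR.
message RayEvent {
  // ... existing fields (event_id, source_type, event_type, timestamp, ...) ...

  // Binary ID of the node that emitted this event.
  bytes node_id = 11;  // field number here is illustrative
}
```

A `bytes` field is consistent with the base64-encoded `nodeId` values in the JSON samples below.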

For GCS:

  • src/ray/gcs/gcs_server_main.cc
    • Add --node_id as a CLI argument
  • src/ray/observability/ and src/ray/gcs/ (some files)
    • Add node_id as an argument and pass it to RayEvent

For CoreWorker:

  • src/ray/core_worker/
    • Pass the node_id to RayEvent

Python side:

  • python/ray/_private/node.py
    • Pass node_id when starting the GCS server
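As a rough illustration of the Python-side plumbing: the helper name below is hypothetical (the real logic lives in `python/ray/_private/node.py`), and the exact flag value format is an assumption; only the `--node_id` flag itself comes from this PR.

```python
def build_gcs_server_command(gcs_server_executable: str, node_id_hex: str) -> list:
    """Build a GCS server launch command, threading through the local node id.

    `--node_id` is the new flag added by this PR; all other flags are omitted
    for brevity. This helper is illustrative, not Ray's actual code.
    """
    return [
        gcs_server_executable,
        f"--node_id={node_id_hex}",
    ]


print(build_gcs_server_command("gcs_server", "abcdef0123"))
# → ['gcs_server', '--node_id=abcdef0123']
```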

Related issues

Closes #58879

Additional information

Testing process

  1. Export env vars:
    • RAY_enable_core_worker_ray_event_to_aggregator=1
    • RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR="http://localhost:8000"
  2. ray start --head --system-config='{"enable_ray_event":true}'
  3. Submit a simple job: ray job submit -- python rayjob.py. E.g.
import ray

@ray.remote
def hello_world():
    return "hello world"

ray.init()
print(ray.get(hello_world.remote()))
  4. Run the event listener (script below) to start listening on the event export address: python event_listener.py
import http.server
import socketserver
import json

PORT = 8000

class EventReceiver(http.server.SimpleHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        print(json.loads(post_data.decode('utf-8')))
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({"status": "success", "message": "Event received"}).encode('utf-8'))


if __name__ == "__main__":
    with socketserver.TCPServer(("", PORT), EventReceiver) as httpd:
        print(f"Serving event listener on http://localhost:{PORT}")
        print("Set RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR to this address.")
        httpd.serve_forever()

You will get events like the following:

  • GCS event:
[
   {
      "eventId":"A+yrzknbyDjQALBvaimTPcyZWll9Te3Tw+FEnQ==",
      "sourceType":"GCS",
      "eventType":"DRIVER_JOB_LIFECYCLE_EVENT",
      "timestamp":"2025-12-07T10: 54: 12.621560Z",
      "severity":"INFO",
      "sessionName":"session_2025-12-07_17-33-33_853835_27993",
      "driverJobLifecycleEvent":{
         "jobId":"BAAAAA==",
         "stateTransitions":[
            {
               "state":"FINISHED",
               "timestamp":"2025-12-07T10: 54: 12.621562Z"
            }
         ]
      },
      "nodeId":"k4hj3FDLYStB38nSSRZQRwOjEV32EoAjQe3KPw==",   // <- nodeId set
      "message":""
   }
]
  • CoreWorker event:
[
   {
      "eventId":"TIAp8D4NwN/ne3VhPHQ0QnsBCYSkZmOUWoe6zQ==",
      "sourceType":"CORE_WORKER",
      "eventType":"TASK_DEFINITION_EVENT",
      "timestamp":"2025-12-07T10:54:12.025967Z",
      "severity":"INFO",
      "sessionName":"session_2025-12-07_17-33-33_853835_27993",
      "taskDefinitionEvent":{
         "taskId":"yoDzqOi6LlD///////////////8EAAAA",
         "taskFunc":{
            "pythonFunctionDescriptor":{
               "moduleName":"rayjob",
               "functionName":"hello_world",
               "functionHash":"a37aacc3b7884c2da4aec32db6151d65",
               "className":""
            }
         },
         "taskName":"hello_world",
         "requiredResources":{
            "CPU":1.0
         },
         "jobId":"BAAAAA==",
         "parentTaskId":"//////////////////////////8EAAAA",
         "placementGroupId":"////////////////////////",
         "serializedRuntimeEnv":"{}",
         "taskAttempt":0,
         "taskType":"NORMAL_TASK",
         "language":"PYTHON",
         "refIds":{
            
         }
      },
      "nodeId":"k4hj3FDLYStB38nSSRZQRwOjEV32EoAjQe3KPw==",   // <- nodeId set here
      "message":""
   }
]

Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima requested review from a team, edoakes and jjyao as code owners December 7, 2025 10:13
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request adds the node_id to the base event, which is a valuable addition for observability, especially in scenarios involving unexpected process termination. The changes are extensive, plumbing the node_id through various layers of the system, from Python service startup scripts to the core C++ components and protobuf definitions. The implementation appears solid and consistent. I've made a few minor suggestions to improve code robustness by using const for member variables that are initialized once and never modified. Overall, this is a well-executed and important change.

Signed-off-by: machichima <nary12321@gmail.com>
@ray-gardener ray-gardener bot added core Issues that should be addressed in Ray Core observability Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling community-contribution Contributed by the community labels Dec 7, 2025
@machichima
Contributor Author

For most of the test changes, I just pass the node_id to the Event class constructor. I added an assertion checking that node_id is set correctly in src/ray/gcs/tests/gcs_node_manager_test.cc.

@machichima machichima requested a review from a team as a code owner December 8, 2025 11:01
Signed-off-by: Future-Outlier <eric901201@gmail.com>
std::make_unique<rpc::EventAggregatorClientImpl>(0, *client_call_manager_),
"test_session");
"test_session",
NodeID::Nil());
Member


Hi @machichima,
why do some tests use NodeID::Nil() but others use NodeID::FromRandom()?
Can we unify them?

Contributor Author


Yes, sure. I will make the tests use NodeID::Nil() and only use NodeID::FromRandom() in src/ray/gcs/tests/gcs_node_manager_test.cc, which has the assertion check for node_id.

Contributor Author


Fixed in cf2f8d1

@machichima machichima force-pushed the add-node-id-to-base-event branch from d785d51 to cf2f8d1 Compare December 9, 2025 10:57
@Future-Outlier Future-Outlier added the go add ONLY when ready to merge, run all tests label Dec 9, 2025
@Future-Outlier
Member

cc @MengjinYan to review or ping others who have expertise if you have time, this is related to the history server's log collector

Contributor

@MengjinYan MengjinYan left a comment


All nit comments. Thanks for the effort!

To ensure correctness, you might also want to update the following test files to verify the node id as well:

  • test_ray_event_export_task_events.py
  • test_ray_actor_events.py
  • test_ray_node_events.py
  • test_ray_job_event.py

std::make_unique<MockEventAggregatorClient>(),
"test_session_name");
"test_session_name",
NodeID::Nil());
Contributor


Nit: Probably we should use a valid node id to test whether the node id is populated correctly in the task event.

Contributor Author


I think this test is testing the Export API, which writes rpc::ExportEvent to log files; I believe it's separate from the RayEvent that I modified. I put NodeID::Nil() here because TaskEventBuffer requires a node_id parameter in its constructor.

RayNodeDefinitionEvent::RayNodeDefinitionEvent(const rpc::GcsNodeInfo &data,
const std::string &session_name)
const std::string &session_name,
const NodeID &node_id)
Contributor


To avoid confusion, we need to add a comment saying that this is the GCS node id, not the node id from the GcsNodeInfo data, or rename the parameter to local_node_id.

Similar comment to the lifecycle event as well.

Contributor Author


Thank you! Renamed in 6fa2e00

ray::rpc::events::RayEvent::Severity severity_;
std::string message_;
std::string session_name_;
const std::string node_id_;
Contributor


Minor: I think it's better to make node_id_ of type NodeID and call node_id_.Binary() during serialization to be less confusing; it also moves the conversion off the critical path. Or is there a reason we should make it a string?

Contributor Author


Thank you for pointing this out! This was a mistake on my side; changed it to the NodeID type in d44563a

(*data.mutable_label_selector())["tier"] = "prod";

auto event = std::make_unique<RayActorDefinitionEvent>(data, "test_session_name");
auto event =
Contributor


I think we should use a real node id for the tests related to events and make sure the node ids are set correctly.

Contributor Author


Thank you! Just want to confirm: should I modify all tests related to events to use NodeID::FromRandom() and assert that it's set correctly, or is that only needed for this test file?

Member


I think maybe you can hardcode a node id for testing?

Contributor


I think anything that is not Nil should be fine. Can either be NodeID::FromRandom() or a hardcoded value.

Contributor Author


Modified tests in: 06220f8

Also modified for:

  • src/ray/observability/tests/ray_actor_lifecycle_event_test.cc
  • src/ray/observability/tests/ray_driver_job_lifecycle_event_test.cc

// Init GCS table storage. Note this is on the default io context, not the one with
// GcsInternalKVManager, to avoid congestion on the latter.
RAY_LOG(INFO) << "GCS storage type is " << storage_type_;
if (!gcs_node_id_.IsNil()) {
Contributor


nit: I think we should log the line regardless of whether the gcs node id value is Nil.

Contributor Author


No problem! Modified in 21eac30

machichima and others added 4 commits December 17, 2025 21:02
@machichima machichima requested a review from jjyao December 18, 2025 11:28
@Future-Outlier
Member

Hi @machichima, can you resolve the merge conflict?

}
for (auto &event : grouped_events) {
rpc::events::RayEvent ray_event = std::move(*event).Serialize();
// Set node_id centrally for all events
Contributor


Nice! I think it's better to put the logic in the AddEvents function so that the events are fully constructed when we add them to the buffer.

Contributor Author

@machichima machichima Dec 30, 2025


If we want to move ray_event.set_node_id(node_id_.Binary()) to RayEventRecorder::AddEvents, we need to add node_id to the RayEvent constructor. That requires many more changes to the classes that inherit from RayEvent (e.g. RayActorLifecycleEvent, RayActorDefinitionEvent, ...), like what we did before.

The current approach sets the node_id in RayEventRecorder::ExportEvents. It adds node_id to the protobuf directly, so we do not need to change the constructors, making the code much cleaner.

I think it's better to keep the current approach. WDYT?

Thank you!
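The centralized stamping described above can be sketched as follows, in Python for brevity. RayEvent and RayEventRecorder here are illustrative stand-ins for the C++ classes discussed in this thread, not Ray's real API: the recorder stamps the local node id on every buffered event at export time, so individual event constructors do not need a node_id parameter.

```python
from dataclasses import dataclass


@dataclass
class RayEvent:
    # Stand-in for rpc::events::RayEvent; only the node_id field matters here.
    node_id: str = ""


class RayEventRecorder:
    """Stamps the local node id on every buffered event during export."""

    def __init__(self, node_id: str) -> None:
        self._node_id = node_id
        self._buffer = []

    def add_event(self, event: RayEvent) -> None:
        # Events are buffered without a node id (constructors stay unchanged).
        self._buffer.append(event)

    def export_events(self) -> list:
        # Centralized stamping: set node_id once, for all events, off the
        # event-construction critical path.
        for event in self._buffer:
            event.node_id = self._node_id
        exported, self._buffer = self._buffer, []
        return exported


recorder = RayEventRecorder("node-abc")
recorder.add_event(RayEvent())
recorder.add_event(RayEvent())
print([e.node_id for e in recorder.export_events()])
# → ['node-abc', 'node-abc']
```

The trade-off matches the discussion above: stamping in export keeps every event subclass constructor untouched, at the cost of events being incomplete while they sit in the buffer.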


@machichima machichima force-pushed the add-node-id-to-base-event branch from 013a298 to eabbbda Compare December 28, 2025 06:08
@MengjinYan
Copy link
Contributor

Please let me know when the comment is fixed. I'll do a final review on the PR. Thanks!

@Future-Outlier
Member

cc @dayshah to merge if you are not in vacation, thank you! < 3

Contributor

@dayshah dayshah left a comment


one super nit

explicit TaskEvent(TaskID task_id,
JobID job_id,
int32_t attempt_number,
const NodeID &node_id = NodeID::Nil());
Contributor


It looks like this is always set by its subclasses, does this need a default?

Suggested change
const NodeID &node_id = NodeID::Nil());
const NodeID &node_id);

@dayshah
Contributor

dayshah commented Jan 3, 2026

I think @edoakes needs to merge this one because he is a codeowner on one of these files

@edoakes edoakes merged commit 6f4121d into ray-project:master Jan 4, 2026
6 checks passed
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026
win5923 pushed a commit to win5923/ray that referenced this pull request Jan 14, 2026
lee1258561 pushed a commit to pinterest/ray that referenced this pull request Feb 3, 2026
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Feb 3, 2026
## Description

When the Ray process unexpected terminated and node id changed, we
cannot know the previous and current node id through event as we did not
include `node_id` in the base event.

This feature is needed by the history server for the Ray cluster. When
the Ray process unexpected terminated, we have to flush the events
generated by the previous node. If the ray process was restarted fast,
it is difficult to know which events are generated by the previous node.

This PR add the `node_id` into the base event showing where the event is
being emitted.

### Main changes

- `src/ray/protobuf/public/events_base_event.proto`
    - Add node id to the base event proto (`RayEvent`)

For GCS:

- `src/ray/gcs/gcs_server_main.cc`
    - add `--node_id` as cli args
- `src/ray/observability/` and `src/ray/gcs/` (some files)
    - Add `node_id` as arguments and pass to `RayEvent`

For CoreWorker

- `src/ray/core_worker/`
    - Passing the `node_id` to the `RayEvent`

Python side

- `python/ray/_private/node.py`
    - Passing `node_id` when starting gcs server


## Related issues

Closes ray-project#58879

## Additional information

### Testing process

1. export env var:
    - `RAY_enable_core_worker_ray_event_to_aggregator=1`
-
`RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR="http://localhost:8000"`
2. `ray start --head --system-config='{"enable_ray_event":true}'`
3. Submit simple job `ray job submit -- python rayjob.py`. E.g.

```py
import ray

@ray.remote
def hello_world():
    return "hello world"

ray.init()
print(ray.get(hello_world.remote()))
```

4. Run event listener (script below) to start listening the event export
host `python event_listener.py`

```py
import http.server
import socketserver
import json
import logging

PORT = 8000

class EventReceiver(http.server.SimpleHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        print(json.loads(post_data.decode('utf-8')))
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({"status": "success", "message": "Event received"}).encode('utf-8'))


if __name__ == "__main__":
    with socketserver.TCPServer(("", PORT), EventReceiver) as httpd:
        print(f"Serving event listener on http://localhost:{PORT}")
        print("Set RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR to this address.")
        httpd.serve_forever()
```

You will get events like the ones below:

- GCS event:

```json
[
   {
      "eventId":"A+yrzknbyDjQALBvaimTPcyZWll9Te3Tw+FEnQ==",
      "sourceType":"GCS",
      "eventType":"DRIVER_JOB_LIFECYCLE_EVENT",
      "timestamp":"2025-12-07T10: 54: 12.621560Z",
      "severity":"INFO",
      "sessionName":"session_2025-12-07_17-33-33_853835_27993",
      "driverJobLifecycleEvent":{
         "jobId":"BAAAAA==",
         "stateTransitions":[
            {
               "state":"FINISHED",
               "timestamp":"2025-12-07T10: 54: 12.621562Z"
            }
         ]
      },
      "nodeId":"k4hj3FDLYStB38nSSRZQRwOjEV32EoAjQe3KPw==",   // <- nodeId set
      "message":""
   }
]
```

- CoreWorker event:

```json
[
   {
      "eventId":"TIAp8D4NwN/ne3VhPHQ0QnsBCYSkZmOUWoe6zQ==",
      "sourceType":"CORE_WORKER",
      "eventType":"TASK_DEFINITION_EVENT",
      "timestamp":"2025-12-07T10:54:12.025967Z",
      "severity":"INFO",
      "sessionName":"session_2025-12-07_17-33-33_853835_27993",
      "taskDefinitionEvent":{
         "taskId":"yoDzqOi6LlD///////////////8EAAAA",
         "taskFunc":{
            "pythonFunctionDescriptor":{
               "moduleName":"rayjob",
               "functionName":"hello_world",
               "functionHash":"a37aacc3b7884c2da4aec32db6151d65",
               "className":""
            }
         },
         "taskName":"hello_world",
         "requiredResources":{
            "CPU":1.0
         },
         "jobId":"BAAAAA==",
         "parentTaskId":"//////////////////////////8EAAAA",
         "placementGroupId":"////////////////////////",
         "serializedRuntimeEnv":"{}",
         "taskAttempt":0,
         "taskType":"NORMAL_TASK",
         "language":"PYTHON",
         "refIds":{
            
         }
      },
      "nodeId":"k4hj3FDLYStB38nSSRZQRwOjEV32EoAjQe3KPw==",   // <- nodeId set here
      "message":""
   }
]
```
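
Note that `nodeId` in the JSON export is the protobuf `bytes` field rendered as base64 (per the standard protobuf JSON mapping). If needed, it can be decoded back into the hex string form Ray uses for node ids, e.g. with a small helper like this (illustrative, not part of the Ray API):

```python
import base64

def node_id_b64_to_hex(node_id_b64: str) -> str:
    """Decode the base64-encoded nodeId from the JSON event export
    into a hex string (Ray node ids are 28 bytes, i.e. 56 hex chars)."""
    return base64.b64decode(node_id_b64).hex()

# The nodeId from the sample events above:
hex_id = node_id_b64_to_hex("k4hj3FDLYStB38nSSRZQRwOjEV32EoAjQe3KPw==")
```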

---------

Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Co-authored-by: Future-Outlier <eric901201@gmail.com>
Co-authored-by: Mengjin Yan <mengjinyan3@gmail.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
(Same PR description as above.)
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
(Same PR description as above.)
MengjinYan pushed a commit that referenced this pull request Mar 4, 2026
…stead of just executor node (#61478)

## Description
[pr](#59242) incorrectly set `TaskLifecycleEvent.node_id` to the
submitter node id in all cases except when
`task_status=SUBMITTED_TO_WORKER` (in that case, the `node_id` gets
overwritten to the executor node id). This PR fixes the bug and adds a
test to verify the expected behaviour.

## Related issues
fixes #61474

---------

Signed-off-by: sampan <sampan@anyscale.com>
Co-authored-by: sampan <sampan@anyscale.com>

Labels

- `community-contribution`: Contributed by the community
- `core`: Issues that should be addressed in Ray Core
- `go`: add ONLY when ready to merge, run all tests
- `observability`: Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling
