Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
20f87ea
feat: add node id to RayEvent use in CoreWorker and GCS
machichima Dec 7, 2025
2d1fb08
feat: set node id when starting gcs
machichima Dec 7, 2025
1a10875
fix: use constant node_id
machichima Dec 7, 2025
e36063a
fix: remove setting node id in HandleRegisterNode
machichima Dec 7, 2025
bc9e254
refactor: remove unused comment
machichima Dec 7, 2025
6add8fa
build: precommit
machichima Dec 7, 2025
cfd0252
test: pass node id in tests
machichima Dec 7, 2025
d30bdab
test: pass node id in tests
machichima Dec 8, 2025
1735885
test: pass node id in test and precommit
machichima Dec 8, 2025
98c59eb
docs: update docs
machichima Dec 8, 2025
476f69d
fix tests
Future-Outlier Dec 9, 2025
cf2f8d1
refactor: from random to nil
machichima Dec 9, 2025
c4c3f94
Trigger CI
Future-Outlier Dec 9, 2025
007b7bd
Merge branch 'master' into add-node-id-to-base-event
Future-Outlier Dec 9, 2025
6fa2e00
refactor: node_id to local_node_id
machichima Dec 11, 2025
d44563a
fix: node_id_ with type NodeID
machichima Dec 11, 2025
21eac30
refactor: log gcs node id even if node id is nil
machichima Dec 11, 2025
06220f8
test: check node id set correctly
machichima Dec 12, 2025
9332dd5
test: check node id in event in python tests
machichima Dec 12, 2025
8c58e2b
build: precommit
machichima Dec 12, 2025
6dbeb6d
fix: set node_id through every recorder
machichima Dec 16, 2025
09c4646
fix: remove other node_id settings
machichima Dec 16, 2025
72c811d
test: update src/ray/observability/tests/
machichima Dec 16, 2025
3b22657
test: remove assertion
machichima Dec 16, 2025
f472da6
refactor: precommit
machichima Dec 16, 2025
a139a09
fix: remove node id from src/ray/gcs
machichima Dec 17, 2025
41c424c
fix: remove node id assertion
machichima Dec 17, 2025
e282e1a
test: assert node id in event recorder test
machichima Dec 17, 2025
9055ad7
Merge branch 'master' into add-node-id-to-base-event
machichima Dec 17, 2025
7a6b4b1
Merge branch 'master' of github.com:ray-project/ray into add-node-id-…
machichima Dec 25, 2025
d40e494
refactor: keep node_id only
machichima Dec 25, 2025
cd76a95
fix: remove duplicate --node-id
machichima Dec 25, 2025
eabbbda
Trigger CI
machichima Dec 28, 2025
d7c661c
Merge branch 'master' into add-node-id-to-base-event
Future-Outlier Dec 28, 2025
abf678c
Merge branch 'master' into add-node-id-to-base-event
MengjinYan Dec 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ An example of a Task Definition Event and a Task Execution Event:

}
},
"nodeId":"ZvxTI6x9dlMFqMlIHErJpg5UEGK1INsKhW2zyg==",
"message":""
}

Expand Down Expand Up @@ -114,6 +115,7 @@ An example of a Task Definition Event and a Task Execution Event:
"taskAttempt":0,
"workerPid":0
},
"nodeId":"ZvxTI6x9dlMFqMlIHErJpg5UEGK1INsKhW2zyg==",
"message":""
}

Expand Down Expand Up @@ -147,6 +149,7 @@ For each actor, Ray exports two types of events: Actor Definition Events and Act
"placementGroupId": "",
"labelSelector": {}
},
"nodeId": "zpLG7coqThVMl8df9RYHnhK6thhJqrgPodtfjg==",
"message": ""
}

Expand All @@ -169,6 +172,7 @@ For each actor, Ray exports two types of events: Actor Definition Events and Act
}
]
},
"nodeId": "zpLG7coqThVMl8df9RYHnhK6thhJqrgPodtfjg==",
"message": ""
}

Expand Down Expand Up @@ -200,6 +204,7 @@ For each driver job, Ray exports two types of events: Driver Job Definition Even
"metadata": {}
}
},
"nodeId": "zpLG7coqThVMl8df9RYHnhK6thhJqrgPodtfjg==",
"message": ""
}

Expand All @@ -220,6 +225,7 @@ For each driver job, Ray exports two types of events: Driver Job Definition Even
}
]
},
"nodeId": "zpLG7coqThVMl8df9RYHnhK6thhJqrgPodtfjg==",
"message": ""
}

Expand Down Expand Up @@ -249,6 +255,7 @@ For each node, Ray exports two types of events: Node Definition Events and Node
},
"startTimestamp": "2025-10-24T21:19:14.063Z"
},
"nodeId": "zpLG7coqThVMl8df9RYHnhK6thhJqrgPodtfjg==",
"message": ""
}

Expand All @@ -271,6 +278,7 @@ For each node, Ray exports two types of events: Node Definition Events and Node
}
]
},
"nodeId": "zpLG7coqThVMl8df9RYHnhK6thhJqrgPodtfjg==",
"message": ""
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def test_ray_actor_events(ray_start_cluster, httpserver):
},
)
cluster.wait_for_nodes()
head_node_id = cluster.head_node.node_id
all_nodes_ids = [node.node_id for node in cluster.list_all_nodes()]

class A:
Expand All @@ -69,9 +70,11 @@ def ping(self):
base64.b64decode(req_json[0]["actorDefinitionEvent"]["actorId"]).hex()
== a._actor_id.hex()
)
assert base64.b64decode(req_json[0]["nodeId"]).hex() == head_node_id
# Verify ActorId and state for ActorLifecycleEvents
has_alive_state = False
for actorLifeCycleEvent in req_json[1:]:
assert base64.b64decode(actorLifeCycleEvent["nodeId"]).hex() == head_node_id
assert (
base64.b64decode(
actorLifeCycleEvent["actorLifecycleEvent"]["actorId"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def f():
wait_for_condition(lambda: len(httpserver.log) >= 1)
req, _ = httpserver.log[0]
req_json = json.loads(req.data)
head_node_id = cluster.head_node.node_id
assert base64.b64decode(req_json[0]["nodeId"]).hex() == head_node_id
assert (
base64.b64decode(req_json[0]["driverJobDefinitionEvent"]["jobId"]).hex()
== ray.get_runtime_context().get_job_id()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def test_ray_node_events(ray_start_cluster, httpserver):
},
)
cluster.wait_for_nodes()
head_node_id = cluster.head_node.node_id
ray.init(address=cluster.address)
wait_for_dashboard_agent_available(cluster)

Expand All @@ -40,10 +41,12 @@ def test_ray_node_events(ray_start_cluster, httpserver):
req, _ = httpserver.log[0]
req_json = json.loads(req.data)
assert len(req_json) == 2
assert base64.b64decode(req_json[0]["nodeId"]).hex() == head_node_id
assert (
base64.b64decode(req_json[0]["nodeDefinitionEvent"]["nodeId"]).hex()
== cluster.head_node.node_id
)
assert base64.b64decode(req_json[1]["nodeId"]).hex() == head_node_id
assert (
base64.b64decode(req_json[1]["nodeLifecycleEvent"]["nodeId"]).hex()
== cluster.head_node.node_id
Expand Down
79 changes: 58 additions & 21 deletions python/ray/tests/test_ray_event_export_task_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,25 @@ def get_job_id_and_driver_script_task_id_from_events(
return driver_script_job_id, driver_task_id


def check_task_event_base_fields(
    event: json, preserve_proto_field_name: bool, head_node_id: str = None
):
    """Assert that a task event carries the expected base (envelope) fields.

    Args:
        event: One decoded event, indexed by field name.
        preserve_proto_field_name: When true, the event is expected to use the
            snake_case proto field names (e.g. ``event_id``); otherwise the
            camelCase JSON names (e.g. ``eventId``).
        head_node_id: Optional hex-encoded node id. When given, the event's
            node id field must be present and, once base64-decoded and
            hex-encoded, equal this value. When ``None`` (the default) the
            node id field is not inspected at all.

    Raises:
        AssertionError: If a checked field holds an unexpected value, or the
            node id field is absent while ``head_node_id`` is given.
    """
    # Resolve the field spellings once so that both naming schemes run the
    # exact same sequence of checks and cannot drift apart.
    if preserve_proto_field_name:
        id_key, source_key, session_key, node_key = (
            "event_id",
            "source_type",
            "session_name",
            "node_id",
        )
    else:
        id_key, source_key, session_key, node_key = (
            "eventId",
            "sourceType",
            "sessionName",
            "nodeId",
        )

    assert event["timestamp"] is not None, "event has no timestamp"
    assert (
        event["severity"] == "INFO"
    ), f"unexpected severity: {event['severity']!r}"
    assert event[id_key] is not None, f"event has no {id_key}"
    assert (
        event[source_key] == "CORE_WORKER"
    ), f"unexpected {source_key}: {event[source_key]!r}"
    assert event[session_key] is not None, f"event has no {session_key}"

    # The node id comparison is opt-in: callers that do not pass an expected
    # id keep the original, node-agnostic behavior.
    if head_node_id is None:
        return

    assert node_key in event, f"event has no {node_key}"
    # The event carries the id base64-encoded; compare in hex form.
    actual_node_id = base64.b64decode(event[node_key]).hex()
    assert (
        actual_node_id == head_node_id
    ), f"{node_key} mismatch: got {actual_node_id}, expected {head_node_id}"


def check_task_lifecycle_event_states_and_error_info(
Expand Down Expand Up @@ -223,7 +231,12 @@ def run_driver_script_and_wait_for_events(script, httpserver, cluster, validatio
# issue to track this: https://github.com/ray-project/ray/issues/58007
assert wait_until_grpc_channel_ready(cluster.gcs_address, node_ids)
run_string_as_driver_nonblocking(script)
wait_for_condition(lambda: get_and_validate_events(httpserver, validation_func))
wait_for_condition(
lambda: get_and_validate_events(
httpserver,
lambda events: validation_func(events, cluster.head_node.node_id),
)
)


class TestNormalTaskEvents:
Expand All @@ -244,7 +257,7 @@ def normal_task():
ray.get(normal_task.remote())
"""

def validate_events(events):
def validate_events(events, head_node_id):
(
driver_script_job_id,
driver_task_id,
Expand All @@ -267,7 +280,9 @@ def validate_events(events):
for event in events:
if preserve_proto_field_name:
if event["event_type"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["task_definition_event"]["task_type"] == "DRIVER_TASK":
if (
Expand Down Expand Up @@ -331,7 +346,9 @@ def validate_events(events):
assert event["event_type"] == "TASK_LIFECYCLE_EVENT"
else:
if event["eventType"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK":
if event["taskDefinitionEvent"]["taskId"] != driver_task_id:
Expand Down Expand Up @@ -428,7 +445,7 @@ def normal_task():
pass
"""

def validate_events(events: json):
def validate_events(events: json, head_node_id):
(
driver_script_job_id,
driver_task_id,
Expand All @@ -443,7 +460,9 @@ def validate_events(events: json):
for event in events:
if preserve_proto_field_name:
if event["event_type"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["task_definition_event"]["task_type"] == "DRIVER_TASK":
if (
Expand Down Expand Up @@ -512,7 +531,9 @@ def validate_events(events: json):
assert event["event_type"] == "TASK_LIFECYCLE_EVENT"
else:
if event["eventType"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK":
if event["taskDefinitionEvent"]["taskId"] != driver_task_id:
Expand Down Expand Up @@ -651,7 +672,7 @@ def sleep():
pass
"""
# Run the driver script and wait for the sleep task to be executing
def validate_task_running(events: json):
def validate_task_running(events: json, head_node_id):
# Obtain the task id of the sleep task
normal_task_id = None
for event in events:
Expand Down Expand Up @@ -706,7 +727,7 @@ def validate_task_running(events: json):
cluster.remove_node(node)

# Wait and verify the task events
def validate_task_killed(events: json):
def validate_task_killed(events: json, head_node_id):
(
driver_script_job_id,
driver_task_id,
Expand All @@ -720,7 +741,9 @@ def validate_task_killed(events: json):
for event in events:
if preserve_proto_field_name:
if event["event_type"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["task_definition_event"]["task_type"] == "DRIVER_TASK":
if (
Expand Down Expand Up @@ -784,7 +807,9 @@ def validate_task_killed(events: json):
assert event["event_type"] == "TASK_LIFECYCLE_EVENT"
else:
if event["eventType"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK":
if event["taskDefinitionEvent"]["taskId"] != driver_task_id:
Expand Down Expand Up @@ -906,7 +931,7 @@ def task(self, arg):
ray.get(actor.task.remote(obj))
"""

def validate_events(events: json):
def validate_events(events: json, head_node_id):
(
driver_script_job_id,
driver_task_id,
Expand All @@ -920,7 +945,9 @@ def validate_events(events: json):
for event in events:
if preserve_proto_field_name:
if event["event_type"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["task_definition_event"]["task_type"] == "DRIVER_TASK":
driver_task_definition_received = True
Expand Down Expand Up @@ -1036,7 +1063,9 @@ def validate_events(events: json):
assert event["event_type"] == "TASK_LIFECYCLE_EVENT"
else:
if event["eventType"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK":
driver_task_definition_received = True
Expand Down Expand Up @@ -1208,7 +1237,7 @@ def task(self):
ray.get(actor.task.options().remote())
"""

def validate_events(events: json):
def validate_events(events: json, head_node_id):
(
driver_script_job_id,
driver_task_id,
Expand All @@ -1222,7 +1251,9 @@ def validate_events(events: json):
for event in events:
if preserve_proto_field_name:
if event["event_type"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["task_definition_event"]["task_type"] == "DRIVER_TASK":
driver_task_definition_received = True
Expand Down Expand Up @@ -1336,7 +1367,9 @@ def validate_events(events: json):
assert event["event_type"] == "TASK_LIFECYCLE_EVENT"
else:
if event["eventType"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK":
driver_task_definition_received = True
Expand Down Expand Up @@ -1518,7 +1551,7 @@ def task(self):
ray.kill(actor)
"""

def validate_events(events: json):
def validate_events(events: json, head_node_id):
(
driver_script_job_id,
driver_task_id,
Expand All @@ -1531,7 +1564,9 @@ def validate_events(events: json):
for event in events:
if preserve_proto_field_name:
if event["event_type"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["task_definition_event"]["task_type"] == "DRIVER_TASK":
driver_task_definition_received = True
Expand Down Expand Up @@ -1597,7 +1632,9 @@ def validate_events(events: json):
assert event["event_type"] == "TASK_LIFECYCLE_EVENT"
else:
if event["eventType"] == "TASK_DEFINITION_EVENT":
check_task_event_base_fields(event, preserve_proto_field_name)
check_task_event_base_fields(
event, preserve_proto_field_name, head_node_id
)

if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK":
driver_task_definition_received = True
Expand Down
4 changes: 3 additions & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ CoreWorker::CoreWorker(
/*timestamp=*/absl::GetCurrentTimeNanos(),
/*is_actor_task_event=*/false,
options_.session_name,
GetCurrentNodeId(),
std::make_shared<const TaskSpecification>(std::move(spec)));
task_event_buffer_->AddTaskEvent(std::move(task_event));
}
Expand Down Expand Up @@ -575,7 +576,8 @@ void CoreWorker::Disconnect(
rpc::TaskStatus::FINISHED,
/*timestamp=*/absl::GetCurrentTimeNanos(),
/*is_actor_task_event=*/worker_context_->GetCurrentActorID().IsNil(),
options_.session_name);
options_.session_name,
GetCurrentNodeId());
task_event_buffer_->AddTaskEvent(std::move(task_event));
}

Expand Down
13 changes: 7 additions & 6 deletions src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,6 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
#endif
}

auto task_event_buffer = std::make_unique<worker::TaskEventBufferImpl>(
std::make_unique<gcs::GcsClient>(options.gcs_options, options.node_ip_address),
std::make_unique<rpc::EventAggregatorClientImpl>(options.metrics_agent_port,
*client_call_manager_),
options.session_name);

// Start the IO thread first to make sure the checker is working.
boost::thread::attributes io_thread_attrs;
#if defined(__APPLE__)
Expand Down Expand Up @@ -227,6 +221,13 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
}
RAY_CHECK_GE(assigned_port, 0);

auto task_event_buffer = std::make_unique<worker::TaskEventBufferImpl>(
std::make_unique<gcs::GcsClient>(options.gcs_options, options.node_ip_address),
std::make_unique<rpc::EventAggregatorClientImpl>(options.metrics_agent_port,
*client_call_manager_),
options.session_name,
local_node_id);

// Initialize raylet client.
// NOTE(edoakes): the core_worker_server_ must be running before registering with
// the raylet, as the raylet will start sending some RPC messages immediately.
Expand Down
Loading