Skip to content

Commit 9c5b93b

Browse files
committed
Use Public State API for Fetching Actors
Instead of fetching actors from the internal state API, or migrating that API to the common module, simply use the public state API, which relies on the dashboard server. Signed-off-by: Jason <jcarlson212@gmail.com>
1 parent f9640e6 commit 9c5b93b

18 files changed

+260
-211
lines changed

python/ray/_common/state.py

Lines changed: 0 additions & 38 deletions
This file was deleted.

python/ray/_private/test_utils.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -548,11 +548,10 @@ def wait_for_num_actors(num_actors, state=None, timeout=10):
548548
while time.time() - start_time < timeout:
549549
if (
550550
len(
551-
[
552-
_
553-
for _ in ray._common.state.actors().values()
554-
if state is None or _["State"] == state
555-
]
551+
ray.util.state.list_actors(
552+
filters=("state", "=", state) if state else None,
553+
limit=num_actors,
554+
)
556555
)
557556
>= num_actors
558557
):
@@ -563,14 +562,14 @@ def wait_for_num_actors(num_actors, state=None, timeout=10):
563562

564563
def kill_actor_and_wait_for_failure(actor, timeout=10, retry_interval_ms=100):
565564
actor_id = actor._actor_id.hex()
566-
current_num_restarts = ray._common.state.actors(actor_id)["NumRestarts"]
565+
current_num_restarts = ray.util.state.get_actor(actor_id)["num_restarts"]
567566
ray.kill(actor)
568567
start = time.time()
569568
while time.time() - start <= timeout:
570-
actor_status = ray._common.state.actors(actor_id)
569+
actor_state = ray.util.state.get_actor(actor_id)
571570
if (
572-
actor_status["State"] == convert_actor_state(gcs_utils.ActorTableData.DEAD)
573-
or actor_status["NumRestarts"] > current_num_restarts
571+
actor_state["state"] == convert_actor_state(gcs_utils.ActorTableData.DEAD)
572+
or actor_state["num_restarts"] > current_num_restarts
574573
):
575574
return
576575
time.sleep(retry_interval_ms / 1000.0)

python/ray/data/dataset.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2086,12 +2086,17 @@ def build_block_refs_by_node_id(
20862086

20872087
def build_node_id_by_actor(actors: List[Any]) -> Dict[Any, str]:
20882088
"""Build a map from a actor to its node_id."""
2089-
actors_state = ray._common.state.actors()
2089+
actors_state = {
2090+
actor.actor_id: actor.node_id
2091+
for actor in ray.util.state.list_actors(
2092+
limit=ray.util.state.summarize_actors()
2093+
.get("cluster", {})
2094+
.get("total_actors", 0)
2095+
+ 100 # fetch current actors. Some staleness is assumed fine.
2096+
)
2097+
}
20902098
return {
2091-
actor: actors_state.get(actor._actor_id.hex(), {})
2092-
.get("Address", {})
2093-
.get("NodeID")
2094-
for actor in actors
2099+
actor: actors_state.get(actor._actor_id.hex(), {}) for actor in actors
20952100
}
20962101

20972102
# expected number of blocks to be allocated for each actor

python/ray/data/tests/test_split.py

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,11 @@ def test_equal_split_balanced_grid(ray_start_regular_shared_2_cpus):
155155
_test_equal_split_balanced(block_sizes, num_splits)
156156

157157

158+
@pytest.mark.parametrize(
159+
"ray_start_regular_shared_2_cpus",
160+
[{"num_cpus": 1, "include_dashboard": True}],
161+
indirect=True,
162+
)
158163
def test_split_small(ray_start_regular_shared_2_cpus):
159164
x = [Counter.remote() for _ in range(4)]
160165
data = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
@@ -366,6 +371,11 @@ def test_split(ray_start_regular_shared_2_cpus):
366371
assert 190 == sum([dataset.sum("id") or 0 for dataset in datasets])
367372

368373

374+
@pytest.mark.parametrize(
375+
"ray_start_regular_shared_2_cpus",
376+
[{"num_cpus": 1, "include_dashboard": True}],
377+
indirect=True,
378+
)
369379
def test_split_hints(ray_start_regular_shared_2_cpus):
370380
@ray.remote
371381
class Actor(object):
@@ -394,30 +404,45 @@ def assert_split_assignment(block_node_ids, actor_node_ids, expected_split_resul
394404
blocks = _ref_bundles_iterator_to_block_refs_list(bundles)
395405
assert len(block_node_ids) == len(blocks)
396406
actors = [Actor.remote() for i in range(len(actor_node_ids))]
397-
with patch("ray.experimental.get_object_locations") as location_mock:
398-
with patch("ray._common.state.actors") as state_mock:
399-
block_locations = {}
400-
for i, node_id in enumerate(block_node_ids):
401-
if node_id:
402-
block_locations[blocks[i]] = {"node_ids": [node_id]}
403-
location_mock.return_value = block_locations
404-
405-
actor_state = {}
406-
for i, node_id in enumerate(actor_node_ids):
407-
actor_state[actors[i]._actor_id.hex()] = {
408-
"Address": {"NodeID": node_id}
409-
}
410-
411-
state_mock.return_value = actor_state
412-
413-
datasets = ds.split(len(actors), locality_hints=actors)
414-
assert len(datasets) == len(actors)
415-
for i in range(len(actors)):
416-
assert {blocks[j] for j in expected_split_result[i]} == set(
417-
_ref_bundles_iterator_to_block_refs_list(
418-
datasets[i].iter_internal_ref_bundles()
407+
with patch("ray.util.state.summarize_actors") as state_summary_mock:
408+
state_summary_mock.return_value = {"cluster": {"total_actors": len(actors)}}
409+
with patch("ray.experimental.get_object_locations") as location_mock:
410+
with patch("ray.util.state.list_actors") as state_mock:
411+
block_locations = {}
412+
for i, node_id in enumerate(block_node_ids):
413+
if node_id:
414+
block_locations[blocks[i]] = {"node_ids": [node_id]}
415+
location_mock.return_value = block_locations
416+
417+
class ActorStateMock:
418+
def __init__(self, actor_id, node_id):
419+
self._actor_id = actor_id
420+
self._node_id = node_id
421+
422+
@property
423+
def actor_id(self):
424+
return self._actor_id
425+
426+
@property
427+
def node_id(self):
428+
return self._node_id
429+
430+
actor_state = []
431+
for i, node_id in enumerate(actor_node_ids):
432+
actor_state.append(
433+
ActorStateMock(actors[i]._actor_id.hex(), node_id)
434+
)
435+
436+
state_mock.return_value = actor_state
437+
438+
datasets = ds.split(len(actors), locality_hints=actors)
439+
assert len(datasets) == len(actors)
440+
for i in range(len(actors)):
441+
assert {blocks[j] for j in expected_split_result[i]} == set(
442+
_ref_bundles_iterator_to_block_refs_list(
443+
datasets[i].iter_internal_ref_bundles()
444+
)
419445
)
420-
)
421446

422447
assert_split_assignment(
423448
["node2", "node1", "node1"], ["node1", "node2"], [[1, 2], [0]]

python/ray/runtime_context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,8 +405,8 @@ def was_current_actor_reconstructed(self):
405405
assert (
406406
not self.actor_id.is_nil()
407407
), "This method should't be called inside Ray tasks."
408-
actor_info = ray._common.state.actors(self.actor_id.hex())
409-
return actor_info and actor_info["NumRestarts"] != 0
408+
actors = ray.util.state.list_actors(filters=[("id", "=", self.actor_id.hex())])
409+
return any(act.num_restarts or 0 for act in actors)
410410

411411
@property
412412
@Deprecated(message="Use get_placement_group_id() instead", warning=True)

python/ray/tests/test_actor_advanced.py

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,11 @@ def foobar(self):
526526
assert ray.get(detached_actor.foobar.remote()) == ["bar", "bar"]
527527

528528

529+
@pytest.mark.parametrize(
530+
"ray_start_regular",
531+
[{"num_cpus": 1, "include_dashboard": True}],
532+
indirect=True,
533+
)
529534
def test_detached_actor_cleanup(ray_start_regular):
530535
@ray.remote
531536
class DetachedActor:
@@ -546,15 +551,11 @@ def create_and_kill_actor(actor_name):
546551
detached_actor = ray.get_actor(dup_actor_name)
547552
ray.kill(detached_actor)
548553
# Wait until actor dies.
549-
actor_status = ray._common.state.actors(actor_id=detached_actor._actor_id.hex())
554+
actor_status = ray.util.state.get_actor(id=detached_actor._actor_id.hex())
550555
max_wait_time = 10
551556
wait_time = 0
552-
while actor_status["State"] != convert_actor_state(
553-
gcs_utils.ActorTableData.DEAD
554-
):
555-
actor_status = ray._common.state.actors(
556-
actor_id=detached_actor._actor_id.hex()
557-
)
557+
while actor_status.state != convert_actor_state(gcs_utils.ActorTableData.DEAD):
558+
actor_status = ray.util.state.get_actor(id=detached_actor._actor_id.hex())
558559
time.sleep(1.0)
559560
wait_time += 1
560561
if wait_time >= max_wait_time:
@@ -586,11 +587,11 @@ def ping(self):
586587
assert ray.get(detached_actor.ping.remote()) == "pong"
587588
ray.kill(detached_actor)
588589
# Wait until actor dies.
589-
actor_status = ray._common.state.actors(actor_id=detached_actor._actor_id.hex())
590+
actor_status = ray.util.state.get_actor(id=detached_actor._actor_id.hex())
590591
max_wait_time = 10
591592
wait_time = 0
592-
while actor_status["State"] != convert_actor_state(gcs_utils.ActorTableData.DEAD): # noqa
593-
actor_status = ray._common.state.actors(actor_id=detached_actor._actor_id.hex())
593+
while actor_status.state != convert_actor_state(gcs_utils.ActorTableData.DEAD): # noqa
594+
actor_status = ray.util.state.get_actor(id=detached_actor._actor_id.hex())
594595
time.sleep(1.0)
595596
wait_time += 1
596597
if wait_time >= max_wait_time:
@@ -640,7 +641,14 @@ def hi(self):
640641

641642
@pytest.mark.parametrize(
642643
"ray_start_cluster",
643-
[{"num_cpus": 3, "num_nodes": 1, "resources": {"first_node": 5}}],
644+
[
645+
{
646+
"num_cpus": 3,
647+
"num_nodes": 1,
648+
"resources": {"first_node": 5},
649+
"include_dashboard": True,
650+
}
651+
],
644652
indirect=True,
645653
)
646654
def test_detached_actor_cleanup_due_to_failure(ray_start_cluster):
@@ -661,13 +669,13 @@ def kill_itself(self):
661669
node_failure_actor_name = "node_failure_actor_name"
662670

663671
def wait_until_actor_dead(handle):
664-
actor_status = ray._common.state.actors(actor_id=handle._actor_id.hex())
672+
actor_status = ray.util.state.get_actor(id=handle._actor_id.hex())
665673
max_wait_time = 10
666674
wait_time = 0
667-
while actor_status["State"] != convert_actor_state(
675+
while actor_status["state"] != convert_actor_state(
668676
gcs_utils.ActorTableData.DEAD
669677
):
670-
actor_status = ray._common.state.actors(actor_id=handle._actor_id.hex())
678+
actor_status = ray.util.state.get_actor(id=handle._actor_id.hex())
671679
time.sleep(1.0)
672680
wait_time += 1
673681
if wait_time >= max_wait_time:
@@ -991,6 +999,8 @@ def condition2():
991999

9921000

9931001
def test_actor_timestamps(ray_start_regular):
1002+
"""Tests the internal actor state representation's timestamps. The dashboard APIs don't support this yet."""
1003+
9941004
@ray.remote
9951005
class Foo:
9961006
def get_id(self):
@@ -1003,11 +1013,12 @@ def graceful_exit():
10031013
actor = Foo.remote()
10041014
actor_id = ray.get(actor.get_id.remote())
10051015

1006-
state_after_starting = ray._common.state.actors()[actor_id]
1016+
state_after_starting = ray._private.state.actors()[actor_id]
1017+
10071018
time.sleep(1)
10081019
del actor
10091020
time.sleep(1)
1010-
state_after_ending = ray._common.state.actors()[actor_id]
1021+
state_after_ending = ray._private.state.actors()[actor_id]
10111022

10121023
assert state_after_starting["StartTime"] == state_after_ending["StartTime"]
10131024
start_time = state_after_ending["StartTime"]
@@ -1018,11 +1029,11 @@ def not_graceful_exit():
10181029
actor = Foo.remote()
10191030
actor_id = ray.get(actor.get_id.remote())
10201031

1021-
state_after_starting = ray._common.state.actors()[actor_id]
1032+
state_after_starting = ray._private.state.actors()[actor_id]
10221033
time.sleep(1)
10231034
actor.kill_self.remote()
10241035
time.sleep(1)
1025-
state_after_ending = ray._common.state.actors()[actor_id]
1036+
state_after_ending = ray._private.state.actors()[actor_id]
10261037

10271038
assert state_after_starting["StartTime"] == state_after_ending["StartTime"]
10281039

@@ -1034,13 +1045,13 @@ def restarted():
10341045
actor = Foo.options(max_restarts=1, max_task_retries=-1).remote()
10351046
actor_id = ray.get(actor.get_id.remote())
10361047

1037-
state_after_starting = ray._common.state.actors()[actor_id]
1048+
state_after_starting = ray._private.state.actors()[actor_id]
10381049
time.sleep(1)
10391050
actor.kill_self.remote()
10401051
time.sleep(1)
10411052
actor.kill_self.remote()
10421053
time.sleep(1)
1043-
state_after_ending = ray._common.state.actors()[actor_id]
1054+
state_after_ending = ray._private.state.actors()[actor_id]
10441055

10451056
assert state_after_starting["StartTime"] == state_after_ending["StartTime"]
10461057

python/ray/tests/test_actor_group.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,18 @@ def test_actor_creation_num_cpus(ray_start_2_cpus):
4242
ag.shutdown()
4343

4444

45+
@pytest.mark.parametrize(
46+
"ray_start_2_cpus",
47+
[{"num_cpus": 1, "include_dashboard": True}],
48+
indirect=True,
49+
)
4550
def test_actor_shutdown(ray_start_2_cpus):
4651
assert ray.available_resources()["CPU"] == 2
4752
ag = ActorGroup(actor_cls=DummyActor, num_actors=2)
4853
time.sleep(1)
4954
assert "CPU" not in ray.available_resources()
50-
assert len(ray._common.state.actors()) == 2
55+
56+
assert len(ray.util.state.list_actors()) == 2
5157
ag.shutdown()
5258
time.sleep(1)
5359
assert ray.available_resources()["CPU"] == 2

python/ray/tests/test_advanced_3.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020
logger = logging.getLogger(__name__)
2121

2222

23+
@pytest.mark.parametrize(
24+
"shutdown_only",
25+
[{"num_cpus": 1, "include_dashboard": True}],
26+
indirect=True,
27+
)
2328
def test_global_state_api(shutdown_only):
2429

2530
ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1})
@@ -47,15 +52,14 @@ def __init__(self):
4752
# Wait for actor to be created
4853
wait_for_num_actors(1)
4954

50-
actor_table = ray._common.state.actors()
55+
actor_table = (
56+
ray.util.state.list_actors()
57+
) # should be using this API now for fetching actors
5158
assert len(actor_table) == 1
5259

5360
(actor_info,) = actor_table.values()
54-
assert actor_info["JobID"] == job_id.hex()
55-
assert actor_info["Name"] == "test_actor"
56-
assert "IPAddress" in actor_info["Address"]
57-
assert "IPAddress" in actor_info["OwnerAddress"]
58-
assert actor_info["Address"]["Port"] != actor_info["OwnerAddress"]["Port"]
61+
assert actor_info.job_id == job_id.hex()
62+
assert actor_info.name == "test_actor"
5963

6064
job_table = ray._private.state.jobs()
6165

0 commit comments

Comments
 (0)