Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions python/ray/train/v2/tests/test_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ def train_fn(config):
print("Loaded back state from checkpoint:", state)
start = state["iter"] + 1

assert len(ray.train.get_all_reported_checkpoints()) == min(
start, config.get("num_to_keep", float("inf"))
)
got = len(ray.train.get_all_reported_checkpoints())
expected = min(start, config.get("num_to_keep", float("inf")))
assert got == expected, f"Expected {expected} checkpoints, got {got}"

for i in range(start, config.get("num_iterations", 5)):
time.sleep(config.get("time_per_iter", 0.25))
Expand Down Expand Up @@ -219,9 +219,9 @@ def train_fn(config):
ray.train.collective.barrier()

if i in config.get("fail_iters", []):
assert len(ray.train.get_all_reported_checkpoints()) == min(
i + 1, config.get("num_to_keep", float("inf"))
)
got = len(ray.train.get_all_reported_checkpoints())
expected = min(i + 1, config.get("num_to_keep", float("inf")))
assert got == expected, f"Expected {expected} checkpoints, got {got}"
raise RuntimeError(f"Failing on iter={i}!!")


Expand Down
14 changes: 12 additions & 2 deletions release/release_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1809,12 +1809,22 @@
num_nodes: 4

variations:
- __suffix__: aws
- __suffix__: gce
- __suffix__: aws_v1
- __suffix__: aws_v2
run:
script: RAY_TRAIN_V2_ENABLED=1 pytest -v test_persistence.py -s
- __suffix__: gce_v1
env: gce
frequency: manual
cluster:
cluster_compute: compute_gce.yaml
- __suffix__: gce_v2
env: gce
frequency: manual
cluster:
cluster_compute: compute_gce.yaml
run:
script: RAY_TRAIN_V2_ENABLED=1 pytest -v test_persistence.py -s

alert: default

Expand Down
215 changes: 130 additions & 85 deletions release/train_tests/multinode_persistence/test_persistence.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Train multi-node persistence/checkpoint release test.

This test is a multi-node version of `test_new_persistence.py` and is meant to
be run on a cluster with NFS or S3 storage configured.
This test is a multi-node version of `test_new_persistence.py`/`test_persistence.py`
and is meant to be run on a cluster with NFS or S3 storage configured.

This test also records timing metrics on checkpoint save (to disk), save (to storage),
and load (from storage) operations and outputs them as release test metrics.
Expand Down Expand Up @@ -39,15 +39,23 @@
from ray.air.constants import TRAINING_ITERATION
from ray.air._internal.uri_utils import URI
from ray.train import Checkpoint
from ray.train.base_trainer import TrainingFailedError
from ray.train.torch import TorchTrainer
from ray.train.v2._internal.constants import is_v2_enabled

if is_v2_enabled():
from test_v2_persistence import (
train_fn,
_assert_storage_contents,
)
from ray.train.v2.api.exceptions import WorkerGroupError
else:
from test_v1_persistence import (
train_fn,
_assert_storage_contents,
_resume_from_checkpoint,
)
from ray.train.base_trainer import TrainingFailedError

from test_new_persistence import (
train_fn,
_assert_storage_contents,
_resume_from_checkpoint,
)

# Add a unique ID to the storage path to avoid collisions between release test runs.
TEST_ID = uuid.uuid4().hex[:4] + "_" + datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
Expand Down Expand Up @@ -207,49 +215,64 @@ def test_trainer(root_path_storage_filesystem_label, tmp_path, monkeypatch):

root_path, storage_filesystem, label = root_path_storage_filesystem_label
storage_path = root_path + label
checkpoint_config = train.CheckpointConfig(
num_to_keep=TestConstants.NUM_ITERATIONS // 2
)
num_to_keep = TestConstants.NUM_ITERATIONS // 2
checkpoint_config = train.CheckpointConfig(num_to_keep=num_to_keep)
exp_name = "test_trainer"

print(
"\nSaving results under (storage_path, exp_name) = "
f"({storage_path}, {exp_name})\n"
)

train_loop_config = {
"fail_iters": [3, 6, 8],
"time_per_iter": 1.0,
"num_iterations": TestConstants.NUM_ITERATIONS,
"custom_save_fn": custom_save_fn,
"custom_restore_fn": custom_restore_fn,
"num_to_keep": num_to_keep,
}
scaling_config = train.ScalingConfig(
num_workers=TestConstants.NUM_WORKERS,
resources_per_worker={"CPU": TestConstants.NUM_CPUS_PER_WORKER},
)
run_config = train.RunConfig(
failure_config=train.FailureConfig(max_failures=2),
name=exp_name,
storage_path=storage_path,
storage_filesystem=storage_filesystem,
checkpoint_config=checkpoint_config,
)
if not is_v2_enabled():
train_loop_config["in_trainer"] = True
scaling_config.trainer_resources = {"CPU": 0}
run_config.sync_config = train.SyncConfig(sync_artifacts=True)
trainer = TorchTrainer(
train_fn,
train_loop_config={
"in_trainer": True,
"fail_iters": [3, 6, 8],
"time_per_iter": 1.0,
"num_iterations": TestConstants.NUM_ITERATIONS,
"custom_save_fn": custom_save_fn,
"custom_restore_fn": custom_restore_fn,
},
scaling_config=train.ScalingConfig(
num_workers=TestConstants.NUM_WORKERS,
trainer_resources={"CPU": 0},
resources_per_worker={"CPU": TestConstants.NUM_CPUS_PER_WORKER},
),
run_config=train.RunConfig(
failure_config=train.FailureConfig(max_failures=2),
name=exp_name,
storage_path=storage_path,
storage_filesystem=storage_filesystem,
checkpoint_config=checkpoint_config,
sync_config=train.SyncConfig(sync_artifacts=True),
),
train_loop_config=train_loop_config,
scaling_config=scaling_config,
run_config=run_config,
)
print("\nStarting initial run.\n")
with pytest.raises(TrainingFailedError):
result = trainer.fit()
if is_v2_enabled():
with pytest.raises(WorkerGroupError):
trainer.fit()
else:
with pytest.raises(TrainingFailedError):
result = trainer.fit()

print("\nStarting manually restored run.\n")
restored_trainer = TorchTrainer.restore(
path=str(URI(storage_path) / exp_name),
storage_filesystem=storage_filesystem,
)
if is_v2_enabled():
restored_trainer = TorchTrainer(
train_fn,
train_loop_config=train_loop_config,
scaling_config=scaling_config,
run_config=run_config,
)
else:
restored_trainer = TorchTrainer.restore(
path=str(URI(storage_path) / exp_name),
storage_filesystem=storage_filesystem,
)
result = restored_trainer.fit()
print(result)

Expand All @@ -268,22 +291,31 @@ def test_trainer(root_path_storage_filesystem_label, tmp_path, monkeypatch):
else:
raise NotImplementedError(f"Invalid storage type: {label}")

_assert_storage_contents(
local_inspect_dir,
exp_name,
checkpoint_config,
"TorchTrainer",
test_trainer=True,
constants=TestConstants,
)
if is_v2_enabled():
_assert_storage_contents(
local_inspect_dir,
exp_name,
checkpoint_config,
constants=TestConstants,
)
else:
_assert_storage_contents(
local_inspect_dir,
exp_name,
checkpoint_config,
"TorchTrainer",
test_trainer=True,
constants=TestConstants,
)

# Test `resume_from_checkpoint`
_resume_from_checkpoint(
result.checkpoint,
expected_state={"iter": TestConstants.NUM_ITERATIONS - 1},
storage_path=storage_path,
storage_filesystem=storage_filesystem,
)
if not is_v2_enabled():
_resume_from_checkpoint(
result.checkpoint,
expected_state={"iter": TestConstants.NUM_ITERATIONS - 1},
storage_path=storage_path,
storage_filesystem=storage_filesystem,
)

# Upload checkpoint save and restore timing release test metrics
all_checkpoint_timing_metrics = collections.defaultdict(list)
Expand Down Expand Up @@ -328,54 +360,67 @@ def test_no_storage_error(tmp_path, monkeypatch):
with no persistent storage configured."""
ray.init(runtime_env={"working_dir": "."}, ignore_reinit_error=True)

train_loop_config = {
"time_per_iter": 1.0,
"num_iterations": TestConstants.NUM_ITERATIONS,
}
scaling_config = train.ScalingConfig(
num_workers=TestConstants.NUM_WORKERS,
resources_per_worker={"CPU": TestConstants.NUM_CPUS_PER_WORKER},
)
if not is_v2_enabled():
train_loop_config["in_trainer"] = True
scaling_config.trainer_resources = {"CPU": 0}
trainer = TorchTrainer(
train_fn,
train_loop_config={
"in_trainer": True,
"time_per_iter": 1.0,
"num_iterations": TestConstants.NUM_ITERATIONS,
},
scaling_config=train.ScalingConfig(
num_workers=TestConstants.NUM_WORKERS,
trainer_resources={"CPU": 0},
resources_per_worker={"CPU": TestConstants.NUM_CPUS_PER_WORKER},
),
train_loop_config=train_loop_config,
scaling_config=scaling_config,
run_config=train.RunConfig(name="test_trainer", storage_path=None),
)
with pytest.raises(TrainingFailedError):
trainer.fit()
if is_v2_enabled():
with pytest.raises(WorkerGroupError):
trainer.fit()
else:
with pytest.raises(TrainingFailedError):
trainer.fit()


def test_no_storage_no_checkpoints(tmp_path, monkeypatch):
"""Tests that it's ok to run multi-node with no persistent storage
if you never report checkpoints."""
ray.init(runtime_env={"working_dir": "."}, ignore_reinit_error=True)

train_loop_config = {
"time_per_iter": 1.0,
"num_iterations": TestConstants.NUM_ITERATIONS,
# Don't report any checkpoints
"no_checkpoint_ranks": list(range(TestConstants.NUM_WORKERS)),
}
scaling_config = train.ScalingConfig(
num_workers=TestConstants.NUM_WORKERS,
resources_per_worker={"CPU": TestConstants.NUM_CPUS_PER_WORKER},
)
run_config = train.RunConfig(
failure_config=train.FailureConfig(max_failures=2),
name="test_trainer",
storage_path=None,
)
if not is_v2_enabled():
train_loop_config["in_trainer"] = True
scaling_config.trainer_resources = {"CPU": 0}
run_config.sync_config = train.SyncConfig(sync_artifacts=True)
trainer = TorchTrainer(
train_fn,
train_loop_config={
"in_trainer": True,
"time_per_iter": 1.0,
"num_iterations": TestConstants.NUM_ITERATIONS,
# Don't report any checkpoints
"no_checkpoint_ranks": list(range(TestConstants.NUM_WORKERS)),
},
scaling_config=train.ScalingConfig(
num_workers=TestConstants.NUM_WORKERS,
trainer_resources={"CPU": 0},
resources_per_worker={"CPU": TestConstants.NUM_CPUS_PER_WORKER},
),
run_config=train.RunConfig(
failure_config=train.FailureConfig(max_failures=2),
name="test_trainer",
storage_path=None,
sync_config=train.SyncConfig(sync_artifacts=True),
),
train_loop_config=train_loop_config,
scaling_config=scaling_config,
run_config=run_config,
)
result = trainer.fit()

assert result.metrics[TRAINING_ITERATION] == TestConstants.NUM_ITERATIONS
assert len(result.metrics_dataframe) == TestConstants.NUM_ITERATIONS
# v2 does not support free-floating metrics (this run reports no checkpoints),
# so the metric assertions below only apply to v1.
if not is_v2_enabled():
assert result.metrics[TRAINING_ITERATION] == TestConstants.NUM_ITERATIONS
assert len(result.metrics_dataframe) == TestConstants.NUM_ITERATIONS


if __name__ == "__main__":
Expand Down