[train][checkpoint] Add checkpoint_upload_function to ray.train.report#56208

Merged
justinvyu merged 14 commits into ray-project:master from TimothySeah:tseah/report-checkpoint-upload-function
Oct 1, 2025
Conversation


@TimothySeah TimothySeah commented Sep 3, 2025

Summary

After #55637, `ray.train.report` will allow users to upload checkpoints from disk to remote storage asynchronously.

If they want to use framework-specific async checkpointing such as `torch.distributed.checkpoint.async_save`, they can manage the async save themselves and then call `ray.train.report(…checkpoint_upload_mode=CheckpointUploadMode.NO_UPLOAD)`.

However, it would also be nice for `ray.train.report` to handle rate limiting and report ordering for framework-specific async checkpointing. This PR achieves that by exposing a `checkpoint_upload_function` argument that can replace the `persist_current_checkpoint` call.
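The rate limiting and ordering that `ray.train.report` can now own might be pictured with a toy, non-Ray sketch (all names here are hypothetical illustration, not Ray's implementation): a single-worker executor ensures user-supplied upload functions complete in the order they were reported.

```python
from concurrent.futures import ThreadPoolExecutor

class ToyAsyncReporter:
    """Toy illustration only: serialize async checkpoint uploads so
    they complete in the order report() was called."""

    def __init__(self):
        # One worker acts as both a rate limiter and an orderer.
        self._executor = ThreadPoolExecutor(max_workers=1)
        self.completed = []

    def report(self, checkpoint, checkpoint_upload_function):
        # Instead of a default persist step, run the user's upload function.
        return self._executor.submit(
            lambda: self.completed.append(checkpoint_upload_function(checkpoint))
        )

reporter = ToyAsyncReporter()
futures = [reporter.report(f"ckpt-{i}", lambda c: c) for i in range(3)]
for f in futures:
    f.result()
# Uploads finished in submission order: ['ckpt-0', 'ckpt-1', 'ckpt-2']
```

The single-worker queue is only one possible policy; the point is that the trainer, not the user, owns completion ordering.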

Tests

Tested with `torch.distributed.checkpoint`'s `async_save` in a workspace. The code looks like:

```python
import ray.train
from ray.train import Checkpoint, CheckpointUploadMode
from torch.distributed.checkpoint import async_save
from torch.distributed.checkpoint.state_dict import get_state_dict
# S3StorageWriter comes from the Amazon S3 Connector for PyTorch package.
from s3torchconnector.dcp import S3StorageWriter

# Build an S3 checkpoint path under the run's experiment path.
storage_context = ray.train.get_context().get_storage()
checkpoint_path = f"s3://{storage_context.build_checkpoint_path_from_name(str(epoch))}"
storage_writer = S3StorageWriter(region="us-west-2", path=checkpoint_path)

# Kick off torch's async checkpoint save; returns a future-like handle.
model_dict, opt_dict = get_state_dict(model=model, optimizers=optimizer)
ckpt_ref = async_save(
    {"model": model_dict, "opt": opt_dict},
    storage_writer=storage_writer,
)

def wait_async_save(checkpoint, checkpoint_dir):
    # Block until torch's async save finishes, then hand the checkpoint back.
    ckpt_ref.result()
    return checkpoint

checkpoint = Checkpoint(checkpoint_path)
ray.train.report(
    metrics={"loss": 10, "accuracy": 0.5},
    checkpoint=checkpoint,
    checkpoint_upload_mode=CheckpointUploadMode.ASYNC,
    checkpoint_upload_function=wait_async_save,
)
```

This correctly returned a `Result` object with 3 checkpoints and their associated metrics. Note that the checkpoint path must be inside the "experiment path", which is configured in the `RunConfig` and accessed here through `build_checkpoint_path_from_name`.
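The containment requirement on the checkpoint path can be illustrated with a tiny check (function name and prefix logic are a hypothetical sketch, not Ray's actual validation):

```python
def is_within_experiment_path(checkpoint_path: str, experiment_path: str) -> bool:
    # A checkpoint URI must live under the experiment path configured
    # in RunConfig; a simple prefix test captures the idea.
    return checkpoint_path.startswith(experiment_path.rstrip("/") + "/")

print(is_within_experiment_path("s3://bucket/exp/checkpoint_0", "s3://bucket/exp"))   # True
print(is_within_experiment_path("s3://bucket/other/checkpoint_0", "s3://bucket/exp")) # False
```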


Note

Adds an optional checkpoint_upload_fn to ray.train.report (and internal plumbing) to override checkpoint upload/persistence, with corresponding tests.

  • API:
    • ray.train.report(...) adds checkpoint_upload_fn to allow custom checkpoint upload logic.
  • Internal Execution:
    • Thread checkpoint_upload_fn through TrainContext.report and _upload_checkpoint; if provided, use it instead of StorageContext.persist_current_checkpoint.
    • Propagate parameter via TrainFnUtils (abstract, distributed, local) and public API wrappers.
  • Tests:
    • Add test_report_checkpoint_upload_fn validating custom upload function integrates with storage and result checkpoint loading.
    • Minor test adjustments (utilities imports, shutil usage).
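The override described in the internal-execution bullets amounts to a simple dispatch, sketched here in plain Python (names are hypothetical stand-ins, not Ray's actual internals):

```python
def upload_checkpoint(checkpoint, persist_current_checkpoint, checkpoint_upload_fn=None):
    # If the user supplied an upload function, it replaces the default
    # persist step; otherwise fall back to the storage-context persistence.
    if checkpoint_upload_fn is not None:
        return checkpoint_upload_fn(checkpoint)
    return persist_current_checkpoint(checkpoint)

# Default path: the persist function runs.
print(upload_checkpoint("ckpt", lambda c: ("persisted", c)))
# Override path: the custom function runs instead.
print(upload_checkpoint("ckpt", lambda c: ("persisted", c),
                        checkpoint_upload_fn=lambda c: ("custom", c)))
```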

Written by Cursor Bugbot for commit e21ce19.

@TimothySeah TimothySeah marked this pull request as ready for review September 5, 2025 23:46
@TimothySeah TimothySeah requested a review from a team as a code owner September 5, 2025 23:46
@ray-gardener ray-gardener bot added the train Ray Train Related Issue label Sep 6, 2025
@TimothySeah TimothySeah marked this pull request as draft September 9, 2025 21:33
@TimothySeah TimothySeah force-pushed the tseah/report-checkpoint-upload-function branch from 2b2e1e2 to c123ffd September 15, 2025 23:21
@TimothySeah TimothySeah marked this pull request as ready for review September 16, 2025 23:19
@justinvyu (Contributor) left a comment:
API looks good to me now!

@justinvyu (Contributor) left a comment:
📈

@justinvyu justinvyu enabled auto-merge (squash) October 1, 2025 21:48
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Oct 1, 2025
@justinvyu justinvyu merged commit f8465ee into ray-project:master Oct 1, 2025
8 checks passed
eicherseiji pushed a commit to eicherseiji/ray that referenced this pull request Oct 6, 2025
dstrodtman pushed a commit that referenced this pull request Oct 6, 2025
liulehui pushed a commit to liulehui/ray that referenced this pull request Oct 9, 2025
joshkodi pushed a commit to joshkodi/ray that referenced this pull request Oct 13, 2025
justinyeh1995 pushed a commit to justinyeh1995/ray that referenced this pull request Oct 20, 2025
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025