[Data] Add Checkpointing to Ray Data#59409
Conversation
Code Review
This pull request introduces a robust checkpointing mechanism to Ray Data, a significant feature for ensuring fault tolerance and efficient recovery in data pipelines. The implementation is comprehensive, touching upon planning, execution, configuration, and testing. Key additions include a CheckpointConfig for setup, dynamic plan adjustments in the Planner to inject checkpointing logic, and specific operators for filtering processed rows and writing checkpoint data. The use of an ExecutionCallback for managing the checkpoint lifecycle is a clean approach. The code is well-structured, and the tests are thorough, covering various success and failure scenarios. I have a few suggestions to enhance maintainability, improve memory efficiency, and make the configuration handling more robust. Overall, this is a very strong and valuable addition to Ray Data.
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
good feature. exactly what i need
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
raulchen left a comment:
stamping. already reviewed offline
## Description
Add checkpointing for data pipelines. Currently, checkpointing only supports
pipelines that start with `Read` and end with `Write`.
Example Script:
```py
import os

import pandas as pd

import ray
from ray.data.checkpoint import CheckpointConfig

# Setup paths
base_dir = "/tmp/ray_checkpoint_demo"
input_path = os.path.join(base_dir, "input")
output_path = os.path.join(base_dir, "output")
checkpoint_path = os.path.join(base_dir, "checkpoint")
os.makedirs(input_path, exist_ok=True)

# Create sample data (10 rows with unique IDs)
df = pd.DataFrame({"id": range(10), "value": [f"row_{i}" for i in range(10)]})
df.to_parquet(os.path.join(input_path, "data.parquet"), index=False)
print("Created 10 rows of sample data")


def run_pipeline(fail_on_id_gt=None):
    """Run pipeline with optional simulated failure."""
    ctx = ray.data.DataContext.get_current()
    ctx.checkpoint_config = CheckpointConfig(
        id_column="id",
        checkpoint_path=checkpoint_path,
        delete_checkpoint_on_success=False,
    )

    def process_batch(batch):
        if fail_on_id_gt is not None and max(batch["id"]) > fail_on_id_gt:
            raise RuntimeError(f"Simulated failure at id > {fail_on_id_gt}")
        if fail_on_id_gt is not None:
            batch["info"] = ["checkpointed from first run"] * len(batch["id"])
        else:
            batch["info"] = ["not checkpointed from first run"] * len(batch["id"])
        return batch

    ds = ray.data.read_parquet(input_path)
    ds = ds.map_batches(process_batch, batch_size=5)
    ds.write_parquet(output_path)


# Run 1: Fail after processing some rows
print("\n=== Run 1: Pipeline with simulated failure ===")
try:
    run_pipeline(fail_on_id_gt=5)
except Exception as e:
    print(f"Failed as expected: {e}")

# Run 2: Resume from checkpoint
print("\n=== Run 2: Resume from checkpoint ===")
run_pipeline(fail_on_id_gt=None)  # No failure

# Verify results
print("\n=== Results ===")
result = ray.data.read_parquet(output_path)
print(f"Total rows in output: {result.count()}")
print(f"Result: {result.take_all()}")
```
Example output:
```
{'id': 0, 'value': 'row_0', 'info': 'checkpointed from first run'}
{'id': 1, 'value': 'row_1', 'info': 'checkpointed from first run'}
{'id': 2, 'value': 'row_2', 'info': 'checkpointed from first run'}
{'id': 3, 'value': 'row_3', 'info': 'checkpointed from first run'}
{'id': 4, 'value': 'row_4', 'info': 'checkpointed from first run'}
{'id': 5, 'value': 'row_5', 'info': 'not checkpointed from first run'}
{'id': 6, 'value': 'row_6', 'info': 'not checkpointed from first run'}
{'id': 7, 'value': 'row_7', 'info': 'not checkpointed from first run'}
{'id': 8, 'value': 'row_8', 'info': 'not checkpointed from first run'}
{'id': 9, 'value': 'row_9', 'info': 'not checkpointed from first run'}
```
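Conceptually, resuming works by filtering out rows whose IDs were already checkpointed by an earlier run, so only the remainder is reprocessed. The following is a minimal standalone sketch of that idea in plain Python; the helper names and the JSON checkpoint file are illustrative assumptions, not Ray Data's actual internals:

```python
# Sketch of id-based resume: skip rows whose IDs are already recorded in the
# checkpoint, and record each newly processed ID so a later run can skip it.
# (Hypothetical helpers; Ray Data's real implementation differs.)
import json
import os
import tempfile


def load_checkpointed_ids(path):
    """Return the set of row IDs recorded by previous runs."""
    if os.path.exists(path):
        with open(path) as f:
            return set(json.load(f))
    return set()


def run_once(rows, ckpt_path, fail_after=None):
    done = load_checkpointed_ids(ckpt_path)
    output = []
    for row in rows:
        if row["id"] in done:
            continue  # already processed in an earlier run; skip
        if fail_after is not None and row["id"] > fail_after:
            raise RuntimeError("simulated failure")
        output.append(row)
        done.add(row["id"])
        with open(ckpt_path, "w") as f:
            json.dump(sorted(done), f)  # persist progress as we go
    return output


ckpt = os.path.join(tempfile.mkdtemp(), "ckpt.json")
rows = [{"id": i} for i in range(10)]

try:
    run_once(rows, ckpt, fail_after=5)  # first run fails once id > 5
except RuntimeError:
    pass

resumed = run_once(rows, ckpt)  # second run handles only the remainder
print([r["id"] for r in resumed])  # prints [6, 7, 8, 9]
```

Ray Data does this at the plan level (a filter operator before the map stages and a checkpoint writer after), but the skip-already-done-IDs semantics shown above match the two-run behavior in the example output.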
## Related issues
Closes ray-project#55008
## Additional information
---------
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
This PR documents the Ray Data job-level checkpointing functionality added in #59409. Adds documentation explaining job-level checkpointing and its application to offline batch inference, including configuration examples.

**Sections modified:**
- Execution Configurations
- End-to-end: Offline Batch Inference

Supersedes #60289
Fixes #60250
---------
Signed-off-by: Abhishek Kumar <anonyomoushunter@gmail.com>
Signed-off-by: Alex <alexchien130@gmail.com>
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Co-authored-by: Abhishek Kumar <anonyomoushunter@gmail.com>
Co-authored-by: Balaji Veeramani <bveeramani@berkeley.edu>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>