[Data] Add Checkpointing to Ray Data#59409

Merged
raulchen merged 8 commits into ray-project:master from owenowenisme:data/add-checkpoint-in-ray-data
Jan 2, 2026
Conversation

@owenowenisme
Member

@owenowenisme owenowenisme commented Dec 12, 2025

Description

Add checkpointing for data pipelines. Currently, only pipelines that start with `Read` and end with `Write` are supported.

Example Script:

```py
import os

import ray
import pandas as pd
from ray.data.checkpoint import CheckpointConfig

# Setup paths
base_dir = "/tmp/ray_checkpoint_demo"
input_path = os.path.join(base_dir, "input")
output_path = os.path.join(base_dir, "output")
checkpoint_path = os.path.join(base_dir, "checkpoint")
os.makedirs(input_path, exist_ok=True)

# Create sample data (10 rows with unique IDs)
df = pd.DataFrame({"id": range(10), "value": [f"row_{i}" for i in range(10)]})
df.to_parquet(os.path.join(input_path, "data.parquet"), index=False)
print("Created 10 rows of sample data")

def run_pipeline(fail_on_id_gt=None):
    """Run the pipeline with an optional simulated failure."""
    ctx = ray.data.DataContext.get_current()
    ctx.checkpoint_config = CheckpointConfig(
        id_column="id",
        checkpoint_path=checkpoint_path,
        delete_checkpoint_on_success=False,
    )

    def process_batch(batch):
        if fail_on_id_gt is not None and max(batch["id"]) > fail_on_id_gt:
            raise RuntimeError(f"Simulated failure at id > {fail_on_id_gt}")
        if fail_on_id_gt is not None:
            batch["info"] = ["checkpointed from first run"] * len(batch["id"])
        else:
            batch["info"] = ["not checkpointed from first run"] * len(batch["id"])
        return batch

    ds = ray.data.read_parquet(input_path)
    ds = ds.map_batches(process_batch, batch_size=5)
    ds.write_parquet(output_path)

# Run 1: Fail after processing some rows
print("\n=== Run 1: Pipeline with simulated failure ===")
try:
    run_pipeline(fail_on_id_gt=5)
except Exception as e:
    print(f"Failed as expected: {e}")

# Run 2: Resume from checkpoint
print("\n=== Run 2: Resume from checkpoint ===")
run_pipeline(fail_on_id_gt=None)  # No failure

# Verify results
print("\n=== Results ===")
result = ray.data.read_parquet(output_path)
print(f"Total rows in output: {result.count()}")
print(f"Result: {result.take_all()}")
```

```
{'id': 0, 'value': 'row_0', 'info': 'checkpointed from first run'}
{'id': 1, 'value': 'row_1', 'info': 'checkpointed from first run'}
{'id': 2, 'value': 'row_2', 'info': 'checkpointed from first run'}
{'id': 3, 'value': 'row_3', 'info': 'checkpointed from first run'}
{'id': 4, 'value': 'row_4', 'info': 'checkpointed from first run'}
{'id': 5, 'value': 'row_5', 'info': 'not checkpointed from first run'}
{'id': 6, 'value': 'row_6', 'info': 'not checkpointed from first run'}
{'id': 7, 'value': 'row_7', 'info': 'not checkpointed from first run'}
{'id': 8, 'value': 'row_8', 'info': 'not checkpointed from first run'}
{'id': 9, 'value': 'row_9', 'info': 'not checkpointed from first run'}
```
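The resume behavior above hinges on the `id_column`: on the second run, rows whose ids were already written are skipped rather than reprocessed. A minimal, Ray-free sketch of that filtering idea (the helper name `filter_unprocessed` and the in-memory id set are illustrative, not Ray Data internals):

```python
import pandas as pd

def filter_unprocessed(batch: pd.DataFrame, checkpointed_ids: set) -> pd.DataFrame:
    """Keep only rows whose id has not been checkpointed yet (illustrative)."""
    return batch[~batch["id"].isin(checkpointed_ids)]

# ids 0-4 were persisted by the failed first run
checkpointed = {0, 1, 2, 3, 4}
batch = pd.DataFrame({"id": range(10)})

remaining = filter_unprocessed(batch, checkpointed)
print(sorted(remaining["id"]))  # only rows 5..9 still need processing
```

In the real feature the checkpointed ids live under `checkpoint_path` rather than in memory, which is why a fresh process can resume after a crash.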

Related issues

Closes #55008

Additional information

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme requested a review from a team as a code owner December 12, 2025 18:32
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a robust checkpointing mechanism to Ray Data, a significant feature for ensuring fault tolerance and efficient recovery in data pipelines. The implementation is comprehensive, touching upon planning, execution, configuration, and testing. Key additions include a CheckpointConfig for setup, dynamic plan adjustments in the Planner to inject checkpointing logic, and specific operators for filtering processed rows and writing checkpoint data. The use of an ExecutionCallback for managing the checkpoint lifecycle is a clean approach. The code is well-structured, and the tests are thorough, covering various success and failure scenarios. I have a few suggestions to enhance maintainability, improve memory efficiency, and make the configuration handling more robust. Overall, this is a very strong and valuable addition to Ray Data.
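The review above mentions an `ExecutionCallback` managing the checkpoint lifecycle. That pattern can be pictured roughly as follows; this is a standalone sketch with invented class and method names, not the actual Ray Data API:

```python
import os
import shutil
import tempfile

class CheckpointLifecycleCallback:
    """Illustrative lifecycle hook: clean up the checkpoint dir only on success."""

    def __init__(self, checkpoint_path: str, delete_on_success: bool = True):
        self.checkpoint_path = checkpoint_path
        self.delete_on_success = delete_on_success

    def after_execution_succeeds(self):
        # The job finished, so the checkpoint is no longer needed.
        if self.delete_on_success:
            shutil.rmtree(self.checkpoint_path, ignore_errors=True)

    def after_execution_fails(self, error: Exception):
        # Keep the checkpoint so the next run can resume from it.
        pass

ckpt_dir = tempfile.mkdtemp()
cb = CheckpointLifecycleCallback(ckpt_dir, delete_on_success=True)
cb.after_execution_succeeds()
print(os.path.exists(ckpt_dir))  # the directory was removed on success
```

Setting `delete_checkpoint_on_success=False`, as the example script does, corresponds to skipping the cleanup branch so the checkpoint can be inspected after the run.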

@ray-gardener ray-gardener bot added the data Ray Data-related issues label Dec 12, 2025
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme force-pushed the data/add-checkpoint-in-ray-data branch from 6e66b8f to b7da48d Compare December 13, 2025 08:20
@wxwmd
Contributor

wxwmd commented Dec 23, 2025

good feature. exactly what i need

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme added the go add ONLY when ready to merge, run all tests label Dec 30, 2025
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Contributor

@raulchen raulchen left a comment

stamping. already reviewed offline

@raulchen raulchen merged commit 566a5fa into ray-project:master Jan 2, 2026
6 checks passed
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026
lee1258561 pushed a commit to pinterest/ray that referenced this pull request Feb 3, 2026
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Feb 3, 2026
bveeramani added a commit that referenced this pull request Feb 11, 2026
This PR documents Ray Data job-level checkpointing functionality added
in #59409.

Adds documentation explaining job-level checkpointing and its
application to offline batch inference, including configuration
examples.

**Sections modified:**
- Execution Configurations
- End-to-end: Offline Batch Inference

Supersedes #60289

Fixes #60250

---------

Signed-off-by: Abhishek Kumar <anonyomoushunter@gmail.com>
Signed-off-by: “Alex <alexchien130@gmail.com>
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Co-authored-by: Abhishek Kumar <anonyomoushunter@gmail.com>
Co-authored-by: Balaji Veeramani <bveeramani@berkeley.edu>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
preneond pushed a commit to preneond/ray that referenced this pull request Feb 15, 2026
limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Feb 17, 2026
preneond pushed a commit to preneond/ray that referenced this pull request Feb 17, 2026
MuhammadSaif700 pushed a commit to MuhammadSaif700/ray that referenced this pull request Feb 17, 2026
Kunchd pushed a commit to Kunchd/ray that referenced this pull request Feb 17, 2026
ans9868 pushed a commit to ans9868/ray that referenced this pull request Feb 18, 2026
Aydin-ab pushed a commit to kunling-anyscale/ray that referenced this pull request Feb 20, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
Labels

data — Ray Data-related issues; go — add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Data] Add Checkpoint/Resume Support for Ray Data Pipelines

3 participants