-
Notifications
You must be signed in to change notification settings - Fork 7.3k
[Data] Add Checkpointing to Ray Data #59409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
raulchen
merged 8 commits into
ray-project:master
from
owenowenisme:data/add-checkpoint-in-ray-data
Jan 2, 2026
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
9c7b701
update
owenowenisme 20e010c
update
owenowenisme b7da48d
mvoe test file to correct place
owenowenisme 4964696
update
owenowenisme cd2ee2d
Merge branch 'master' into data/add-checkpoint-in-ray-data
owenowenisme fb423c9
update
owenowenisme 75e7922
add docs
owenowenisme 4eeeb1f
fix docstring
owenowenisme File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| .. _checkpoint-api: | ||
|
|
||
| Checkpoint API | ||
| ============== | ||
|
|
||
| .. currentmodule:: ray.data.checkpoint.interfaces | ||
|
|
||
| Configuration | ||
| ------------- | ||
|
|
||
| .. autosummary:: | ||
| :nosignatures: | ||
| :toctree: doc/ | ||
| :template: autosummary/class_without_autosummary.rst | ||
|
|
||
| CheckpointConfig | ||
| CheckpointBackend | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| from .plan_read_op import plan_read_op_with_checkpoint_filter | ||
| from .plan_write_op import plan_write_op_with_checkpoint_writer | ||
|
|
||
| __all__ = [ | ||
| "plan_read_op_with_checkpoint_filter", | ||
| "plan_write_op_with_checkpoint_writer", | ||
| ] |
47 changes: 47 additions & 0 deletions
47
python/ray/data/_internal/planner/checkpoint/plan_read_op.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| import functools | ||
| from typing import Callable, List, Optional | ||
|
|
||
| from ray import ObjectRef | ||
| from ray.data._internal.execution.interfaces import PhysicalOperator | ||
| from ray.data._internal.execution.operators.map_transformer import ( | ||
| BlockMapTransformFn, | ||
| ) | ||
| from ray.data._internal.logical.operators.read_operator import Read | ||
| from ray.data._internal.output_buffer import OutputBlockSizeOption | ||
| from ray.data._internal.planner.plan_read_op import plan_read_op | ||
| from ray.data.checkpoint.util import ( | ||
| CHECKPOINTED_IDS_KWARG_NAME, | ||
| filter_checkpointed_rows_for_blocks, | ||
| ) | ||
| from ray.data.context import DataContext | ||
|
|
||
|
|
||
| def plan_read_op_with_checkpoint_filter( | ||
| op: Read, | ||
| physical_children: List[PhysicalOperator], | ||
| data_context: DataContext, | ||
| load_checkpoint: Optional[Callable[[], ObjectRef]] = None, | ||
| ) -> PhysicalOperator: | ||
| physical_op = plan_read_op(op, physical_children, data_context) | ||
|
|
||
| # TODO avoid modifying in-place | ||
| physical_op._map_transformer.add_transform_fns( | ||
owenowenisme marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| [ | ||
| BlockMapTransformFn( | ||
| functools.partial( | ||
| filter_checkpointed_rows_for_blocks, | ||
| checkpoint_config=data_context.checkpoint_config, | ||
| ), | ||
| output_block_size_option=OutputBlockSizeOption.of( | ||
| target_max_block_size=data_context.target_max_block_size, | ||
| ), | ||
| ), | ||
| ] | ||
| ) | ||
|
|
||
| if load_checkpoint is not None: | ||
| physical_op.add_map_task_kwargs_fn( | ||
| lambda: {CHECKPOINTED_IDS_KWARG_NAME: load_checkpoint()} | ||
| ) | ||
|
|
||
| return physical_op | ||
82 changes: 82 additions & 0 deletions
82
python/ray/data/_internal/planner/checkpoint/plan_write_op.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| import itertools | ||
| from typing import Iterable, List | ||
|
|
||
| from ray.data._internal.execution.interfaces import PhysicalOperator | ||
| from ray.data._internal.execution.interfaces.task_context import TaskContext | ||
| from ray.data._internal.execution.operators.map_transformer import ( | ||
| BlockMapTransformFn, | ||
| ) | ||
| from ray.data._internal.logical.operators.write_operator import Write | ||
| from ray.data._internal.planner.plan_write_op import ( | ||
| _plan_write_op_internal, | ||
| generate_collect_write_stats_fn, | ||
| ) | ||
| from ray.data.block import Block, BlockAccessor | ||
| from ray.data.checkpoint.checkpoint_writer import CheckpointWriter | ||
| from ray.data.checkpoint.interfaces import ( | ||
| InvalidCheckpointingOperators, | ||
| ) | ||
| from ray.data.context import DataContext | ||
| from ray.data.datasource.datasink import Datasink | ||
|
|
||
|
|
||
| def plan_write_op_with_checkpoint_writer( | ||
| op: Write, physical_children: List[PhysicalOperator], data_context: DataContext | ||
| ) -> PhysicalOperator: | ||
| assert data_context.checkpoint_config is not None | ||
|
|
||
| collect_stats_fn = generate_collect_write_stats_fn() | ||
| write_checkpoint_for_block_fn = _generate_checkpoint_writing_transform( | ||
| data_context, op | ||
| ) | ||
|
|
||
| physical_op = _plan_write_op_internal( | ||
| op, | ||
| physical_children, | ||
| data_context, | ||
| extra_transformations=[ | ||
| write_checkpoint_for_block_fn, | ||
| collect_stats_fn, | ||
| ], | ||
| ) | ||
|
|
||
| return physical_op | ||
|
|
||
|
|
||
| def _generate_checkpoint_writing_transform( | ||
| data_context: DataContext, logical_op: Write | ||
| ) -> BlockMapTransformFn: | ||
| datasink = logical_op._datasink_or_legacy_datasource | ||
| if not isinstance(datasink, Datasink): | ||
| raise InvalidCheckpointingOperators( | ||
| f"To enable checkpointing, Write operation must use a " | ||
| f"Datasink and not a legacy Datasource, but got: " | ||
| f"{type(datasink)}" | ||
| ) | ||
|
|
||
| checkpoint_writer = CheckpointWriter.create(data_context.checkpoint_config) | ||
|
|
||
| # MapTransformFn for writing checkpoint files after write completes. | ||
| def write_checkpoint_for_block( | ||
| blocks: Iterable[Block], ctx: TaskContext | ||
| ) -> Iterable[Block]: | ||
| it1, it2 = itertools.tee(blocks, 2) | ||
| for block in it1: | ||
| ba = BlockAccessor.for_block(block) | ||
| if ba.num_rows() > 0: | ||
| if data_context.checkpoint_config.id_column not in ba.column_names(): | ||
| raise ValueError( | ||
| f"ID column {data_context.checkpoint_config.id_column} is " | ||
| f"absent in the block to be written. Do not drop or rename " | ||
| f"this column." | ||
| ) | ||
| checkpoint_writer.write_block_checkpoint(ba) | ||
|
|
||
| return list(it2) | ||
owenowenisme marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return BlockMapTransformFn( | ||
| write_checkpoint_for_block, | ||
| is_udf=False, | ||
| # NOTE: No need for block-shaping | ||
| disable_block_shaping=True, | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| from .interfaces import CheckpointBackend, CheckpointConfig | ||
|
|
||
| __all__ = ["CheckpointConfig", "CheckpointBackend"] |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.