
feat: Checkpoint pipeline infrastructure (Phase 1)#501

Merged
JesperDramsch merged 32 commits into main from feature/checkpoint-pipeline-infrastructure on Feb 17, 2026


@JesperDramsch (Member) commented Aug 21, 2025

Summary

This PR implements Phase 1 of the checkpoint pipeline infrastructure, establishing the core abstractions and pipeline pattern for flexible checkpoint handling in Anemoi.

Changes

  • Core abstractions: added a context dataclass for state management and an abstract base class for all pipeline stages
  • Pipeline orchestration: implemented a pipeline orchestrator with support for async/sync execution and stage management
  • Dynamic component discovery: created a component catalog that automatically discovers checkpoint sources, loaders, and modifiers using module inspection
  • Comprehensive error handling: added an exception hierarchy for checkpoint operations
  • Utility functions: implemented helpers for download retry, validation, and metadata extraction
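The retry helper is only named here. As a rough illustration of the general pattern (not the code in `utils.py`), an async retry with exponential backoff could look like this; `retry_async` and its parameters are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (OSError, asyncio.TimeoutError),
) -> T:
    """Await ``operation()``, retrying transient failures with exponential backoff.

    Hypothetical helper: makes at most ``max_retries + 1`` attempts in total.
    """
    if max_retries < 0:
        raise ValueError("max_retries must be non-negative")
    attempt = 0
    while True:
        try:
            return await operation()
        except retry_on:
            if attempt >= max_retries:
                raise  # attempts exhausted: surface the last error to the caller
            # Sleep base_delay, 2 * base_delay, 4 * base_delay, ... between attempts
            await asyncio.sleep(base_delay * 2**attempt)
            attempt += 1
```

Taking a zero-argument callable rather than a coroutine object is deliberate: a coroutine can only be awaited once, so every retry needs a fresh one.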

Technical Details

The component catalog uses a hybrid approach for identifying abstract classes:

  • ABC inheritance detection
  • Abstract method checking
  • Name-based convention (Base* prefix)

This enables fully dynamic discovery with no hardcoded component lists: adding a component only requires subclassing the relevant base class, which keeps the system easy to extend.
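The three checks above can be combined into a single predicate. This is a sketch of one way to do it with the standard `inspect` and `abc` modules, not the code in `catalog.py`; in particular, reading "ABC inheritance" as "lists `ABC` as a direct base" is an assumption, and `is_abstract_component` is a hypothetical name.

```python
import inspect
from abc import ABC, abstractmethod


def is_abstract_component(cls: type) -> bool:
    """Return True if discovery should skip ``cls`` (illustrative heuristic)."""
    # 1. ABC inheritance: a class that names ABC as a direct base is an interface
    if ABC in cls.__bases__:
        return True
    # 2. Abstract methods: any unimplemented @abstractmethod leaves the class abstract
    if inspect.isabstract(cls):
        return True
    # 3. Naming convention: Base* classes are skipped even if fully implemented
    return cls.__name__.startswith("Base")


class SourceInterface(ABC):
    @abstractmethod
    def load(self) -> dict: ...


class PartialSource(SourceInterface):  # forgets to implement load()
    pass


class BaseCachedSource(SourceInterface):  # concrete, but Base-prefixed
    def load(self) -> dict:
        return {}


class LocalSource(SourceInterface):
    def load(self) -> dict:
        return {}


candidates = [SourceInterface, PartialSource, BaseCachedSource, LocalSource]
concrete = [c.__name__ for c in candidates if not is_abstract_component(c)]
print(concrete)  # ['LocalSource']
```

The name check is the bluntest of the three: a class such as `BaselineSource` would also match the `Base` prefix, so a stricter rule (for example requiring an uppercase letter after the prefix) may be worth considering.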

Testing

  • Unit tests for all core components
  • Integration tests for pipeline execution
  • Tests for dynamic component discovery with various abstract class patterns

Related Issues

Closes #493

Next Steps

This is Phase 1 of a multi-phase implementation:


📚 Documentation preview 📚: https://anemoi-training--501.org.readthedocs.build/en/501/


📚 Documentation preview 📚: https://anemoi-graphs--501.org.readthedocs.build/en/501/


📚 Documentation preview 📚: https://anemoi-models--501.org.readthedocs.build/en/501/

@github-project-automation github-project-automation bot moved this to To be triaged in Anemoi-dev Aug 21, 2025
@github-actions github-actions bot added training enhancement New feature or request and removed training labels Aug 21, 2025
@JesperDramsch JesperDramsch self-assigned this Aug 21, 2025
@mchantry mchantry added the ATS Approval Needed Approval needed by ATS label Aug 26, 2025
@mchantry mchantry moved this from To be triaged to Reviewers needed in Anemoi-dev Sep 1, 2025
@JesperDramsch (Member Author)

Phase 1: Checkpoint Pipeline Infrastructure - Progress Update

Summary

This PR implements the foundational infrastructure for the new checkpoint architecture, establishing the core abstractions, pipeline pattern, and utilities that all checkpoint operations will build upon.

Related Issues: #493 (Phase 1); part of the 5-phase checkpoint architecture refactoring
Branch: feature/checkpoint-pipeline-infrastructure
Status: Core complete, configuration integration pending review

🎯 Motivation

The current checkpoint handling in anemoi-training is monolithic and difficult to extend. This PR establishes a clean, three-layer pipeline architecture:

┌─────────────────────────────────────────────────┐
│        Model Transformation Layer               │
│         (Post-loading modifications)            │
├─────────────────────────────────────────────────┤
│         Loading Orchestration Layer             │
│    (Strategies for applying checkpoints)        │
├─────────────────────────────────────────────────┤
│        Checkpoint Acquisition Layer             │
│      (Obtaining checkpoint from sources)        │
├─────────────────────────────────────────────────┤
│         Pipeline Infrastructure  ← THIS PR      │
│         (Core abstractions & context)           │
└─────────────────────────────────────────────────┘
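To make the bottom layer concrete, here is a minimal, self-contained sketch of the two core abstractions and how stages chain. `checkpoint_data` and `metadata` echo names used elsewhere in this PR; `checkpoint_path`, `model`, the `executed_stages` key, `RecordStage`, and `run` are hypothetical, and the real classes in `base.py` will have a richer interface.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class CheckpointContext:
    """State handed from stage to stage (illustrative fields only)."""

    checkpoint_path: str | None = None
    checkpoint_data: dict[str, Any] | None = None
    model: Any = None
    metadata: dict[str, Any] = field(default_factory=dict)


class PipelineStage(ABC):
    """A single step: takes a context and returns the (possibly updated) context."""

    @abstractmethod
    async def process(self, context: CheckpointContext) -> CheckpointContext: ...


class RecordStage(PipelineStage):
    """Toy stage that only records its name, making the execution order visible."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def process(self, context: CheckpointContext) -> CheckpointContext:
        context.metadata.setdefault("executed_stages", []).append(self.name)
        return context


async def run(stages: list[PipelineStage], context: CheckpointContext) -> CheckpointContext:
    # Each layer's output context becomes the next layer's input
    for stage in stages:
        context = await stage.process(context)
    return context


layers = [RecordStage("source"), RecordStage("loader"), RecordStage("modifier")]
result = asyncio.run(run(layers, CheckpointContext()))
print(result.metadata["executed_stages"])  # ['source', 'loader', 'modifier']
```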

📦 Changes Included

New Files (VERIFIED)

training/src/anemoi/training/checkpoint/
├── __init__.py                    # Package initialization (2.6KB)
├── base.py                        # Core abstractions (16KB)
├── pipeline.py                    # Pipeline orchestrator (26KB)
├── catalog.py                     # Component discovery (22KB)
├── exceptions.py                  # Exception hierarchy (19KB)
├── formats.py                     # Multi-format support (19KB)
└── utils.py                       # Async utilities (17KB)

training/tests/checkpoint/
├── conftest.py                    # Test fixtures (9.7KB)
├── test_base.py                   # Core tests (8.1KB)
├── test_pipeline.py               # Pipeline tests (10KB)
├── test_catalog.py                # Catalog tests (11KB)
├── test_formats.py                # Format tests (26KB)
├── test_utils.py                  # Utility tests (37KB)
└── test_exceptions.py             # Exception tests (32KB)

Modified Files (VERIFIED)

training/pyproject.toml            # Added optional checkpoint dependency (line 67)

Pending Additions (Not Yet in PR)

training/src/anemoi/training/schemas/training.py   # TODO: Add CheckpointPipelineConfig
config/training/checkpoint_pipeline/*.yaml         # TODO: Create config templates

🔑 Key Features

1. Pipeline Pattern

Clean, composable stages for checkpoint processing:

# Example usage (programmatic), inside an async function
pipeline = CheckpointPipeline(
    stages=[
        CheckpointSource(...),      # Phase 2
        LoadingStrategy(...),       # Phase 2
        ModelModifier(...),         # Phase 2
    ],
    async_execution=True,
)

context = await pipeline.execute(initial_context)

2. Component Catalog

Automatic discovery of pipeline components using reflection:

# No manual registration needed!
ComponentCatalog.list_sources()    # Auto-discovers CheckpointSource subclasses
ComponentCatalog.list_loaders()    # Auto-discovers LoadingStrategy subclasses
ComponentCatalog.list_modifiers()  # Auto-discovers ModelModifier subclasses
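One common way to enumerate subclasses in Python is a transitive walk over `type.__subclasses__()` that filters out abstract classes. The sketch below shows that idea with a stand-in `CheckpointSource` base (the real base class arrives in Phase 2) and a hypothetical `discover` function; it is not the `ComponentCatalog` implementation.

```python
from __future__ import annotations

import inspect
from abc import ABC, abstractmethod


class CheckpointSource(ABC):
    """Stand-in base class for illustration."""

    @abstractmethod
    def fetch(self, path: str) -> bytes: ...


class RemoteSource(CheckpointSource):
    """Intermediate layer that still lacks fetch(), so it stays abstract."""


class LocalSource(CheckpointSource):
    def fetch(self, path: str) -> bytes:
        with open(path, "rb") as f:
            return f.read()


class S3Source(RemoteSource):
    def fetch(self, path: str) -> bytes:
        raise NotImplementedError("network access is out of scope for this sketch")


def discover(base: type) -> dict[str, type]:
    """Map class name -> class for every concrete (direct or indirect) subclass of ``base``."""
    found: dict[str, type] = {}
    pending = list(base.__subclasses__())
    while pending:
        cls = pending.pop()
        pending.extend(cls.__subclasses__())  # descend into the whole inheritance tree
        if not inspect.isabstract(cls):
            found[cls.__name__] = cls
    return found


print(sorted(discover(CheckpointSource)))  # ['LocalSource', 'S3Source']
```

`__subclasses__()` only reports classes whose defining modules have already been imported, which is why discovery of this kind has to import the candidate modules first.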

3. Multi-Format Support

Seamless handling of different checkpoint formats:

  • PyTorch Lightning checkpoints
  • Standard PyTorch checkpoints
  • SafeTensors format (optional)
  • Raw state_dict formats

Auto-detection and conversion included.
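Auto-detection can work on the top-level keys of the loaded object. The sketch below is a heuristic under stated assumptions, not the logic in `formats.py`: it relies on Lightning recording a `pytorch-lightning_version` key in the checkpoints it saves, treats `state_dict` or `model_state_dict` keys as a plain PyTorch checkpoint, and treats a flat name-to-tensor mapping as a raw state_dict. The labels match the formats this PR ends up supporting (`lightning`, `pytorch`, `state_dict`); `detect_format` and `FakeTensor` are hypothetical.

```python
from collections.abc import Mapping
from typing import Any


def detect_format(checkpoint: Any) -> str:
    """Classify an already-loaded checkpoint object by its top-level keys."""
    if not isinstance(checkpoint, Mapping):
        msg = f"Expected a mapping, got {type(checkpoint).__name__}"
        raise TypeError(msg)
    # Lightning stores its version string in every checkpoint it writes
    if "pytorch-lightning_version" in checkpoint:
        return "lightning"
    # Hand-rolled torch.save() checkpoints usually nest the weights under a key
    if "state_dict" in checkpoint or "model_state_dict" in checkpoint:
        return "pytorch"
    # A raw state_dict maps parameter names to tensors (anything exposing .shape)
    if checkpoint and all(isinstance(k, str) and hasattr(v, "shape") for k, v in checkpoint.items()):
        return "state_dict"
    msg = "Unrecognised checkpoint layout"
    raise ValueError(msg)


class FakeTensor:
    """Stand-in for torch.Tensor so the example runs without PyTorch."""

    shape = (4, 4)


print(detect_format({"pytorch-lightning_version": "2.2.0", "state_dict": {}}))  # lightning
print(detect_format({"model_state_dict": {}, "epoch": 3}))  # pytorch
print(detect_format({"encoder.weight": FakeTensor()}))  # state_dict
```

The order of the checks matters: Lightning checkpoints also contain a `state_dict` key, so the Lightning test has to run first.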

4. Comprehensive Error Handling

Rich exception hierarchy for debugging:

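import logging

from anemoi.training.checkpoint.exceptions import CheckpointIncompatibleError, CheckpointNotFoundError

logger = logging.getLogger(__name__)

try:
    context = await pipeline.execute(context)
except CheckpointNotFoundError as e:
    logger.error("Checkpoint not found: %s", e)
except CheckpointIncompatibleError as e:
    logger.error("Incompatible checkpoint: %s", e)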
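The value of a hierarchy is that callers can handle specific failures first and fall back to a shared base class. `CheckpointError` (the base), `CheckpointNotFoundError`, and `CheckpointIncompatibleError` are names used in this PR; the `context` attribute, the constructor signature, and `open_checkpoint` are assumptions made for illustration, so the real `exceptions.py` may differ.

```python
from __future__ import annotations

from typing import Any


class CheckpointError(Exception):
    """Base class: catching it handles every checkpoint failure."""

    def __init__(self, message: str, context: dict[str, Any] | None = None) -> None:
        super().__init__(message)
        self.context = context or {}  # extra debugging details (assumed attribute)


class CheckpointNotFoundError(CheckpointError):
    """The requested checkpoint does not exist at the given location."""


class CheckpointIncompatibleError(CheckpointError):
    """The checkpoint was read but does not fit the target model."""


def open_checkpoint(path: str) -> None:
    # Hypothetical loader that always fails, to exercise the handlers below
    raise CheckpointNotFoundError(f"No checkpoint at {path}", context={"path": path})


try:
    open_checkpoint("/tmp/missing.ckpt")
except CheckpointIncompatibleError as err:  # most specific handlers first
    print("incompatible:", err)
except CheckpointError as err:  # the base class catches the remaining subclasses
    print(type(err).__name__, err.context)  # CheckpointNotFoundError {'path': '/tmp/missing.ckpt'}
```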

5. Async-First Design

Efficient async operations with sync compatibility:

# Async mode (default)
context = await pipeline.execute(context)

# Sync mode (when needed)
pipeline = CheckpointPipeline(stages, async_execution=False)
context = pipeline.execute(context)
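One way a single `execute()` can serve both call sites above is to return a coroutine in async mode and drive it with `asyncio.run()` in sync mode. This is a hypothetical sketch (`MiniPipeline`, with plain dicts standing in for the context), not the `CheckpointPipeline` code.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

Context = dict[str, Any]
Stage = Callable[[Context], Awaitable[Context]]


class MiniPipeline:
    """Runs async stages in order; execute() is awaitable or blocking depending on the mode."""

    def __init__(self, stages: list[Stage], async_execution: bool = True) -> None:
        self.stages = stages
        self.async_execution = async_execution

    async def _run(self, context: Context) -> Context:
        for stage in self.stages:
            context = await stage(context)
        return context

    def execute(self, context: Context) -> Awaitable[Context] | Context:
        if self.async_execution:
            return self._run(context)  # caller is expected to `await` this coroutine
        # Blocking mode starts its own event loop; asyncio.run() raises RuntimeError
        # if this is called from code already running inside an event loop
        return asyncio.run(self._run(context))


async def load_weights(context: Context) -> Context:
    return {**context, "weights": "loaded"}


print(MiniPipeline([load_weights], async_execution=False).execute({}))  # {'weights': 'loaded'}
```

The trade-off is a union return type that type checkers cannot narrow from a constructor flag; separate `execute()` and `execute_sync()` methods would avoid that.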

🧪 Testing Strategy

Coverage Metrics (VERIFIED)

  • Total Tests: 213
  • Passing: 211 (99.1%)
  • Skipped: 2
  • Code Coverage: 80% (798/1003 lines)

Test Execution

# Run all checkpoint tests
pytest training/tests/checkpoint/ -v

# Run with coverage
pytest training/tests/checkpoint/ --cov=anemoi.training.checkpoint --cov-report=html

# Quick verification
pytest training/tests/checkpoint/ -q
# Output: 211 passed, 2 skipped in 46.46s

Test Coverage by Module

  • base.py: Core abstractions
  • pipeline.py: Pipeline orchestration with Hydra
  • catalog.py: Component discovery
  • exceptions.py: Error hierarchy
  • formats.py: Multi-format detection/conversion
  • utils.py: Async downloads, validation, checksums

🔄 Integration Points

With PR #464 (Checkpoint Acquisition)

# PR #464 will implement CheckpointSource subclasses
class S3Source(PipelineStage):  # Uses base.py abstractions
    async def process(self, context: CheckpointContext) -> CheckpointContext:
        # Load from S3, populate context.checkpoint_data
        ...

With PR #494 (Loading Orchestration)

# PR #494 will implement LoadingStrategy subclasses
class TransferLearningLoader(PipelineStage):  # Uses base.py abstractions
    async def process(self, context: CheckpointContext) -> CheckpointContext:
        # Apply checkpoint to model with flexibility
        ...

With PR #442 (Model Modifiers)

# PR #442 will implement ModelModifier as PipelineStage
class FreezingModifier(PipelineStage):  # Uses base.py abstractions
    async def process(self, context: CheckpointContext) -> CheckpointContext:
        # Freeze model layers post-loading
        ...

⚠️ Breaking Changes

None. This PR only adds new infrastructure; apart from the optional dependency added to pyproject.toml, no existing code is modified or broken.

📋 Configuration Schema

Status: ⚠️ Not yet integrated - needs to be added before merge

Planned addition to training/src/anemoi/training/schemas/training.py:

from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


class CheckpointPipelineConfig(BaseModel):
    """Configuration for checkpoint pipeline."""

    stages: List[Dict[str, Any]] = Field(default_factory=list)
    async_execution: bool = Field(default=True)
    cache_dir: Optional[str] = Field(default=None)
    max_retries: int = Field(default=3)
    timeout: int = Field(default=300)

🔍 Review Checklist

Code Quality ✅

  • All code follows project style guidelines (ruff)
  • Type hints on all functions (mypy validated)
  • Comprehensive docstrings (NumPy style)
  • No code duplication

Testing ✅

  • Unit tests for all new functionality
  • 99.1% test pass rate (211/213)
  • 80% overall test coverage achieved
  • All critical paths covered

Documentation ⚠️

  • Public API documented in code
  • User guide needed
  • Migration guide needed
  • Architecture diagrams included

Configuration ⚠️

  • Schema integration pending
  • Config templates pending
  • Optional dependencies properly marked
  • Graceful degradation when optional deps missing

Compatibility ✅

  • No breaking changes to existing API
  • Backward compatible
  • Optional dependencies properly marked
  • Graceful degradation

🚀 Deployment Notes

Dependencies

Required: Already in pyproject.toml

  • torch >= 2.2.0
  • omegaconf
  • hydra-core

Optional: Added in this PR (pyproject.toml line 67)

[project.optional-dependencies]
checkpoint = ["safetensors>=0.4.0"]

Migration Path

No migration needed - this is new infrastructure. Existing checkpoint code remains unchanged.

📊 Performance Impact

  • Memory: Minimal overhead (~1-2MB for pipeline infrastructure)
  • Speed: Async operations enable concurrent checkpoint processing
  • CPU: No significant impact, efficient component discovery

🔜 Next Steps

Before Merge (Recommended)

  1. Add configuration schema to training/src/anemoi/training/schemas/training.py
  2. Create config templates in config/training/checkpoint_pipeline/
  3. Add basic user documentation

After Merge

  1. Phase 2 PRs can immediately build on this:

  2. Documentation: Comprehensive user guides

  3. Phase 3: Integration & migration utilities

📝 Honest Assessment

What's Strong

  • Core pipeline architecture is solid and tested
  • Component discovery eliminates boilerplate
  • Multi-format support is comprehensive
  • Error handling is thorough
  • Test coverage is good (80%, 99.1% pass rate)
  • All abstractions are in place for Phase 2

What Needs Work

  • Configuration schema not integrated yet
  • User documentation is minimal
  • Config templates don't exist
  • Some edge cases not covered in tests (20% uncovered)

Production Readiness

  • For developers building Phase 2: ✅ Ready now
  • For end users: ⚠️ Needs config integration and docs

Reviewers: Please focus on:

  1. API design of CheckpointContext and PipelineStage
  2. Component discovery mechanism in catalog.py
  3. Exception hierarchy completeness
  4. Whether to require config schema integration before merge

@anaprietonem anaprietonem added ATS Approved Approved by ATS and removed ATS Approval Needed Approval needed by ATS labels Nov 11, 2025
@JesperDramsch (Member Author)

Thank you @anaprietonem and @sahahner for the thorough review! Here's how I'm addressing your feedback:

Already Addressed

  • ✅ Safetensors removed (commit 15d1845) - keeping formats simple
  • ✅ Added device logging to checkpoint loading utilities (commit 506103d)

Will Address Before Merge

  1. Execution model docs - Adding clear documentation for standalone/callback patterns
  2. Callback naming - Adding docstring cross-references for clarity

Architectural Decisions

  1. ComponentCatalog location - Keeping in training for Phase 1 (tightly coupled to checkpoint base classes). Requesting input from @gareth-j @jjlk for future consideration.
  2. Callback unification - These are complementary (save vs load), not duplicates. Will improve naming/docs.
  3. Cloud authentication - Documented as Phase 2 scope (PR Checkpoint Acquisition Layer - Multi-source checkpoint loading (S3, HTTP, local, MLFlow) #458). Will follow standard SDK patterns.

Future Work

  1. Exception consolidation - Opening issue to audit and consolidate across packages
  2. aiohttp optional - Moving to optional-dependencies.remote

Let me know if you'd like any of these addressed differently!

JesperDramsch added a commit that referenced this pull request Nov 26, 2025
Reverts TYPE_CHECKING import reorganization and noqa comment removals
in 9 files that had no functional changes related to the checkpoint
pipeline infrastructure (Phase 1 - Issue #493).

These linting changes were introduced by pre-commit hooks running on
the entire codebase. Reverting them reduces PR scope as requested in
PR #501 review.

Files reverted:
- diagnostics/callbacks/plot.py
- diagnostics/mlflow/logger.py
- schemas/base_schema.py
- schemas/dataloader.py
- schemas/graphs/node_schemas.py
- schemas/hardware.py
- schemas/models/models.py
- schemas/graphs/base_graph.py
- train/forecaster.py

All checkpoint pipeline implementation and integration code preserved.

Note: This revert temporarily breaks linting rules (FA102) because
origin/main uses PEP 604 union syntax without the required future
import. This will be resolved when origin/main is updated or when
this feature branch is rebased.
@JesperDramsch JesperDramsch force-pushed the feature/checkpoint-pipeline-infrastructure branch from 6b5b854 to 5cc5303 on November 26, 2025 13:29
JesperDramsch added a commit that referenced this pull request Dec 5, 2025
Changes based on @anaprietonem's Nov 25 review comments:

- Remove GCS/Azure references from catalog.py
  (to be added in a future phase after proper discussion)
- Make aiohttp an optional dependency in pyproject.toml
  (install with: pip install anemoi-training[remote])
- Add conditional import handling for aiohttp with HAS_AIOHTTP flag
- Add execution patterns documentation to pipeline.py module docstring
  (explains standalone vs callback integration patterns)
- Add ComponentCatalog relationship docs explaining connection to
  anemoi.utils.registry
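Optional-dependency handling like this usually follows a guarded-import pattern. The sketch below illustrates it; `HAS_AIOHTTP` and the `anemoi-training[remote]` extra come from the commit message, while `require_aiohttp` is a hypothetical helper.

```python
try:
    import aiohttp  # optional: only installed with the "remote" extra
except ImportError:
    aiohttp = None

HAS_AIOHTTP = aiohttp is not None


def require_aiohttp() -> None:
    """Raise an actionable error when a remote download needs aiohttp but it is missing."""
    if not HAS_AIOHTTP:
        msg = (
            "Downloading remote checkpoints requires aiohttp. "
            "Install it with: pip install anemoi-training[remote]"
        )
        raise ImportError(msg)
```

Local-only users never need the package, and remote code paths call the guard before touching `aiohttp`, so a missing extra surfaces as an install hint rather than an `AttributeError` deep inside a download.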
@JesperDramsch JesperDramsch marked this pull request as ready for review December 23, 2025 16:04
- Add CheckpointContext dataclass for carrying state through pipeline
- Add PipelineStage abstract base class for all pipeline stages
- Add comprehensive exception hierarchy for checkpoint operations
- Establish foundation for three-layer checkpoint architecture

Part of Phase 1 checkpoint pipeline infrastructure (#493)
JesperDramsch and others added 18 commits January 29, 2026 16:16
…dling

- Extract _handle_unknown_loader() for loader error handling
- Extract _handle_unknown_modifier() for modifier error handling
- Add _build_loader_error_message() for detailed error messages
- Add _build_modifier_error_message() for modifier errors
- Add _get_loader_type_descriptions() for loader documentation
- Add _get_modifier_type_descriptions() for modifier documentation
- Add _find_similar_names() for smart suggestions
- Use list comprehensions for better performance
- Preserve all helpful error messages and suggestions
- Reduce McCabe complexity from 11 to acceptable levels
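A "smart suggestions" helper can be approximated with the standard library's `difflib.get_close_matches`. This is an illustrative stand-in for `_find_similar_names()`, not its source; the function names and the loader names in the example are hypothetical.

```python
import difflib


def find_similar_names(name: str, candidates: list[str], limit: int = 3) -> list[str]:
    """Return up to ``limit`` registered names that look like a mistyped ``name``."""
    by_lower = {c.lower(): c for c in candidates}  # match case-insensitively
    matches = difflib.get_close_matches(name.lower(), list(by_lower), n=limit, cutoff=0.6)
    return [by_lower[m] for m in matches]


def unknown_loader_message(name: str, available: list[str]) -> str:
    """Build an error message that suggests likely intended loader names."""
    msg = f"Unknown loader type '{name}'."
    suggestions = find_similar_names(name, available)
    if suggestions:
        msg += f" Did you mean: {', '.join(suggestions)}?"
    return msg + f" Available: {', '.join(sorted(available))}."


loaders = ["weights_only", "transfer_learning", "warm_start"]
print(unknown_loader_message("weigths_only", loaders))
# Unknown loader type 'weigths_only'. Did you mean: weights_only? Available: transfer_learning, warm_start, weights_only.
```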
…t validation

- Add CheckpointPipeline class for stage orchestration
- Implement async and sync execution modes
- Add Hydra-based configuration support with instantiation
- Implement smart pipeline composition validation
- Check source-loader-modifier ordering automatically
- Detect duplicate stages and provide warnings
- Suggest missing stages based on pipeline composition
- Add pre-execution validation with health checks
- Support dynamic stage management (add/remove/clear)
- Include comprehensive error handling with context
- Track stage execution in metadata for debugging
- Support continue_on_error for resilient pipelines
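The composition checks listed above (ordering, duplicates, missing stages) can be expressed as a small function over stage kinds. This sketch assumes each stage reports a kind of `source`, `loader`, or `modifier`, that at least one source and one loader are expected, and that several modifiers are normal; `validate_composition` and these rules are illustrative, not the checks in `pipeline.py`.

```python
import warnings

# Expected layer order: acquisition -> loading -> transformation
LAYER_RANK = {"source": 0, "loader": 1, "modifier": 2}


def validate_composition(stage_kinds: list[str]) -> list[str]:
    """Return a list of problems; duplicated sources/loaders only trigger a warning."""
    problems: list[str] = []
    ranks = [LAYER_RANK[kind] for kind in stage_kinds]  # KeyError on unknown kinds
    # Ordering: a stage must never come after a stage from a later layer
    for prev, curr, kind in zip(ranks, ranks[1:], stage_kinds[1:]):
        if curr < prev:
            problems.append(f"'{kind}' stage appears after a later-layer stage")
    # Missing or duplicated sources/loaders (several modifiers are fine)
    for kind in ("source", "loader"):
        count = stage_kinds.count(kind)
        if count == 0:
            problems.append(f"missing a {kind} stage")
        elif count > 1:
            warnings.warn(f"{count} {kind} stages configured", stacklevel=2)
    return problems


print(validate_composition(["source", "loader", "modifier", "modifier"]))  # []
print(validate_composition(["loader", "source"]))  # ["'source' stage appears after a later-layer stage"]
```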
The conftest.py used hardcoded relative path "../src/anemoi/training/config"
which only worked from training/tests/ directory, causing Hydra to hang
when tests were run from training/ root.

Changes:
- Add _get_config_path() helper to dynamically locate config directory
- Supports running tests from any directory (training/ or training/tests/)
- Add lazy import of AnemoiDatasetsDataModule for performance
- Tests now run in 40s instead of hanging for 2+ minutes

Fixes issue where `cd training && pytest tests/checkpoint/` would timeout.
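The fix boils down to resolving the config directory from the test file's own location instead of the working directory. A sketch of such a helper, assuming the layout implied by the old relative path (`training/tests/` next to `training/src/anemoi/training/config`); `get_config_path` and its `anchor` parameter are hypothetical.

```python
from pathlib import Path

CONFIG_SUBDIR = "src/anemoi/training/config"


def get_config_path(anchor: Path, relative: str = CONFIG_SUBDIR) -> Path:
    """Walk upwards from ``anchor`` until ``relative`` exists; independent of os.getcwd()."""
    anchor = anchor.resolve()
    for directory in (anchor, *anchor.parents):
        candidate = directory / relative
        if candidate.is_dir():
            return candidate
    msg = f"Could not find {relative} above {anchor}"
    raise FileNotFoundError(msg)


# In a conftest.py this would typically be called as:
# config_dir = get_config_path(Path(__file__).parent)
```

Because the result is absolute, it can be passed as a string to Hydra's `initialize_config_dir()`, which expects an absolute config directory.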
Updated error handling tests to expect the proper CheckpointConfigError
exception instead of generic ValueError, matching the actual implementation
in catalog.py.

Changes:
- test_get_source_target_when_empty: expect CheckpointConfigError
- test_get_loader_target_when_empty: expect CheckpointConfigError
- test_get_modifier_target_when_empty: expect CheckpointConfigError
- Update assertions to match actual error message format
Remove undefined placeholder functions from __all__ that don't exist yet:
- create_error_context
- ensure_checkpoint_error
- log_checkpoint_error
- map_pytorch_error_to_checkpoint_error
- validate_checkpoint_keys

Apply ruff auto-fixes:
- Sort __all__ alphabetically within sections
- Add trailing commas where missing

Fixes 5 F822 ruff errors (undefined name in __all__).
Remove safetensors format support to simplify checkpoint handling:

Source changes:
- Remove safetensors optional dependency from pyproject.toml
- Remove safetensors import and HAS_SAFETENSORS flag from formats.py
- Update checkpoint_format type hints to exclude "safetensors"
- Remove safetensors loading/saving logic and helper functions
- Remove .safetensors from supported file extensions in exceptions
- Update documentation to reflect supported formats only

Test changes:
- Remove safetensors fixture from conftest.py
- Remove 8 safetensors test methods from test_formats.py
- Remove safetensors skip logic from test_utils.py
- Remove unused unittest.mock.patch import

Net changes: -55 lines from source, -157 lines from tests

Supported formats after this change: lightning, pytorch, state_dict
- Restore missing CheckpointSourceError and CheckpointTimeoutError
- Fix TRY003/EM102: Extract exception messages to variables before raising
- Fix G004: Convert logging f-strings to % formatting
- Fix TRY300: Add explicit else blocks for clarity
- Fix BLE001: Specify exception types (OSError, RuntimeError) instead of blind catch
- Fix TRY400: Use logging.exception() for automatic traceback inclusion
- Fix TC002/TC003: Remove incorrect type checking noqa comments

All core checkpoint tests passing (43/43 in test_formats.py, 31/31 in test_base.py and test_pipeline.py).
Add comprehensive documentation for the Phase 1 checkpoint pipeline:

- checkpoint_integration.rst: Core API reference (CheckpointContext,
  PipelineStage, CheckpointPipeline), error handling, utility functions,
  component discovery, and configuration examples

- checkpoint_pipeline_configuration.rst: Configuration guide covering
  pipeline structure, stage types (sources, loaders, modifiers),
  complete examples, migration from legacy config, and best practices

- checkpoint_troubleshooting.rst: Diagnostic guide for common issues
  including configuration errors, environment setup, file/path problems,
  loading compatibility, network issues, and memory optimization

All documentation reflects currently implemented features. Planned
features for subsequent phases are clearly marked with notes.
- Add pytest-asyncio dependency and configure asyncio_mode=auto
- Include validation errors in CheckpointValidationError message
- Make nested tensor validation recursive for deep structures
- Fix "infinite values" message to match test expectations
- Catch all exceptions in get_checkpoint_metadata for corrupted files
- Fix test assertions for source_path and state_dict validation
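Making validation recursive means walking arbitrarily nested dicts and lists and reporting where a bad value sits. The sketch below uses plain floats in place of tensors so it runs without PyTorch; `find_non_finite` is a hypothetical name, and the message wording only mirrors the "infinite values" phrasing mentioned above.

```python
import math
from typing import Any


def find_non_finite(obj: Any, path: str = "") -> list[str]:
    """Collect the locations of NaN/inf values anywhere inside nested dicts, lists, and tuples."""
    issues: list[str] = []
    if isinstance(obj, dict):
        for key, value in obj.items():
            child = f"{path}.{key}" if path else str(key)
            issues.extend(find_non_finite(value, child))
    elif isinstance(obj, (list, tuple)):
        for index, value in enumerate(obj):
            issues.extend(find_non_finite(value, f"{path}[{index}]"))
    elif isinstance(obj, float):  # stand-in for a tensor leaf
        if math.isnan(obj):
            issues.append(f"{path}: NaN values")
        elif math.isinf(obj):
            issues.append(f"{path}: infinite values")
    return issues


checkpoint = {"encoder": {"weight": [1.0, float("nan")]}, "bias": float("inf")}
print(find_non_finite(checkpoint))  # ['encoder.weight[1]: NaN values', 'bias: infinite values']
```

With real tensors, the float branch would become a `torch.is_tensor(obj)` check followed by `torch.isnan(obj).any()` and `torch.isinf(obj).any()`.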
Fix test failures that occurred in GitHub Actions CI:

- test_pipeline.py: Avoid Hydra _target_ instantiation with test module
  paths that aren't installed as packages in CI. Changed tests to use
  pre-instantiated MockStage objects passed directly to pipeline.

- test_formats.py: Fix PicklingError in test_detect_format_non_dict_checkpoint
  by saving state_dict instead of raw model object (recommended approach).

- test_exceptions.py: Fix constructor signature mismatches for
  CheckpointSourceError tests, remove non-existent logging tests,
  and adjust picklability test to use base CheckpointError class.

All 192 checkpoint tests now pass with 7 skipped.
- Add return type annotations (-> None) to all test methods
- Fix isinstance syntax to use X | Y instead of (X, Y)
- Move Path import to TYPE_CHECKING block in utils.py
- Add else block for TRY300 compliance in formats.py
- Fix PT017: refactor try/except assertions to pytest.raises()
- Fix SIM117: combine nested with statements
- Fix EM101/EM102: extract exception messages to variables
- Rename unused parameters with underscore prefix
- Rename fixture without return value with underscore prefix (PT004)
In Python 3.10, asyncio.TimeoutError is distinct from the builtin
TimeoutError. This fix catches both exception types to ensure proper
timeout handling across all supported Python versions.

Also restores marshmallow warning filters in pytest.ini for
compatibility with marshmallow 3.x installations while keeping
asyncio test configuration.
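The version difference is easy to miss: on Python 3.10, `asyncio.wait_for` raises `asyncio.TimeoutError`, which is not the builtin `TimeoutError` (socket timeouts, for example, raise the builtin), while from Python 3.11 the two names refer to the same class. A minimal sketch of a handler that works on both, with `with_timeout` as a hypothetical helper that returns `None` on timeout:

```python
import asyncio
from collections.abc import Awaitable
from typing import Optional, TypeVar

T = TypeVar("T")


async def with_timeout(awaitable: Awaitable[T], seconds: float) -> Optional[T]:
    """Return the awaited result, or None if it takes longer than ``seconds``."""
    try:
        return await asyncio.wait_for(awaitable, timeout=seconds)
    except (asyncio.TimeoutError, TimeoutError):
        # Distinct classes on Python 3.10; the same class from Python 3.11 onwards
        return None


async def slow_download() -> str:
    await asyncio.sleep(1)
    return "done"


print(asyncio.run(with_timeout(slow_download(), seconds=0.05)))  # None
```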
@JesperDramsch JesperDramsch force-pushed the feature/checkpoint-pipeline-infrastructure branch from cad403d to 1e0f1a2 on January 29, 2026 17:03
pre-commit-ci bot and others added 3 commits January 29, 2026 17:04
PreprocessorSchema must be imported at runtime for Pydantic v2 model
building. Reverts the TYPE_CHECKING guard from commit 2a0d002 that
caused 16 CI test failures across Python 3.11/3.12/3.13.
@anaprietonem anaprietonem self-requested a review February 17, 2026 13:17
@anaprietonem (Contributor) left a comment

Thanks Jesper for addressing the comments and implementing this. Nice work!! LGTM and can be merged.

@JesperDramsch JesperDramsch merged commit 2e20ec6 into main Feb 17, 2026
19 checks passed
@JesperDramsch JesperDramsch deleted the feature/checkpoint-pipeline-infrastructure branch February 17, 2026 14:02
@github-project-automation github-project-automation bot moved this from Reviewers needed to Done in Anemoi-dev Feb 17, 2026
@DeployDuck DeployDuck mentioned this pull request Feb 17, 2026

Labels

ATS Approved Approved by ATS enhancement New feature or request training

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

Checkpoint Pipeline Infrastructure (Phase 1)

4 participants