Skip to content

feat(live): add @pipeline decorator for simplified pipeline creation#900

Draft
rickstaa wants to merge 5 commits intolivepeer:mainfrom
rickstaa:feat/pipeline-decorator
Draft

feat(live): add @pipeline decorator for simplified pipeline creation#900
rickstaa wants to merge 5 commits intolivepeer:mainfrom
rickstaa:feat/pipeline-decorator

Conversation

@rickstaa
Copy link
Member

@rickstaa rickstaa commented Mar 7, 2026

Summary

Add a @pipeline decorator that turns a plain function or class into a fully working live Pipeline subclass — handling frame queues, lifecycle management, parameter validation, and thread safety automatically.

Function form (minimal)

from runner.app import start_app
from runner.live.pipelines import pipeline, BaseParams
from runner.live.trickle import VideoFrame

@pipeline(name="green-shift")
async def green_shift(frame: VideoFrame, params: BaseParams) -> torch.Tensor:
    tensor = frame.tensor.clone()
    tensor[:, :, :, 1] = torch.clamp(tensor[:, :, :, 1] + 0.3, -1.0, 1.0)
    return tensor

if __name__ == "__main__":
    start_app(pipeline=green_shift._spec)

Class form (with lifecycle hooks)

@pipeline(name="edge-detect", params=EdgeParams)
class EdgeDetect:
    def on_ready(self, **params):
        """Called once at startup — load models, allocate GPU resources."""
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.kernel = build_sobel_kernel(self.device)

    def transform(self, frame: VideoFrame, params: EdgeParams) -> torch.Tensor:
        """Called per frame — run inference."""
        return sobel_edge_detect(frame.tensor, self.kernel, params.threshold)

    def on_update(self, **params):
        """Called when params change mid-stream."""
        self._threshold = params.get("threshold", 0.1)

    def on_stop(self):
        """Called on shutdown — cleanup."""
        pass

What the decorator handles

  • Wraps functions/classes into a Pipeline subclass with async frame queues
  • Both sync and async transform methods (sync runs in thread pool)
  • Auto-generates PipelineSpec for dynamic loading via importlib
  • Thread-safe parameter updates via asyncio.Lock
  • prepare_models() classmethod for build-time model downloads
  • Warns if @pipeline decorates a nested definition (dotted __qualname__ breaks the loader)

Includes two example pipelines

  • green_shift: function form — boosts the green channel (no GPU needed)
  • edge_detect: class form — Sobel edge detection with all lifecycle hooks

How to test it works

  1. See description

You can test both examples and will get a result like

image image

Copilot AI review requested due to automatic review settings March 7, 2026 21:37
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new @pipeline decorator to simplify authoring live video pipelines by generating a Pipeline subclass (plus an attached PipelineSpec) from either a plain function or a small user class, and adds documentation + example pipelines demonstrating the new API.

Changes:

  • Add runner.live.pipelines.create:pipeline decorator to auto-generate a working Pipeline wrapper and PipelineSpec.
  • Export pipeline from runner.live.pipelines for public use.
  • Add docs and two example pipelines (green_shift, depth_midas) illustrating function and class forms.

Reviewed changes

Copilot reviewed 5 out of 6 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
runner/src/runner/live/pipelines/create.py Implements the @pipeline decorator and generated Pipeline wrapper logic.
runner/src/runner/live/pipelines/__init__.py Exposes the new decorator as part of the public live pipelines API.
examples/live-video-to-video/green_shift.py Adds a minimal function-form example pipeline.
examples/live-video-to-video/depth_midas.py Adds a class-form example pipeline demonstrating lifecycle hooks and model loading.
docs/create-pipeline.md Adds API reference and usage guidance for the decorator and examples.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Add a `@pipeline` decorator that turns a plain function or class into a
fully working live Pipeline subclass — handling frame queues, lifecycle
management, parameter validation, and thread safety automatically.

Includes two example pipelines:
- green_shift: minimal function form (4 lines of user code)
- depth_midas: class form with MiDaS Small depth estimation, demonstrating
  prepare_models, on_ready, transform, on_update, and on_stop

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@rickstaa rickstaa force-pushed the feat/pipeline-decorator branch from e2aa119 to 378275f Compare March 8, 2026 08:39
Copy link
Contributor

@victorges victorges left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

has_prepare = hasattr(user_cls, "prepare_models")
label = user_cls.__name__

class GeneratedPipeline(Pipeline):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know the cleanest approach would be to simply migrate to existing Pipeline base class and all the implementations to the new interface. I don't think there's a core need for spring 2 class interfaces and we could probably stick to just the cleaner one. If there are more "hardcore" functions that not everyone needs to override, the override can be optional and we just have sane defaults if not implemented.

So instead of having 1 function and 2 class forms, it's either function or class. With decorator always used for registration. Or alternatively there could be at least a "Simple pipeline" base class so the interface is not just assumed by the decorator but properly defined and typed.

Not required change, just an architectural nit. I'm not even a core maintainer anymore hahaha

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, it's probably better to define an interface and apply it to the Pipeline class. Let’s treat this as a draft for revisiting the idea later.

rickstaa pushed a commit to rickstaa/ai-runner that referenced this pull request Mar 11, 2026
…lities

Consolidate duplicated boilerplate across all batch pipeline route handlers
into shared utility functions (check_auth_token, check_model_id,
execute_pipeline) and a shared RESPONSES dict in routes/utils.py.

This addresses Victor's feedback on PR livepeer#900 about reducing redundant
pipeline interface patterns and making common behavior properly defined
in one place rather than copy-pasted across every route.

https://claude.ai/code/session_01RvBGa2npztEMxwfAHH3Xve
rickstaa and others added 4 commits March 16, 2026 13:29
- Fix __qualname__ on function wrapper for correct importlib resolution
- Add module docstring with lifecycle hook reference to create.py
- Improve _build_pipeline docstring documenting hook detection
- Normalize VideoOutput request_id to prevent silent frame drops
- Make prepare_models async-capable via _invoke for sync/async support
- Replace MiDaS example with pure-torch edge detection (no external deps)
- Update docs to use EdgeDetect example throughout

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Allow running examples directly with `python examples/live-video-to-video/green_shift.py`
instead of the verbose `python -m runner.live.infer --pipeline ...` invocation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Nested classes/functions produce a dotted __qualname__ which breaks
the loader's getattr lookup. Log a warning so developers know to
keep @pipeline at the top level.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Move example pipeline launching from individual __main__ blocks into a
shared test_examples.py script. Update integration testing docs with
go-livepeer box setup, webcam streaming, and MediaMTX troubleshooting.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants