feat(live): add @pipeline decorator for simplified pipeline creation#900
feat(live): add @pipeline decorator for simplified pipeline creation#900rickstaa wants to merge 5 commits intolivepeer:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a new @pipeline decorator to simplify authoring live video pipelines by generating a Pipeline subclass (plus an attached PipelineSpec) from either a plain function or a small user class, and adds documentation + example pipelines demonstrating the new API.
Changes:
- Add
runner.live.pipelines.create:pipelinedecorator to auto-generate a workingPipelinewrapper andPipelineSpec. - Export
pipelinefromrunner.live.pipelinesfor public use. - Add docs and two example pipelines (
green_shift,depth_midas) illustrating function and class forms.
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
runner/src/runner/live/pipelines/create.py |
Implements the @pipeline decorator and generated Pipeline wrapper logic. |
runner/src/runner/live/pipelines/__init__.py |
Exposes the new decorator as part of the public live pipelines API. |
examples/live-video-to-video/green_shift.py |
Adds a minimal function-form example pipeline. |
examples/live-video-to-video/depth_midas.py |
Adds a class-form example pipeline demonstrating lifecycle hooks and model loading. |
docs/create-pipeline.md |
Adds API reference and usage guidance for the decorator and examples. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Add a `@pipeline` decorator that turns a plain function or class into a fully working live Pipeline subclass — handling frame queues, lifecycle management, parameter validation, and thread safety automatically. Includes two example pipelines: - green_shift: minimal function form (4 lines of user code) - depth_midas: class form with MiDaS Small depth estimation, demonstrating prepare_models, on_ready, transform, on_update, and on_stop Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
e2aa119 to
378275f
Compare
| has_prepare = hasattr(user_cls, "prepare_models") | ||
| label = user_cls.__name__ | ||
|
|
||
| class GeneratedPipeline(Pipeline): |
There was a problem hiding this comment.
You know the cleanest approach would be to simply migrate to existing Pipeline base class and all the implementations to the new interface. I don't think there's a core need for spring 2 class interfaces and we could probably stick to just the cleaner one. If there are more "hardcore" functions that not everyone needs to override, the override can be optional and we just have sane defaults if not implemented.
So instead of having 1 function and 2 class forms, it's either function or class. With decorator always used for registration. Or alternatively there could be at least a "Simple pipeline" base class so the interface is not just assumed by the decorator but properly defined and typed.
Not required change, just an architectural nit. I'm not even a core maintainer anymore hahaha
There was a problem hiding this comment.
Good point, it's probably better to define an interface and apply it to the Pipeline class. Let’s treat this as a draft for revisiting the idea later.
…lities Consolidate duplicated boilerplate across all batch pipeline route handlers into shared utility functions (check_auth_token, check_model_id, execute_pipeline) and a shared RESPONSES dict in routes/utils.py. This addresses Victor's feedback on PR livepeer#900 about reducing redundant pipeline interface patterns and making common behavior properly defined in one place rather than copy-pasted across every route. https://claude.ai/code/session_01RvBGa2npztEMxwfAHH3Xve
- Fix __qualname__ on function wrapper for correct importlib resolution - Add module docstring with lifecycle hook reference to create.py - Improve _build_pipeline docstring documenting hook detection - Normalize VideoOutput request_id to prevent silent frame drops - Make prepare_models async-capable via _invoke for sync/async support - Replace MiDaS example with pure-torch edge detection (no external deps) - Update docs to use EdgeDetect example throughout Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Allow running examples directly with `python examples/live-video-to-video/green_shift.py` instead of the verbose `python -m runner.live.infer --pipeline ...` invocation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Nested classes/functions produce a dotted __qualname__ which breaks the loader's getattr lookup. Log a warning so developers know to keep @pipeline at the top level. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Move example pipeline launching from individual __main__ blocks into a shared test_examples.py script. Update integration testing docs with go-livepeer box setup, webcam streaming, and MediaMTX troubleshooting. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Add a
@pipelinedecorator that turns a plain function or class into a fully working livePipelinesubclass — handling frame queues, lifecycle management, parameter validation, and thread safety automatically.Function form (minimal)
Class form (with lifecycle hooks)
What the decorator handles
Pipelinesubclass with async frame queuestransformmethods (sync runs in thread pool)PipelineSpecfor dynamic loading viaimportlibasyncio.Lockprepare_models()classmethod for build-time model downloads@pipelinedecorates a nested definition (dotted__qualname__breaks the loader)Includes two example pipelines
How to test it works
You can test both examples and will get a result like