Deploy and run Metaflow flows as Kestra workflows.
metaflow-kestra compiles any Metaflow flow into a Kestra YAML workflow, letting you schedule,
deploy, and monitor your pipelines through Kestra while keeping all your existing Metaflow code
unchanged.
```bash
pip install metaflow-kestra
```

Or from source:

```bash
git clone https://github.com/npow/metaflow-kestra.git
cd metaflow-kestra
pip install -e ".[dev]"
```

Quickest way to try it, compiling, deploying, and triggering a flow in one command:

```bash
python my_flow.py kestra run --kestra-host http://localhost:8080 --wait
```

The full command set:

```bash
# Compile the flow to a Kestra YAML
python my_flow.py kestra compile flow.yaml

# Deploy to a running Kestra server
python my_flow.py kestra create --kestra-host http://localhost:8080

# Compile, deploy, and trigger in one step
python my_flow.py kestra run --kestra-host http://localhost:8080 --wait

# Resume a failed run (skips already-completed steps)
python my_flow.py kestra resume --clone-run-id kestra-<hex> \
    --kestra-host http://localhost:8080
```

All the standard Metaflow flow shapes are handled:

```python
# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)

    @step
    def end(self): pass

# Split/join (branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.data = [1, 2, 3]
        self.next(self.branch_a, self.branch_b)
    ...

# Foreach fan-out (body tasks run concurrently)
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process, foreach="items")
    ...
```

`metaflow.Parameter` definitions become Kestra input fields automatically:
```python
class ParamFlow(FlowSpec):
    greeting = Parameter("greeting", default="hello")
    count = Parameter("count", default=3, type=int)
    ...
```

```bash
# Trigger with custom values via the CLI
python param_flow.py kestra run --wait

# Or trigger manually in the Kestra UI; inputs appear as typed form fields
```

`@schedule` maps to a Kestra cron trigger:

```python
@schedule(cron="0 9 * * 1")  # every Monday at 9 AM
class WeeklyFlow(FlowSpec):
    ...
```

Multi-level foreach fan-outs are fully supported: each nesting level maps to a nested Kestra ForEach task:
```python
class NestedForeachFlow(FlowSpec):
    @step
    def start(self):
        self.models = ["a", "b"]
        self.next(self.train, foreach="models")

    @step
    def train(self):
        self.seeds = [1, 2, 3]
        self.next(self.run_seed, foreach="seeds")

    @step
    def run_seed(self): ...
    ...
```

`@retry` is read from your flow and applied to the generated Kestra task automatically:

```python
class MyFlow(FlowSpec):
    @retry(times=3, minutes_between_retries=2)
    @step
    def train(self):
        ...
```

`@resources` on a step forwards CPU, memory, and GPU hints to the underlying compute backend via `--with=resources:cpu=N,memory=M,gpu=G`:
```python
class MyFlow(FlowSpec):
    @resources(cpu=4, memory=8000, gpu=1)
    @step
    def train(self):
        ...
```

```python
# Trigger this flow when a named Kestra event label fires
@trigger(event="data.ready")
class MyFlow(FlowSpec):
    ...

# Trigger this flow when UpstreamFlow completes in Kestra
@trigger_on_finish(flow="UpstreamFlow")
class DownstreamFlow(FlowSpec):
    ...
```

Both translate to Kestra Flow trigger entries in the generated YAML.
metaflow-kestra bakes the active metadata and datastore backends into the generated YAML at
compile time, so every step subprocess uses the same backend. To use a specific backend:

```bash
python my_flow.py \
    --metadata=service \
    --datastore=s3 \
    kestra compile flow.yaml
```

Or via environment variables:

```bash
export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
python my_flow.py kestra compile flow.yaml
```

To authenticate against a secured Kestra server:

```bash
python my_flow.py kestra run \
    --kestra-host http://kestra.internal:8080 \
    --kestra-user admin@example.com \
    --kestra-password secret \
    --wait
```

metaflow-kestra walks your Metaflow flow's DAG and emits a Kestra YAML workflow. Each Metaflow step
becomes an `io.kestra.plugin.scripts.python.Script` task. The generated YAML:
- runs a `metaflow_init` task first to create the `_parameters` artifact and assign a stable run ID
- runs each step as a subprocess via the standard `metaflow step` CLI
- passes `--input-paths` correctly for joins and foreach splits
- wraps split branches in a `Parallel` task so they execute concurrently
- wraps foreach body tasks in a `ForEach` task
- maps `@retry` to Kestra task retry configuration
- maps `@schedule` to a Kestra cron trigger
- writes Metaflow artifact names and a ready-to-use retrieval snippet to Kestra task outputs after each step
The generated flow preserves the Metaflow DAG structure. Branch and foreach fan-outs appear as nested parallel task groups in the topology view and run concurrently in the Gantt timeline:
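As a rough illustration of that structure-preserving walk (this is a sketch of the idea, not the plugin's actual compiler; `compile_graph` and the task-descriptor shape are hypothetical), a step graph can be flattened into an ordered list of Kestra-style task descriptors, opening a `Parallel` group at each split and emitting a join only once every branch feeding it has been emitted:

```python
# Hypothetical sketch: flatten a {step: [successors]} Metaflow-style step
# graph into ordered task descriptors, mirroring how splits become Parallel
# groups and joins run after all branches. Not the plugin's real code.

def compile_graph(graph, start="start"):
    # Invert the graph so each step knows its predecessors.
    preds = {}
    for step, succ in graph.items():
        for s in succ:
            preds.setdefault(s, set()).add(step)

    tasks, done = [], set()

    def visit(step):
        # A join is deferred until every predecessor branch is emitted.
        if step in done or not preds.get(step, set()) <= done:
            return
        done.add(step)
        tasks.append({"id": step, "type": "Script"})
        succ = graph.get(step, [])
        if len(succ) > 1:
            # Split: branch tasks would be wrapped in a Parallel group.
            tasks.append({"id": f"{step}_split", "type": "Parallel",
                          "branches": list(succ)})
        for s in succ:
            visit(s)

    visit(start)
    return tasks

demo = {"start": ["a", "b"], "a": ["join"], "b": ["join"], "join": ["end"]}
print([t["id"] for t in compile_graph(demo)])
# → ['start', 'start_split', 'a', 'b', 'join', 'end']
```

The deferred-join check is what makes the emitted order match the topology view: `join` appears only after both `a` and `b`, exactly as it executes in Kestra.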
After each step completes, two extra output variables are posted to the Kestra task:
| Variable | Content |
|---|---|
| `metaflow_artifacts` | Comma-separated list of artifact names produced by the step |
| `metaflow_snippet` | Ready-to-paste Python code to load those artifacts via the Metaflow client |
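The retrieval snippet is plain Metaflow client code. As a hypothetical sketch of how such a snippet could be rendered (`build_snippet` is illustrative, not the plugin's actual generator, and the real output may differ):

```python
# Illustrative helper: render a Metaflow-client retrieval snippet for a
# given flow run and its artifact names. Hypothetical, not the plugin's code.

def build_snippet(flow_name, run_id, artifacts):
    """Return Python source that loads the given artifacts via the Metaflow client."""
    lines = [
        "from metaflow import Run",
        f'run = Run("{flow_name}/{run_id}")',
    ]
    # Each artifact is exposed on run.data by the Metaflow client.
    lines += [f"{name} = run.data.{name}" for name in artifacts]
    return "\n".join(lines)

print(build_snippet("SimpleFlow", "kestra-abc123", ["value"]))
```

Pasting the rendered output into any Python session with Metaflow configured against the same datastore loads the step's artifacts directly.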
| Construct | Kestra mapping |
|---|---|
| Linear steps | Sequential `Script` tasks |
| `self.next(a, b)` split | `Parallel` task wrapping branch tasks |
| `@step` with inputs (join) | Sequential task after `Parallel` completes |
| `self.next(step, foreach="items")` | `ForEach` task |
| Nested foreach (multi-level) | Nested `ForEach` tasks |
| `Parameter` | Kestra inputs with type mapping (INT, STRING, FLOAT, BOOLEAN) |
| `@schedule(cron=...)` | Kestra `Schedule` trigger |
| `@retry` | Kestra task retry configuration |
| `@resources(cpu=N, memory=M, gpu=G)` | `--with=resources:...` forwarded to compute backend |
| `@trigger(event=...)` | Kestra `Flow` trigger with event-label condition |
| `@trigger_on_finish(flow=...)` | Kestra `Flow` trigger on upstream flow completion |
| `kestra resume --clone-run-id <id>` | Resume failed run; skips steps that already succeeded |
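As a sketch of the `Parameter` row above (illustrative only; `to_kestra_input` and the exact field names are assumptions, not the plugin's real logic), the Python type on a `Parameter` could be translated to a Kestra input declaration roughly like this:

```python
# Hypothetical mapping from a Metaflow Parameter's Python type to a
# Kestra input type name. Field names here are illustrative.

KESTRA_TYPES = {int: "INT", str: "STRING", float: "FLOAT", bool: "BOOLEAN"}

def to_kestra_input(name, default=None, type=str):
    """Build a dict describing a Kestra input for one Parameter.

    Unknown Python types fall back to STRING."""
    return {
        "id": name,
        "type": KESTRA_TYPES.get(type, "STRING"),
        "defaults": default,
    }

print(to_kestra_input("count", default=3, type=int))
# → {'id': 'count', 'type': 'INT', 'defaults': 3}
```

The fallback to STRING mirrors the safest choice for parameters whose type Kestra's input schema doesn't model directly.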
To work on metaflow-kestra itself:

```bash
git clone https://github.com/npow/metaflow-kestra.git
cd metaflow-kestra
pip install -e ".[dev]"

# Start a local Kestra instance (requires Docker)
docker compose up -d

# Run the integration test suite
KESTRA_HOST=http://localhost:8090 \
    python -m pytest tests/test_e2e.py -m integration -v
```