onestep is a small async task runtime centered on four concepts:
- `OneStepApp`: task registry and lifecycle manager
- `Source`: fetch data from a queue or polling backend
- `Sink`: publish processed data
- `Delivery`: a single fetched item with `ack`/`retry`/`fail`
The V1 stable surface includes:
- `MemoryQueue`
- `MySQLConnector.table_queue(...)`
- `MySQLConnector.incremental(...)`
- `MySQLConnector.table_sink(...)`
- `MySQLConnector.state_store(...)`
- `MySQLConnector.cursor_store(...)`
- `RabbitMQConnector.queue(...)`
- `RedisConnector.stream(...)`
- `SQSConnector.queue(...)`
- `IntervalSource.every(...)`
- `CronSource(...)`
- `WebhookSource(...)`
Core package:

```shell
pip install onestep
```

Common extras:

```shell
pip install 'onestep[yaml]'
pip install 'onestep[mysql]'
pip install 'onestep[rabbitmq]'
pip install 'onestep[redis]'
pip install 'onestep[sqs]'
pip install 'onestep[all]'
```

From a source checkout:

```shell
pip install -e .
pip install -e '.[dev]'
pip install -e '.[integration]'
```
1.0.0 is a runtime rewrite. Projects built on the legacy `step` and `*Broker` APIs should treat the upgrade as a migration, not a drop-in package bump.
See `MIGRATION-0.5-to-1.0.0.md` for:
- old-to-new API mapping
- unsupported legacy features
- a minimal before/after example
- rollout guidance for existing projects
The deployment entrypoint is the `onestep` CLI.
Recommended module shape:
```python
from onestep import IntervalSource, OneStepApp

app = OneStepApp("billing-sync")

@app.task(source=IntervalSource.every(hours=1, immediate=True, overlap="skip"))
async def sync_billing(ctx, _):
    print("syncing billing data")
```

Run the app:

```shell
onestep run your_package.tasks:app
```

The short form is also supported:

```shell
onestep your_package.tasks:app
```

Check the target before starting it:

```shell
onestep check your_package.tasks:app
```

You can also point the CLI at a zero-argument factory:

```shell
onestep check your_package.tasks:build_app
```

Use JSON output when you want the check result in CI or deployment scripts:

```shell
onestep check --json your_package.tasks:app
```

You can also load a YAML app definition with `handler.ref` entries that point to Python callables:
```yaml
app:
  name: billing-sync
  connectors:
    tick:
      type: interval
      minutes: 5
      immediate: true
    processed:
      type: memory
  tasks:
    - name: sync_billing
      source: tick
      handler:
        ref: your_package.handlers.billing:sync_billing
        params:
          region: cn
      emit: [processed]
      retry:
        type: max_attempts
        max_attempts: 3
        delay_s: 10
```

Check or run the YAML target the same way:

```shell
onestep check worker.yaml
onestep run worker.yaml
```

YAML resources can reference other resources by name, for example `rabbitmq_queue.connector: rmq` or `mysql_incremental.state: cursor_store`.
Currently supported YAML resource types:
- `memory`
- `interval`
- `cron`
- `webhook`
- `rabbitmq`
- `rabbitmq_queue`
- `redis`
- `redis_stream`
- `sqs`
- `sqs_queue`
- `mysql`
- `mysql_state_store`
- `mysql_cursor_store`
- `mysql_table_queue`
- `mysql_incremental`
- `mysql_table_sink`
YAML apps can also bind app-level state explicitly:
```yaml
app:
  name: billing-sync
  state: app_state
  connectors:
    app_state:
      type: mysql_state_store
      connector: db
      table: onestep_state
```

The named state resource must support `load`/`save`/`delete`; `mysql_state_store` and `mysql_cursor_store` both work.
Runnable examples live in:
- `example/cli_app.py`
- `example/cli_app.yaml`
Run it locally with:
```shell
PYTHONPATH=src onestep check example.cli_app:app
onestep check example/cli_app.yaml
SYNC_INTERVAL_SECONDS=5 PYTHONPATH=src onestep run example.cli_app:app
```

A complete deployment template lives in:

- `deploy/README.md`
- `deploy/systemd/onestep-app.service`
- `deploy/env/onestep-app.env.example`
- `deploy/bin/onestep-preflight.sh`

The example uses:

- `/etc/onestep/onestep-app.env` for deployment variables
- `ExecStartPre` to run a startup check
- `ExecStart` to launch `onestep run`
Install it with:

```shell
sudo mkdir -p /etc/onestep
sudo cp deploy/env/onestep-app.env.example /etc/onestep/onestep-app.env
sudo cp deploy/systemd/onestep-app.service /etc/systemd/system/onestep-app.service
sudo systemctl daemon-reload
sudo systemctl enable --now onestep-app
```

Check status and logs:

```shell
sudo systemctl status onestep-app
sudo journalctl -u onestep-app -f
```

See `deploy/README.md` for the expected directory layout and the env vars you need to adjust first.
The deploy template prepends `APP_CWD` to `PYTHONPATH` so module targets defined inside the repo can be imported by the `onestep` console script.
onestep can push runtime telemetry to onestep-control-plane without adding a new connector or changing task code.
Attach the reporter explicitly:
```python
from onestep import ControlPlaneReporter, ControlPlaneReporterConfig, IntervalSource, OneStepApp

app = OneStepApp("billing-sync")
reporter = ControlPlaneReporter(ControlPlaneReporterConfig.from_env(app_name=app.name))
reporter.attach(app)

@app.task(source=IntervalSource.every(hours=1, immediate=True, overlap="skip"))
async def sync_users(ctx, _):
    print("syncing users")
```

Required environment variables:
- `ONESTEP_CONTROL_PLANE_URL` or `ONESTEP_CONTROL_URL`
- `ONESTEP_CONTROL_PLANE_TOKEN` or `ONESTEP_CONTROL_TOKEN`
Common optional environment variables:
- `ONESTEP_ENV`
- `ONESTEP_SERVICE_NAME`
- `ONESTEP_NODE_NAME`
- `ONESTEP_DEPLOYMENT_VERSION`
- `ONESTEP_INSTANCE_ID`
- `ONESTEP_CONTROL_PLANE_HEARTBEAT_INTERVAL_S`
- `ONESTEP_CONTROL_PLANE_METRICS_INTERVAL_S`
- `ONESTEP_CONTROL_PLANE_EVENT_FLUSH_INTERVAL_S`
- `ONESTEP_CONTROL_PLANE_EVENT_BATCH_SIZE`
- `ONESTEP_CONTROL_PLANE_TIMEOUT_S`
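The paired variables above resolve in order of preference. A minimal fallback helper in the spirit of `ControlPlaneReporterConfig.from_env` might look like this (`env_first` is a hypothetical name for illustration, not onestep's actual implementation):

```python
import os

def env_first(*names, default=None):
    # Return the first non-empty variable among the given aliases,
    # mirroring the "ONESTEP_CONTROL_PLANE_URL or ONESTEP_CONTROL_URL"
    # fallback described above. Illustrative sketch only.
    for name in names:
        value = os.environ.get(name)
        if value:
            return value
    return default

os.environ.setdefault("ONESTEP_CONTROL_URL", "http://127.0.0.1:8080")
url = env_first("ONESTEP_CONTROL_PLANE_URL", "ONESTEP_CONTROL_URL")
print(url)
```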
The reporter currently pushes:
- `POST /api/v1/agents/sync`
- `POST /api/v1/agents/heartbeat`
- `POST /api/v1/agents/metrics`
- `POST /api/v1/agents/events`
Behavior:
- startup sends an initial heartbeat
- startup also sends a topology sync built from the current app tasks
- sync is retried on later heartbeat cycles until the current topology hash is accepted
- task execution events are aggregated into task window metrics
- important runtime events (`retried`, `failed`, `dead_lettered`, `cancelled`) are batched and pushed
- reporter failures are logged but do not stop task execution
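The topology-sync retry in the list above hinges on a stable hash of the current task set. A content hash of that kind can be sketched as follows (illustrative only; the reporter's actual hashing scheme is not documented here):

```python
import hashlib
import json

def topology_hash(task_names):
    # Canonicalize (sort + compact JSON) so the hash depends only on the
    # set of tasks, not on registration order. Illustrative sketch, not
    # the reporter's real scheme.
    canonical = json.dumps(sorted(task_names), separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

print(topology_hash(["sync_users", "sync_billing"]) ==
      topology_hash(["sync_billing", "sync_users"]))   # → True
```

Because the hash is deterministic, the reporter can keep resending the same sync payload on heartbeat cycles until the control plane acknowledges that hash.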
Quick local demo:

1. Start `onestep-control-plane`. If the control plane repo is checked out next to this one:

   ```shell
   cd ../onestep-control-plane
   ./scripts/start-local.sh
   ```

2. Start a long-running OneStep reporter demo:

   ```shell
   ./scripts/run-control-plane-demo.sh
   ```

3. Inspect the control plane:

   - http://127.0.0.1:8080/api/v1/services?environment=dev
   - http://127.0.0.1:8080/openapi.json
The runtime now has a few explicit control points:
- `OneStepApp(..., shutdown_timeout_s=30.0)` controls how long the app waits for inflight tasks before cancelling them during shutdown
- `@app.task(..., timeout_s=30.0)` applies an execution timeout to async handlers
- `@app.task(..., dead_letter=...)` routes terminal failures into a dead-letter sink
- `@app.on_event` receives task execution events
- `InMemoryMetrics()` is a built-in metrics hook for event counting
- `@app.on_startup` and `@app.on_shutdown` register lifecycle hooks
- `ctx.config` exposes app config
- `ctx.state` exposes per-task namespaced state backed by the app state store
Failures are classified as:
- `error`
- `timeout`
- `cancelled`
Custom retry policies receive a `FailureInfo` object so they can decide differently for timeouts vs business exceptions.
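As a sketch of how a policy might branch on the failure kind, consider the following. The `FailureInfo` fields here are assumed for illustration (onestep ships the real class); only the three classification kinds come from the docs above:

```python
from dataclasses import dataclass

@dataclass
class FailureInfo:
    # Hypothetical shape for illustration; onestep provides the real class.
    kind: str          # "error", "timeout", or "cancelled"
    attempts: int

def should_retry(failure):
    """Retry timeouts aggressively, business errors conservatively."""
    if failure.kind == "timeout":
        return failure.attempts < 5   # transient: allow more attempts
    if failure.kind == "cancelled":
        return False                  # shutdown in progress: stop
    return failure.attempts < 2       # business exception: fail fast

print(should_retry(FailureInfo(kind="timeout", attempts=3)))   # → True
print(should_retry(FailureInfo(kind="error", attempts=3)))     # → False
```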
Execution events are emitted for:
- `fetched`
- `started`
- `succeeded`
- `retried`
- `failed`
- `dead_lettered`
- `cancelled`
```python
from onestep import InMemoryMetrics, InMemoryStateStore, MemoryQueue, OneStepApp, StructuredEventLogger

source = MemoryQueue("incoming")
dead = MemoryQueue("dead-letter")
metrics = InMemoryMetrics()

app = OneStepApp(
    "runtime-demo",
    config={"prefix": "demo"},
    state=InMemoryStateStore(),
    shutdown_timeout_s=10.0,
)
app.on_event(metrics)
app.on_event(StructuredEventLogger())

@app.on_startup
async def bootstrap(app):
    await source.publish({"value": 1})

@app.task(source=source, timeout_s=5.0, dead_letter=dead)
async def consume(ctx, item):
    runs = await ctx.state.get("runs", 0)
    await ctx.state.set("runs", runs + 1)
    print(ctx.config["prefix"], item)
    ctx.app.request_shutdown()
```

If a task ends in a terminal failure and `dead_letter` is configured, the dead-letter sink receives:
- `body["payload"]`: the original payload
- `body["failure"]`: `{kind, exception_type, message, traceback?}`
- `meta["original_meta"]`: the original envelope metadata
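Concretely, a dead-lettered timeout might arrive shaped like the following. This is a hand-built illustration of the documented envelope, not code from onestep, and the specific field values are invented:

```python
# Hand-built example of the documented dead-letter envelope.
dead_letter_message = {
    "body": {
        "payload": {"value": 1},                  # the original payload
        "failure": {
            "kind": "timeout",                    # error | timeout | cancelled
            "exception_type": "TimeoutError",
            "message": "task exceeded timeout_s=5.0",
            # "traceback" is optional, hence the trailing "?" above
        },
    },
    "meta": {
        "original_meta": {"source": "incoming"},  # original envelope metadata
    },
}

print(dead_letter_message["body"]["failure"]["kind"])   # → timeout
```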
You can inspect metrics in-process:

```python
snapshot = metrics.snapshot()
print(snapshot["kinds"])
```

`StructuredEventLogger()` bridges `TaskEvent` into standard Python logging with consistent fields such as:
- `event_kind`
- `app_name`
- `task_name`
- `source_name`
- `attempts`
- `duration_s`
- `failure_kind`
```python
import asyncio

from onestep import MemoryQueue, OneStepApp

app = OneStepApp("demo")
source = MemoryQueue("incoming")
sink = MemoryQueue("processed")

@app.task(source=source, emit=sink, concurrency=4)
async def double(ctx, item):
    ctx.app.request_shutdown()
    return {"value": item["value"] * 2}

async def main():
    await source.publish({"value": 21})
    await app.serve()

asyncio.run(main())
```

Use a local scheduler when you need to run a task every fixed duration.
```python
from onestep import IntervalSource, OneStepApp

app = OneStepApp("interval-demo")

@app.task(
    source=IntervalSource.every(
        hours=1,
        immediate=True,
        overlap="skip",
        payload={"job": "refresh-cache"},
    )
)
async def refresh_cache(ctx, item):
    print("scheduled at:", ctx.current.meta["scheduled_at"], "payload:", item)
```

`overlap` controls what happens when the previous run is still inflight:
- `allow`: start another run immediately
- `skip`: drop missed ticks while the previous run is still running
- `queue`: serialize missed ticks and run them one by one afterwards
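To make the three policies concrete, here is a standalone asyncio sketch of an interval loop with overlap handling. It models the semantics described above and is not onestep's actual scheduler:

```python
import asyncio

async def interval_loop(handler, interval_s, ticks, overlap="skip"):
    # Simplified model of the overlap policies described above;
    # not onestep's actual scheduler.
    inflight = None
    runs = 0
    for _ in range(ticks):
        busy = inflight is not None and not inflight.done()
        if busy and overlap == "skip":
            pass                          # drop the tick entirely
        elif busy and overlap == "queue":
            await inflight                # serialize: finish, then run the missed tick
            inflight = asyncio.ensure_future(handler())
            runs += 1
        else:                             # idle, or overlap == "allow"
            inflight = asyncio.ensure_future(handler())
            runs += 1
        await asyncio.sleep(interval_s)
    if inflight is not None:
        await inflight
    return runs

async def slow_handler():
    await asyncio.sleep(0.2)              # each run spans many 0.02s ticks

skip_runs = asyncio.run(interval_loop(slow_handler, 0.02, 4, overlap="skip"))
allow_runs = asyncio.run(interval_loop(slow_handler, 0.02, 4, overlap="allow"))
print(skip_runs, allow_runs)              # "skip" drops ticks that land mid-run
```

With `allow`, every tick starts a run; with `skip`, ticks that land while a run is inflight are dropped, so fewer runs happen.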
Use CronSource when you care about wall-clock time rather than elapsed duration.
```python
from onestep import CronSource, OneStepApp

app = OneStepApp("hourly-sync")

@app.task(source=CronSource("0 * * * *", timezone="Asia/Shanghai", overlap="skip"))
async def sync_hourly(ctx, _):
    print("running at:", ctx.current.meta["scheduled_at"])
```

The built-in parser supports standard 5-field cron expressions and these aliases:
- `@hourly`
- `@daily`
- `@weekly`
- `@monthly`
- `@yearly`
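These aliases conventionally expand to fixed 5-field expressions. The table below shows the standard equivalents (illustrative; onestep's parser may differ in minor details):

```python
# Conventional 5-field equivalents of the cron aliases above.
CRON_ALIASES = {
    "@hourly":  "0 * * * *",   # minute 0 of every hour
    "@daily":   "0 0 * * *",   # midnight every day
    "@weekly":  "0 0 * * 0",   # midnight on Sunday
    "@monthly": "0 0 1 * *",   # midnight on the 1st of the month
    "@yearly":  "0 0 1 1 *",   # midnight on January 1st
}

def expand(expr):
    """Return the 5-field form of a cron expression or alias."""
    return CRON_ALIASES.get(expr, expr)

print(expand("@daily"))   # → 0 0 * * *
```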
Use WebhookSource when external systems push events into your app.
```python
from onestep import BearerAuth, MemoryQueue, OneStepApp, WebhookSource

app = OneStepApp("webhook-demo")
jobs = MemoryQueue("jobs")

@app.task(
    source=WebhookSource(
        path="/webhooks/github",
        methods=("POST",),
        host="127.0.0.1",
        port=8080,
        auth=BearerAuth("replace-me"),
    ),
    emit=jobs,
)
async def ingest_github(ctx, event):
    return {
        "event": event["headers"].get("x-github-event"),
        "payload": event["body"],
    }
```

The stdlib implementation supports:
- shared `host:port` listeners across multiple webhook routes
- optional `BearerAuth(...)`
- `json`, `form`, `text`, `raw`, and `auto` body parsing
- fixed `WebhookResponse(...)` responses
- exact path matching and method filtering
The payload delivered to your task contains:
- `body`
- `headers`
- `query`
- `method`
- `path`
- `client`
- `received_at`
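Inside a handler you typically validate a push before trusting it. As an example of working with the `headers`/`body` fields above, here is a GitHub-style HMAC check using only the stdlib. The event dict shape follows the field list above; `raw` body parsing is assumed so `body` holds the exact signed bytes, and lower-cased header names match the handler example:

```python
import hashlib
import hmac

def verify_github_signature(event, secret):
    # Compare GitHub's X-Hub-Signature-256 header against an HMAC of
    # the raw request body. Assumes raw body parsing and lower-cased
    # header names, as in the handler example above.
    sent = event["headers"].get("x-hub-signature-256", "")
    expected = "sha256=" + hmac.new(secret, event["body"], hashlib.sha256).hexdigest()
    return hmac.compare_digest(sent, expected)

secret = b"replace-me"
body = b'{"action": "opened"}'
event = {
    "headers": {"x-hub-signature-256": "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()},
    "body": body,
}
print(verify_github_signature(event, secret))   # → True
```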
Use a table as a task queue by claiming rows and marking them as finished.
```python
from onestep import MySQLConnector, OneStepApp

app = OneStepApp("orders")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")

source = db.table_queue(
    table="orders",
    key="id",
    where="status = 0",
    claim={"status": 9},
    ack={"status": 1},
    nack={"status": 0},
    batch_size=100,
)
sink = db.table_sink(table="processed_orders", mode="upsert", keys=("id",))

@app.task(source=source, emit=sink, concurrency=16)
async def process_order(ctx, row):
    return {"id": row["id"], "payload": row["payload"], "status": "done"}
```

Use `(updated_at, id)` as a lightweight cursor for Logstash-style sync.
```python
from onestep import MemoryQueue, MySQLConnector, OneStepApp

app = OneStepApp("sync-users")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")

state = db.cursor_store(table="onestep_cursor")
source = db.incremental(
    table="users",
    key="id",
    cursor=("updated_at", "id"),
    where="deleted = 0",
    batch_size=1000,
    state=state,
)
out = MemoryQueue("dw")

@app.task(source=source, emit=out, concurrency=1)
async def sync_user(ctx, row):
    return {"id": row["id"], "name": row["name"], "updated_at": row["updated_at"]}
```

For production deployments, prefer `db.cursor_store(...)` or `db.state_store(...)` over the in-memory stores so cursors and task state survive process restarts.
```python
from onestep import OneStepApp, RabbitMQConnector

app = OneStepApp("rabbitmq-demo")
rmq = RabbitMQConnector("amqp://guest:guest@localhost/")

source = rmq.queue(
    "incoming_jobs",
    exchange="jobs.events",
    routing_key="jobs.created",
    prefetch=50,
)
out = rmq.queue(
    "processed_jobs",
    exchange="jobs.events",
    routing_key="jobs.done",
)

@app.task(source=source, emit=out, concurrency=8)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}
```

Install with `pip install '.[rabbitmq]'`.
Use Redis Streams for lightweight, reliable message queuing with consumer groups.
```python
from onestep import OneStepApp, RedisConnector

app = OneStepApp("redis-demo")
redis = RedisConnector("redis://localhost:6379")

source = redis.stream(
    "jobs",
    group="workers",
    batch_size=100,
    poll_interval_s=0.5,
)
out = redis.stream("processed")

@app.task(source=source, emit=out, concurrency=8)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}
```

Key features:

- Consumer groups: multiple consumers share message processing
- Message acknowledgment: `XACK` for reliable processing
- Pending messages: unacked messages stay in the PEL for retry via `XCLAIM`
- Stream trimming: `maxlen` option to limit stream size

Install with `pip install '.[redis]'`.
```python
from onestep import OneStepApp, SQSConnector

app = OneStepApp("sqs-demo")
sqs = SQSConnector(region_name="ap-southeast-1")

source = sqs.queue(
    "https://sqs.ap-southeast-1.amazonaws.com/123456789/jobs.fifo",
    message_group_id="workers",
    delete_batch_size=10,
    delete_flush_interval_s=0.5,
    heartbeat_interval_s=15,
    heartbeat_visibility_timeout=60,
)
out = sqs.queue(
    "https://sqs.ap-southeast-1.amazonaws.com/123456789/processed.fifo",
    message_group_id="workers",
)

@app.task(source=source, emit=out, concurrency=16)
async def process_job(ctx, item):
    return {"job": item["job"], "status": "done"}
```

Install with `pip install '.[sqs]'`.
Supported examples are indexed in:
`example/README.md`

Common entrypoints:

- `example/cli_app.py`
- `example/runtime_showcase.py`
- `example/mysql_incremental.py`
- `example/redis_stream.py`
- `example/webhook_source.py`
Optional live tests are under `tests/integration/`.
The local stack now includes Redis, RabbitMQ, LocalStack SQS, and MySQL.
Install the live-test dependencies:

```shell
pip install '.[integration]'
```

Start the local integration stack:

```shell
make integration-up
```

Load the generated environment into your current shell:

```shell
eval "$(./scripts/setup-integration-env.sh)"
```

Run all live tests in one command:

```shell
make integration-test
```

You can also run one test file manually after loading the environment:

```shell
PYTHONPATH=src python3 -m pytest tests/integration/test_redis_live.py -q
PYTHONPATH=src python3 -m pytest tests/integration/test_rabbitmq_live.py -q
PYTHONPATH=src python3 -m pytest tests/integration/test_sqs_live.py -q
PYTHONPATH=src python3 -m pytest tests/integration/test_mysql_live.py -q
```

Set `KEEP_INTEGRATION_SERVICES=1` to keep containers running after `make integration-test`.
The test suite is now intentionally split by responsibility:
- `tests/contract/`: runtime contract tests that lock task execution semantics
- `tests/integration/`: live infrastructure tests for Redis, RabbitMQ, SQS, and MySQL
- `tests/test_*.py`: connector-focused unit tests
For a quick end-to-end demo with webhook ingestion, queueing, dead-letter handling, metrics, and structured logs:
```shell
PYTHONPATH=src python3 example/runtime_showcase.py
```

Then send:

```shell
curl -X POST http://127.0.0.1:8090/demo/webhook \
  -H 'Content-Type: application/json' \
  -d '{"action":"ok","value":21}'

curl -X POST http://127.0.0.1:8090/demo/webhook \
  -H 'Content-Type: application/json' \
  -d '{"action":"fail","value":21}'

curl -X POST http://127.0.0.1:8090/demo/webhook \
  -H 'Content-Type: application/json' \
  -d '{"action":"slow","value":21}'
```

You will see:
- structured task event logs
- successful processing for `action=ok`
- dead-letter output for `action=fail`
- timeout + dead-letter output for `action=slow`