
feat(proxy): durable agent workflow run tracking via /v1/workflows/runs#26793

Merged
ishaan-berri merged 19 commits into litellm_internal_staging from litellm_durable-agent-workflow-runs
Apr 30, 2026

Conversation

@ishaan-berri (Contributor) commented Apr 29, 2026

Relevant issues

Fixes task state loss on process restart for agents like shin-builder. Task state and conversation history were stored in-memory only — a restart wiped everything.

Changes

Schema (litellm/proxy/schema.prisma): Three new tables.

  • LiteLLM_WorkflowRun — one run per agent job. session_id auto-generated UUID bridges to LiteLLM_SpendLogs.session_id for free cost tracking via existing POST /ui/spend_logs/view_session_spend_logs?session_id=.
  • LiteLLM_WorkflowEvent — append-only event log (source of truth). status on the run row is a materialized cache updated automatically when events are appended.
  • LiteLLM_WorkflowMessage — conversation inbox/outbox. Stores full message content (spend logs truncate at MAX_STRING_LENGTH_PROMPT_IN_DB).

8 REST endpoints (litellm/proxy/management_endpoints/workflow_management_endpoints.py):

POST   /v1/workflows/runs                       create run
GET    /v1/workflows/runs                       list runs (?workflow_type=&status=running,paused)
GET    /v1/workflows/runs/{run_id}              get run + latest event
PATCH  /v1/workflows/runs/{run_id}              update status/metadata/output
POST   /v1/workflows/runs/{run_id}/events       append event → auto-updates run.status
GET    /v1/workflows/runs/{run_id}/events       full event log (ordered by sequence)
POST   /v1/workflows/runs/{run_id}/messages     append conversation message
GET    /v1/workflows/runs/{run_id}/messages     conversation history (ordered by sequence)

Status auto-update on event append: step.started→running, step.failed→failed, hook.waiting→paused, hook.received→running. Set completed explicitly via PATCH.
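The event-driven status transition described above can be sketched as a plain lookup. This is an illustration only — the mapping mirrors the PR's `_EVENT_STATUS_MAP`, but `apply_event` is a hypothetical helper, not the shipped code:

```python
# Minimal sketch of the event-driven status update: appending an event
# whose type appears in the map changes the run's cached status; any
# other event type leaves it untouched. "completed" is never set here --
# it is only set explicitly via PATCH, as the PR describes.
EVENT_STATUS_MAP = {
    "step.started": "running",
    "step.failed": "failed",
    "hook.waiting": "paused",
    "hook.received": "running",
}


def apply_event(current_status: str, event_type: str) -> str:
    """Return the run status after appending an event of event_type."""
    return EVENT_STATUS_MAP.get(event_type, current_status)
```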

Generic design — workflow_type and step_name are caller-defined strings. No hardcoded stage names. Domain fields (worktree_path, pr_url, claude_session_id) go in metadata: Json.

Startup recovery: GET /v1/workflows/runs?status=running,paused&workflow_type=<type> returns all in-flight runs. Last step.started event's data.claude_session_id gives the --resume ID for Claude CLI agents.
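The recovery flow above — filter in-flight runs, then pull the resume ID from the last `step.started` event — can be sketched in pure Python. The run/event dict shapes here are illustrative stand-ins for the API response, not the actual schema:

```python
# Sketch of startup recovery: given runs (with their event logs), keep
# only running/paused runs and extract data.claude_session_id from the
# most recent step.started event as the Claude CLI --resume ID.
from typing import Any, Dict, List, Optional


def resume_id_for_run(events: List[Dict[str, Any]]) -> Optional[str]:
    """Return data.claude_session_id from the latest step.started event, if any."""
    for event in sorted(events, key=lambda e: e["sequence_number"], reverse=True):
        if event["event_type"] == "step.started":
            return (event.get("data") or {}).get("claude_session_id")
    return None


def recover_runs(runs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Pair each in-flight run with its resume ID (None if it never started a step)."""
    in_flight = [r for r in runs if r["status"] in ("running", "paused")]
    return [
        {"run_id": r["run_id"], "resume_id": resume_id_for_run(r["events"])}
        for r in in_flight
    ]
```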

Pre-Submission checklist

  • Tests added: 14 unit tests in tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py
  • Endpoints verified against live Neon PostgreSQL
  • No raw SQL — all DB access via prisma_client.db.litellm_workflowrun/event/message

CI

  • LiteLLM tests pass

Type

  • New Feature

Changes

  • New: litellm/proxy/management_endpoints/workflow_management_endpoints.py
  • New: litellm/proxy/workflows/README.md
  • New: tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py
  • Modified: litellm/proxy/schema.prisma (3 new models)
  • Modified: litellm/proxy/proxy_server.py (register router)

Note

Medium Risk
Adds new persistent tables and multiple authenticated write endpoints, including ownership checks and transactional status updates. Risk is mainly around schema migration correctness and concurrency/authorization edge cases in the new sequence-number retry logic.

Overview
Adds durable workflow run tracking to the proxy, including a new /v1/workflows/runs API for creating/listing/getting/updating runs plus append-only event and message logs (with server-assigned sequence numbers and event-driven status updates).

Introduces three new Prisma models (LiteLLM_WorkflowRun, LiteLLM_WorkflowEvent, LiteLLM_WorkflowMessage) and registers a new FastAPI router in proxy_server.py; non-admin keys are scoped to only access runs they created (created_by), and event appends update run.status atomically in a DB transaction with retry-on-UniqueViolationError for concurrent sequence collisions.

Includes docs (proxy/workflows/README.md) and unit tests covering CRUD flows, ordering/limits, status transitions, collision retries, and tenant isolation.

Reviewed by Cursor Bugbot for commit e0a30b8.

@CLAassistant commented Apr 29, 2026

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
0 out of 2 committers have signed the CLA.

❌ cursoragent
❌ ishaan-berri

@greptile-apps bot (Contributor) commented Apr 29, 2026

Greptile Summary

This PR introduces durable workflow run tracking via three new Prisma models and 8 REST endpoints under /v1/workflows/runs. Previous review rounds addressed the major concerns (sequence-number race conditions, non-atomic event+status updates, missing 404 guards, unconstrained status writes, and unbounded pagination). The remaining findings are P2: the status query filter in list_workflow_runs is not validated against the WorkflowRunStatus literal set (unlike the PATCH endpoint), and the ownership-bypass path when a caller has a null hashed key is undocumented. Schema, router registration, and tests are clean.

Confidence Score: 4/5

Safe to merge with minor quality gaps; no P0/P1 issues remain after previous fix rounds.

All P0/P1 issues from prior review rounds have been addressed (transactions, 404 guards, tenant isolation, status validation, pagination limits). Remaining findings are P2: unvalidated status query filter and a null-caller bypass that is likely intentional for master-key access. These do not block merging.

litellm/proxy/management_endpoints/workflow_management_endpoints.py — status filter validation and null-caller ownership logic.

Important Files Changed

Filename Overview
litellm/proxy/management_endpoints/workflow_management_endpoints.py New file: 8 REST endpoints for durable workflow run tracking. Previous round fixed transaction atomicity, 404 handling, tenant isolation, and pagination limits. Remaining concerns: unvalidated status query filter and a null-caller path that silently bypasses ownership checks.
litellm/proxy/schema.prisma Three new Prisma models with appropriate indexes, unique constraints for sequence numbers, and a created_by field for tenant isolation. No deletions, no schema regressions.
litellm/proxy/proxy_server.py Minimal change: imports and registers the new workflow_management_router. No logic changes.
tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py 14 unit tests covering CRUD, status transitions, sequence collision retry, limit enforcement, and tenant isolation — all using mocked DB, compliant with repo policy.
litellm/proxy/workflows/README.md New documentation file describing the workflow API. No code changes.

Reviews (3): Last reviewed commit: "add ownership and bounded-limit tests fo..."

Comment on lines +83 to +97
            take=1,
        )
    else:
        rows = await prisma_client.db.litellm_workflowmessage.find_many(
            where={"run_id": run_id},
            order={"sequence_number": "desc"},
            take=1,
        )
    return (rows[0].sequence_number + 1) if rows else 0


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

Contributor

P1 Race condition in sequence number assignment

_get_next_sequence_number reads MAX then the caller inserts with MAX+1, with no transaction wrapping either step. Two concurrent calls for the same run_id will read the same max value, compute the same next number, and one INSERT will violate the @@unique([run_id, sequence_number]) constraint — returning a 500 to the caller and leaving the run without that event or message.

Either wrap the read+insert in a prisma_client.db.tx(...) block, or use an upsert-style approach with retry on unique-constraint failure.
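The retry-on-unique-violation approach the reviewer suggests can be sketched against an in-memory stand-in for the `@@unique([run_id, sequence_number])` constraint. `FakeEventTable` and `append_with_retry` are hypothetical names for illustration; the real fix uses Prisma and a DB transaction:

```python
# Sketch of optimistic concurrency for sequence numbers: read MAX+1,
# attempt the insert, and on a unique-constraint collision re-read and
# retry with a fresh sequence number.
from typing import Set, Tuple


class UniqueViolation(Exception):
    """Stands in for the database's unique-constraint error."""


class FakeEventTable:
    """Toy events table with a (run_id, sequence_number) unique index."""

    def __init__(self) -> None:
        self._keys: Set[Tuple[str, int]] = set()

    def max_seq(self, run_id: str) -> int:
        seqs = [s for (r, s) in self._keys if r == run_id]
        return max(seqs) if seqs else -1

    def insert(self, run_id: str, seq: int) -> None:
        if (run_id, seq) in self._keys:
            raise UniqueViolation((run_id, seq))
        self._keys.add((run_id, seq))


def append_with_retry(table: FakeEventTable, run_id: str, retries: int = 5) -> int:
    """Append an event, retrying with a fresh MAX+1 read on collision."""
    for _ in range(retries):
        seq = table.max_seq(run_id) + 1
        try:
            table.insert(run_id, seq)
            return seq
        except UniqueViolation:
            continue  # another writer took this seq; re-read and retry
    raise RuntimeError("exhausted retries")
```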

Contributor Author

Fixed in commit f830894 — the sequence number read and the event+status update are now wrapped in a single prisma_client.db.tx() transaction. On UniqueViolationError (concurrent append collision), the handler retries up to 5 times with a fresh MAX+1 read, then returns 409 if all retries are exhausted.

Comment on lines +268 to +292
            "sequence_number": seq,
        }
        if data.data is not None:
            event_data["data"] = _json(data.data)
        event = await prisma_client.db.litellm_workflowevent.create(
            data=event_data
        )

        new_status = _EVENT_STATUS_MAP.get(data.event_type)
        if new_status:
            await prisma_client.db.litellm_workflowrun.update(
                where={"run_id": run_id},
                data={"status": new_status},
            )

        return event
    except Exception as e:
        verbose_proxy_logger.exception("Error appending workflow event: %s", e)
        raise HTTPException(status_code=500, detail=str(e))


@router.get(
    "/v1/workflows/runs/{run_id}/events",
    tags=["workflow management"],
    dependencies=[Depends(user_api_key_auth)],
Contributor

P1 Non-atomic event creation and status update

The event row is committed on line 278 before the status update on line 284. If the status update fails (e.g. network blip, or the run_id no longer exists after the event insert), the event log is persisted but WorkflowRun.status stays stale — permanently. There is no rollback of the event.

Wrap both operations in a single Prisma transaction so they succeed or fail together.

Contributor Author

Fixed in commit f830894 — both the event insert and the run status update now run inside a single async with prisma_client.db.tx() as tx: block. If the status update fails the event insert rolls back with it; the two always commit or fail together.

Comment thread litellm/proxy/management_endpoints/workflow_management_endpoints.py
Comment on lines +60 to +63
class WorkflowEventCreateRequest(BaseModel):
    event_type: str
    step_name: str
    data: Optional[Dict[str, Any]] = None
Contributor

P2 Unconstrained status field accepts arbitrary strings

WorkflowRunUpdateRequest.status is a plain Optional[str] with no validation. A caller can PATCH status="anything", which will be written to the DB. The auto-update logic in append_workflow_event assumes a closed set ("running", "failed", "paused", "completed"), and downstream consumers (e.g. the startup recovery query) filter by known values.

Add a Pydantic validator or use a Literal / Enum type to restrict status to the documented values.
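The closed-set validation the reviewer asks for can be sketched with `typing.Literal` plus `get_args`, so the allowed values live in one place. This is a stdlib-only illustration; the PR's actual fix does the equivalent with a Pydantic `Literal` field that returns a 422:

```python
# Sketch of restricting status to the documented values before any DB write.
from typing import Literal, get_args

WorkflowRunStatus = Literal["pending", "running", "paused", "completed", "failed"]
ALLOWED_STATUSES = frozenset(get_args(WorkflowRunStatus))


def validate_status(value: str) -> str:
    """Reject any status outside the documented set."""
    if value not in ALLOWED_STATUSES:
        raise ValueError(
            f"invalid status {value!r}; expected one of {sorted(ALLOWED_STATUSES)}"
        )
    return value
```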

Contributor Author

Fixed in commit 1a391c7: WorkflowRunUpdateRequest.status is now typed as Optional[Literal["pending", "running", "paused", "completed", "failed"]]. Pydantic rejects any value outside that set with a 422 before it reaches the DB.

Comment thread litellm/proxy/management_endpoints/workflow_management_endpoints.py Outdated
        )

    try:
        run = await prisma_client.db.litellm_workflowrun.find_unique(

Medium: Cross-tenant data access (IDOR)

The run_id path parameter is used directly in the DB query with no ownership check. The user_api_key_dict is resolved but never consulted — any caller who can reach this endpoint can fetch, update, or append events/messages to any run they can guess the UUID for.

If these endpoints are intended to be strictly admin-only, consider documenting that clearly. If non-admin users may be granted access via allowed_routes configuration, add a created_by column to LiteLLM_WorkflowRun (populated from user_api_key_dict.user_id at creation time) and filter all reads/writes by the caller's identity for non-admin users.

Contributor Author

Acknowledged. These endpoints are intentionally admin-scoped — they require a valid LiteLLM API key and in practice are used by the proxy operator (or an agent running with a master key), not by end-user callers. run_ids are random UUIDs (not sequential integers), so enumeration is not practical.

Full tenant isolation (storing created_by on the run and filtering by caller identity) would require a schema migration and is tracked as follow-up work if non-admin access patterns are needed.

Bojun-Vvibe added a commit to Bojun-Vvibe/oss-contributions that referenced this pull request Apr 29, 2026
…i-cli#26208+26207

- BerriAI/litellm#26793 feat(proxy): durable agent workflow run tracking (needs-discussion — sequence-number race + missing tenancy scoping + Prisma error leak)
- google-gemini/gemini-cli#26208 fix: suppress duplicate extension warnings (merge-after-nits)
- google-gemini/gemini-cli#26207 Add @ mention the gemini robot (request-changes — critique-semantics flip + comment-step if guard regression + PAT scope concern)
ishaan-berri force-pushed the litellm_durable-agent-workflow-runs branch from 9d9efc1 to cb89e15 on April 29, 2026 at 18:46
Comment thread litellm/proxy/management_endpoints/workflow_management_endpoints.py
@ishaan-berri (Contributor Author)

@greptile review again

Comment thread litellm/proxy/management_endpoints/workflow_management_endpoints.py
    workflow_type: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=250),
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),

High: Cross-tenant data access via missing ownership filter

list_workflow_runs returns all runs in the system regardless of who created them. The user_api_key_dict is resolved but never consulted — an authenticated user can enumerate every workflow run (including other users' runs) and obtain their run_ids, which then gives access to all sub-resource endpoints (events, messages, PATCH).

The LiteLLM_WorkflowRun model needs an ownership column (e.g. user_id/team_id), and every query should filter by the caller's identity:

where["user_id"] = user_api_key_dict.user_id

Contributor Author

Fixed in this push — list_workflow_runs now adds WHERE created_by = caller for non-admin API keys, so the endpoint only returns runs owned by the calling key. Admin keys still see all runs. The created_by field is populated at create time from user_api_key_dict.token.

@ishaan-berri (Contributor Author)

Both P1s are now addressed:

P1 – No tenant isolation: Added created_by String? to LiteLLM_WorkflowRun (schema.prisma) that stores the caller's hashed API key token at create time. Non-admin callers are scoped to their own key on all endpoints: list adds WHERE created_by = caller, get/update/event/message sub-resources return 404 on ownership mismatch. Admin keys see all runs. _require_run() centralises the ownership check so it can't be missed.

P1 – Unbounded list queries: list_workflow_events and list_workflow_messages now both take a limit query param (default 100, max 500) and pass it as take=limit to find_many. Added tests for limit enforcement.

@ishaan-berri (Contributor Author)

@greptile review again


@cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.


Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Ownership check bypassed when created_by is null
    • Non-admin workflow run access now requires an exact caller token match, so null-owner runs return 404 and cannot be read or modified.
Preview (6b1b787b64)
diff --git a/litellm/proxy/management_endpoints/workflow_management_endpoints.py b/litellm/proxy/management_endpoints/workflow_management_endpoints.py
new file mode 100644
--- /dev/null
+++ b/litellm/proxy/management_endpoints/workflow_management_endpoints.py
@@ -1,0 +1,490 @@
+"""
+WORKFLOW RUN MANAGEMENT
+
+Generic durable state tracking for agents and automated workflows.
+
+POST   /v1/workflows/runs                       - Create a workflow run
+GET    /v1/workflows/runs                       - List runs (filter by type, status)
+GET    /v1/workflows/runs/{run_id}              - Get run with latest event
+PATCH  /v1/workflows/runs/{run_id}              - Update status, metadata, output
+POST   /v1/workflows/runs/{run_id}/events       - Append event (updates run status)
+GET    /v1/workflows/runs/{run_id}/events       - Full event log
+POST   /v1/workflows/runs/{run_id}/messages     - Append conversation message
+GET    /v1/workflows/runs/{run_id}/messages     - Fetch conversation history
+"""
+
+import json
+from typing import Any, Dict, Literal, Optional
+
+from fastapi import APIRouter, Depends, HTTPException, Query
+from prisma.errors import UniqueViolationError
+from pydantic import BaseModel
+
+from litellm._logging import verbose_proxy_logger
+from litellm.proxy._types import CommonProxyErrors, LitellmUserRoles, UserAPIKeyAuth
+from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
+
+router = APIRouter()
+
+_MAX_SEQUENCE_RETRIES = 5
+
+
+def _json(value: Any) -> str:
+    """Serialize a Python value for prisma-client-py Json fields (must be a string)."""
+    return json.dumps(value)
+
+
+def _is_admin(user_api_key_dict: UserAPIKeyAuth) -> bool:
+    return user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value
+
+
+def _caller_key(user_api_key_dict: UserAPIKeyAuth) -> Optional[str]:
+    """Return the hashed key token that identifies this caller, or None for master key."""
+    return user_api_key_dict.token
+
+
+# Status transitions driven by event_type
+_EVENT_STATUS_MAP: Dict[str, str] = {
+    "step.started": "running",
+    "step.failed": "failed",
+    "hook.waiting": "paused",
+    "hook.received": "running",
+}
+
+
+# ---------------------------------------------------------------------------
+# Request / Response models
+# ---------------------------------------------------------------------------
+
+
+class WorkflowRunCreateRequest(BaseModel):
+    workflow_type: str
+    input: Optional[Dict[str, Any]] = None
+    metadata: Optional[Dict[str, Any]] = None
+
+
+WorkflowRunStatus = Literal["pending", "running", "paused", "completed", "failed"]
+
+
+class WorkflowRunUpdateRequest(BaseModel):
+    status: Optional[WorkflowRunStatus] = None
+    output: Optional[Dict[str, Any]] = None
+    metadata: Optional[Dict[str, Any]] = None
+
+
+class WorkflowEventCreateRequest(BaseModel):
+    event_type: str
+    step_name: str
+    data: Optional[Dict[str, Any]] = None
+
+
+class WorkflowMessageCreateRequest(BaseModel):
+    role: str
+    content: str
+    session_id: Optional[str] = None
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+async def _get_next_sequence_number(prisma_client: Any, run_id: str, table: str) -> int:
+    """Return MAX(sequence_number) + 1 for the given run, for either events or messages."""
+    if table == "events":
+        rows = await prisma_client.db.litellm_workflowevent.find_many(
+            where={"run_id": run_id},
+            order={"sequence_number": "desc"},
+            take=1,
+        )
+    else:
+        rows = await prisma_client.db.litellm_workflowmessage.find_many(
+            where={"run_id": run_id},
+            order={"sequence_number": "desc"},
+            take=1,
+        )
+    return (rows[0].sequence_number + 1) if rows else 0
+
+
+async def _require_run(
+    prisma_client: Any,
+    run_id: str,
+    user_api_key_dict: Optional[UserAPIKeyAuth] = None,
+) -> Any:
+    """Return the run or raise 404. For non-admin callers, also enforce key ownership."""
+    run = await prisma_client.db.litellm_workflowrun.find_unique(
+        where={"run_id": run_id}
+    )
+    if run is None:
+        raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
+    if user_api_key_dict is not None and not _is_admin(user_api_key_dict):
+        caller = _caller_key(user_api_key_dict)
+        if not caller or run.created_by != caller:
+            raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
+    return run
+
+
+# ---------------------------------------------------------------------------
+# Endpoints
+# ---------------------------------------------------------------------------
+
+
+@router.post(
+    "/v1/workflows/runs",
+    tags=["workflow management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+async def create_workflow_run(
+    data: WorkflowRunCreateRequest,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """Create a new workflow run. Returns run_id and session_id.
+
+    The caller's API key token is stored as created_by so that non-admin keys
+    can only see and modify their own runs.
+    """
+    from litellm.proxy.proxy_server import prisma_client
+
+    if prisma_client is None:
+        raise HTTPException(
+            status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+        )
+
+    try:
+        create_data: Dict[str, Any] = {
+            "workflow_type": data.workflow_type,
+            "created_by": _caller_key(user_api_key_dict),
+        }
+        if data.input is not None:
+            create_data["input"] = _json(data.input)
+        if data.metadata is not None:
+            create_data["metadata"] = _json(data.metadata)
+        run = await prisma_client.db.litellm_workflowrun.create(data=create_data)
+        return run
+    except Exception as e:
+        verbose_proxy_logger.exception("Error creating workflow run: %s", e)
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get(
+    "/v1/workflows/runs",
+    tags=["workflow management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+async def list_workflow_runs(
+    workflow_type: Optional[str] = Query(None),
+    status: Optional[str] = Query(None),
+    limit: int = Query(50, ge=1, le=250),
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """List workflow runs. Filter by workflow_type and/or status.
+
+    Non-admin callers only see runs created by their own API key.
+    """
+    from litellm.proxy.proxy_server import prisma_client
+
+    if prisma_client is None:
+        raise HTTPException(
+            status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+        )
+
+    where: Dict[str, Any] = {}
+    if workflow_type:
+        where["workflow_type"] = workflow_type
+    if status:
+        statuses = [s.strip() for s in status.split(",")]
+        where["status"] = {"in": statuses} if len(statuses) > 1 else statuses[0]
+
+    # Non-admin callers are scoped to their own key.
+    if not _is_admin(user_api_key_dict):
+        caller = _caller_key(user_api_key_dict)
+        if caller:
+            where["created_by"] = caller
+
+    try:
+        runs = await prisma_client.db.litellm_workflowrun.find_many(
+            where=where,
+            order={"created_at": "desc"},
+            take=limit,
+        )
+        return {"runs": runs, "count": len(runs)}
+    except Exception as e:
+        verbose_proxy_logger.exception("Error listing workflow runs: %s", e)
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get(
+    "/v1/workflows/runs/{run_id}",
+    tags=["workflow management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+async def get_workflow_run(
+    run_id: str,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """Get a workflow run with its most recent event."""
+    from litellm.proxy.proxy_server import prisma_client
+
+    if prisma_client is None:
+        raise HTTPException(
+            status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+        )
+
+    try:
+        run = await prisma_client.db.litellm_workflowrun.find_unique(
+            where={"run_id": run_id},
+            include={"events": {"order_by": {"sequence_number": "desc"}, "take": 1}},
+        )
+        if run is None:
+            raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
+        if not _is_admin(user_api_key_dict):
+            caller = _caller_key(user_api_key_dict)
+            if not caller or run.created_by != caller:
+                raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
+        return run
+    except HTTPException:
+        raise
+    except Exception as e:
+        verbose_proxy_logger.exception("Error getting workflow run: %s", e)
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.patch(
+    "/v1/workflows/runs/{run_id}",
+    tags=["workflow management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+async def update_workflow_run(
+    run_id: str,
+    data: WorkflowRunUpdateRequest,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """Update status, metadata, or output on a workflow run."""
+    from litellm.proxy.proxy_server import prisma_client
+
+    if prisma_client is None:
+        raise HTTPException(
+            status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+        )
+
+    update: Dict[str, Any] = {}
+    if data.status is not None:
+        update["status"] = data.status
+    if data.output is not None:
+        update["output"] = _json(data.output)
+    if data.metadata is not None:
+        update["metadata"] = _json(data.metadata)
+
+    if not update:
+        raise HTTPException(status_code=400, detail="No fields to update")
+
+    # Enforce ownership before writing.
+    await _require_run(prisma_client, run_id, user_api_key_dict)
+
+    try:
+        run = await prisma_client.db.litellm_workflowrun.update(
+            where={"run_id": run_id},
+            data=update,
+        )
+        if run is None:
+            raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
+        return run
+    except HTTPException:
+        raise
+    except Exception as e:
+        verbose_proxy_logger.exception("Error updating workflow run: %s", e)
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post(
+    "/v1/workflows/runs/{run_id}/events",
+    tags=["workflow management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+async def append_workflow_event(
+    run_id: str,
+    data: WorkflowEventCreateRequest,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """Append an event to the run's event log. Also updates run.status if event_type maps to a status.
+
+    Sequence numbers use optimistic concurrency: on a unique-constraint collision
+    (concurrent append), retries up to _MAX_SEQUENCE_RETRIES times with a fresh MAX+1.
+    The event+status update is atomic in a single DB transaction.
+    """
+    from litellm.proxy.proxy_server import prisma_client
+
+    if prisma_client is None:
+        raise HTTPException(
+            status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+        )
+
+    await _require_run(prisma_client, run_id, user_api_key_dict)
+
+    new_status = _EVENT_STATUS_MAP.get(data.event_type)
+
+    for attempt in range(_MAX_SEQUENCE_RETRIES):
+        try:
+            seq = await _get_next_sequence_number(prisma_client, run_id, "events")
+            event_data: Dict[str, Any] = {
+                "run_id": run_id,
+                "event_type": data.event_type,
+                "step_name": data.step_name,
+                "sequence_number": seq,
+            }
+            if data.data is not None:
+                event_data["data"] = _json(data.data)
+
+            async with prisma_client.db.tx() as tx:
+                event = await tx.litellm_workflowevent.create(data=event_data)
+                if new_status:
+                    await tx.litellm_workflowrun.update(
+                        where={"run_id": run_id},
+                        data={"status": new_status},
+                    )
+
+            return event
+
+        except UniqueViolationError:
+            if attempt == _MAX_SEQUENCE_RETRIES - 1:
+                verbose_proxy_logger.exception(
+                    "Sequence number collision after %d retries for run %s",
+                    _MAX_SEQUENCE_RETRIES,
+                    run_id,
+                )
+                raise HTTPException(
+                    status_code=409,
+                    detail="Concurrent write conflict — please retry",
+                )
+            continue
+
+        except Exception as e:
+            verbose_proxy_logger.exception("Error appending workflow event: %s", e)
+            raise HTTPException(status_code=500, detail=str(e))
+
+    raise HTTPException(
+        status_code=500, detail="Failed to append event"
+    )  # pragma: no cover
+
+
+@router.get(
+    "/v1/workflows/runs/{run_id}/events",
+    tags=["workflow management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+async def list_workflow_events(
+    run_id: str,
+    limit: int = Query(100, ge=1, le=500),
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """Fetch event log for a run, ordered by sequence_number. Default limit 100, max 500."""
+    from litellm.proxy.proxy_server import prisma_client
+
+    if prisma_client is None:
+        raise HTTPException(
+            status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+        )
+
+    await _require_run(prisma_client, run_id, user_api_key_dict)
+
+    try:
+        events = await prisma_client.db.litellm_workflowevent.find_many(
+            where={"run_id": run_id},
+            order={"sequence_number": "asc"},
+            take=limit,
+        )
+        return {"events": events, "count": len(events)}
+    except Exception as e:
+        verbose_proxy_logger.exception("Error listing workflow events: %s", e)
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post(
+    "/v1/workflows/runs/{run_id}/messages",
+    tags=["workflow management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+async def append_workflow_message(
+    run_id: str,
+    data: WorkflowMessageCreateRequest,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """Append a conversation message. Stores full content (not truncated).
+
+    Uses optimistic concurrency for sequence numbers.
+    """
+    from litellm.proxy.proxy_server import prisma_client
+
+    if prisma_client is None:
+        raise HTTPException(
+            status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+        )
+
+    await _require_run(prisma_client, run_id, user_api_key_dict)
+
+    for attempt in range(_MAX_SEQUENCE_RETRIES):
+        try:
+            seq = await _get_next_sequence_number(prisma_client, run_id, "messages")
+            msg_data: Dict[str, Any] = {
+                "run_id": run_id,
+                "role": data.role,
+                "content": data.content,
+                "sequence_number": seq,
+            }
+            if data.session_id is not None:
+                msg_data["session_id"] = data.session_id
+            msg = await prisma_client.db.litellm_workflowmessage.create(data=msg_data)
+            return msg
+
+        except UniqueViolationError:
+            if attempt == _MAX_SEQUENCE_RETRIES - 1:
+                verbose_proxy_logger.exception(
+                    "Sequence number collision after %d retries for run %s",
+                    _MAX_SEQUENCE_RETRIES,
+                    run_id,
+                )
+                raise HTTPException(
+                    status_code=409,
+                    detail="Concurrent write conflict — please retry",
+                )
+            continue
+
+        except Exception as e:
+            verbose_proxy_logger.exception("Error appending workflow message: %s", e)
+            raise HTTPException(status_code=500, detail=str(e))
+
+    raise HTTPException(
+        status_code=500, detail="Failed to append message"
+    )  # pragma: no cover
+
+
+@router.get(
+    "/v1/workflows/runs/{run_id}/messages",
+    tags=["workflow management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+async def list_workflow_messages(
+    run_id: str,
+    limit: int = Query(100, ge=1, le=500),
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """Fetch conversation history for a run, ordered by sequence_number. Default limit 100, max 500."""
+    from litellm.proxy.proxy_server import prisma_client
+
+    if prisma_client is None:
+        raise HTTPException(
+            status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+        )
+
+    await _require_run(prisma_client, run_id, user_api_key_dict)
+
+    try:
+        messages = await prisma_client.db.litellm_workflowmessage.find_many(
+            where={"run_id": run_id},
+            order={"sequence_number": "asc"},
+            take=limit,
+        )
+        return {"messages": messages, "count": len(messages)}
+    except Exception as e:
+        verbose_proxy_logger.exception("Error listing workflow messages: %s", e)
+        raise HTTPException(status_code=500, detail=str(e))

diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py
--- a/litellm/proxy/proxy_server.py
+++ b/litellm/proxy/proxy_server.py
@@ -426,6 +426,9 @@
 from litellm.proxy.management_endpoints.tool_management_endpoints import (
     router as tool_management_router,
 )
+from litellm.proxy.management_endpoints.workflow_management_endpoints import (
+    router as workflow_management_router,
+)
 from litellm.proxy.memory.memory_endpoints import router as memory_router
 from litellm.proxy.management_endpoints.ui_sso import (
     get_disabled_non_admin_personal_key_creation,
@@ -14170,6 +14173,7 @@
 app.include_router(model_access_group_management_router)
 app.include_router(tag_management_router)
 app.include_router(tool_management_router)
+app.include_router(workflow_management_router)
 app.include_router(memory_router)
 app.include_router(cost_tracking_settings_router)
 app.include_router(router_settings_router)

diff --git a/litellm/proxy/schema.prisma b/litellm/proxy/schema.prisma
--- a/litellm/proxy/schema.prisma
+++ b/litellm/proxy/schema.prisma
@@ -1290,3 +1290,80 @@
   @@id([session_id, router_name, model_name])
   @@index([last_activity_at], map: "idx_adaptive_router_session_activity")
 }
+
+// ---------------------------------------------------------------------------
+// Workflow Run Tracking
+//
+// Generic durable state tracking for any agent or automated workflow.
+// Design: three tables — run (header + materialized status), event (append-only
+// source of truth for state transitions), message (conversation inbox/outbox).
+//
+// Usage:
+//   - Set `workflow_type` to identify the owning system (e.g. "shin-builder").
+//   - Store domain-specific fields in `metadata` (worktree_path, pr_url, etc.).
+//   - `session_id` on WorkflowRun matches `x-litellm-session-id` header sent to
+//     the proxy — all spend logs for this run are automatically tagged.
+// ---------------------------------------------------------------------------
+
+// One instance of work being done. `status` is a materialized cache of the
+// latest event; the event log is the authoritative source of truth.
+model LiteLLM_WorkflowRun {
+  run_id        String   @id @default(uuid())
+  session_id    String   @unique @default(uuid())
+  workflow_type String
+  status        String   @default("pending")
+  created_by    String?  // user_id of the key that created this run; null = created by master key
+  created_at    DateTime @default(now())
+  updated_at    DateTime @updatedAt
+  input         Json?
+  output        Json?
+  metadata      Json?
+
+  events   LiteLLM_WorkflowEvent[]
+  messages LiteLLM_WorkflowMessage[]
+
+  @@index([workflow_type, status])
+  @@index([session_id])
+  @@index([created_at])
+  @@index([created_by])
+}
+
+// Append-only log of state transitions. Never mutate rows here.
+// `step_name` and `event_type` are caller-defined strings — no hardcoded enums.
+// Status auto-update rules (applied by the append endpoint):
+//   step.started   → run.status = running
+//   step.failed    → run.status = failed
+//   hook.waiting   → run.status = paused
+//   hook.received  → run.status = running
+model LiteLLM_WorkflowEvent {
+  event_id        String   @id @default(uuid())
+  run_id          String
+  event_type      String
+  step_name       String
+  sequence_number Int
+  data            Json?
+  created_at      DateTime @default(now())
+
+  run LiteLLM_WorkflowRun @relation(fields: [run_id], references: [run_id])
+
+  @@unique([run_id, sequence_number])
+  @@index([run_id])
+}
+
+// Conversation inbox/outbox — full message content, separate from the durable
+// event log. Spend logs truncate messages; this table stores them in full.
+// `session_id` here is the Claude --resume session ID (or similar).
+model LiteLLM_WorkflowMessage {
+  message_id      String   @id @default(uuid())
+  run_id          String
+  role            String
+  content         String
+  sequence_number Int
+  session_id      String?
+  created_at      DateTime @default(now())
+
+  run LiteLLM_WorkflowRun @relation(fields: [run_id], references: [run_id])
+
+  @@unique([run_id, sequence_number])
+  @@index([run_id])
+}

diff --git a/litellm/proxy/workflows/README.md b/litellm/proxy/workflows/README.md
new file mode 100644
--- /dev/null
+++ b/litellm/proxy/workflows/README.md
@@ -1,0 +1,150 @@
+# Workflow Run Tracking
+
+Generic durable state tracking for agents and automated workflows built on the LiteLLM proxy.
+
+## The Problem
+
+Agents like [shin-builder](https://github.com/BerriAI/shin-builder) run multi-stage pipelines (triage → plan → implement → PR). Their task state and conversation history lived in memory — a process restart lost everything.
+
+## Three-Table Design
+
+```
+WorkflowRun      one instance of work (header + materialized status)
+WorkflowEvent    append-only state transitions (source of truth for replay)
+WorkflowMessage  conversation inbox/outbox (full content, not truncated)
+```
+
+**WorkflowEvent is the source of truth.** `WorkflowRun.status` is a materialized cache updated automatically when events are appended. If you need to debug a run, replay its events.
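Replaying is just a fold over the event log. A minimal sketch, assuming event types outside the auto-update table leave the status unchanged (`replay_status` is an illustrative helper, not part of the proxy):

```python
# Mirrors the append endpoint's status auto-update rules.
_STATUS_BY_EVENT = {
    "step.started": "running",
    "step.failed": "failed",
    "hook.waiting": "paused",
    "hook.received": "running",
}

def replay_status(events, initial="pending"):
    """Derive a run's status by replaying its event log in sequence order.

    `events` is the list returned by GET /v1/workflows/runs/{id}/events;
    event types with no rule (custom events) leave the status unchanged.
    """
    status = initial
    for event in sorted(events, key=lambda e: e["sequence_number"]):
        status = _STATUS_BY_EVENT.get(event["event_type"], status)
    return status
```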
+
+## API
+
+All endpoints require a valid LiteLLM API key (`Authorization: Bearer sk-...`).
+
+### Runs
+
+```
+POST   /v1/workflows/runs                  Create a run
+GET    /v1/workflows/runs                  List runs (?workflow_type=&status=)
+GET    /v1/workflows/runs/{run_id}         Get run + latest event
+PATCH  /v1/workflows/runs/{run_id}         Update status / metadata / output
+```
+
+### Events
+
+```
+POST   /v1/workflows/runs/{run_id}/events  Append event (auto-updates run status)
+GET    /v1/workflows/runs/{run_id}/events  Full event log (ordered by sequence)
+```
+
+### Messages
+
+```
+POST   /v1/workflows/runs/{run_id}/messages  Append message
+GET    /v1/workflows/runs/{run_id}/messages  Conversation history (ordered by sequence)
+```
+
+## Quick Start
+
+```bash
+# Create a run
+curl -X POST http://localhost:4000/v1/workflows/runs \
+  -H "Authorization: Bearer sk-1234" \
+  -H "Content-Type: application/json" \
+  -d '{"workflow_type": "shin-builder", "metadata": {"title": "Fix login bug"}}'
+
+# {"run_id": "abc-123", "session_id": "xyz-456", "status": "pending", ...}
+
+# Mark step started (sets status → running)
+curl -X POST http://localhost:4000/v1/workflows/runs/abc-123/events \
+  -H "Authorization: Bearer sk-1234" \
+  -H "Content-Type: application/json" \
+  -d '{"event_type": "step.started", "step_name": "grill", "data": {"claude_session_id": "sess-789"}}'
+
+# Store a conversation message
+curl -X POST http://localhost:4000/v1/workflows/runs/abc-123/messages \
+  -H "Authorization: Bearer sk-1234" \
+  -H "Content-Type: application/json" \
+  -d '{"role": "user", "content": "What is the expected behavior?", "session_id": "sess-789"}'
+
+# Restart recovery: fetch active runs and resume from last event's data.claude_session_id
+curl "http://localhost:4000/v1/workflows/runs?status=running,paused&workflow_type=shin-builder" \
+  -H "Authorization: Bearer sk-1234"
+```
+
+## Status Auto-Update Rules
+
+When you append an event, the run's status is updated automatically:
+
+| event_type      | run.status |
+|-----------------|------------|
+| `step.started`  | `running`  |
+| `step.failed`   | `failed`   |
+| `hook.waiting`  | `paused`   |
+| `hook.received` | `running`  |
+
+Set `status = completed` explicitly via PATCH when the workflow finishes.
+
+## Linking to Spend Logs
+
+`WorkflowRun.session_id` is generated automatically (UUID). Pass it as the `x-litellm-session-id` header when making completions through the proxy:
+
+```python
+headers = {"x-litellm-session-id": run.session_id}
+```
+
+All spend log entries for this run are then tagged automatically. Query cost per run:
+
+```
+POST /ui/spend_logs/view_session_spend_logs?session_id={run.session_id}
+```
+
+## Sequence Numbers
+
+Sequence numbers on events and messages are assigned server-side (`MAX + 1` per run); callers never supply them. Writers that race to the same number hit the unique `(run_id, sequence_number)` constraint, and the append endpoints retry with a fresh number, so ordering holds even under concurrent writes.
+
+## Using from shin-builder
+
+Replace the in-memory `tasks.py` dict with calls to these endpoints:
+
+```python
+import httpx
+
+class WorkflowRunClient:
+    def __init__(self, base_url: str, api_key: str):
+        self._client = httpx.AsyncClient(
+            base_url=base_url,
+            headers={"Authorization": f"Bearer {api_key}"},
+        )
+
+    async def create_task(self, title: str, **metadata) -> dict:
+        r = await self._client.post("/v1/workflows/runs", json={
+            "workflow_type": "shin-builder",
+            "metadata": {"title": title, **metadata},
+        })
+        r.raise_for_status()
+        return r.json()
+
+    async def list_active_tasks(self) -> list:
+        r = await self._client.get(
+            "/v1/workflows/runs",
+            params={"workflow_type": "shin-builder", "status": "running,paused"},
+        )
+        r.raise_for_status()
+        return r.json()["runs"]
+
+    async def transition(self, run_id: str, step_name: str, event_type: str, data: dict | None = None):
+        r = await self._client.post(f"/v1/workflows/runs/{run_id}/events", json={
+            "event_type": event_type,
+            "step_name": step_name,
+            "data": data or {},
+        })
+        r.raise_for_status()
+
+    async def append_message(self, run_id: str, role: str, content: str, session_id: str | None = None):
+        r = await self._client.post(f"/v1/workflows/runs/{run_id}/messages", json={
+            "role": role, "content": content, "session_id": session_id,
+        })
+        r.raise_for_status()
+```
+
+On startup, call `list_active_tasks()` to restore in-flight runs. The last `step.started` event's `data.claude_session_id` gives you the `--resume` ID.

diff --git a/tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py b/tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py
new file mode 100644
--- /dev/null
+++ b/tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py
@@ -1,0 +1,611 @@
+"""
+Unit tests for workflow management endpoints (/v1/workflows/runs/*).
+Uses FastAPI TestClient with a mocked prisma_client.
+"""
+
+import os
+import sys
+from datetime import datetime, timezone
+from typing import Any
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+from prisma.errors import UniqueViolationError
+
+sys.path.insert(0, os.path.abspath("../../.."))
+
+from litellm.proxy.management_endpoints.workflow_management_endpoints import router
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_run(
+    run_id: str = "run-1",
+    session_id: str = "sess-1",
+    workflow_type: str = "shin-builder",
+    status: str = "pending",
+    created_by: Any = "tok-test",
+) -> MagicMock:
+    obj = MagicMock()
+    obj.run_id = run_id
+    obj.session_id = session_id
+    obj.workflow_type = workflow_type
... diff truncated: showing 800 of 1375 lines

Reviewed by Cursor Bugbot for commit e0a30b8.

if user_api_key_dict is not None and not _is_admin(user_api_key_dict):
caller = _caller_key(user_api_key_dict)
if caller and run.created_by and run.created_by != caller:
raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
Ownership check bypassed when created_by is null

Medium Severity

The ownership check in _require_run (and duplicated in get_workflow_run) requires both caller and run.created_by to be truthy: if caller and run.created_by and run.created_by != caller. When created_by is None (runs created via master key), the entire check is skipped, allowing any non-admin user to read and modify those runs. This is inconsistent with list_workflow_runs, which filters by where["created_by"] = caller and correctly excludes null-owner runs from non-admin results. A non-admin user can't discover these runs via list but can access, update, and append events/messages to them if the run_id UUID is known.


@cursor cursor Bot force-pushed the litellm_durable-agent-workflow-runs branch from 87269a5 to 6b1b787 Compare April 29, 2026 21:02
@BerriAI BerriAI deleted a comment from veria-ai Bot Apr 29, 2026
@codecov

codecov Bot commented Apr 29, 2026

Codecov Report

❌ Patch coverage is 75.49020% with 50 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...agement_endpoints/workflow_management_endpoints.py | 75.49% | 50 Missing ⚠️ |


@BerriAI BerriAI deleted a comment from veria-ai Bot Apr 29, 2026
@ishaan-berri ishaan-berri enabled auto-merge (squash) April 29, 2026 23:17
ishaan-berri and others added 3 commits April 29, 2026 16:19
* add WorkflowRuns component with two-panel layout for workflow runs UI

* add Agentic collapsible nav group with Agents, Workflow Runs, Memory

* wire workflows page to WorkflowRuns component in page.tsx

* rewrite workflow runs UI with Vercel-style design

- Gantt chart timeline: proportional bars positioned by timestamp,
  color-coded by event type (green/amber/blue/red)
- Status dot pill header (no Tag spam), monospace run ID + duration
- Messages as monospace log lines [user]/[assistant], not chat bubbles
- Metadata section: key/value grid at bottom
- Left panel: status dot + state text, no colored badges

* redesign workflow runs UI: full-width table + right drawer

- Replace two-panel layout (double left nav problem) with full-width
  antd Table filling the content area — nav hierarchy is now clear
- Run detail opens in a right-side Drawer instead of a split panel
- Metadata card moved to top of drawer: title headline, status/created
  fields, pr_url as clickable link, state and session IDs in a grid
- Timeline and Messages are collapsed by default via antd Collapse
- Messages render as monospace log lines [user]/[assistant], not chat bubbles

* workflow runs: truncate long metadata, expand timeline by default

- Long metadata values (plan_text etc) truncated at 120 chars with
  show/hide toggle — prevents a single field from blowing up the drawer
- Timeline Collapse panel now opens by default; Messages stays collapsed
- Table wrapper uses rounded-lg custom-border to match logs page density
…entic/workflows

"Agents" moved into the collapsible "Agentic" submenu, so it's no longer
a top-level visible item. Test now checks for "Agentic" (the submenu label)
instead. Also adds missing pageDescriptions entries for "agentic" and
"workflows" pages that the page_utils.test.ts sync check requires.
@veria-ai

veria-ai Bot commented Apr 30, 2026

Workflow run tracking endpoints with tenant isolation

This PR adds new /v1/workflows/runs endpoints for durable agent workflow state tracking. All endpoints are gated behind user_api_key_auth. Non-admin callers are scoped to their own runs via created_by matching the caller's hashed token. Ownership enforcement is applied on list, get, update, append-event, append-message, and read operations via the _require_run helper and inline checks. Data access uses Prisma ORM (parameterized queries). The UI component uses the session's access token for authenticated fetches.


Status: 2 open
Risk: 2/10

@ishaan-berri ishaan-berri merged commit 4a7af1f into litellm_internal_staging Apr 30, 2026
117 checks passed
@ishaan-berri ishaan-berri deleted the litellm_durable-agent-workflow-runs branch April 30, 2026 00:12
@veria-ai

veria-ai Bot commented Apr 30, 2026

2 review comment(s) still open on this PR.


Status: 2 open
Risk: 0/10

