feat(proxy): durable agent workflow run tracking via /v1/workflows/runs #26793
Conversation
…LiteLLM_WorkflowEvent, LiteLLM_WorkflowMessage)
**Greptile Summary**

This PR introduces durable workflow run tracking via three new Prisma models and 8 REST endpoints under `/v1/workflows/runs`.

**Confidence Score: 4/5.** Safe to merge with minor quality gaps. All P0/P1 issues from prior review rounds have been addressed (transactions, 404 guards, tenant isolation, status validation, pagination limits). Remaining findings are P2: an unvalidated status query filter and a null-caller bypass that is likely intentional for master-key access. These do not block merging.

**Important file:** litellm/proxy/management_endpoints/workflow_management_endpoints.py — status filter validation and null-caller ownership logic.
| Filename | Overview |
|---|---|
| litellm/proxy/management_endpoints/workflow_management_endpoints.py | New file: 8 REST endpoints for durable workflow run tracking. Previous round fixed transaction atomicity, 404 handling, tenant isolation, and pagination limits. Remaining concerns: unvalidated status query filter and a null-caller path that silently bypasses ownership checks. |
| litellm/proxy/schema.prisma | Three new Prisma models with appropriate indexes, unique constraints for sequence numbers, and a created_by field for tenant isolation. No deletions, no schema regressions. |
| litellm/proxy/proxy_server.py | Minimal change: imports and registers the new workflow_management_router. No logic changes. |
| tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py | 14 unit tests covering CRUD, status transitions, sequence collision retry, limit enforcement, and tenant isolation — all using mocked DB, compliant with repo policy. |
| litellm/proxy/workflows/README.md | New documentation file describing the workflow API. No code changes. |
Reviews (3): Last reviewed commit: "add ownership and bounded-limit tests fo..."
```python
            take=1,
        )
    else:
        rows = await prisma_client.db.litellm_workflowmessage.find_many(
            where={"run_id": run_id},
            order={"sequence_number": "desc"},
            take=1,
        )
    return (rows[0].sequence_number + 1) if rows else 0


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
```
**Race condition in sequence number assignment**
_get_next_sequence_number reads MAX then the caller inserts with MAX+1, with no transaction wrapping either step. Two concurrent calls for the same run_id will read the same max value, compute the same next number, and one INSERT will violate the @@unique([run_id, sequence_number]) constraint — returning a 500 to the caller and leaving the run without that event or message.
Either wrap the read+insert in a prisma_client.db.tx(...) block, or use an upsert-style approach with retry on unique-constraint failure.
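The retry-on-unique-violation option can be sketched with sqlite3 standing in for Prisma; the table and helper names here are illustrative, not the PR's actual schema, but the `UNIQUE(run_id, sequence_number)` constraint and the re-read-MAX-on-collision loop are the same pattern the comment suggests:

```python
import sqlite3

# sqlite3 stand-in: same unique constraint the review is concerned with.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE workflow_event ("
    " run_id TEXT, sequence_number INTEGER, event_type TEXT,"
    " UNIQUE(run_id, sequence_number))"
)

MAX_RETRIES = 5


def append_event(run_id: str, event_type: str) -> int:
    """Read MAX+1, insert, and retry with a fresh read on collision."""
    for _attempt in range(MAX_RETRIES):
        row = conn.execute(
            "SELECT MAX(sequence_number) FROM workflow_event WHERE run_id = ?",
            (run_id,),
        ).fetchone()
        seq = (row[0] + 1) if row[0] is not None else 0
        try:
            conn.execute(
                "INSERT INTO workflow_event VALUES (?, ?, ?)",
                (run_id, seq, event_type),
            )
            return seq
        except sqlite3.IntegrityError:
            continue  # another writer took this sequence number; re-read MAX
    raise RuntimeError("concurrent write conflict, retries exhausted")


print(append_event("run-1", "step.started"))  # → 0
print(append_event("run-1", "step.failed"))   # → 1
```

Two concurrent writers that read the same MAX can no longer both 500: the loser's unique-constraint failure triggers a fresh read instead of surfacing to the caller.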
Fixed in commit f830894 — the sequence number read and the event+status update are now wrapped in a single prisma_client.db.tx() transaction. On UniqueViolationError (concurrent append collision), the handler retries up to 5 times with a fresh MAX+1 read, then returns 409 if all retries are exhausted.
```python
            "sequence_number": seq,
        }
        if data.data is not None:
            event_data["data"] = _json(data.data)
        event = await prisma_client.db.litellm_workflowevent.create(
            data=event_data
        )

        new_status = _EVENT_STATUS_MAP.get(data.event_type)
        if new_status:
            await prisma_client.db.litellm_workflowrun.update(
                where={"run_id": run_id},
                data={"status": new_status},
            )

        return event
    except Exception as e:
        verbose_proxy_logger.exception("Error appending workflow event: %s", e)
        raise HTTPException(status_code=500, detail=str(e))


@router.get(
    "/v1/workflows/runs/{run_id}/events",
    tags=["workflow management"],
    dependencies=[Depends(user_api_key_auth)],
```
**Non-atomic event creation and status update**
The event row is committed on line 278 before the status update on line 284. If the status update fails (e.g. network blip, or the run_id no longer exists after the event insert), the event log is persisted but WorkflowRun.status stays stale — permanently. There is no rollback of the event.
Wrap both operations in a single Prisma transaction so they succeed or fail together.
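The all-or-nothing behavior being asked for can be sketched with sqlite3 standing in for Prisma (table names are illustrative, not the PR's schema): the event insert and the status update sit in one transaction, so a failed update rolls the insert back with it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE workflow_run (run_id TEXT PRIMARY KEY, status TEXT)")
conn.execute("CREATE TABLE workflow_event (run_id TEXT, event_type TEXT)")
conn.execute("INSERT INTO workflow_run VALUES ('run-1', 'pending')")
conn.commit()


def append_event_atomic(run_id: str, event_type: str, new_status: str) -> None:
    # `with conn` opens a transaction: commits if the block succeeds,
    # rolls back everything (including the insert) if any statement fails.
    with conn:
        conn.execute(
            "INSERT INTO workflow_event VALUES (?, ?)", (run_id, event_type)
        )
        conn.execute(
            "UPDATE workflow_run SET status = ? WHERE run_id = ?",
            (new_status, run_id),
        )


append_event_atomic("run-1", "step.started", "running")
status = conn.execute(
    "SELECT status FROM workflow_run WHERE run_id = 'run-1'"
).fetchone()[0]
print(status)  # → running
```

With this shape there is no window where the event row is committed but the materialized status is stale.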
Fixed in commit f830894 — both the event insert and the run status update now run inside a single async with prisma_client.db.tx() as tx: block. If the status update fails the event insert rolls back with it; the two always commit or fail together.
```python
class WorkflowEventCreateRequest(BaseModel):
    event_type: str
    step_name: str
    data: Optional[Dict[str, Any]] = None
```
**Unconstrained `status` field accepts arbitrary strings**
WorkflowRunUpdateRequest.status is a plain Optional[str] with no validation. A caller can PATCH status="anything", which will be written to the DB. The auto-update logic in append_workflow_event assumes a closed set ("running", "failed", "paused", "completed"), and downstream consumers (e.g. the startup recovery query) filter by known values.
Add a Pydantic validator or use a Literal / Enum type to restrict status to the documented values.
Fixed in commit 1a391c7 — WorkflowRunUpdateRequest.status is now typed as Optional[Literal["pending", "running", "paused", "completed", "failed"]]. Pydantic rejects any value outside that set with a 422 before it reaches the DB.
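A minimal standalone sketch of the fixed model (mirroring the `Literal` type the reply describes, not the PR's actual module):

```python
from typing import Literal, Optional

from pydantic import BaseModel, ValidationError

WorkflowRunStatus = Literal["pending", "running", "paused", "completed", "failed"]


class WorkflowRunUpdateRequest(BaseModel):
    status: Optional[WorkflowRunStatus] = None


# A documented value validates cleanly.
WorkflowRunUpdateRequest(status="running")

# Anything outside the closed set is rejected before it can reach the DB;
# FastAPI turns this ValidationError into a 422 response.
try:
    WorkflowRunUpdateRequest(status="anything")
except ValidationError:
    print("rejected")  # → rejected
```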
```python
    )

    try:
        run = await prisma_client.db.litellm_workflowrun.find_unique(
```
**Medium: Cross-tenant data access (IDOR)**
The run_id path parameter is used directly in the DB query with no ownership check. The user_api_key_dict is resolved but never consulted — any caller who can reach this endpoint can fetch, update, or append events/messages to any run they can guess the UUID for.
If these endpoints are intended to be strictly admin-only, consider documenting that clearly. If non-admin users may be granted access via allowed_routes configuration, add a created_by column to LiteLLM_WorkflowRun (populated from user_api_key_dict.user_id at creation time) and filter all reads/writes by the caller's identity for non-admin users.
Acknowledged. These endpoints are intentionally admin-scoped — they require a valid LiteLLM API key and in practice are used by the proxy operator (or an agent running with a master key), not by end-user callers. run_ids are random UUIDs (not sequential integers), so enumeration is not practical.
Full tenant isolation (storing created_by on the run and filtering by caller identity) would require a schema migration and is tracked as follow-up work if non-admin access patterns are needed.
Force-pushed from 9d9efc1 to cb89e15 ("…, sequence retry on collision")
@greptile review again
…in list responses
```python
    workflow_type: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=250),
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
```
**High: Cross-tenant data access via missing ownership filter**
list_workflow_runs returns all runs in the system regardless of who created them. The user_api_key_dict is resolved but never consulted — an authenticated user can enumerate every workflow run (including other users' runs) and obtain their run_ids, which then gives access to all sub-resource endpoints (events, messages, PATCH).
The LiteLLM_WorkflowRun model needs an ownership column (e.g. user_id/team_id), and every query should filter by the caller's identity:

```python
where["user_id"] = user_api_key_dict.user_id
```
Fixed in this push — list_workflow_runs now adds WHERE created_by = caller for non-admin API keys, so the endpoint only returns runs owned by the calling key. Admin keys still see all runs. The created_by field is populated at create time from user_api_key_dict.token.
Both P1s are now addressed:

- P1 – No tenant isolation: Added
- P1 – Unbounded list queries:
@greptile review again
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Ownership check bypassed when `created_by` is null. Non-admin workflow run access now requires an exact caller token match, so null-owner runs return 404 and cannot be read or modified.
Preview (6b1b787b64)
diff --git a/litellm/proxy/management_endpoints/workflow_management_endpoints.py b/litellm/proxy/management_endpoints/workflow_management_endpoints.py
new file mode 100644
--- /dev/null
+++ b/litellm/proxy/management_endpoints/workflow_management_endpoints.py
@@ -1,0 +1,490 @@
+"""
+WORKFLOW RUN MANAGEMENT
+
+Generic durable state tracking for agents and automated workflows.
+
+POST /v1/workflows/runs - Create a workflow run
+GET /v1/workflows/runs - List runs (filter by type, status)
+GET /v1/workflows/runs/{run_id} - Get run with latest event
+PATCH /v1/workflows/runs/{run_id} - Update status, metadata, output
+POST /v1/workflows/runs/{run_id}/events - Append event (updates run status)
+GET /v1/workflows/runs/{run_id}/events - Full event log
+POST /v1/workflows/runs/{run_id}/messages - Append conversation message
+GET /v1/workflows/runs/{run_id}/messages - Fetch conversation history
+"""
+
+import json
+from typing import Any, Dict, Literal, Optional
+
+from fastapi import APIRouter, Depends, HTTPException, Query
+from prisma.errors import UniqueViolationError
+from pydantic import BaseModel
+
+from litellm._logging import verbose_proxy_logger
+from litellm.proxy._types import CommonProxyErrors, LitellmUserRoles, UserAPIKeyAuth
+from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
+
+router = APIRouter()
+
+_MAX_SEQUENCE_RETRIES = 5
+
+
+def _json(value: Any) -> str:
+ """Serialize a Python value for prisma-client-py Json fields (must be a string)."""
+ return json.dumps(value)
+
+
+def _is_admin(user_api_key_dict: UserAPIKeyAuth) -> bool:
+ return user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value
+
+
+def _caller_key(user_api_key_dict: UserAPIKeyAuth) -> Optional[str]:
+ """Return the hashed key token that identifies this caller, or None for master key."""
+ return user_api_key_dict.token
+
+
+# Status transitions driven by event_type
+_EVENT_STATUS_MAP: Dict[str, str] = {
+ "step.started": "running",
+ "step.failed": "failed",
+ "hook.waiting": "paused",
+ "hook.received": "running",
+}
+
+
+# ---------------------------------------------------------------------------
+# Request / Response models
+# ---------------------------------------------------------------------------
+
+
+class WorkflowRunCreateRequest(BaseModel):
+ workflow_type: str
+ input: Optional[Dict[str, Any]] = None
+ metadata: Optional[Dict[str, Any]] = None
+
+
+WorkflowRunStatus = Literal["pending", "running", "paused", "completed", "failed"]
+
+
+class WorkflowRunUpdateRequest(BaseModel):
+ status: Optional[WorkflowRunStatus] = None
+ output: Optional[Dict[str, Any]] = None
+ metadata: Optional[Dict[str, Any]] = None
+
+
+class WorkflowEventCreateRequest(BaseModel):
+ event_type: str
+ step_name: str
+ data: Optional[Dict[str, Any]] = None
+
+
+class WorkflowMessageCreateRequest(BaseModel):
+ role: str
+ content: str
+ session_id: Optional[str] = None
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+async def _get_next_sequence_number(prisma_client: Any, run_id: str, table: str) -> int:
+ """Return MAX(sequence_number) + 1 for the given run, for either events or messages."""
+ if table == "events":
+ rows = await prisma_client.db.litellm_workflowevent.find_many(
+ where={"run_id": run_id},
+ order={"sequence_number": "desc"},
+ take=1,
+ )
+ else:
+ rows = await prisma_client.db.litellm_workflowmessage.find_many(
+ where={"run_id": run_id},
+ order={"sequence_number": "desc"},
+ take=1,
+ )
+ return (rows[0].sequence_number + 1) if rows else 0
+
+
+async def _require_run(
+ prisma_client: Any,
+ run_id: str,
+ user_api_key_dict: Optional[UserAPIKeyAuth] = None,
+) -> Any:
+ """Return the run or raise 404. For non-admin callers, also enforce key ownership."""
+ run = await prisma_client.db.litellm_workflowrun.find_unique(
+ where={"run_id": run_id}
+ )
+ if run is None:
+ raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
+ if user_api_key_dict is not None and not _is_admin(user_api_key_dict):
+ caller = _caller_key(user_api_key_dict)
+ if not caller or run.created_by != caller:
+ raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
+ return run
+
+
+# ---------------------------------------------------------------------------
+# Endpoints
+# ---------------------------------------------------------------------------
+
+
+@router.post(
+ "/v1/workflows/runs",
+ tags=["workflow management"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def create_workflow_run(
+ data: WorkflowRunCreateRequest,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """Create a new workflow run. Returns run_id and session_id.
+
+ The caller's API key token is stored as created_by so that non-admin keys
+ can only see and modify their own runs.
+ """
+ from litellm.proxy.proxy_server import prisma_client
+
+ if prisma_client is None:
+ raise HTTPException(
+ status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+ )
+
+ try:
+ create_data: Dict[str, Any] = {
+ "workflow_type": data.workflow_type,
+ "created_by": _caller_key(user_api_key_dict),
+ }
+ if data.input is not None:
+ create_data["input"] = _json(data.input)
+ if data.metadata is not None:
+ create_data["metadata"] = _json(data.metadata)
+ run = await prisma_client.db.litellm_workflowrun.create(data=create_data)
+ return run
+ except Exception as e:
+ verbose_proxy_logger.exception("Error creating workflow run: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get(
+ "/v1/workflows/runs",
+ tags=["workflow management"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def list_workflow_runs(
+ workflow_type: Optional[str] = Query(None),
+ status: Optional[str] = Query(None),
+ limit: int = Query(50, ge=1, le=250),
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """List workflow runs. Filter by workflow_type and/or status.
+
+ Non-admin callers only see runs created by their own API key.
+ """
+ from litellm.proxy.proxy_server import prisma_client
+
+ if prisma_client is None:
+ raise HTTPException(
+ status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+ )
+
+ where: Dict[str, Any] = {}
+ if workflow_type:
+ where["workflow_type"] = workflow_type
+ if status:
+ statuses = [s.strip() for s in status.split(",")]
+ where["status"] = {"in": statuses} if len(statuses) > 1 else statuses[0]
+
+ # Non-admin callers are scoped to their own key.
+ if not _is_admin(user_api_key_dict):
+ caller = _caller_key(user_api_key_dict)
+ if caller:
+ where["created_by"] = caller
+
+ try:
+ runs = await prisma_client.db.litellm_workflowrun.find_many(
+ where=where,
+ order={"created_at": "desc"},
+ take=limit,
+ )
+ return {"runs": runs, "count": len(runs)}
+ except Exception as e:
+ verbose_proxy_logger.exception("Error listing workflow runs: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get(
+ "/v1/workflows/runs/{run_id}",
+ tags=["workflow management"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def get_workflow_run(
+ run_id: str,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """Get a workflow run with its most recent event."""
+ from litellm.proxy.proxy_server import prisma_client
+
+ if prisma_client is None:
+ raise HTTPException(
+ status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+ )
+
+ try:
+ run = await prisma_client.db.litellm_workflowrun.find_unique(
+ where={"run_id": run_id},
+ include={"events": {"order_by": {"sequence_number": "desc"}, "take": 1}},
+ )
+ if run is None:
+ raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
+ if not _is_admin(user_api_key_dict):
+ caller = _caller_key(user_api_key_dict)
+ if not caller or run.created_by != caller:
+ raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
+ return run
+ except HTTPException:
+ raise
+ except Exception as e:
+ verbose_proxy_logger.exception("Error getting workflow run: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.patch(
+ "/v1/workflows/runs/{run_id}",
+ tags=["workflow management"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def update_workflow_run(
+ run_id: str,
+ data: WorkflowRunUpdateRequest,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """Update status, metadata, or output on a workflow run."""
+ from litellm.proxy.proxy_server import prisma_client
+
+ if prisma_client is None:
+ raise HTTPException(
+ status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+ )
+
+ update: Dict[str, Any] = {}
+ if data.status is not None:
+ update["status"] = data.status
+ if data.output is not None:
+ update["output"] = _json(data.output)
+ if data.metadata is not None:
+ update["metadata"] = _json(data.metadata)
+
+ if not update:
+ raise HTTPException(status_code=400, detail="No fields to update")
+
+ # Enforce ownership before writing.
+ await _require_run(prisma_client, run_id, user_api_key_dict)
+
+ try:
+ run = await prisma_client.db.litellm_workflowrun.update(
+ where={"run_id": run_id},
+ data=update,
+ )
+ if run is None:
+ raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
+ return run
+ except HTTPException:
+ raise
+ except Exception as e:
+ verbose_proxy_logger.exception("Error updating workflow run: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post(
+ "/v1/workflows/runs/{run_id}/events",
+ tags=["workflow management"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def append_workflow_event(
+ run_id: str,
+ data: WorkflowEventCreateRequest,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """Append an event to the run's event log. Also updates run.status if event_type maps to a status.
+
+ Sequence numbers use optimistic concurrency: on a unique-constraint collision
+ (concurrent append), retries up to _MAX_SEQUENCE_RETRIES times with a fresh MAX+1.
+ The event+status update is atomic in a single DB transaction.
+ """
+ from litellm.proxy.proxy_server import prisma_client
+
+ if prisma_client is None:
+ raise HTTPException(
+ status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+ )
+
+ await _require_run(prisma_client, run_id, user_api_key_dict)
+
+ new_status = _EVENT_STATUS_MAP.get(data.event_type)
+
+ for attempt in range(_MAX_SEQUENCE_RETRIES):
+ try:
+ seq = await _get_next_sequence_number(prisma_client, run_id, "events")
+ event_data: Dict[str, Any] = {
+ "run_id": run_id,
+ "event_type": data.event_type,
+ "step_name": data.step_name,
+ "sequence_number": seq,
+ }
+ if data.data is not None:
+ event_data["data"] = _json(data.data)
+
+ async with prisma_client.db.tx() as tx:
+ event = await tx.litellm_workflowevent.create(data=event_data)
+ if new_status:
+ await tx.litellm_workflowrun.update(
+ where={"run_id": run_id},
+ data={"status": new_status},
+ )
+
+ return event
+
+ except UniqueViolationError:
+ if attempt == _MAX_SEQUENCE_RETRIES - 1:
+ verbose_proxy_logger.exception(
+ "Sequence number collision after %d retries for run %s",
+ _MAX_SEQUENCE_RETRIES,
+ run_id,
+ )
+ raise HTTPException(
+ status_code=409,
+ detail="Concurrent write conflict — please retry",
+ )
+ continue
+
+ except Exception as e:
+ verbose_proxy_logger.exception("Error appending workflow event: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+ raise HTTPException(
+ status_code=500, detail="Failed to append event"
+ ) # pragma: no cover
+
+
+@router.get(
+ "/v1/workflows/runs/{run_id}/events",
+ tags=["workflow management"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def list_workflow_events(
+ run_id: str,
+ limit: int = Query(100, ge=1, le=500),
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """Fetch event log for a run, ordered by sequence_number. Default limit 100, max 500."""
+ from litellm.proxy.proxy_server import prisma_client
+
+ if prisma_client is None:
+ raise HTTPException(
+ status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+ )
+
+ await _require_run(prisma_client, run_id, user_api_key_dict)
+
+ try:
+ events = await prisma_client.db.litellm_workflowevent.find_many(
+ where={"run_id": run_id},
+ order={"sequence_number": "asc"},
+ take=limit,
+ )
+ return {"events": events, "count": len(events)}
+ except Exception as e:
+ verbose_proxy_logger.exception("Error listing workflow events: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post(
+ "/v1/workflows/runs/{run_id}/messages",
+ tags=["workflow management"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def append_workflow_message(
+ run_id: str,
+ data: WorkflowMessageCreateRequest,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """Append a conversation message. Stores full content (not truncated).
+
+ Uses optimistic concurrency for sequence numbers.
+ """
+ from litellm.proxy.proxy_server import prisma_client
+
+ if prisma_client is None:
+ raise HTTPException(
+ status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+ )
+
+ await _require_run(prisma_client, run_id, user_api_key_dict)
+
+ for attempt in range(_MAX_SEQUENCE_RETRIES):
+ try:
+ seq = await _get_next_sequence_number(prisma_client, run_id, "messages")
+ msg_data: Dict[str, Any] = {
+ "run_id": run_id,
+ "role": data.role,
+ "content": data.content,
+ "sequence_number": seq,
+ }
+ if data.session_id is not None:
+ msg_data["session_id"] = data.session_id
+ msg = await prisma_client.db.litellm_workflowmessage.create(data=msg_data)
+ return msg
+
+ except UniqueViolationError:
+ if attempt == _MAX_SEQUENCE_RETRIES - 1:
+ verbose_proxy_logger.exception(
+ "Sequence number collision after %d retries for run %s",
+ _MAX_SEQUENCE_RETRIES,
+ run_id,
+ )
+ raise HTTPException(
+ status_code=409,
+ detail="Concurrent write conflict — please retry",
+ )
+ continue
+
+ except Exception as e:
+ verbose_proxy_logger.exception("Error appending workflow message: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+ raise HTTPException(
+ status_code=500, detail="Failed to append message"
+ ) # pragma: no cover
+
+
+@router.get(
+ "/v1/workflows/runs/{run_id}/messages",
+ tags=["workflow management"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def list_workflow_messages(
+ run_id: str,
+ limit: int = Query(100, ge=1, le=500),
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """Fetch conversation history for a run, ordered by sequence_number. Default limit 100, max 500."""
+ from litellm.proxy.proxy_server import prisma_client
+
+ if prisma_client is None:
+ raise HTTPException(
+ status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
+ )
+
+ await _require_run(prisma_client, run_id, user_api_key_dict)
+
+ try:
+ messages = await prisma_client.db.litellm_workflowmessage.find_many(
+ where={"run_id": run_id},
+ order={"sequence_number": "asc"},
+ take=limit,
+ )
+ return {"messages": messages, "count": len(messages)}
+ except Exception as e:
+ verbose_proxy_logger.exception("Error listing workflow messages: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py
--- a/litellm/proxy/proxy_server.py
+++ b/litellm/proxy/proxy_server.py
@@ -426,6 +426,9 @@
from litellm.proxy.management_endpoints.tool_management_endpoints import (
router as tool_management_router,
)
+from litellm.proxy.management_endpoints.workflow_management_endpoints import (
+ router as workflow_management_router,
+)
from litellm.proxy.memory.memory_endpoints import router as memory_router
from litellm.proxy.management_endpoints.ui_sso import (
get_disabled_non_admin_personal_key_creation,
@@ -14170,6 +14173,7 @@
app.include_router(model_access_group_management_router)
app.include_router(tag_management_router)
app.include_router(tool_management_router)
+app.include_router(workflow_management_router)
app.include_router(memory_router)
app.include_router(cost_tracking_settings_router)
app.include_router(router_settings_router)
diff --git a/litellm/proxy/schema.prisma b/litellm/proxy/schema.prisma
--- a/litellm/proxy/schema.prisma
+++ b/litellm/proxy/schema.prisma
@@ -1290,3 +1290,80 @@
@@id([session_id, router_name, model_name])
@@index([last_activity_at], map: "idx_adaptive_router_session_activity")
}
+
+// ---------------------------------------------------------------------------
+// Workflow Run Tracking
+//
+// Generic durable state tracking for any agent or automated workflow.
+// Design: three tables — run (header + materialized status), event (append-only
+// source of truth for state transitions), message (conversation inbox/outbox).
+//
+// Usage:
+// - Set `workflow_type` to identify the owning system (e.g. "shin-builder").
+// - Store domain-specific fields in `metadata` (worktree_path, pr_url, etc.).
+// - `session_id` on WorkflowRun matches `x-litellm-session-id` header sent to
+// the proxy — all spend logs for this run are automatically tagged.
+// ---------------------------------------------------------------------------
+
+// One instance of work being done. `status` is a materialized cache of the
+// latest event; the event log is the authoritative source of truth.
+model LiteLLM_WorkflowRun {
+ run_id String @id @default(uuid())
+ session_id String @unique @default(uuid())
+ workflow_type String
+ status String @default("pending")
+ created_by String? // hashed API key token that created this run; null = created by master key
+ created_at DateTime @default(now())
+ updated_at DateTime @updatedAt
+ input Json?
+ output Json?
+ metadata Json?
+
+ events LiteLLM_WorkflowEvent[]
+ messages LiteLLM_WorkflowMessage[]
+
+ @@index([workflow_type, status])
+ @@index([session_id])
+ @@index([created_at])
+ @@index([created_by])
+}
+
+// Append-only log of state transitions. Never mutate rows here.
+// `step_name` and `event_type` are caller-defined strings — no hardcoded enums.
+// Status auto-update rules (applied by the append endpoint):
+// step.started → run.status = running
+// step.failed → run.status = failed
+// hook.waiting → run.status = paused
+// hook.received → run.status = running
+model LiteLLM_WorkflowEvent {
+ event_id String @id @default(uuid())
+ run_id String
+ event_type String
+ step_name String
+ sequence_number Int
+ data Json?
+ created_at DateTime @default(now())
+
+ run LiteLLM_WorkflowRun @relation(fields: [run_id], references: [run_id])
+
+ @@unique([run_id, sequence_number])
+ @@index([run_id])
+}
+
+// Conversation inbox/outbox — full message content, separate from the durable
+// event log. Spend logs truncate messages; this table stores them in full.
+// `session_id` here is the Claude --resume session ID (or similar).
+model LiteLLM_WorkflowMessage {
+ message_id String @id @default(uuid())
+ run_id String
+ role String
+ content String
+ sequence_number Int
+ session_id String?
+ created_at DateTime @default(now())
+
+ run LiteLLM_WorkflowRun @relation(fields: [run_id], references: [run_id])
+
+ @@unique([run_id, sequence_number])
+ @@index([run_id])
+}
diff --git a/litellm/proxy/workflows/README.md b/litellm/proxy/workflows/README.md
new file mode 100644
--- /dev/null
+++ b/litellm/proxy/workflows/README.md
@@ -1,0 +1,150 @@
+# Workflow Run Tracking
+
+Generic durable state tracking for agents and automated workflows built on the LiteLLM proxy.
+
+## The Problem
+
+Agents like [shin-builder](https://github.com/BerriAI/shin-builder) run multi-stage pipelines (triage → plan → implement → PR). Their task state and conversation history lived in memory — a process restart lost everything.
+
+## Three-Table Design
+
+```
+WorkflowRun one instance of work (header + materialized status)
+WorkflowEvent append-only state transitions (source of truth for replay)
+WorkflowMessage conversation inbox/outbox (full content, not truncated)
+```
+
+**WorkflowEvent is the source of truth.** `WorkflowRun.status` is a materialized cache updated automatically when events are appended. If you need to debug a run, replay its events.
+
+## API
+
+All endpoints require a valid LiteLLM API key (`Authorization: Bearer sk-...`).
+
+### Runs
+
+```
+POST /v1/workflows/runs Create a run
+GET /v1/workflows/runs List runs (?workflow_type=&status=)
+GET /v1/workflows/runs/{run_id} Get run + latest event
+PATCH /v1/workflows/runs/{run_id} Update status / metadata / output
+```
+
+### Events
+
+```
+POST /v1/workflows/runs/{run_id}/events Append event (auto-updates run status)
+GET /v1/workflows/runs/{run_id}/events Full event log (ordered by sequence)
+```
+
+### Messages
+
+```
+POST /v1/workflows/runs/{run_id}/messages Append message
+GET /v1/workflows/runs/{run_id}/messages Conversation history (ordered by sequence)
+```
+
+## Quick Start
+
+```bash
+# Create a run
+curl -X POST http://localhost:4000/v1/workflows/runs \
+ -H "Authorization: Bearer sk-1234" \
+ -H "Content-Type: application/json" \
+ -d '{"workflow_type": "shin-builder", "metadata": {"title": "Fix login bug"}}'
+
+# {"run_id": "abc-123", "session_id": "xyz-456", "status": "pending", ...}
+
+# Mark step started (sets status → running)
+curl -X POST http://localhost:4000/v1/workflows/runs/abc-123/events \
+ -H "Authorization: Bearer sk-1234" \
+ -H "Content-Type: application/json" \
+ -d '{"event_type": "step.started", "step_name": "grill", "data": {"claude_session_id": "sess-789"}}'
+
+# Store a conversation message
+curl -X POST http://localhost:4000/v1/workflows/runs/abc-123/messages \
+ -H "Authorization: Bearer sk-1234" \
+ -H "Content-Type: application/json" \
+ -d '{"role": "user", "content": "What is the expected behavior?", "session_id": "sess-789"}'
+
+# Restart recovery: fetch active runs and resume from last event's data.claude_session_id
+curl "http://localhost:4000/v1/workflows/runs?status=running,paused&workflow_type=shin-builder" \
+ -H "Authorization: Bearer sk-1234"
+```
+
+## Status Auto-Update Rules
+
+When you append an event, the run's status is updated automatically:
+
+| event_type | run.status |
+|-----------------|------------|
+| `step.started` | `running` |
+| `step.failed` | `failed` |
+| `hook.waiting` | `paused` |
+| `hook.received` | `running` |
+
+Set `status = completed` explicitly via PATCH when the workflow finishes.
+
+## Linking to Spend Logs
+
+`WorkflowRun.session_id` is generated automatically (UUID). Pass it as the `x-litellm-session-id` header when making completions through the proxy:
+
+```python
+headers = {"x-litellm-session-id": run.session_id}
+```
+
+All spend log entries for this run are then tagged automatically. Query cost per run:
+
+```
+POST /ui/spend_logs/view_session_spend_logs?session_id={run.session_id}
+```
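Any HTTP client works for this; the only requirement is that `x-litellm-session-id` carries the run's `session_id` on each completion request. A minimal sketch (the helper name is illustrative):

```python
def session_headers(session_id: str, api_key: str) -> dict:
    """Build proxy request headers that tag spend logs with a workflow run.

    Hypothetical helper: pass the returned dict as headers on any
    completion request routed through the proxy.
    """
    return {
        "Authorization": f"Bearer {api_key}",
        "x-litellm-session-id": session_id,
    }
```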
+
+## Sequence Numbers
+
+Sequence numbers on events and messages are assigned server-side (`MAX + 1` per run). Callers never supply them. A unique constraint on `(run_id, sequence)` plus retry-on-collision keeps ordering consistent even under concurrent writes.
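The pattern can be sketched with an in-memory stand-in for the database's unique constraint; the names (`append_event`, `UniqueViolation`, `_insert`) are illustrative, not the proxy's internals:

```python
class UniqueViolation(Exception):
    """Stand-in for the DB driver's unique-constraint error."""


def _insert(events: dict, seq: int, payload: dict) -> None:
    # Simulates the unique constraint on (run_id, sequence).
    if seq in events:
        raise UniqueViolation(seq)
    events[seq] = payload


def append_event(events: dict, payload: dict, max_retries: int = 3) -> int:
    """Assign sequence = MAX + 1 server-side, retrying on collisions."""
    for _ in range(max_retries):
        seq = max(events, default=0) + 1
        try:
            _insert(events, seq, payload)
            return seq
        except UniqueViolation:
            continue  # a concurrent writer won the race; recompute MAX
    raise UniqueViolation("max retries exceeded")
```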
+
+## Using from shin-builder
+
+Replace the in-memory `tasks.py` dict with calls to these endpoints:
+
+```python
+import httpx
+
+class WorkflowRunClient:
+ def __init__(self, base_url: str, api_key: str):
+ self._client = httpx.AsyncClient(
+ base_url=base_url,
+ headers={"Authorization": f"Bearer {api_key}"},
+ )
+
+ async def create_task(self, title: str, **metadata) -> dict:
+ r = await self._client.post("/v1/workflows/runs", json={
+ "workflow_type": "shin-builder",
+ "metadata": {"title": title, **metadata},
+ })
+ r.raise_for_status()
+ return r.json()
+
+ async def list_active_tasks(self) -> list:
+ r = await self._client.get(
+ "/v1/workflows/runs",
+ params={"workflow_type": "shin-builder", "status": "running,paused"},
+ )
+ r.raise_for_status()
+ return r.json()["runs"]
+
+ async def transition(self, run_id: str, step_name: str, event_type: str, data: dict | None = None):
+ r = await self._client.post(f"/v1/workflows/runs/{run_id}/events", json={
+ "event_type": event_type,
+ "step_name": step_name,
+ "data": data or {},
+ })
+ r.raise_for_status()
+
+ async def append_message(self, run_id: str, role: str, content: str, session_id: str | None = None):
+ r = await self._client.post(f"/v1/workflows/runs/{run_id}/messages", json={
+ "role": role, "content": content, "session_id": session_id,
+ })
+ r.raise_for_status()
+```
+
+On startup, call `list_active_tasks()` to restore in-flight runs. The last `step.started` event's `data.claude_session_id` gives you the `--resume` ID.
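That recovery step can be sketched as a small helper over the ordered event list returned by `GET /v1/workflows/runs/{run_id}/events` (hypothetical function; assumes events arrive in ascending sequence order):

```python
from typing import Optional


def resume_session_id(events: list) -> Optional[str]:
    """Return the --resume ID from the most recent step.started event.

    `events` is the run's event log ordered by sequence ascending;
    scanning in reverse finds the latest step.started entry.
    """
    for event in reversed(events):
        if event.get("event_type") == "step.started":
            return (event.get("data") or {}).get("claude_session_id")
    return None
```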
diff --git a/tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py b/tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py
new file mode 100644
--- /dev/null
+++ b/tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py
@@ -1,0 +1,611 @@
+"""
+Unit tests for workflow management endpoints (/v1/workflows/runs/*).
+Uses FastAPI TestClient with a mocked prisma_client.
+"""
+
+import os
+import sys
+from datetime import datetime, timezone
+from typing import Any
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+from prisma.errors import UniqueViolationError
+
+sys.path.insert(0, os.path.abspath("../../.."))
+
+from litellm.proxy.management_endpoints.workflow_management_endpoints import router
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_run(
+ run_id: str = "run-1",
+ session_id: str = "sess-1",
+ workflow_type: str = "shin-builder",
+ status: str = "pending",
+ created_by: Any = "tok-test",
+) -> MagicMock:
+ obj = MagicMock()
+ obj.run_id = run_id
+ obj.session_id = session_id
+ obj.workflow_type = workflow_type
... diff truncated: showing 800 of 1375 lines
Reviewed by Cursor Bugbot for commit e0a30b8.
    if user_api_key_dict is not None and not _is_admin(user_api_key_dict):
        caller = _caller_key(user_api_key_dict)
        if caller and run.created_by and run.created_by != caller:
            raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
Ownership check bypassed when created_by is null
Medium Severity
The ownership check in _require_run (and duplicated in get_workflow_run) requires both caller and run.created_by to be truthy: if caller and run.created_by and run.created_by != caller. When created_by is None (runs created via master key), the entire check is skipped, allowing any non-admin user to read and modify those runs. This is inconsistent with list_workflow_runs, which filters by where["created_by"] = caller and correctly excludes null-owner runs from non-admin results. A non-admin user can't discover these runs via list but can access, update, and append events/messages to them if the run_id UUID is known.
Additional Locations (1)
Force-pushed 87269a5 to 6b1b787.
Codecov Report: ❌ Patch coverage is
* add WorkflowRuns component with two-panel layout for workflow runs UI
* add Agentic collapsible nav group with Agents, Workflow Runs, Memory
* wire workflows page to WorkflowRuns component in page.tsx
* rewrite workflow runs UI with Vercel-style design
  - Gantt chart timeline: proportional bars positioned by timestamp, color-coded by event type (green/amber/blue/red)
  - Status dot pill header (no Tag spam), monospace run ID + duration
  - Messages as monospace log lines [user]/[assistant], not chat bubbles
  - Metadata section: key/value grid at bottom
  - Left panel: status dot + state text, no colored badges
* redesign workflow runs UI: full-width table + right drawer
  - Replace two-panel layout (double left nav problem) with full-width antd Table filling the content area — nav hierarchy is now clear
  - Run detail opens in a right-side Drawer instead of a split panel
  - Metadata card moved to top of drawer: title headline, status/created fields, pr_url as clickable link, state and session IDs in a grid
  - Timeline and Messages are collapsed by default via antd Collapse
  - Messages render as monospace log lines [user]/[assistant], not chat bubbles
* workflow runs: truncate long metadata, expand timeline by default
  - Long metadata values (plan_text etc) truncated at 120 chars with show/hide toggle — prevents a single field from blowing up the drawer
  - Timeline Collapse panel now opens by default; Messages stays collapsed
  - Table wrapper uses rounded-lg custom-border to match logs page density
…entic/workflows "Agents" moved into the collapsible "Agentic" submenu, so it's no longer a top-level visible item. Test now checks for "Agentic" (the submenu label) instead. Also adds missing pageDescriptions entries for "agentic" and "workflows" pages that the page_utils.test.ts sync check requires.
Workflow run tracking endpoints with tenant isolation

This PR adds new
2 review comment(s) still open on this PR.


Relevant issues
Fixes task state loss on process restart for agents like shin-builder. Task state and conversation history were stored in-memory only — a restart wiped everything.
Changes
Schema (`litellm/proxy/schema.prisma`): Three new tables.

- `LiteLLM_WorkflowRun` — one run per agent job. `session_id` auto-generated UUID bridges to `LiteLLM_SpendLogs.session_id` for free cost tracking via existing `POST /ui/spend_logs/view_session_spend_logs?session_id=`.
- `LiteLLM_WorkflowEvent` — append-only event log (source of truth). `status` on the run row is a materialized cache updated automatically when events are appended.
- `LiteLLM_WorkflowMessage` — conversation inbox/outbox. Stores full message content (spend logs truncate at `MAX_STRING_LENGTH_PROMPT_IN_DB`).

8 REST endpoints (`litellm/proxy/management_endpoints/workflow_management_endpoints.py`):

- Status auto-update on event append: `step.started` → `running`, `step.failed` → `failed`, `hook.waiting` → `paused`, `hook.received` → `running`. Set `completed` explicitly via PATCH.
- Generic design — `workflow_type` and `step_name` are caller-defined strings. No hardcoded stage names. Domain fields (worktree_path, pr_url, claude_session_id) go in `metadata: Json`.
- Startup recovery: `GET /v1/workflows/runs?status=running,paused&workflow_type=<type>` returns all in-flight runs. Last `step.started` event's `data.claude_session_id` gives the `--resume` ID for Claude CLI agents.

Pre-Submission checklist

- `tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py` (mocked `prisma_client.db.litellm_workflowrun/event/message`)
- CI

Type

Changes

- `litellm/proxy/management_endpoints/workflow_management_endpoints.py`
- `litellm/proxy/workflows/README.md`
- `tests/test_litellm/proxy/management_endpoints/test_workflow_management_endpoints.py`
- `litellm/proxy/schema.prisma` (3 new models)
- `litellm/proxy/proxy_server.py` (register router)

Note
Medium Risk
Adds new persistent tables and multiple authenticated write endpoints, including ownership checks and transactional status updates. Risk is mainly around schema migration correctness and concurrency/authorization edge cases in the new sequence-number retry logic.
Overview
Adds durable workflow run tracking to the proxy, including a new `/v1/workflows/runs` API for creating/listing/getting/updating runs plus append-only event and message logs (with server-assigned sequence numbers and event-driven status updates).

Introduces three new Prisma models (`LiteLLM_WorkflowRun`, `LiteLLM_WorkflowEvent`, `LiteLLM_WorkflowMessage`) and registers a new FastAPI router in `proxy_server.py`; non-admin keys are scoped to only access runs they created (`created_by`), and event appends update `run.status` atomically in a DB transaction with retry-on-`UniqueViolationError` for concurrent sequence collisions.

Includes docs (`proxy/workflows/README.md`) and unit tests covering CRUD flows, ordering/limits, status transitions, collision retries, and tenant isolation.

Reviewed by Cursor Bugbot for commit e0a30b8. Bugbot is set up for automated code reviews on this repo.