RunAgent Pulse uses a modular architecture with clear separation of concerns:
server/
├── api/ # HTTP API endpoints
├── executors/ # Agent execution backends (modular)
│ ├── base.py # Base executor interface
│ ├── serverless.py # Serverless executor
│ ├── local.py # Local executor
│ └── factory.py # Executor factory
├── workers/ # Background workers
│ ├── agent_executor_worker.py # Agent execution worker
│ └── __init__.py
├── database.py # Database layer
├── scheduler.py # Task scheduling
├── services.py # Business logic
└── main.py # Application entry point
All executors implement BaseExecutor which defines:
- execute(): Execute an agent
- is_available(): Check if executor is available
- validate_params(): Validate execution parameters
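A minimal sketch of what such an interface might look like, assuming an abstract base class. The method names come from the list above and the execute() arguments from the usage examples in this document; the constructor argument, the default validate_params() behavior, and the EchoExecutor subclass are illustrative assumptions, not the actual contents of server/executors/base.py.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseExecutor(ABC):
    """Hypothetical sketch of the executor interface."""

    def __init__(self, name: str) -> None:
        self.name = name

    @abstractmethod
    async def execute(self, agent_id: str, entrypoint_tag: str,
                      params: Dict[str, Any], **kwargs: Any) -> Any:
        """Execute an agent and return its result."""

    @abstractmethod
    def is_available(self) -> bool:
        """Return True if this executor can currently run agents."""

    def validate_params(self, params: Dict[str, Any]) -> bool:
        """Assumed default: accept any dict of parameters."""
        return isinstance(params, dict)


class EchoExecutor(BaseExecutor):
    """Toy executor (not part of RunAgent Pulse) that echoes its input."""

    def __init__(self) -> None:
        super().__init__("echo")

    def is_available(self) -> bool:
        return True

    async def execute(self, agent_id, entrypoint_tag, params, **kwargs):
        if not self.validate_params(params):
            raise ValueError("params must be a dict")
        return {"agent_id": agent_id,
                "entrypoint_tag": entrypoint_tag,
                "params": params}


if __name__ == "__main__":
    executor = EchoExecutor()
    print(asyncio.run(executor.execute("agent-id", "entrypoint", {"prompt": "Hello"})))
```

Keeping execute() asynchronous while is_available() stays synchronous matches how the usage examples in this document await execute().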
The ServerlessExecutor executes agents via the RunAgent Serverless SDK.
Requirements:
- runagent package installed
- ENABLE_SERVERLESS_INTEGRATION=true
- Optional: RUNAGENT_SERVERLESS_API_KEY
Usage:
executor = ServerlessExecutor(api_key="...")
result = await executor.execute(
    agent_id="agent-id",
    entrypoint_tag="entrypoint",
    params={"prompt": "Hello"}
)
The LocalExecutor executes agents locally by importing Python modules.
Requirements:
- Agent module available in Python path
- Optional: LOCAL_AGENT_PATH for custom paths
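Importing an agent by module name could work roughly as sketched below with the standard importlib module. The function name run_local_agent, the choice to pass params as keyword arguments, and the handling of both sync and async entrypoints are assumptions made for illustration; the real LocalExecutor may differ.

```python
import asyncio
import importlib
import inspect
import sys
from typing import Any, Dict, Optional


async def run_local_agent(agent_id: str, entrypoint_tag: str,
                          params: Dict[str, Any],
                          agent_path: Optional[str] = None) -> Any:
    """Import module `agent_id` and call its `entrypoint_tag` function."""
    # Make a custom agent directory importable (the assumed role of
    # LOCAL_AGENT_PATH / agent_path).
    if agent_path and agent_path not in sys.path:
        sys.path.insert(0, agent_path)

    # Pick up modules that were created after the interpreter started.
    importlib.invalidate_caches()
    module = importlib.import_module(agent_id)

    entrypoint = getattr(module, entrypoint_tag, None)
    if not callable(entrypoint):
        raise AttributeError(
            f"module {agent_id!r} has no callable {entrypoint_tag!r}")

    # Assumption: params are passed to the entrypoint as keyword arguments.
    result = entrypoint(**params)
    # Support both plain functions and coroutine functions.
    if inspect.isawaitable(result):
        result = await result
    return result
```

With this sketch, agent_id="my_agent_module" and entrypoint_tag="process" would resolve to the function process in my_agent_module.py.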
Usage:
executor = LocalExecutor(agent_path="/path/to/agents")
result = await executor.execute(
    agent_id="my_agent_module",
    entrypoint_tag="process",
    params={"input": "Hello"}
)
The ExecutorFactory manages executor instances and provides:
- get_executor(type): Get executor by type
- list_available(): List available executor types
Auto-selection:
- If type=None, prefers serverless if available, otherwise local
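The auto-selection rule can be illustrated with a small sketch. This ExecutorFactory is a simplified stand-in written for this example: the constructor taking a dict, the StubExecutor class, and the error types raised are assumptions, and only the method names get_executor and list_available come from the description above.

```python
from typing import Dict, List, Optional


class StubExecutor:
    """Stand-in for a real executor; it only models availability."""

    def __init__(self, name: str, available: bool) -> None:
        self.name = name
        self._available = available

    def is_available(self) -> bool:
        return self._available


class ExecutorFactory:
    """Hypothetical sketch of executor lookup and auto-selection."""

    def __init__(self, executors: Dict[str, StubExecutor]) -> None:
        self._executors = executors

    def list_available(self) -> List[str]:
        return [name for name, ex in self._executors.items()
                if ex.is_available()]

    def get_executor(self, type: Optional[str] = None) -> StubExecutor:
        if type is None:
            # Auto-selection: prefer serverless, fall back to local.
            for candidate in ("serverless", "local"):
                ex = self._executors.get(candidate)
                if ex is not None and ex.is_available():
                    return ex
            raise RuntimeError("no executor is available")
        ex = self._executors.get(type)
        if ex is None or not ex.is_available():
            raise ValueError(f"executor {type!r} is not available")
        return ex


if __name__ == "__main__":
    factory = ExecutorFactory({
        "serverless": StubExecutor("serverless", available=False),
        "local": StubExecutor("local", available=True),
    })
    print(factory.get_executor().name)
```

Here the serverless executor is marked unavailable, so get_executor() with no type falls back to the local one.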
The AgentExecutorWorker polls for tasks and executes them using the appropriate executor:
- Polls for tasks with schedule_type="run_agent" or "execute_agent"
- Claims tasks atomically
- Selects executor based on task payload/metadata
- Executes agent via executor
- Stores results
- POSTs to callback URL if provided
- Acknowledges task completion
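The steps above can be sketched as a single processing pass. Everything below is an illustrative assumption rather than the actual AgentExecutorWorker: the in-memory queue, the task and payload field names (executor_type, callback_url, and so on), and the injected post_callback coroutine that stands in for the HTTP POST.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

Task = Dict[str, Any]


class InMemoryQueue:
    """Stand-in for the real task store; not part of RunAgent Pulse."""

    def __init__(self, tasks: List[Task]) -> None:
        self._tasks = tasks
        self.results: Dict[str, Any] = {}
        self.acked: List[str] = []

    async def claim(self, schedule_types: Tuple[str, ...]) -> Optional[Task]:
        # There is no await between the check and the mark, so no other
        # coroutine can claim the same task. A real database would need a
        # transaction or a conditional update to get the same guarantee.
        for task in self._tasks:
            if task["schedule_type"] in schedule_types and not task.get("claimed"):
                task["claimed"] = True
                return task
        return None


async def process_one(
    queue: InMemoryQueue,
    get_executor: Callable[[Optional[str]], Any],
    post_callback: Callable[[str, Any], Awaitable[None]],
) -> bool:
    """Run one poll/claim/execute/store/callback/ack cycle."""
    task = await queue.claim(("run_agent", "execute_agent"))
    if task is None:
        return False  # Nothing to do on this poll.

    payload = task["payload"]
    # Select the executor from the payload (field name is an assumption).
    executor = get_executor(payload.get("executor_type"))
    result = await executor.execute(
        payload["agent_id"], payload["entrypoint_tag"], payload.get("params", {}))

    queue.results[task["id"]] = result           # Store the result.
    if payload.get("callback_url"):              # Notify the caller, if asked.
        await post_callback(payload["callback_url"], result)
    queue.acked.append(task["id"])               # Acknowledge completion.
    return True
```

A real worker would call process_one in a loop with a sleep between empty polls, and would also need to decide what to do when execution or the callback fails; the sketch leaves that out.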
To add a new executor:
- Create a new file in server/executors/
- Inherit from BaseExecutor
- Implement required methods
- Register in ExecutorFactory._initialize_executors()
- Update server/executors/__init__.py
Example:
# server/executors/custom.py
from server.executors.base import BaseExecutor


class CustomExecutor(BaseExecutor):
    def __init__(self, config):
        super().__init__("custom")
        self.config = config

    def is_available(self) -> bool:
        return True  # Check availability

    async def execute(self, agent_id, entrypoint_tag, params, **kwargs):
        # Custom execution logic goes here
        result = None  # Placeholder: replace with the backend's actual result
        return result
Benefits:
- Extensibility: Easy to add new execution backends
- Testability: Each executor can be tested independently
- Flexibility: Choose executor per task
- Maintainability: Clear separation of concerns
- Scalability: Can add executors for different platforms