TLO Gateway Routing Specification
Implementation specification for TLO Gateway (:8001) as the River Agents entry point -- JWT validation, context extraction and propagation, route registration with FastAPI precedence rules, ACL enforcement, proxy client configuration, WebSocket channel management, Temporal workflow integration, request/response transformation, and security enforcement.
Quick Navigation
- Architecture Overview
- JWT Authentication and Context Propagation
- Route Registration
- ACL Permission Matrix
- Proxy Client Configuration
- WebSocket Integration
- Temporal Workflow Integration
- Request and Response Transformation
- Security Enforcement
- Appendix A: Configuration Reference
- Appendix B: TOOL_ACL_MAP Additions
- Appendix C: Comparison with PSA Workflow
Architecture Overview
Position in the Request Flow
TLO Gateway (:8001) is the sole entry point from the frontend for all River Agent operations. No frontend request reaches Backend or river-agent directly. The gateway performs three functions on every request in this order:
- Authentication -- Validates the JWT, decodes claims, and extracts user context
- ACL Enforcement -- Checks the user's permissions against the required permission for the target endpoint
- Proxy Routing -- Forwards the request to the appropriate downstream service with injected internal headers
For River Agents, TLO proxies to two downstream targets:
| Target | Port | Scope | Latency Profile |
|---|---|---|---|
| Backend (rgen-backend) | 8005 | All CRUD, lifecycle, state, audit, trigger management, run history, metrics, settings, approval resolution | Low -- database reads and writes |
| river-agent microservice (rgen-river-agent) | 8007 | LLM-powered agent generation from natural language; internal runtime execution calls from Temporal | High -- LLM inference |
Security invariant: Backend (:8005) and river-agent (:8007) trust all X-* context headers from TLO without independent JWT validation. They assume TLO has already validated the token. Network-level controls (VPC, security groups) must ensure these services are only reachable from TLO Gateway.
Proxy Topology
Comparison with Existing TLO Subsystems
River Agents follow the same architectural patterns as existing TLO subsystems:
| Subsystem | Route Prefix | Target(s) | Workflow Engine | WebSocket |
|---|---|---|---|---|
| PSA (Prompt Studio AI) | /api/v1/prompt-studio/** | Backend :8005, PSA :8010 | Temporal (psa-execute-{id}) | /ws/executions/{id} |
| Data Sources | /api/v1/data-sources/** | Backend :8005, Data Orchestration :8002 | Temporal (test, discover) | None |
| Automation | /api/v1/automation/** | Automation Service :8007 | None | None |
| Model Studio | /api/v1/models/** | Backend :8005 | None | None |
| River Agents | /api/v1/agents/** | Backend :8005, river-agent :8007 | Temporal (agent-run-{id}, agent-schedule-{id}) | /ws/agents/{id} |
Key difference from other subsystems: River Agents require split routing within the same /agents/ prefix -- most routes go to Backend, but /agents/ai/generate targets the river-agent microservice. Specific lifecycle routes trigger Temporal workflows directly from TLO. This is analogous to how /data-sources/{id}/test routes through Temporal to Data Orchestration while other /data-sources/** routes go directly to Backend.
JWT Authentication and Context Propagation
JWT Validation Flow
Validation steps in order:
1. Token presence -- a missing Authorization header returns 401
2. Token signature -- an invalid signature returns 401
3. Token expiry -- an expired token returns 401
4. Required claims -- a missing org_id or workspace_id claim returns 401
5. is_active check -- disabled accounts return 401
6. RBAC check -- a valid token with insufficient permissions returns 403
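The order matters because it decides which status a caller sees when several checks would fail. A minimal sketch of the sequence, assuming the signature has already been verified by a JWT library and is exposed here through a hypothetical DecodedToken; treating a missing exp or is_active claim as a failure is also an assumption:

```python
from __future__ import annotations

import time
from dataclasses import dataclass


@dataclass
class DecodedToken:
    """Hypothetical result of JWT decoding: signature outcome plus raw claims."""
    signature_valid: bool
    claims: dict


def validation_status(token: DecodedToken | None, required_permission: str,
                      now: float | None = None) -> int:
    """Apply the six checks in order; return 401/403 on the first failure, else 200."""
    now = time.time() if now is None else now
    if token is None:                                       # 1. no Authorization header
        return 401
    if not token.signature_valid:                           # 2. invalid signature
        return 401
    claims = token.claims
    if claims.get("exp", 0) <= now:                         # 3. expired (missing exp = expired)
        return 401
    org = claims.get("org_id", claims.get("organization_id"))
    if org is None or claims.get("workspace_id") is None:   # 4. missing tenant claims
        return 401
    if claims.get("is_active") is not True:                 # 5. disabled (missing claim = disabled)
        return 401
    roles = claims.get("roles", [])
    permissions = claims.get("permissions", [])
    if "admin" not in roles and required_permission not in permissions:  # 6. RBAC, admin bypass
        return 403
    return 200
```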
JWTPayload Extraction
TLO decodes the JWT into a JWTPayload dataclass. The following claims are extracted:
| JWT Claim | JWTPayload Field | Type | Notes |
|---|---|---|---|
| sub | sub | str | Standard JWT subject |
| user_id or sub | user_id | int | Custom RiverGen claim |
| email | email | str | User email |
| org_id or organization_id | organization_id | int | Tenant identifier |
| workspace_id | workspace_id | int | Workspace scope |
| roles | roles | list[str] | e.g., ["admin"], ["editor"] |
| permissions | permissions | list[str] | e.g., ["agent:view", "agent:create"] |
| is_active | is_active | bool | Disabled accounts are rejected at step 5 above |
| session_id | session_id | str | Session tracking |
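A sketch of how the claim fallbacks in the table (user_id falling back to sub, org_id falling back to organization_id) could be applied. from_claims is a hypothetical helper rather than the existing TLO constructor, and it assumes step 4 has already rejected tokens without tenant claims:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class JWTPayload:
    sub: str
    user_id: int
    email: str
    organization_id: int
    workspace_id: int
    roles: list[str]
    permissions: list[str]
    is_active: bool
    session_id: str | None

    @classmethod
    def from_claims(cls, claims: dict) -> JWTPayload:
        """Hypothetical mapper applying the claim fallbacks from the table."""
        return cls(
            sub=str(claims["sub"]),
            # user_id falls back to sub, which must then be numeric
            user_id=int(claims.get("user_id", claims["sub"])),
            email=claims.get("email", ""),
            # org_id wins over organization_id when both are present
            organization_id=int(claims.get("org_id", claims.get("organization_id"))),
            workspace_id=int(claims["workspace_id"]),
            roles=list(claims.get("roles", [])),
            permissions=list(claims.get("permissions", [])),
            is_active=bool(claims.get("is_active", False)),
            session_id=claims.get("session_id"),
        )
```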
Context Headers Injected by TLO
After authentication, TLO injects the following headers into every proxied request. The target service must treat these as authoritative. Any client-supplied versions of these headers are overwritten.
| Header | Value Source | Purpose |
|---|---|---|
| X-User-ID | user.user_id | Identity of the requesting user |
| X-Org-ID | user.organization_id | Tenant isolation -- all queries MUST filter by this value |
| X-Organization-ID | user.organization_id | Alias for backward compatibility |
| X-Workspace-ID | user.workspace_id | Workspace scoping |
| X-Email | user.email | User email for audit logs |
| X-Roles | user.roles (comma-separated) | Role list for downstream authorization decisions |
| X-Session-ID | user.session_id | Session tracking |
| X-Request-ID | Generated UUID v4 or propagated | Distributed tracing -- logged by all services |
| X-Trace-ID | Propagated from client or generated | OpenTelemetry trace correlation |
| X-Internal-Call | "true" | Marks the request as TLO-originated (service-to-service) |
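The overwrite rule can be sketched as a pure function: drop every TLO-owned header the client sent (in any casing), then inject fresh values. User is a hypothetical stand-in for JWTPayload, and generating X-Trace-ID as 32 hex characters (the OpenTelemetry trace-ID width) is an assumption:

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass

# Headers TLO owns. Client-supplied copies are discarded whatever their casing.
# X-Agent-ID / X-Execution-ID are re-derived from the path, never taken from the client.
OWNED = {
    "x-user-id", "x-org-id", "x-organization-id", "x-workspace-id", "x-email",
    "x-roles", "x-session-id", "x-request-id", "x-trace-id", "x-internal-call",
    "x-agent-id", "x-execution-id",
}


@dataclass
class User:
    """Hypothetical stand-in for the JWTPayload fields used here."""
    user_id: int
    organization_id: int
    workspace_id: int
    email: str
    roles: list[str]
    session_id: str


def build_proxy_headers(client_headers: dict[str, str], user: User) -> dict[str, str]:
    incoming = {k.lower(): v for k, v in client_headers.items()}
    headers = {k: v for k, v in client_headers.items() if k.lower() not in OWNED}
    headers.update({
        "X-User-ID": str(user.user_id),
        "X-Org-ID": str(user.organization_id),
        "X-Organization-ID": str(user.organization_id),  # backward-compatible alias
        "X-Workspace-ID": str(user.workspace_id),
        "X-Email": user.email,
        "X-Roles": ",".join(user.roles),
        "X-Session-ID": user.session_id,
        # Tracing IDs are propagated when the client sent them, generated otherwise.
        "X-Request-ID": incoming.get("x-request-id") or str(uuid.uuid4()),
        "X-Trace-ID": incoming.get("x-trace-id") or uuid.uuid4().hex,
        "X-Internal-Call": "true",
    })
    return headers
```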
For agent-related requests, two additional headers are injected:
| Header | Value Source | Purpose |
|---|---|---|
| X-Agent-ID | {id} from request path | Identifies the target agent for audit and ACL scoping |
| X-Execution-ID | {execution_id} from request path | Identifies the target execution (for run and approval routes) |
RequestContextData Structure
The RequestContextData dataclass (from middleware/context.py) is attached to request.state.context on every request:
```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class RequestContextData:
    request_id: str               # UUID v4, generated or propagated from X-Request-ID
    timestamp: datetime           # UTC request start time
    trace_id: str | None          # Distributed tracing ID
    span_id: str | None           # Current span ID
    parent_span_id: str | None    # Parent span ID, if any
    organization_id: int | None   # From X-Org-ID header or JWT
    workspace_id: int | None      # From X-Workspace-ID header or path
    client_ip: str | None         # From X-Forwarded-For, X-Real-IP, or direct connection
    user_agent: str | None        # From User-Agent header
    user_id: int | None           # Set after authentication
```
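client_ip follows the fallback order in the comment above. A sketch, assuming the left-most X-Forwarded-For entry is the original client; this is only trustworthy when the load balancer in front of TLO sets or sanitizes these headers, because clients can forge them:

```python
from __future__ import annotations


def resolve_client_ip(headers: dict[str, str], peer_host: str | None) -> str | None:
    """X-Forwarded-For (left-most entry), then X-Real-IP, then the socket peer."""
    lower = {k.lower(): v for k, v in headers.items()}
    forwarded = lower.get("x-forwarded-for", "")
    first = forwarded.split(",")[0].strip()
    if first:
        return first            # original client; later entries are proxies
    real_ip = lower.get("x-real-ip", "").strip()
    if real_ip:
        return real_ip
    return peer_host            # direct connection
```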
Route Registration
Routing Pattern and FastAPI Precedence
TLO uses FastAPI's @router.api_route with wildcard path parameters to proxy entire resource trees:
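```python
from fastapi import APIRouter, Depends, Request

router = APIRouter()

# get_current_user and proxy_to_backend are existing TLO helpers (imports omitted)
@router.api_route("/{resource}/{path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE"])
async def proxy_routes(request: Request, resource: str, path: str, user=Depends(get_current_user)):
    # Forward the entire resource subtree to Backend under /api/v1
    return await proxy_to_backend(request, f"/api/v1/{resource}/{path}")
```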
FastAPI (through Starlette) matches routes in registration order and dispatches to the first match, so a specific route takes precedence over the wildcard catch-all only when it is registered before it. River Agent routes must be declared in this order:
1. Specific routes that target the river-agent service (/agents/ai/generate, /agents/ai/suggest-goal, /agents/ai/refine)
2. Specific POST routes with Temporal workflow side-effects (/agents/{id}/runs, /agents/{id}/deploy)
3. Approval resolution routes with Temporal signals (PATCH /agents/approvals/{id})
4. Run control routes with Temporal side-effects (/agents/runs/{execution_id}/stop sends cancellation; /agents/runs/{execution_id}/retry starts a new workflow)
5. Wildcard catch-all for remaining CRUD (/agents/{path:path}, /templates/{path:path}, etc.)
Route Precedence Decision Tree
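The decision tree reduces to a first-match scan over an ordered route list. The sketch below models that with regular expressions instead of FastAPI route objects; the patterns and target labels are illustrative, not the TLO route file:

```python
from __future__ import annotations

import re

# Ordered (method, path regex, target) entries; the first match wins, as in Starlette.
# "*" matches any method. Patterns and target labels are illustrative only.
ROUTES: list[tuple[str, str, str]] = [
    ("POST", r"/api/v1/agents/ai/(generate|suggest-goal|refine)", "river-agent"),
    ("POST", r"/api/v1/agents/[^/]+/(runs|deploy)", "backend+temporal"),
    ("PATCH", r"/api/v1/agents/approvals/[^/]+", "backend+temporal-signal"),
    ("POST", r"/api/v1/agents/runs/[^/]+/(stop|retry)", "backend+temporal"),
    # Wildcard catch-all: must stay last or it shadows every entry after it.
    ("*", r"/api/v1/(agents|templates|agent-webhooks|agent-api)(/.*)?", "backend"),
]


def resolve_target(method: str, path: str,
                   table: list[tuple[str, str, str]] = ROUTES) -> str | None:
    """Return the target of the first matching entry, or None if nothing matches."""
    for route_method, pattern, target in table:
        if route_method in ("*", method) and re.fullmatch(pattern, path):
            return target
    return None
```

Moving the wildcard entry to the front of ROUTES would send POST /api/v1/agents/ai/generate to Backend, which is exactly the failure the registration order prevents.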
Complete Route Registration Table
Agent Management (Backend :8005)
| Method | Path | Required Permission | Notes |
|---|---|---|---|
| GET | /api/v1/agents | agent:view | List with search, filter, pagination |
| POST | /api/v1/agents | agent:create | Create agent in draft status |
| GET | /api/v1/agents/{id} | agent:view | Full agent detail |
| PUT | /api/v1/agents/{id} | agent:update | Update config; auto-versions if deployed |
| DELETE | /api/v1/agents/{id} | agent:delete | Soft-delete |
| POST | /api/v1/agents/{id}/validate | agent:deploy | Pre-deployment validation only |
| POST | /api/v1/agents/{id}/deploy | agent:deploy | Deploy and activate agent; confirm = yes |
| POST | /api/v1/agents/{id}/pause | agent:deploy | Suspend triggers |
| POST | /api/v1/agents/{id}/resume | agent:deploy | Reactivate triggers |
| POST | /api/v1/agents/{id}/archive | agent:delete | Archive; confirm = yes |
| GET | /api/v1/agents/{id}/versions | agent:view | List versions |
| GET | /api/v1/agents/{id}/versions/{version_id} | agent:view | Get single version |
| GET | /api/v1/agents/{id}/versions/diff | agent:view | Diff two versions |
| POST | /api/v1/agents/{id}/versions/{version_id}/rollback | agent:deploy | Rollback; confirm = yes |
Agent Configuration Sub-Resources (Backend :8005)
| Method | Path | Required Permission |
|---|---|---|
| GET | /api/v1/agents/{id}/triggers | agent:view |
| POST | /api/v1/agents/{id}/triggers | agent:update |
| PUT | /api/v1/agents/{id}/triggers/{trigger_id} | agent:update |
| DELETE | /api/v1/agents/{id}/triggers/{trigger_id} | agent:update |
| GET | /api/v1/agents/{id}/data-sources | agent:view |
| POST | /api/v1/agents/{id}/data-sources | agent:update |
| DELETE | /api/v1/agents/{id}/data-sources/{binding_id} | agent:update |
| GET | /api/v1/agents/{id}/tools | agent:view |
| PUT | /api/v1/agents/{id}/tools | agent:update |
| GET | /api/v1/agents/{id}/policies | agent:view |
| POST | /api/v1/agents/{id}/policies | agent:update |
| PUT | /api/v1/agents/{id}/policies/{policy_id} | agent:update |
| DELETE | /api/v1/agents/{id}/policies/{policy_id} | agent:update |
AI Generation (river-agent :8007)
These routes must be registered before the /agents/{path:path} wildcard catch-all.
| Method | Path | Required Permission | Target |
|---|---|---|---|
| POST | /api/v1/agents/ai/generate | agent:create | river-agent :8007 (SSE stream) |
| POST | /api/v1/agents/ai/suggest-goal | agent:create | river-agent :8007 |
| POST | /api/v1/agents/ai/refine | agent:create | river-agent :8007 |
Templates (Backend :8005)
| Method | Path | Required Permission |
|---|---|---|
| GET | /api/v1/templates | agent:view |
| GET | /api/v1/templates/{id} | agent:view |
| POST | /api/v1/templates | agent:create |
| PATCH | /api/v1/templates/{id} | agent:update |
| DELETE | /api/v1/templates/{id} | agent:delete |
Executions and Runs (Backend :8005 + Temporal)
| Method | Path | Required Permission | Routing Note |
|---|---|---|---|
| POST | /api/v1/agents/{id}/runs | agent:execute | Starts Temporal workflow agent-run-{execution_id}; returns 202 |
| GET | /api/v1/agents/runs | agent:view | Backend direct |
| GET | /api/v1/agents/{id}/runs | agent:view | Backend direct |
| GET | /api/v1/agents/runs/{execution_id} | agent:view | Backend direct |
| GET | /api/v1/agents/runs/{execution_id}/logs | agent:view | Backend direct |
| POST | /api/v1/agents/runs/{execution_id}/stop | agent:execute | Sends Temporal cancellation signal |
| POST | /api/v1/agents/runs/{execution_id}/retry | agent:execute | Starts new Temporal workflow with same context |
Inbound Triggers (Backend :8005)
These routes accept traffic from external systems (webhooks, API callers). They do not require a user JWT -- they authenticate via agent API keys validated at TLO.
| Method | Path | Auth Method | Notes |
|---|---|---|---|
| POST | /api/v1/agent-webhooks/{agent_id} | Agent API key | Event-based trigger from external webhook |
| POST | /api/v1/agent-api/{agent_id}/execute | Agent API key | Authenticated API trigger from external systems |
Approvals (Backend :8005 + Temporal)
| Method | Path | Required Permission | Routing Note |
|---|---|---|---|
| GET | /api/v1/agents/approvals | agent:approve | Backend direct |
| GET | /api/v1/agents/approvals/{id} | agent:approve | Backend direct |
| GET | /api/v1/agents/{id}/approvals | agent:view | Backend direct |
| PATCH | /api/v1/agents/approvals/{id} | agent:approve | Backend writes decision; TLO signals Temporal workflow |
| POST | /api/v1/agents/approvals/{id}/expire | agent:admin | Force-expire a pending approval (Admin only) |
Metrics, Monitoring, and Audit (Backend :8005)
| Method | Path | Required Permission |
|---|---|---|
| GET | /api/v1/agents/{id}/metrics | agent:view |
| GET | /api/v1/agents/stats | agent:view |
| GET | /api/v1/agents/monitoring/summary | agent:monitor |
| GET | /api/v1/agents/monitoring/throughput | agent:monitor |
| GET | /api/v1/agents/monitoring/cluster | agent:monitor |
| GET | /api/v1/agents/monitoring/alerts | agent:monitor |
| GET | /api/v1/audit | agent:audit |
| GET | /api/v1/audit/{entry_id} | agent:audit |
| POST | /api/v1/audit/export | agent:audit |
Settings (Backend :8005)
| Method | Path | Required Permission |
|---|---|---|
| GET | /api/v1/workspace/settings | agent:view |
| PATCH | /api/v1/workspace/settings | agent:update |
| GET | /api/v1/workspace/settings/api-keys | agent:admin |
| POST | /api/v1/workspace/settings/api-keys | agent:admin |
| DELETE | /api/v1/workspace/settings/api-keys/{key_id} | agent:admin |
| POST | /api/v1/workspace/settings/webhooks | agent:admin |
| DELETE | /api/v1/workspace/settings/webhooks/{webhook_id} | agent:admin |
Emergency Controls (Backend :8005)
| Method | Path | Required Permission |
|---|---|---|
| POST | /api/v1/governance/emergency/pause-all | agent:admin |
| POST | /api/v1/governance/emergency/policy | agent:admin |
Wildcard Catch-All (Backend :8005)
These must be declared last in the route file.
| Pattern | Methods | Notes |
|---|---|---|
| /api/v1/agents/{path:path} | GET, POST, PUT, PATCH, DELETE | Remaining /agents/** routes |
| /api/v1/templates/{path:path} | GET, POST, PUT, PATCH, DELETE | Remaining template routes |
| /api/v1/agent-webhooks/{path:path} | POST | Remaining webhook routes |
| /api/v1/agent-api/{path:path} | POST | Remaining API trigger routes |
WebSocket Routes
| Path | Required Permission | Auth Method | Notes |
|---|---|---|---|
| /ws/agents/{execution_id} | agent:view | Sec-WebSocket-Protocol: Bearer {jwt} | Per-execution live stream |
| /ws/agents/workspace | agent:view | Sec-WebSocket-Protocol: Bearer {jwt} | Workspace-wide activity feed |
| /ws/monitoring | agent:monitor | Sec-WebSocket-Protocol: Bearer {jwt} | Infrastructure health stream |
| /ws/approvals | agent:approve | Sec-WebSocket-Protocol: Bearer {jwt} | Pending approval notifications |
Configuration Additions for river-agent Service
Add to services/tlo_gateway/config.py:
```python
# River Agent Service
river_agent_service_url: str = "http://localhost:8007"
river_agent_timeout_seconds: int = 120
river_agent_retry_count: int = 2
```
Add to middleware/acl.py get_service_url():
```python
"river_agent": settings.river_agent_service_url,
```
ACL Permission Matrix
Permission Definitions
River Agents introduce 10 permissions in the agent: namespace:
| Permission | Scope | Description |
|---|---|---|
| agent:view | Read | View agents, templates, runs, logs, metrics, and settings |
| agent:create | Write | Create agents, bind data sources and tools, create from template |
| agent:update | Write | Update agent config, triggers, policies, notification settings |
| agent:delete | Write | Soft-delete (archive) agents |
| agent:deploy | Write | Deploy, pause, resume, validate agents; manage lifecycle transitions |
| agent:execute | Write | Trigger manual runs, retry failed runs, send stop signals |
| agent:approve | Write | Approve, reject, or edit-approve pending approval requests |
| agent:audit | Read | View audit logs and execution traces |
| agent:monitor | Read | View monitoring metrics and system health |
| agent:admin | Admin | Manage workspace settings, API keys, webhooks, emergency controls |
Admin bypass: Users with the admin role in their JWT bypass all ACL checks, consistent with the existing check_tool_acl() behavior.
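Permission resolution can be sketched as an ordered (method, path pattern) lookup followed by the admin bypass. The rule list is a hypothetical regex excerpt of the matrix below, and denying unmapped routes is an assumption. Literal segments such as approvals and monitoring must be matched before the generic /agents/{id} rules, since GET /agents/approvals needs agent:approve while GET /agents/{id}/approvals needs only agent:view:

```python
from __future__ import annotations

import re

# Excerpt of the endpoint-to-permission matrix as ordered (method, path regex, permission).
ACL_RULES: list[tuple[str, str, str]] = [
    ("POST", r"/api/v1/agents/ai/[^/]+", "agent:create"),
    ("GET", r"/api/v1/agents/monitoring/[^/]+", "agent:monitor"),
    ("GET", r"/api/v1/agents/approvals(/[^/]+)?", "agent:approve"),
    ("PATCH", r"/api/v1/agents/approvals/[^/]+", "agent:approve"),
    ("POST", r"/api/v1/agents/approvals/[^/]+/expire", "agent:admin"),
    ("POST", r"/api/v1/agents/runs/[^/]+/(stop|retry)", "agent:execute"),
    ("POST", r"/api/v1/agents/[^/]+/(deploy|pause|resume|validate)", "agent:deploy"),
    ("POST", r"/api/v1/agents/[^/]+/runs", "agent:execute"),
    ("GET", r"/api/v1/agents(/.*)?", "agent:view"),
]


def required_permission(method: str, path: str) -> str | None:
    for rule_method, pattern, permission in ACL_RULES:
        if rule_method == method and re.fullmatch(pattern, path):
            return permission
    return None


def is_allowed(roles: list[str], permissions: list[str], method: str, path: str) -> bool:
    if "admin" in roles:        # admin role bypasses all ACL checks
        return True
    permission = required_permission(method, path)
    # Unmapped routes are denied (assumption: deny by default).
    return permission is not None and permission in permissions
```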
Full Endpoint-to-Permission Matrix
| Endpoint | Method | Required Permission | Confirm |
|---|---|---|---|
| /api/v1/agents | GET | agent:view | No |
| /api/v1/agents | POST | agent:create | No |
| /api/v1/agents/{id} | GET | agent:view | No |
| /api/v1/agents/{id} | PUT | agent:update | No |
| /api/v1/agents/{id} | DELETE | agent:delete | Yes |
| /api/v1/agents/{id}/validate | POST | agent:deploy | No |
| /api/v1/agents/{id}/deploy | POST | agent:deploy | Yes |
| /api/v1/agents/{id}/pause | POST | agent:deploy | No |
| /api/v1/agents/{id}/resume | POST | agent:deploy | No |
| /api/v1/agents/{id}/archive | POST | agent:delete | Yes |
| /api/v1/agents/{id}/versions | GET | agent:view | No |
| /api/v1/agents/{id}/versions/{vid}/rollback | POST | agent:deploy | Yes |
| /api/v1/agents/ai/generate | POST | agent:create | No |
| /api/v1/agents/ai/suggest-goal | POST | agent:create | No |
| /api/v1/agents/ai/refine | POST | agent:create | No |
| /api/v1/templates | GET | agent:view | No |
| /api/v1/templates | POST | agent:create | No |
| /api/v1/templates/{id} | PATCH | agent:update | No |
| /api/v1/templates/{id} | DELETE | agent:delete | Yes |
| /api/v1/agents/{id}/triggers | GET | agent:view | No |
| /api/v1/agents/{id}/triggers | POST | agent:update | No |
| /api/v1/agents/{id}/triggers/{tid} | PUT | agent:update | No |
| /api/v1/agents/{id}/triggers/{tid} | DELETE | agent:update | No |
| /api/v1/agents/{id}/runs | POST | agent:execute | No |
| /api/v1/agents/runs | GET | agent:view | No |
| /api/v1/agents/runs/{id} | GET | agent:view | No |
| /api/v1/agents/runs/{id}/stop | POST | agent:execute | No |
| /api/v1/agents/runs/{id}/retry | POST | agent:execute | No |
| /api/v1/agents/runs/{id}/logs | GET | agent:view | No |
| /api/v1/agent-webhooks/{agent_id} | POST | Agent API key | No |
| /api/v1/agent-api/{agent_id}/execute | POST | Agent API key | No |
| /api/v1/agents/approvals | GET | agent:approve | No |
| /api/v1/agents/approvals/{id} | GET | agent:approve | No |
| /api/v1/agents/approvals/{id} | PATCH | agent:approve | No |
| /api/v1/agents/approvals/{id}/expire | POST | agent:admin | No |
| /api/v1/agents/{id}/metrics | GET | agent:view | No |
| /api/v1/agents/monitoring/summary | GET | agent:monitor | No |
| /api/v1/agents/monitoring/throughput | GET | agent:monitor | No |
| /api/v1/agents/monitoring/cluster | GET | agent:monitor | No |
| /api/v1/agents/monitoring/alerts | GET | agent:monitor | No |
| /api/v1/workspace/settings | GET | agent:view | No |
| /api/v1/workspace/settings | PATCH | agent:update | No |
| /api/v1/workspace/settings/api-keys | GET, POST | agent:admin | No |
| /api/v1/workspace/settings/api-keys/{key_id} | DELETE | agent:admin | Yes |
| /api/v1/workspace/settings/webhooks | POST | agent:admin | No |
| /api/v1/governance/emergency/pause-all | POST | agent:admin | Yes |
Role-to-Permission Mapping
River Agents RBAC operates on two tiers. Both tiers are enforced; a user must satisfy the relevant tier for the requested operation.
Organization-level roles:
| Permission | Org Admin | Org Editor | Org Viewer |
|---|---|---|---|
| agent:view | Yes | Yes | Yes |
| agent:create | Yes | Yes | No |
| agent:update | Yes | Yes | No |
| agent:delete | Yes | No | No |
| agent:deploy | Yes | Yes | No |
| agent:execute | Yes | Yes | No |
| agent:approve | Yes | Yes | No |
| agent:audit | Yes | No | No |
| agent:monitor | Yes | No | No |
| agent:admin | Yes | No | No |
Workspace-level roles:
| Permission | WS Admin | WS Editor | WS Analyst | WS Viewer | WS Auditor |
|---|---|---|---|---|---|
| agent:view | Yes | Yes | Yes | Yes | Yes |
| agent:create | Yes | Yes | No | No | No |
| agent:update | Yes | Yes | No | No | No |
| agent:delete | Yes | No | No | No | No |
| agent:deploy | Yes | Yes | No | No | No |
| agent:execute | Yes | Yes | Yes | No | No |
| agent:approve | Yes | Yes | No | No | No |
| agent:audit | Yes | No | No | No | Yes |
| agent:monitor | Yes | No | Yes | No | Yes |
| agent:admin | Yes | No | No | No | No |
TOOL_ACL_MAP Entries for Agent Execution
When River Agents execute tools during their agentic loop via TLO's ACL proxy, the existing TOOL_ACL_MAP in middleware/acl.py applies. The agent runs with the triggering user's permissions -- see Agent Execution Security Model. No new entries are required for agent-invoked platform tools (e.g., execute_query still maps to data_source:query). However, TLO must pass X-Agent-ID alongside the existing user context headers so downstream services can distinguish agent-initiated requests from direct user requests in audit logs.
See Appendix B: TOOL_ACL_MAP Additions for the internal activity tools used by Temporal workflow activities.
Proxy Client Configuration
Circuit Breaker Settings
River Agents require separate circuit breakers for each downstream target:
| Circuit Breaker | Target | Failure Threshold | Recovery Timeout | Rationale |
|---|---|---|---|---|
| backend | Backend :8005 | 5 consecutive failures | 30 seconds | Shared with all TLO routes; conservative threshold |
| river_agent | river-agent :8007 | 3 consecutive failures | 60 seconds | LLM service may have cold starts; longer recovery window |
Circuit breaker state machine:
```text
CLOSED    --[failure_count >= threshold]--> OPEN
OPEN      --[recovery_timeout elapsed]-->   HALF_OPEN
HALF_OPEN --[success]-->                    CLOSED
HALF_OPEN --[failure]-->                    OPEN
```
When the circuit is OPEN, TLO returns 503 Service Unavailable immediately without attempting the downstream call.
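A minimal sketch of the state machine above, assuming an injectable clock for testing. For brevity it lets every request through while HALF_OPEN; a production breaker would usually admit a single probe. TLO would hold one instance per target, for example CircuitBreaker(5, 30) for backend and CircuitBreaker(3, 60) for river_agent, and map the hypothetical CircuitOpenError to 503:

```python
from __future__ import annotations

import time
from typing import Callable


class CircuitOpenError(Exception):
    """Raised instead of calling the downstream service; mapped to 503 by the caller."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.state = "CLOSED"
        self.failures = 0          # consecutive failures
        self.opened_at = 0.0

    def before_call(self) -> None:
        if self.state == "OPEN":
            if self.clock() - self.opened_at >= self.recovery_timeout:
                self.state = "HALF_OPEN"   # allow a trial call through
            else:
                raise CircuitOpenError()

    def record_success(self) -> None:
        self.state = "CLOSED"
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = self.clock()
```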
Retry Policy
| Error Condition | Retry? | Max Retries | Backoff | Applied To |
|---|---|---|---|---|
| HTTP 502 Bad Gateway | Yes | 3 | Exponential (0.5s, 1s, 2s) | All routes |
| HTTP 503 Service Unavailable | Yes | 3 | Exponential | All routes |
| HTTP 504 Gateway Timeout | Yes | 3 | Exponential | All routes |
| HTTP 429 Rate Limited | Yes | 2 | Fixed (1s) | All routes |
| httpx.ConnectError | Yes | 3 | Exponential | All routes |
| httpx.TimeoutException | Yes | 1 | None | CRUD routes only |
| HTTP 4xx client errors (other than 429) | No | -- | -- | Never retry |
| HTTP 5xx other server errors | No | -- | -- | Only 502/503/504 retry |
| Execution trigger (POST .../runs) | No | -- | -- | Not idempotent -- never retry |
| Approval resolution (PATCH .../approvals/{id}) | No | -- | -- | Not idempotent -- never retry |
No retries on execute and approve routes: A retry on a successful-but-slow run trigger would start a duplicate execution. The client is responsible for retry logic with idempotency keys where required.
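The table reduces to two small functions: how many retries a failure allows, and the delay before each. The route categories are hypothetical labels for the rows above; run_trigger and approval_resolution short-circuit to zero before any status is considered:

```python
from __future__ import annotations

# Hypothetical route categories; these two are not idempotent and are never retried.
NEVER_RETRY = {"run_trigger", "approval_resolution"}


def max_retries(category: str, status: int | None = None, error: str | None = None) -> int:
    """Retries allowed for one failed proxy call; error is "connect" or "timeout"."""
    if category in NEVER_RETRY:
        return 0
    if error == "connect":
        return 3
    if error == "timeout":
        return 1 if category == "crud" else 0   # timeouts retried on CRUD routes only
    if status in (502, 503, 504):
        return 3
    if status == 429:
        return 2
    return 0                                    # all other 4xx/5xx: fail fast


def backoff_schedule(status: int | None, error: str | None, retries: int) -> list[float]:
    """Delay in seconds before each retry attempt."""
    if status == 429:
        return [1.0] * retries                  # fixed 1s
    if error == "timeout":
        return [0.0] * retries                  # no backoff
    return [0.5 * 2 ** i for i in range(retries)]  # exponential: 0.5s, 1s, 2s
```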
Timeout Configuration
| Route Category | Connect Timeout | Read Timeout | Total Timeout | Rationale |
|---|---|---|---|---|
| CRUD (list, get, create, update, delete) | 5s | 10s | 15s | Standard database operations |
| Lifecycle (deploy, pause, resume, archive) | 5s | 15s | 20s | May trigger validation and trigger registration |
| Validation (/validate) | 5s | 30s | 35s | Connectivity tests to data sources |
| AI Generation (/ai/generate) | 5s | 120s | 125s | LLM inference; SSE stream may run for the full duration |
| Manual Run (/agents/{id}/runs) | 5s | 10s | 15s | Only starts the Temporal workflow; does not wait for completion |
| Execution Activity (Temporal to river-agent) | 5s | 300s | 305s | Full agentic loop; up to 15 turns |
| Approval Resolution (PATCH .../approvals/{id}) | 5s | 10s | 15s | Temporal signal dispatch; near-instant |
Health Check Endpoints
| Service | Health Endpoint | Expected Response | Failure Mode |
|---|---|---|---|
| Backend :8005 | GET /health | HTTP 200 | "backend": "failed" -- critical; /ready returns unhealthy |
| river-agent :8007 | GET /health | HTTP 200 | "river_agent": "failed" -- non-critical; CRUD and lifecycle still work |
river-agent is not critical for readiness. If it is unavailable, only AI generation and runtime LLM execution are degraded. CRUD, lifecycle, and approval operations continue normally.
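A sketch of the readiness rule, assuming component checks arrive as booleans. Returning 503 for an unhealthy result and the "degraded" status string for the river-agent-down case are assumptions:

```python
from __future__ import annotations

CRITICAL = {"backend"}  # river_agent is deliberately non-critical


def readiness(checks: dict[str, bool]) -> tuple[int, dict[str, str]]:
    """Aggregate component health into an HTTP status and a response body."""
    body = {name: "ok" if healthy else "failed" for name, healthy in checks.items()}
    critical_down = any(not checks.get(name, False) for name in CRITICAL)
    if critical_down:
        body["status"] = "unhealthy"
        return 503, body
    body["status"] = "healthy" if all(checks.values()) else "degraded"
    return 200, body
```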
WebSocket Integration
Dedicated Endpoint
River Agents use a dedicated WebSocket endpoint: /ws/agents/{execution_id}.
This is separate from PSA's /ws/executions/{execution_id} because River Agent execution messages have different semantics -- approval notifications, governance gate events, turn-by-turn reasoning with action level enforcement, and long-running execution spans that may pause for hours at an approval gate. A dedicated endpoint allows:
- Agent-specific message type definitions
- Agent API key authentication in addition to user JWT (for external system integrations)
- Independent connection management and per-execution message queuing
WebSocket Authentication
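WebSocket connections authenticate via the Sec-WebSocket-Protocol header, the only handshake header a browser client can set (through the protocols argument of the WebSocket() constructor). Browsers reject subprotocol values that contain spaces, so the client passes "Bearer" and the JWT as two separate subprotocols, which arrive as a comma-separated list:
```http
GET /ws/agents/exec-001-uuid HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Protocol: Bearer, eyJhbGci...
```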
TLO validates the token identically to HTTP request tokens and injects context headers into the forwarded upgrade request. The Backend WebSocket service never receives the client's raw token.
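A sketch of the token extraction, accepting both the two-subprotocol form ("Bearer, <jwt>") and a single "Bearer <jwt>" value. It also returns the subprotocol TLO would echo in its 101 response (with Starlette, websocket.accept(subprotocol=...)), because browsers fail the handshake when the server selects none of the offered subprotocols. parse_ws_auth is a hypothetical helper:

```python
from __future__ import annotations


def parse_ws_auth(header: str | None) -> tuple[str | None, str | None]:
    """Return (jwt, subprotocol_to_echo) from a Sec-WebSocket-Protocol header value."""
    if not header:
        return None, None
    parts = [p.strip() for p in header.split(",") if p.strip()]
    # Browser-compatible form: two subprotocols, "Bearer" followed by the token.
    if len(parts) == 2 and parts[0] == "Bearer":
        return parts[1], "Bearer"
    # Single-value form "Bearer <jwt>" (usable only by non-browser clients).
    if len(parts) == 1 and parts[0].startswith("Bearer "):
        token = parts[0][len("Bearer "):].strip()
        return (token or None), parts[0]
    return None, None
```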
Queued messages: if the Temporal workflow has already emitted events before the WebSocket connection is established, those messages are queued in the ConnectionManager and flushed immediately upon connection.
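A sketch of that buffering in a hypothetical ConnectionManager-like class, assuming synchronous send callables in place of Starlette's async websocket.send_json and a hypothetical cap on queued messages per execution:

```python
from __future__ import annotations

from collections import defaultdict, deque
from typing import Callable

Send = Callable[[dict], None]


class BufferedConnections:
    """Events emitted before any client connects are replayed when one connects."""

    def __init__(self, max_queued: int = 500) -> None:
        self.connections: dict[str, list[Send]] = defaultdict(list)
        # deque(maxlen=...) drops the oldest message once the cap is reached
        self.pending: dict[str, deque] = defaultdict(lambda: deque(maxlen=max_queued))

    def push(self, execution_id: str, message: dict) -> None:
        senders = self.connections.get(execution_id)
        if senders:
            for send in senders:
                send(message)
        else:
            self.pending[execution_id].append(message)  # nobody listening yet

    def connect(self, execution_id: str, send: Send) -> None:
        self.connections[execution_id].append(send)
        queued = self.pending.pop(execution_id, deque())
        while queued:
            send(queued.popleft())  # flush in emission order
```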
Message Types
Server to Client (8 types)
| Type | When Sent | Description |
|---|---|---|
| connected | Immediately after WebSocket accept | Connection bound to execution_id |
| run_started | When Temporal workflow begins | Execution has started |
| turn_update | After each agentic loop turn | Reasoning turn progress with tool call and observation |
| governance_check | After governance evaluation | Decision for a tool call: proceed, blocked, suggest, or gate |
| approval_required | When workflow enters approval wait | Action requires human approval; execution is paused |
| approval_resolved | When approval signal is processed | Approval decision received |
| run_completed | When Temporal workflow finishes | Execution finished with final result and status |
| error | On any unrecoverable error | Timeout, service failure, or policy violation |
Client to Server (3 types)
| Type | When Sent | Description |
|---|---|---|
| approval_response | In response to approval_required | User's approval decision (approve, reject, edit-approve) |
| stop_request | When user clicks "Stop" | Emergency stop signal |
| ping | Periodically | Keepalive; server responds with pong |
Key Message Structures
turn_update:
```json
{
  "type": "turn_update",
  "execution_id": "exec-001-uuid",
  "turn": 3,
  "max_turns": 15,
  "phase": "reasoning",
  "tool_name": "execute_query",
  "tool_category": "execution",
  "message": "Querying sales data for Q1 2026...",
  "progress_percentage": 40.0,
  "timestamp": "2026-04-23T10:15:05Z"
}
```
approval_required:
```json
{
  "type": "approval_required",
  "execution_id": "exec-001-uuid",
  "approval_id": "apr-001-uuid",
  "agent_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "action": "write_back",
  "severity": "warning",
  "title": "Write 1,250 rows to production table",
  "details": {
    "tool_name": "write_back",
    "tool_args": { "target_table": "customer_segments", "row_count": 1250 },
    "agent_name": "Customer Segmentation Agent",
    "reasoning": "Based on updated model scores, 1,250 customer records need segment reassignment."
  },
  "timeout_seconds": 3600,
  "timestamp": "2026-04-23T10:15:12Z"
}
```
run_completed:
```json
{
  "type": "run_completed",
  "execution_id": "exec-001-uuid",
  "status": "completed",
  "result": {
    "summary": "Revenue anomaly detected: Q1 revenue dropped 15% vs Q4.",
    "actions_taken": [
      { "tool": "execute_query", "status": "success" },
      { "tool": "send_notification", "status": "success" }
    ],
    "turns_used": 8,
    "total_duration_ms": 45200
  },
  "timestamp": "2026-04-23T10:15:50Z"
}
```
Client approval_response:
```json
{
  "type": "approval_response",
  "execution_id": "exec-001-uuid",
  "approval_id": "apr-001-uuid",
  "decision": "edited",
  "edited_args": { "target_table": "customer_segments_staging", "row_count": 1250 },
  "reason": "Write to staging first for review.",
  "timestamp": "2026-04-23T10:16:30Z"
}
```
Channel Subscription Model
The ConnectionManager tracks WebSocket connections at three scopes. Each run event is fanned out to all three scopes at once:
| Scope | Subscription Key | Use Case |
|---|---|---|
| Per-execution | agent-run-{execution_id} | Live progress for a single running execution |
| Per-agent | agent-{agent_id} | Monitoring dashboard for one agent (all run starts and completions) |
| Workspace-wide | workspace-agents-{workspace_id} | Dashboard showing all agent activity in a workspace |
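The fan-out reduces to computing three subscription keys per event. A sketch, assuming each event carries execution_id, agent_id, and workspace_id and that subscribers are plain callables keyed by subscription key:

```python
from __future__ import annotations

from typing import Callable


def fanout_keys(event: dict) -> list[str]:
    """Subscription keys a run event is delivered to: execution, agent, workspace."""
    return [
        f"agent-run-{event['execution_id']}",
        f"agent-{event['agent_id']}",
        f"workspace-agents-{event['workspace_id']}",
    ]


def deliver(event: dict, subscribers: dict[str, list[Callable[[dict], None]]]) -> int:
    """Send the event to every subscriber of every scope; return the number of sends."""
    sent = 0
    for key in fanout_keys(event):
        for send in subscribers.get(key, []):
            send(event)
            sent += 1
    return sent
```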
WebSocket Flow During Execution
Temporal Workflow Integration
river_agent_execution_workflow
The primary workflow for every agent run, regardless of trigger type.
- Workflow ID pattern: agent-run-{execution_id}
- Task queue: tlo-gateway (shared with PSA workflows)
- Execution timeout: 1 hour (configurable per agent; max 4 hours)
Activities
| Activity | Timeout | Retry | Target | Description |
|---|---|---|---|---|
| resolve_agent | 10s | 2 retries | Backend :8005 | Load agent config, validate Active status, load current version |
| build_context | 30s | 2 retries | Backend :8005 | Assemble AgentContext: instructions, tools, data sources, memory, trigger payload, schema metadata |
| call_river_agent | 150s | 1 retry | river-agent :8007 | Send context and observations to LLM for next reasoning turn; returns tool call or final answer |
| check_governance | 10s | 2 retries | Backend :8005 | Evaluate action level, policies, and approval rules for the proposed tool call |
| execute_tool | 60s (120s for queries) | 1 retry | TLO ACL proxy | Execute tool call through TLO's ACL validation and service routing |
| log_turn | 5s | 3 retries | Backend :8005 | Persist turn log entry (reasoning, tool call, observation, governance decision) |
| update_execution_status | 5s | 3 retries | Backend :8005 | Update agent_executions record status and timing |
| update_long_term_context | 10s | 2 retries | Backend :8005 | Persist learnings from the run into agent's long_term_context |
| send_ws_message | 10s | 1 retry | TLO internal | Push WebSocket message via /internal/ws/push |
| send_notification | 10s | 2 retries | Notification :8006 | Send approval/completion notifications via Novu |
Signal Handling for Approval Resolution
The workflow uses Temporal's workflow.wait_condition() to hibernate at zero compute cost while waiting for an approval signal.
Signal flow:
1. User clicks Approve, Reject, or Edit in the frontend
2. Frontend calls PATCH /api/v1/agents/approvals/{id}
3. Backend writes the resolution to approval_requests
4. TLO resolves the execution_id from the approval_request record
5. TLO sends the Temporal signal to workflow agent-run-{execution_id}
Signal name: approval_resolution
Signal payload:
```json
{
  "approval_id": "apr-001-uuid",
  "decision": "approved",
  "edited_args": {},
  "approver_user_id": 42,
  "reason": "Verified against policy; proceed.",
  "timestamp": "2026-04-23T10:16:30Z"
}
```
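TLO's part of the signal flow can be sketched as building the workflow ID, signal name, and payload from the approval record and the PATCH body. The decision vocabulary (approved, rejected, edited) and the record field names are assumptions. With the temporalio Python SDK, the result would then be sent with await client.get_workflow_handle(workflow_id).signal(signal_name, payload):

```python
from __future__ import annotations

from datetime import datetime, timezone

VALID_DECISIONS = {"approved", "rejected", "edited"}  # assumed vocabulary


def build_approval_signal(approval: dict, body: dict,
                          approver_user_id: int) -> tuple[str, str, dict]:
    """Return (workflow_id, signal_name, payload) for a resolved approval."""
    decision = body["decision"]
    if decision not in VALID_DECISIONS:
        raise ValueError(f"unknown decision: {decision}")
    workflow_id = f"agent-run-{approval['execution_id']}"
    payload = {
        "approval_id": approval["id"],
        "decision": decision,
        # edited_args only carries meaning for an edit-approve decision
        "edited_args": body.get("edited_args", {}) if decision == "edited" else {},
        "approver_user_id": approver_user_id,
        "reason": body.get("reason", ""),
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    }
    return workflow_id, "approval_resolution", payload
```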
State Serialization for Approval Hibernation
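While the workflow waits in wait_condition, Temporal does not snapshot its in-memory state. Instead, the worker may evict the workflow from its cache and later rebuild the state by replaying the workflow's event history. Values that cross that history boundary (workflow arguments, activity inputs and results, and signal payloads) must be serializable by the configured data converter. The logical state the workflow holds during the wait is:
```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class HibernatedState:
    execution_id: str
    agent_id: str
    current_turn: int
    pending_tool_call: dict      # {tool_name, tool_args, governance_decision}
    conversation_history: list   # list[TurnEntry]
    agent_context: dict          # serialized AgentContext
    started_at: datetime
    hibernated_at: datetime
```
No manual serialization is needed, provided these values are dataclasses, Pydantic models, or standard Python types that the workflow's data converter supports (Pydantic models may need a Pydantic-aware converter).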
Error Handling and Retry Policy
| Error Type | Handling | Retry | Notification |
|---|---|---|---|
| resolve_agent failure | Abort workflow immediately | 2 retries, 1s backoff | None |
| build_context failure | Abort workflow | 2 retries, 2s backoff | error WebSocket message |
| call_river_agent timeout | Retry once, then abort | 1 retry, 5s backoff | error WebSocket message |
| call_river_agent LLM error | Retry once, then abort | 1 retry, 2s backoff | error WebSocket message |
| execute_tool timeout | Skip tool; feed error as observation | 1 retry, 2s backoff | turn_update with error |
| execute_tool ACL denied | Feed denial as observation; do not retry | No retry | turn_update with denial |
| execute_tool service error | Retry once; then feed error as observation | 1 retry, 2s backoff | turn_update with error |
| Approval timeout (1h default) | Terminate with approval_timeout status | No retry | Notification to agent owner |
| Max turns exceeded | Terminate with max_turns_exceeded status | No retry | Notification to agent owner |
| Workflow cancellation | Clean up; set status to stopped | No retry | error WebSocket message |
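The Retry column can be expressed as Temporal activity retry policies. The sketch below assumes fixed (non-exponential) backoff. It also assumes an ACL denial surfaces as an application error of type AclDeniedError, which is a hypothetical name. Temporal's maximum_attempts counts the first attempt, so "2 retries" becomes three attempts. A retry policy is set once per activity call, so the two call_river_agent rows cannot share a single policy. Those rows use different backoffs for timeouts and LLM errors, and they are left out here.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Tuple


@dataclass(frozen=True)
class RetrySpec:
    """One row of the error-handling table (hypothetical helper type)."""

    retries: int  # retries AFTER the first attempt
    backoff_seconds: float  # fixed delay before each retry
    non_retryable: Tuple[str, ...] = ()

    def to_retry_policy_kwargs(self) -> Dict[str, Any]:
        """Keyword arguments for temporalio.common.RetryPolicy."""
        return {
            "initial_interval": timedelta(seconds=self.backoff_seconds),
            "backoff_coefficient": 1.0,  # constant delay, no exponential growth
            "maximum_attempts": self.retries + 1,  # first attempt + retries
            "non_retryable_error_types": list(self.non_retryable),
        }


ACTIVITY_RETRY_SPECS = {
    "resolve_agent": RetrySpec(retries=2, backoff_seconds=1),
    "build_context": RetrySpec(retries=2, backoff_seconds=2),
    # Timeouts and service errors share 1 retry / 2s; ACL denials never retry.
    "execute_tool": RetrySpec(retries=1, backoff_seconds=2, non_retryable=("AclDeniedError",)),
}
```

In workflow code the policy would be passed as retry_policy=RetryPolicy(**ACTIVITY_RETRY_SPECS["build_context"].to_retry_policy_kwargs()) to workflow.execute_activity. The workflow then catches the final activity failure to either abort or feed the error back as an observation.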
river_agent_scheduled_trigger_workflow
Handles cron-based scheduled triggers for active agents.
- Workflow ID pattern: agent-schedule-{agent_id}
- Task queue: tlo-gateway
- Pattern: ContinueAsNew after each execution to maintain the cron schedule indefinitely
Sequence
Cron Configuration
| Parameter | Value |
|---|---|
| Schedule ID | agent-schedule-{agent_id} |
| Cron Expression | From agent_triggers.trigger_config.cron_expression |
| Timezone | From trigger_config.timezone (default: UTC) |
| Overlap Policy | SKIP -- do not start a new run if previous is still running |
| Catchup Window | 1 hour -- catch up on missed triggers within this window |
| Pause on Deploy | Yes -- paused until agent reaches Active state |
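The table can be read as a translation from an agent_triggers row into schedule settings. The sketch below assumes trigger_config is already a parsed dict and that the agent's active state is the status string "active". It only checks that the cron expression has the standard five fields. build_schedule_config and the keys of the returned dict are hypothetical names.

```python
from datetime import timedelta
from typing import Any, Dict

CATCHUP_WINDOW = timedelta(hours=1)


def build_schedule_config(
    agent_id: str, trigger_config: Dict[str, Any], agent_status: str
) -> Dict[str, Any]:
    """Translate an agent's cron trigger into schedule settings (sketch)."""
    cron = trigger_config.get("cron_expression")
    # Standard cron: minute, hour, day-of-month, month, day-of-week.
    if not isinstance(cron, str) or len(cron.split()) != 5:
        raise ValueError(f"invalid cron_expression: {cron!r}")
    return {
        "schedule_id": f"agent-schedule-{agent_id}",
        "cron_expression": cron,
        "timezone": trigger_config.get("timezone") or "UTC",
        "overlap_policy": "SKIP",  # never start a run while the previous one is running
        "catchup_window": CATCHUP_WINDOW,
        # Stay paused until the agent reaches Active ("active" is an assumed value).
        "paused": agent_status != "active",
    }
```

With the Temporal Python SDK, these values would map onto ScheduleSpec(cron_expressions=[...], time_zone_name=...), SchedulePolicy(overlap=ScheduleOverlapPolicy.SKIP, catchup_window=...) and ScheduleState(paused=...).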
Request and Response Transformation
Header Injection
| Transformation | Trigger | Description |
|---|---|---|
| Standard context headers | All requests | X-User-ID, X-Org-ID, X-Workspace-ID, X-Roles, X-Request-ID, X-Internal-Call injected; client-supplied values overwritten |
| X-Agent-ID injection | Path contains /agents/{id} | Agent ID extracted from path parameter |
| X-Execution-ID injection | Path contains /runs/{execution_id} | Execution ID extracted from path parameter |
| X-Trigger-Type injection | POST /agents/{id}/runs | Trigger type extracted from request body |
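The header rules can be written as a pure function that any middleware can call. The sketch rests on several assumptions, and all the names in it are illustrative:

- The decoded JWT is available as a UserContext.
- Roles are forwarded comma-separated.
- Agent IDs are UUIDs, as the validation table requires.
- The trigger type arrives in a trigger_type body field.
- X-Internal-Call is "true" on every TLO-to-downstream hop.

Header names are compared case-insensitively, so a client cannot slip x-user-id past the overwrite.

```python
import re
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Mapping, Optional, Tuple

# Headers owned by TLO; client-supplied copies are dropped (case-insensitively).
RESERVED_HEADERS = frozenset(
    h.lower()
    for h in (
        "X-User-ID", "X-Org-ID", "X-Workspace-ID", "X-Roles", "X-Request-ID",
        "X-Internal-Call", "X-Agent-ID", "X-Execution-ID", "X-Trigger-Type",
    )
)
_UUID = r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
_AGENT_PATH = re.compile(rf"/agents/({_UUID})(?:/|$)")  # skips /agents/approvals/...
_RUN_PATH = re.compile(r"/runs/([^/]+)")
_TRIGGER_PATH = re.compile(rf"/agents/{_UUID}/runs/?$")


@dataclass(frozen=True)
class UserContext:
    """Decoded JWT claims (hypothetical shape)."""

    user_id: int
    org_id: int
    workspace_id: int
    roles: Tuple[str, ...]


def build_downstream_headers(
    client_headers: Mapping[str, str],
    ctx: UserContext,
    method: str,
    path: str,
    body: Optional[Dict[str, Any]] = None,
    request_id: Optional[str] = None,
) -> Dict[str, str]:
    headers = {k: v for k, v in client_headers.items() if k.lower() not in RESERVED_HEADERS}
    headers.update({
        "X-User-ID": str(ctx.user_id),
        "X-Org-ID": str(ctx.org_id),
        "X-Workspace-ID": str(ctx.workspace_id),
        "X-Roles": ",".join(ctx.roles),
        "X-Request-ID": request_id or str(uuid.uuid4()),
        "X-Internal-Call": "true",  # assumed value for TLO-originated calls
    })
    if m := _AGENT_PATH.search(path):
        headers["X-Agent-ID"] = m.group(1)
    if m := _RUN_PATH.search(path):
        headers["X-Execution-ID"] = m.group(1)
    if method.upper() == "POST" and _TRIGGER_PATH.search(path) and body and "trigger_type" in body:
        headers["X-Trigger-Type"] = str(body["trigger_type"])
    return headers
```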
Request Validation and Sanitization
TLO performs lightweight validation before proxying. Deep schema validation is the responsibility of Backend.
| Validation | Applied To | Behavior on Failure |
|---|---|---|
| JSON body parse | All POST, PUT, PATCH requests | Return 400 if body is not valid JSON |
| agent_id format | Path parameter {id} | Must be valid UUID; return 400 otherwise |
| Required fields for create | POST /api/v1/agents | name and instruction_set must be present; return 400 otherwise |
| State pre-check for deploy | POST /api/v1/agents/{id}/deploy | Agent must not be archived; return 409 otherwise |
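The four checks above fit in one pre-proxy function that returns either a rejection or None. The sketch makes three assumptions. The caller passes the matched route template rather than the concrete path. An empty body is allowed, for example on deploy. The agent's current status has already been fetched as a string such as "archived". prevalidate is a hypothetical name.

```python
import json
import uuid
from typing import Any, Optional, Tuple

REQUIRED_CREATE_FIELDS = ("name", "instruction_set")


def _is_uuid(value: str) -> bool:
    try:
        uuid.UUID(value)
    except ValueError:
        return False
    return True


def prevalidate(
    method: str,
    route: str,
    raw_body: bytes,
    agent_id: Optional[str] = None,
    agent_status: Optional[str] = None,
) -> Optional[Tuple[int, str]]:
    """Return (status, message) to reject the request, or None to proxy it."""
    body: Any = None
    if method in ("POST", "PUT", "PATCH") and raw_body:
        try:
            body = json.loads(raw_body)
        except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass it
            return 400, "Request body is not valid JSON"
    if agent_id is not None and not _is_uuid(agent_id):
        return 400, "agent_id must be a valid UUID"
    if method == "POST" and route == "/api/v1/agents":
        missing = [f for f in REQUIRED_CREATE_FIELDS if not isinstance(body, dict) or f not in body]
        if missing:
            return 400, "Missing required fields: " + ", ".join(missing)
    if method == "POST" and route == "/api/v1/agents/{id}/deploy" and agent_status == "archived":
        return 409, f"Cannot deploy agent in '{agent_status}' state"
    return None
```

Everything beyond these shallow checks is still left to Backend's schema validation.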
Response Envelope
For routes handled directly by TLO (Temporal workflow triggers, approval signals), the response is wrapped in the standard RiverGen envelope:
{
"success": true,
"status": 200,
"message": "Human-readable description",
"data": {},
"error": null,
"meta": {
"request_id": "uuid",
"timestamp": "2026-04-23T10:15:00Z"
}
}
For proxy routes (pass-through to Backend), the Backend response is forwarded unchanged -- Backend already returns this format.
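For the TLO-handled routes, the envelope can be produced by one helper. The sketch makes two assumptions. success is true only for a 2xx status with no error object. An absent payload is emitted as an empty data object, as in the example. envelope is a hypothetical name.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def envelope(
    status: int,
    message: str,
    data: Any = None,
    error: Optional[Dict[str, Any]] = None,
    request_id: Optional[str] = None,
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Wrap a TLO-handled result in the standard RiverGen envelope."""
    now = now or datetime.now(timezone.utc)
    return {
        "success": 200 <= status < 300 and error is None,
        "status": status,
        "message": message,
        "data": data if data is not None else {},
        "error": error,
        "meta": {
            "request_id": request_id or str(uuid.uuid4()),
            # Second-precision UTC timestamp with a literal Z suffix.
            "timestamp": now.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        },
    }
```

Proxy routes would not call this helper, since Backend's response is already in this shape.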
Error Response Normalization
| HTTP Status | Error Code | Message Template |
|---|---|---|
| 400 | validation_error | Specific validation failure message |
| 401 | missing_token, invalid_token, expired_token | Authentication failure message |
| 403 | permission_denied | "Permission denied: requires '{permission}'" |
| 404 | not_found | "Agent not found", "Run not found" |
| 409 | invalid_state_transition | "Cannot deploy agent in '{current_status}' state" |
| 429 | rate_limited | "Rate limit exceeded. Retry after {retry_after}s" |
| 503 | service_unavailable | "Service {name} is temporarily unavailable" |
| 504 | gateway_timeout | "Service {name} timed out after {timeout}s" |
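Normalization amounts to a lookup from error code to status and message template. The sketch copies the templates from the table where it gives them. The 400, 401 and 404 templates are assumptions ("{detail}" and "{resource} not found"). The shape of the error object ({"code", "message"}) is also assumed, because the envelope example only shows null.

```python
from typing import Any, Dict, Tuple

# code -> (HTTP status, message template)
ERROR_CATALOG: Dict[str, Tuple[int, str]] = {
    "validation_error": (400, "{detail}"),  # assumed template
    "missing_token": (401, "{detail}"),  # assumed template
    "invalid_token": (401, "{detail}"),  # assumed template
    "expired_token": (401, "{detail}"),  # assumed template
    "permission_denied": (403, "Permission denied: requires '{permission}'"),
    "not_found": (404, "{resource} not found"),  # assumed template
    "invalid_state_transition": (409, "Cannot deploy agent in '{current_status}' state"),
    "rate_limited": (429, "Rate limit exceeded. Retry after {retry_after}s"),
    "service_unavailable": (503, "Service {name} is temporarily unavailable"),
    "gateway_timeout": (504, "Service {name} timed out after {timeout}s"),
}


def normalize_error(code: str, **params: Any) -> Dict[str, Any]:
    """Return the HTTP status and error object for a known error code."""
    status, template = ERROR_CATALOG[code]
    return {"status": status, "error": {"code": code, "message": template.format(**params)}}
```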
Rate Limiting Headers
TLO injects rate limiting headers on every response:
| Header | Value | Description |
|---|---|---|
| X-RateLimit-Limit | Maximum requests per window | Request ceiling for the current endpoint group's window |
| X-RateLimit-Remaining | Remaining requests in window | Decrements with each request; at 0, further requests in the window receive 429 |
| X-RateLimit-Reset | Unix timestamp (seconds) | When the current window resets |
| Retry-After | Seconds | Only present on 429 responses |
Security Enforcement
Per-Request ACL Validation Flow
Tenant Isolation Enforcement
Tenant isolation is enforced at three independent layers:
TLO Layer: org_id and workspace_id come exclusively from the decoded JWT. TLO never allows the client to override these values via query parameters, request body fields, or path parameters. If a client sends ?workspace_id=999 but their JWT contains workspace_id=7, TLO uses 7.
Backend Layer: Every database query includes WHERE organization_id = :org_id using the value from X-Org-ID. This is enforced by SQLAlchemy query patterns and the organization_id dependency injected into all service methods.
Agent Layer: During execution, the agent can only access data sources belonging to the same organization_id as the agent. agent_version_data_sources bindings are validated against the user's org at deployment time.
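The Backend-layer rule can be illustrated with the standard-library sqlite3 module, used here as a stand-in for Backend's SQLAlchemy session. The agents table layout and the fetch_agent helper are hypothetical. The point is that org_id always comes from the trusted X-Org-ID value and is bound as a query parameter, never interpolated.

```python
import sqlite3
from typing import Optional, Tuple


def fetch_agent(conn: sqlite3.Connection, agent_id: str, org_id: int) -> Optional[Tuple[str, str]]:
    """Look up an agent, always scoped to the caller's organization."""
    # org_id is the X-Org-ID value injected by TLO, never a client-supplied field.
    return conn.execute(
        "SELECT id, name FROM agents WHERE id = :agent_id AND organization_id = :org_id",
        {"agent_id": agent_id, "org_id": org_id},
    ).fetchone()  # None -> respond 404 "Agent not found"
```

A lookup from another tenant's context gets the same result as a missing row. The response therefore does not even reveal that the agent ID exists.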
Rate Limiting
Rate limits are enforced at two granularities:
| Granularity | Window | Limit | Applies To |
|---|---|---|---|
| Per user (workspace) | 1 minute | 100 requests | All agent API endpoints |
| Per API key | 1 minute | 60 requests | Agent API trigger endpoint (/agent-api/{id}/execute) |
| Per agent (execution) | 1 hour | 50 manual runs | Manual trigger endpoint (/agents/{id}/runs) |
| Burst (token bucket) | -- | 20 requests (bucket capacity) | All endpoints |
Rate limiting is implemented in TLO middleware. When Redis is available (redis_url configured), limits are distributed across TLO instances. Otherwise, in-memory rate limiting applies (suitable for single-instance development only).
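The in-memory fallback can be sketched as a fixed-window counter, which pairs naturally with the X-RateLimit-Reset timestamp. This is a sketch under assumptions. The key format (for example "user:7") is up to the caller, and the burst token bucket is not shown. A Redis-backed variant would keep the same per-window counter in Redis, for example with INCR plus EXPIRE, so that all TLO instances share it.

```python
import math
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, Tuple


@dataclass
class FixedWindowLimiter:
    """In-memory fixed-window limiter (single-instance development fallback)."""

    limit: int
    window_seconds: int
    clock: Callable[[], float] = time.time
    # key -> (window start in Unix seconds, requests counted in that window)
    _windows: Dict[str, Tuple[int, int]] = field(default_factory=dict)

    def check(self, key: str) -> Tuple[bool, Dict[str, str]]:
        """Count one request for key; return (allowed, rate limit headers)."""
        now = self.clock()
        window_start = int(now // self.window_seconds) * self.window_seconds
        start, count = self._windows.get(key, (window_start, 0))
        if start != window_start:  # a new window has begun: reset the counter
            start, count = window_start, 0
        allowed = count < self.limit
        if allowed:
            count += 1
        self._windows[key] = (start, count)
        reset_at = start + self.window_seconds
        headers = {
            "X-RateLimit-Limit": str(self.limit),
            "X-RateLimit-Remaining": str(self.limit - count),
            "X-RateLimit-Reset": str(reset_at),
        }
        if not allowed:
            headers["Retry-After"] = str(max(1, math.ceil(reset_at - now)))
        return allowed, headers
```

The per-user limit would be FixedWindowLimiter(limit=100, window_seconds=60). The per-agent manual-run limit would be FixedWindowLimiter(limit=50, window_seconds=3600).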
Request Logging for Audit Trail
Every River Agent request is logged with the following fields:
| Field | Source | Purpose |
|---|---|---|
| request_id | Generated UUID | Distributed trace correlation |
| timestamp | Server clock (UTC) | Event ordering |
| method | HTTP method | Operation type |
| path | Request URL path | Target resource |
| user_id | JWT claim | Who made the request |
| organization_id | JWT claim | Tenant identifier |
| workspace_id | JWT claim | Workspace scope |
| agent_id | Path parameter | Target agent (if applicable) |
| execution_id | Path parameter or body | Target execution (if applicable) |
| status_code | Response | Success or failure |
| duration_ms | Computed | Performance tracking |
| acl_decision | ACL check result | Whether permission was granted or denied |
Audit logs are:
- Written synchronously to the application log (structured JSON)
- Persisted asynchronously to audit_logs in PostgreSQL via Backend
- Free of request and response bodies in production; body logging is controlled by log_include_request_body / log_include_response_body, which must stay disabled in production
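A single structured JSON line per request covers the fields listed above. The sketch assumes the JWT claims are exposed under the keys user_id, organization_id and workspace_id, and that acl_decision is the string "granted" or "denied". audit_record is a hypothetical helper. Request and response bodies are deliberately not among its inputs.

```python
import json
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Mapping, Optional


def audit_record(
    *,
    method: str,
    path: str,
    status_code: int,
    started: float,  # time.monotonic() captured when the request arrived
    claims: Mapping[str, Any],
    acl_decision: str,
    agent_id: Optional[str] = None,
    execution_id: Optional[str] = None,
    request_id: Optional[str] = None,
    now: Optional[datetime] = None,
) -> str:
    """Serialize one audit entry as a compact JSON line."""
    now = now or datetime.now(timezone.utc)
    record = {
        "request_id": request_id or str(uuid.uuid4()),
        "timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "method": method,
        "path": path,
        "user_id": claims["user_id"],
        "organization_id": claims["organization_id"],
        "workspace_id": claims["workspace_id"],
        "agent_id": agent_id,  # null when the route has no agent
        "execution_id": execution_id,
        "status_code": status_code,
        "duration_ms": round((time.monotonic() - started) * 1000, 1),
        "acl_decision": acl_decision,
    }
    return json.dumps(record, separators=(",", ":"))
```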
Sensitive Data Redaction
The following data is never logged, even in debug mode:
| Data | Redaction Rule |
|---|---|
| JWT token | Logged as Bearer *** |
| API keys | Logged as ***{last4} (last 4 characters only) |
| Database credentials | Never present in TLO context -- fetched by Backend only |
| Auth endpoint request bodies | Never logged (passwords, MFA codes) |
| WebSocket message bodies | Logged as type + execution_id only -- no payload content |
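The redaction rules can be sketched as small pure helpers applied before anything reaches the logger. Two choices here are assumptions and not in the table. Keys shorter than 8 characters are fully masked, so the visible suffix never exposes most of a short key. A value without a Bearer prefix is masked entirely.

```python
from typing import Any, Dict


def redact_authorization(value: str) -> str:
    """Authorization headers are logged as 'Bearer ***', never with the token."""
    return "Bearer ***" if value.lower().startswith("bearer ") else "***"


def redact_api_key(key: str) -> str:
    """Keep only the last 4 characters; fully mask short keys (assumption)."""
    if len(key) < 8:
        return "***"
    return "***" + key[-4:]


def redact_ws_message(message: Dict[str, Any]) -> Dict[str, Any]:
    """WebSocket frames are logged as type + execution_id only."""
    return {"type": message.get("type"), "execution_id": message.get("execution_id")}
```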
Agent Execution Security Model
During an agent execution, the security context is the triggering user's identity, not the agent's identity. This means:
- The agent can only access data sources the triggering user has permission to access
- The agent can only execute tools the triggering user has permissions for
- X-User-ID in tool calls contains the triggering user's ID, not a service account
- X-Agent-ID is added for audit purposes but does not grant any additional permissions
This prevents privilege escalation: an agent configured by an Admin but triggered by a Viewer cannot perform Admin-level operations. The Viewer's permissions are the ceiling.
Scheduled triggers (no interactive user): The agent runs with the permissions of the agent owner -- the user who last deployed the agent. The owner's user_id and permissions are stored in the agent configuration at deployment time and used as the security context for all scheduled and event-triggered runs.
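The choice of security context can be made explicit in a small resolver. The sketch assumes the trigger type strings "manual", "scheduled" and "event", and that the owner's user_id and permissions are loaded from the deployment-time snapshot. This section does not specify API, threshold or workflow triggers, so the sketch simply requires an explicit triggering user for them. Principal, ExecutionSecurityContext and resolve_security_context are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, Optional


@dataclass(frozen=True)
class Principal:
    user_id: int
    permissions: FrozenSet[str]


@dataclass(frozen=True)
class ExecutionSecurityContext:
    principal: Principal  # the ONLY source of permissions
    agent_id: str  # carried for audit; never consulted for authorization

    def allows(self, permission: str) -> bool:
        return permission in self.principal.permissions

    def tool_call_headers(self) -> Dict[str, str]:
        return {"X-User-ID": str(self.principal.user_id), "X-Agent-ID": self.agent_id}


OWNER_CONTEXT_TRIGGERS = frozenset({"scheduled", "event"})


def resolve_security_context(
    trigger_type: str,
    agent_id: str,
    triggering_user: Optional[Principal],
    owner_snapshot: Principal,
) -> ExecutionSecurityContext:
    if trigger_type in OWNER_CONTEXT_TRIGGERS:
        # No interactive user: run as the owner captured at deployment time.
        return ExecutionSecurityContext(owner_snapshot, agent_id)
    if triggering_user is None:
        raise PermissionError(f"{trigger_type!r} run has no triggering user")
    return ExecutionSecurityContext(triggering_user, agent_id)
```

The agent's own configuration is never an input to allows(). An agent built by an Admin therefore cannot lift a Viewer's ceiling.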
Appendix A: Configuration Reference
TLO Gateway Settings for River Agents
| Setting | Env Variable | Default | Description |
|---|---|---|---|
| river_agent_service_url | TLO_RIVER_AGENT_SERVICE_URL | http://localhost:8007 | river-agent microservice base URL |
| river_agent_timeout_seconds | TLO_RIVER_AGENT_TIMEOUT_SECONDS | 120 | Default timeout for river-agent calls |
| river_agent_retry_count | TLO_RIVER_AGENT_RETRY_COUNT | 2 | Max retries for river-agent calls |
| agent_execution_timeout_seconds | TLO_AGENT_EXECUTION_TIMEOUT_SECONDS | 3600 | Max duration for a single agent run |
| agent_turn_timeout_seconds | TLO_AGENT_TURN_TIMEOUT_SECONDS | 150 | Max duration for a single reasoning turn |
| agent_max_turns | TLO_AGENT_MAX_TURNS | 15 | Max reasoning turns per execution |
| agent_approval_timeout_seconds | TLO_AGENT_APPROVAL_TIMEOUT_SECONDS | 3600 | Max wait time for approval (1 hour) |
| agent_ws_message_queue_size | TLO_AGENT_WS_MESSAGE_QUEUE_SIZE | 200 | Max queued WebSocket messages per execution |
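Every environment variable in these tables is "TLO_" followed by the upper-cased setting name, so the mapping can be derived mechanically. The sketch below uses a standard-library dataclass. RiverAgentSettings is a hypothetical name, and integer settings are recognized by the type of their default. If TLO already uses a settings library such as pydantic-settings, its env_prefix option gives the same mapping.

```python
import os
from dataclasses import dataclass, fields
from typing import Any, Dict, Mapping, Optional


@dataclass(frozen=True)
class RiverAgentSettings:
    river_agent_service_url: str = "http://localhost:8007"
    river_agent_timeout_seconds: int = 120
    river_agent_retry_count: int = 2
    agent_execution_timeout_seconds: int = 3600
    agent_turn_timeout_seconds: int = 150
    agent_max_turns: int = 15
    agent_approval_timeout_seconds: int = 3600
    agent_ws_message_queue_size: int = 200

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "RiverAgentSettings":
        """Read TLO_<SETTING_NAME> overrides, falling back to the defaults."""
        env = os.environ if env is None else env
        values: Dict[str, Any] = {}
        for f in fields(cls):
            raw = env.get("TLO_" + f.name.upper())
            if raw is not None:
                values[f.name] = int(raw) if isinstance(f.default, int) else raw
        return cls(**values)
```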
Circuit Breaker Settings
| Setting | Env Variable | Default | Description |
|---|---|---|---|
| river_agent_cb_failure_threshold | TLO_RIVER_AGENT_CB_FAILURE_THRESHOLD | 3 | Failures before circuit opens |
| river_agent_cb_recovery_timeout | TLO_RIVER_AGENT_CB_RECOVERY_TIMEOUT | 60 | Seconds before half-open test |
| backend_cb_failure_threshold | TLO_BACKEND_CB_FAILURE_THRESHOLD | 5 | Failures before circuit opens |
| backend_cb_recovery_timeout | TLO_BACKEND_CB_RECOVERY_TIMEOUT | 30 | Seconds before half-open test |
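These settings describe a standard closed/open/half-open breaker. The sketch makes two assumptions. The threshold counts consecutive failures. Exactly one probe request is admitted once the recovery timeout has elapsed. CircuitBreaker is a hypothetical class.

```python
import time
from dataclasses import dataclass
from typing import Callable


@dataclass
class CircuitBreaker:
    """closed -> open after N consecutive failures -> one half-open probe -> closed."""

    failure_threshold: int
    recovery_timeout: float  # seconds
    clock: Callable[[], float] = time.monotonic
    state: str = "closed"
    _failures: int = 0
    _opened_at: float = 0.0

    def allow_request(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open" and self.clock() - self._opened_at >= self.recovery_timeout:
            self.state = "half_open"  # admit exactly one probe request
            return True
        return False  # still cooling down, or a probe is already in flight

    def record_success(self) -> None:
        self.state = "closed"
        self._failures = 0

    def record_failure(self) -> None:
        self._failures += 1
        if self.state == "half_open" or self._failures >= self.failure_threshold:
            self.state = "open"
            self._opened_at = self.clock()
            self._failures = 0
```

With the defaults above, TLO would hold one CircuitBreaker(3, 60) for river-agent and one CircuitBreaker(5, 30) for Backend. When allow_request() returns False, the request would presumably be answered with 503 service_unavailable.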
Appendix B: TOOL_ACL_MAP Additions
The following entries must be added to TOOL_ACL_MAP in middleware/acl.py for agent-internal tools invoked by Temporal workflow activities. Temporal workers call these tools with X-Internal-Call: true. The calls bypass JWT validation and use the execution's security context, taken from the X-User-ID / X-Org-ID headers: the triggering user, or the agent owner for scheduled and event-triggered runs.
# River Agent internal activity tools (called by Temporal workflow activities)
# These use X-Internal-Call: true and bypass JWT validation.
# User context is propagated via X-User-ID, X-Org-ID, X-Workspace-ID headers.
"resolve_agent_config": {"permission": "agent:view", "service": "backend"},
"list_agent_data_sources": {"permission": "agent:view", "service": "backend"},
"get_agent_memory": {"permission": "agent:view", "service": "backend"},
"update_agent_memory": {"permission": "agent:update", "service": "backend"},
"create_approval_request": {"permission": "agent:execute", "service": "backend"},
"log_agent_turn": {"permission": "agent:execute", "service": "backend"},
"update_execution_status": {"permission": "agent:execute", "service": "backend"},
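A lookup against these entries can be sketched as a deny-by-default check. It assumes the caller has already resolved the permission set for the user in X-User-ID. It also assumes the header mapping uses the exact spellings shown. authorize_internal_tool and AclDenied are hypothetical names, and the map below holds only the Appendix B entries.

```python
from typing import Dict, FrozenSet, Mapping

# Only the Appendix B entries; the real TOOL_ACL_MAP contains more.
TOOL_ACL_MAP: Dict[str, Dict[str, str]] = {
    "resolve_agent_config": {"permission": "agent:view", "service": "backend"},
    "list_agent_data_sources": {"permission": "agent:view", "service": "backend"},
    "get_agent_memory": {"permission": "agent:view", "service": "backend"},
    "update_agent_memory": {"permission": "agent:update", "service": "backend"},
    "create_approval_request": {"permission": "agent:execute", "service": "backend"},
    "log_agent_turn": {"permission": "agent:execute", "service": "backend"},
    "update_execution_status": {"permission": "agent:execute", "service": "backend"},
}


class AclDenied(Exception):
    """Raised when an internal tool call is not permitted."""


def authorize_internal_tool(tool_name: str, headers: Mapping[str, str], permissions: FrozenSet[str]) -> str:
    """Return the target service for an internal tool call, or raise AclDenied."""
    if headers.get("X-Internal-Call") != "true":
        raise AclDenied("internal tools require X-Internal-Call: true")
    entry = TOOL_ACL_MAP.get(tool_name)
    if entry is None:
        raise AclDenied(f"unknown tool {tool_name!r}")  # deny by default
    if entry["permission"] not in permissions:
        raise AclDenied(f"Permission denied: requires '{entry['permission']}'")
    return entry["service"]
```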
Appendix C: Comparison with PSA Workflow
| Aspect | PSA Workflow | River Agent Workflow |
|---|---|---|
| Workflow ID | psa-execute-{execution_id} | agent-run-{execution_id} |
| Trigger types | Always manual (user prompt) | Manual, scheduled, event, API, threshold, workflow |
| Context assembly | 5 parallel enrichment activities | Context builder loads agent config, memory, and trigger payload |
| Reasoning target | PSA Service :8010 | river-agent :8007 |
| Governance | Per-tool ACL only | Per-tool ACL + Action Level + Approval Gates + Policy Engine |
| Approval gate | Confirmation token (destructive ops only) | Full approval workflow with Temporal hibernate/resume |
| Max turns | 15 | 15 (configurable per agent version) |
| WebSocket path | /ws/executions/{id} | /ws/agents/{id} |
| Post-execution | Store prompt history | Update long-term context + metrics + notifications |
| State persistence | Session-based (lost on browser close) | Long-lived memory across runs |
| Tool execution | Client-side (WebSocket round-trip) | Server-side (Temporal activity via TLO ACL proxy) |
| Scheduled runs | Not supported | Temporal Schedule with ContinueAsNew cron pattern |