River-Agent Microservice

The stateless FastAPI reasoning engine at :8007 that receives a fully assembled execution context, runs the multi-turn agentic loop (Reason, Act, Observe), and returns structured results to the calling Temporal workflow activity.

Service Overview

| Attribute | Value |
| --- | --- |
| Service name | river-agent |
| Port | 8007 |
| Framework | FastAPI (Python 3.12, async) |
| Container | docker/river-agent/Dockerfile |
| Statefulness | Stateless -- no database, no Redis, no local storage between invocations |
| Caller | Temporal workflow activity (via TLO Gateway :8001) |
| Dependencies | TLO Gateway :8001, LLM provider APIs (external) |
| Horizontal scaling | Fully supported; any replica handles any request |
| Health check | GET /health |

The river-agent service is the reasoning brain of every agent execution. It receives a fully assembled ExecutionRequest from the calling Temporal activity, runs the iterative Reason, Act, Observe loop internally, and returns a structured ExecutionResponse when the loop terminates. It behaves like a pure function of its input: nothing persists between invocations, and every result is returned as structured output. Because the loop relies on LLM reasoning, two runs on the same context are not guaranteed to produce identical results.

Service Boundaries

The river-agent service does NOT:

  • Store any state -- no database connections, no Redis, no persistent storage of any kind
  • Access the database directly -- all data retrieval flows through TLO Gateway tool calls
  • Manage agent lifecycle -- agent CRUD, versioning, deployment, and state machine transitions are owned by the backend
  • Process triggers -- trigger ingestion, scheduling, and dispatch are handled by the Trigger Ingestion Service in the backend
  • Manage approvals -- approval gate creation, notification dispatch, and resolution handling belong to the Governance and Approval Service in the backend
  • Maintain WebSocket connections -- real-time telemetry is emitted as structured events to the calling Temporal activity, which forwards them to TLO Gateway for WebSocket delivery
  • Store execution logs -- log persistence is owned by the Execution Logging and Audit Service in the backend
  • Own any credentials -- database connection strings, API keys, and user tokens are never exposed to the river-agent service; it receives only sanitized user context

Execution Model

river-agent runs the entire agentic loop for one execution in a single HTTP call. The caller -- a Temporal workflow activity in Backend :8005 -- sends a fully assembled ExecutionRequest and blocks until river-agent returns a complete ExecutionResponse. There are no callbacks, no streaming responses, and no mid-execution control flow between river-agent and its caller.

The context bundle passed in the request is the sole source of truth for the execution. river-agent reads no database directly and holds no in-memory state between invocations; apart from its declared dependencies (TLO Gateway and the LLM provider APIs), it calls no external services. Every piece of information it needs -- the agent's instruction set, the active tool registry with Pydantic schemas, connected data source metadata, governance constraints, accumulated long-term memory, trigger payload, and conversation history -- is present in the ExecutionRequest.

Key behaviors per execution:

| Attribute | Value |
| --- | --- |
| Trigger | Single HTTP POST from Temporal workflow activity via TLO Gateway |
| Context source | ExecutionRequest.agent_config, user_context, data_source_metadata, tool_inventory, long_term_context, trigger_payload, conversation_history |
| System prompt | Assembled per-execution: agent instruction set, allowed tool schemas, data source metadata, governance constraints, accumulated memory |
| Tool set | 6 reasoning tools (local) + 10 execution tools (via TLO Gateway) + 1 interaction tool |
| Turn budget | Configurable per agent via max_turns (default: 15); enforced by river-agent loop guard |
| Governance enforcement | Governance Checker validates every tool call before dispatch; blocked calls are returned as observations, not silently dropped |
| Output destination | ExecutionResponse returned to Temporal activity, which persists results, emits WebSocket events, and triggers notifications |
| Cost tracking | Per-turn token usage accumulated in ExecutionResponse.usage; budget enforcement via budget_policy in request |

Internal Components

Agent Runtime Engine

The central orchestrator that manages the lifecycle of a single agent execution. It receives the ExecutionRequest from the API endpoint, delegates to the Context Builder to assemble the LLM prompt, runs the iterative Reason, Act, Observe loop, coordinates with the Governance Checker before each tool execution, enforces turn limits and token budgets, delegates to the Response Formatter to produce the final ExecutionResponse, and emits per-step telemetry events for the calling Temporal activity.

Context Builder

Transforms raw execution context into a structured LLM prompt. It receives agent_config, user_context, data_source_metadata, tool_inventory, governance_policies, long_term_context, trigger_context, and conversation_history. It builds the system prompt by assembling role definition, goal and instructions, available tool schemas in function-calling format, data source metadata, governance rules, accumulated memory, and trigger context. It optimizes the prompt for the selected LLM provider's context window and compresses conversation history when it exceeds token thresholds using deterministic summarization (no LLM call).

Multi-LLM Router (RiverCore)

Selects the optimal LLM model and provider for each reasoning turn. It classifies each turn's task complexity, selects the best available model from configured providers, manages provider health monitoring and auto-failover, tracks per-turn token usage, and supports dynamic provider configuration without code changes.

Tool Executor

Dispatches tool calls to the appropriate target. For reasoning tools, it executes locally within the service process. For execution tools, it constructs an HTTP request to TLO Gateway with proper headers (X-User-ID, X-Org-ID, X-Workspace-ID, X-Agent-ID, X-Internal-Call). For interaction tools, it returns an interaction payload for the Temporal workflow to forward via WebSocket. It validates tool arguments against Pydantic input schemas before dispatch and handles timeouts and retries for transient failures.
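
The header set described above can be sketched as a small helper. This is an illustrative sketch only: the `UserContext` class, the `build_gateway_headers` function, and the `"true"` value for X-Internal-Call are assumptions, since the source names the headers but not their values.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UserContext:
    """Hypothetical subset of the sanitized user_context object."""
    user_id: int
    org_id: int
    workspace_id: int


def build_gateway_headers(ctx: UserContext, agent_id: str) -> dict[str, str]:
    """Build the identity headers attached to an execution-tool request."""
    return {
        "X-User-ID": str(ctx.user_id),
        "X-Org-ID": str(ctx.org_id),
        "X-Workspace-ID": str(ctx.workspace_id),
        "X-Agent-ID": agent_id,
        # Assumed value: the source lists this header without its contents.
        "X-Internal-Call": "true",
    }
```

Note that no credential or raw JWT appears here, which matches the service boundary that river-agent only ever sees sanitized user context.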

Response Formatter

Structures the final execution output into a well-defined Pydantic model. It assembles the ExecutionResponse from accumulated turn results, extracts query results, action results, visualization specs, and execution plans from tool outputs, builds the AI metadata block (total turns, models used, token counts, cost estimate), generates follow-up suggestions based on detected intent, and handles partial results when execution is interrupted.

Governance Checker

Evaluates governance policies inline during execution. Before each execution tool call, it checks whether the action is permitted, requires approval, or is blocked based on the agent's action_level, approval_rules, and bound governance policies. It returns structured governance decisions (PROCEED, APPROVAL_REQUIRED, SUGGEST_ONLY, BLOCKED) with reason.
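
As a rough illustration of the four decisions, the check might look like the sketch below. The mapping from action_level to decision is an assumption made for this example (the source lists the inputs and the possible outcomes but not the rules), the `is_write` flag is hypothetical, and bound governance policies are left out entirely.

```python
from enum import Enum


class Decision(str, Enum):
    PROCEED = "PROCEED"
    APPROVAL_REQUIRED = "APPROVAL_REQUIRED"
    SUGGEST_ONLY = "SUGGEST_ONLY"
    BLOCKED = "BLOCKED"


def check_tool_call(
    tool_name: str,
    is_write: bool,
    action_level: str,
    require_approval_for: list[str],
) -> tuple[Decision, str]:
    """Return (decision, reason) for one execution tool call.

    The rules below are assumed for illustration, not taken from the source.
    """
    if action_level == "read_only" and is_write:
        return Decision.BLOCKED, "read_only agents may not perform writes"
    if action_level == "recommend" and is_write:
        return Decision.SUGGEST_ONLY, "recommend agents may only propose writes"
    if action_level == "act_with_approval" and tool_name in require_approval_for:
        return Decision.APPROVAL_REQUIRED, f"{tool_name} requires human approval"
    return Decision.PROCEED, "no rule restricts this call"
```

Returning the reason alongside the decision lets a blocked call be fed back to the LLM as an observation instead of being silently dropped.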

Agentic Loop

Overview

The river-agent service implements the ReAct (Reason + Act) pattern. Unlike a plan-then-execute model, the agent reasons and acts iteratively -- each step's result informs the next step. The loop runs entirely inside the service process for a single execution, from initial context assembly to final structured output.

Step 1: Context Assembly

The Context Builder assembles the full LLM context from all inputs received in the ExecutionRequest:

| Input | Source | Purpose |
| --- | --- | --- |
| agent_config | Backend (agent + version tables) | Goal, instructions, action level, model config, allowed tools, governance policies |
| user_context | TLO Gateway (extracted from JWT) | user_id, org_id, workspace_id, roles, permissions (sanitized -- no raw JWT or credentials) |
| input_prompt | User (for manual triggers) | Natural language instruction or question |
| trigger_context | Trigger Ingestion Service | Trigger type, source, payload (webhook data, metric values, cron context) |
| data_source_metadata | Backend + Schema Discovery | Connected data source IDs, names, types, table/column metadata |
| governance_policies | Backend (policies table) | Bound policy conditions and enforcement rules |
| conversation_history | Backend (execution logs from previous runs) | Long-term memory and accumulated learnings |
| tool_inventory | Agent version config | Filtered list of tools the agent is allowed to use |

System prompt sections built by the Context Builder:

| Section | Content |
| --- | --- |
| Role definition | "You are {agent.name}, a {agent.business_function} agent for {agent.domain}..." |
| Goal and instructions | The agent's instruction_set -- natural language behavioral rules and objectives |
| Available tools | JSON schema definitions of allowed tools in function-calling format |
| Data context | Schema metadata for connected data sources (table names, column types, relationships) |
| Governance rules | "Your action level is {level}. You must {constraints}. The following policies apply: {policies}" |
| Long-term memory | "From previous runs, you have learned: {accumulated_learnings}" |
| Trigger context | "This run was triggered by: {trigger_type} from {trigger_source}. Payload: {trigger_payload}" |
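
The section templates above could be stitched together roughly as follows. This is a sketch under assumptions: the function name, the dictionary shapes, the "Available tools:" and "Data context:" labels, and the choice to skip the memory section when there are no learnings are all hypothetical. The role template is truncated in the source, so only its visible part is reproduced.

```python
def build_system_prompt(
    agent: dict,
    tool_schemas: str,
    data_context: str,
    governance: dict,
    learnings: list[str],
    trigger: dict,
) -> str:
    """Join the system prompt sections in the order listed in the table."""
    sections = [
        # Role definition; the source template continues past this point.
        f"You are {agent['name']}, a {agent['business_function']} agent "
        f"for {agent['domain']}",
        agent["instructions"],
        f"Available tools:\n{tool_schemas}",   # label is an assumption
        f"Data context:\n{data_context}",      # label is an assumption
        f"Your action level is {governance['level']}. "
        f"You must {governance['constraints']}. "
        f"The following policies apply: {governance['policies']}",
    ]
    if learnings:  # assumption: omit the memory section on a first run
        sections.append(
            "From previous runs, you have learned: " + "; ".join(learnings)
        )
    sections.append(
        f"This run was triggered by: {trigger['trigger_type']} from "
        f"{trigger['trigger_source']}. Payload: {trigger.get('trigger_payload')}"
    )
    return "\n\n".join(sections)
```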

Step 2: Reasoning

Each reasoning turn sends the assembled system prompt, conversation history, and tool declarations to the LLM via RiverCore. All LLM responses are parsed as either tool calls (function-calling format) or text content. Tool call arguments are validated against Pydantic input schemas before execution. The LLM never produces free-form text that requires regex parsing -- all structured data flows through Pydantic models.

Step 3: Action Selection

After the LLM responds, the Agent Runtime Engine inspects the response:

| LLM Response Type | Action Taken |
| --- | --- |
| Tool call (reasoning) | Execute locally, feed result back as observation, continue loop |
| Tool call (execution) | Check governance, then route through TLO Gateway |
| Tool call (interaction) | Return interaction payload to Temporal for WebSocket delivery, pause loop |
| Text content (final answer) | Treat as final answer, proceed to finalization |
| Empty response (0 tokens) | Treat as error, retry with model escalation |

Step 4: Tool Execution

Tool call routing by category:

| Category | Destination | Examples |
| --- | --- | --- |
| Reasoning | Local (within service process) | classify_intent, generate_query, search_catalog |
| Execution | TLO Gateway (ACL-validated HTTP) | execute_query, write_back, create_data_source |
| Interaction | Temporal workflow (WebSocket via TLO) | ask_user |

Per-tool timeout is 30 seconds (configurable). Transient failures are retried up to twice with exponential backoff (100 ms, then 200 ms). Permanent failures (403, 404, 422) are not retried and are reported immediately as failed observations.
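
A minimal sketch of this retry policy follows. It assumes the two listed delays (100 ms, 200 ms) correspond to at most two retries; `ToolHTTPError` and `call_with_retry` are hypothetical names, and the per-tool timeout is left to the HTTP client.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

PERMANENT_STATUS = {403, 404, 422}   # never retried
BACKOFF_SECONDS = (0.1, 0.2)         # delay before each retry


class ToolHTTPError(Exception):
    """Hypothetical error carrying the HTTP status of a failed tool call."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


def call_with_retry(call: Callable[[], T], sleep: Callable[[float], None] = time.sleep) -> T:
    """Run `call`, retrying transient failures with exponential backoff."""
    # The trailing None marks the last attempt, after which errors propagate.
    for delay in (*BACKOFF_SECONDS, None):
        try:
            return call()
        except ToolHTTPError as exc:
            if exc.status in PERMANENT_STATUS or delay is None:
                raise
            sleep(delay)
    raise AssertionError("unreachable")
```

Passing `sleep` as a parameter keeps the helper testable without real delays.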

Step 5: Observation

The tool result (success or failure) is fed back into the LLM's conversation context:

[Tool Result] Tool '{tool_name}' returned: {json_result}

The LLM then reasons about the result and decides the next action: call another tool, produce a final answer, or request user input.

Step 6: Finalization

| Terminal Condition | Handling |
| --- | --- |
| Final answer | LLM produces text content with no tool call. Response Formatter builds ExecutionResponse with status: success. |
| Interaction | LLM calls ask_user. Loop pauses. Temporal workflow handles WebSocket delivery and waits for user response. |
| Approval required | Governance Checker returns APPROVAL_REQUIRED. Loop pauses. Temporal workflow manages approval gate. |
| Turn limit reached | Max turns exceeded. Response Formatter builds ExecutionResponse with partial results and status: max_turns_exceeded. |
| Token budget exceeded | Cumulative token usage exceeds budget. status: budget_exceeded. |
| Unrecoverable error | Error after retry exhausted. status: failed with error details. |

Full Loop Sequence
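
The six steps can be condensed into a short sketch of the loop. This is an illustration under stated assumptions, not the service's implementation: the `llm` and `run_tool` callables, the reply dictionary shape (`tool_call`, `text`), and the returned fields are hypothetical, and governance checks, model routing, and token budgets are omitted for brevity.

```python
import json
from typing import Callable


def run_loop(
    llm: Callable[[list[dict]], dict],
    run_tool: Callable[[str, dict], dict],
    messages: list[dict],
    max_turns: int = 15,
) -> dict:
    """Run Reason, Act, Observe until a terminal condition is reached."""
    for turn in range(1, max_turns + 1):
        reply = llm(messages)                       # Reason
        call = reply.get("tool_call")
        if call is None:                            # text content: final answer
            return {"status": "success", "summary": reply["text"], "total_turns": turn}
        if call["name"] == "ask_user":              # interaction pauses the loop
            return {"status": "awaiting_interaction", "total_turns": turn}
        result = run_tool(call["name"], call["arguments"])   # Act
        messages.append({                           # Observe
            "role": "tool",
            "content": f"[Tool Result] Tool '{call['name']}' returned: {json.dumps(result)}",
        })
    return {"status": "max_turns_exceeded", "total_turns": max_turns}
```

A scripted `llm` that first requests `search_catalog` and then answers in text finishes in two turns with status success.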

Multi-LLM Routing

Model Categories

RiverCore organizes all models into four capability tiers:

| Tier | Selection Criteria | Example Models | Typical Latency |
| --- | --- | --- | --- |
| Fast | Intent classification, catalog search, simple formatting, status retrieval | GPT-4o-mini, Gemini Flash Lite, Claude Haiku 4.5, DeepSeek-V3 | 0.5-2s |
| Balanced | Governance checks, simple query generation, result explanation, data source management | GPT-4o, Gemini Flash, Claude Sonnet 4.6, DeepSeek-V3 | 1-4s |
| Reasoning | Complex multi-table joins, federated queries, error recovery, ambiguous prompts | o3, Gemini Pro, Claude Opus 4.7, DeepSeek-R1 | 3-15s |
| Coding | SQL/NoSQL generation, dialect-specific optimization, data transformation | GPT-4o, Claude Sonnet 4.6, DeepSeek-V3 | 1-5s |

Intent-to-Tier Mapping

After the initial classify_intent turn resolves the intent, subsequent turns use the intent-based tier:

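| Intent | Tier | Max Turns | Expected Duration |
| --- | --- | --- | --- |
| PLATFORM_EXPLANATION | Fast | 3 | 2-4s |
| DATA_DISCOVERY | Fast | 4 | 2-4s |
| DATA_QUERY | Coding | 6 | 4-8s |
| DATA_SOURCE_MANAGEMENT | Balanced | 5 | 5-10s |
| ACTION_EXECUTION | Coding | 6 | 5-12s |
| GOVERNANCE_MANAGEMENT | Balanced | 4 | 4-8s |
| WORKSPACE_ADMIN | Fast | 5 | 2-8s |
| STORAGE_MANAGEMENT | Fast | 3 | 2-5s |
| OBSERVABILITY | Balanced | 4 | 4-8s |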

Model Selection

Per-turn priority order:

| Priority | Condition | Tier Selected |
| --- | --- | --- |
| 1 | No intent classified yet (first turn) | Fast |
| 2 | Previous turn failed with error | Reasoning (escalation for error recovery) |
| 3 | Query mode active | Coding |
| 4 | Intent classified | Intent-based mapping (see table above) |
| 5 | Default fallback | Balanced |

Provider Configuration

| Setting | Description | Example |
| --- | --- | --- |
| provider_name | Provider identifier | gemini, openai, anthropic, deepseek |
| api_key | Authentication credential | (from secrets management) |
| base_url | API endpoint | https://generativelanguage.googleapis.com/v1beta |
| models | Map of tier to model ID | {"fast": "gemini-2.0-flash-lite", "balanced": "gemini-2.0-flash"} |
| priority | Provider selection priority (lower = preferred) | 1 |
| max_retries | Retries per request | 2 |
| timeout_seconds | Per-request timeout | 30 |
| rate_limit_rpm | Requests per minute limit | 60 |
| enabled | Whether this provider is active | true |

RiverCore tracks per-provider latency (P50, P90, P99) and error rate over rolling 5-minute windows. If error rate exceeds 30% or P90 latency exceeds 20 seconds, the provider is marked as degraded and deprioritized in selection. If all providers in a tier are degraded, RiverCore selects the least-degraded option.
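
The degradation rule can be sketched as follows. The thresholds come from the paragraph above; everything else is an assumption for illustration: the `ProviderStats` shape, preferring healthy providers by priority, and defining "least-degraded" as lowest error rate with P90 latency as the tiebreaker (the source does not define it).

```python
from dataclasses import dataclass


@dataclass
class ProviderStats:
    """Hypothetical rolling 5-minute statistics for one provider."""
    name: str
    priority: int          # lower = preferred
    error_rate: float      # fraction of failed requests, 0.0 to 1.0
    p90_latency_s: float


def is_degraded(s: ProviderStats) -> bool:
    """Degraded when error rate exceeds 30% or P90 latency exceeds 20 s."""
    return s.error_rate > 0.30 or s.p90_latency_s > 20.0


def pick_provider(stats: list[ProviderStats]) -> ProviderStats:
    """Prefer healthy providers by priority; otherwise take the least degraded."""
    healthy = [s for s in stats if not is_degraded(s)]
    if healthy:
        return min(healthy, key=lambda s: s.priority)
    # Assumed ordering for "least-degraded": error rate first, then latency.
    return min(stats, key=lambda s: (s.error_rate, s.p90_latency_s))
```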

Token Budget Management

| Threshold | Behavior |
| --- | --- |
| 80% of budget consumed | Turn limit is reduced to force finalization |
| Budget exceeded (over 100%) | Execution terminates with status: budget_exceeded |
| Default budget | 100,000 tokens per execution |
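
A small sketch of the two thresholds, using integer arithmetic to avoid rounding at the 80% boundary. The function name and the returned labels are hypothetical, and how far the turn limit is reduced at 80% is not specified in the source, so the sketch only reports which band the execution is in.

```python
def budget_state(tokens_used: int, token_budget: int = 100_000) -> str:
    """Classify cumulative token usage against the execution budget."""
    if tokens_used > token_budget:
        return "budget_exceeded"        # terminate the execution
    if tokens_used * 5 >= token_budget * 4:
        return "force_finalization"     # at least 80% consumed
    return "ok"
```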

API Endpoints

| Method | Endpoint | Description |
| --- | --- | --- |
| POST | /api/v1/execute | Run a full agent execution from initial context |
| POST | /api/v1/execute/continue | Resume an execution paused for interaction or approval |
| POST | /api/v1/generate-agent | Generate agent configuration from a natural language description |
| GET | /health | Liveness check |
| GET | /health/ready | Readiness check (verifies TLO Gateway and LLM provider reachability) |

Input Contract

The complete request body for POST /api/v1/execute.

Top-Level Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| execution_id | int | Yes | Unique identifier for this execution (from agent_executions table) |
| agent_config | object | Yes | Agent configuration snapshot; see below |
| user_context | object | Yes | Sanitized user identity and permissions |
| input_prompt | string | No | Natural language input (for manual triggers; null for automated triggers) |
| trigger_context | object | Yes | Trigger type, source, and payload |
| data_source_metadata | array | Yes | Connected data sources with schema metadata |
| conversation_history | array | Yes | Long-term memory from previous executions of this agent |

agent_config Object

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| agent_id | uuid | Yes | The agent's unique identifier |
| name | string | Yes | Human-readable agent name |
| business_function | enum | Yes | customer_support, sales, finance, risk_compliance, data_analyst, operations, executive, custom |
| domain | string | Yes | Business domain tag |
| goal | string | Yes | High-level objective (natural language) |
| instructions | string | Yes | Detailed behavioral instructions (the instruction_set) |
| system_prompt | string | No | Optional custom system prompt override |
| action_level | enum | Yes | read_only, recommend, act_with_approval, automated |
| governance_level | enum | Yes | standard, strict, custom |
| model_config | object | Yes | Model selection parameters; see below |
| tools | array | Yes | Tool names this agent is allowed to invoke |
| governance_policies | array | Yes | Bound policy conditions and enforcement rules |
| approval_rules | object | Yes | Approval gate configuration |
| notification_config | json | No | Alert channels and trigger conditions |

model_config Object

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| preferred_tier | enum | No | Override default tier selection |
| max_turns | int | Yes | Turn limit for this execution (default: 15) |
| token_budget | int | Yes | Maximum total tokens (default: 100,000) |
| timeout_seconds | int | Yes | Per-turn timeout (default: 30) |

approval_rules Object

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| require_approval_for | array | Yes | Tool names that require human approval |
| auto_approve_conditions | array | No | Per-tool conditions for automatic approval |
| approver_roles | array | Yes | Roles authorized to approve (e.g., ["admin", "editor"]) |
| escalation_timeout_minutes | int | Yes | Time before approval request expires (default: 1440) |

user_context Object

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| user_id | int | Yes | Numeric user ID (from JWT, sanitized) |
| org_id | int | Yes | Organization ID (tenant isolation) |
| workspace_id | int | Yes | Workspace scope |
| roles | array | Yes | User's roles (e.g., ["org_editor", "ws_analyst"]) |
| permissions | array | Yes | User's permissions (e.g., ["data_source:view", "data_source:query"]) |
| attributes | json | No | Additional user attributes (e.g., {"assigned_region": "West"}) |

trigger_context Object

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| trigger_type | enum | Yes | manual, scheduled, event, api, threshold, workflow |
| trigger_source | string | Yes | Origin identifier (e.g., "cron:daily-8am", "webhook:zendesk") |
| trigger_payload | json | No | Data from the trigger event |
| triggered_by | uuid | No | User ID (manual/API triggers) or system ID (automated) |
| triggered_at | timestamp | Yes | Trigger timestamp (ISO 8601) |

conversation_history Entry

Each entry in conversation_history represents long-term memory from a prior execution of this agent:

{
  "execution_id": 1024,
  "completed_at": "2026-04-22T09:14:00Z",
  "summary": "Processed 42 overdue tickets. Escalated 3 to Tier 2.",
  "learnings": [
    "High-priority tickets from accounts over $10k should be escalated immediately.",
    "Draft response template 'refund_policy_v2' works best for billing disputes."
  ],
  "flagged_items": [
    "Account ACCT-8821 has unusual refund request frequency."
  ]
}

Full ExecutionRequest Example

{
"execution_id": 9871,
"agent_config": {
"agent_id": "a7f3b2d4-1e5c-4f8a-9b6d-0c2e7f3a1d8b",
"name": "L1 Support Specialist",
"business_function": "customer_support",
"domain": "Customer Success",
"goal": "Resolve Tier 1 support tickets automatically using internal knowledge base and CRM data.",
"instructions": "You are a support agent. Check ticket priority first. For billing disputes, verify the charge in Stripe before responding. Always draft a response before sending.",
"action_level": "act_with_approval",
"governance_level": "standard",
"model_config": {
"max_turns": 15,
"token_budget": 100000,
"timeout_seconds": 30
},
"tools": [
"classify_intent", "check_governance", "generate_query",
"search_catalog", "recommend_visualization", "explain_results",
"execute_query", "write_back", "ask_user"
],
"governance_policies": [
{
"policy_id": "f9a2c4d6-8b1e-4f7a-a3c5-2d4e6f8b1a3c",
"name": "PII Export Limit",
"scope": "workspace",
"conditions": {"rows_gt": 10000, "table_classification": "pii"},
"enforcement": "require_approval"
}
],
"approval_rules": {
"require_approval_for": ["write_back"],
"auto_approve_conditions": [
{"tool": "write_back", "condition": "confidence_score > 0.95"}
],
"approver_roles": ["admin", "editor"],
"escalation_timeout_minutes": 1440
}
},
"user_context": {
"user_id": 4421,
"org_id": 12,
"workspace_id": 37,
"roles": ["org_editor", "ws_analyst"],
"permissions": ["data_source:view", "data_source:query", "agent:execute"]
},
"input_prompt": "Process all open high-priority tickets in Zendesk created in the last 24 hours.",
"trigger_context": {
"trigger_type": "manual",
"trigger_source": "ui",
"triggered_by": "4421",
"triggered_at": "2026-05-10T08:00:00Z"
},
"data_source_metadata": [
{
"data_source_id": 14,
"name": "Zendesk Production",
"type": "zendesk",
"status": "connected",
"access_level": "read_write",
"schemas": [
{
"table_name": "tickets",
"columns": [
{"column_name": "id", "data_type": "integer", "is_nullable": false, "description": "Ticket ID"},
{"column_name": "priority", "data_type": "string", "is_nullable": false, "description": "low, normal, high, urgent"},
{"column_name": "status", "data_type": "string", "is_nullable": false, "description": "open, pending, solved, closed"},
{"column_name": "created_at", "data_type": "timestamp", "is_nullable": false, "description": "Ticket creation timestamp"}
]
}
]
}
],
"conversation_history": [
{
"execution_id": 9840,
"completed_at": "2026-05-09T08:04:22Z",
"summary": "Processed 18 tickets. 14 resolved autonomously. 4 escalated.",
"learnings": ["Tickets with tag 'billing' require Stripe charge verification before responding."],
"flagged_items": []
}
]
}

Output Contract

The complete response schema for POST /api/v1/execute.

Top-Level Fields

| Field | Type | Always Present | Description |
| --- | --- | --- | --- |
| execution_id | int | Yes | Correlation ID matching the request |
| status | enum | Yes | success, failed, awaiting_approval, awaiting_interaction, max_turns_exceeded, budget_exceeded, timeout |
| result | object | Yes | Execution outcome (summary, actions taken, recommendations, artifacts) |
| steps | array | Yes | Complete step-by-step trace of the execution |
| approval_request | object | If status: awaiting_approval | Approval payload for the Temporal workflow |
| interaction_request | object | If status: awaiting_interaction | Interaction payload for WebSocket delivery |
| usage | object | Yes | Token usage, cost estimate, model breakdown |
| error | object | If status: failed | Error code, message, recoverability |

result Object

| Field | Type | Description |
| --- | --- | --- |
| summary | string | Natural language summary of what the agent accomplished |
| actions_taken | array | Tool calls executed with their results (tool_name, arguments, result_summary, status) |
| recommendations | array | Suggested follow-up actions |
| output_artifacts | array | Generated reports, query results, or exported data references |

steps Entry

| Field | Type | Present When | Description |
| --- | --- | --- | --- |
| step_number | int | Always | Step index |
| step_type | enum | Always | reasoning, tool_call, observation, interaction, governance_check, error, final_answer |
| tool_name | string | Tool call steps | Tool name dispatched |
| tool_category | string | Tool call steps | reasoning, execution, or interaction |
| input | json | Tool call steps | Tool arguments or LLM input summary |
| output | json | Tool call steps | Tool result or LLM output summary |
| model_used | string | Reasoning steps | LLM model ID (e.g., "claude-sonnet-4-6") |
| model_tier | string | Reasoning steps | fast, balanced, reasoning, coding |
| provider | string | Reasoning steps | LLM provider name |
| tokens | object | Reasoning steps | {"input": int, "output": int} |
| duration_ms | int | Always | Step duration in milliseconds |
| status | string | Always | completed, failed, blocked, pending |
| governance_decision | string | Governance check steps | PROCEED, APPROVAL_REQUIRED, BLOCKED, SUGGEST_ONLY |
| error | string | Error steps | Error message |

approval_request Object

Present only when status: awaiting_approval.

{
  "tool_name": "write_back",
  "proposed_payload": {
    "data_source_id": 14,
    "table_name": "tickets",
    "operation": "update",
    "data": {"status": "solved", "comment": "Resolved via automated policy lookup."},
    "conditions": {"id": 98821}
  },
  "reasoning_summary": "Ticket #98821 matches the standard billing dispute resolution policy. The charge was verified in Stripe at $49.99, within the 30-day refund window.",
  "risk_context": "Write operation on tickets table. Requires workspace editor approval per governance policy.",
  "confidence_score": 0.94,
  "auto_approve_eligible": false
}

interaction_request Object

Present only when status: awaiting_interaction.

{
  "interaction_type": "clarification_request",
  "message": "I found 3 high-priority tickets matching your criteria. Should I process all 3 autonomously, or would you like to review each response before I send it?",
  "options": ["Process all autonomously", "Review each response first"],
  "required": true
}

usage Object

| Field | Type | Description |
| --- | --- | --- |
| total_turns | int | Total reasoning turns executed |
| total_tokens | int | Total tokens consumed across all turns |
| cost_estimate | float | Estimated cost in USD |
| models_used | object | Per-model breakdown: provider, tier, input_tokens, output_tokens, turns, estimated_cost |
| execution_duration_ms | int | Total execution time in milliseconds |

error Object

Present only when status: failed.

| Field | Type | Description |
| --- | --- | --- |
| code | string | AGENT_ERROR, TIMEOUT, INVALID_TOOL, VALIDATION_ERROR, LLM_ERROR, GOVERNANCE_BLOCKED, TURN_LIMIT_EXCEEDED, BUDGET_EXCEEDED, PROVIDER_UNAVAILABLE |
| message | string | Human-readable error message |
| recoverable | bool | Whether the error is transient and the execution can be retried |
| details | json | Additional error context |

Continuation Contract

Sent to POST /api/v1/execute/continue when the Temporal workflow resumes after an interaction or approval resolution.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| execution_id | int | Yes | Execution to resume |
| continuation_type | enum | Yes | interaction_response, approval_resolved |
| interaction_response | object | If continuation_type: interaction_response | {"user_response": json} |
| approval_resolution | object | If continuation_type: approval_resolved | See below |
| serialized_state | json | Yes | Full AgentContext state serialized when the loop paused (conversation history, turn results, tool results, detected intent, governance evaluation, timing metadata) |

approval_resolution Object

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| status | enum | Yes | approved, rejected, edited_approved |
| modified_args | json | If edited_approved | Modified tool arguments |
| resolved_by | uuid | Yes | ID of the approver |
| resolution_comment | string | No | Approver comment |
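
The conditional requirements in the two tables above can be checked with a small validator. This sketch uses plain dictionaries to stay dependency-free; the service itself presumably expresses the same rules as Pydantic models, and the function name and error strings are hypothetical.

```python
def validate_continuation(body: dict) -> list[str]:
    """Return a list of contract violations for a continuation request."""
    errors = []
    for field in ("execution_id", "continuation_type", "serialized_state"):
        if field not in body:
            errors.append(f"missing {field}")

    ctype = body.get("continuation_type")
    if ctype == "interaction_response":
        if "interaction_response" not in body:
            errors.append("missing interaction_response")
    elif ctype == "approval_resolved":
        resolution = body.get("approval_resolution")
        if resolution is None:
            errors.append("missing approval_resolution")
        else:
            status = resolution.get("status")
            if status not in {"approved", "rejected", "edited_approved"}:
                errors.append("invalid approval_resolution.status")
            if status == "edited_approved" and "modified_args" not in resolution:
                errors.append("missing approval_resolution.modified_args")
            if "resolved_by" not in resolution:
                errors.append("missing approval_resolution.resolved_by")
    elif ctype is not None:
        errors.append(f"unknown continuation_type: {ctype}")
    return errors
```

Because serialized_state travels with every continuation, any replica can resume a paused execution without shared storage.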

AI Generation Contract

POST /api/v1/generate-agent Request

Generates a complete agent configuration from a natural language description.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| prompt | string | Yes | Natural language description of the desired agent |
| user_context | object | Yes | user_id, org_id, workspace_id, roles, permissions |
| available_data_sources | array | Yes | Data sources the workspace has access to |
| available_tools | array | Yes | All tools the user's workspace can use |
| workspace_templates | array | No | Available agent templates for reference |

{
  "prompt": "Create an agent that monitors Zendesk tickets and drafts responses for high-priority issues",
  "user_context": {
    "user_id": 4421,
    "org_id": 12,
    "workspace_id": 37,
    "roles": ["org_editor"],
    "permissions": ["agent:create", "data_source:view"]
  },
  "available_data_sources": [
    {"data_source_id": 14, "name": "Zendesk Production", "type": "zendesk", "description": "Primary customer support desk"}
  ],
  "available_tools": ["classify_intent", "execute_query", "write_back", "ask_user"],
  "workspace_templates": []
}

POST /api/v1/generate-agent Response

| Field | Type | Description |
| --- | --- | --- |
| agent_config | object | Complete suggested agent configuration (name, description, category, goal, instructions, action_level, suggested triggers, data sources, tools, policies) |
| reasoning | string | Full chain-of-thought explaining the AI's configuration choices |
| confidence_score | float | 0.0 to 1.0 confidence in the generated configuration |
| warnings | array | Concerns about the requested configuration |

{
"agent_config": {
"name": "Zendesk Tier 1 Support Agent",
"description": "Monitors incoming Zendesk tickets and drafts structured responses for high-priority issues using CRM and knowledge base data.",
"category": "customer_support",
"goal": "Reduce first-response time on high-priority Zendesk tickets by automatically drafting and routing responses.",
"instructions": "Check ticket priority on each run. For tickets with priority 'high' or 'urgent' created in the last 24 hours, query the CRM for account details and draft a response using the approved response templates. Always request approval before sending.",
"action_level": "act_with_approval",
"suggested_triggers": [
{
"trigger_type": "scheduled",
"trigger_config": {"cron": "*/15 * * * *"},
"reasoning": "15-minute polling matches typical SLA requirements for high-priority tickets."
}
],
"suggested_data_sources": [
{
"data_source_id": 14,
"name": "Zendesk Production",
"access_level": "read_write",
"reasoning": "Required to read ticket data and post responses."
}
],
"suggested_tools": [
{"tool_name": "execute_query", "reasoning": "Retrieve ticket and CRM data."},
{"tool_name": "write_back", "reasoning": "Post drafted responses to Zendesk."}
],
"suggested_policies": [
{
"name": "Require Approval Before Sending",
"conditions": {"tool": "write_back"},
"enforcement": "require_approval",
"reasoning": "Customer-facing messages should be reviewed before delivery."
}
]
},
"reasoning": "The user wants a reactive agent triggered on ticket creation. Act with Approval is appropriate because responses are customer-facing and carry reputational risk. A 15-minute schedule balances responsiveness with processing cost.",
"confidence_score": 0.91,
"warnings": [
"Ensure the Zendesk Production data source has write access enabled before deployment."
]
}

Tool Registry

The 17 tools available to River Agents divide into three categories. Reasoning tools execute locally within the river-agent process. Execution tools route through TLO Gateway for ACL validation. The interaction tool returns a payload to the Temporal workflow for WebSocket delivery.

Reasoning Tools

These tools execute inside the river-agent process and do not require TLO ACL validation.

classify_intent

| Property | Value |
| --- | --- |
| Category | Reasoning |
| Purpose | Classify the natural language prompt into a structured intent type |
| Target | Local (LLM call via RiverCore, Fast tier) |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| user_prompt | string | Yes | The user's natural language input |
| available_intents | array | No | Intent types to consider (defaults to all active intents) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| intent_type | string | Classified intent (e.g., DATA_QUERY, DATA_SOURCE_MANAGEMENT) |
| confidence | float | Classification confidence (0.0 to 1.0) |
| sub_intents | array | Decomposed sub-intents for compound prompts |
| entities | json | Extracted entities (table names, column names, dates, values) |

check_governance

| Property | Value |
| --- | --- |
| Category | Reasoning |
| Purpose | Evaluate RBAC, RLS, masking, budget, and action-level permissions |
| Target | Local (policy evaluation) |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| intent_type | string | Yes | The classified intent |
| data_source_ids | array | No | Data sources to check access for |
| requested_operation | string | Yes | Operation type (e.g., "query", "write", "delete") |

Output:

| Field | Type | Description |
| --- | --- | --- |
| allowed | bool | Whether the operation is permitted |
| reason | string | Explanation of the decision |
| constraints | json | Applied constraints (RLS filters, masking rules, row limits) |
| warnings | array | Non-blocking warnings |

generate_query

| Property | Value |
| --- | --- |
| Category | Reasoning |
| Purpose | Generate SQL/NoSQL query from natural language with governance filters applied |
| Target | Local (LLM call via RiverCore, Coding tier) |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| user_prompt | string | Yes | The user's natural language query |
| intent_type | string | Yes | Classified intent |
| data_sources | array | Yes | Data source metadata with schemas |
| table_schemas | array | No | Specific table schemas to use |
| governance | json | No | Governance constraints to embed in the query |

Output:

| Field | Type | Description |
| --- | --- | --- |
| query | string | The generated SQL/NoSQL query |
| dialect | string | Query dialect (e.g., "postgresql", "mongodb") |
| explanation | string | Explanation of the query logic |
| operations | array | Structured operation plan (for multi-step queries) |
| plan_type | string | "query", "mutation", or "federated" |
| data_source_ids | array | Data sources referenced |

search_catalog

| Property | Value |
| --- | --- |
| Category | Reasoning |
| Purpose | Find relevant tables and columns via vector similarity search |
| Target | Qdrant (via internal client) |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| query | string | Yes | Natural language search query |
| data_source_ids | array | No | Limit search to specific data sources |
| top_k | int | No | Number of results to return (default: 10) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| matches | array | Matching catalog entries with similarity scores |
| total_results | int | Total number of matches |

recommend_visualization

| Property | Value |
| --- | --- |
| Category | Reasoning |
| Purpose | Suggest charts and display formats for query results |
| Target | Local (LLM call via RiverCore, Fast tier) |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| columns | array | Yes | Result column names |
| column_types | array | Yes | Column data types |
| row_count | int | Yes | Number of result rows |
| user_prompt | string | No | Original query for context |

Output:

| Field | Type | Description |
| --- | --- | --- |
| recommended_type | string | Chart type (e.g., "bar", "line", "pie", "table") |
| x_axis | string | Suggested X axis column |
| y_axis | string | Suggested Y axis column |
| title | string | Suggested chart title |
| config | json | Additional visualization configuration |

explain_results

| Property | Value |
| --- | --- |
| Category | Reasoning |
| Purpose | Score confidence and provide human-readable explanation of results |
| Target | Local (LLM call via RiverCore, Balanced tier) |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| user_prompt | string | Yes | Original user query |
| query_result_columns | array | Yes | Column names from the result |
| query_result_sample | array | Yes | Sample rows from the result |
| generated_query | string | No | The SQL/NoSQL query that produced the result |

Output:

| Field | Type | Description |
| --- | --- | --- |
| explanation | string | Natural language explanation of the results |
| confidence | float | Confidence score (0.0 to 1.0) |
| suggestions | array | Follow-up query suggestions |
| data_quality_notes | array | Data quality observations |

Execution Tools

These tools route through TLO Gateway for ACL validation. The river-agent service constructs the HTTP request with proper auth headers and receives the downstream service response.

create_data_source

| Property | Value |
| --- | --- |
| Category | Execution |
| Required Permission | data_source:create |
| Target Service | Backend :8005 |
| Target Endpoint | POST /api/v1/data-sources |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Data source display name |
| type | string | Yes | Connector type (e.g., "postgresql", "mongodb") |
| connection_config | json | Yes | Connection parameters (host, port, database) |
| credentials | json | Yes | Authentication credentials |
| description | string | No | Human-readable description |

Output:

| Field | Type | Description |
| --- | --- | --- |
| success | bool | Whether the operation succeeded |
| data_source_id | int | ID of the created data source |
| message | string | Status message |

update_data_source

| Property | Value |
| --- | --- |
| Category | Execution |
| Required Permission | data_source:update |
| Target Service | Backend :8005 |
| Target Endpoint | PATCH /api/v1/data-sources/\{id\} |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| data_source_id | int | Yes | ID of the data source to update |
| updates | json | Yes | Fields to update |

Output: {"success": bool, "message": string}

delete_data_source

| Property | Value |
| --- | --- |
| Category | Execution |
| Required Permission | data_source:delete |
| Target Service | Backend :8005 |
| Target Endpoint | DELETE /api/v1/data-sources/\{id\} |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| data_source_id | int | Yes | ID of the data source to delete |

Output: {"success": bool, "message": string}

test_connection

| Property | Value |
| --- | --- |
| Category | Execution |
| Required Permission | data_source:view |
| Target Service | Backend :8005 |
| Target Endpoint | POST /api/v1/data-sources/\{id\}/test |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| data_source_id | int | Yes | ID of the data source to test |

Output:

| Field | Type | Description |
| --- | --- | --- |
| success | bool | Whether the connection succeeded |
| latency_ms | int | Connection latency in milliseconds |
| message | string | Status message |

discover_schema

| Property | Value |
| --- | --- |
| Category | Execution |
| Required Permission | data_source:view |
| Target Service | Backend :8005 |
| Target Endpoint | POST /api/v1/data-sources/\{id\}/discover |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| data_source_id | int | Yes | ID of the data source |

Output:

| Field | Type | Description |
| --- | --- | --- |
| success | bool | Whether discovery succeeded |
| tables | array | Discovered tables with column metadata |
| total_tables | int | Total number of tables discovered |

execute_query

| Property | Value |
| --- | --- |
| Category | Execution |
| Required Permission | data_source:query |
| Target Service | Data Orchestration :8002 |
| Target Endpoint | POST /api/v1/query/execute |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| data_source_id | int | Yes | Target data source |
| query | string | Yes | SQL/NoSQL query to execute |
| max_rows | int | No | Maximum rows to return (default: 1000) |
| parameters | json | No | Query parameters for parameterized queries |

Output:

| Field | Type | Description |
| --- | --- | --- |
| columns | array | Column names |
| rows | array | Result rows |
| total_rows | int | Total rows returned |
| execution_time_ms | int | Query execution time in milliseconds |

apply_governance_policy

| Property | Value |
| --- | --- |
| Category | Execution |
| Required Permission | policy:create |
| Target Service | Backend :8005 |
| Target Endpoint | POST /api/v1/policies |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Policy name |
| description | string | No | Policy description |
| scope | string | Yes | "organization" or "workspace" |
| conditions | json | Yes | Policy condition expressions |
| enforcement | string | Yes | "block", "require_approval", "warn", or "log_only" |

Output:

| Field | Type | Description |
| --- | --- | --- |
| success | bool | Whether the policy was created |
| policy_id | uuid | ID of the created policy |
| message | string | Status message |

write_back

| Property | Value |
| --- | --- |
| Category | Execution |
| Required Permission | data_source:update + confirmation |
| Target Service | Data Orchestration :8002 |
| Target Endpoint | POST /api/v1/data/write-back |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| data_source_id | int | Yes | Target data source |
| table_name | string | Yes | Target table |
| operation | string | Yes | "insert", "update", or "delete" |
| data | json | Yes | Data to write |
| conditions | json | No | WHERE conditions (for update/delete) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| success | bool | Whether the write succeeded |
| rows_affected | int | Number of rows modified |
| message | string | Status message |

get_workspace_info

| Property | Value |
| --- | --- |
| Category | Execution |
| Required Permission | workspace:view |
| Target Service | Backend :8005 |
| Target Endpoint | GET /api/v1/workspaces/\{id\} |
| Timeout | 30s |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| workspace_id | int | No | Workspace ID (defaults to current workspace from user context) |

Output:

| Field | Type | Description |
| --- | --- | --- |
| name | string | Workspace name |
| description | string | Workspace description |
| status | string | Workspace status |
| total_members | int | Number of members |
| settings | json | Workspace configuration |

get_storage_info

| Property | Value |
| --- | --- |
| Category | Execution |
| Required Permission | storage:view |
| Target Service | Storage Service :8003 |
| Target Endpoint | GET /api/v1/storage/usage |
| Timeout | 30s |

Input: Uses workspace context from user_context. No additional input fields.

Output:

| Field | Type | Description |
| --- | --- | --- |
| total_size_bytes | int | Total storage used |
| quota_bytes | int | Storage quota |
| usage_percentage | float | Usage as percentage |
| file_count | int | Number of stored files |
| remaining_bytes | int | Available storage |

Interaction Tool

ask_user

| Property | Value |
| --- | --- |
| Category | Interaction |
| Required Permission | None |
| Target | WebSocket via TLO Gateway (delivered by Temporal workflow) |
| Timeout | N/A (waits for user response) |

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| interaction_type | string | Yes | "clarification_request", "confirmation_request", "parameter_request", or "credential_request" |
| message | string | Yes | The question or prompt for the user |
| options | array | No | Selectable options (for multiple-choice) |
| required | bool | No | Whether a response is mandatory (default: true) |

Behavior: ask_user is intercepted by the Tool Executor before dispatch. Calling it causes the service to return status: awaiting_interaction immediately, with the full interaction payload embedded in the output. The Temporal workflow handles WebSocket delivery and waits for the user's response before re-invoking the service via POST /api/v1/execute/continue.
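The interception described above can be sketched roughly as follows. This is a minimal illustration, not the service's actual Tool Executor: the function name `intercept_tool_call` and the `output.interaction` response shape are assumptions, while the field names, allowed interaction types, and the `required` default come from the input table.

```python
from typing import Any, Optional

# Interaction types listed in the ask_user input table.
VALID_INTERACTION_TYPES = {
    "clarification_request",
    "confirmation_request",
    "parameter_request",
    "credential_request",
}


def intercept_tool_call(tool_name: str, arguments: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return an awaiting_interaction response for ask_user, or None to dispatch normally.

    Hypothetical helper: the response layout is assumed for illustration.
    """
    if tool_name != "ask_user":
        return None  # every other tool goes through regular dispatch

    interaction_type = arguments.get("interaction_type")
    if interaction_type not in VALID_INTERACTION_TYPES:
        raise ValueError(f"unknown interaction_type: {interaction_type!r}")
    if not arguments.get("message"):
        raise ValueError("ask_user requires a non-empty message")

    return {
        "status": "awaiting_interaction",
        "output": {
            "interaction": {
                "interaction_type": interaction_type,
                "message": arguments["message"],
                "options": arguments.get("options", []),
                "required": arguments.get("required", True),  # documented default
            }
        },
    }
```

Returning `None` for non-interaction tools keeps the check cheap and lets the caller fall through to its normal dispatch path.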

Error Handling and Recovery

Error Categories

Tool Error Taxonomy

| Error Type | Retryable | river-agent Response |
| --- | --- | --- |
| Tool timeout (exceeds 30s) | Yes (max 2 retries) | Observation with status: timeout |
| Transient failure (503, network) | Yes (max 2 retries, exponential backoff) | Observation with error details |
| ACL denial (403 from TLO Gateway) | No | status: failed, code: GOVERNANCE_BLOCKED |
| Not found (404) | No | Observation with error message |
| Validation error (422) | No | Observation with field-level error details |
| LLM provider timeout | Yes (auto-failover to next provider) | Retry via RiverCore fallback chain |
| All providers unavailable | No | status: failed, code: PROVIDER_UNAVAILABLE |
| Prompt injection detected in trigger payload | No (sanitize and continue) | Log security event; inject sanitized payload; continue execution |
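The tool-level rows of this taxonomy can be approximated with a small retry wrapper. The sketch below is an assumption-laden illustration: `ToolHTTPError`, `call_with_retry`, the observation dictionaries, and the 0.5s base delay are all hypothetical, and backoff is applied to timeouts as well as 503s for simplicity. Only the retry limit, the retryable conditions, and the GOVERNANCE_BLOCKED code are taken from the table.

```python
import asyncio
from typing import Any, Awaitable, Callable

MAX_RETRIES = 2           # "max 2 retries" from the taxonomy
RETRYABLE_STATUS = {503}  # transient failure


class ToolHTTPError(Exception):
    """Hypothetical error raised by a tool client for non-2xx responses."""

    def __init__(self, status_code: int, detail: str = "") -> None:
        super().__init__(f"{status_code}: {detail}")
        self.status_code = status_code
        self.detail = detail


async def call_with_retry(
    call: Callable[[], Awaitable[Any]],
    *,
    timeout_s: float = 30.0,
    base_delay_s: float = 0.5,  # assumed value; the source does not specify one
) -> dict[str, Any]:
    """Run a tool call and return an observation dict for the LLM."""
    attempt = 0
    while True:
        try:
            result = await asyncio.wait_for(call(), timeout=timeout_s)
            return {"status": "success", "result": result, "attempts": attempt + 1}
        except asyncio.TimeoutError:
            observation = {"status": "timeout"}
        except ToolHTTPError as exc:
            if exc.status_code == 403:
                # ACL denial is terminal for the execution, never retried.
                return {"status": "failed", "code": "GOVERNANCE_BLOCKED", "attempts": attempt + 1}
            observation = {"status": "error", "http_status": exc.status_code, "detail": exc.detail}
            if exc.status_code not in RETRYABLE_STATUS:
                # 404, 422, and similar go back to the LLM as an observation.
                return {**observation, "attempts": attempt + 1}
        if attempt >= MAX_RETRIES:
            return {**observation, "attempts": attempt + 1}
        await asyncio.sleep(base_delay_s * (2 ** attempt))  # exponential backoff
        attempt += 1
```

A call that fails twice with 503 and then succeeds returns a success observation with `attempts` equal to 3, which is the initial call plus the two permitted retries.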

Loop Guard Rails

| Condition | Behavior |
| --- | --- |
| Single tool error | Feed error observation to LLM; LLM decides next action |
| 2+ consecutive errors (same tool type) | Escalate model to Reasoning tier for error recovery |
| 3+ consecutive errors (interaction tools) | Allow up to 3 retries -- LLM often needs multiple replanning attempts after guardrail rejections |
| Same tool called 3+ times with identical arguments | Inject system notice: "You are in a loop. Stop calling this tool and proceed to the next step or provide a final answer." |
| No tool calls for 3 consecutive turns | Inject: "You appear to be repeating yourself. Please take action or conclude." |
| Empty LLM response (0 completion tokens) | Treat as error; retry with model escalation to Reasoning tier |
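One way to detect the "same tool, identical arguments" condition is to fingerprint each call and count repeats. The sketch below is hypothetical (`LoopGuard` and `call_fingerprint` are illustrative names) and assumes the count is cumulative over one execution rather than strictly consecutive, which the table does not specify.

```python
import json
from collections import Counter
from typing import Any, Optional

LOOP_NOTICE = (
    "You are in a loop. Stop calling this tool and proceed to the next step "
    "or provide a final answer."
)
REPEAT_THRESHOLD = 3  # "3+ times" from the guard-rail table


def call_fingerprint(tool_name: str, arguments: dict[str, Any]) -> str:
    """Build a stable key so that argument key order does not matter."""
    return f"{tool_name}:{json.dumps(arguments, sort_keys=True, default=str)}"


class LoopGuard:
    """Counts identical tool calls within one execution (assumed cumulative)."""

    def __init__(self) -> None:
        self._seen: Counter = Counter()

    def record(self, tool_name: str, arguments: dict[str, Any]) -> Optional[str]:
        """Record a call; return the system notice once the threshold is hit."""
        key = call_fingerprint(tool_name, arguments)
        self._seen[key] += 1
        if self._seen[key] >= REPEAT_THRESHOLD:
            return LOOP_NOTICE
        return None
```

Serializing with `sort_keys=True` means `{"a": 1, "b": 2}` and `{"b": 2, "a": 1}` count as the same call.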

Execution-Level Errors

| Condition | Behavior |
| --- | --- |
| Turn limit reached | Force finalization with partial results; status: max_turns_exceeded |
| Token budget exceeded | Force finalization with partial results; status: budget_exceeded |
| Execution timeout | Terminate execution; status: timeout |
| Governance policy violation | Block action, log violation, feed "action blocked" observation to LLM |
| Approval timeout | Execution remains awaiting_approval until Temporal workflow handles expiration |
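The first three conditions lend themselves to a check that runs at the top of every turn. The following is a sketch under assumptions: the function name `terminal_status` and the precedence order (timeout, then budget, then turns) are illustrative choices, since the source does not say which status wins when several limits trip at once.

```python
from typing import Optional


def terminal_status(
    *,
    turns_used: int,
    max_turns: int,
    tokens_used: int,
    token_budget: int,
    elapsed_s: float,
    timeout_s: float,
) -> Optional[str]:
    """Return a terminal status if an execution-level limit is hit, else None."""
    if elapsed_s >= timeout_s:
        return "timeout"             # execution is terminated
    if tokens_used > token_budget:
        return "budget_exceeded"     # finalize with partial results
    if turns_used >= max_turns:
        return "max_turns_exceeded"  # finalize with partial results
    return None
```

For example, with `max_turns=15` and `token_budget=100000`, a run at turn 15 that has used 40,000 tokens yields `max_turns_exceeded`.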

Governance Integration

Pre-Tool-Call Governance Checks

Before every execution tool call, the Governance Checker evaluates in order:

Action-Level Decision Matrix

| Agent Action Level | Read Tool | Write Tool (in approval_rules) | Write Tool (not in approval_rules) |
| --- | --- | --- | --- |
| read_only | PROCEED | BLOCKED | BLOCKED |
| recommend | SUGGEST_ONLY | SUGGEST_ONLY | SUGGEST_ONLY |
| act_with_approval | PROCEED | APPROVAL_REQUIRED | PROCEED |
| automated | PROCEED | PROCEED | PROCEED |
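The matrix translates directly into a lookup table. The sketch below mirrors the four rows above; the function name `governance_decision` and its parameters are hypothetical, and how a tool is classified as read or write is left to the caller.

```python
# Columns: read tool, write tool in approval_rules, write tool not in approval_rules.
DECISION_MATRIX = {
    "read_only": ("PROCEED", "BLOCKED", "BLOCKED"),
    "recommend": ("SUGGEST_ONLY", "SUGGEST_ONLY", "SUGGEST_ONLY"),
    "act_with_approval": ("PROCEED", "APPROVAL_REQUIRED", "PROCEED"),
    "automated": ("PROCEED", "PROCEED", "PROCEED"),
}


def governance_decision(action_level: str, *, is_write: bool, in_approval_rules: bool = False) -> str:
    """Look up the action-level decision for a single tool call."""
    try:
        read, write_listed, write_unlisted = DECISION_MATRIX[action_level]
    except KeyError:
        raise ValueError(f"unknown action level: {action_level!r}") from None
    if not is_write:
        return read
    return write_listed if in_approval_rules else write_unlisted
```

Keeping the matrix as data rather than nested conditionals makes it easy to compare against the documented table row by row.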

Policy Types Evaluated

| Policy Type | Evaluation Point | Example Condition | Enforcement Options |
| --- | --- | --- | --- |
| Budget Policy | Before LLM call and before tool execution | cost.tokens > 100000 | warn, block |
| Rate Limit Policy | Before tool execution | agent.tool_calls_this_hour > 100 | block, log_only |
| Data Export Limit | After query execution, before returning results | rows > 10000 AND table.classification == 'pii' | block, require_approval |
| Time Window Policy | Before write tool execution | time.hour >= 18 OR time.hour < 6 | block, require_approval |
| Tool Restriction Policy | Before tool dispatch | tool.name IN ['delete_data_source', 'write_back'] | block |
| Content Policy | After LLM generates tool arguments | contains_pii(args.message) == true | require_approval, block |

Approval Gate Flow

When the Governance Checker returns APPROVAL_REQUIRED:

  1. The river-agent service constructs an approval_request payload with tool_name, proposed_payload, reasoning_summary, risk_context, and confidence_score.
  2. The service returns ExecutionResponse with status: awaiting_approval and the approval_request embedded.
  3. The Temporal workflow serializes the full AgentContext into the context_snapshot field of the ApprovalRequest record created by the backend.
  4. The Temporal river_agent_execution_workflow hibernates via workflow.wait_condition(), consuming no resources while waiting.
  5. On resolution (approved/rejected/edited), the backend sends an approval_resolution Temporal signal.
  6. The workflow resumes and re-invokes river-agent via POST /api/v1/execute/continue with the serialized state and resolution outcome.
  7. river-agent restores context, feeds the resolution as an observation, and continues the loop.
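Steps 1, 2, and 7 of this flow might look like the sketch below on the river-agent side. The helper names and the exact nesting of the response and observation dictionaries are assumptions for illustration; only the five approval_request field names, the awaiting_approval status, and the three resolution outcomes come from the steps above.

```python
from typing import Any, Optional

RESOLUTION_OUTCOMES = {"approved", "rejected", "edited"}


def build_awaiting_approval(
    tool_name: str,
    proposed_payload: dict[str, Any],
    reasoning_summary: str,
    risk_context: dict[str, Any],
    confidence_score: float,
) -> dict[str, Any]:
    """Steps 1-2: wrap the pending tool call in an awaiting_approval response."""
    return {
        "status": "awaiting_approval",
        "approval_request": {
            "tool_name": tool_name,
            "proposed_payload": proposed_payload,
            "reasoning_summary": reasoning_summary,
            "risk_context": risk_context,
            "confidence_score": confidence_score,
        },
    }


def resolution_observation(
    outcome: str, edited_payload: Optional[dict[str, Any]] = None
) -> dict[str, Any]:
    """Step 7: turn the resolution into an observation for the next turn.

    The observation layout here is hypothetical.
    """
    if outcome not in RESOLUTION_OUTCOMES:
        raise ValueError(f"unknown resolution outcome: {outcome!r}")
    observation: dict[str, Any] = {"type": "approval_resolution", "outcome": outcome}
    if outcome == "edited":
        if edited_payload is None:
            raise ValueError("an edited resolution needs the edited payload")
        observation["payload"] = edited_payload
    return observation
```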

Governance Token Lifetime

The governance_token is issued by the Governance Service when the backend initiates the Temporal workflow. It is included in every turn invocation by the Temporal activity. If the token expires during a long-running execution, the backend re-issues a fresh token before the next invocation. The river-agent service never requests its own governance token.

Observability and Telemetry

OpenTelemetry Spans

| Span | Attributes | Notes |
| --- | --- | --- |
| river_agent.execution | execution_id, agent_id, action_level | Root span per POST /api/v1/execute |
| river_agent.turn | execution_id, turn_number, model_used, model_tier | Child span per reasoning turn |
| river_agent.reasoning | provider, tokens_in, tokens_out, latency_ms | Child span of turn |
| river_agent.governance_check | tool_name, decision, policy_matched | Child span per governance evaluation |
| river_agent.tool_dispatch | tool_name, category, status, latency_ms | Child span per tool call |
| river_agent.tool_retry | tool_name, attempt, error_code | Child span if retry occurs |

Prometheus Metrics

| Metric | Type | Labels |
| --- | --- | --- |
| river_agent_executions_total | Counter | status, action_level, agent_id |
| river_agent_turns_total | Counter | status, model_tier, agent_id |
| river_agent_tool_calls_total | Counter | tool_name, category, status |
| river_agent_tool_latency_ms | Histogram | tool_name, category |
| river_agent_llm_latency_ms | Histogram | tier, provider |
| river_agent_tokens_consumed_total | Counter | tier, agent_id, provider |
| river_agent_governance_decisions_total | Counter | decision, agent_id, policy_type |
| river_agent_cost_estimate_usd_total | Counter | agent_id, provider |

Structured Log Format

Each turn emits a structured JSON log at INFO level, forwarded to the backend and written to agent_logs:

{
  "level": "INFO",
  "service": "river-agent",
  "execution_id": 9871,
  "turn_number": 2,
  "model_used": "claude-sonnet-4-6",
  "model_tier": "balanced",
  "provider": "anthropic",
  "tool_called": "execute_query",
  "tool_category": "execution",
  "tool_status": "success",
  "governance_decision": "PROCEED",
  "tokens_in": 2140,
  "tokens_out": 145,
  "latency_ms": 1380,
  "timestamp": "2026-05-10T08:04:22.421Z"
}
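A record in this shape can be produced with the standard library alone. The sketch below is illustrative: `format_turn_log` is a hypothetical helper, and the service may well use a logging framework instead. It fixes the level and service fields and renders the timestamp as UTC with millisecond precision, matching the example.

```python
import json
from datetime import datetime, timezone
from typing import Any, Optional


def format_turn_log(fields: dict[str, Any], *, now: Optional[datetime] = None) -> str:
    """Serialize one per-turn log record as a single JSON line."""
    moment = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    # ISO 8601 with milliseconds and a trailing "Z", as in the example record.
    timestamp = moment.isoformat(timespec="milliseconds").replace("+00:00", "Z")
    record = {"level": "INFO", "service": "river-agent", **fields, "timestamp": timestamp}
    return json.dumps(record)


line = format_turn_log(
    {
        "execution_id": 9871,
        "turn_number": 2,
        "tool_called": "execute_query",
        "tool_status": "success",
        "governance_decision": "PROCEED",
    }
)
```

Passing `now` explicitly keeps the function deterministic in tests.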

Deployment Configuration

Environment Variables

| Variable | Required | Default | Description |
| --- | --- | --- | --- |
| TLO_GATEWAY_URL | Yes | -- | TLO Gateway URL for all tool dispatch and health readiness checks |
| RIVERCORE_CONFIG_PATH | Yes | -- | Path to the provider configuration file (YAML) |
| MAX_EXECUTION_CONCURRENCY | No | 10 | Maximum concurrent agentic loops per replica |
| DEFAULT_MAX_TURNS | No | 15 | Default turn limit when not specified in agent_config |
| DEFAULT_TOKEN_BUDGET | No | 100000 | Default token budget per execution |
| DEFAULT_LLM_TIMEOUT_SECONDS | No | 30 | LLM inference timeout when not specified in agent_config |
| DEFAULT_TOOL_TIMEOUT_SECONDS | No | 30 | Per-tool call timeout |
| PROMPT_INJECTION_DETECTION_ENABLED | No | true | Enable trigger payload injection screening |
| LOG_LEVEL | No | INFO | Structured log verbosity |
| OTEL_EXPORTER_OTLP_ENDPOINT | No | -- | OpenTelemetry collector endpoint |
| METRICS_PORT | No | 9090 | Prometheus metrics scrape port |
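A settings loader for these variables could look like the sketch below. The `Settings` class and `load_settings` function are hypothetical names, and the set of strings accepted as "true" is an assumption; the variable names, required flags, and defaults are copied from the table.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

REQUIRED_VARS = ("TLO_GATEWAY_URL", "RIVERCORE_CONFIG_PATH")


@dataclass(frozen=True)
class Settings:
    tlo_gateway_url: str
    rivercore_config_path: str
    max_execution_concurrency: int = 10
    default_max_turns: int = 15
    default_token_budget: int = 100_000
    default_llm_timeout_seconds: int = 30
    default_tool_timeout_seconds: int = 30
    prompt_injection_detection_enabled: bool = True
    log_level: str = "INFO"
    otel_exporter_otlp_endpoint: Optional[str] = None
    metrics_port: int = 9090


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read settings from the environment, failing fast on missing required values."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing required environment variables: {', '.join(missing)}")

    def as_int(name: str, default: int) -> int:
        return int(env.get(name, default))

    # Assumed truthy spellings; the source only documents the default "true".
    injection_flag = env.get("PROMPT_INJECTION_DETECTION_ENABLED", "true").strip().lower()

    return Settings(
        tlo_gateway_url=env["TLO_GATEWAY_URL"],
        rivercore_config_path=env["RIVERCORE_CONFIG_PATH"],
        max_execution_concurrency=as_int("MAX_EXECUTION_CONCURRENCY", 10),
        default_max_turns=as_int("DEFAULT_MAX_TURNS", 15),
        default_token_budget=as_int("DEFAULT_TOKEN_BUDGET", 100_000),
        default_llm_timeout_seconds=as_int("DEFAULT_LLM_TIMEOUT_SECONDS", 30),
        default_tool_timeout_seconds=as_int("DEFAULT_TOOL_TIMEOUT_SECONDS", 30),
        prompt_injection_detection_enabled=injection_flag in {"1", "true", "yes"},
        log_level=env.get("LOG_LEVEL", "INFO"),
        otel_exporter_otlp_endpoint=env.get("OTEL_EXPORTER_OTLP_ENDPOINT") or None,
        metrics_port=as_int("METRICS_PORT", 9090),
    )
```

Accepting a mapping instead of reading `os.environ` directly makes the loader easy to exercise with a plain dictionary.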

Resource Requirements

| Resource | Minimum | Recommended |
| --- | --- | --- |
| CPU | 2 vCPU | 4 vCPU |
| Memory | 2 GB | 4 GB |
| Replicas | 2 (HA minimum) | Auto-scale 2-10 based on river_agent_executions_total rate |

Health Check Endpoints

| Endpoint | Status Codes | Purpose |
| --- | --- | --- |
| GET /health | 200 | Liveness check -- returns 200 OK if the process is running |
| GET /health/ready | 200, 503 | Readiness check -- returns 200 OK only when TLO Gateway is reachable and at least one LLM provider is available |
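The readiness rule reduces to a small pure function that a FastAPI handler could call. This is a sketch with hypothetical names (`readiness` and the body keys); how gateway reachability and provider availability are probed is outside its scope.

```python
from typing import Any


def readiness(gateway_reachable: bool, available_providers: int) -> tuple[int, dict[str, Any]]:
    """Return the HTTP status code and a diagnostic body for GET /health/ready."""
    ready = gateway_reachable and available_providers >= 1
    body = {
        "ready": ready,
        "tlo_gateway_reachable": gateway_reachable,
        "llm_providers_available": available_providers,
    }
    return (200 if ready else 503), body
```

Separating the decision from the probes keeps the endpoint handler thin and the rule itself trivially testable.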

mTLS Configuration

Communication between TLO Gateway (:8001) and river-agent (:8007) uses mutual TLS. TLO Gateway presents a client certificate issued by the internal CA. river-agent validates the certificate before accepting any request. This ensures only TLO Gateway can call river-agent, even within the internal network, providing an additional layer beyond network-level IP allowlisting.

Scaling Behavior

river-agent is fully stateless. Any replica handles any request. Horizontal scaling is safe at any replica count. Because the agentic loop runs entirely within a single invocation (from ExecutionRequest to ExecutionResponse), load balancing across replicas does not require session affinity. The Temporal activity that calls river-agent handles retry logic at the workflow level -- if a replica fails mid-execution, the Temporal activity retries the HTTP call, and a new replica picks up the full ExecutionRequest from the beginning of the current loop segment.

  • TLO Gateway -- Entry point that authenticates requests, enforces ACL, and routes execution tool calls from river-agent to downstream services
  • Backend API -- Owns all agent lifecycle state, logs, approvals, and triggers that river-agent reads and writes through tool calls
  • Governance and Safety -- Defines the policy conditions, action levels, and audit framework that the Governance Checker enforces inline during execution