Real-Time Training Monitoring
Model Studio provides WebSocket streaming for real-time monitoring of training jobs, enabling developers to build responsive UIs that display live progress updates, performance metrics, and training logs as they are generated.
This makes it well suited to interactive dashboards, Jupyter notebook integrations, and CLI tools that need immediate feedback during long-running training jobs.
Architecture
WebSocket Endpoint
Connection Details
Endpoint: WS /api/model-studio/ws/train-from-parquet
Protocol: WebSocket (RFC 6455)
Encoding: JSON messages
Max Message Size: 100 MB (for Parquet file upload)
WebSocket connections require sticky sessions or connection affinity. Ensure your load balancer (Nginx, HAProxy, ALB) is configured to route all messages from a single client to the same backend server.
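Because file_data is Base64-encoded, every 3 bytes of Parquet become 4 bytes of text, so a 100 MB message carries at most about 75 MB of raw file. The sketch below measures the serialized message before connecting. It assumes the limit applies to the whole JSON text frame and is counted in binary megabytes (100 × 1024 × 1024 bytes); `MAX_MESSAGE_BYTES`, `base64_length`, `build_start_message`, and `fits_in_one_message` are hypothetical names.

```python
import base64
import json

# Assumption: the 100 MB limit covers the whole JSON text frame, in binary megabytes
MAX_MESSAGE_BYTES = 100 * 1024 * 1024


def base64_length(raw_size: int) -> int:
    """Length of padded Base64 output for raw_size input bytes (4 chars per 3 bytes)."""
    return 4 * ((raw_size + 2) // 3)


def build_start_message(parquet_bytes: bytes, **params) -> str:
    """Serialize a start_training message exactly as it would be sent."""
    return json.dumps({
        "action": "start_training",
        "file_data": base64.b64encode(parquet_bytes).decode("ascii"),
        **params,
    })


def fits_in_one_message(parquet_bytes: bytes, **params) -> bool:
    """True if the serialized message stays within MAX_MESSAGE_BYTES."""
    message = build_start_message(parquet_bytes, **params)
    return len(message.encode("utf-8")) <= MAX_MESSAGE_BYTES
```

For a quick estimate without encoding the file, base64_length(os.path.getsize(path)) gives the size of the file_data field alone; the largest raw file that fits is just under MAX_MESSAGE_BYTES * 3 // 4 bytes (about 75 MB). Files that do not fit should go through the HTTP Model Creation API instead.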
Message Protocol
Client → Server: Start Training
The client initiates training by sending a JSON message containing the Parquet file data (Base64-encoded) and training parameters.
Message Schema
```json
{
  "action": "start_training",
  "file_data": "<base64-encoded-parquet-file>",
  "ml_task": "regression",
  "target_variable": "cpu_utilization",
  "model_type": "auto",
  "task_name": "Agent Resource Prediction",
  "test_size": 0.2
}
```
Field Descriptions:
| Field | Type | Required | Description |
|---|---|---|---|
| action | string | Yes | Must be "start_training" |
| file_data | string | Yes | Base64-encoded Parquet file content |
| ml_task | string | Yes | ML task: classification, regression, clustering, or time_series |
| target_variable | string | Conditional | Target column name (required for supervised learning) |
| model_type | string | No | Model type, or "auto" for AutoML selection (default: "auto") |
| task_name | string | No | Descriptive name for the training task |
| test_size | float | No | Test set proportion (0.0-1.0, default: 0.2) |
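The rules in the table above can be checked on the client before uploading a large file. A minimal sketch: `validate_start_message` is a hypothetical helper, it assumes time_series (like classification and regression) needs a target_variable while clustering does not, and it treats the test_size bounds as exclusive because a split of exactly 0.0 or 1.0 leaves one side empty.

```python
from typing import Any, Dict, List

# Assumption: these tasks need a target column; clustering does not
SUPERVISED_TASKS = {"classification", "regression", "time_series"}
ML_TASKS = SUPERVISED_TASKS | {"clustering"}


def validate_start_message(msg: Dict[str, Any]) -> List[str]:
    """Return human-readable problems with a start_training message; empty means valid."""
    problems = []
    if msg.get("action") != "start_training":
        problems.append('action must be "start_training"')
    file_data = msg.get("file_data")
    if not isinstance(file_data, str) or not file_data:
        problems.append("file_data must be a non-empty Base64 string")
    task = msg.get("ml_task")
    if task not in ML_TASKS:
        problems.append(f"ml_task must be one of {sorted(ML_TASKS)}")
    elif task in SUPERVISED_TASKS and not msg.get("target_variable"):
        problems.append(f"target_variable is required for ml_task={task!r}")
    # Assumption: bounds are exclusive, since 0.0 or 1.0 leaves one split empty
    test_size = msg.get("test_size", 0.2)
    if not isinstance(test_size, (int, float)) or not 0.0 < test_size < 1.0:
        problems.append("test_size must be strictly between 0.0 and 1.0")
    return problems
```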
JavaScript Example
```javascript
const ws = new WebSocket('ws://localhost:8000/api/model-studio/ws/train-from-parquet');

ws.onopen = () => {
  const file = document.getElementById('parquetFile').files[0];
  if (!file) {
    console.error('No Parquet file selected');
    ws.close();
    return;
  }

  const reader = new FileReader();
  reader.onload = () => {
    // readAsDataURL yields "data:<mime>;base64,<payload>"; keep only the payload
    const base64 = reader.result.split(',')[1];
    ws.send(JSON.stringify({
      action: "start_training",
      file_data: base64,
      ml_task: "regression",
      target_variable: "cpu_utilization",
      model_type: "auto",
      task_name: "Agent Resource Prediction",
      test_size: 0.2
    }));
  };
  // Let the browser Base64-encode the file instead of building a string byte by byte
  reader.readAsDataURL(file);
};

// Log every progress, completion, or error message from the server
ws.onmessage = (event) => {
  const update = JSON.parse(event.data);
  console.log(`[${update.type}]`, update.message);
};
```
Python Example
```python
import asyncio
import base64
import json

import websockets


async def train_model():
    uri = "ws://localhost:8000/api/model-studio/ws/train-from-parquet"

    # Read and Base64-encode the Parquet file before connecting
    with open("agent_telemetry.parquet", "rb") as f:
        file_data = base64.b64encode(f.read()).decode("utf-8")

    async with websockets.connect(uri) as websocket:
        # Send the training request as the first message
        await websocket.send(json.dumps({
            "action": "start_training",
            "file_data": file_data,
            "ml_task": "regression",
            "target_variable": "cpu_utilization",
            "model_type": "auto",
            "test_size": 0.2,
        }))

        # Receive progress updates until the job completes or fails
        async for message in websocket:
            update = json.loads(message)
            if update["type"] == "progress":
                print(f"[{update['phase']}] {update['progress']:.1f}% - {update['message']}")
            elif update["type"] == "completed":
                print(f"✅ Training completed! Model ID: {update['model_id']}")
                # Metric keys vary by ml_task; print only those present
                metrics = update.get("metrics", {})
                for key in ("test_accuracy", "test_r2"):
                    if key in metrics:
                        print(f"   {key}: {metrics[key]:.3f}")
                break
            elif update["type"] == "error":
                print(f"❌ Error: {update['message']}")
                break


asyncio.run(train_model())
```
Server → Client: Progress Updates
The server sends real-time progress updates as the training pipeline executes.
Progress Message Format
```json
{
  "type": "progress",
  "phase": "training",
  "progress": 65.5,
  "message": "Training XGBoost model (iteration 45/100)",
  "metrics": {
    "current_algorithm": "xgboost",
    "train_accuracy": 0.89,
    "validation_accuracy": 0.86,
    "iteration": 45,
    "total_iterations": 100
  },
  "logs": [
    "Iteration 45: train_acc=0.89, val_acc=0.86",
    "Learning rate: 0.01"
  ],
  "timestamp": "2026-01-30T18:15:30Z"
}
```
Field Descriptions:
| Field | Type | Description |
|---|---|---|
| type | string | Message type: "progress", "completed", or "error" |
| phase | string | Current training phase (see Training Phases below) |
| progress | float | Overall progress percentage (0.0-100.0) |
| message | string | Human-readable status message |
| metrics | object | Current performance metrics (varies by phase) |
| logs | array | Recent log messages from the training engine |
| timestamp | string | ISO 8601 timestamp |
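Error messages omit phase, progress, metrics, and logs, so client code is simpler if every message is normalized into one shape. A minimal sketch using a hypothetical `TrainingUpdate` dataclass and `parse_update` function; it keeps model_id from completion messages and ignores any other extra fields (such as model_file_path).

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class TrainingUpdate:
    """A normalized server message; fields missing from a message get defaults."""
    type: str  # "progress", "completed", or "error"
    message: str = ""
    timestamp: str = ""
    phase: str = ""
    progress: float = 0.0
    metrics: Dict[str, Any] = field(default_factory=dict)
    logs: List[str] = field(default_factory=list)
    model_id: Optional[str] = None  # present only on completion messages

    @property
    def is_terminal(self) -> bool:
        """True for the final message of a training run."""
        return self.type in ("completed", "error")


def parse_update(raw: str) -> TrainingUpdate:
    """Parse one WebSocket text frame into a TrainingUpdate."""
    data = json.loads(raw)
    return TrainingUpdate(
        type=data["type"],
        message=data.get("message", ""),
        timestamp=data.get("timestamp", ""),
        phase=data.get("phase", ""),
        progress=float(data.get("progress", 0.0)),
        metrics=data.get("metrics") or {},
        logs=data.get("logs") or [],
        model_id=data.get("model_id"),
    )
```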
Training Phases
The training pipeline progresses through the following phases:
| Phase | Progress Range | Description |
|---|---|---|
| data_loading | 0% - 10% | Loading and validating the Parquet file |
| data_analysis | 10% - 20% | Analyzing data characteristics (missing values, distributions) |
| preprocessing | 20% - 35% | Imputing missing values, encoding categoricals, scaling |
| algorithm_selection | 35% - 45% | Testing multiple algorithms to find the best fit |
| training | 45% - 85% | Training the selected model with hyperparameter tuning |
| evaluation | 85% - 95% | Evaluating on the test set and generating metrics |
| completed | 100% | Training finished successfully |
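Dashboards that show a per-phase sub-bar can derive it from the overall progress value using the nominal ranges in the table above. A minimal sketch, assuming the ranges are fixed as documented; `PHASE_RANGES` and `phase_fraction` are hypothetical names, and values outside a phase's range (for example 95% to 100%, which the table does not assign to any phase) are clamped.

```python
# Nominal progress ranges (percent) copied from the phase table
PHASE_RANGES = {
    "data_loading": (0.0, 10.0),
    "data_analysis": (10.0, 20.0),
    "preprocessing": (20.0, 35.0),
    "algorithm_selection": (35.0, 45.0),
    "training": (45.0, 85.0),
    "evaluation": (85.0, 95.0),
    "completed": (100.0, 100.0),
}


def phase_fraction(phase: str, progress: float) -> float:
    """Fraction (0.0-1.0) of the current phase that is done, clamped to its range."""
    start, end = PHASE_RANGES[phase]
    if end == start:  # zero-width phase such as "completed"
        return 1.0
    return min(1.0, max(0.0, (progress - start) / (end - start)))
```

With the sample progress message (phase "training", progress 65.5), the training phase is about 51% done: (65.5 - 45) / 40 = 0.5125.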
Server → Client: Completion Message
When training completes successfully, the server sends a final message with the trained model details.
```json
{
  "type": "completed",
  "phase": "completed",
  "progress": 100.0,
  "message": "Model training completed successfully",
  "metrics": {
    "algorithm": "xgboost",
    "train_accuracy": 0.92,
    "test_accuracy": 0.88,
    "train_r2": 0.90,
    "test_r2": 0.85,
    "total_time_seconds": 245.5
  },
  "logs": [
    "Model saved: model-20260130-a1b2c3d4",
    "Model file: ./models/model-20260130-a1b2c3d4/model.pkl",
    "Preprocessing file: ./models/model-20260130-a1b2c3d4/preprocessing.pkl",
    "Total training time: 245.50 seconds"
  ],
  "model_id": "model-20260130-a1b2c3d4",
  "model_file_path": "./models/model-20260130-a1b2c3d4/model.pkl",
  "preprocessing_file_path": "./models/model-20260130-a1b2c3d4/preprocessing.pkl",
  "timestamp": "2026-01-30T18:19:45Z"
}
```
After receiving the completed message, use the model_id to make predictions via the Test Prediction API.
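A minimal sketch for turning the completion message into a typed result before calling the Test Prediction API. `TrainedModel`, `parse_completion`, and `headline_metric` are hypothetical names, and picking test_r2 for regression and test_accuracy for other tasks is an assumption, since the metric keys vary by task.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class TrainedModel:
    """Artifacts and metrics reported by a completed training job."""
    model_id: str
    model_file_path: str
    preprocessing_file_path: str
    metrics: Dict[str, Any] = field(default_factory=dict)

    def headline_metric(self, ml_task: str) -> Optional[float]:
        # Assumption: R² for regression, accuracy for other tasks; None if not reported
        key = "test_r2" if ml_task == "regression" else "test_accuracy"
        return self.metrics.get(key)


def parse_completion(raw: str) -> TrainedModel:
    """Parse a completion message, rejecting any other message type."""
    data = json.loads(raw)
    if data.get("type") != "completed":
        raise ValueError(f"expected a completed message, got {data.get('type')!r}")
    return TrainedModel(
        model_id=data["model_id"],
        model_file_path=data["model_file_path"],
        preprocessing_file_path=data["preprocessing_file_path"],
        metrics=data.get("metrics") or {},
    )
```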
Server → Client: Error Message
If training fails, the server sends an error message and closes the connection.
```json
{
  "type": "error",
  "message": "Training failed: Target variable 'cpu_utilization' not found in data. Available columns: ['timestamp', 'agent_id', 'memory_usage', 'network_latency']",
  "timestamp": "2026-01-30T18:16:00Z"
}
```
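An error like the one above can be caught before any upload: Parquet stores its schema in the file footer, so the column names can be checked locally. A minimal sketch assuming pyarrow is installed; `missing_columns` and `parquet_columns` are hypothetical helper names.

```python
from typing import Iterable, List


def missing_columns(columns: Iterable[str], required: Iterable[str]) -> List[str]:
    """Return the required column names that are absent, in their original order."""
    present = set(columns)
    return [name for name in required if name not in present]


def parquet_columns(path: str) -> List[str]:
    """Read column names from the Parquet footer without loading the data."""
    import pyarrow.parquet as pq  # imported lazily so the helper above needs no pyarrow

    return pq.read_schema(path).names
```

For example, missing_columns(parquet_columns("agent_telemetry.parquet"), ["cpu_utilization"]) returns ["cpu_utilization"] for the file in the error above, so the client can stop before opening the WebSocket.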
Complete Implementation Examples
React Component with Progress Bar
```jsx
import React, { useState, useRef, useEffect } from 'react';

const WS_URL = 'ws://localhost:8000/api/model-studio/ws/train-from-parquet';

function ModelTrainer() {
  const [progress, setProgress] = useState(0);
  const [phase, setPhase] = useState('');
  const [logs, setLogs] = useState([]);
  const [modelId, setModelId] = useState(null);
  const wsRef = useRef(null);

  // Close the current socket without surfacing a spurious error event
  const closeSocket = () => {
    if (wsRef.current) {
      wsRef.current.onerror = null;
      wsRef.current.close();
      wsRef.current = null;
    }
  };

  // Close the socket when the component unmounts
  useEffect(() => closeSocket, []);

  const startTraining = (file) => {
    if (!file) return; // the file picker was cancelled

    // Drop any previous run and reset the UI
    closeSocket();
    setProgress(0);
    setPhase('');
    setLogs([]);
    setModelId(null);

    const ws = new WebSocket(WS_URL);
    wsRef.current = ws;

    ws.onopen = () => {
      const reader = new FileReader();
      reader.onload = () => {
        // readAsDataURL yields "data:<mime>;base64,<payload>"; keep only the payload
        const base64 = reader.result.split(',')[1];
        ws.send(JSON.stringify({
          action: "start_training",
          file_data: base64,
          ml_task: "regression",
          target_variable: "cpu_utilization",
          model_type: "auto",
          test_size: 0.2
        }));
      };
      reader.readAsDataURL(file);
    };

    ws.onmessage = (event) => {
      const update = JSON.parse(event.data);
      if (update.type === "progress") {
        setProgress(update.progress);
        setPhase(update.phase);
        setLogs(prev => [...prev, ...(update.logs ?? [])]);
      } else if (update.type === "completed") {
        setProgress(100);
        setPhase("completed");
        setModelId(update.model_id);
        setLogs(prev => [...prev, `✅ Training completed! Model ID: ${update.model_id}`]);
      } else if (update.type === "error") {
        setLogs(prev => [...prev, `❌ Error: ${update.message}`]);
      }
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
      setLogs(prev => [...prev, '❌ Connection error']);
    };
  };

  return (
    <div>
      <input
        type="file"
        accept=".parquet"
        onChange={(e) => startTraining(e.target.files[0])}
      />
      <div className="progress-bar">
        <div className="progress-fill" style={{ width: `${progress}%` }}>
          {progress.toFixed(1)}%
        </div>
      </div>
      <p>Phase: {phase}</p>
      <div className="logs">
        {logs.map((log, i) => <div key={i}>{log}</div>)}
      </div>
      {modelId && (
        <button onClick={() => { window.location.href = `/models/${modelId}`; }}>
          View Model Details
        </button>
      )}
    </div>
  );
}

export default ModelTrainer;
```
CLI Tool (Python)
```python
#!/usr/bin/env python3
"""Stream Model Studio training progress to a terminal progress bar."""
import asyncio
import base64
import json
import sys

import websockets
from rich.console import Console
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn

console = Console()


async def train_with_progress(parquet_file: str, ml_task: str, target: str) -> int:
    uri = "ws://localhost:8000/api/model-studio/ws/train-from-parquet"

    # Read and Base64-encode the file before connecting
    with open(parquet_file, "rb") as f:
        file_data = base64.b64encode(f.read()).decode("utf-8")

    # Share one Console so log lines print cleanly above the live progress bar
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        console=console,
    ) as progress:
        task = progress.add_task("[cyan]Training model...", total=100)

        async with websockets.connect(uri) as websocket:
            # Send the training request
            await websocket.send(json.dumps({
                "action": "start_training",
                "file_data": file_data,
                "ml_task": ml_task,
                "target_variable": target,
                "model_type": "auto",
                "test_size": 0.2,
            }))

            # Process updates until the job completes or fails
            async for message in websocket:
                update = json.loads(message)
                if update["type"] == "progress":
                    progress.update(
                        task,
                        completed=update["progress"],
                        description=f"[cyan]{update['phase']}: {update['message']}",
                    )
                    for log in update.get("logs", []):
                        console.log(log)
                elif update["type"] == "completed":
                    progress.update(task, completed=100)
                    console.print("\n[green]✅ Training completed![/green]")
                    console.print(f"Model ID: [bold]{update['model_id']}[/bold]")
                    # Metric keys vary by ml_task; print only those present
                    metrics = update.get("metrics", {})
                    for key in ("test_accuracy", "test_r2"):
                        if key in metrics:
                            console.print(f"{key}: {metrics[key]:.3f}")
                    return 0
                elif update["type"] == "error":
                    console.print(f"\n[red]❌ Error: {update['message']}[/red]")
                    return 1

    # The socket closed without a completed or error message
    console.print("[red]Connection closed before training finished[/red]")
    return 1


if __name__ == "__main__":
    sys.exit(asyncio.run(train_with_progress(
        parquet_file="agent_telemetry.parquet",
        ml_task="regression",
        target="cpu_utilization",
    )))
```
Connection Management
Timeouts & Keepalive
- Connection Timeout: 60 seconds (if no training request is sent)
- Training Timeout: Configurable via the task intent's timeout_seconds field (default: 24 hours)
- Ping Interval: Server sends ping frames every 30 seconds
- Pong Timeout: Connection closed if the client doesn't respond within 10 seconds. Browsers and the Python websockets library answer pings automatically, as long as the client's event loop is not blocked.
Reconnection Strategy
WebSocket connections are not resumable. If the connection drops during training:
- The training job continues running on the server
- Use the Training Details API to check progress
- Poll the status endpoint until it returns status: "completed"
For production deployments, implement exponential backoff reconnection logic and fallback to HTTP polling if WebSocket connections are unstable.
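A minimal sketch of the polling fallback with exponential backoff. The status endpoint's URL and response shape are not specified here, so the HTTP call is passed in as a caller-supplied coroutine (`fetch_status`, hypothetical) that returns a dict with a "status" key. `backoff_delays` implements capped exponential backoff with optional full jitter, and treating "failed" as a terminal status is an assumption.

```python
import asyncio
import random
from typing import Any, Awaitable, Callable, Dict, Iterator

# Assumption: "failed" is the terminal status for unsuccessful jobs
TERMINAL_STATUSES = {"completed", "failed"}


def backoff_delays(base: float = 1.0, cap: float = 30.0, attempts: int = 6,
                   jitter: bool = True) -> Iterator[float]:
    """Yield base * 2**n seconds for n = 0..attempts-1, capped; full jitter if enabled."""
    for n in range(attempts):
        delay = min(cap, base * 2 ** n)
        yield random.uniform(0.0, delay) if jitter else delay


async def poll_until_done(fetch_status: Callable[[], Awaitable[Dict[str, Any]]],
                          interval: float = 5.0, max_retries: int = 6,
                          base_delay: float = 1.0) -> Dict[str, Any]:
    """Poll until a terminal status, retrying transient network errors with backoff."""
    while True:
        for delay in backoff_delays(base=base_delay, attempts=max_retries):
            try:
                status = await fetch_status()
                break
            except OSError:  # e.g. connection refused or reset
                await asyncio.sleep(delay)
        else:
            raise ConnectionError(f"status check failed {max_retries} times in a row")
        if status.get("status") in TERMINAL_STATUSES:
            return status
        await asyncio.sleep(interval)
```

Full jitter spreads retries from many clients over the whole backoff window, which keeps them from hitting a recovering server in lockstep.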
Performance Considerations
Message Frequency
- Data Loading: 1 update per second
- Preprocessing: 1 update every 2 seconds
- Training: 1 update per iteration (varies by algorithm)
- Evaluation: 1 update at completion
Bandwidth Usage
| Phase | Avg Message Size | Messages/Min | Bandwidth |
|---|---|---|---|
| Data Loading | 500 bytes | 60 | ~30 KB/min |
| Training | 1 KB | 30 | ~30 KB/min |
| Completion | 2 KB | 1 (once per job) | 2 KB total |
Total for a typical job: under 1 MB of progress traffic for a 10-minute training session (excluding the initial Parquet upload)
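The under-1 MB figure follows from the table: the two highest rates (data loading and training) are both about 30 KB/min, so even if that rate held for the whole 10-minute session, the server-to-client stream would total roughly:

$$
10\,\text{min} \times 30\,\text{KB/min} + 2\,\text{KB} = 302\,\text{KB} < 1\,\text{MB}
$$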
Troubleshooting
Common Issues
1. Connection Refused
Symptom: WebSocket connection failed
Solution: Verify Model Studio is running and WebSocket endpoint is enabled
2. File Too Large
Symptom: Error: Message size exceeds limit
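Solution: Base64 encoding adds about 33% overhead, so the raw Parquet file must be under roughly 75 MB to fit in a 100 MB message. For larger files, use the Model Creation API (POST /api/v1/models).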
3. No Progress Updates
Symptom: Connection established but no messages received
Solution: Check that the initial message includes action: "start_training". The server closes the connection after 60 seconds if no training request arrives.
4. Premature Disconnection
Symptom: Connection closes before training completes
Solution: Ensure load balancer supports WebSocket and has sticky sessions enabled
Related Documentation
- Model Studio API Endpoints - HTTP alternatives
- Model Studio Overview - Architecture overview
- Compute Resources - Resource allocation