Skip to Content

Fleet Management

TL;DR

CMDOP fleet management lets you orchestrate AI-driven operations across multiple machines simultaneously. It supports parallel health checks, rolling and canary deployments with automatic rollback, real-time fleet dashboards, parallel command execution, cross-server log aggregation, configuration synchronization, and automated remediation of common issues like high disk or memory usage.

Use AI to manage multiple machines simultaneously.

How do I run AI operations across multiple machines?

from cmdop import AsyncCMDOPClient
from pydantic import BaseModel
import asyncio

# Schema for per-server health report
class ServerStatus(BaseModel):
    hostname: str
    healthy: bool
    cpu_percent: float
    memory_percent: float
    issues: list[str]

async with AsyncCMDOPClient.remote(api_key="cmd_xxx") as client:
    servers = ["web-1", "web-2", "web-3", "db-1"]

    async def check_server(hostname: str) -> ServerStatus:
        """Point the shared session at `hostname` and ask the AI for a health report."""
        # Switch the terminal session to the target machine
        await client.terminal.set_machine(hostname)
        # AI checks health and returns structured status
        result = await client.agent.run(
            prompt="Check server health and identify any issues",
            output_schema=ServerStatus
        )
        return result.output

    # Run all health checks in parallel using asyncio.gather
    # NOTE(review): all tasks share one `client` and each calls set_machine();
    # presumably the SDK scopes the machine per task — confirm, otherwise
    # concurrent set_machine calls could race on the shared session.
    statuses = await asyncio.gather(*[check_server(s) for s in servers])

    # Print a summary for each server
    for status in statuses:
        icon = "βœ“" if status.healthy else "βœ—"
        print(f"{icon} {status.hostname}: CPU {status.cpu_percent}%")
        for issue in status.issues:
            print(f"  ⚠ {issue}")

How do I perform a rolling deployment across servers?

# Schema for individual server deployment outcome class DeployResult(BaseModel): hostname: str success: bool version: str duration_seconds: float error: str | None async def rolling_deploy(servers: list[str], version: str): results = [] # Deploy to each server one at a time (sequentially) for server in servers: print(f"Deploying to {server}...") await client.terminal.set_machine(server) # AI runs the full deploy pipeline on this server result = await client.agent.run( prompt=f""" Deploy version {version}: 1. Pull latest code 2. Install dependencies 3. Run migrations 4. Restart service 5. Health check If health check fails, rollback immediately. """, output_schema=DeployResult ) results.append(result.output) # Stop the entire rollout if any server fails if not result.output.success: print(f"❌ Failed on {server}: {result.output.error}") print("Stopping deployment") break else: print(f"βœ“ {server} deployed in {result.output.duration_seconds}s") return results results = await rolling_deploy( ["web-1", "web-2", "web-3"], version="2.1.0" )

How do I set up a canary deployment?

async def canary_deploy(
    canary_server: str,
    production_servers: list[str],
    version: str,
    canary_duration: int = 300  # 5 minutes monitoring window
):
    """Three-phase canary rollout: deploy to one server, watch it, then fan out.

    Returns a dict with at least {"success": bool}; on failure the "stage"
    key identifies which phase aborted ("canary", "canary_monitoring",
    or "production").
    Assumes `DeployResult` (defined in the rolling-deploy example) is in scope.
    """
    # Phase 1: Deploy to the single canary server first
    print(f"Deploying to canary: {canary_server}")
    await client.terminal.set_machine(canary_server)
    canary_result = await client.agent.run(
        prompt=f"Deploy version {version} and verify",
        output_schema=DeployResult
    )
    if not canary_result.output.success:
        return {"success": False, "stage": "canary", "error": canary_result.output.error}

    # Phase 2: Monitor canary health at 30-second intervals
    print(f"Monitoring canary for {canary_duration}s...")

    class CanaryHealth(BaseModel):
        error_rate: float
        latency_p99: float
        healthy: bool

    for _ in range(canary_duration // 30):  # Check every 30s
        await asyncio.sleep(30)
        health = await client.agent.run(
            prompt="Check error rate and latency",
            output_schema=CanaryHealth
        )
        # Rollback canary if error rate exceeds 1% threshold
        if not health.output.healthy or health.output.error_rate > 0.01:
            await client.agent.run("Rollback to previous version")
            return {"success": False, "stage": "canary_monitoring"}

    # Phase 3: Canary passed -- deploy to all production servers
    print("Canary healthy, deploying to production...")
    for server in production_servers:
        await client.terminal.set_machine(server)
        result = await client.agent.run(
            prompt=f"Deploy version {version}",
            output_schema=DeployResult
        )
        if not result.output.success:
            # Rollback everything if any production server fails
            return {"success": False, "stage": "production", "server": server}

    return {"success": True}

How do I build a fleet health dashboard?

# Per-machine health metrics
class MachineHealth(BaseModel):
    hostname: str
    status: str  # healthy, warning, critical
    cpu_percent: float
    memory_percent: float
    disk_percent: float
    uptime_hours: float
    services_up: int
    services_down: int

# Aggregated fleet-level summary
class FleetHealth(BaseModel):
    total_machines: int
    healthy: int
    warning: int
    critical: int
    machines: list[MachineHealth]

async def fleet_health_check(servers: list[str]) -> FleetHealth:
    """Check every server concurrently and aggregate the results.

    Args:
        servers: hostnames to check.

    Returns:
        FleetHealth with per-status counts and the individual MachineHealth
        reports, in the same order as `servers`.
    """
    async def check_one(hostname: str) -> MachineHealth:
        await client.terminal.set_machine(hostname)
        # AI gathers system metrics and service statuses
        result = await client.agent.run(
            "Check system health, services status",
            output_schema=MachineHealth
        )
        return result.output

    # Check all machines in parallel for speed
    machines = await asyncio.gather(*[check_one(s) for s in servers])

    # Build the aggregate summary from individual results
    # (fixed: original had `len(./machines)`, a syntax error)
    return FleetHealth(
        total_machines=len(machines),
        healthy=len([m for m in machines if m.status == "healthy"]),
        warning=len([m for m in machines if m.status == "warning"]),
        critical=len([m for m in machines if m.status == "critical"]),
        machines=machines
    )

# Continuous monitoring loop that refreshes every 60 seconds
async def monitor_fleet():
    """Poll fleet health forever, printing a summary each minute."""
    servers = ["web-1", "web-2", "web-3", "db-1", "cache-1"]
    while True:
        health = await fleet_health_check(servers)

        print("\n=== Fleet Status ===")
        print(f"βœ“ Healthy: {health.healthy}")
        print(f"⚠ Warning: {health.warning}")
        print(f"βœ— Critical: {health.critical}")

        # Only print details for machines that need attention
        for machine in health.machines:
            if machine.status != "healthy":
                print(f"\n{machine.hostname} ({machine.status}):")
                print(f"  CPU: {machine.cpu_percent}%")
                print(f"  Memory: {machine.memory_percent}%")
                print(f"  Services down: {machine.services_down}")

        await asyncio.sleep(60)

How do I execute commands on all servers in parallel?

async def run_on_all(servers: list[str], command: str): """Run same command on all servers.""" # Schema for the output of a single command execution class CommandResult(BaseModel): hostname: str exit_code: int output: str async def run_one(hostname: str) -> CommandResult: await client.terminal.set_machine(hostname) # AI executes the given command and captures the result result = await client.agent.run( f"Run: {command}", output_schema=CommandResult ) return result.output # Fire all commands in parallel using asyncio.gather return await asyncio.gather(*[run_one(s) for s in servers]) # Example: update packages on all web servers at once results = await run_on_all( ["web-1", "web-2", "web-3"], "apt update && apt upgrade -y" )

How do I aggregate logs from multiple servers?

# Single log entry with its originating server
class LogEntry(BaseModel):
    timestamp: str
    level: str
    message: str
    source: str

# Aggregated log analysis across the fleet
class AggregatedLogs(BaseModel):
    total_errors: int
    servers_with_errors: list[str]
    unique_errors: list[str]
    entries: list[LogEntry]

async def aggregate_logs(servers: list[str], pattern: str = "ERROR"):
    """Search every server's recent logs for `pattern` and merge the hits.

    Args:
        servers: hostnames to search.
        pattern: substring/level the AI should match (default "ERROR").

    Returns:
        AggregatedLogs with totals, the affected servers, up to 10 distinct
        messages, and the 100 most recent entries (newest first).
    """
    # Per-server result schema — defined once, not recreated per iteration
    class ServerLogs(BaseModel):
        entries: list[LogEntry]

    all_entries = []
    for server in servers:
        await client.terminal.set_machine(server)
        # AI searches logs on this server for the given pattern
        result = await client.agent.run(
            f"Find log entries matching '{pattern}' from last hour",
            output_schema=ServerLogs
        )
        # Tag each entry with the server it came from
        for entry in result.output.entries:
            entry.source = server
            all_entries.append(entry)

    # Deduplicate error messages and identify affected servers
    unique_messages = set(e.message for e in all_entries)
    servers_with_errors = set(e.source for e in all_entries)

    return AggregatedLogs(
        total_errors=len(all_entries),
        servers_with_errors=sorted(servers_with_errors),
        # sorted() so the 10 sampled messages are deterministic
        # (set iteration order is arbitrary, so list(...)[:10] picked 10 at random)
        unique_errors=sorted(unique_messages)[:10],
        entries=sorted(all_entries, key=lambda e: e.timestamp, reverse=True)[:100]
    )

How do I synchronize configuration across servers?

async def sync_config(servers: list[str], config_content: str, config_path: str): """Ensure all servers have same config.""" # Tracks what action was taken on each server (unchanged, updated, created) class SyncResult(BaseModel): hostname: str action: str # unchanged, updated, created previous_hash: str | None current_hash: str results = [] for server in servers: await client.terminal.set_machine(server) # AI compares the current file hash and writes the new content if different result = await client.agent.run( f""" Sync config file at {config_path}: 1. Check if file exists and get its hash 2. If different or missing, write new content 3. Report what action was taken New content: {config_content} """, output_schema=SyncResult ) results.append(result.output) return results

How do I set up automated remediation for common issues?

async def auto_remediate(): """Automatically fix common issues.""" # Describes a single detected issue and whether it can be auto-fixed class Issue(BaseModel): type: str severity: str description: str auto_fixable: bool # Per-server remediation outcome class RemediationResult(BaseModel): hostname: str issues_found: list[Issue] issues_fixed: list[str] manual_intervention_needed: list[str] servers = ["web-1", "web-2", "web-3"] results = [] for server in servers: await client.terminal.set_machine(server) # AI checks for known issue types and auto-fixes what it can result = await client.agent.run( """ Check for common issues and auto-fix if possible: 1. High disk usage (>90%) β†’ clean old logs 2. High memory usage β†’ restart leaky service 3. Failed services β†’ restart them 4. Expired certificates β†’ alert (don't fix) Report what was found and fixed. """, output_schema=RemediationResult ) results.append(result.output) # Escalate issues that require human intervention for issue in result.output.manual_intervention_needed: alert_team(server, issue) return results

What are the best practices for fleet management?

1. Use Parallel Execution for Independent Operations

# Good: parallel health checks -- all servers checked simultaneously await asyncio.gather(*[check(s) for s in servers]) # Bad: sequential when not needed -- each waits for the previous for s in servers: await check(s)

2. Implement Circuit Breakers

# Stop the rollout after too many consecutive failures failures = 0 max_failures = 3 for server in servers: if failures >= max_failures: print("Too many failures, stopping") break result = await deploy(server) if not result.success: failures += 1

3. Log Everything

import logging

# Use a dedicated logger for fleet operations for easy filtering
logger = logging.getLogger("fleet")

async def deploy_with_logging(server: str):
    """Deploy to `server`, logging start and outcome on the fleet logger.

    Returns whatever `deploy` returns (expected to expose `.success`).
    """
    # Lazy %-style args: formatting is skipped entirely when the log
    # level filters the record out (f-strings always format eagerly).
    logger.info("Starting deploy on %s", server)
    result = await deploy(server)
    logger.info("Deploy on %s: %s", server, result.success)
    return result

4. Use Dry Run First

# Step 1: Plan in dry-run mode (no side effects) plan = await client.agent.run( "Plan deployment", restrictions={"dry_run": True} ) # Step 2: Review the planned actions print(plan.output.planned_actions) # Step 3: Execute with dry_run disabled after review result = await client.agent.run( "Execute deployment", restrictions={"dry_run": False} )

Next

Last updated on