# Orchestrator System

The Orchestrator coordinates multiple AI coding assistants through predefined, iterative workflows. It is a fully self-contained Python package at `orchestrator/`.

## Table of Contents

- [Architecture](#architecture)
- [Execution Flow](#execution-flow)
- [Subpackages](#subpackages)
- [Workflows](#workflows)
- [Task Lifecycle](#task-lifecycle)
- [Fallback Routing](#fallback-routing)
- [Circuit Breaker](#circuit-breaker)
- [Web UI API](#web-ui-api)
- [CLI Commands](#cli-commands)
- [Context System](#context-system)
- [MCP Integration (Optional)](#mcp-integration-optional)

## Architecture

```mermaid
graph TD
    subgraph "orchestrator/"
        CLI[cli/shell.py<br/>Interactive REPL]
        UI[ui/app.py<br/>Flask+SocketIO :5001]
        CORE[core/engine.py<br/>Orchestrator]
        WF[core/workflow.py<br/>WorkflowEngine]
        TM[core/task_manager.py<br/>TaskManager]

        subgraph "Adapters"
            CLAUDE[ClaudeAdapter]
            CODEX[CodexAdapter]
            GEMINI[GeminiAdapter]
            COPILOT[CopilotAdapter]
            OLLAMA[OllamaAdapter]
            LLAMA[LlamaCppAdapter]
        end

        subgraph "Resilience"
            FB[FallbackManager]
            CB[CircuitBreaker]
            OD[OfflineDetector]
        end

        subgraph "Observability"
            MET[Prometheus Metrics]
            HEALTH[Health Checks]
            LOG[Structured Logging]
        end

        subgraph "Infrastructure"
            CACHE[InMemoryCache]
            ASYNC[AsyncExecutor]
            CFG[ConfigManager]
        end

        CLI --> CORE
        UI --> CORE
        CORE --> WF
        CORE --> TM
        WF --> FB
        FB --> CLAUDE & CODEX & GEMINI & COPILOT & OLLAMA & LLAMA
        CORE --> MET & LOG
    end
```

## Execution Flow

```mermaid
sequenceDiagram
    participant User
    participant Engine as Orchestrator
    participant WF as WorkflowEngine
    participant FB as FallbackManager
    participant Agent as Adapter

    User->>Engine: execute_task(task, workflow, max_iterations)
    Engine->>Engine: Load config, build WorkflowSteps
    loop For each iteration (up to max_iterations)
        loop For each step in workflow
            Engine->>WF: Execute step
            WF->>FB: execute_with_fallback(agent, task, context)
            alt Primary succeeds
                FB->>Agent: execute_task(prompt, context)
                Agent-->>FB: AgentResponse(success=True)
                FB-->>WF: (agent_used, response, None)
            else Primary fails (transient error)
                FB->>Agent: Fallback agent.execute_task()
                Agent-->>FB: AgentResponse
                FB-->>WF: (fallback_agent, response, primary_agent)
            end
            WF-->>Engine: Update context (output → next step input)
        end
        Engine->>Engine: _should_stop_iteration?
        Note over Engine: Stop if all steps pass<br/>and review has ≤3 suggestions
    end
    Engine-->>User: {success, final_output, iterations}
```

## Subpackages

| Package | Purpose | Key Classes |
|---------|---------|-------------|
| `adapters/` | AI agent integration layer | `BaseAdapter`, `ClaudeAdapter`, `CodexAdapter`, `GeminiAdapter`, `CopilotAdapter`, `OllamaAdapter`, `LlamaCppAdapter`, `CLICommunicator` |
| `core/` | Engine and workflow management | `Orchestrator`, `WorkflowEngine`, `WorkflowStep`, `PlannerAgent`, `TaskManager`, `OrchestratorError` |
| `resilience/` | Fault tolerance | `FallbackManager`, `CircuitBreaker`, `CircuitState`, `OfflineDetector`, `RateLimiter` |
| `observability/` | Monitoring and logging | `MetricsCollector`, `HealthChecker`, `configure_logging`, `get_logger` |
| `security_module/` | Input validation and security | `InputValidator`, `TokenBucketRateLimiter`, `SecretManager`, `AuditLogger` |
| `context/` | Graph-based context memory | `MemoryManager`, `GraphStore`, `ProjectScanner`, `BM25Index`, `HybridSearch` |
| `infra/` | Caching and config | `InMemoryCache`, `FileCache`, `AsyncExecutor`, `ConfigManager` |
| `cli/` | Interactive shell | `InteractiveShell`, `ConversationHistory` |
| `config/` | YAML configuration | `agents.yaml` |
| `ui/` | Web backend | Flask + Socket.IO app |

### Subpackage Dependency Graph

The diagram below shows the internal dependency relationships between orchestrator subpackages. Arrows point from the dependent to the dependency.
```mermaid
graph TD
    CORE[core/] --> ADAPTERS[adapters/]
    CORE --> RESILIENCE[resilience/]
    CORE --> INFRA[infra/]
    CORE --> OBSERVABILITY[observability/]
    CLI[cli/] --> CORE
    UI[ui/] --> CORE
    UI --> OBSERVABILITY
    RESILIENCE --> ADAPTERS
    CORE --> SECURITY[security_module/]
    CORE --> CONTEXT[context/]
    CONFIG[config/] -.->|read by| CORE
    CONFIG -.->|read by| ADAPTERS
```

## Workflows

### Dynamic Planner Agent

The Orchestrator features a **Dynamic Planner Agent** (`orchestrator/core/planner.py`) that acts as an intelligent router and dynamic workflow generator. When a task is executed with the `dynamic` workflow (or when no matching static YAML workflow is found), the Planner Agent:

1. **Reads Observability Metrics:** It accesses Prometheus metrics (`orchestrator_agent_calls_total`) to determine the real-time success and failure rates of all available agents.
2. **Evaluates Routing Policy:** Any agent with a success rate below `0.6` is deprioritized, removing it from the pool of candidates.
3. **Generates a Plan:** It uses a healthy LLM adapter (e.g., Claude, Gemini, Codex, or local-instruct) to break the task down into sequential steps (e.g., `implement`, `review`, `refine`) and assigns the best available agent to each step.

This ensures the system automatically adapts to API outages or degraded local backend performance without requiring manual YAML edits.

### Static YAML Workflows

You can also explicitly define static workflows in `agents.yaml`:

```mermaid
graph LR
    subgraph "default"
        D1[codex<br/>implement] --> D2[gemini<br/>review] --> D3[claude<br/>refine]
    end
    subgraph "quick"
        Q1[codex<br/>implement]
    end
    subgraph "thorough"
        T1[codex] --> T2[copilot] --> T3[gemini] --> T4[claude]
    end
    subgraph "review-only"
        R1[gemini<br/>review] --> R2[claude<br/>refine]
    end
    subgraph "offline-default"
        O1[local-code] --> O2[local-instruct]
    end
```

| Workflow | Agents | Use Case |
|----------|--------|----------|
| `default` | codex → gemini → claude | Standard implement → review → refine |
| `quick` | codex | Fast single-agent |
| `thorough` | codex → copilot → gemini → claude | Multi-agent deep |
| `review-only` | gemini → claude | Review existing code |
| `document` | claude → gemini | Documentation |
| `offline-default` | local-code → local-instruct | Fully offline |
| `hybrid` | local-code → claude (fallback: local) | Mixed cloud/local |

## Task Lifecycle

The `TaskManager` in `orchestrator/core/task_manager.py` tracks individual tasks through a well-defined state machine. Each task transitions through these states:

```mermaid
stateDiagram-v2
    [*] --> PENDING: create_task()
    PENDING --> IN_PROGRESS: task.start(agent)
    IN_PROGRESS --> COMPLETED: task.complete(result)
    IN_PROGRESS --> FAILED: task.fail(error)
    PENDING --> CANCELLED: cancel()
    COMPLETED --> [*]: cleanup_stale() / clear_completed()
    FAILED --> [*]: cleanup_stale()
    CANCELLED --> [*]

    PENDING: Queued, waiting for agent
    IN_PROGRESS: Assigned to agent, executing
    COMPLETED: Finished with result
    FAILED: Finished with error
    CANCELLED: Aborted before start
```

Key `TaskManager` operations:

| Method | Description |
|--------|-------------|
| `create_task(description, metadata)` | Create a new task in PENDING state |
| `get_task(task_id)` | Retrieve a task by its ID |
| `get_tasks_by_status(status)` | Filter tasks by their current status |
| `get_statistics()` | Aggregate counts and average duration |
| `cleanup_stale(max_age_seconds)` | Remove old COMPLETED/FAILED tasks |
| `clear_all()` | Remove all tasks and reset counter |

## Fallback Routing

```mermaid
flowchart TD
    A[Execute Primary Agent] --> B{Success?}
    B -->|Yes| C[Return Result]
    B -->|No| D{Transient Error?}
    D -->|ConnectionError<br/>TimeoutError<br/>HTTP 5xx| E{Fallback Configured?}
    D -->|Logic Error<br/>Syntax Error| C2[Return Primary Error]
    E -->|Yes| F[Execute Fallback Agent]
    E -->|No| C2
    F --> G{Fallback Success?}
    G -->|Yes| H[Return Fallback Result]
    G -->|No| I[Return Both Errors]
```

## Circuit Breaker

```mermaid
stateDiagram-v2
    [*] --> CLOSED
    CLOSED --> OPEN: failure_count >= threshold
    OPEN --> HALF_OPEN: recovery_timeout elapsed
    HALF_OPEN --> CLOSED: success
    HALF_OPEN --> OPEN: failure

    CLOSED: Normal operation
    OPEN: All calls rejected
    HALF_OPEN: Allow one test call
```

## Web UI API

| Method | Path | Description |
|--------|------|-------------|
| GET | `/health` | Liveness probe |
| GET | `/ready` | Readiness probe |
| GET | `/api/agents` | List agents |
| GET | `/api/workflows` | List workflows |
| POST | `/api/execute` | Start task |
| GET | `/api/status` | Session status |
| GET | `/api/config` | Get config |
| PUT | `/api/config` | Update config |
| GET | `/api/models/status` | Local backend/model readiness summary |
| GET | `/metrics` | Prometheus metrics |

## CLI Commands

### Task Execution

```bash
# Run a task with the default workflow (codex -> gemini -> claude)
./ai-orchestrator run "Implement a REST API for user management" --workflow default

# Run with a specific workflow and iteration limit
./ai-orchestrator run "Add pagination to the users endpoint" --workflow thorough --max-iterations 5

# Force offline mode (only local agents)
./ai-orchestrator run "Write unit tests for auth module" --offline

# Quick single-agent execution
./ai-orchestrator run "Fix the null pointer in parser.py" --workflow quick
```

### Interactive Shell (REPL)

```bash
# Start the interactive REPL session
./ai-orchestrator shell
```

Inside the shell, you can submit tasks conversationally. The shell maintains context between prompts, so follow-up tasks inherit prior output.

A note on local models in the REPL: local adapters (Ollama/llama.cpp) return text output and are best used for offline drafting, review, and fallback. Direct file edits come from CLI-backed agents.
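
The circuit-breaker state machine described above (CLOSED → OPEN → HALF_OPEN) can be sketched in a few lines. This is a minimal illustration with assumed parameter names, not the actual `CircuitBreaker` in `orchestrator/resilience/`:

```python
import time


class CircuitBreakerSketch:
    """Illustrative sketch of the documented CLOSED/OPEN/HALF_OPEN state machine."""

    def __init__(self, threshold=3, recovery_timeout=30.0):
        self.threshold = threshold              # failures before CLOSED -> OPEN
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.state = "CLOSED"
        self.opened_at = 0.0

    def call(self, fn):
        if self.state == "OPEN":
            if time.monotonic() - self.opened_at >= self.recovery_timeout:
                self.state = "HALF_OPEN"        # recovery window elapsed: allow one test call
            else:
                raise RuntimeError("circuit open: call rejected")
        try:
            result = fn()
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_failure(self):
        if self.state == "HALF_OPEN":
            self.state = "OPEN"                 # test call failed: re-open
            self.opened_at = time.monotonic()
            return
        self.failure_count += 1
        if self.failure_count >= self.threshold:
            self.state = "OPEN"
            self.opened_at = time.monotonic()

    def _on_success(self):
        self.state = "CLOSED"                   # HALF_OPEN success closes the circuit
        self.failure_count = 0
```

After `threshold` consecutive failures the breaker rejects calls outright; once `recovery_timeout` elapses, a single probe call decides whether the circuit closes again or re-opens.
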
### Inspection and Validation

```bash
# List all configured and available agents with their roles and types
./ai-orchestrator agents

# List all configured workflows and their step sequences
./ai-orchestrator workflows

# Validate configuration (checks YAML syntax, agent availability, workflow references)
./ai-orchestrator validate
```

### Local Model Management

```bash
# Check status of all local model endpoints (Ollama, llama.cpp)
./ai-orchestrator models status

# List models available on local endpoints
./ai-orchestrator models list

# Pull a model into the local Ollama cache
./ai-orchestrator models pull codellama:13b

# Remove a model from the local Ollama cache
./ai-orchestrator models remove codellama:13b
```

### Local Model Integration and Limits

Local models are first-class workflow agents for offline/hybrid execution, but they use completion APIs, not workspace-editing CLIs.

| Adapter family | Transport | Direct file edits |
|----------------|-----------|-------------------|
| CLI adapters (`codex`, `claude`, `gemini`, `copilot`) | Local CLI process + workspace execution | Yes (tool-dependent) |
| Local adapters (`ollama`, `llamacpp`, `localai`, `openai-compatible`) | HTTP completion endpoints | No (text output only) |

Local models are best used for:

- offline drafts and review responses,
- cloud-to-local fallback continuity,
- role-specific guidance in hybrid workflows.

> [!IMPORTANT]
> While it is possible to make local LLMs edit files directly (e.g., via a `file-editor` tool), this capability is currently disabled to prevent unintended destructive changes. Local adapters are advisory: they produce text output that the Orchestrator uses to inform subsequent steps, but they have no write access to the workspace. This design choice prioritizes safety and predictability while still leveraging local models for their strengths in drafting and feedback.
The hard part is not feasibility but safety and reliability: permissions, diff constraints, validation and tests before writes, rollback, and preventing bad edits.

### Single-Agent Testing

```bash
# Test a single agent directly (bypasses workflow engine)
./ai-orchestrator test-agent codex "Write a hello world in Python"
./ai-orchestrator test-agent local-code "Explain quicksort"
```

## Context System

The orchestrator maintains a graph-based context database at `~/.ai-orchestrator/context.db` for persistent memory and cross-session learning.

### Features

| Feature | Description |
|---------|-------------|
| **Hybrid Search** | BM25 keyword + semantic embedding + Reciprocal Rank Fusion |
| **10 Node Types** | Conversation, Task, Mistake, Pattern, Decision, CodeSnippet, Preference, File, Concept, Project |
| **12 Edge Types** | RELATED_TO, CAUSED_BY, FIXED_BY, SIMILAR_TO, DEPENDS_ON, and more |
| **Project Scanning** | Automatic codebase analysis detecting languages, frameworks, and structure |
| **Multi-Project** | Isolated per-project graphs with deterministic SHA-256 project IDs |
| **Atomic Operations** | UPSERT nodes (edge-preserving), single-transaction bulk delete |

### Project-Scoped Operation

Configure via the `PROJECT_PATH` environment variable or `settings.project_path` in `orchestrator/config/agents.yaml`. On startup, the engine:

1. Resolves the project path from the env var or config
2. Generates a deterministic `project_id` (SHA-256 prefix)
3. Scans the directory with `ProjectScanner` if not already registered
4. Tags all subsequent task outputs with the `project_id`
5. Includes project context in agent prompts via `get_relevant_context()`

### Key APIs

```python
from orchestrator.context import MemoryManager

manager = MemoryManager()

# Register and scan a project
pid = manager.register_project("/path/to/project")

# Store task results scoped to the project
manager.store_task("Implement auth", "JWT auth added", success=True, project_id=pid)

# Get project-scoped context
context = manager.get_project_context(pid, task="Add user roles")

# Search with hybrid BM25 + semantic
results = manager.search("authentication patterns", limit=10)
```

### Obsidian Vault Export

Export the orchestrator's context graph as an [Obsidian](https://obsidian.md) vault for interactive exploration. Each node (task, decision, pattern, mistake, conversation) becomes a markdown note with YAML frontmatter and `[[wikilinks]]` to related nodes.

```python
from orchestrator.context.ops.export import ContextExporter
from orchestrator.context.graph_store import GraphStore

store = GraphStore("~/.ai-orchestrator/context.db")
exporter = ContextExporter(store)

# Export full graph
result = exporter.export_obsidian("./orchestrator-vault")
# → { notes_written: 142, edges_linked: 387, folders: [...] }

# Export only decisions and patterns
result = exporter.export_obsidian("./vault", node_types=["decision", "pattern"])
```

Open the vault in Obsidian and press **Ctrl/Cmd + G** to visualize task dependencies, decision chains, and learned patterns as an interactive color-coded graph.

```mermaid
graph LR
    subgraph "Orchestrator Context → Obsidian"
        DB[(context.db)] --> EXP[ContextExporter]
        EXP --> VAULT[Obsidian Vault]
        VAULT --> IDX[_Index.md<br/>Map of Content]
        VAULT --> TASKS[Tasks/]
        VAULT --> DECS[Decisions/]
        VAULT --> PATS[Patterns/]
        VAULT --> MIST[Mistakes/]
        VAULT --> CONVS[Conversations/]
        VAULT --> OBS[.obsidian/<br/>graph.json]
    end

    style VAULT fill:#7C3AED,color:#fff
    style OBS fill:#4FC3F7,color:#000
    style IDX fill:#FFC107,color:#000
```

**Generated vault structure:**

| Folder | Contents | Color in Graph View |
|--------|----------|---------------------|
| `Tasks/` | Completed tasks with outcomes | 🟢 Green |
| `Decisions/` | Architectural decisions with rationale | 🟣 Purple |
| `Patterns/` | Reusable code patterns | 🟠 Orange |
| `Mistakes/` | Errors with corrections and prevention | 🔴 Red |
| `Conversations/` | Past chat sessions | 🔵 Light Blue |
| `Code Snippets/` | Useful code fragments | ⚫ Grey |
| `Concepts/` | Domain knowledge | 🟡 Lime |
| `Projects/` | Registered project roots | 🟡 Amber |

## MCP Integration (Optional)

The orchestrator is optionally accessible via MCP (Model Context Protocol) through the shared MCP server at `mcp_server/`. The MCP server is not required to use the orchestrator; it provides an additional integration surface for LLM clients that support the MCP protocol.

### MCP Architecture

```mermaid
sequenceDiagram
    participant Client as MCP Client
    participant Server as MCP Server
    participant Orch as Orchestrator

    Client->>Server: call_tool("orchestrator_execute", {task, workflow})
    Server->>Orch: execute_task(task, workflow, max_iterations)
    Orch->>Orch: Run workflow steps
    Orch-->>Server: Result dict
    Server-->>Client: JSON response
```

### Orchestrator MCP Tools

| Tool | Read-Only | Description |
|------|-----------|-------------|
| `orchestrator_execute` | No | Run a task through a named workflow |
| `orchestrator_list_agents` | Yes | List available agents with roles and types |
| `orchestrator_list_workflows` | Yes | List workflows with their step sequences |
| `orchestrator_health` | Yes | Health check: agent count, offline mode, timestamps |

### Starting the MCP Server

```bash
# stdio transport (default, used by Claude Desktop and similar clients)
python -m mcp_server.server

# HTTP transport on port 8000
python -m mcp_server.server --transport http --port 8000

# FastMCP Inspector for debugging
fastmcp dev mcp_server/server.py:mcp
```

### Example Tool Calls

**Execute a task through the default workflow:**

```json
{
  "method": "tools/call",
  "params": {
    "name": "orchestrator_execute",
    "arguments": {
      "task": "Implement a paginated REST endpoint for /api/users",
      "workflow": "default",
      "max_iterations": 3
    }
  }
}
```

Response:

```json
{
  "success": true,
  "workflow": "default",
  "iterations": 2,
  "steps": [
    " [OK] codex (implement)",
    " [OK] gemini (review)",
    " [OK] claude (refine)"
  ],
  "final_output": "..."
}
```

**List available agents:**

```json
{
  "method": "tools/call",
  "params": {
    "name": "orchestrator_list_agents",
    "arguments": {}
  }
}
```

Response:

```json
{
  "agents": [
    {"name": "codex", "role": "implementation", "type": "cli", "available": true},
    {"name": "gemini", "role": "review", "type": "cli", "available": true},
    {"name": "claude", "role": "refinement", "type": "cli", "available": true}
  ],
  "count": 3
}
```

**Health check:**

```json
{
  "method": "tools/call",
  "params": {
    "name": "orchestrator_health",
    "arguments": {}
  }
}
```

Response:

```json
{
  "status": "healthy",
  "agents": 3,
  "agent_names": ["codex", "gemini", "claude"],
  "workflows": ["default", "quick", "thorough", "review-only", "document", "offline-default", "hybrid"],
  "offline_mode": false,
  "timestamp": "2026-04-02T12:00:00+00:00"
}
```

### Python Client

```python
from orchestrator.mcp_client import OrchestratorMCPClient

# In-memory client (same process, no network)
client = OrchestratorMCPClient()

# Remote client (connects to HTTP MCP server)
# client = OrchestratorMCPClient("http://localhost:8000/mcp")

result = await client.execute_task("Build a REST API", workflow="default")
```

## Code Quality

| Metric | Value |
|--------|-------|
| **Pylint Score** | 10.00/10 (zero warnings) |
| **Tests** | 386 passing (pytest) |
| **Pre-commit Hooks** | 15 hooks passing (Black, isort, flake8, pylint, MyPy, etc.) |
| **Formatting** | Black (120-char line length) |
| **Logging** | Lazy `%s` formatting throughout; no stray `print()` in production code |
| **I/O** | Explicit UTF-8 encoding on all file operations |

## Configuration

See [`orchestrator/config/agents.yaml`](orchestrator/config/agents.yaml) and [`docs/configuration-guide.md`](docs/configuration-guide.md).

## Detailed Documentation

- [Architecture Deep-Dive](docs/orchestrator-architecture.md)
- [API Reference](docs/orchestrator-api-reference.md)
- [Configuration Guide](docs/configuration-guide.md)
- [Adding Agents](docs/adding-agents.md)
- [MCP Server](mcp_server/server.py)
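
As a closing illustration of the Context System's "deterministic SHA-256 project IDs", an ID of this kind can be derived as a hash prefix of the normalized project path. The prefix length and normalization below are assumptions for illustration, not the `orchestrator/context` implementation:

```python
import hashlib
from pathlib import Path


def project_id(path: str, prefix_len: int = 16) -> str:
    """Sketch: deterministic project ID as a SHA-256 prefix of the resolved path."""
    normalized = str(Path(path).expanduser().resolve())  # same path -> same ID
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
    return digest[:prefix_len]
```

Because the ID depends only on the resolved path, re-registering the same project on a later run yields the same graph partition.
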