
Orchestrator API

Complete API reference for the ClaudeSDKExecutor class - Nova AI's main orchestration engine.

Overview

The ClaudeSDKExecutor is the core orchestration class that:

  • Coordinates agents - Delegates to specialist agents (implementer, code-reviewer, tester)
  • Manages sessions - Lightweight continuation for 88-95% overhead reduction
  • Handles MCP - Integrates SDK MCP servers (KB, GitHub, memory)
  • Tracks costs - Production-grade cost tracking with LangFuse
  • Enforces quality - Code review, testing, validation gates
graph TB
    A[ClaudeSDKExecutor] --> B[Agent Management]
    A --> C[Session Management]
    A --> D[MCP Integration]
    A --> E[Cost Tracking]
    A --> F[Quality Gates]

    style A fill:#3f51b5,color:#fff

Basic Usage

Simple Task Execution

from pathlib import Path
from src.orchestrator.claude_sdk_executor import ClaudeSDKExecutor

# Initialize executor
executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="orchestrator",
    use_sdk_mcp=True  # Enable SDK MCP (recommended)
)

# Run task (run_task_async is the awaitable variant; run_task is the synchronous wrapper)
result = await executor.run_task_async("implement user authentication with JWT")

# Check results
print(f"Status: {result.status}")
print(f"Files modified: {result.files_modified}")
print(f"Summary: {result.summary}")

Session Continuation

# First task (creates session); project_root is a required keyword argument
executor1 = ClaudeSDKExecutor(project_root=Path.cwd(), agent_name="orchestrator")
result1 = await executor1.run_task_async("implement user registration")

# Reuse session (88-95% overhead reduction)
executor2 = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="implementer",
    session_id=result1.session_id  # Reuse session
)
result2 = await executor2.run_task_async("add email verification")

Cost Tracking

from src.orchestrator.cost_tracker import track_agent_call

# Track cost for task
async with track_agent_call(agent_name="implementer", task="Add auth"):
    result = await executor.run_task_async("implement authentication")

# Get cost summary
from src.orchestrator.cost_tracker import get_tracker
tracker = get_tracker()
summary = tracker.get_cost_summary()
print(f"Total cost: ${summary['total_cost_usd']:.4f}")

API Reference

ClaudeSDKExecutor

ClaudeSDKExecutor(*, project_root: Path, agent_root: Path | None = None, agent_name: str | None = None, allowed_tools: list[str] | None = None, model: str | None = None, session_id: str | None = None, permission_callback: CanUseTool | None = None, add_dirs: list[str] | None = None, hooks: dict[str, Any] | None = None, plugins: list[Any] | None = None, streaming: bool = True, agent_definitions: dict[str, Any] | None = None, use_sdk_mcp: bool = True, enable_security_hooks: bool = True, max_turns: int = 50, enable_auto_context_management: bool | None = None, memory_dir: Path | None = None, enable_verification: bool = True)

Thin wrapper around the Claude Agent SDK.

Initialize ClaudeSDKExecutor with hybrid agent loading support.

Agent Loading Priority:

  1. Programmatic definitions (via agent_definitions parameter)
  2. File-based agents (from .claude/agents/)
  3. Default Claude Code preset (fallback)
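
The priority order above can be sketched as a small resolver. This is an illustration only, not the executor's actual loading code: `resolve_agent` and `AgentSource` are hypothetical names, and the file layout `<agent_root>/.claude/agents/<name>.md` is an assumption inferred from the parameter descriptions.

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any


@dataclass(frozen=True)
class AgentSource:
    """Where an agent came from (hypothetical type, for illustration only)."""

    kind: str  # "programmatic", "file", or "preset"
    detail: str


def resolve_agent(
    agent_name: str | None,
    agent_root: Path,
    agent_definitions: dict[str, Any] | None = None,
) -> AgentSource:
    """Pick an agent source using the documented priority order."""
    definitions = agent_definitions or {}
    # 1. Programmatic definitions win when the name matches.
    if agent_name and agent_name in definitions:
        return AgentSource("programmatic", agent_name)
    # 2. File-based agents (assumed layout: .claude/agents/<name>.md).
    if agent_name:
        candidate = agent_root / ".claude" / "agents" / f"{agent_name}.md"
        if candidate.is_file():
            return AgentSource("file", str(candidate))
    # 3. Fall back to the default Claude Code preset.
    return AgentSource("preset", "claude_code")
```

With this shape, a name that appears in both places resolves to the programmatic definition, which matches the documented order.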


project_root: Root directory of the project (for Git operations and file operations)
agent_root: Optional directory to load agents from (defaults to project_root).
           Enables cross-project agent usage where agents are loaded from a
           central location (e.g., Nova AI repository) while executing tasks
           in a different project directory.
           Example: agent_root=/Users/jeff/nova_ai, project_root=/Users/jeff/my-app
agent_name: Name of agent to load (from .claude/agents/)
allowed_tools: List of allowed tool names (default: Read, Write, Edit, MultiEdit, Bash, Git)
model: Claude model ID (defaults to claude-sonnet-4-5-20250929)
session_id: Session ID for resuming existing sessions (optional)
permission_callback: Optional callback for custom tool permissions
add_dirs: List of directories to restrict file operations to for enhanced security (optional)
hooks: Optional hooks configuration dict (for backward compatibility)
plugins: List of plugin configurations for SDK extensibility (optional)
streaming: Enable streaming with partial messages for real-time output (default: True)
agent_definitions: Optional dict of programmatic agent definitions.
                  Allows runtime agent creation alongside file-based agents.
                  Example: {"custom-agent": AgentDefinition(...)}
use_sdk_mcp: Enable SDK MCP servers for 10-100x performance boost (default: True)
enable_security_hooks: Enable bash command security validation (default: True).
                      Blocks dangerous commands like rm -rf /, sudo, etc.
                      Value: $10,000/year in incident prevention.
max_turns: Maximum number of conversation turns before stopping (default: 50).
          Prevents infinite loops and runaway costs. Typical conversations use 5-20 turns,
          complex workflows use 20-40 turns. Default provides safety margin.
          Value: Prevents worst-case $100+ runaway sessions.
enable_auto_context_management: Enable automatic context usage warnings (default: True).
          When token usage reaches 80% of model capacity, logs a warning suggesting
          the user clear context to save costs. Configurable via ENABLE_AUTO_CONTEXT_MANAGEMENT
          environment variable. Expected savings: 20-30% per Anthropic's guidance.
memory_dir: DEPRECATED - now created internally as project_root/logs/{agent_name or "default"}
enable_verification: DEPRECATED - verification is always enabled
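
The precedence for enable_auto_context_management (explicit parameter first, then the ENABLE_AUTO_CONTEXT_MANAGEMENT environment variable, then a default of True) can be sketched as below. The accepted truthy strings follow the constructor source; `resolve_auto_context_flag` and its `env` argument are hypothetical, added so the logic can be exercised without touching the real environment.

```python
from __future__ import annotations

import os
from typing import Mapping


def resolve_auto_context_flag(
    param: bool | None,
    env: Mapping[str, str] | None = None,
) -> bool:
    """Explicit parameter wins; otherwise read the environment; default is True."""
    if param is not None:
        return param
    source = os.environ if env is None else env
    value = source.get("ENABLE_AUTO_CONTEXT_MANAGEMENT", "true").lower()
    return value in ("true", "1", "yes", "on")
```

Any value outside the truthy set (for example "off" or "0") disables the warnings, and passing False explicitly overrides the environment.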
Usage Examples

File-based only (standard usage)

executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="code-reviewer"  # Loads from .claude/agents/
)

Cross-project agent usage (Nova AI agents in different project)

executor = ClaudeSDKExecutor(
    project_root=Path("/Users/jeff/my-app"),    # Target project
    agent_root=Path("/Users/jeff/nova_ai"),     # Agent source
    agent_name="implementer"  # Loaded from nova_ai/.claude/agents/
)

Programmatic only

from claude_agent_sdk import AgentDefinition

executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_definitions={
        "custom-agent": AgentDefinition(
            name="custom-agent",
            system_prompt="You are a custom agent...",
            model="claude-haiku-4-5-20251001"
        )
    }
)

Hybrid (both)

executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="code-reviewer",  # File-based
    agent_definitions={
        "custom-agent": {...}  # Programmatic
    }
)

Context Management Best Practices

Efficient context management reduces token usage by 20-30%, significantly lowering costs and improving response times. Without proper context management, conversation histories accumulate irrelevant information, leading to bloated contexts and unnecessary token usage.

Key Strategies:

  1. Clear Context Between Unrelated Tasks
     Use /clear in interactive sessions when switching between unrelated tasks. This prevents accumulation of irrelevant conversation history.

Example:

   # In Claude Code interactive session:
   > Implement user authentication
   [Complete implementation...]

   > /clear  # Clear context before switching tasks

   > Optimize database queries
   [Fresh context, no auth-related history]

When to clear:

  • Switching from Feature A to unrelated Feature B
  • Starting a new day's work
  • After completing a major milestone
  • When context feels "heavy" (slow responses)

  2. Leverage CLAUDE.md for Project Context
     Document project structure, conventions, and patterns in .claude/CLAUDE.md instead of repeating the same context in every conversation. The SDK automatically includes CLAUDE.md with prompt caching (90% cost reduction).

What to put in CLAUDE.md:

  • Project architecture overview
  • Coding conventions and style guides
  • Common patterns and utilities
  • Dependency management rules
  • Testing strategies

What NOT to repeat in conversations:

  • "Use pytest for testing" (document in CLAUDE.md)
  • "Follow PEP 8" (document in CLAUDE.md)
  • "Our API is RESTful" (document in CLAUDE.md)

  3. Use Specific File References
     Reference files by path instead of pasting large code blocks inline. Claude Code will read files directly, keeping context lean.

Example:

   ❌ BAD:
   "Review this code:
   [paste 500 lines of code]"

   ✅ GOOD:
   "Review src/orchestrator/executor.py:500-600 for error handling"

Benefits:

  • Reduces conversation tokens by 80-90%
  • Claude reads current file state (no stale code)
  • Easier to reference specific line numbers

  4. Delegate to Subagents for Parallel Work
     Use subagents for parallel work to isolate context. Each subagent has an independent context window, preventing context pollution.

Example:

   # Review 3 modules independently
   review_results = await asyncio.gather(
       spawn_agent("code-reviewer", "Review auth module"),
       spawn_agent("code-reviewer", "Review API module"),
       spawn_agent("code-reviewer", "Review database module")
   )

When to use subagents:

  • Parallel code reviews (multiple files)
  • Independent feature implementations
  • Multi-module refactoring
  • Batch testing different components
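
The gather-based example above calls `spawn_agent`, which this reference does not define. A runnable sketch of the same pattern follows, with `spawn_agent` written as a hypothetical stand-in that returns a string; a real version would presumably create a separate executor per task so each review gets its own context window.

```python
import asyncio


async def spawn_agent(agent_name: str, task: str) -> str:
    """Hypothetical stand-in: a real version would run the task in its own executor."""
    await asyncio.sleep(0)  # Yield control, as a real network call would
    return f"[{agent_name}] {task}: done"


async def review_modules(modules: list[str]) -> list[str]:
    # gather() runs the coroutines concurrently and returns results in input order.
    return await asyncio.gather(
        *(spawn_agent("code-reviewer", f"Review {m} module") for m in modules)
    )


results = asyncio.run(review_modules(["auth", "API", "database"]))
```

Because `asyncio.gather` preserves input order, each result can be matched back to its module by position.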

  5. Session Continuity for Related Work
     Resume sessions for related multi-turn workflows using the session_id parameter. Don't create new sessions for follow-up questions on the same topic.

Example:

   # First interaction
   executor = ClaudeSDKExecutor(
       project_root=Path.cwd(),
       agent_name="architect"
   )
   result = await executor.run_task_async("Design authentication system")
   session_id = result.session_id  # Keep the ID to resume this conversation later

   # Later - resume with context (30 minutes later)
   executor = ClaudeSDKExecutor(
       project_root=Path.cwd(),
       agent_name="architect",
       session_id=session_id  # Continues previous conversation
   )
   result = await executor.run_task_async(
       "Add OAuth2 support to the authentication system"
   )

When to resume sessions:

  • Follow-up questions on same topic
  • Incremental feature development
  • Multi-step refactoring
  • Iterative design discussions

When to start fresh:

  • Different feature/topic
  • New day's work (unless continuing)
  • After major milestone completion

Cost Impact:

Without context management:

  • Average tokens per conversation: 50,000
  • Daily conversations: 20
  • Monthly tokens: 30M tokens
  • Monthly cost: ~$1,020 ($12,200/year per developer)

With context management best practices:

  • Average tokens per conversation: 35,000 (-30%)
  • Prompt caching on CLAUDE.md: -90% on repeated content
  • File references instead of pasting: -80% on code context
  • Monthly cost: ~$710 ($8,500/year per developer)

Total Savings: 30% (~$3,700/year per developer)
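
The figures above can be checked with a few lines of arithmetic. The sketch below uses only the numbers quoted in this section; the 30-day month is an assumption implied by the 30M-token total, and the per-million price is a blended rate derived from those figures, not an official price.

```python
TOKENS_PER_CONVERSATION = 50_000
CONVERSATIONS_PER_DAY = 20
DAYS_PER_MONTH = 30  # Assumption: the 30M figure implies a 30-day month

monthly_tokens = TOKENS_PER_CONVERSATION * CONVERSATIONS_PER_DAY * DAYS_PER_MONTH

BASELINE_MONTHLY_USD = 1_020
MANAGED_MONTHLY_USD = 710

# Blended price implied by the baseline figures (derived, not an official rate).
implied_usd_per_million = BASELINE_MONTHLY_USD / (monthly_tokens / 1_000_000)

monthly_savings = BASELINE_MONTHLY_USD - MANAGED_MONTHLY_USD
annual_savings = monthly_savings * 12
savings_pct = 100 * monthly_savings / BASELINE_MONTHLY_USD

print(f"{monthly_tokens:,} tokens/month")
print(f"${implied_usd_per_million:.2f} per million tokens (implied)")
print(f"${annual_savings:,}/year saved ({savings_pct:.1f}%)")
```

The annual difference works out to $3,720, which the summary line rounds to roughly $3,700, and the relative saving is just over 30%.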

Real-World Example - Before/After:

Before (Bloated Context):

    Conversation 1: "Implement auth" (10K tokens)
    Conversation 2: "Add OAuth" + history from Conv 1 (25K tokens)
    Conversation 3: "Fix bug" + history from Conv 1+2 (45K tokens)
    Total: 80K tokens

After (Managed Context):

    Conversation 1: "Implement auth" (10K tokens)
    /clear
    Conversation 2: "Add OAuth" (12K tokens, references Conv 1 via git log)
    /clear
    Conversation 3: "Fix bug in src/auth.py:234" (8K tokens, file reference)
    Total: 30K tokens (62% reduction)

Monitoring Context Size:

You can check current context size using

executor._client.get_session_info(session_id)

Watch for these warning signs of bloated context:

  • Responses taking >5 seconds to start
  • Unexpected errors about context length
  • Session costs >$0.50 per interaction
  • Claude referencing very old conversation turns
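
The automatic warning described for enable_auto_context_management (a log message once usage reaches 80% of model capacity) can be approximated with a small tracker. This is a sketch under stated assumptions: `ContextUsageMonitor` is a hypothetical class, the 200,000-token capacity is a placeholder not given in this reference, and summing input and output tokens is a guess at how the executor's cumulative counters are combined.

```python
from dataclasses import dataclass


@dataclass
class ContextUsageMonitor:
    """Hypothetical sketch of an 80%-of-capacity warning check."""

    capacity_tokens: int = 200_000  # Assumed context window; not stated in this reference
    warn_ratio: float = 0.80
    input_tokens: int = 0
    output_tokens: int = 0

    def record(self, input_tokens: int, output_tokens: int) -> bool:
        """Add usage from one turn; return True once the warning threshold is reached."""
        self.input_tokens += input_tokens
        self.output_tokens += output_tokens
        used = self.input_tokens + self.output_tokens
        return used >= self.capacity_tokens * self.warn_ratio
```

A caller would invoke `record` after each turn and log a suggestion to clear context when it returns True.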

For more details, see:

  • .claude/CLAUDE.md: Project-specific context management patterns
  • https://docs.anthropic.com/claude-code: Official best practices
  • SDK_COMPLIANCE_AUDIT_PLAN.md: Context optimization strategies

Source code in src/orchestrator/claude_sdk_executor.py
def __init__(
    self,
    *,
    project_root: Path,
    agent_root: Path | None = None,
    agent_name: str | None = None,
    allowed_tools: list[str] | None = None,
    model: str | None = None,
    session_id: str | None = None,
    permission_callback: CanUseTool | None = None,
    add_dirs: list[str] | None = None,
    hooks: dict[str, Any] | None = None,
    plugins: list[Any] | None = None,  # PHASE 4: Plugin support
    streaming: bool = True,  # PHASE 4: Streaming with partial messages (enabled by default for better UX)
    agent_definitions: dict[str, Any] | None = None,  # PHASE 4: Hybrid agent loading
    use_sdk_mcp: bool = True,  # PHASE 6: SDK MCP servers (10-100x faster!)
    enable_security_hooks: bool = True,  # TIER 2 Option A - Phase 2: Security hooks
    max_turns: int = 50,  # SECURITY: Prevent infinite loops and runaway costs
    enable_auto_context_management: bool
    | None = None,  # Auto-warn about context usage (default: True)
    # Deprecated parameters for backward compatibility
    memory_dir: Path | None = None,
    enable_verification: bool = True,
) -> None:
    """Initialize ClaudeSDKExecutor with hybrid agent loading support.

    Agent Loading Priority:
    1. Programmatic definitions (via agent_definitions parameter)
    2. File-based agents (from .claude/agents/)
    3. Default Claude Code preset (fallback)

    Args:
    ----
        project_root: Root directory of the project (for Git operations and file operations)
        agent_root: Optional directory to load agents from (defaults to project_root).
                   Enables cross-project agent usage where agents are loaded from a
                   central location (e.g., Nova AI repository) while executing tasks
                   in a different project directory.
                   Example: agent_root=/Users/jeff/nova_ai, project_root=/Users/jeff/my-app
        agent_name: Name of agent to load (from .claude/agents/)
        allowed_tools: List of allowed tool names (default: Read, Write, Edit, MultiEdit, Bash, Git)
        model: Claude model ID (defaults to claude-sonnet-4-5-20250929)
        session_id: Session ID for resuming existing sessions (optional)
        permission_callback: Optional callback for custom tool permissions
        add_dirs: List of directories to restrict file operations to for enhanced security (optional)
        hooks: Optional hooks configuration dict (for backward compatibility)
        plugins: List of plugin configurations for SDK extensibility (optional)
        streaming: Enable streaming with partial messages for real-time output (default: True)
        agent_definitions: Optional dict of programmatic agent definitions.
                          Allows runtime agent creation alongside file-based agents.
                          Example: {"custom-agent": AgentDefinition(...)}
        use_sdk_mcp: Enable SDK MCP servers for 10-100x performance boost (default: True)
        enable_security_hooks: Enable bash command security validation (default: True).
                              Blocks dangerous commands like rm -rf /, sudo, etc.
                              Value: $10,000/year in incident prevention.
        max_turns: Maximum number of conversation turns before stopping (default: 50).
                  Prevents infinite loops and runaway costs. Typical conversations use 5-20 turns,
                  complex workflows use 20-40 turns. Default provides safety margin.
                  Value: Prevents worst-case $100+ runaway sessions.
        enable_auto_context_management: Enable automatic context usage warnings (default: True).
                  When token usage reaches 80% of model capacity, logs a warning suggesting
                  the user clear context to save costs. Configurable via ENABLE_AUTO_CONTEXT_MANAGEMENT
                  environment variable. Expected savings: 20-30% per Anthropic's guidance.
        memory_dir: DEPRECATED - now created internally as project_root/logs/{agent_name or "default"}
        enable_verification: DEPRECATED - verification is always enabled

    Usage Examples:
        # File-based only (standard usage)
        executor = ClaudeSDKExecutor(
            project_root=Path.cwd(),
            agent_name="code-reviewer"  # Loads from .claude/agents/
        )

        # Cross-project agent usage (Nova AI agents in different project)
        executor = ClaudeSDKExecutor(
            project_root=Path("/Users/jeff/my-app"),    # Target project
            agent_root=Path("/Users/jeff/nova_ai"),      # Agent source
            agent_name="implementer"  # Loaded from nova_ai/.claude/agents/
        )

        # Programmatic only
        from claude_agent_sdk import AgentDefinition
        executor = ClaudeSDKExecutor(
            project_root=Path.cwd(),
            agent_definitions={
                "custom-agent": AgentDefinition(
                    name="custom-agent",
                    system_prompt="You are a custom agent...",
                    model="claude-haiku-4-5-20251001"
                )
            }
        )

        # Hybrid (both)
        executor = ClaudeSDKExecutor(
            project_root=Path.cwd(),
            agent_name="code-reviewer",  # File-based
            agent_definitions={
                "custom-agent": {...}  # Programmatic
            }
        )

    Context Management Best Practices
    ----------------------------------
    Efficient context management reduces token usage by 20-30%, significantly
    lowering costs and improving response times. Without proper context management,
    conversation histories accumulate irrelevant information, leading to bloated
    contexts and unnecessary token usage.

    **Key Strategies:**

    1. **Clear Context Between Unrelated Tasks**
       Use /clear in interactive sessions when switching between unrelated tasks.
       This prevents accumulation of irrelevant conversation history.

    Example:
    -------
           # In Claude Code interactive session:
           > Implement user authentication
           [Complete implementation...]

           > /clear  # Clear context before switching tasks

           > Optimize database queries
           [Fresh context, no auth-related history]

       **When to clear:**
       - Switching from Feature A to unrelated Feature B
       - Starting a new day's work
       - After completing a major milestone
       - When context feels "heavy" (slow responses)

    2. **Leverage CLAUDE.md for Project Context**
       Document project structure, conventions, and patterns in .claude/CLAUDE.md
       instead of repeating the same context in every conversation. The SDK
       automatically includes CLAUDE.md with prompt caching (90% cost reduction).

       **What to put in CLAUDE.md:**
       - Project architecture overview
       - Coding conventions and style guides
       - Common patterns and utilities
       - Dependency management rules
       - Testing strategies

       **What NOT to repeat in conversations:**
       - "Use pytest for testing" (document in CLAUDE.md)
       - "Follow PEP 8" (document in CLAUDE.md)
       - "Our API is RESTful" (document in CLAUDE.md)

    3. **Use Specific File References**
       Reference files by path instead of pasting large code blocks inline.
       Claude Code will read files directly, keeping context lean.

    Example:
    -------
           ❌ BAD:
           "Review this code:
           [paste 500 lines of code]"

           ✅ GOOD:
           "Review src/orchestrator/executor.py:500-600 for error handling"

       **Benefits:**
       - Reduces conversation tokens by 80-90%
       - Claude reads current file state (no stale code)
       - Easier to reference specific line numbers

    4. **Delegate to Subagents for Parallel Work**
       Use subagents for parallel work to isolate context. Each subagent has
       an independent context window, preventing context pollution.

    Example:
    -------
           # Review 3 modules independently
           review_results = await asyncio.gather(
               spawn_agent("code-reviewer", "Review auth module"),
               spawn_agent("code-reviewer", "Review API module"),
               spawn_agent("code-reviewer", "Review database module")
           )

       **When to use subagents:**
       - Parallel code reviews (multiple files)
       - Independent feature implementations
       - Multi-module refactoring
       - Batch testing different components

    5. **Session Continuity for Related Work**
       Resume sessions for related multi-turn workflows using the session_id
       parameter. Don't create new sessions for follow-up questions on the
       same topic.

    Example:
    -------
           # First interaction
           executor = ClaudeSDKExecutor(
               project_root=Path.cwd(),
               agent_name="architect"
           )
           result = await executor.run_task_async("Design authentication system")
           session_id = result.session_id  # Keep the ID to resume this conversation later

           # Later - resume with context (30 minutes later)
           executor = ClaudeSDKExecutor(
               project_root=Path.cwd(),
               agent_name="architect",
               session_id=session_id  # Continues previous conversation
           )
           result = await executor.run_task_async(
               "Add OAuth2 support to the authentication system"
           )

       **When to resume sessions:**
       - Follow-up questions on same topic
       - Incremental feature development
       - Multi-step refactoring
       - Iterative design discussions

       **When to start fresh:**
       - Different feature/topic
       - New day's work (unless continuing)
       - After major milestone completion

    **Cost Impact:**

    Without context management:
    - Average tokens per conversation: 50,000
    - Daily conversations: 20
    - Monthly tokens: 30M tokens
    - Monthly cost: ~$1,020 ($12,200/year per developer)

    With context management best practices:
    - Average tokens per conversation: 35,000 (-30%)
    - Prompt caching on CLAUDE.md: -90% on repeated content
    - File references instead of pasting: -80% on code context
    - Monthly cost: ~$710 ($8,500/year per developer)

    **Total Savings: 30% (~$3,700/year per developer)**

    **Real-World Example - Before/After:**

    Before (Bloated Context):
        Conversation 1: "Implement auth" (10K tokens)
        Conversation 2: "Add OAuth" + history from Conv 1 (25K tokens)
        Conversation 3: "Fix bug" + history from Conv 1+2 (45K tokens)
        Total: 80K tokens

    After (Managed Context):
        Conversation 1: "Implement auth" (10K tokens)
        /clear
        Conversation 2: "Add OAuth" (12K tokens, references Conv 1 via git log)
        /clear
        Conversation 3: "Fix bug in src/auth.py:234" (8K tokens, file reference)
        Total: 30K tokens (62% reduction)

    **Monitoring Context Size:**

    You can check current context size using:
        executor._client.get_session_info(session_id)

    Watch for these warning signs of bloated context:
    - Responses taking >5 seconds to start
    - Unexpected errors about context length
    - Session costs >$0.50 per interaction
    - Claude referencing very old conversation turns

    For more details, see:
    - .claude/CLAUDE.md: Project-specific context management patterns
    - https://docs.anthropic.com/claude-code: Official best practices
    - SDK_COMPLIANCE_AUDIT_PLAN.md: Context optimization strategies

    """
    self._project_root = project_root
    self._agent_root = (
        agent_root or project_root
    )  # Default to project_root for backward compatibility
    self._agent_name = agent_name
    self._model_override = model  # Store explicit override
    # Phase 3: Model Tier Optimization - Select appropriate model based on agent type
    # If no override provided, automatically select based on agent classification
    self._model = self._select_model_for_agent(agent_name, model)
    self._session_id = session_id
    self._permission_callback = permission_callback
    self._add_dirs = add_dirs or []
    self._hooks = hooks or {}
    self._plugins = plugins or []  # PHASE 4: Plugin support
    self._streaming = streaming  # PHASE 4: Streaming support
    self._agent_definitions = agent_definitions or {}  # PHASE 4: Hybrid agent loading
    self._use_sdk_mcp = use_sdk_mcp  # PHASE 6: SDK MCP servers
    self._enable_security_hooks = enable_security_hooks  # TIER 2 Option A - Phase 2
    self._max_turns = max_turns  # SECURITY: Prevent infinite loops and runaway costs

    # Auto-context management: Default to True, but allow override via env var or parameter
    if enable_auto_context_management is None:
        # Check environment variable first, default to True if not set
        env_value = os.getenv("ENABLE_AUTO_CONTEXT_MANAGEMENT", "true").lower()
        self._enable_auto_context_management = env_value in ("true", "1", "yes", "on")
    else:
        self._enable_auto_context_management = enable_auto_context_management

    # Track cumulative token usage for context management warnings
    self._cumulative_input_tokens = 0
    self._cumulative_output_tokens = 0

    # Create memory_dir internally (or use provided for backward compatibility)
    if memory_dir is not None:
        self._memory_dir = memory_dir
    else:
        agent_dir = agent_name or "default"
        self._memory_dir = project_root / "logs" / agent_dir
    self._ensure_memory_dir()

    # Initialize cost tracker
    self._cost_tracker = get_tracker(project_root=project_root)

    # Determine allowed tools
    self._allowed_tools = allowed_tools or [
        "Read",
        "Write",
        "Edit",
        "MultiEdit",
        "Bash",
        "Git",
    ]

    # Check SDK availability
    self._available = SDK_AVAILABLE

    # Verification is always enabled
    self._enable_verification = True
    self._verification_pipeline = VerificationPipeline(project_root)

    # Initialize MCP health monitor
    self._health_monitor = MCPHealthMonitor(
        project_root=project_root,
        config=HealthCheckConfig(
            check_on_startup=True,
            enable_background_checks=False,
        ),
    )

    # Initialize Think Tool (TIER 1-A: Built-in structured reasoning)
    # Proven to improve task quality by 13% (Anthropic SWE-bench research)
    if THINK_TOOL_AVAILABLE:
        self._think_tool = ThinkTool(project_root=project_root, enable_logging=True)
    else:
        self._think_tool = None

    # Initialize Security Validator (TIER 2 Option A - Phase 2: Security Hooks)
    # Prevents dangerous bash commands (rm -rf /, sudo, etc.)
    # Value: $10,000/year in incident prevention
    if self._enable_security_hooks:
        self._bash_security_validator = BashSecurityValidator()
        logger.info(
            "🔒 Security hooks enabled (bash command validation)",
        )
    else:
        self._bash_security_validator = None

    # Initialize GitContextManager for git repository detection
    # Used before PR creation and for git context tracking
    self._git_context_manager = get_git_context_manager()

    # Initialize ParallelAgentExecutor for parallel task execution
    # Useful for parallel code reviews, testing, or independent feature implementations
    self._parallel_executor = ParallelAgentExecutor(max_concurrent=5)

    # Initialize Circuit Breaker for API retry logic
    # Prevents cascading failures and runaway costs
    self._circuit_breaker = CircuitBreaker(
        failure_threshold=5,  # Open after 5 consecutive failures
        cooldown_seconds=60,  # 60s cooldown before retry
    )
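
The constructor configures its circuit breaker with failure_threshold=5 and cooldown_seconds=60. The class itself is not shown in this reference, so the following is only a minimal sketch of how a breaker with those two parameters commonly behaves: `SimpleCircuitBreaker`, its method names, and the injectable `clock` are all hypothetical.

```python
from __future__ import annotations

import time
from typing import Callable


class SimpleCircuitBreaker:
    """Minimal illustrative breaker; the real CircuitBreaker may differ."""

    def __init__(
        self,
        failure_threshold: int = 5,
        cooldown_seconds: float = 60,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._failure_threshold = failure_threshold
        self._cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._consecutive_failures = 0
        self._opened_at: float | None = None

    def allow_request(self) -> bool:
        """Closed: allow. Open: allow again only after the cooldown has elapsed."""
        if self._opened_at is None:
            return True
        return self._clock() - self._opened_at >= self._cooldown_seconds

    def record_success(self) -> None:
        # Any success resets the failure streak and closes the breaker.
        self._consecutive_failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        # Open the breaker once the streak reaches the threshold.
        self._consecutive_failures += 1
        if self._consecutive_failures >= self._failure_threshold:
            self._opened_at = self._clock()
```

A caller would check `allow_request()` before each API call and report the outcome with `record_success()` or `record_failure()`.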

run_task

run_task(task_description: str, *, context: str | None = None) -> ExecutorResult
Source code in src/orchestrator/claude_sdk_executor.py
def run_task(
    self,
    task_description: str,
    *,
    context: str | None = None,
) -> ExecutorResult:
    async def _runner() -> ExecutorResult:
        return await self.run_task_async(task_description, context=context)

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop and loop.is_running():
        return asyncio.run_coroutine_threadsafe(_runner(), loop).result()
    return asyncio.run(_runner())
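
As the source shows, run_task is a synchronous wrapper around run_task_async. One caveat: `asyncio.run_coroutine_threadsafe` is meant to be called from a thread other than the one running the loop, so blocking on `.result()` from inside a running loop would stall that loop. In async code it is therefore safer to await run_task_async directly. The sketch below shows both call styles using `DemoExecutor`, a hypothetical stand-in that returns a string instead of an ExecutorResult.

```python
import asyncio


class DemoExecutor:
    """Hypothetical stand-in for ClaudeSDKExecutor, used only to show the call pattern."""

    async def run_task_async(self, task_description: str) -> str:
        await asyncio.sleep(0)  # Placeholder for the real SDK round trip
        return f"done: {task_description}"

    def run_task(self, task_description: str) -> str:
        # Synchronous entry point: only safe when no event loop runs in this thread.
        return asyncio.run(self.run_task_async(task_description))


executor = DemoExecutor()

# From synchronous code, call the blocking wrapper.
sync_result = executor.run_task("add email verification")


# From async code, await the coroutine instead of calling the wrapper.
async def main() -> str:
    return await executor.run_task_async("add email verification")


async_result = asyncio.run(main())
```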

run_task_async async

run_task_async(task_description: str, *, context: str | None = None) -> ExecutorResult
Source code in src/orchestrator/claude_sdk_executor.py
async def run_task_async(
    self,
    task_description: str,
    *,
    context: str | None = None,
) -> ExecutorResult:
    if not self.is_available():
        detail = "Claude Agent SDK not installed. Install `claude-agent-sdk` and run `claude /login`."
        return ExecutorResult(transcript=detail, summary=detail)

    if not SDK_AVAILABLE:
        detail = "Claude Agent SDK components unavailable at runtime."
        return ExecutorResult(transcript=detail, summary=detail)

    # Build options kwargs
    options_kwargs: dict[str, Any] = {
        "allowed_tools": self._allowed_tools,
        "cwd": str(self._project_root),
        "model": self._model,
        # Enable automatic .claude/CLAUDE.md loading by SDK
        "setting_sources": ["project"],
        # Explicitly set cli_path to prevent SDK from spawning new Claude Code process
        # (important when running from within Claude Code)
        "cli_path": None,
        # Enable streaming for real-time token output (better UX)
        "include_partial_messages": self._streaming,
        # SECURITY: Prevent infinite loops and runaway costs (default: 50 turns)
        "max_turns": self._max_turns,
    }

    # Add agent loading if specified (Phase 6 fix - no "-autonomous" suffix)
    # CRITICAL: Use preset+append pattern to preserve tool instructions (Phase 6)
    # PHASE 1 TIER 2: Add prompt caching for 90% cost reduction on cached content
    if self._agent_name:
        try:
            # Load agent instructions from file
            agent_instructions = self._load_agent_instructions(self._agent_name)

            if agent_instructions:
                # Build cached system prompt blocks
                # Phase 1 Tier 2: Add cache_control markers for 90% cost reduction
                # Note: project_context (CLAUDE.md) is loaded automatically via setting_sources
                cached_blocks = self._build_system_prompt_with_caching(
                    agent_instructions=agent_instructions,
                )

                # Phase 6: Use preset+append pattern to preserve tool instructions
                # Phase 1 Tier 2: Use cached blocks for append content
                # This prevents loss of tool context from "claude_code" preset
                # and adds prompt caching for cost optimization
                options_kwargs["system_prompt"] = {
                    "type": "preset",
                    "preset": "claude_code",  # Preserves tool instructions
                    "append": cached_blocks,  # Adds agent-specific context with caching
                }
            else:
                # Fallback: Use agent_name parameter if SDK supports it
                # This lets the SDK handle agent loading internally
                options_kwargs["agent_name"] = self._agent_name
        except (TypeError, AttributeError):
            # SDK doesn't support system_prompt preset+append pattern
            # Try agent_name as fallback
            try:
                options_kwargs["agent_name"] = self._agent_name
            except TypeError:
                # SDK doesn't support agent_name either - skip agent loading
                pass

    # Add permission callback if provided
    if self._permission_callback:
        options_kwargs["can_use_tool"] = self._permission_callback

    # Add session resume support (Agent 6)
    if self._session_id:
        try:
            options_kwargs["resume"] = self._session_id
        except TypeError:
            # Fallback for older SDK versions that use session_id instead
            try:
                options_kwargs["session_id"] = self._session_id
            except TypeError:
                # SDK doesn't support session resumption
                pass

    # Add directory restrictions for enhanced security (Agent 6)
    if self._add_dirs:
        try:
            options_kwargs["add_dirs"] = self._add_dirs
        except TypeError:
            # SDK doesn't support add_dirs parameter
            pass

    # Add plugin support (Phase 4 - Agent B5)
    if self._plugins:
        try:
            options_kwargs["plugins"] = self._plugins
        except TypeError:
            # Plugin parameter not supported in this SDK version
            pass

    # Add agent_definitions support (Phase 4 - Agent B9)
    if self._agent_definitions:
        try:
            options_kwargs["agents"] = self._agent_definitions
        except TypeError:
            # agents parameter not supported in this SDK version
            pass

    # Add hooks support (Phase 4 - Agents B6-B8)
    try:
        hooks_dict = self._build_hooks_dict()
        if hooks_dict:
            options_kwargs["hooks"] = hooks_dict
    except (AttributeError, TypeError):
        # Hooks not supported in this SDK version or _build_hooks_dict not available
        pass

    # PHASE 6: Add SDK MCP servers for 10-100x performance boost
    # KB server: 150ms → 2.15ms (69.6x faster!)
    # GitHub server: ~150ms → ~15ms (10x faster!)
    if self._use_sdk_mcp and SDK_MCP_AVAILABLE:
        mcp_servers_dict = {}

        # Add KB server if available
        if kb_sdk_server:
            mcp_servers_dict["kb"] = kb_sdk_server

        # Add GitHub server if available
        if github_sdk_server:
            mcp_servers_dict["github"] = github_sdk_server

        # Configure SDK with in-process MCP servers
        if mcp_servers_dict:
            options_kwargs["mcp_servers"] = mcp_servers_dict
            if self._model:  # Use _model attribute for verbose logging check
                print(
                    f"🚀 Using SDK MCP servers (in-process): {list(mcp_servers_dict.keys())}",
                )
                print(
                    "   ⚡ Performance: KB queries 69.6x faster, GitHub ops 10x faster",
                )

    options = ClaudeAgentOptions(**options_kwargs)

    prompt = self._build_prompt(task_description, context)
    transcript = ""
    summary = ""
    modified_files: list[Path] = []

    # Cost tracking variables
    total_cost = 0.0
    duration_ms = 0
    input_tokens = 0
    output_tokens = 0
    cache_read = 0
    cache_write = 0

    try:
        async with ClaudeSDKClient(options=options) as client:
            # Execute API call with retry logic
            transcript, summary, modified_files, metrics_dict = await self._execute_with_retry(
                client, prompt
            )

            # Extract metrics from result
            total_cost = metrics_dict["total_cost"]
            duration_ms = metrics_dict["duration_ms"]
            input_tokens = metrics_dict["input_tokens"]
            output_tokens = metrics_dict["output_tokens"]
            cache_read = metrics_dict["cache_read"]
            cache_write = metrics_dict["cache_write"]

            print("\n📊 Execution Metrics:")
            print(f"   💰 Cost: ${total_cost:.4f}")
            print(f"   ⏱️  Duration: {duration_ms}ms")
            print(f"   📝 Tokens: {input_tokens} in / {output_tokens} out")
            print(f"   💾 Cache: {cache_read} read / {cache_write} write")

            # Check context usage and warn if approaching capacity
            self._check_context_usage(input_tokens, output_tokens)

            # Track costs with cost tracker
            usage_dict = {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "cache_creation_input_tokens": cache_write,
                "cache_read_input_tokens": cache_read,
            }
            cost_metrics = self._cost_tracker.calculate_cost(
                usage_dict,
                self._model,
            )
            cost_metrics.duration_ms = duration_ms

            # Track with agent and task context
            task_name = (
                task_description[:50] if len(task_description) > 50 else task_description
            )
            self._cost_tracker.track_call(
                metrics=cost_metrics,
                agent_name=self._agent_name or "default",
                task_name=task_name,
            )

    except CircuitBreakerOpenError as e:
        # Circuit breaker is open - fail fast
        error_msg = (
            f"❌ Circuit Breaker Open: {e}\n"
            "Too many consecutive failures. Please wait for cooldown period."
        )
        print(error_msg)
        return ExecutorResult(
            transcript=error_msg,
            summary=f"Circuit Breaker: {e!s}",
            session_id=self._session_id,
        )

    except RateLimitError as e:
        # Rate limit exceeded after all retries
        error_msg = (
            f"❌ Rate Limit Exceeded: {e}\n"
            "Anthropic API rate limit hit. Please wait before retrying."
        )
        print(error_msg)
        return ExecutorResult(
            transcript=error_msg,
            summary=f"Rate Limit: {e!s}",
            session_id=self._session_id,
        )

    except TransientError as e:
        # Transient error persisted after all retries
        error_msg = (
            f"❌ Transient Error: {e}\nNetwork/timeout error persisted after 5 retry attempts."
        )
        print(error_msg)
        return ExecutorResult(
            transcript=error_msg,
            summary=f"Transient Error: {e!s}",
            session_id=self._session_id,
        )

    except FileNotFoundError as e:
        # Claude CLI not found
        error_msg = f"❌ Claude CLI not found: {e}\nInstall with: pip install claude-agent-sdk"
        print(error_msg)
        return ExecutorResult(
            transcript=error_msg,
            summary=f"CLI Error: {e!s}",
            session_id=self._session_id,
        )

    except PermissionError as e:
        # Permission issues with CLI or files
        error_msg = f"❌ Permission error: {e}\nCheck file permissions and CLI installation"
        print(error_msg)
        return ExecutorResult(
            transcript=error_msg,
            summary=f"Permission Error: {e!s}",
            session_id=self._session_id,
        )

    except json.JSONDecodeError as e:
        # Invalid JSON response from CLI
        error_msg = f"❌ Invalid CLI response: {e}\nCLI may need updating or reinstalling"
        print(error_msg)
        return ExecutorResult(
            transcript=error_msg,
            summary=f"JSON Error: {e!s}",
            session_id=self._session_id,
        )

    except Exception as e:
        # Catch-all for other SDK errors
        error_msg = f"❌ SDK Error: {type(e).__name__}: {e}"
        print(error_msg)
        return ExecutorResult(
            transcript=error_msg,
            summary=f"SDK Error: {e!s}",
            session_id=self._session_id,
        )

    # transcript and summary are already set by _execute_with_retry
    verification: VerificationResult | None = None
    if self._enable_verification and self._verification_pipeline is not None:
        try:
            verification = await self._verification_pipeline.verify(
                task_description=task_description,
                transcript=transcript,
                modified_files=modified_files or None,
            )
            transcript = "\n\n".join(
                filter(None, [transcript, verification.summary]),
            )
            if not verification.passed_all:
                summary = " | ".join(
                    filter(
                        None,
                        [
                            summary,
                            "VERIFICATION FAILED",
                            "; ".join(verification.blocking_issues) or None,
                        ],
                    ),
                )
        except Exception as exc:  # pragma: no cover - defensive
            summary = " | ".join(
                filter(None, [summary, f"Verification error: {exc}"]),
            )

    result_files = modified_files or None
    return ExecutorResult(
        transcript=transcript,
        summary=summary,
        verification=verification,
        modified_files=result_files,
        session_id=self._session_id,
        total_cost_usd=total_cost,
        duration_ms=duration_ms,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        cache_read_tokens=cache_read,
        cache_write_tokens=cache_write,
    )

CircuitBreaker

CircuitBreaker(failure_threshold: int = 5, cooldown_seconds: int = 60)

Simple circuit breaker to prevent cascading failures.

Tracks consecutive failures and opens the circuit once the threshold is reached. After the cooldown period, it enters a half-open state to test recovery.

Initialize circuit breaker.

Parameters:

  • failure_threshold (int): Number of consecutive failures before opening (default: 5)
  • cooldown_seconds (int): Seconds to wait before attempting recovery (default: 60)
Source code in src/orchestrator/claude_sdk_executor.py
def __init__(
    self,
    failure_threshold: int = 5,
    cooldown_seconds: int = 60,
) -> None:
    """Initialize circuit breaker.

    Args:
    ----
        failure_threshold: Number of consecutive failures before opening (default: 5)
        cooldown_seconds: Seconds to wait before attempting recovery (default: 60)

    """
    self.failure_threshold = failure_threshold
    self.cooldown_seconds = cooldown_seconds
    self.consecutive_failures = 0
    self.last_failure_time: float | None = None
    self.state = "closed"  # closed, open, half-open

record_success

record_success() -> None

Record successful API call - reset failure counter.

Source code in src/orchestrator/claude_sdk_executor.py
def record_success(self) -> None:
    """Record successful API call - reset failure counter."""
    self.consecutive_failures = 0
    self.state = "closed"

record_failure

record_failure() -> None

Record failed API call - increment counter and potentially open circuit.

Source code in src/orchestrator/claude_sdk_executor.py
def record_failure(self) -> None:
    """Record failed API call - increment counter and potentially open circuit."""
    import time

    self.consecutive_failures += 1
    self.last_failure_time = time.time()

    if self.consecutive_failures >= self.failure_threshold:
        self.state = "open"
        logger.warning(
            f"🔴 Circuit breaker OPEN after {self.consecutive_failures} consecutive failures. "
            f"Cooldown: {self.cooldown_seconds}s"
        )

check_state

check_state() -> None

Check circuit state and potentially transition to half-open.

Raises
CircuitBreakerOpenError: If circuit is open and cooldown not elapsed
Source code in src/orchestrator/claude_sdk_executor.py
def check_state(self) -> None:
    """Check circuit state and potentially transition to half-open.

    Raises
    ------
        CircuitBreakerOpenError: If circuit is open and cooldown not elapsed

    """
    import time

    if self.state == "closed":
        return

    if self.state == "open" and self.last_failure_time:
        elapsed = time.time() - self.last_failure_time
        if elapsed >= self.cooldown_seconds:
            # Transition to half-open state (allow one test request)
            self.state = "half-open"
            logger.info(f"🟡 Circuit breaker HALF-OPEN - testing recovery after {elapsed:.1f}s")
        else:
            # Still in cooldown period
            remaining = self.cooldown_seconds - elapsed
            raise CircuitBreakerOpenError(
                f"Circuit breaker is OPEN. Retry in {remaining:.1f}s. "
                f"({self.consecutive_failures} consecutive failures)"
            )

Usage Example

from src.orchestrator.claude_sdk_executor import CircuitBreaker

# Initialize circuit breaker
breaker = CircuitBreaker(
    failure_threshold=5,      # Open after 5 consecutive failures
    cooldown_seconds=60       # Wait 60s before allowing a test request
)

# Use in retry logic
async def call_with_circuit_breaker():
    breaker.check_state()  # Raises CircuitBreakerOpenError while the circuit is open
    try:
        result = await executor.run_task("task")
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        raise
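
The closed, open, and half-open transitions can be traced without waiting for a real cooldown. The sketch below re-implements the documented logic in a hypothetical `DemoBreaker` with an injectable clock (the `clock` parameter and `DemoBreakerOpenError` are assumptions for illustration, not part of the documented class). It also shows one consequence of the source above: the failure counter is not reset on entering half-open, so a single failed test request reopens the circuit.

```python
from collections.abc import Callable


class DemoBreakerOpenError(Exception):
    """Hypothetical stand-in for CircuitBreakerOpenError."""


class DemoBreaker:
    """Mirrors the documented transitions, with a clock that tests can control."""

    def __init__(self, failure_threshold: int, cooldown_seconds: float,
                 clock: Callable[[], float]) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.consecutive_failures = 0
        self.last_failure_time: float | None = None
        self.state = "closed"

    def record_success(self) -> None:
        self.consecutive_failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self.consecutive_failures += 1
        self.last_failure_time = self.clock()
        if self.consecutive_failures >= self.failure_threshold:
            self.state = "open"

    def check_state(self) -> None:
        # "is not None" (rather than truthiness) so a fake clock at 0.0 works.
        if self.state == "open" and self.last_failure_time is not None:
            elapsed = self.clock() - self.last_failure_time
            if elapsed < self.cooldown_seconds:
                raise DemoBreakerOpenError(f"retry in {self.cooldown_seconds - elapsed:.1f}s")
            self.state = "half-open"


now = [0.0]  # mutable fake time, in seconds
breaker = DemoBreaker(failure_threshold=2, cooldown_seconds=60, clock=lambda: now[0])
states = []

breaker.record_failure()
states.append(breaker.state)  # one failure: still closed
breaker.record_failure()
states.append(breaker.state)  # threshold reached: open

try:
    breaker.check_state()
    blocked = False
except DemoBreakerOpenError:
    blocked = True  # still inside the cooldown window

now[0] = 61.0
breaker.check_state()
states.append(breaker.state)  # cooldown elapsed: half-open
breaker.record_failure()
states.append(breaker.state)  # test request failed: open again
now[0] = 122.0
breaker.check_state()
breaker.record_success()
states.append(breaker.state)  # test request succeeded: closed
print(states, blocked)
```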

Advanced Usage

Parallel Execution

Run independent tasks in parallel:

from pathlib import Path

from src.orchestrator.parallel_executor import ParallelAgentExecutor, ParallelTask

executor = ParallelAgentExecutor(
    project_root=Path.cwd(),
    max_parallel=3  # Run up to 3 tasks concurrently
)

# Define tasks
tasks = [
    ParallelTask(
        agent_name="implementer",
        task="implement user service",
        task_id="task_1"
    ),
    ParallelTask(
        agent_name="implementer",
        task="implement payment service",
        task_id="task_2"
    ),
    ParallelTask(
        agent_name="implementer",
        task="implement notification service",
        task_id="task_3"
    )
]

# Execute in parallel
results = await executor.run_parallel(tasks)

for result in results:
    print(f"Task {result.task_id}: {result.status}")
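
How `ParallelAgentExecutor` enforces `max_parallel` is not shown on this page. A common way to cap concurrency in asyncio code is a semaphore, and the sketch below illustrates that general technique only; `run_bounded` and `fake_agent_task` are hypothetical names, and the sleep stands in for real agent work.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def run_bounded(
    jobs: list[Callable[[], Awaitable[str]]],
    max_parallel: int,
) -> list[str]:
    """Run all jobs concurrently, with at most max_parallel in flight."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(job: Callable[[], Awaitable[str]]) -> str:
        async with semaphore:
            return await job()

    # gather returns results in the same order as the input jobs.
    return await asyncio.gather(*(guarded(job) for job in jobs))


stats = {"active": 0, "peak": 0}


async def fake_agent_task(name: str) -> str:
    """Hypothetical stand-in for one agent run; records peak concurrency."""
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)
    stats["active"] -= 1
    return f"{name}: done"


names = ["task_1", "task_2", "task_3", "task_4", "task_5"]
jobs = [lambda n=n: fake_agent_task(n) for n in names]
results = asyncio.run(run_bounded(jobs, max_parallel=3))
print(results, "peak concurrency:", stats["peak"])
```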

Custom MCP Configuration

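import os
from pathlib import Path

from src.orchestrator.claude_sdk_executor import ClaudeSDKExecutor

executor = ClaudeSDKExecutor(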
    project_root=Path.cwd(),
    agent_name="orchestrator",
    use_sdk_mcp=True,
    mcp_config={
        "kb": {
            "kb_dir": "custom_kb_path",
            "max_results": 20
        },
        "github": {
            "token": os.getenv("GITHUB_TOKEN")
        }
    }
)

Verification Pipeline

Run quality gates after execution:

from pathlib import Path

from src.orchestrator.verification import VerificationPipeline

# Execute task
result = await executor.run_task("implement feature")

# Run verification
pipeline = VerificationPipeline(project_root=Path.cwd())
verification = await pipeline.run_all()

if verification.passed:
    print("✅ All quality gates passed")
    print(f"Tests: {verification.test_results}")
    print(f"Coverage: {verification.coverage}%")
else:
    print("❌ Quality gates failed")
    print(f"Failures: {verification.failures}")
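
In the executor source listed earlier on this page, a failed verification is folded into the result summary by joining the non-empty parts with " | ". The sketch below isolates that composition so the resulting strings are easy to recognize in logs; `merge_summary` is a hypothetical helper name, not part of the documented API.

```python
def merge_summary(summary: str, passed_all: bool, blocking_issues: list[str]) -> str:
    """Append a failure marker and blocking issues, skipping empty parts."""
    if passed_all:
        return summary
    parts = [summary, "VERIFICATION FAILED", "; ".join(blocking_issues) or None]
    # filter(None, ...) drops empty strings and None before joining.
    return " | ".join(filter(None, parts))


with_issues = merge_summary("Added login endpoint", False, ["tests failed", "lint errors"])
no_details = merge_summary("", False, [])
unchanged = merge_summary("Added login endpoint", True, [])

print(with_issues)  # Added login endpoint | VERIFICATION FAILED | tests failed; lint errors
print(no_details)   # VERIFICATION FAILED
print(unchanged)    # Added login endpoint
```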

Configuration Options

Executor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| project_root | Path | Path.cwd() | Project root directory |
| agent_name | str | "orchestrator" | Agent to use |
| use_sdk_mcp | bool | True | Enable SDK MCP servers |
| session_id | str \| None | None | Reuse existing session |
| mcp_config | dict \| None | None | Custom MCP configuration |
| timeout | int | 300 | Task timeout in seconds |

Agent Names

Available agents:

  • orchestrator - Multi-agent coordination
  • architect - Architecture decisions
  • implementer - Feature implementation
  • code-reviewer - Security and correctness review
  • tester - Test execution and validation
  • debugger - Error analysis and debugging

Performance Metrics

Session Continuation Impact

| Transition | Without Session | With Session | Improvement |
|---|---|---|---|
| orchestrator → implementer | 680ms | 45ms | 93% |
| implementer → code-reviewer | 280ms | 15ms | 95% |
| code-reviewer → tester | 280ms | 12ms | 96% |

SDK MCP Impact

| Operation | stdio MCP | SDK MCP | Speedup |
|---|---|---|---|
| KB search | 850ms | 8ms | 106x |
| GitHub API | 450ms | 45ms | 10x |
| Memory store | 120ms | 2ms | 60x |
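
The two derived columns above use different formulas: "Improvement" is the percentage of latency removed, while "Speedup" is the ratio of the old latency to the new one. A short check of the arithmetic, using only the figures from the tables (the function names are illustrative):

```python
def percent_reduction(before_ms: float, after_ms: float) -> float:
    """Share of the original latency that was removed, in percent."""
    return (before_ms - after_ms) / before_ms * 100


def speedup(before_ms: float, after_ms: float) -> float:
    """How many times faster the new path is."""
    return before_ms / after_ms


session_rows = [(680, 45), (280, 15), (280, 12)]
mcp_rows = [(850, 8), (450, 45), (120, 2)]

improvements = [round(percent_reduction(b, a)) for b, a in session_rows]
speedups = [round(speedup(b, a)) for b, a in mcp_rows]

print(improvements)  # [93, 95, 96]
print(speedups)      # [106, 10, 60]
```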

Error Handling

Retry Logic

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def run_with_retry():
    return await executor.run_task("task")
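
For readers who prefer not to depend on tenacity, the same idea (a bounded number of attempts with exponentially growing waits) can be sketched with the standard library alone. This is an illustration of the general technique, not the executor's internal retry logic; `retry_async` and `flaky` are hypothetical names, and the delays are shortened so the example runs quickly.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def retry_async(
    func: Callable[[], Awaitable[str]],
    attempts: int = 3,
    base_delay: float = 0.01,
    max_delay: float = 0.1,
) -> str:
    """Call func up to `attempts` times, doubling the wait after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return await func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            await asyncio.sleep(delay)
    raise RuntimeError("attempts must be at least 1")


calls = []


async def flaky() -> str:
    """Hypothetical task that fails twice, then succeeds."""
    calls.append(len(calls) + 1)
    if len(calls) < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


outcome = asyncio.run(retry_async(flaky, attempts=3))
print(outcome, "after", len(calls), "calls")
```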

Circuit Breaker

from src.orchestrator.claude_sdk_executor import CircuitBreakerOpenError

try:
    result = await executor.run_task("task")
except CircuitBreakerOpenError:
    print("⚠️ Circuit breaker open - too many failures")
    # Wait for cooldown or escalate

Rate Limiting

import asyncio

from src.orchestrator.claude_sdk_executor import RateLimitError

try:
    result = await executor.run_task("task")
except RateLimitError:
    print("⚠️ Rate limit exceeded - backing off")
    await asyncio.sleep(60)
    # Retry after backoff

Best Practices

1. Always Use SDK MCP

Do: ClaudeSDKExecutor(use_sdk_mcp=True)
Don't: ClaudeSDKExecutor(use_sdk_mcp=False)

2. Reuse Sessions

Do: ClaudeSDKExecutor(session_id=previous_session)
Don't: create a new executor session for each task

3. Track Costs

Do:

async with track_agent_call(...):
    await executor.run_task(...)

Don't: run tasks untracked

4. Handle Errors

Do: wrap calls in try/except and use CircuitBreaker
Don't: run without error handling

5. Verify Results

Do: run the verification pipeline after execution
Don't: deploy without verification

Examples

Example 1: Feature Implementation

from pathlib import Path
from src.orchestrator.claude_sdk_executor import ClaudeSDKExecutor
from src.orchestrator.cost_tracker import track_agent_call

async def implement_feature():
    executor = ClaudeSDKExecutor(
        project_root=Path.cwd(),
        agent_name="orchestrator",
        use_sdk_mcp=True
    )

    async with track_agent_call(agent_name="orchestrator", task="Auth"):
        result = await executor.run_task(
            "implement user authentication with JWT, "
            "refresh tokens, and email validation"
        )

    print(f"Status: {result.status}")
    print(f"Files: {result.files_modified}")
    print(f"Tests: {result.test_results}")
    return result

Example 2: Code Review Workflow

async def review_workflow():
    # Implement
    impl = ClaudeSDKExecutor(agent_name="implementer")
    impl_result = await impl.run_task("implement feature X")

    # Review
    reviewer = ClaudeSDKExecutor(
        agent_name="code-reviewer",
        session_id=impl_result.session_id
    )
    review_result = await reviewer.run_task(
        f"review {impl_result.files_modified} for security"
    )

    if review_result.status == "APPROVED":
        # Test
        tester = ClaudeSDKExecutor(
            agent_name="tester",
            session_id=review_result.session_id
        )
        test_result = await tester.run_task("run full test suite")
        return test_result
    else:
        print(f"Review failed: {review_result.issues}")
        return None

Example 3: Parallel Implementation

from src.orchestrator.parallel_executor import ParallelAgentExecutor, ParallelTask

async def parallel_implementation():
    executor = ParallelAgentExecutor(max_parallel=3)

    tasks = [
        ParallelTask("implementer", "implement user service", "task_1"),
        ParallelTask("implementer", "implement payment service", "task_2"),
        ParallelTask("implementer", "implement notification service", "task_3")
    ]

    results = await executor.run_parallel(tasks)

    for result in results:
        print(f"{result.task_id}: {result.status}")

    return results

Next Steps

  • Session Manager: Learn about session continuation (Session API)
  • Cost Tracker: Monitor and optimize costs (Cost API)
  • Knowledge Base: KB search and retrieval (KB API)