Orchestrator API¶
Complete API reference for the ClaudeSDKExecutor class - Nova AI's main orchestration engine.
Overview¶
The ClaudeSDKExecutor is the core orchestration class that:
- Coordinates agents - Delegates to specialist agents (implementer, code-reviewer, tester)
- Manages sessions - Lightweight continuation for 88-95% overhead reduction
- Handles MCP - Integrates SDK MCP servers (KB, GitHub, memory)
- Tracks costs - Production-grade cost tracking with LangFuse
- Enforces quality - Code review, testing, validation gates
graph TB
A[ClaudeSDKExecutor] --> B[Agent Management]
A --> C[Session Management]
A --> D[MCP Integration]
A --> E[Cost Tracking]
A --> F[Quality Gates]
style A fill:#3f51b5,color:#fff
Basic Usage¶
Simple Task Execution¶
from pathlib import Path
from src.orchestrator.claude_sdk_executor import ClaudeSDKExecutor
# Initialize executor
executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="orchestrator",
    use_sdk_mcp=True  # Enable SDK MCP (recommended)
)
# Run task
result = await executor.run_task("implement user authentication with JWT")
# Check results
print(f"Status: {result.status}")
print(f"Files modified: {result.files_modified}")
print(f"Summary: {result.summary}")
Session Continuation¶
# First task (creates session); project_root is a required keyword argument
executor1 = ClaudeSDKExecutor(project_root=Path.cwd(), agent_name="orchestrator")
result1 = await executor1.run_task("implement user registration")
# Reuse session (88-95% overhead reduction)
executor2 = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="implementer",
    session_id=result1.session_id  # Reuse session
)
result2 = await executor2.run_task("add email verification")
Cost Tracking¶
from src.orchestrator.cost_tracker import get_tracker, track_agent_call
# Track cost for task
async with track_agent_call(agent_name="implementer", task="Add auth"):
    result = await executor.run_task("implement authentication")
# Get cost summary
tracker = get_tracker()
summary = tracker.get_cost_summary()
print(f"Total cost: ${summary['total_cost_usd']:.4f}")
API Reference¶
ClaudeSDKExecutor¶
ClaudeSDKExecutor(*, project_root: Path, agent_root: Path | None = None, agent_name: str | None = None, allowed_tools: list[str] | None = None, model: str | None = None, session_id: str | None = None, permission_callback: CanUseTool | None = None, add_dirs: list[str] | None = None, hooks: dict[str, Any] | None = None, plugins: list[Any] | None = None, streaming: bool = True, agent_definitions: dict[str, Any] | None = None, use_sdk_mcp: bool = True, enable_security_hooks: bool = True, max_turns: int = 50, enable_auto_context_management: bool | None = None, memory_dir: Path | None = None, enable_verification: bool = True)
Thin wrapper around the Claude Agent SDK.
Initialize ClaudeSDKExecutor with hybrid agent loading support.
Agent Loading Priority:
1. Programmatic definitions (via agent_definitions parameter)
2. File-based agents (from .claude/agents/)
3. Default Claude Code preset (fallback)
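The priority order above can be sketched as a small lookup. This is an illustration only, not the executor's actual loader: the function name `resolve_agent_source`, the returned labels, and the `.md` file extension for file-based agents are all assumptions.

```python
from __future__ import annotations

from pathlib import Path
from typing import Any


def resolve_agent_source(
    agent_name: str,
    agent_definitions: dict[str, Any] | None,
    agents_dir: Path,
) -> str:
    """Return which source would supply `agent_name` under the stated priority."""
    # 1. Programmatic definitions win when the name is present.
    if agent_definitions and agent_name in agent_definitions:
        return "programmatic"
    # 2. Otherwise look for a file-based agent (the .md extension is an assumption).
    if (agents_dir / f"{agent_name}.md").is_file():
        return "file"
    # 3. Nothing matched: fall back to the default preset.
    return "default-preset"
```

With `agents_dir` pointing at `.claude/agents/`, a name that appears in both places resolves to the programmatic definition under this sketch.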
project_root: Root directory of the project (for Git operations and file operations)
agent_root: Optional directory to load agents from (defaults to project_root).
Enables cross-project agent usage where agents are loaded from a
central location (e.g., Nova AI repository) while executing tasks
in a different project directory.
Example: agent_root=/Users/jeff/nova_ai, project_root=/Users/jeff/my-app
agent_name: Name of agent to load (from .claude/agents/)
allowed_tools: List of allowed tool names (default: Read, Write, Edit, MultiEdit, Bash, Git)
model: Claude model ID (defaults to claude-sonnet-4-5-20250929)
session_id: Session ID for resuming existing sessions (optional)
permission_callback: Optional callback for custom tool permissions
add_dirs: List of directories to restrict file operations to for enhanced security (optional)
hooks: Optional hooks configuration dict (for backward compatibility)
plugins: List of plugin configurations for SDK extensibility (optional)
streaming: Enable streaming with partial messages for real-time output (default: True)
agent_definitions: Optional dict of programmatic agent definitions.
Allows runtime agent creation alongside file-based agents.
Example: {"custom-agent": AgentDefinition(...)}
use_sdk_mcp: Enable SDK MCP servers for 10-100x performance boost (default: True)
enable_security_hooks: Enable bash command security validation (default: True).
Blocks dangerous commands like rm -rf /, sudo, etc.
Value: $10,000/year in incident prevention.
max_turns: Maximum number of conversation turns before stopping (default: 50).
Prevents infinite loops and runaway costs. Typical conversations use 5-20 turns,
complex workflows use 20-40 turns. Default provides safety margin.
Value: Prevents worst-case $100+ runaway sessions.
enable_auto_context_management: Enable automatic context usage warnings (default: True).
When token usage reaches 80% of model capacity, logs a warning suggesting
the user clear context to save costs. Configurable via ENABLE_AUTO_CONTEXT_MANAGEMENT
environment variable. Expected savings: 20-30% per Anthropic's guidance.
memory_dir: DEPRECATED - now created internally as project_root/logs/{agent_name or "default"}
enable_verification: DEPRECATED - verification is always enabled
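The 80% warning described for enable_auto_context_management can be sketched as below. This is a minimal illustration under assumptions, not the executor's code: the helper names, the accepted truthy strings for the environment variable, and the fallback to True when nothing is set are hypothetical.

```python
from __future__ import annotations

import os


def auto_context_enabled(explicit: bool | None = None) -> bool:
    """Resolve the flag: an explicit argument wins, else the environment variable."""
    if explicit is not None:
        return explicit
    # Assumed parsing rule: unset means enabled; common truthy strings enable it.
    raw = os.environ.get("ENABLE_AUTO_CONTEXT_MANAGEMENT", "true")
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def should_warn(tokens_used: int, model_capacity: int, threshold_pct: int = 80) -> bool:
    """True once token usage reaches `threshold_pct` percent of model capacity."""
    # Integer arithmetic avoids floating-point edge cases at the boundary.
    return tokens_used * 100 >= threshold_pct * model_capacity
```

For a hypothetical 200,000-token capacity, the warning in this sketch would first fire at 160,000 tokens.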
Usage Examples
File-based only (standard usage)¶
executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="code-reviewer"  # Loads from .claude/agents/
)
Cross-project agent usage (Nova AI agents in different project)¶
executor = ClaudeSDKExecutor(
    project_root=Path("/Users/jeff/my-app"),  # Target project
    agent_root=Path("/Users/jeff/nova_ai"),  # Agent source
    agent_name="implementer"  # Loaded from nova_ai/.claude/agents/
)
Programmatic only¶
from claude_agent_sdk import AgentDefinition
executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_definitions={
        "custom-agent": AgentDefinition(
            name="custom-agent",
            system_prompt="You are a custom agent...",
            model="claude-haiku-4-5-20251001"
        )
    }
)
Hybrid (both)¶
executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="code-reviewer",  # File-based
    agent_definitions={
        "custom-agent": {...}  # Programmatic
    }
)
Context Management Best Practices¶
Efficient context management reduces token usage by 20-30%, significantly lowering costs and improving response times. Without proper context management, conversation histories accumulate irrelevant information, leading to bloated contexts and unnecessary token usage.
Key Strategies:
- Clear Context Between Unrelated Tasks: Use /clear in interactive sessions when switching between unrelated tasks. This prevents accumulation of irrelevant conversation history.
Example:¶
# In Claude Code interactive session:
> Implement user authentication
[Complete implementation...]
> /clear # Clear context before switching tasks
> Optimize database queries
[Fresh context, no auth-related history]
When to clear:
- Switching from Feature A to unrelated Feature B
- Starting a new day's work
- After completing a major milestone
- When context feels "heavy" (slow responses)
- Leverage CLAUDE.md for Project Context: Document project structure, conventions, and patterns in .claude/CLAUDE.md instead of repeating the same context in every conversation. The SDK automatically includes CLAUDE.md with prompt caching (90% cost reduction).
What to put in CLAUDE.md:
- Project architecture overview
- Coding conventions and style guides
- Common patterns and utilities
- Dependency management rules
- Testing strategies
What NOT to repeat in conversations:
- "Use pytest for testing" (document in CLAUDE.md)
- "Follow PEP 8" (document in CLAUDE.md)
- "Our API is RESTful" (document in CLAUDE.md)
- Use Specific File References: Reference files by path instead of pasting large code blocks inline. Claude Code will read files directly, keeping context lean.
Example:¶
❌ BAD:
"Review this code:
[paste 500 lines of code]"
✅ GOOD:
"Review src/orchestrator/executor.py:500-600 for error handling"
Benefits:
- Reduces conversation tokens by 80-90%
- Claude reads current file state (no stale code)
- Easier to reference specific line numbers
- Delegate to Subagents for Parallel Work: Use subagents for parallel work to isolate context. Each subagent has an independent context window, preventing context pollution.
Example:¶
# Review 3 modules independently
review_results = await asyncio.gather(
    spawn_agent("code-reviewer", "Review auth module"),
    spawn_agent("code-reviewer", "Review API module"),
    spawn_agent("code-reviewer", "Review database module")
)
When to use subagents:
- Parallel code reviews (multiple files)
- Independent feature implementations
- Multi-module refactoring
- Batch testing different components
- Session Continuity for Related Work: Resume sessions for related multi-turn workflows using the session_id parameter. Don't create new sessions for follow-up questions on the same topic.
Example:¶
# First interaction
executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="architect"
)
first_result = await executor.run_task("Design authentication system")
session_id = first_result.session_id  # run_task returns a result object; keep its session ID
# Later - resume with context (30 minutes later)
executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="architect",
    session_id=session_id  # Continues previous conversation
)
result = await executor.run_task(
    "Add OAuth2 support to the authentication system"
)
When to resume sessions:
- Follow-up questions on same topic
- Incremental feature development
- Multi-step refactoring
- Iterative design discussions
When to start fresh:
- Different feature/topic
- New day's work (unless continuing)
- After major milestone completion
Cost Impact:
Without context management:
- Average tokens per conversation: 50,000
- Daily conversations: 20
- Monthly tokens: 30M tokens
- Monthly cost: ~$1,020 ($12,200/year per developer)
With context management best practices:
- Average tokens per conversation: 35,000 (-30%)
- Prompt caching on CLAUDE.md: -90% on repeated content
- File references instead of pasting: -80% on code context
- Monthly cost: ~$710 ($8,500/year per developer)
Total Savings: 30% (~$3,700/year per developer)
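The arithmetic behind these figures can be checked with a few lines. The sketch only reuses the numbers quoted above; the 30-day month is an assumption implied by the 30M-token total, and no per-token price is assumed.

```python
# Figures quoted in the Cost Impact section.
tokens_per_conversation = 50_000
conversations_per_day = 20
days_per_month = 30  # assumption implied by the 30M monthly total

monthly_tokens = tokens_per_conversation * conversations_per_day * days_per_month

annual_cost_before = 12_200  # USD per developer, without context management
annual_cost_after = 8_500    # USD per developer, with best practices

annual_savings = annual_cost_before - annual_cost_after
savings_pct = round(100 * annual_savings / annual_cost_before)

print(f"Monthly tokens: {monthly_tokens:,}")
print(f"Annual savings: ${annual_savings:,} ({savings_pct}%)")
```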
Real-World Example - Before/After:
Before (Bloated Context):
Conversation 1: "Implement auth" (10K tokens)
Conversation 2: "Add OAuth" + history from Conv 1 (25K tokens)
Conversation 3: "Fix bug" + history from Conv 1+2 (45K tokens)
Total: 80K tokens
After (Managed Context):
Conversation 1: "Implement auth" (10K tokens)
/clear
Conversation 2: "Add OAuth" (12K tokens, references Conv 1 via git log)
/clear
Conversation 3: "Fix bug in src/auth.py:234" (8K tokens, file reference)
Total: 30K tokens (62% reduction)
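As a quick check of the before/after totals, the per-conversation token counts from the example can be summed directly. The variable names are illustrative; the exact reduction works out to 62.5%, which the example reports as 62%.

```python
# Per-conversation token counts taken from the before/after example.
before_tokens = [10_000, 25_000, 45_000]  # history carried into each conversation
after_tokens = [10_000, 12_000, 8_000]    # /clear between conversations

total_before = sum(before_tokens)
total_after = sum(after_tokens)
reduction_pct = 100 * (total_before - total_after) / total_before

print(f"{total_before:,} -> {total_after:,} tokens ({reduction_pct}% reduction)")
```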
Monitoring Context Size:
You can check current context size using
executor._client.get_session_info(session_id)
Watch for these warning signs of bloated context:
- Responses taking >5 seconds to start
- Unexpected errors about context length
- Session costs >$0.50 per interaction
- Claude referencing very old conversation turns
For more details, see:
- .claude/CLAUDE.md: Project-specific context management patterns
- https://docs.anthropic.com/claude-code: Official best practices
- SDK_COMPLIANCE_AUDIT_PLAN.md: Context optimization strategies
Source code in src/orchestrator/claude_sdk_executor.py
run_task¶
Source code in src/orchestrator/claude_sdk_executor.py
run_task_async (async)¶
Source code in src/orchestrator/claude_sdk_executor.py
Related Classes¶
CircuitBreaker¶
CircuitBreaker¶
Simple circuit breaker to prevent cascading failures.
Tracks consecutive failures and opens circuit after threshold. After cooldown period, enters half-open state to test recovery.
Initialize circuit breaker.
failure_threshold: Number of consecutive failures before opening (default: 5)
cooldown_seconds: Seconds to wait before attempting recovery (default: 60)
Source code in src/orchestrator/claude_sdk_executor.py
record_success¶
record_failure¶
Record failed API call - increment counter and potentially open circuit.
Source code in src/orchestrator/claude_sdk_executor.py
check_state¶
Check circuit state and potentially transition to half-open.
Raises¶
CircuitBreakerOpenError: If circuit is open and cooldown not elapsed
Source code in src/orchestrator/claude_sdk_executor.py
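The closed, open, and half-open behaviour described above can be sketched as a small state machine. This is not the CircuitBreaker source: the class and exception names are stand-ins, the injectable `clock` exists only to make the sketch testable, and reopening immediately on a failed half-open probe is an assumption.

```python
import time
from typing import Callable


class CircuitOpenError(RuntimeError):
    """Stand-in for CircuitBreakerOpenError so the sketch runs on its own."""


class SketchCircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        cooldown_seconds: float = 60,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self.state = "closed"
        self._failures = 0
        self._opened_at = 0.0

    def record_success(self) -> None:
        # Any success resets the consecutive-failure count and closes the circuit.
        self._failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self._failures += 1
        # Assumption: a failed probe in half-open state reopens immediately.
        if self.state == "half-open" or self._failures >= self.failure_threshold:
            self.state = "open"
            self._opened_at = self._clock()

    def check_state(self) -> None:
        if self.state != "open":
            return
        if self._clock() - self._opened_at >= self.cooldown_seconds:
            # Cooldown elapsed: allow one trial call through.
            self.state = "half-open"
            return
        raise CircuitOpenError("circuit open; cooldown not elapsed")
```

Tracking only consecutive failures keeps the state small; a success at any point returns the sketch to the closed state.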
Usage Example¶
from src.orchestrator.claude_sdk_executor import CircuitBreaker
# Initialize circuit breaker
breaker = CircuitBreaker(
    failure_threshold=5,  # Open after 5 consecutive failures
    cooldown_seconds=60   # Wait 60s before retry
)
# Use in retry logic
async def call_with_circuit_breaker():
    breaker.check_state()  # Raises if circuit open
    try:
        result = await executor.run_task("task")
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        raise
Advanced Usage¶
Parallel Execution¶
Run independent tasks in parallel:
from src.orchestrator.parallel_executor import ParallelAgentExecutor, ParallelTask
executor = ParallelAgentExecutor(
    project_root=Path.cwd(),
    max_parallel=3  # Run 3 tasks simultaneously
)
# Define tasks
tasks = [
    ParallelTask(
        agent_name="implementer",
        task="implement user service",
        task_id="task_1"
    ),
    ParallelTask(
        agent_name="implementer",
        task="implement payment service",
        task_id="task_2"
    ),
    ParallelTask(
        agent_name="implementer",
        task="implement notification service",
        task_id="task_3"
    )
]
# Execute in parallel
results = await executor.run_parallel(tasks)
for result in results:
    print(f"Task {result.task_id}: {result.status}")
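One common way to cap concurrency the way max_parallel does is an asyncio.Semaphore around each job. The sketch below illustrates that general technique with a hypothetical `run_bounded` helper; it does not show how ParallelAgentExecutor is implemented.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


async def run_bounded(
    jobs: Sequence[Callable[[], Awaitable[str]]],
    max_parallel: int,
) -> list[str]:
    """Run all jobs, never more than `max_parallel` at once, preserving order."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(job: Callable[[], Awaitable[str]]) -> str:
        # Each job waits here until one of the max_parallel slots is free.
        async with semaphore:
            return await job()

    # gather returns results in the order the jobs were passed in.
    return await asyncio.gather(*(guarded(job) for job in jobs))
```

Because gather preserves input order, results line up with the task list even when jobs finish out of order.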
Custom MCP Configuration¶
import os
executor = ClaudeSDKExecutor(
    project_root=Path.cwd(),
    agent_name="orchestrator",
    use_sdk_mcp=True,
    mcp_config={
        "kb": {
            "kb_dir": "custom_kb_path",
            "max_results": 20
        },
        "github": {
            "token": os.getenv("GITHUB_TOKEN")  # Read the token from the environment
        }
    }
)
Verification Pipeline¶
Run quality gates after execution:
from src.orchestrator.verification import VerificationPipeline
# Execute task
result = await executor.run_task("implement feature")
# Run verification
pipeline = VerificationPipeline(project_root=Path.cwd())
verification = await pipeline.run_all()
if verification.passed:
    print("✅ All quality gates passed")
    print(f"Tests: {verification.test_results}")
    print(f"Coverage: {verification.coverage}%")
else:
    print("❌ Quality gates failed")
    print(f"Failures: {verification.failures}")
Configuration Options¶
Executor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| project_root | Path | required | Project root directory |
| agent_name | str \| None | None | Agent to use |
| use_sdk_mcp | bool | True | Enable SDK MCP servers |
| session_id | str \| None | None | Reuse existing session |
| mcp_config | dict \| None | None | Custom MCP configuration |
| timeout | int | 300 | Task timeout in seconds |
Agent Names¶
Available agents:
- orchestrator - Multi-agent coordination
- architect - Architecture decisions
- implementer - Feature implementation
- code-reviewer - Security and correctness review
- tester - Test execution and validation
- debugger - Error analysis and debugging
Performance Metrics¶
Session Continuation Impact¶
| Transition | Without Session | With Session | Improvement |
|---|---|---|---|
| orchestrator → implementer | 680ms | 45ms | 93% |
| implementer → code-reviewer | 280ms | 15ms | 95% |
| code-reviewer → tester | 280ms | 12ms | 96% |
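The Improvement column follows from the two latency columns as (without - with) / without. A short check using the table's values, with an illustrative helper name:

```python
def improvement_pct(without_ms: float, with_ms: float) -> int:
    """Percentage of overhead removed, rounded to the nearest whole percent."""
    return round(100 * (without_ms - with_ms) / without_ms)


# (without session, with session) latencies in milliseconds, from the table.
transitions = {
    "orchestrator -> implementer": (680, 45),
    "implementer -> code-reviewer": (280, 15),
    "code-reviewer -> tester": (280, 12),
}

improvements = {name: improvement_pct(*pair) for name, pair in transitions.items()}
```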
SDK MCP Impact¶
| Operation | stdio MCP | SDK MCP | Speedup |
|---|---|---|---|
| KB search | 850ms | 8ms | 106x |
| GitHub API | 450ms | 45ms | 10x |
| Memory store | 120ms | 2ms | 60x |
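The Speedup column is the ratio of the stdio latency to the SDK latency. A small sketch reproducing it from the table's values (the helper name is illustrative):

```python
def speedup(stdio_ms: float, sdk_ms: float) -> int:
    """How many times faster the SDK MCP path is, rounded to a whole factor."""
    return round(stdio_ms / sdk_ms)


# (stdio MCP, SDK MCP) latencies in milliseconds, from the table.
operations = {
    "KB search": (850, 8),
    "GitHub API": (450, 45),
    "Memory store": (120, 2),
}

speedups = {name: speedup(*pair) for name, pair in operations.items()}
```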
Error Handling¶
Retry Logic¶
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def run_with_retry():
    return await executor.run_task("task")
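To see which delays this configuration produces, the clamped exponential schedule can be modelled without tenacity. The sketch assumes the wait before retry n is multiplier * 2**(n - 1), clamped to the [min, max] range; tenacity's exact formula may differ between versions, so treat the numbers as indicative.

```python
def exponential_wait(
    attempt: int,
    multiplier: float = 1,
    min_wait: float = 4,
    max_wait: float = 10,
) -> float:
    """Seconds to wait after failed attempt number `attempt` (1-based)."""
    raw = multiplier * 2 ** (attempt - 1)
    # Clamp the raw exponential value into the [min_wait, max_wait] range.
    return max(min_wait, min(raw, max_wait))


schedule = [exponential_wait(n) for n in range(1, 6)]
```

Under this model, with three attempts only the first two waits apply, and both are clamped up to the 4-second minimum.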
Circuit Breaker¶
from src.orchestrator.claude_sdk_executor import CircuitBreakerOpenError
try:
    result = await executor.run_task("task")
except CircuitBreakerOpenError:
    print("⚠️ Circuit breaker open - too many failures")
    # Wait for cooldown or escalate
Rate Limiting¶
import asyncio
from src.orchestrator.claude_sdk_executor import RateLimitError
try:
    result = await executor.run_task("task")
except RateLimitError:
    print("⚠️ Rate limit exceeded - backing off")
    await asyncio.sleep(60)
    # Retry after backoff
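The "retry after backoff" step can be wrapped in a small loop. This is a sketch under assumptions: `SketchRateLimitError` stands in for RateLimitError so the example runs on its own, and `retry_on_rate_limit` with its parameters is a hypothetical helper, not part of the executor API.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class SketchRateLimitError(Exception):
    """Stand-in for RateLimitError so the sketch is self-contained."""


async def retry_on_rate_limit(
    call: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    backoff_seconds: float = 60.0,
) -> T:
    """Await `call`, sleeping and retrying when it raises the rate-limit error."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except SketchRateLimitError:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the error to the caller.
            await asyncio.sleep(backoff_seconds)
    raise ValueError("max_attempts must be at least 1")
```

In real use the callable would wrap `executor.run_task(...)`, and the except clause would catch the library's own RateLimitError.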
Best Practices¶
1. Always Use SDK MCP¶
2. Reuse Sessions¶
3. Track Costs¶
4. Handle Errors¶
5. Verify Results¶
Examples¶
Example 1: Feature Implementation¶
from pathlib import Path
from src.orchestrator.claude_sdk_executor import ClaudeSDKExecutor
from src.orchestrator.cost_tracker import track_agent_call
async def implement_feature():
    executor = ClaudeSDKExecutor(
        project_root=Path.cwd(),
        agent_name="orchestrator",
        use_sdk_mcp=True
    )
    async with track_agent_call(agent_name="orchestrator", task="Auth"):
        result = await executor.run_task(
            "implement user authentication with JWT, "
            "refresh tokens, and email validation"
        )
    print(f"Status: {result.status}")
    print(f"Files: {result.files_modified}")
    print(f"Tests: {result.test_results}")
    return result
Example 2: Code Review Workflow¶
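from pathlib import Path
from src.orchestrator.claude_sdk_executor import ClaudeSDKExecutor
async def review_workflow():
    project_root = Path.cwd()  # project_root is a required keyword argument
    # Implement
    impl = ClaudeSDKExecutor(project_root=project_root, agent_name="implementer")
    impl_result = await impl.run_task("implement feature X")
    # Review (reuses the implementer's session)
    reviewer = ClaudeSDKExecutor(
        project_root=project_root,
        agent_name="code-reviewer",
        session_id=impl_result.session_id
    )
    review_result = await reviewer.run_task(
        f"review {impl_result.files_modified} for security"
    )
    if review_result.status == "APPROVED":
        # Test (reuses the reviewer's session)
        tester = ClaudeSDKExecutor(
            project_root=project_root,
            agent_name="tester",
            session_id=review_result.session_id
        )
        test_result = await tester.run_task("run full test suite")
        return test_result
    else:
        print(f"Review failed: {review_result.issues}")
        return None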
Example 3: Parallel Implementation¶
from src.orchestrator.parallel_executor import ParallelAgentExecutor, ParallelTask
async def parallel_implementation():
    executor = ParallelAgentExecutor(max_parallel=3)
    tasks = [
        ParallelTask("implementer", "implement user service", "task_1"),
        ParallelTask("implementer", "implement payment service", "task_2"),
        ParallelTask("implementer", "implement notification service", "task_3")
    ]
    results = await executor.run_parallel(tasks)
    for result in results:
        print(f"{result.task_id}: {result.status}")
    return results
Next Steps¶
- Session Manager: Learn about session continuation
- Cost Tracker: Monitor and optimize costs
- Knowledge Base: KB search and retrieval