
Session Manager API

API reference for session management and state continuation.

Overview

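SessionManager implements lightweight session continuation. It keeps a compressed session state (plan summary, recent decisions, artifact paths) so that follow-up tasks can resume an existing session. The design follows Anthropic's guidance and targets an 88-95% reduction in overhead.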

from pathlib import Path

from src.orchestrator.session_manager import SessionManager, SessionState

# The await calls below must run inside an async function or an async REPL
manager = SessionManager(project_root=Path.cwd())
session_id = await manager.get_or_create_session()
result = await manager.run_task(session_id, "Your task here")

API Reference

SessionManager

SessionManager(project_root: Path | None = None, session_dir: Path | None = None, max_age_hours: int = 24, max_concurrent_sessions: int = 100, auto_cleanup: bool = True)

Manages SDK client sessions with Anthropic-recommended patterns.

Responsibilities:

1. Create and retrieve sessions with lightweight state
2. Track session metadata and metrics
3. Support session forking for exploration
4. Cleanup resources (files, state) on deletion
5. Serialize/deserialize session state

Architecture

SessionManager
├── Session storage (in-memory, backed by JSON)
├── State compression (aggressive pruning)
├── Fork support (creates child sessions)
└── Cleanup (removes old sessions)

Token Limits:

- Session pruned when: total_tokens > 150,000
- Decision limit: max 10 decisions before pruning
- Cost impact: 30-50% reduction via pruning

Initialize SessionManager.

Parameters:

project_root: Root directory of the project (default: current working directory)
session_dir: Custom session storage directory (default: .nova/sessions)
max_age_hours: Delete sessions older than this (default 24 hours)
max_concurrent_sessions: Maximum concurrent sessions before cleanup (default 100)
auto_cleanup: Enable automatic background cleanup (default True)

Raises:

RuntimeError: If session_dir creation fails
Source code in src/orchestrator/session_manager.py
def __init__(
    self,
    project_root: Path | None = None,
    session_dir: Path | None = None,
    max_age_hours: int = 24,
    max_concurrent_sessions: int = 100,
    auto_cleanup: bool = True,
) -> None:
    """Initialize SessionManager.

    Args:
    ----
        project_root: Root directory of the project
        session_dir: Custom session storage directory (default: .nova/sessions)
        max_age_hours: Delete sessions older than this (default 24 hours)
        max_concurrent_sessions: Maximum concurrent sessions before cleanup
        auto_cleanup: Enable automatic background cleanup (default True)

    Raises:
    ------
        RuntimeError: If sessions_dir creation fails

    """
    self.project_root = project_root or Path.cwd()
    self.session_dir = session_dir or (self.project_root / self.DEFAULT_SESSION_DIR)
    self.max_age_hours = max_age_hours
    self.max_concurrent_sessions = max_concurrent_sessions
    self.auto_cleanup = auto_cleanup

    # Create session directory if needed
    try:
        self.session_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"SessionManager initialized at {self.session_dir}")
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback
        raise RuntimeError(f"Failed to create sessions directory: {e}") from e

    # BLOCKER #1: Add thread synchronization to prevent race conditions
    self._lock = threading.Lock()

    # In-memory session cache (for performance)
    self.active_sessions: dict[str, SessionState] = {}
    self.session_configs: dict[str, dict[str, Any]] = {}

    # BLOCKER #2: Start automatic cleanup task if enabled
    self._cleanup_task: threading.Thread | None = None
    self._shutdown_event = threading.Event()
    if self.auto_cleanup:
        self._start_cleanup_task()

get_or_create_session async

get_or_create_session(session_id: str | None = None, model: str | None = None, tags: set[str] | None = None, agent_name: str | None = None) -> str

Get existing session or create new one.

This is the primary entry point for session management. It handles:

1. Retrieving an existing session (with optional reload from disk)
2. Creating a new session with a UUID
3. Loading from persistent storage if available

Parameters:

session_id: Optional existing session ID to resume. If None, creates new.
model: Claude model for this session (default: claude-sonnet-4-5-20250929)
tags: Optional tags for session filtering (e.g., {"feature", "pr"})
agent_name: Name of agent using session (for backward compatibility)

Returns:

Session ID (string UUID v4 or custom ID)

Raises:

FileNotFoundError: If the specified session_id doesn't exist
ValueError: If the session_id format is invalid
Source code in src/orchestrator/session_manager.py
async def get_or_create_session(
    self,
    session_id: str | None = None,
    model: str | None = None,
    tags: set[str] | None = None,
    agent_name: str | None = None,
) -> str:
    """Get existing session or create new one.

    This is the primary entry point for session management. It handles:
    1. Retrieving existing session (with optional reload from disk)
    2. Creating new session with UUID
    3. Loading from persistent storage if available

    Args:
    ----
        session_id: Optional existing session ID to resume. If None, creates new.
        model: Claude model for this session (default: claude-sonnet-4-5-20250929)
        tags: Optional tags for session filtering (e.g., {"feature", "pr"})
        agent_name: Name of agent using session (for backward compatibility)

    Returns:
    -------
        Session ID (string UUID v4 or custom ID)

    Raises:
    ------
        FileNotFoundError: If specified session_id doesn't exist
        ValueError: If session_id format invalid

    """
    # BLOCKER #3: Validate session_id BEFORE any processing
    # Must check "is not None" to catch empty strings
    if session_id is not None:
        self._validate_session_id_format(session_id)

    # Case 1: Resume existing session
    if session_id:
        if not await self._session_exists(session_id):
            with self._lock:  # BLOCKER #1: Thread-safe access
                available = list(self.active_sessions.keys())
            raise FileNotFoundError(
                f"Session '{session_id}' not found. " f"Available: {available}",
            )
        # Load from cache or disk
        with self._lock:  # BLOCKER #1: Thread-safe access
            exists_in_cache = session_id in self.active_sessions

        if not exists_in_cache:
            await self._load_session_from_disk(session_id)
        logger.info(f"Resumed session: {session_id}")
        return session_id

    # Case 2: Create new session
    new_session_id = str(uuid.uuid4())
    now = datetime.utcnow().isoformat()

    state = SessionState(
        session_id=new_session_id,
        created_at=now,
        last_updated=now,
        model=model or "claude-sonnet-4-5-20250929",
        agent_name=agent_name,
        tags=tags or set(),
    )

    # Store in cache
    with self._lock:  # BLOCKER #1: Thread-safe access
        self.active_sessions[new_session_id] = state
        self.session_configs[new_session_id] = {
            "created_at": now,
            "tags": list(tags or set()),
        }

    # Persist to disk
    await self._save_session_to_disk(new_session_id, state)

    logger.info(f"Created new session: {new_session_id}")
    return new_session_id

run_task async

run_task(session_id: str, user_input: str, track_decision: bool = False, decision_context: dict[str, str] | None = None) -> dict[str, Any]

Execute task within session context.

This prepares the session state for execution by the SDK client. It handles state updates and context preparation.


Parameters:

session_id: Session ID to execute in
user_input: User's task description
track_decision: Whether to track this as a key decision
decision_context: Optional context for decision tracking
    {"decision": "...", "rationale": "..."}

Returns:

Dictionary with execution context:
    {
        "session_id": str,
        "resume_from_session": str (for SDK resume parameter),
        "context": str (session context for system prompt),
        "state": SessionState,
        "model": str (model recorded in the session state)
    }

Raises:

ValueError: If the session is not in the in-memory cache (sessions are loaded into the cache by get_or_create_session)
Source code in src/orchestrator/session_manager.py
async def run_task(
    self,
    session_id: str,
    user_input: str,
    track_decision: bool = False,
    decision_context: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Execute task within session context.

    This prepares the session state for execution by the SDK client.
    It handles state updates and context preparation.

    Args:
    ----
        session_id: Session ID to execute in
        user_input: User's task description
        track_decision: Whether to track this as a key decision
        decision_context: Optional context for decision tracking
            {"decision": "...", "rationale": "..."}

    Returns:
    -------
        Dictionary with execution context:
            {
                "session_id": str,
                "resume_from_session": str (for SDK resume parameter),
                "context": str (session context for system prompt),
                "state": SessionState
            }

    Raises:
    ------
        ValueError: If session_id invalid or session not found

    """
    with self._lock:  # BLOCKER #1: Thread-safe access
        if session_id not in self.active_sessions:
            raise ValueError(f"Session '{session_id}' not found")

        state = self.active_sessions[session_id]

    # Track decision if requested
    if track_decision and decision_context:
        state.add_decision(
            decision=decision_context.get("decision", ""),
            rationale=decision_context.get("rationale", ""),
        )

    # Increment turn counter
    state.increment_turns(1)

    # Check if pruning needed
    if state.should_prune():
        state.compress()
        await self._save_session_to_disk(session_id, state)
        logger.info(f"Session {session_id}: Compressed state after pruning")

    # Build context for SDK (minimal information)
    context_lines = [
        f"Session ID: {session_id}",
        f"Model: {state.model}",
        f"Turns: {state.turn_count}",
    ]

    if state.current_plan:
        context_lines.append(f"Plan: {state.current_plan}")

    if state.key_decisions:
        recent_decision = state.key_decisions[-1]
        context_lines.append(
            f"Recent decision: {recent_decision.get('decision', '')}",
        )

    context = "\n".join(context_lines)

    # BLOCKER FIX #30: Log only metadata (never full state/messages/content)
    # Security: Session state may contain API keys, user data, conversation history
    logger.debug(
        f"Session {session_id}: Executing task "
        f"(turn {state.turn_count}, decisions: {len(state.key_decisions)})"
    )

    return {
        "session_id": session_id,
        "resume_from_session": session_id,  # SDK's resume parameter
        "context": context,
        "state": state,
        "model": state.model,
    }
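
run_task returns execution context rather than a model response, so the caller decides how to use it. A minimal sketch of folding the `context` string into a system prompt follows; `build_system_prompt`, the base prompt text, and the sample values are hypothetical and only mimic the shape of the returned dictionary.

```python
from typing import Any


def build_system_prompt(base_prompt: str, task_context: dict[str, Any]) -> str:
    """Hypothetical helper: append the session context block to a base prompt."""
    context = task_context.get("context", "")
    if not context:
        return base_prompt
    return f"{base_prompt}\n\n--- Session context ---\n{context}"


# Sample dict shaped like the return value of run_task (values are made up).
sample = {
    "session_id": "3f2b8c1e-demo",
    "resume_from_session": "3f2b8c1e-demo",
    "context": "Session ID: 3f2b8c1e-demo\nModel: claude-sonnet-4-5-20250929\nTurns: 1",
    "model": "claude-sonnet-4-5-20250929",
}

prompt = build_system_prompt("You are a coding assistant.", sample)
```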

fork_session

fork_session(parent_session_id: str) -> str

Create a new session forked from existing session.

Forking is useful for:

- Exploration without affecting the original session
- Trying alternative approaches
- A/B testing different strategies

The forked session:

- Inherits the parent's plan, decisions, artifacts, tags, and token count
- Has its own conversation history
- Is independent after forking

In the code shown below, the child is stored in the in-memory cache only; it is not written to disk by fork_session itself.

Parameters:

parent_session_id: Session to fork from

Returns:

New session ID (child session)

Raises:

ValueError: If parent_session_id is not found
Source code in src/orchestrator/session_manager.py
def fork_session(self, parent_session_id: str) -> str:
    """Create a new session forked from existing session.

    Forking is useful for:
    - Exploration without affecting original session
    - Trying alternative approaches
    - A/B testing different strategies

    The forked session:
    - Inherits parent's plan, decisions, artifacts
    - Has its own conversation history
    - Is independent after forking

    Args:
    ----
        parent_session_id: Session to fork from

    Returns:
    -------
        New session ID (child session)

    Raises:
    ------
        ValueError: If parent_session_id not found

    """
    with self._lock:  # BLOCKER #1: Thread-safe access
        if parent_session_id not in self.active_sessions:
            raise ValueError(f"Parent session '{parent_session_id}' not found")

        parent_state = self.active_sessions[parent_session_id]

    # Create child session with inherited state
    child_session_id = str(uuid.uuid4())
    now = datetime.utcnow().isoformat()

    child_state = SessionState(
        session_id=child_session_id,
        created_at=now,
        last_updated=now,
        model=parent_state.model,
        agent_name=parent_state.agent_name,
        current_plan=parent_state.current_plan,
        key_decisions=parent_state.key_decisions.copy(),
        latest_artifacts=parent_state.latest_artifacts.copy(),
        tokens_used=parent_state.tokens_used,
        tags=parent_state.tags.copy(),
        fork_parent_id=parent_session_id,
    )

    # Store child session
    with self._lock:  # BLOCKER #1: Thread-safe access
        self.active_sessions[child_session_id] = child_state
        self.session_configs[child_session_id] = {
            "created_at": now,
            "parent_session_id": parent_session_id,
            "tags": list(child_state.tags),
        }

    logger.info(
        f"Forked session: {child_session_id} from parent {parent_session_id}",
    )
    return child_session_id
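
One detail worth knowing about the code above: `key_decisions.copy()` is a shallow copy. The child gets its own list, but the decision records inside it are the same dict objects as the parent's. Appending to the child leaves the parent alone; editing a record in place does not. The sketch below shows this with plain lists and dicts (no SessionState involved). `copy.deepcopy` appears as one possible way to isolate the records fully, which is an assumption for illustration and not what the module does.

```python
import copy

parent_decisions = [{"decision": "Use async I/O", "rationale": "Non-blocking"}]

# Shallow copy, as in fork_session: a new list holding the same dict objects.
shallow_child = parent_decisions.copy()
shallow_child.append({"decision": "Add caching", "rationale": "Latency"})
shallow_child[0]["rationale"] = "Edited in child"

parent_len_after_append = len(parent_decisions)                 # list is independent
parent_rationale_after_edit = parent_decisions[0]["rationale"]  # record is shared

# Deep copy: records are duplicated too, so edits stay local to the child.
deep_child = copy.deepcopy(parent_decisions)
deep_child[0]["rationale"] = "Edited in deep child"
```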

cleanup_session async

cleanup_session(session_id: str) -> None

Clean up session resources.

Removes:

- In-memory session state
- Persisted session files
- Session metadata

Parameters:

session_id: Session to delete

Raises:

ValueError: If session not found
Source code in src/orchestrator/session_manager.py
async def cleanup_session(self, session_id: str) -> None:
    """Clean up session resources.

    Removes:
    - In-memory session state
    - Persisted session files
    - Session metadata

    Args:
    ----
        session_id: Session to delete

    Raises:
    ------
        ValueError: If session not found

    """
    with self._lock:  # BLOCKER #1: Thread-safe access
        if session_id not in self.active_sessions:
            raise ValueError(f"Session '{session_id}' not found")

        # Remove from memory
        self.active_sessions.pop(session_id, None)
        self.session_configs.pop(session_id, None)

    # Remove from disk
    session_file = self.session_dir / f"{session_id}.json"
    if session_file.exists():
        try:
            session_file.unlink()
            logger.info(f"Cleaned up session: {session_id}")
        except Exception as e:
            logger.error(f"Failed to delete session file: {e}")

get_session_state

get_session_state(session_id: str) -> SessionState

Get current session state (read-only access).


Parameters:

session_id: Session to retrieve

Returns:

SessionState object

Raises:

ValueError: If session not found
Source code in src/orchestrator/session_manager.py
def get_session_state(self, session_id: str) -> SessionState:
    """Get current session state (read-only access).

    Args:
    ----
        session_id: Session to retrieve

    Returns:
    -------
        SessionState object

    Raises:
    ------
        ValueError: If session not found

    """
    with self._lock:  # BLOCKER #1: Thread-safe access
        if session_id not in self.active_sessions:
            raise ValueError(f"Session '{session_id}' not found")

        return self.active_sessions[session_id]

SessionState Model

SessionState

Bases: BaseModel

Compressed session state following Anthropic's guidance.

This model stores ONLY essential information for session continuation:

- Session metadata (ID, creation time, model)
- Current plan summary (not full plan)
- Key decisions (most recent only, max 10)
- Latest artifacts (file paths only, not content)

Philosophy: Store pointers, not content. Content lives in files.

Attributes

session_id: Unique session identifier (UUID v4 or custom)
created_at: Session creation timestamp (ISO 8601)
last_updated: Last modification timestamp (ISO 8601)
model: Claude model being used
agent_name: Name of agent using this session
current_plan: High-level plan summary (1-2 sentences max)
key_decisions: List of recent decisions with justification
latest_artifacts: File paths to important artifacts
tokens_used: Approximate token count for this session
turn_count: Number of conversation turns
tags: Session metadata tags for filtering
metrics: Session metrics (for backward compatibility)
fork_parent_id: Parent session ID if forked
fork_count: Number of forks from this session
pruned: Whether session has been pruned
last_pruned_at: Timestamp of last pruning
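
The timestamp attributes above are ISO 8601 strings. The source on this page builds them with `datetime.utcnow()`, which returns a naive datetime and is deprecated as of Python 3.12. A timezone-aware alternative could look like the sketch below; `utc_timestamp` is a hypothetical helper name, not something the module defines.

```python
from datetime import datetime, timezone


def utc_timestamp() -> str:
    """Hypothetical helper: current UTC time as an ISO 8601 string with offset."""
    return datetime.now(timezone.utc).isoformat()


stamp = utc_timestamp()
parsed = datetime.fromisoformat(stamp)  # round-trips, keeping the UTC offset
```

Strings produced this way carry an explicit `+00:00` offset, so they differ in format from the naive timestamps already stored in existing session files.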

Config

Pydantic configuration.

validate_session_id

validate_session_id(v: str) -> str

Validate session ID format and prevent path traversal.

Security: Rejects session IDs with path separators or traversal attempts. Valid format: alphanumeric, hyphens, underscores only.

Source code in src/orchestrator/session_manager.py
@validator("session_id")
def validate_session_id(cls, v: str) -> str:
    """Validate session ID format and prevent path traversal.

    Security: Rejects session IDs with path separators or traversal attempts.
    Valid format: alphanumeric, hyphens, underscores only.
    """
    if not v or len(v) < 8:
        raise ValueError("Session ID must be at least 8 characters")

    # BLOCKER #3: Prevent path traversal attacks
    if "/" in v or "\\" in v or ".." in v:
        raise ValueError(
            f"Invalid session_id '{v}': cannot contain path separators or '..' sequences"
        )

    # Enforce safe character set (alphanumeric, hyphens, underscores)
    if not re.match(r"^[a-zA-Z0-9_-]+$", v):
        raise ValueError(
            f"Invalid session_id '{v}': must contain only alphanumeric, hyphens, underscores"
        )

    return v
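
The same checks can be tried outside Pydantic with a standalone function. This is a sketch, not the module's code: `is_valid_session_id` is a hypothetical name, and it uses `re.fullmatch` instead of `re.match` with `$`, because `$` also matches just before a trailing newline.

```python
import re

_SAFE_ID = re.compile(r"[a-zA-Z0-9_-]+")


def is_valid_session_id(value: str) -> bool:
    """Hypothetical helper mirroring the validator's rules as a boolean check."""
    if not value or len(value) < 8:
        return False
    # Reject path separators and traversal sequences outright.
    if "/" in value or "\\" in value or ".." in value:
        return False
    # fullmatch requires the whole string to consist of safe characters.
    return _SAFE_ID.fullmatch(value) is not None
```

For example, `is_valid_session_id("feature-auth_01")` passes, while `"../../etc/passwd"`, `"short"`, and an ID with a trailing newline are rejected.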

should_prune

should_prune(max_decisions: int = 10, max_tokens: int = 150000) -> bool

Check if session state needs pruning.

Returns False if the session has already been pruned. Otherwise returns True if either:

- More than max_decisions decisions are stored
- tokens_used exceeds max_tokens

Parameters:

max_decisions: Maximum number of decisions to retain (default 10)
max_tokens: Maximum tokens before pruning (default 150K)

Returns:

True if pruning is recommended, False otherwise
Source code in src/orchestrator/session_manager.py
def should_prune(self, max_decisions: int = 10, max_tokens: int = 150_000) -> bool:
    """Check if session state needs pruning.

    Returns True if:
    - More than max_decisions stored
    - Tokens exceeded max_tokens threshold
    - Already pruned (returns False)

    Args:
    ----
        max_decisions: Maximum number of decisions to retain (default 10)
        max_tokens: Maximum tokens before pruning (default 150K)

    Returns:
    -------
        True if pruning recommended, False otherwise

    """
    if self.pruned:
        return False

    decisions_overflow = len(self.key_decisions) > max_decisions
    tokens_overflow = self.tokens_used > max_tokens
    return decisions_overflow or tokens_overflow
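
The decision logic above can be restated as a pure function, which makes the boundary cases easy to check. `needs_pruning` is a hypothetical name used only for this sketch. Both comparisons are strict, so a session sitting exactly at a limit is not pruned, and a session that has been pruned once is never flagged again.

```python
def needs_pruning(
    decision_count: int,
    tokens_used: int,
    already_pruned: bool,
    max_decisions: int = 10,
    max_tokens: int = 150_000,
) -> bool:
    """Hypothetical restatement of should_prune using plain values."""
    if already_pruned:
        return False
    return decision_count > max_decisions or tokens_used > max_tokens


at_limit = needs_pruning(10, 150_000, already_pruned=False)       # exactly at both limits
over_tokens = needs_pruning(3, 150_001, already_pruned=False)     # one token over
pruned_before = needs_pruning(50, 999_999, already_pruned=True)   # flag short-circuits
```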

compress

compress(max_decisions: int = 10, max_tokens: int = 150000) -> None

Aggressively compress state per Anthropic guidance.

Operations:

1. Keep only the most recent max_decisions decisions (older entries are dropped)
2. Clear all registered artifacts if tokens_used exceeds max_tokens
3. Reset metrics.decision_count and mark the session as pruned

Parameters:

max_decisions: Maximum decisions to retain (default 10)
max_tokens: Token threshold above which artifacts are cleared (default 150K)

Side Effects

Modifies self.key_decisions, self.latest_artifacts, self.metrics, self.pruned, and self.last_pruned_at

Source code in src/orchestrator/session_manager.py
def compress(self, max_decisions: int = 10, max_tokens: int = 150_000) -> None:
    """Aggressively compress state per Anthropic guidance.

    Operations:
    1. Keep only most recent decisions (LIFO order)
    2. Clear old artifacts if tokens exceeded
    3. Reset decision count on pruning

    Args:
    ----
        max_decisions: Maximum decisions to retain (default 10)
        max_tokens: Maximum tokens before resetting (default 150K)

    Side Effects:
        Modifies self.key_decisions, self.latest_artifacts, self.metrics

    """
    # Keep only most recent N decisions (LIFO - Last In First Out)
    if len(self.key_decisions) > max_decisions:
        self.key_decisions = self.key_decisions[-max_decisions:]
        logger.info(
            f"Session {self.session_id}: Pruned decisions to {max_decisions}",
        )

    # Clear old artifacts if tokens exceeded
    if self.tokens_used > max_tokens:
        self.latest_artifacts.clear()
        logger.warning(
            f"Session {self.session_id}: Tokens exceeded {max_tokens}, " "cleared artifacts",
        )

    # Reset metrics after pruning
    self.metrics.decision_count = 0
    self.pruned = True
    self.last_pruned_at = datetime.utcnow().isoformat()
    logger.info(
        f"Session {self.session_id}: Compressed state after pruning "
        f"(tokens: {self.tokens_used})",
    )
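
The "keep the most recent N" step relies on negative slicing, which has one edge case: `items[-0:]` is the same as `items[0:]` and returns the whole list, so a limit of 0 would keep everything rather than nothing. A small sketch of the slicing with that case guarded (`keep_most_recent` is a hypothetical helper, not part of the module):

```python
def keep_most_recent(items: list, limit: int) -> list:
    """Hypothetical helper: return the last `limit` items, or [] for limit <= 0."""
    if limit <= 0:
        return []
    return items[-limit:]


trimmed = keep_most_recent(list(range(15)), 10)   # drops the five oldest entries
untouched = keep_most_recent([1, 2, 3], 10)       # shorter than the limit: unchanged
unguarded_zero = [1, 2, 3][-0:]                   # the pitfall: whole list comes back
```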

add_decision

add_decision(decision: str, rationale: str, timestamp: str | None = None) -> None

Add a decision to the session history.

Automatically trims the history to the 10 most recent decisions.

Parameters:

decision: Decision statement (e.g., "Use async/await for I/O")
rationale: Why this decision was made
timestamp: ISO 8601 timestamp (auto-generated if not provided)
Source code in src/orchestrator/session_manager.py
def add_decision(
    self,
    decision: str,
    rationale: str,
    timestamp: str | None = None,
) -> None:
    """Add a decision to the session history.

    Automatically prunes to keep only most recent 10.

    Args:
    ----
        decision: Decision statement (e.g., "Use async/await for I/O")
        rationale: Why this decision was made
        timestamp: ISO 8601 timestamp (auto-generated if not provided)

    """
    if timestamp is None:
        timestamp = datetime.utcnow().isoformat()

    decision_record = {
        "decision": decision,
        "rationale": rationale,
        "timestamp": timestamp,
    }

    self.key_decisions.append(decision_record)
    self.metrics.decision_count += 1

    # Auto-prune to max 10
    if len(self.key_decisions) > 10:
        self.key_decisions = self.key_decisions[-10:]

add_artifact

add_artifact(artifact_name: str, artifact_path: str) -> None

Register an artifact (file path only).


Parameters:

artifact_name: Logical name of artifact (e.g., "feature-branch")
artifact_path: Absolute file path or ref
Source code in src/orchestrator/session_manager.py
def add_artifact(self, artifact_name: str, artifact_path: str) -> None:
    """Register an artifact (file path only).

    Args:
    ----
        artifact_name: Logical name of artifact (e.g., "feature-branch")
        artifact_path: Absolute file path or ref

    """
    self.latest_artifacts[artifact_name] = str(artifact_path)

update_tokens

update_tokens(tokens_added: int) -> None

Update token counter.


Parameters:

tokens_added: Number of tokens to add (negative values are treated as 0)
Source code in src/orchestrator/session_manager.py
def update_tokens(self, tokens_added: int) -> None:
    """Update token counter.

    Args:
    ----
        tokens_added: Number of tokens to add (should be positive)

    """
    self.tokens_used += max(0, tokens_added)
    self.metrics.total_tokens += max(0, tokens_added)

increment_turns

increment_turns(count: int = 1) -> None

Increment conversation turn count.


Parameters:

count: Number of turns to add (default 1; negative values are treated as 0)
Source code in src/orchestrator/session_manager.py
def increment_turns(self, count: int = 1) -> None:
    """Increment conversation turn count.

    Args:
    ----
        count: Number of turns to add (default 1)

    """
    self.turn_count += max(0, count)
    self.metrics.task_count += 1

to_dict

to_dict(include_tags: bool = True) -> dict[str, Any]

Convert to dictionary for serialization.


Parameters:

include_tags: Whether to include tags in output (default True)

Returns:

Dictionary representation of session state
Source code in src/orchestrator/session_manager.py
def to_dict(self, include_tags: bool = True) -> dict[str, Any]:
    """Convert to dictionary for serialization.

    Args:
    ----
        include_tags: Whether to include tags in output (default True)

    Returns:
    -------
        Dictionary representation of session state

    """
    data = self.dict(exclude_none=True)
    if not include_tags:
        data.pop("tags", None)
    # Convert tags set to list for JSON serialization
    if "tags" in data:
        data["tags"] = list(data["tags"])
    # Convert metrics dataclass to dict
    if "metrics" in data and isinstance(data["metrics"], SessionMetrics):
        data["metrics"] = asdict(data["metrics"])
    return data

from_dict classmethod

from_dict(data: dict[str, Any]) -> SessionState

Create SessionState from dictionary.


Parameters:

data: Dictionary with session state data

Returns:

SessionState instance

Raises:

ValueError: If required fields missing or invalid
Source code in src/orchestrator/session_manager.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> SessionState:
    """Create SessionState from dictionary.

    Args:
    ----
        data: Dictionary with session state data

    Returns:
    -------
        SessionState instance

    Raises:
    ------
        ValueError: If required fields missing or invalid

    """
    # Convert tags list back to set if present
    if "tags" in data and isinstance(data["tags"], list):
        data["tags"] = set(data["tags"])

    # Reconstruct metrics if present
    if "metrics" in data and isinstance(data["metrics"], dict):
        data["metrics"] = SessionMetrics(**data["metrics"])

    return cls(**data)
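
The tags conversion in to_dict and from_dict exists because JSON has no set type. The round trip can be sketched with plain dictionaries; `encode_state` and `decode_state` are hypothetical names. Unlike from_dict as shown above, which rewrites `data["tags"]` in the caller's dictionary, this sketch copies the input first.

```python
import json
from typing import Any


def encode_state(data: dict[str, Any]) -> str:
    """Hypothetical helper: serialize a state dict, converting the tags set to a list."""
    payload = dict(data)  # shallow copy so the caller's dict is untouched
    if "tags" in payload:
        payload["tags"] = sorted(payload["tags"])  # sets are not JSON-serializable
    return json.dumps(payload)


def decode_state(text: str) -> dict[str, Any]:
    """Hypothetical helper: parse a state dict, restoring tags to a set."""
    payload = json.loads(text)
    if isinstance(payload.get("tags"), list):
        payload["tags"] = set(payload["tags"])
    return payload


original = {"session_id": "demo-session-1", "tags": {"feature", "pr"}, "turn_count": 3}
restored = decode_state(encode_state(original))
```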

Usage Examples

Basic Session

import asyncio
from pathlib import Path
from src.orchestrator.session_manager import SessionManager


async def main() -> None:
    manager = SessionManager(project_root=Path.cwd())

    # Create session
    session_id = await manager.get_or_create_session(session_id=None)

    # Run task in session
    result = await manager.run_task(
        session_id=session_id,
        user_input="Implement user authentication"
    )

    print(f"Session: {session_id}")
    # result is the execution context dict returned by run_task
    print(f"Result: {result}")


# The manager's methods are coroutines, so run them inside an event loop
asyncio.run(main())

Session Continuation

# First task
manager = SessionManager(project_root=Path.cwd())
session_id = await manager.get_or_create_session()
result1 = await manager.run_task(session_id, "Implement registration")

# Continue in same session (88-95% overhead reduction)
result2 = await manager.run_task(session_id, "Add email verification")
result3 = await manager.run_task(session_id, "Add password reset")

Session Forking

# Main session
main_session = await manager.get_or_create_session()
await manager.run_task(main_session, "Implement feature")

# Fork for exploration
exploration = manager.fork_session(parent_session_id=main_session)
await manager.run_task(exploration, "Try alternative approach")

# Original session unchanged

Session State

# Get session state
state = manager.get_session_state(session_id)
print(f"Agent: {state.agent_name}")
print(f"Decisions: {len(state.key_decisions)}")
print(f"Artifacts: {state.latest_artifacts}")

# Update session state (the field name matches SessionState.current_plan)
manager.update_session_state(
    session_id=session_id,
    updates={"current_plan": "Updated plan"}
)

Best Practices

  1. Reuse Sessions - Continue related tasks in same session
  2. Fork for Exploration - Keep main session clean
  3. Clean Up - Call cleanup_session() when done
  4. Monitor Metrics - Track token usage per session

Next Steps

Orchestrator API
Cost Tracker API