The SessionManager implements lightweight session continuation following Anthropic's guidance for 88-95% overhead reduction through compressed session state.
Manages SDK client sessions with Anthropic-recommended patterns.
Responsibilities:
1. Create and retrieve sessions with lightweight state
2. Track session metadata and metrics
3. Support session forking for exploration
4. Clean up resources (files, state) on deletion
5. Serialize/deserialize session state
Architecture
SessionManager
├── Session storage (in-memory, backed by JSON)
├── State compression (aggressive pruning)
├── Fork support (creates child sessions)
└── Cleanup (removes old sessions)
Token Limits:
- Session pruned when: total_tokens > 150,000
- Decision limit: max 10 decisions before pruning
- Cost impact: 30-50% reduction via pruning
Initialize SessionManager.
project_root: Root directory of the project
session_dir: Custom session storage directory (default: .nova/sessions)
max_age_hours: Delete sessions older than this (default 24 hours)
max_concurrent_sessions: Maximum concurrent sessions before cleanup
auto_cleanup: Enable automatic background cleanup (default True)
RuntimeError: If session_dir creation fails
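The max_age_hours parameter implies an age-based sweep of the session directory. The manager's actual cleanup routine is not shown on this page, so the following is only a minimal sketch of how such a sweep could select candidates. It assumes one JSON file per session and uses file modification time as the age signal. The helper names find_stale_session_files and demo are hypothetical.

```python
import os
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path


def find_stale_session_files(
    session_dir: Path, max_age_hours: float, now: datetime
) -> list[Path]:
    """Return the *.json files in session_dir last modified before the cutoff."""
    cutoff = now - timedelta(hours=max_age_hours)
    stale = []
    for path in sorted(session_dir.glob("*.json")):
        modified = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
        if modified < cutoff:
            stale.append(path)
    return stale


def demo() -> list[str]:
    """Create one fresh and one 48-hour-old file, then list the stale ones."""
    with tempfile.TemporaryDirectory() as tmp:
        session_dir = Path(tmp)
        fresh = session_dir / "fresh-session.json"
        old = session_dir / "old-session.json"
        fresh.write_text("{}")
        old.write_text("{}")

        now = datetime.now(timezone.utc)
        two_days_ago = (now - timedelta(hours=48)).timestamp()
        os.utime(old, (two_days_ago, two_days_ago))  # backdate the old file

        return [p.name for p in find_stale_session_files(session_dir, 24, now)]
```

Passing now explicitly keeps the function deterministic under test. A real sweep might read last_updated from the stored state instead of relying on the filesystem timestamp.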
Source code in src/orchestrator/session_manager.py
This is the primary entry point for session management. It handles:
1. Retrieving existing session (with optional reload from disk)
2. Creating new session with UUID
3. Loading from persistent storage if available
session_id: Optional existing session ID to resume. If None, creates new.
model: Claude model for this session (default: claude-sonnet-4-5-20250929)
tags: Optional tags for session filtering (e.g., {"feature", "pr"})
agent_name: Name of agent using session (for backward compatibility)
Session ID (string UUID v4 or custom ID)
FileNotFoundError: If specified session_id doesn't exist
ValueError: If session_id format invalid
Source code in src/orchestrator/session_manager.py
async def get_or_create_session(
    self,
    session_id: str | None = None,
    model: str | None = None,
    tags: set[str] | None = None,
    agent_name: str | None = None,
) -> str:
    """Get existing session or create new one.

    This is the primary entry point for session management. It handles:
    1. Retrieving existing session (with optional reload from disk)
    2. Creating new session with UUID
    3. Loading from persistent storage if available

    Args:
    ----
        session_id: Optional existing session ID to resume. If None, creates new.
        model: Claude model for this session (default: claude-sonnet-4-5-20250929)
        tags: Optional tags for session filtering (e.g., {"feature", "pr"})
        agent_name: Name of agent using session (for backward compatibility)

    Returns:
    -------
        Session ID (string UUID v4 or custom ID)

    Raises:
    ------
        FileNotFoundError: If specified session_id doesn't exist
        ValueError: If session_id format invalid
    """
    # BLOCKER #3: Validate session_id BEFORE any processing
    # Must check "is not None" to catch empty strings
    if session_id is not None:
        self._validate_session_id_format(session_id)

    # Case 1: Resume existing session
    if session_id:
        if not await self._session_exists(session_id):
            with self._lock:  # BLOCKER #1: Thread-safe access
                available = list(self.active_sessions.keys())
            raise FileNotFoundError(
                f"Session '{session_id}' not found. "
                f"Available: {available}",
            )

        # Load from cache or disk
        with self._lock:  # BLOCKER #1: Thread-safe access
            exists_in_cache = session_id in self.active_sessions
        if not exists_in_cache:
            await self._load_session_from_disk(session_id)

        logger.info(f"Resumed session: {session_id}")
        return session_id

    # Case 2: Create new session
    new_session_id = str(uuid.uuid4())
    now = datetime.utcnow().isoformat()
    state = SessionState(
        session_id=new_session_id,
        created_at=now,
        last_updated=now,
        model=model or "claude-sonnet-4-5-20250929",
        agent_name=agent_name,
        tags=tags or set(),
    )

    # Store in cache
    with self._lock:  # BLOCKER #1: Thread-safe access
        self.active_sessions[new_session_id] = state
        self.session_configs[new_session_id] = {
            "created_at": now,
            "tags": list(tags or set()),
        }

    # Persist to disk
    await self._save_session_to_disk(new_session_id, state)
    logger.info(f"Created new session: {new_session_id}")
    return new_session_id
This prepares the session state for execution by the SDK client.
It handles state updates and context preparation.
session_id: Session ID to execute in
user_input: User's task description
track_decision: Whether to track this as a key decision
decision_context: Optional context for decision tracking
{"decision": "...", "rationale": "..."}
Dictionary with execution context:
{
"session_id": str,
"resume_from_session": str (for SDK resume parameter),
"context": str (session context for system prompt),
"state": SessionState,
"model": str (model configured for the session)
}
ValueError: If session_id invalid or session not found
Source code in src/orchestrator/session_manager.py
async def run_task(
    self,
    session_id: str,
    user_input: str,
    track_decision: bool = False,
    decision_context: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Execute task within session context.

    This prepares the session state for execution by the SDK client.
    It handles state updates and context preparation.

    Args:
    ----
        session_id: Session ID to execute in
        user_input: User's task description
        track_decision: Whether to track this as a key decision
        decision_context: Optional context for decision tracking
            {"decision": "...", "rationale": "..."}

    Returns:
    -------
        Dictionary with execution context:
        {
            "session_id": str,
            "resume_from_session": str (for SDK resume parameter),
            "context": str (session context for system prompt),
            "state": SessionState,
            "model": str (model configured for the session)
        }

    Raises:
    ------
        ValueError: If session_id invalid or session not found
    """
    with self._lock:  # BLOCKER #1: Thread-safe access
        if session_id not in self.active_sessions:
            raise ValueError(f"Session '{session_id}' not found")
        state = self.active_sessions[session_id]

    # Track decision if requested
    if track_decision and decision_context:
        state.add_decision(
            decision=decision_context.get("decision", ""),
            rationale=decision_context.get("rationale", ""),
        )

    # Increment turn counter
    state.increment_turns(1)

    # Check if pruning needed
    if state.should_prune():
        state.compress()
        await self._save_session_to_disk(session_id, state)
        logger.info(f"Session {session_id}: Compressed state after pruning")

    # Build context for SDK (minimal information)
    context_lines = [
        f"Session ID: {session_id}",
        f"Model: {state.model}",
        f"Turns: {state.turn_count}",
    ]
    if state.current_plan:
        context_lines.append(f"Plan: {state.current_plan}")
    if state.key_decisions:
        recent_decision = state.key_decisions[-1]
        context_lines.append(
            f"Recent decision: {recent_decision.get('decision', '')}",
        )
    context = "\n".join(context_lines)

    # BLOCKER FIX #30: Log only metadata (never full state/messages/content)
    # Security: Session state may contain API keys, user data, conversation history
    logger.debug(
        f"Session {session_id}: Executing task "
        f"(turn {state.turn_count}, decisions: {len(state.key_decisions)})"
    )

    return {
        "session_id": session_id,
        "resume_from_session": session_id,  # SDK's resume parameter
        "context": context,
        "state": state,
        "model": state.model,
    }
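run_task only prepares an execution context. How the caller hands that context to the SDK is not shown on this page. As a hedged illustration, the sketch below appends the "context" string to a base system prompt. The function build_system_prompt and the "Session context:" label are hypothetical and not part of the documented API.

```python
from typing import Any


def build_system_prompt(base_prompt: str, execution_context: dict[str, Any]) -> str:
    """Append the session context block to a base system prompt, if present."""
    context = execution_context.get("context", "")
    if not context:
        return base_prompt
    return f"{base_prompt}\n\nSession context:\n{context}"


# A hand-written dictionary shaped like the documented return value
# (the "state" entry is omitted because this sketch does not need it).
example_context = {
    "session_id": "feature-auth-01",
    "resume_from_session": "feature-auth-01",
    "context": "Session ID: feature-auth-01\nModel: claude-sonnet-4-5-20250929\nTurns: 3",
    "model": "claude-sonnet-4-5-20250929",
}

prompt = build_system_prompt("You are a coding assistant.", example_context)
```

Because the context is limited to the session ID, model, turn count, plan summary, and the most recent decision, the block added to the prompt stays a few lines long regardless of session age.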
def fork_session(self, parent_session_id: str) -> str:
    """Create a new session forked from existing session.

    Forking is useful for:
    - Exploration without affecting original session
    - Trying alternative approaches
    - A/B testing different strategies

    The forked session:
    - Inherits parent's plan, decisions, artifacts
    - Has its own conversation history
    - Is independent after forking

    Args:
    ----
        parent_session_id: Session to fork from

    Returns:
    -------
        New session ID (child session)

    Raises:
    ------
        ValueError: If parent_session_id not found
    """
    with self._lock:  # BLOCKER #1: Thread-safe access
        if parent_session_id not in self.active_sessions:
            raise ValueError(f"Parent session '{parent_session_id}' not found")
        parent_state = self.active_sessions[parent_session_id]

    # Create child session with inherited state
    child_session_id = str(uuid.uuid4())
    now = datetime.utcnow().isoformat()
    child_state = SessionState(
        session_id=child_session_id,
        created_at=now,
        last_updated=now,
        model=parent_state.model,
        agent_name=parent_state.agent_name,
        current_plan=parent_state.current_plan,
        key_decisions=parent_state.key_decisions.copy(),
        latest_artifacts=parent_state.latest_artifacts.copy(),
        tokens_used=parent_state.tokens_used,
        tags=parent_state.tags.copy(),
        fork_parent_id=parent_session_id,
    )

    # Store child session
    with self._lock:  # BLOCKER #1: Thread-safe access
        self.active_sessions[child_session_id] = child_state
        self.session_configs[child_session_id] = {
            "created_at": now,
            "parent_session_id": parent_session_id,
            "tags": list(child_state.tags),
        }

    logger.info(
        f"Forked session: {child_session_id} from parent {parent_session_id}",
    )
    return child_session_id
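fork_session copies key_decisions with list.copy(), which is a shallow copy: the child gets its own list, but the decision dictionaries inside it are the same objects as the parent's. The standalone sketch below uses plain lists and dicts rather than SessionState to show the difference from copy.deepcopy. Whether this sharing matters in practice depends on whether decision records are ever mutated after creation, which this page does not say.

```python
import copy

parent_decisions = [
    {"decision": "Use async/await for I/O", "rationale": "Non-blocking"},
]

shallow_child = parent_decisions.copy()      # same approach as fork_session
deep_child = copy.deepcopy(parent_decisions)  # fully independent alternative

# Appending to the child list leaves the parent list untouched.
shallow_child.append({"decision": "Try sync fallback", "rationale": "Experiment"})

# Mutating an inherited record is visible through the parent as well,
# because both lists reference the same dict object.
shallow_child[0]["rationale"] = "Edited in fork"

parent_length = len(parent_decisions)
parent_rationale = parent_decisions[0]["rationale"]
deep_rationale = deep_child[0]["rationale"]
```

If forks are expected to edit inherited records, a deep copy at fork time would keep the "independent after forking" guarantee without relying on records being treated as immutable.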
async def cleanup_session(self, session_id: str) -> None:
    """Clean up session resources.

    Removes:
    - In-memory session state
    - Persisted session files
    - Session metadata

    Args:
    ----
        session_id: Session to delete

    Raises:
    ------
        ValueError: If session not found
    """
    with self._lock:  # BLOCKER #1: Thread-safe access
        if session_id not in self.active_sessions:
            raise ValueError(f"Session '{session_id}' not found")

        # Remove from memory
        self.active_sessions.pop(session_id, None)
        self.session_configs.pop(session_id, None)

    # Remove from disk
    session_file = self.session_dir / f"{session_id}.json"
    if session_file.exists():
        try:
            session_file.unlink()
            logger.info(f"Cleaned up session: {session_id}")
        except Exception as e:
            logger.error(f"Failed to delete session file: {e}")
def get_session_state(self, session_id: str) -> SessionState:
    """Get current session state (read-only access).

    Args:
    ----
        session_id: Session to retrieve

    Returns:
    -------
        SessionState object

    Raises:
    ------
        ValueError: If session not found
    """
    with self._lock:  # BLOCKER #1: Thread-safe access
        if session_id not in self.active_sessions:
            raise ValueError(f"Session '{session_id}' not found")
        return self.active_sessions[session_id]
Compressed session state following Anthropic's guidance.
This model stores ONLY essential information for session continuation:
- Session metadata (ID, creation time, model)
- Current plan summary (not full plan)
- Key decisions (most recent only, max 10)
- Latest artifacts (file paths only, not content)
Philosophy: Store pointers, not content. Content lives in files.
session_id: Unique session identifier (UUID v4 or custom)
created_at: Session creation timestamp (ISO 8601)
last_updated: Last modification timestamp (ISO 8601)
model: Claude model being used
agent_name: Name of agent using this session
current_plan: High-level plan summary (1-2 sentences max)
key_decisions: List of recent decisions with justification
latest_artifacts: File paths to important artifacts
tokens_used: Approximate token count for this session
turn_count: Number of conversation turns
tags: Session metadata tags for filtering
metrics: Session metrics (for backward compatibility)
fork_parent_id: Parent session ID if forked
fork_count: Number of forks from this session
pruned: Whether session has been pruned
last_pruned_at: Timestamp of last pruning
@validator("session_id")
def validate_session_id(cls, v: str) -> str:
    """Validate session ID format and prevent path traversal.

    Security: Rejects session IDs with path separators or traversal attempts.
    Valid format: alphanumeric, hyphens, underscores only.
    """
    if not v or len(v) < 8:
        raise ValueError("Session ID must be at least 8 characters")

    # BLOCKER #3: Prevent path traversal attacks
    if "/" in v or "\\" in v or ".." in v:
        raise ValueError(
            f"Invalid session_id '{v}': cannot contain path separators or '..' sequences"
        )

    # Enforce safe character set (alphanumeric, hyphens, underscores)
    if not re.match(r"^[a-zA-Z0-9_-]+$", v):
        raise ValueError(
            f"Invalid session_id '{v}': must contain only alphanumeric, hyphens, underscores"
        )
    return v
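The validation rules can be tried in isolation. The sketch below is a standalone approximation, not the validator itself. The name is_valid_session_id is hypothetical, and it uses re.fullmatch instead of re.match with "$". In Python, "$" also matches just before a trailing newline, so the anchored pattern accepts an ID such as "abcdefgh\n", whereas fullmatch requires every character to be in the safe set.

```python
import re

# Same character class as the validator: letters, digits, underscore, hyphen.
_SAFE_ID = re.compile(r"[a-zA-Z0-9_-]+")


def is_valid_session_id(value: str) -> bool:
    """Return True if value is at least 8 characters from the safe set."""
    if not value or len(value) < 8:
        return False
    # The safe set excludes "/", "\\" and ".", so traversal sequences
    # are rejected here without a separate check.
    return _SAFE_ID.fullmatch(value) is not None


checks = {
    "feature-auth_01": is_valid_session_id("feature-auth_01"),
    "../etc/passwd": is_valid_session_id("../etc/passwd"),
    "short": is_valid_session_id("short"),
    "abcdefgh\n": is_valid_session_id("abcdefgh\n"),
}

# For comparison: the "^...$" pattern with re.match accepts the trailing newline.
anchored_accepts_newline = re.match(r"^[a-zA-Z0-9_-]+$", "abcdefgh\n") is not None
```

The explicit path-separator check in the validator remains useful for producing a more specific error message, even though the character-set check alone would reject those inputs.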
def should_prune(self, max_decisions: int = 10, max_tokens: int = 150_000) -> bool:
    """Check if session state needs pruning.

    Returns True if:
    - More than max_decisions stored
    - Tokens exceeded max_tokens threshold
    - Already pruned (returns False)

    Args:
    ----
        max_decisions: Maximum number of decisions to retain (default 10)
        max_tokens: Maximum tokens before pruning (default 150K)

    Returns:
    -------
        True if pruning recommended, False otherwise
    """
    if self.pruned:
        return False

    decisions_overflow = len(self.key_decisions) > max_decisions
    tokens_overflow = self.tokens_used > max_tokens
    return decisions_overflow or tokens_overflow
def compress(self, max_decisions: int = 10, max_tokens: int = 150_000) -> None:
    """Aggressively compress state per Anthropic guidance.

    Operations:
    1. Keep only most recent decisions (LIFO order)
    2. Clear old artifacts if tokens exceeded
    3. Reset decision count on pruning

    Args:
    ----
        max_decisions: Maximum decisions to retain (default 10)
        max_tokens: Maximum tokens before resetting (default 150K)

    Side Effects:
        Modifies self.key_decisions, self.latest_artifacts, self.metrics
    """
    # Keep only most recent N decisions (LIFO - Last In First Out)
    if len(self.key_decisions) > max_decisions:
        self.key_decisions = self.key_decisions[-max_decisions:]
        logger.info(
            f"Session {self.session_id}: Pruned decisions to {max_decisions}",
        )

    # Clear old artifacts if tokens exceeded
    if self.tokens_used > max_tokens:
        self.latest_artifacts.clear()
        logger.warning(
            f"Session {self.session_id}: Tokens exceeded {max_tokens}, "
            "cleared artifacts",
        )

    # Reset metrics after pruning
    self.metrics.decision_count = 0
    self.pruned = True
    self.last_pruned_at = datetime.utcnow().isoformat()
    logger.info(
        f"Session {self.session_id}: Compressed state after pruning "
        f"(tokens: {self.tokens_used})",
    )
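The interaction between should_prune and compress is easier to see on a reduced model. The sketch below defines a hypothetical MiniState dataclass that copies only the fields and logic relevant to pruning. It leaves out logging, metrics, and timestamps, and it assumes latest_artifacts is a list of paths. As in the listings above, compress sets pruned but does not lower tokens_used, so should_prune returns False on every later call.

```python
from dataclasses import dataclass, field


@dataclass
class MiniState:
    """Reduced stand-in for SessionState, limited to pruning-related fields."""

    key_decisions: list[dict] = field(default_factory=list)
    latest_artifacts: list[str] = field(default_factory=list)
    tokens_used: int = 0
    pruned: bool = False

    def should_prune(self, max_decisions: int = 10, max_tokens: int = 150_000) -> bool:
        if self.pruned:
            return False
        return len(self.key_decisions) > max_decisions or self.tokens_used > max_tokens

    def compress(self, max_decisions: int = 10, max_tokens: int = 150_000) -> None:
        # Keep only the newest max_decisions records.
        if len(self.key_decisions) > max_decisions:
            self.key_decisions = self.key_decisions[-max_decisions:]
        # Drop artifact pointers once the token budget is exceeded.
        if self.tokens_used > max_tokens:
            self.latest_artifacts.clear()
        self.pruned = True


state = MiniState(
    key_decisions=[{"decision": f"d{i}"} for i in range(12)],
    latest_artifacts=["src/auth.py"],
    tokens_used=160_000,
)

first_check = state.should_prune()   # over both limits and not yet pruned
state.compress()
second_check = state.should_prune()  # pruned flag now short-circuits the check
```

After compress, the ten retained records are d2 through d11 and the artifact list is empty. In this model a session is pruned at most once unless something outside these two methods resets the pruned flag.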
decision: Decision statement (e.g., "Use async/await for I/O")
rationale: Why this decision was made
timestamp: ISO 8601 timestamp (auto-generated if not provided)
Source code in src/orchestrator/session_manager.py
def add_decision(
    self,
    decision: str,
    rationale: str,
    timestamp: str | None = None,
) -> None:
    """Add a decision to the session history.

    Automatically prunes to keep only most recent 10.

    Args:
    ----
        decision: Decision statement (e.g., "Use async/await for I/O")
        rationale: Why this decision was made
        timestamp: ISO 8601 timestamp (auto-generated if not provided)
    """
    if timestamp is None:
        timestamp = datetime.utcnow().isoformat()

    decision_record = {
        "decision": decision,
        "rationale": rationale,
        "timestamp": timestamp,
    }
    self.key_decisions.append(decision_record)
    self.metrics.decision_count += 1

    # Auto-prune to max 10
    if len(self.key_decisions) > 10:
        self.key_decisions = self.key_decisions[-10:]
def update_tokens(self, tokens_added: int) -> None:
    """Update token counter.

    Args:
    ----
        tokens_added: Number of tokens to add (should be positive)
    """
    self.tokens_used += max(0, tokens_added)
    self.metrics.total_tokens += max(0, tokens_added)
def increment_turns(self, count: int = 1) -> None:
    """Increment conversation turn count.

    Args:
    ----
        count: Number of turns to add (default 1)
    """
    self.turn_count += max(0, count)
    self.metrics.task_count += 1
def to_dict(self, include_tags: bool = True) -> dict[str, Any]:
    """Convert to dictionary for serialization.

    Args:
    ----
        include_tags: Whether to include tags in output (default True)

    Returns:
    -------
        Dictionary representation of session state
    """
    data = self.dict(exclude_none=True)
    if not include_tags:
        data.pop("tags", None)

    # Convert tags set to list for JSON serialization
    if "tags" in data:
        data["tags"] = list(data["tags"])

    # Convert metrics dataclass to dict
    if "metrics" in data and isinstance(data["metrics"], SessionMetrics):
        data["metrics"] = asdict(data["metrics"])
    return data
@classmethod
def from_dict(cls, data: dict[str, Any]) -> SessionState:
    """Create SessionState from dictionary.

    Args:
    ----
        data: Dictionary with session state data

    Returns:
    -------
        SessionState instance

    Raises:
    ------
        ValueError: If required fields missing or invalid
    """
    # Convert tags list back to set if present
    if "tags" in data and isinstance(data["tags"], list):
        data["tags"] = set(data["tags"])

    # Reconstruct metrics if present
    if "metrics" in data and isinstance(data["metrics"], dict):
        data["metrics"] = SessionMetrics(**data["metrics"])
    return cls(**data)
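The set-to-list conversion in to_dict and its inverse in from_dict exist because json cannot encode a Python set. The sketch below shows the same round trip on a plain dictionary, without SessionState or SessionMetrics. The helpers state_to_json and state_from_json are hypothetical, and sorting the tags is an addition here to make the serialized output deterministic.

```python
import json
from typing import Any


def state_to_json(data: dict[str, Any]) -> str:
    """Serialize a state dictionary, converting the tags set to a sorted list."""
    payload = dict(data)  # shallow copy so the caller's dict is not modified
    if "tags" in payload:
        payload["tags"] = sorted(payload["tags"])
    return json.dumps(payload)


def state_from_json(text: str) -> dict[str, Any]:
    """Deserialize a state dictionary, restoring tags to a set."""
    data = json.loads(text)
    if isinstance(data.get("tags"), list):
        data["tags"] = set(data["tags"])
    return data


original = {
    "session_id": "feature-auth-01",
    "tags": {"feature", "pr"},
    "turn_count": 3,
}

serialized = state_to_json(original)
restored = state_from_json(serialized)
```

Note that from_dict as listed converts data["tags"] in place, so the dictionary passed in is modified. Callers that reuse the input may want to pass a copy.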
import asyncio
from pathlib import Path

from src.orchestrator.session_manager import SessionManager


async def main() -> None:
    manager = SessionManager(project_root=Path.cwd())

    # Create session
    session_id = await manager.get_or_create_session(session_id=None)

    # Run task in session
    result = await manager.run_task(
        session_id=session_id,
        user_input="Implement user authentication",
    )
    print(f"Session: {session_id}")
    print(f"Result: {result}")


# "await" is only valid inside a coroutine, so drive the example with asyncio.run
asyncio.run(main())
# First task
manager = SessionManager(project_root=Path.cwd())
session_id = await manager.get_or_create_session()
result1 = await manager.run_task(session_id, "Implement registration")

# Continue in same session (88-95% overhead reduction)
result2 = await manager.run_task(session_id, "Add email verification")
result3 = await manager.run_task(session_id, "Add password reset")
# Main session
main_session = await manager.get_or_create_session()
await manager.run_task(main_session, "Implement feature")

# Fork for exploration
exploration = manager.fork_session(parent_session_id=main_session)
await manager.run_task(exploration, "Try alternative approach")

# Original session unchanged