#!/usr/bin/env python3
"""
Session Catchup Script for planning-with-files

Session-agnostic scanning: finds the most recent planning file update across
ALL sessions, then collects all conversation from that point forward through
all subsequent sessions until now.

Usage: python3 session-catchup.py [project-path]
"""

import json
import sys
import os
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple

PLANNING_FILES = ['task_plan.md', 'progress.md', 'findings.md']

# NOTE(review): in the reviewed copy of this file the prefix tuple and the
# length check that follows it were garbled (the text between a '<' and the
# next '>' had been stripped). The values below are a reconstruction --
# confirm them against the upstream script before relying on them.
SYSTEM_MESSAGE_PREFIXES = ('<local-command', '<command-', '<task-notification')

# User messages of this length or shorter are treated as noise and skipped.
MIN_USER_MESSAGE_LENGTH = 20

# Tool names whose use on a planning file counts as a "planning update".
_UPDATE_TOOLS = ('Write', 'Edit')


def get_project_dir(project_path: str) -> Path:
    """Convert project path to Claude's storage path format.

    Slashes and underscores become dashes and the result always starts with
    a dash, e.g. ``/home/me/my_proj`` -> ``~/.claude/projects/-home-me-my-proj``.
    """
    sanitized = project_path.replace('/', '-')
    if not sanitized.startswith('-'):
        sanitized = '-' + sanitized
    sanitized = sanitized.replace('_', '-')
    return Path.home() / '.claude' / 'projects' / sanitized


def _safe_mtime(path: Path) -> float:
    """Return the modification time of *path*, or 0.0 if it cannot be stat'ed."""
    try:
        return path.stat().st_mtime
    except OSError:
        # The file vanished between glob() and stat(); sort it last.
        return 0.0


def get_sessions_sorted(project_dir: Path) -> List[Path]:
    """Get all session files sorted by modification time (newest first).

    Files whose name starts with ``agent-`` are excluded.
    """
    main_sessions = [
        s for s in project_dir.glob('*.jsonl')
        if not s.name.startswith('agent-')
    ]
    return sorted(main_sessions, key=_safe_mtime, reverse=True)


def _iter_lines(session_file: Path) -> Iterator[Tuple[int, str]]:
    """Yield ``(line_number, line)`` pairs from a session file.

    The file is read as UTF-8 with undecodable bytes replaced, so one bad
    byte cannot abort the scan. An unreadable or missing file yields nothing.
    """
    try:
        with open(session_file, 'r', encoding='utf-8', errors='replace') as f:
            yield from enumerate(f)
    except OSError:
        return


def _parse_record(line: str) -> Optional[Dict[str, Any]]:
    """Parse one JSONL line; return the object, or None if it is not a JSON object."""
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        return None
    return data if isinstance(data, dict) else None


def _message_content(record: Dict[str, Any]) -> Any:
    """Return ``record['message']['content']``, or None when the shape differs."""
    message = record.get('message')
    if not isinstance(message, dict):
        return None
    return message.get('content')


def _dict_items(content: Any) -> List[Dict[str, Any]]:
    """Return the dict entries of a content list (empty if *content* is not a list)."""
    if not isinstance(content, list):
        return []
    return [item for item in content if isinstance(item, dict)]


def _tool_input(item: Dict[str, Any]) -> Dict[str, Any]:
    """Return the ``input`` mapping of a tool_use item, or an empty dict."""
    tool_input = item.get('input')
    return tool_input if isinstance(tool_input, dict) else {}


def _match_planning_file(file_path: Any) -> Optional[str]:
    """Return the planning file name that *file_path* ends with, if any."""
    if not isinstance(file_path, str):
        return None
    return next((pf for pf in PLANNING_FILES if file_path.endswith(pf)), None)


def get_session_first_timestamp(session_file: Path) -> Optional[str]:
    """Get the timestamp of the first message in a session.

    Returns the first truthy ``timestamp`` value found, or None when the file
    is unreadable or no record carries a timestamp.
    """
    for _, line in _iter_lines(session_file):
        record = _parse_record(line)
        if record is None:
            continue
        ts = record.get('timestamp')
        if ts:
            return ts
    return None


def scan_for_planning_update(session_file: Path) -> Tuple[int, Optional[str]]:
    """
    Quickly scan a session file for planning file updates.

    Returns (line_number, filename) of last update, or (-1, None) if none found.
    Line numbers are 0-based. Malformed records are skipped individually so a
    single bad line cannot hide later updates.
    """
    last_update_line = -1
    last_update_file = None

    for line_num, line in _iter_lines(session_file):
        # Cheap substring pre-filter: avoid JSON-parsing lines that cannot
        # contain a Write/Edit tool call.
        if '"Write"' not in line and '"Edit"' not in line:
            continue
        record = _parse_record(line)
        if record is None or record.get('type') != 'assistant':
            continue
        for item in _dict_items(_message_content(record)):
            if item.get('type') != 'tool_use':
                continue
            if item.get('name', '') not in _UPDATE_TOOLS:
                continue
            matched = _match_planning_file(_tool_input(item).get('file_path', ''))
            if matched is not None:
                last_update_line = line_num
                last_update_file = matched

    return last_update_line, last_update_file


def _user_text(content: Any) -> Optional[str]:
    """Return the user-visible text of a user message, or None if it should be skipped.

    For list content the first ``text`` item is used. System/command messages
    and messages of at most MIN_USER_MESSAGE_LENGTH characters are skipped.
    """
    if isinstance(content, list):
        content = next(
            (
                item.get('text', '')
                for item in content
                if isinstance(item, dict) and item.get('type') == 'text'
            ),
            '',
        )
    if not isinstance(content, str) or not content:
        return None
    # Skip system/command messages
    if content.startswith(SYSTEM_MESSAGE_PREFIXES):
        return None
    if len(content) <= MIN_USER_MESSAGE_LENGTH:
        return None
    return content


def _summarize_tool_use(item: Dict[str, Any]) -> str:
    """Return a short one-line description of a tool_use item."""
    tool_name = item.get('name', '')
    tool_input = _tool_input(item)
    if tool_name in ('Edit', 'Write'):
        return f"{tool_name}: {tool_input.get('file_path', 'unknown')}"
    if tool_name == 'Bash':
        cmd = str(tool_input.get('command') or '')[:80]
        return f"Bash: {cmd}"
    # Every other tool (including AskUserQuestion) is reported by name only.
    return f"{tool_name}"


def _assistant_summary(content: Any) -> Tuple[str, List[str]]:
    """Return ``(text, tool_summaries)`` for an assistant message's content.

    When several text items are present the last one wins.
    """
    if isinstance(content, str):
        return content, []

    text_content = ''
    tool_uses = []
    for item in _dict_items(content):
        item_type = item.get('type')
        if item_type == 'text':
            text = item.get('text', '')
            text_content = text if isinstance(text, str) else ''
        elif item_type == 'tool_use':
            tool_uses.append(_summarize_tool_use(item))
    return text_content, tool_uses


def extract_messages_from_session(session_file: Path, after_line: int = -1) -> List[Dict]:
    """
    Extract conversation messages from a session file.

    If after_line >= 0, only extract messages after that line.
    If after_line < 0, extract all messages.

    Each returned dict has ``role``, ``content``, ``line`` and ``session``
    (first 8 characters of the file stem); assistant entries also carry
    ``tools``. Assistant text is truncated to 600 characters. Malformed
    records are skipped individually.
    """
    session_id = Path(session_file).stem[:8]
    result = []

    for line_num, line in _iter_lines(session_file):
        if after_line >= 0 and line_num <= after_line:
            continue
        record = _parse_record(line)
        if record is None:
            continue

        msg_type = record.get('type')
        if msg_type == 'user' and not record.get('isMeta', False):
            text = _user_text(_message_content(record))
            if text is not None:
                result.append({
                    'role': 'user',
                    'content': text,
                    'line': line_num,
                    'session': session_id,
                })
        elif msg_type == 'assistant':
            text_content, tool_uses = _assistant_summary(_message_content(record))
            if text_content or tool_uses:
                result.append({
                    'role': 'assistant',
                    'content': text_content[:600] if text_content else '',
                    'tools': tool_uses,
                    'line': line_num,
                    'session': session_id,
                })

    return result


def _find_latest_update(previous_sessions: List[Path]) -> Tuple[int, int, Optional[str]]:
    """Locate the newest session that touched a planning file.

    *previous_sessions* must be ordered newest first. Returns
    ``(session_index, line_number, filename)``, or ``(-1, -1, None)`` when no
    session contains a planning file update.
    """
    for idx, session in enumerate(previous_sessions):
        line, filename = scan_for_planning_update(session)
        if line >= 0:
            return idx, line, filename
    return -1, -1, None


def _print_report(all_messages: List[Dict], update_file: Optional[str],
                  update_session: Path, sessions_covered: int) -> None:
    """Print the catchup report for the collected unsynced messages."""
    print("\n[planning-with-files] SESSION CATCHUP DETECTED")
    print(f"Last planning update: {update_file} in session {update_session.stem[:8]}...")

    if sessions_covered > 1:
        print(f"Scanning {sessions_covered} sessions for unsynced context")

    print(f"Unsynced messages: {len(all_messages)}")

    print("\n--- UNSYNCED CONTEXT ---")

    # Show up to 100 messages
    MAX_MESSAGES = 100
    if len(all_messages) > MAX_MESSAGES:
        print(f"(Showing last {MAX_MESSAGES} of {len(all_messages)} messages)\n")
        messages_to_show = all_messages[-MAX_MESSAGES:]
    else:
        messages_to_show = all_messages

    current_session = None
    for msg in messages_to_show:
        # Show session marker when it changes
        if msg.get('session') != current_session:
            current_session = msg.get('session')
            print(f"\n[Session: {current_session}...]")

        if msg['role'] == 'user':
            print(f"USER: {msg['content'][:300]}")
        else:
            if msg.get('content'):
                print(f"CLAUDE: {msg['content'][:300]}")
            if msg.get('tools'):
                print(f" Tools: {', '.join(msg['tools'][:4])}")

    print("\n--- RECOMMENDED ---")
    print("1. Run: git diff --stat")
    print("2. Read: task_plan.md, progress.md, findings.md")
    print("3. Update planning files based on above context")
    print("4. Continue with task")


def main():
    """Print a catchup report of conversation that happened after the last planning update.

    The project path is taken from ``sys.argv[1]`` (default: the current
    working directory). Nothing is printed when the project has no stored
    sessions, fewer than two sessions, no planning update in any previous
    session, or no messages after the update.
    """
    project_path = sys.argv[1] if len(sys.argv) > 1 else os.getcwd()
    project_dir = get_project_dir(project_path)

    if not project_dir.exists():
        return

    sessions = get_sessions_sorted(project_dir)
    if len(sessions) < 2:
        return

    # Skip the current session (most recently modified = index 0)
    previous_sessions = sessions[1:]

    # Find the most recent planning file update across ALL previous sessions.
    # Sessions are sorted newest first, so the first hit is the latest one.
    update_session_idx, update_line, update_file = _find_latest_update(previous_sessions)
    if update_session_idx < 0:
        # No planning file updates found in any previous session
        return
    update_session = previous_sessions[update_session_idx]

    # Collect ALL messages from the update point forward, across all sessions.
    # 1. Messages from the session with the update (after the update line).
    all_messages = extract_messages_from_session(update_session, after_line=update_line)

    # 2. ALL messages from sessions newer than update_session (but older than
    #    the current one), processed oldest to newest for chronological order.
    for session in reversed(previous_sessions[:update_session_idx]):
        all_messages.extend(extract_messages_from_session(session, after_line=-1))

    if not all_messages:
        return

    _print_report(all_messages, update_file, update_session, update_session_idx + 1)


if __name__ == '__main__':
    main()