Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.
88
86%
Does it follow best practices?
Impact
97%
2.93x — Average score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Collect coding agent logs for a project.
Copies logs from claude-code, codex, gemini, cursor-ide, and cursor-agent
to .tessl/logs/<project>/raw/<agent-name>/.
Only adds new files or updates changed files.
Security: This script only copies files from the user's own local agent log
directories. Collected logs are passed through secret redaction in
normalize_logs.py before any further processing. Logs may contain untrusted
content (tool outputs, web page text) — downstream stages treat all log
content as untrusted data, not instructions.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import shutil
import sqlite3
import sys
from pathlib import Path
from urllib.parse import unquote, urlparse
def get_file_hash(path: Path) -> str:
    """Return the MD5 hex digest of a file's contents.

    Reads in fixed-size chunks so large log files are never loaded into
    memory all at once. MD5 is used purely for change detection between
    source and destination copies, not for anything security-sensitive.
    """
    digest = hashlib.md5()
    with open(path, "rb") as f:
        # 1 MiB chunks: big enough to amortize syscall overhead.
        while chunk := f.read(1 << 20):
            digest.update(chunk)
    return digest.hexdigest()
def normalize_path_for_claude(cwd: str) -> str:
    """Convert a working-directory path to Claude Code's project-dir name.

    Claude Code derives the project directory name by mapping both ``/``
    and ``_`` to ``-``; a single translation table does both at once.
    """
    return cwd.translate(str.maketrans("/_", "--"))
def get_gemini_project_hash(cwd: str) -> str:
    """Return the SHA-256 hex digest Gemini uses as the project directory name."""
    digest = hashlib.sha256()
    digest.update(cwd.encode())
    return digest.hexdigest()
def _copy_if_changed(src: Path, dest: Path, stats: dict, dry_run: bool) -> None:
    """Copy *src* to *dest* when new or changed, updating *stats* counters.

    Always increments "found", then exactly one of "skipped" (identical
    content already at dest), "updated" (dest existed but differed), or
    "copied" (dest did not exist). Nothing is written when *dry_run*.
    """
    stats["found"] += 1
    already_there = dest.exists()
    if already_there and get_file_hash(src) == get_file_hash(dest):
        stats["skipped"] += 1
        return
    if not dry_run:
        shutil.copy2(src, dest)
    stats["updated" if already_there else "copied"] += 1
def collect_claude_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect Claude Code session logs for *cwd* into dest_dir/claude-code.

    Returns a stats dict with found/copied/updated/skipped counters.
    """
    stats = {"agent": "claude-code", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    source = Path.home() / ".claude" / "projects" / normalize_path_for_claude(cwd)
    if not source.exists():
        return stats
    target = dest_dir / "claude-code"
    if not dry_run:
        target.mkdir(parents=True, exist_ok=True)
    for session_log in source.glob("*.jsonl"):
        _copy_if_changed(session_log, target / session_log.name, stats, dry_run)
    return stats
def extract_cwd_from_codex_log(log_path: Path) -> str | None:
"""Extract cwd from a codex log file.
Supports two formats:
- New (codex_cli_rs): ``session_meta`` event with ``payload.cwd``
- Legacy: ``<cwd>…</cwd>`` tag inside a user message ``input_text``
"""
import re
try:
with open(log_path, "r") as f:
for line in f:
try:
data = json.loads(line)
# New format: session_meta with payload.cwd
if data.get("type") == "session_meta":
payload = data.get("payload", {})
cwd = payload.get("cwd")
if cwd:
return cwd
# Legacy format: <cwd> tag in user message
payload = data.get("payload", data)
if payload.get("type") == "message" and payload.get("role") == "user":
for item in payload.get("content", []):
if isinstance(item, dict) and item.get("type") == "input_text":
match = re.search(r"<cwd>([^<]+)</cwd>", item.get("text", ""))
if match:
return match.group(1)
except json.JSONDecodeError:
continue
except Exception:
pass
return None
def collect_codex_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect codex session logs whose recorded cwd matches *cwd*."""
    stats = {"agent": "codex", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    sessions_root = Path.home() / ".codex" / "sessions"
    if not sessions_root.exists():
        return stats
    target = dest_dir / "codex"
    if not dry_run:
        target.mkdir(parents=True, exist_ok=True)
    # Session files live in nested date directories; match each by its cwd.
    matching = (p for p in sessions_root.rglob("*.jsonl")
                if extract_cwd_from_codex_log(p) == cwd)
    for session_file in matching:
        _copy_if_changed(session_file, target / session_file.name, stats, dry_run)
    return stats
def collect_gemini_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect Gemini CLI logs for *cwd* into dest_dir/gemini.

    Sessions live in two places: the project-hash directory (primary) and
    the generic "gemini" fallback directory, whose sessions are matched to
    this project via their embedded ``projectHash`` field.
    """
    stats = {"agent": "gemini", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    tmp_root = Path.home() / ".gemini" / "tmp"
    project_hash = get_gemini_project_hash(cwd)
    target = dest_dir / "gemini"

    sources: list[tuple[str, Path]] = []
    hashed_dir = tmp_root / project_hash
    if hashed_dir.exists():
        sources.append(("project", hashed_dir))
    fallback_dir = tmp_root / "gemini"
    if fallback_dir.exists() and fallback_dir != hashed_dir:
        sources.append(("generic", fallback_dir))
    if not sources:
        return stats
    if not dry_run:
        target.mkdir(parents=True, exist_ok=True)

    collected: set[str] = set()
    for kind, source_dir in sources:
        # logs.json is only taken from the project-hash directory.
        if kind == "project":
            logs_json = source_dir / "logs.json"
            if logs_json.exists():
                _copy_if_changed(logs_json, target / "logs.json", stats, dry_run)
        chats = source_dir / "chats"
        if not chats.exists():
            continue
        for chat in chats.glob("*.json"):
            if chat.name in collected:
                continue
            if kind == "generic":
                # Fallback-dir sessions must declare this project's hash.
                try:
                    with open(chat) as fh:
                        if json.load(fh).get("projectHash") != project_hash:
                            continue
                except (json.JSONDecodeError, OSError):
                    continue
            collected.add(chat.name)
            _copy_if_changed(chat, target / chat.name, stats, dry_run)
    return stats
# --- Cursor helpers ---
def _cursor_home() -> Path:
return Path.home() / ".cursor"
def _load_json_file(path: Path) -> dict | list | None:
if not path.exists():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
def _load_json_from_db(raw: bytes | str | None) -> dict | list | None:
if raw is None:
return None
text = raw.decode("utf-8", errors="ignore") if isinstance(raw, bytes) else str(raw)
try:
return json.loads(text)
except json.JSONDecodeError:
return None
# --- Cursor Agent ---
def _extract_cursor_agent_session(store_db: Path) -> dict | None:
    """Extract raw metadata and blob rows from a cursor-agent session store.db.

    Returns ``{"meta_raw": dict, "blobs": [...]}`` or None when the database
    is missing, unreadable, or has no usable meta row.
    """
    if not store_db.exists():
        return None
    try:
        conn = sqlite3.connect(f"file:{store_db}?mode=ro", uri=True)
    except sqlite3.Error:
        return None
    try:
        meta_row = conn.execute("SELECT value FROM meta WHERE key = '0'").fetchone()
        if not meta_row:
            return None
        # The meta value is stored as a hex-encoded JSON string.
        meta_json = _load_json_from_db(bytes.fromhex(meta_row[0]))
        if not isinstance(meta_json, dict):
            return None
        blobs = []
        for blob_id, blob_data in conn.execute("SELECT id, data FROM blobs"):
            entry = {"blob_id": blob_id, "size": len(blob_data) if blob_data else 0}
            # Only blobs that look like JSON objects are decoded; the rest
            # are recorded as opaque binary with their size.
            if blob_data and blob_data[0:1] == b'{':
                parsed = _load_json_from_db(blob_data)
                if parsed:
                    entry["format"] = "json"
                    entry["raw"] = parsed
                else:
                    entry["format"] = "binary"
            else:
                entry["format"] = "binary"
            blobs.append(entry)
        return {"meta_raw": meta_json, "blobs": blobs}
    except sqlite3.Error:
        return None
    finally:
        # `with sqlite3.connect(...)` only manages the transaction, not the
        # connection itself — close explicitly to avoid leaking a handle
        # per session directory scanned.
        conn.close()
def _write_cursor_agent_jsonl(dest_file: Path, session_id: str, workspace_path: str,
workspace_hash: str, session: dict) -> None:
with dest_file.open("w", encoding="utf-8") as f:
f.write(json.dumps({"type": "metadata", "session_id": session_id,
"workspace_path": workspace_path, "workspace_hash": workspace_hash,
"meta_raw": session["meta_raw"]}, ensure_ascii=False) + "\n")
for blob in session["blobs"]:
f.write(json.dumps({"type": "blob", **blob}, ensure_ascii=False) + "\n")
def collect_cursor_agent_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect cursor-agent session logs for *cwd* into dest_dir/cursor-agent.

    Sessions are SQLite databases under ~/.cursor/chats/<md5(cwd)>/<id>/store.db;
    each is re-serialized to a JSONL file named after its session directory.
    """
    import tempfile  # hoisted: was imported inside the per-session loop

    stats = {"agent": "cursor-agent", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    chats_dir = _cursor_home() / "chats"
    if not chats_dir.exists():
        return stats
    # cursor-agent keys workspaces by the MD5 of the absolute cwd.
    cwd_hash = hashlib.md5(cwd.encode()).hexdigest()
    workspace_dir = chats_dir / cwd_hash
    if not workspace_dir.is_dir():
        return stats
    session_dirs = [d for d in workspace_dir.iterdir()
                    if d.is_dir() and (d / "store.db").exists()]
    if not session_dirs:
        return stats
    dest = dest_dir / "cursor-agent"
    if not dry_run:
        dest.mkdir(parents=True, exist_ok=True)
    for session_dir in sorted(session_dirs):
        session_data = _extract_cursor_agent_session(session_dir / "store.db")
        if not session_data:
            continue
        stats["found"] += 1
        dest_file = dest / f"{session_dir.name}.jsonl"
        if dest_file.exists():
            # Serialize to a temp file and hash-compare to detect changes.
            with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tmp:
                tmp_path = Path(tmp.name)
            _write_cursor_agent_jsonl(tmp_path, session_dir.name, cwd, cwd_hash, session_data)
            same = get_file_hash(tmp_path) == get_file_hash(dest_file)
            tmp_path.unlink()
            if same:
                stats["skipped"] += 1
                continue
            if not dry_run:
                _write_cursor_agent_jsonl(dest_file, session_dir.name, cwd, cwd_hash, session_data)
            stats["updated"] += 1
        else:
            if not dry_run:
                _write_cursor_agent_jsonl(dest_file, session_dir.name, cwd, cwd_hash, session_data)
            stats["copied"] += 1
    return stats
# --- Cursor IDE ---
def _parse_workspace_folder(raw: str | None) -> Path | None:
if not raw:
return None
parsed = urlparse(raw)
path = parsed.path or raw
if path.startswith("//"):
path = path[2:]
return Path(unquote(path)).expanduser().resolve(strict=False)
def _find_cursor_workspaces(workspace_storage: Path, cwd: str) -> list[tuple[Path, Path]]:
    """Find workspaceStorage entries whose folder exactly matches *cwd*.

    Returns (workspace_dir, resolved_folder_path) pairs. Parent and child
    workspaces are deliberately excluded — exact match only.
    """
    found: list[tuple[Path, Path]] = []
    if not workspace_storage.is_dir():
        return found
    target = Path(cwd).resolve()
    for entry in workspace_storage.iterdir():
        meta_file = entry / "workspace.json"
        if not entry.is_dir() or not meta_file.exists():
            continue
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            continue
        folder = _parse_workspace_folder(meta.get("folder"))
        if folder is not None and folder == target:
            found.append((entry, folder))
    return found
def _collect_composer_ids(workspace_db: Path) -> set[str]:
    """Collect composer session IDs from a workspace state.vscdb.

    Walks the ``composer.composerData`` JSON payload and gathers every
    string-valued ``composerId`` found anywhere in the nested structure.
    Returns an empty set on any read/DB error.
    """
    ids: set[str] = set()
    if not workspace_db.exists():
        return ids
    try:
        conn = sqlite3.connect(f"file:{workspace_db}?mode=ro", uri=True)
    except sqlite3.Error:
        return ids
    try:
        row = conn.execute("SELECT value FROM ItemTable WHERE key = ?",
                           ("composer.composerData",)).fetchone()
    except sqlite3.Error:
        return ids
    finally:
        # `with sqlite3.connect(...)` only manages the transaction, not the
        # connection — close explicitly so the handle is not leaked.
        conn.close()
    if not row:
        return ids
    payload = _load_json_from_db(row[0])
    if not payload:
        return ids
    # Iterative DFS over the nested dict/list structure.
    stack = [payload]
    while stack:
        current = stack.pop()
        if isinstance(current, dict):
            if (cid := current.get("composerId")) and isinstance(cid, str):
                ids.add(cid)
            stack.extend(current.values())
        elif isinstance(current, list):
            stack.extend(current)
    return ids
def _extract_cursor_session(composer_id: str, cursor: sqlite3.Cursor) -> dict | None:
    """Pull composer metadata and all bubble records for one session.

    Reads ``composerData:<id>`` and ``bubbleId:<id>:<bubble>`` rows from the
    global cursorDiskKV table. Returns None when the session has no usable
    bubble references.
    """
    row = cursor.execute("SELECT value FROM cursorDiskKV WHERE key = ?",
                         (f"composerData:{composer_id}",)).fetchone()
    if not row:
        return None
    composer_data = _load_json_from_db(row[0])
    if not isinstance(composer_data, dict):
        return None
    refs = composer_data.get("bubbles") or composer_data.get("fullConversationHeadersOnly")
    if not isinstance(refs, list) or not refs:
        return None
    # Each ref may carry its bubble id under "id" or "bubbleId".
    bubble_ids = [bid for ref in refs
                  if isinstance(ref, dict) and (bid := ref.get("id") or ref.get("bubbleId"))]
    if not bubble_ids:
        return None
    bubbles = []
    for bid in bubble_ids:
        brow = cursor.execute("SELECT value FROM cursorDiskKV WHERE key = ?",
                              (f"bubbleId:{composer_id}:{bid}",)).fetchone()
        if not brow:
            continue
        bdata = _load_json_from_db(brow[0])
        if bdata:
            bubbles.append({"bubble_id": bid, "raw": bdata})
    return {"composer_id": composer_id,
            "composer_data_raw": composer_data,
            "bubbles_raw": bubbles}
def _find_cursor_transcript(cwd: str, composer_id: str) -> Path | None:
    """Locate a flat agent-transcript JSONL for a composer session, if any.

    Newer Cursor versions write transcripts to
    ~/.cursor/projects/<slug>/agent-transcripts/<id>/<id>.jsonl. Files
    smaller than 50 bytes are treated as empty and ignored.
    """
    slug = cwd.lstrip("/").replace("/", "-")
    candidate = (_cursor_home() / "projects" / slug / "agent-transcripts"
                 / composer_id / f"{composer_id}.jsonl")
    if candidate.exists() and candidate.stat().st_size >= 50:
        return candidate
    return None
def _write_cursor_session_jsonl(dest_file: Path, composer_id: str, folder_path: Path,
ws_dir: Path, session: dict,
transcript_path: Path | None = None) -> None:
with dest_file.open("w", encoding="utf-8") as f:
f.write(json.dumps({"type": "metadata", "composer_id": composer_id,
"workspace_path": str(folder_path), "workspace_hash": ws_dir.name,
"composer_data_raw": session["composer_data_raw"]},
ensure_ascii=False) + "\n")
for bubble in session["bubbles_raw"]:
f.write(json.dumps({"type": "bubble", **bubble}, ensure_ascii=False) + "\n")
# Append transcript lines if available
if transcript_path:
for line in transcript_path.read_text(encoding="utf-8", errors="replace").splitlines():
line = line.strip()
if not line:
continue
try:
payload = json.loads(line)
if isinstance(payload, dict):
payload["type"] = "transcript"
f.write(json.dumps(payload, ensure_ascii=False) + "\n")
except json.JSONDecodeError:
continue
def collect_cursor_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect cursor-ide session logs for *cwd* into dest_dir/cursor-ide.

    Reads sessions from Cursor's global state.vscdb (note: the base path
    is macOS-specific — ~/Library/Application Support/Cursor) and then
    adds transcript-only sessions that exist on disk but have no
    cursorDiskKV data.
    """
    import tempfile  # hoisted: was imported inside the per-session loop

    base = Path.home() / "Library" / "Application Support" / "Cursor"
    workspace_storage = base / "User" / "workspaceStorage"
    global_db = base / "User" / "globalStorage" / "state.vscdb"
    stats = {"agent": "cursor-ide", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    if not global_db.exists():
        return stats
    matches = _find_cursor_workspaces(workspace_storage, cwd)
    if not matches:
        return stats
    dest = dest_dir / "cursor-ide"
    if not dry_run:
        dest.mkdir(parents=True, exist_ok=True)
    # Sessions found via cursorDiskKV — used below to detect transcript-only ones.
    seen_ids: set[str] = set()
    try:
        conn = sqlite3.connect(f"file:{global_db}?mode=ro", uri=True)
    except sqlite3.Error:
        conn = None
    if conn is not None:
        try:
            cursor = conn.cursor()
            for ws_dir, folder_path in matches:
                composer_ids = _collect_composer_ids(ws_dir / "state.vscdb")
                for composer_id in sorted(composer_ids):
                    session = _extract_cursor_session(composer_id, cursor)
                    if not session:
                        continue
                    seen_ids.add(composer_id)
                    stats["found"] += 1
                    dest_file = dest / f"{composer_id}.jsonl"
                    transcript = _find_cursor_transcript(cwd, composer_id)
                    if dest_file.exists():
                        # Serialize to a temp file and hash-compare to detect changes.
                        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl",
                                                         delete=False) as tmp:
                            tmp_path = Path(tmp.name)
                        _write_cursor_session_jsonl(tmp_path, composer_id, folder_path,
                                                    ws_dir, session, transcript)
                        same = get_file_hash(tmp_path) == get_file_hash(dest_file)
                        tmp_path.unlink()
                        if same:
                            stats["skipped"] += 1
                            continue
                        if not dry_run:
                            _write_cursor_session_jsonl(dest_file, composer_id, folder_path,
                                                        ws_dir, session, transcript)
                        stats["updated"] += 1
                    else:
                        if not dry_run:
                            _write_cursor_session_jsonl(dest_file, composer_id, folder_path,
                                                        ws_dir, session, transcript)
                        stats["copied"] += 1
        except sqlite3.Error:
            # Best-effort: an unreadable global DB must not abort collection.
            pass
        finally:
            # `with sqlite3.connect(...)` only manages the transaction, not
            # the connection — close explicitly to avoid leaking the handle.
            conn.close()
    # Collect transcript-only sessions (no cursorDiskKV data).
    slug = cwd.lstrip("/").replace("/", "-")
    transcripts_dir = _cursor_home() / "projects" / slug / "agent-transcripts"
    if transcripts_dir.is_dir():
        for session_dir in sorted(transcripts_dir.iterdir()):
            if not session_dir.is_dir():
                continue
            composer_id = session_dir.name
            if composer_id in seen_ids:
                continue
            jsonl_file = session_dir / f"{composer_id}.jsonl"
            # Files under 50 bytes are treated as empty and skipped.
            if not jsonl_file.exists() or jsonl_file.stat().st_size < 50:
                continue
            stats["found"] += 1
            dest_file = dest / f"{composer_id}.jsonl"
            if not dry_run:
                # Transcript-only output: metadata header + re-tagged lines.
                with dest_file.open("w", encoding="utf-8") as f:
                    f.write(json.dumps({"type": "metadata", "composer_id": composer_id,
                                        "workspace_path": cwd},
                                       ensure_ascii=False) + "\n")
                    for line in jsonl_file.read_text(encoding="utf-8",
                                                     errors="replace").splitlines():
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            payload = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        if isinstance(payload, dict):
                            payload["type"] = "transcript"
                        f.write(json.dumps(payload, ensure_ascii=False) + "\n")
            # The original ended with `if dest_file.exists(): copied += 1
            # else: copied += 1` — both branches identical, so count
            # unconditionally.
            stats["copied"] += 1
    return stats
# --- Main ---
# All supported agent names, in default collection order; also the valid
# values for the --agents CLI flag.
AGENTS = ["claude-code", "codex", "gemini", "cursor-ide", "cursor-agent"]
# Maps each agent name to its collector: fn(cwd, dest_dir, dry_run) -> stats dict.
COLLECTORS = {
    "claude-code": collect_claude_logs,
    "codex": collect_codex_logs,
    "gemini": collect_gemini_logs,
    "cursor-ide": collect_cursor_logs,
    "cursor-agent": collect_cursor_agent_logs,
}
def _project_name(cwd: str) -> str:
"""Derive a project name from a directory path (uses basename)."""
return Path(cwd).name
def _resolve_raw_root(cwd: str, project: str | None, analysis_dir: str | None = None) -> Path:
project_name = project or _project_name(cwd)
base = Path(analysis_dir) if analysis_dir else Path(cwd) / ".tessl" / "logs"
return base / project_name / "raw"
def main():
    """CLI entry point: parse arguments, run each collector, print a summary."""
    parser = argparse.ArgumentParser(description="Collect coding agent logs for a project")
    parser.add_argument("--cwd", default=os.getcwd(), help="Project directory (default: cwd)")
    parser.add_argument("--project", default=None,
                        help="Project name (default: basename of --cwd)")
    parser.add_argument("--analysis-dir", default=None,
                        help="Output directory for analysis data (default: <cwd>/.tessl/logs)")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be copied")
    parser.add_argument("--agents", nargs="+", choices=AGENTS, default=AGENTS,
                        help="Which agents to collect from (default: all)")
    args = parser.parse_args()

    # Resolve symlinks so the cwd matches what agents record in their logs.
    cwd = os.path.realpath(args.cwd)
    project = args.project or _project_name(cwd)
    dest_dir = _resolve_raw_root(cwd, args.project, args.analysis_dir)

    print(f"Collecting logs for: {cwd}")
    print(f"Project: {project}")
    print(f"Destination: {dest_dir}")
    if args.dry_run:
        print("(dry run)")
    print()

    results = []
    for agent in args.agents:
        stats = COLLECTORS[agent](cwd, dest_dir, args.dry_run)
        results.append(stats)
        print(f"{agent}: {stats['found']} found, {stats['copied']} new, "
              f"{stats['updated']} updated, {stats['skipped']} unchanged")

    new_total = sum(r["copied"] for r in results)
    updated_total = sum(r["updated"] for r in results)
    if new_total or updated_total:
        print(f"\nTotal: {new_total} new, {updated_total} updated")
    else:
        print("\nNo new or updated logs found.")
# Entry guard: run the CLI when executed directly, stay silent when imported.
if __name__ == "__main__":
    main()