Collect and normalize agent logs, discover installed verifiers, and dispatch LLM judges to evaluate adherence. Produces per-session verdicts and aggregated reports.
91
90%
Does it follow best practices?
Impact
96%
3.09x — Average score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Collect coding agent logs for a project.
Copies logs from claude-code, codex, gemini, cursor-ide, and cursor-agent
to .tessl/logs/<project>/raw/<agent-name>/.
Only adds new files or updates changed files.
Security: This script only copies files from the user's own local agent log
directories. Collected logs are passed through secret redaction in
normalize_logs.py before any further processing. Logs may contain untrusted
content (tool outputs, web page text) — downstream stages treat all log
content as untrusted data, not instructions.
"""
import argparse
import hashlib
import json
import os
import re
import shutil
import sqlite3
import sys
import tempfile
from pathlib import Path
from urllib.parse import unquote, urlparse
def get_file_hash(path: Path) -> str:
    """Return the MD5 hex digest of the file at *path*.

    MD5 is used only for change detection, not for security. Reads the
    file in fixed-size chunks so large log files are never loaded into
    memory all at once.
    """
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()
def normalize_path_for_claude(cwd: str) -> str:
    """Derive the claude-code project directory name for *cwd*.

    Claude Code maps every ``/`` and every ``_`` in the working-directory
    path to ``-`` when naming its per-project log directory.
    """
    return cwd.translate(str.maketrans("/_", "--"))
def get_gemini_project_hash(cwd: str) -> str:
    """Return the hex SHA-256 of *cwd*, used as Gemini's project dir name."""
    digest = hashlib.sha256()
    digest.update(cwd.encode())
    return digest.hexdigest()
def _copy_if_changed(src: Path, dest: Path, stats: dict, dry_run: bool) -> None:
"""Copy src to dest if new or changed. Updates stats dict."""
stats["found"] += 1
if dest.exists():
if get_file_hash(src) == get_file_hash(dest):
stats["skipped"] += 1
return
if not dry_run:
shutil.copy2(src, dest)
stats["updated"] += 1
else:
if not dry_run:
shutil.copy2(src, dest)
stats["copied"] += 1
def collect_claude_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Copy claude-code session logs for *cwd* into dest_dir/claude-code.

    Returns a stats dict with found/copied/updated/skipped counters;
    all zeros when no project directory exists for *cwd*.
    """
    stats = {"agent": "claude-code", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    source = Path.home() / ".claude" / "projects" / normalize_path_for_claude(cwd)
    if not source.exists():
        return stats
    target = dest_dir / "claude-code"
    if not dry_run:
        target.mkdir(parents=True, exist_ok=True)
    for log_file in source.glob("*.jsonl"):
        _copy_if_changed(log_file, target / log_file.name, stats, dry_run)
    return stats
def extract_cwd_from_codex_log(log_path: Path) -> str | None:
"""Extract cwd from a codex log file.
Supports two formats:
- New (codex_cli_rs): ``session_meta`` event with ``payload.cwd``
- Legacy: ``<cwd>…</cwd>`` tag inside a user message ``input_text``
"""
import re
try:
with open(log_path, "r") as f:
for line in f:
try:
data = json.loads(line)
# New format: session_meta with payload.cwd
if data.get("type") == "session_meta":
payload = data.get("payload", {})
cwd = payload.get("cwd")
if cwd:
return cwd
# Legacy format: <cwd> tag in user message
payload = data.get("payload", data)
if payload.get("type") == "message" and payload.get("role") == "user":
for item in payload.get("content", []):
if isinstance(item, dict) and item.get("type") == "input_text":
match = re.search(r"<cwd>([^<]+)</cwd>", item.get("text", ""))
if match:
return match.group(1)
except json.JSONDecodeError:
continue
except Exception:
pass
return None
def collect_codex_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Copy codex session logs whose recorded cwd matches *cwd*.

    Scans ~/.codex/sessions recursively and keeps only the session files
    whose extracted working directory equals *cwd* exactly.
    """
    stats = {"agent": "codex", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    sessions_root = Path.home() / ".codex" / "sessions"
    if not sessions_root.exists():
        return stats
    target = dest_dir / "codex"
    if not dry_run:
        target.mkdir(parents=True, exist_ok=True)
    matching = (p for p in sessions_root.rglob("*.jsonl")
                if extract_cwd_from_codex_log(p) == cwd)
    for log_file in matching:
        _copy_if_changed(log_file, target / log_file.name, stats, dry_run)
    return stats
def collect_gemini_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Copy gemini session logs for *cwd* into dest_dir/gemini.

    Sessions may live in two places under ~/.gemini/tmp:
      1. a directory named after the SHA-256 hash of the project path
         (primary location; also holds logs.json), and
      2. a generic "gemini" directory, whose chat files are matched to
         the project via their embedded projectHash field.
    Files already collected from the project directory are not collected
    again from the generic one.
    """
    stats = {"agent": "gemini", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    tmp_root = Path.home() / ".gemini" / "tmp"
    project_hash = get_gemini_project_hash(cwd)
    target = dest_dir / "gemini"

    def belongs_to_project(chat_file: Path) -> bool:
        # Generic-dir sessions are matched by projectHash; unreadable
        # files are treated as not matching.
        try:
            with open(chat_file) as f:
                return json.load(f).get("projectHash") == project_hash
        except (json.JSONDecodeError, OSError):
            return False

    project_dir = tmp_root / project_hash
    generic_dir = tmp_root / "gemini"
    sources: list[tuple[str, Path]] = []
    if project_dir.exists():
        sources.append(("project", project_dir))
    if generic_dir.exists() and generic_dir != project_dir:
        sources.append(("generic", generic_dir))
    if not sources:
        return stats
    if not dry_run:
        target.mkdir(parents=True, exist_ok=True)

    collected: set[str] = set()
    for kind, source_dir in sources:
        # logs.json lives only in the project-hash directory.
        if kind == "project":
            logs_file = source_dir / "logs.json"
            if logs_file.exists():
                _copy_if_changed(logs_file, target / "logs.json", stats, dry_run)
        chats_dir = source_dir / "chats"
        if not chats_dir.exists():
            continue
        for chat_file in chats_dir.glob("*.json"):
            if chat_file.name in collected:
                continue
            if kind == "generic" and not belongs_to_project(chat_file):
                continue
            collected.add(chat_file.name)
            _copy_if_changed(chat_file, target / chat_file.name, stats, dry_run)
    return stats
# --- Cursor helpers ---
def _cursor_home() -> Path:
return Path.home() / ".cursor"
def _load_json_file(path: Path) -> dict | list | None:
if not path.exists():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
def _load_json_from_db(raw: bytes | str | None) -> dict | list | None:
if raw is None:
return None
text = raw.decode("utf-8", errors="ignore") if isinstance(raw, bytes) else str(raw)
try:
return json.loads(text)
except json.JSONDecodeError:
return None
# --- Cursor Agent ---
def _extract_cursor_agent_session(store_db: Path) -> dict | None:
"""Extract raw data from cursor-agent session store.db."""
if not store_db.exists():
return None
try:
with sqlite3.connect(f"file:{store_db}?mode=ro", uri=True) as conn:
meta_row = conn.execute("SELECT value FROM meta WHERE key = '0'").fetchone()
if not meta_row:
return None
meta_json = _load_json_from_db(bytes.fromhex(meta_row[0]))
if not isinstance(meta_json, dict):
return None
blobs = []
for blob_id, blob_data in conn.execute("SELECT id, data FROM blobs"):
entry = {"blob_id": blob_id, "size": len(blob_data) if blob_data else 0}
if blob_data and blob_data[0:1] == b'{':
parsed = _load_json_from_db(blob_data)
if parsed:
entry["format"] = "json"
entry["raw"] = parsed
else:
entry["format"] = "binary"
else:
entry["format"] = "binary"
blobs.append(entry)
return {"meta_raw": meta_json, "blobs": blobs}
except sqlite3.Error:
return None
def _write_cursor_agent_jsonl(dest_file: Path, session_id: str, workspace_path: str,
workspace_hash: str, session: dict) -> None:
with dest_file.open("w", encoding="utf-8") as f:
f.write(json.dumps({"type": "metadata", "session_id": session_id,
"workspace_path": workspace_path, "workspace_hash": workspace_hash,
"meta_raw": session["meta_raw"]}, ensure_ascii=False) + "\n")
for blob in session["blobs"]:
f.write(json.dumps({"type": "blob", **blob}, ensure_ascii=False) + "\n")
def collect_cursor_agent_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect cursor-agent session logs for *cwd*.

    Sessions live under ~/.cursor/chats/<md5(cwd)>/<session>/store.db.
    Each session is re-serialized to JSONL. Because the JSONL is generated
    rather than copied verbatim, change detection regenerates to a temp
    file and compares hashes against the existing destination.

    Returns a stats dict with found/copied/updated/skipped counters.
    """
    stats = {"agent": "cursor-agent", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    chats_dir = _cursor_home() / "chats"
    if not chats_dir.exists():
        return stats
    # cursor-agent keys workspaces by the MD5 of the working directory path.
    cwd_hash = hashlib.md5(cwd.encode()).hexdigest()
    workspace_dir = chats_dir / cwd_hash
    if not workspace_dir.is_dir():
        return stats
    session_dirs = [d for d in workspace_dir.iterdir()
                    if d.is_dir() and (d / "store.db").exists()]
    if not session_dirs:
        return stats
    dest = dest_dir / "cursor-agent"
    if not dry_run:
        dest.mkdir(parents=True, exist_ok=True)
    for session_dir in sorted(session_dirs):
        session_data = _extract_cursor_agent_session(session_dir / "store.db")
        if not session_data:
            continue
        stats["found"] += 1
        dest_file = dest / f"{session_dir.name}.jsonl"
        if dest_file.exists():
            # Regenerate to a temp file and hash-compare to detect changes.
            # (tempfile was previously imported inside this loop on every
            # iteration; it is now a top-level import.)
            with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tmp:
                tmp_path = Path(tmp.name)
            try:
                _write_cursor_agent_jsonl(tmp_path, session_dir.name, cwd, cwd_hash,
                                          session_data)
                same = get_file_hash(tmp_path) == get_file_hash(dest_file)
            finally:
                tmp_path.unlink()  # never leak the temp file, even on error
            if same:
                stats["skipped"] += 1
                continue
            if not dry_run:
                _write_cursor_agent_jsonl(dest_file, session_dir.name, cwd, cwd_hash,
                                          session_data)
            stats["updated"] += 1
        else:
            if not dry_run:
                _write_cursor_agent_jsonl(dest_file, session_dir.name, cwd, cwd_hash,
                                          session_data)
            stats["copied"] += 1
    return stats
# --- Cursor IDE ---
def _parse_workspace_folder(raw: str | None) -> Path | None:
if not raw:
return None
parsed = urlparse(raw)
path = parsed.path or raw
if path.startswith("//"):
path = path[2:]
return Path(unquote(path)).expanduser().resolve(strict=False)
def _find_cursor_workspaces(workspace_storage: Path, cwd: str) -> list[tuple[Path, Path]]:
matches = []
if not workspace_storage.is_dir():
return matches
cwd_path = Path(cwd).resolve()
for ws_dir in workspace_storage.iterdir():
if not ws_dir.is_dir():
continue
workspace_json = ws_dir / "workspace.json"
if not workspace_json.exists():
continue
try:
data = json.loads(workspace_json.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
folder_path = _parse_workspace_folder(data.get("folder"))
if not folder_path:
continue
# Exact match only — don't collect parent or child workspace sessions
if folder_path == cwd_path:
matches.append((ws_dir, folder_path))
return matches
def _collect_composer_ids(workspace_db: Path) -> set[str]:
ids: set[str] = set()
if not workspace_db.exists():
return ids
try:
with sqlite3.connect(f"file:{workspace_db}?mode=ro", uri=True) as conn:
row = conn.execute("SELECT value FROM ItemTable WHERE key = ?",
("composer.composerData",)).fetchone()
except sqlite3.Error:
return ids
if not row:
return ids
payload = _load_json_from_db(row[0])
if not payload:
return ids
stack = [payload]
while stack:
current = stack.pop()
if isinstance(current, dict):
if cid := current.get("composerId"):
if isinstance(cid, str):
ids.add(cid)
stack.extend(current.values())
elif isinstance(current, list):
stack.extend(current)
return ids
def _extract_cursor_session(composer_id: str, cursor: sqlite3.Cursor) -> dict | None:
row = cursor.execute("SELECT value FROM cursorDiskKV WHERE key = ?",
(f"composerData:{composer_id}",)).fetchone()
if not row:
return None
composer_data = _load_json_from_db(row[0])
if not isinstance(composer_data, dict):
return None
bubble_refs = composer_data.get("bubbles") or composer_data.get("fullConversationHeadersOnly")
if not isinstance(bubble_refs, list) or not bubble_refs:
return None
bubble_ids = []
for ref in bubble_refs:
if isinstance(ref, dict):
bid = ref.get("id") or ref.get("bubbleId")
if bid:
bubble_ids.append(bid)
if not bubble_ids:
return None
raw_bubbles = []
for bid in bubble_ids:
brow = cursor.execute("SELECT value FROM cursorDiskKV WHERE key = ?",
(f"bubbleId:{composer_id}:{bid}",)).fetchone()
if brow:
bdata = _load_json_from_db(brow[0])
if bdata:
raw_bubbles.append({"bubble_id": bid, "raw": bdata})
return {"composer_id": composer_id, "composer_data_raw": composer_data, "bubbles_raw": raw_bubbles}
def _find_cursor_transcript(cwd: str, composer_id: str) -> Path | None:
    """Locate a flat agent-transcript JSONL for a composer session.

    Newer Cursor versions write
    ~/.cursor/projects/<slug>/agent-transcripts/<id>/<id>.jsonl, where
    <slug> is *cwd* with its leading "/" stripped and every "/" mapped
    to "-". Files smaller than 50 bytes are treated as empty and ignored.
    """
    slug = cwd.lstrip("/").replace("/", "-")
    candidate = (_cursor_home() / "projects" / slug / "agent-transcripts"
                 / composer_id / f"{composer_id}.jsonl")
    if not candidate.exists():
        return None
    return candidate if candidate.stat().st_size >= 50 else None
def _write_cursor_session_jsonl(dest_file: Path, composer_id: str, folder_path: Path,
ws_dir: Path, session: dict,
transcript_path: Path | None = None) -> None:
with dest_file.open("w", encoding="utf-8") as f:
f.write(json.dumps({"type": "metadata", "composer_id": composer_id,
"workspace_path": str(folder_path), "workspace_hash": ws_dir.name,
"composer_data_raw": session["composer_data_raw"]},
ensure_ascii=False) + "\n")
for bubble in session["bubbles_raw"]:
f.write(json.dumps({"type": "bubble", **bubble}, ensure_ascii=False) + "\n")
# Append transcript lines if available
if transcript_path:
for line in transcript_path.read_text(encoding="utf-8", errors="replace").splitlines():
line = line.strip()
if not line:
continue
try:
payload = json.loads(line)
if isinstance(payload, dict):
payload["type"] = "transcript"
f.write(json.dumps(payload, ensure_ascii=False) + "\n")
except json.JSONDecodeError:
continue
def collect_cursor_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect cursor-ide session logs for *cwd*.

    Sessions come from two places:
      1. The global state.vscdb (cursorDiskKV table), keyed by composer
         ids discovered in each matching workspace's own state.vscdb,
         optionally augmented with a flat agent-transcript JSONL.
      2. Transcript-only sessions that exist solely as agent-transcript
         files with no cursorDiskKV data.

    NOTE(review): the storage base path is macOS-specific
    (~/Library/Application Support/Cursor); on other platforms this
    collector is a no-op.

    Returns a stats dict with found/copied/updated/skipped counters.
    """
    base = Path.home() / "Library" / "Application Support" / "Cursor"
    workspace_storage = base / "User" / "workspaceStorage"
    global_db = base / "User" / "globalStorage" / "state.vscdb"
    stats = {"agent": "cursor-ide", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    if not global_db.exists():
        return stats
    matches = _find_cursor_workspaces(workspace_storage, cwd)
    if not matches:
        return stats
    dest = dest_dir / "cursor-ide"
    if not dry_run:
        dest.mkdir(parents=True, exist_ok=True)
    # Sessions found via cursorDiskKV, so transcript-only discovery below
    # does not duplicate them.
    seen_ids: set[str] = set()
    try:
        with sqlite3.connect(f"file:{global_db}?mode=ro", uri=True) as global_conn:
            cursor = global_conn.cursor()
            for ws_dir, folder_path in matches:
                composer_ids = _collect_composer_ids(ws_dir / "state.vscdb")
                for composer_id in sorted(composer_ids):
                    session = _extract_cursor_session(composer_id, cursor)
                    if not session:
                        continue
                    seen_ids.add(composer_id)
                    stats["found"] += 1
                    dest_file = dest / f"{composer_id}.jsonl"
                    transcript = _find_cursor_transcript(cwd, composer_id)
                    if dest_file.exists():
                        # The JSONL is derived (not copied verbatim), so detect
                        # changes by regenerating to a temp file and comparing
                        # hashes. tempfile is now a top-level import instead of
                        # being re-imported inside this loop.
                        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl",
                                                         delete=False) as tmp:
                            tmp_path = Path(tmp.name)
                        try:
                            _write_cursor_session_jsonl(tmp_path, composer_id, folder_path,
                                                        ws_dir, session, transcript)
                            same = get_file_hash(tmp_path) == get_file_hash(dest_file)
                        finally:
                            tmp_path.unlink()  # never leak the temp file
                        if same:
                            stats["skipped"] += 1
                            continue
                        if not dry_run:
                            _write_cursor_session_jsonl(dest_file, composer_id, folder_path,
                                                        ws_dir, session, transcript)
                        stats["updated"] += 1
                    else:
                        if not dry_run:
                            _write_cursor_session_jsonl(dest_file, composer_id, folder_path,
                                                        ws_dir, session, transcript)
                        stats["copied"] += 1
    except sqlite3.Error:
        pass  # best-effort: a corrupt global DB should not abort collection
    # Transcript-only sessions (no cursorDiskKV data).
    slug = cwd.lstrip("/").replace("/", "-")
    transcripts_dir = _cursor_home() / "projects" / slug / "agent-transcripts"
    if transcripts_dir.is_dir():
        for session_dir in sorted(transcripts_dir.iterdir()):
            if not session_dir.is_dir():
                continue
            composer_id = session_dir.name
            if composer_id in seen_ids:
                continue
            jsonl_file = session_dir / f"{composer_id}.jsonl"
            # Files under 50 bytes are treated as empty and skipped.
            if not jsonl_file.exists() or jsonl_file.stat().st_size < 50:
                continue
            stats["found"] += 1
            dest_file = dest / f"{composer_id}.jsonl"
            # Write metadata + transcript lines (no bubbles available).
            if not dry_run:
                with dest_file.open("w", encoding="utf-8") as f:
                    f.write(json.dumps({"type": "metadata", "composer_id": composer_id,
                                        "workspace_path": cwd},
                                       ensure_ascii=False) + "\n")
                    for line in jsonl_file.read_text(encoding="utf-8",
                                                     errors="replace").splitlines():
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            payload = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        if isinstance(payload, dict):
                            payload["type"] = "transcript"
                        f.write(json.dumps(payload, ensure_ascii=False) + "\n")
            # No change detection for transcript-only sessions: always counted
            # as new. (The original had a dead `if dest_file.exists()` here
            # whose branches were identical; collapsed to one increment.)
            stats["copied"] += 1
    return stats
# --- Main ---
# Registry of collectors; dict insertion order defines the default
# collection order.
COLLECTORS = {
    "claude-code": collect_claude_logs,
    "codex": collect_codex_logs,
    "gemini": collect_gemini_logs,
    "cursor-ide": collect_cursor_logs,
    "cursor-agent": collect_cursor_agent_logs,
}
# Agent names are derived from the registry so the two can never drift
# (the original maintained a separate hand-written list).
AGENTS = list(COLLECTORS)
def _project_name(cwd: str) -> str:
"""Derive a project name from a directory path (uses basename)."""
return Path(cwd).name
def _resolve_raw_root(cwd: str, project: str | None, audit_dir: str | None = None) -> Path:
project_name = project or _project_name(cwd)
base = Path(audit_dir) if audit_dir else Path(cwd) / ".tessl" / "logs"
return base / project_name / "raw"
def main():
    """CLI entry point: parse args, run the selected collectors, report."""
    parser = argparse.ArgumentParser(description="Collect coding agent logs for a project")
    parser.add_argument("--cwd", default=os.getcwd(), help="Project directory (default: cwd)")
    parser.add_argument("--project", default=None,
                        help="Project name (default: basename of --cwd)")
    parser.add_argument("--audit-dir", default=None,
                        help="Output directory for audit data (default: <cwd>/.tessl/logs)")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be copied")
    parser.add_argument("--agents", nargs="+", choices=AGENTS, default=AGENTS,
                        help="Which agents to collect from (default: all)")
    args = parser.parse_args()

    cwd = os.path.realpath(args.cwd)
    project = args.project or _project_name(cwd)
    dest_dir = _resolve_raw_root(cwd, args.project, args.audit_dir)

    print(f"Collecting logs for: {cwd}")
    print(f"Project: {project}")
    print(f"Destination: {dest_dir}")
    if args.dry_run:
        print("(dry run)")
    print()

    # Accumulate totals as we go instead of keeping every stats dict.
    totals = {"copied": 0, "updated": 0}
    for agent in args.agents:
        stats = COLLECTORS[agent](cwd, dest_dir, args.dry_run)
        for key in totals:
            totals[key] += stats[key]
        print(f"{agent}: {stats['found']} found, {stats['copied']} new, "
              f"{stats['updated']} updated, {stats['skipped']} unchanged")

    if totals["copied"] or totals["updated"]:
        print(f"\nTotal: {totals['copied']} new, {totals['updated']} updated")
    else:
        print("\nNo new or updated logs found.")


if __name__ == "__main__":
    main()