CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

try-tessl/agent-quality

Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.

88

2.93x
Quality

86%

Does it follow best practices?

Impact

97%

2.93x

Average score across 3 eval scenarios

Security by Snyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

collect_logs.py — skills/analyze-sessions/scripts/

#!/usr/bin/env python3
"""
Collect coding agent logs for a project.

Copies logs from claude-code, codex, gemini, cursor-ide, and cursor-agent
to .tessl/logs/<project>/raw/<agent-name>/.
Only adds new files or updates changed files.

Security: This script only copies files from the user's own local agent log
directories. Collected logs are passed through secret redaction in
normalize_logs.py before any further processing. Logs may contain untrusted
content (tool outputs, web page text) — downstream stages treat all log
content as untrusted data, not instructions.
"""

from __future__ import annotations

import argparse
import hashlib
import json
import os
import re
import shutil
import sqlite3
import sys
import tempfile
from contextlib import closing
from pathlib import Path
from urllib.parse import unquote, urlparse


def get_file_hash(path: Path) -> str:
    """Return the MD5 hex digest of the file at *path* (used for change detection only)."""
    digest = hashlib.md5()
    with path.open("rb") as fh:
        # Stream in chunks so large log files don't land in memory at once.
        for chunk in iter(lambda: fh.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def normalize_path_for_claude(cwd: str) -> str:
    """Convert path to claude-code project directory format.

    Claude Code replaces both ``/`` and ``_`` with ``-`` when deriving
    the project directory name from the working directory path.
    """
    # One translation table pass instead of two chained replace() calls.
    return cwd.translate(str.maketrans({"/": "-", "_": "-"}))


def get_gemini_project_hash(cwd: str) -> str:
    """Return the SHA256 hex digest of *cwd*, which names Gemini's project directory."""
    digest = hashlib.sha256()
    digest.update(cwd.encode())
    return digest.hexdigest()


def _copy_if_changed(src: Path, dest: Path, stats: dict, dry_run: bool) -> None:
    """Copy *src* to *dest* if it is new or its contents changed.

    Always increments stats["found"], then exactly one of "copied"
    (dest absent), "updated" (contents differ), or "skipped" (identical).
    When *dry_run* is true nothing is written, but counters still update.
    """
    stats["found"] += 1
    if not dest.exists():
        if not dry_run:
            shutil.copy2(src, dest)
        stats["copied"] += 1
        return
    # Cheap short-circuit: files of different size can't hash equal, so
    # only read and hash both files when the sizes match.
    if src.stat().st_size == dest.stat().st_size and get_file_hash(src) == get_file_hash(dest):
        stats["skipped"] += 1
        return
    if not dry_run:
        shutil.copy2(src, dest)
    stats["updated"] += 1


def collect_claude_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect Claude Code session logs (*.jsonl) for the project at *cwd*."""
    stats = {"agent": "claude-code", "found": 0, "copied": 0, "updated": 0, "skipped": 0}

    # Claude Code keeps one directory per project under ~/.claude/projects.
    source = Path.home() / ".claude" / "projects" / normalize_path_for_claude(cwd)
    if not source.exists():
        return stats

    target = dest_dir / "claude-code"
    if not dry_run:
        target.mkdir(parents=True, exist_ok=True)

    for src in source.glob("*.jsonl"):
        _copy_if_changed(src, target / src.name, stats, dry_run)

    return stats


def extract_cwd_from_codex_log(log_path: Path) -> str | None:
    """Extract cwd from a codex log file.

    Supports two formats:
      - New (codex_cli_rs): ``session_meta`` event with ``payload.cwd``
      - Legacy: ``<cwd>…</cwd>`` tag inside a user message ``input_text``
    """
    import re
    try:
        with open(log_path, "r") as f:
            for line in f:
                try:
                    data = json.loads(line)
                    # New format: session_meta with payload.cwd
                    if data.get("type") == "session_meta":
                        payload = data.get("payload", {})
                        cwd = payload.get("cwd")
                        if cwd:
                            return cwd
                    # Legacy format: <cwd> tag in user message
                    payload = data.get("payload", data)
                    if payload.get("type") == "message" and payload.get("role") == "user":
                        for item in payload.get("content", []):
                            if isinstance(item, dict) and item.get("type") == "input_text":
                                match = re.search(r"<cwd>([^<]+)</cwd>", item.get("text", ""))
                                if match:
                                    return match.group(1)
                except json.JSONDecodeError:
                    continue
    except Exception:
        pass
    return None


def collect_codex_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect codex session logs whose recorded cwd matches this project."""
    stats = {"agent": "codex", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    sessions_root = Path.home() / ".codex" / "sessions"
    if not sessions_root.exists():
        return stats

    target = dest_dir / "codex"
    if not dry_run:
        target.mkdir(parents=True, exist_ok=True)

    # Sessions are nested in subdirectories; match each file by the cwd
    # embedded in its contents.
    # NOTE(review): identically named files in different subdirectories would
    # collide at the flat destination — confirm codex filenames are unique.
    for src in sessions_root.rglob("*.jsonl"):
        if extract_cwd_from_codex_log(src) == cwd:
            _copy_if_changed(src, target / src.name, stats, dry_run)

    return stats


def collect_gemini_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect Gemini session logs for the project at *cwd*."""
    gemini_tmp = Path.home() / ".gemini" / "tmp"
    stats = {"agent": "gemini", "found": 0, "copied": 0, "updated": 0, "skipped": 0}

    project_hash = get_gemini_project_hash(cwd)
    dest = dest_dir / "gemini"

    # Candidate source directories, in priority order:
    #   "project" — the SHA256(cwd)-named directory (primary)
    #   "generic" — the shared "gemini" fallback directory, whose chats are
    #               matched to this project via their projectHash field
    sources: list[tuple[str, Path]] = []
    project_dir = gemini_tmp / project_hash
    if project_dir.exists():
        sources.append(("project", project_dir))
    generic_dir = gemini_tmp / "gemini"
    if generic_dir.exists() and generic_dir != project_dir:
        sources.append(("generic", generic_dir))

    if not sources:
        return stats

    if not dry_run:
        dest.mkdir(parents=True, exist_ok=True)

    collected: set[str] = set()
    for source_kind, source_dir in sources:
        # logs.json lives only in the project directory.
        if source_kind == "project":
            logs_file = source_dir / "logs.json"
            if logs_file.exists():
                _copy_if_changed(logs_file, dest / "logs.json", stats, dry_run)

        # chats/*.json — de-duplicated by filename across source dirs
        # (project dir is processed first, so it wins on conflicts).
        chats_dir = source_dir / "chats"
        if not chats_dir.exists():
            continue
        for chat_file in chats_dir.glob("*.json"):
            if chat_file.name in collected:
                continue
            if source_kind == "generic":
                # Fallback-directory chats belong to many projects; keep
                # only those tagged with this project's hash.
                try:
                    with open(chat_file) as fh:
                        payload = json.load(fh)
                    if payload.get("projectHash") != project_hash:
                        continue
                except (json.JSONDecodeError, OSError):
                    continue
            collected.add(chat_file.name)
            _copy_if_changed(chat_file, dest / chat_file.name, stats, dry_run)

    return stats


# --- Cursor helpers ---

def _cursor_home() -> Path:
    """Root of the user's ~/.cursor data directory."""
    return Path.home().joinpath(".cursor")


def _load_json_file(path: Path) -> dict | list | None:
    """Parse *path* as UTF-8 JSON, returning None when missing or malformed."""
    try:
        with path.open(encoding="utf-8") as fh:
            return json.load(fh)
    except (OSError, json.JSONDecodeError):
        # A missing file raises FileNotFoundError (an OSError), so the
        # explicit exists() check is unnecessary.
        return None


def _load_json_from_db(raw: bytes | str | None) -> dict | list | None:
    """Decode a sqlite value (bytes or str) and parse it as JSON.

    Returns None for NULL values or unparseable JSON; undecodable bytes
    are dropped rather than raising.
    """
    if raw is None:
        return None
    if isinstance(raw, bytes):
        text = raw.decode("utf-8", errors="ignore")
    else:
        text = str(raw)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None


# --- Cursor Agent ---

def _extract_cursor_agent_session(store_db: Path) -> dict | None:
    """Extract raw data from a cursor-agent session store.db.

    Reads the meta record (key '0', hex-encoded JSON) and every blob row,
    tagging each blob as "json" or opaque "binary". Returns None when the
    DB is missing, unreadable, or has no usable meta record.
    """
    if not store_db.exists():
        return None
    try:
        # closing() is required: sqlite3's own context manager only ends
        # the transaction, it does NOT close the connection (a handle leak
        # in the original code).
        with closing(sqlite3.connect(f"file:{store_db}?mode=ro", uri=True)) as conn:
            meta_row = conn.execute("SELECT value FROM meta WHERE key = '0'").fetchone()
            if not meta_row:
                return None
            # The meta value is stored as hex text; decode then parse as JSON.
            meta_json = _load_json_from_db(bytes.fromhex(meta_row[0]))
            if not isinstance(meta_json, dict):
                return None

            blobs = []
            for blob_id, blob_data in conn.execute("SELECT id, data FROM blobs"):
                entry = {"blob_id": blob_id, "size": len(blob_data) if blob_data else 0}
                # Heuristic: blobs starting with '{' are JSON documents.
                if blob_data and blob_data[0:1] == b'{':
                    parsed = _load_json_from_db(blob_data)
                    if parsed:
                        entry["format"] = "json"
                        entry["raw"] = parsed
                    else:
                        entry["format"] = "binary"
                else:
                    entry["format"] = "binary"
                blobs.append(entry)

            return {"meta_raw": meta_json, "blobs": blobs}
    except (sqlite3.Error, ValueError):
        # ValueError covers bytes.fromhex() on a corrupt (non-hex) meta
        # value, which previously escaped and crashed collection.
        return None


def _write_cursor_agent_jsonl(dest_file: Path, session_id: str, workspace_path: str,
                               workspace_hash: str, session: dict) -> None:
    """Serialize a cursor-agent session to JSONL: one metadata line, then one line per blob."""
    header = {"type": "metadata", "session_id": session_id,
              "workspace_path": workspace_path, "workspace_hash": workspace_hash,
              "meta_raw": session["meta_raw"]}
    lines = [json.dumps(header, ensure_ascii=False)]
    lines.extend(json.dumps({"type": "blob", **blob}, ensure_ascii=False)
                 for blob in session["blobs"])
    dest_file.write_text("\n".join(lines) + "\n", encoding="utf-8")


def collect_cursor_agent_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect logs from cursor-agent.

    Each session's store.db is flattened into a JSONL file (one metadata
    line followed by one line per blob). Because the output is derived
    rather than copied, change detection renders to a temp file and
    compares content hashes against the existing destination file.
    """
    stats = {"agent": "cursor-agent", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    chats_dir = _cursor_home() / "chats"
    if not chats_dir.exists():
        return stats

    # cursor-agent names the workspace directory after MD5(cwd).
    cwd_hash = hashlib.md5(cwd.encode()).hexdigest()
    workspace_dir = chats_dir / cwd_hash
    if not workspace_dir.is_dir():
        return stats

    session_dirs = [d for d in workspace_dir.iterdir() if d.is_dir() and (d / "store.db").exists()]
    if not session_dirs:
        return stats

    dest = dest_dir / "cursor-agent"
    if not dry_run:
        dest.mkdir(parents=True, exist_ok=True)

    for session_dir in sorted(session_dirs):
        session_data = _extract_cursor_agent_session(session_dir / "store.db")
        if not session_data:
            continue

        stats["found"] += 1
        dest_file = dest / f"{session_dir.name}.jsonl"

        if dest_file.exists():
            # Render to a temp file so unchanged sessions can be skipped.
            with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tmp:
                tmp_path = Path(tmp.name)
            try:
                _write_cursor_agent_jsonl(tmp_path, session_dir.name, cwd, cwd_hash, session_data)
                same = get_file_hash(tmp_path) == get_file_hash(dest_file)
            finally:
                # Always remove the temp file, even if rendering failed
                # (the original leaked it on any exception).
                tmp_path.unlink(missing_ok=True)
            if same:
                stats["skipped"] += 1
                continue
            if not dry_run:
                _write_cursor_agent_jsonl(dest_file, session_dir.name, cwd, cwd_hash, session_data)
            stats["updated"] += 1
        else:
            if not dry_run:
                _write_cursor_agent_jsonl(dest_file, session_dir.name, cwd, cwd_hash, session_data)
            stats["copied"] += 1

    return stats


# --- Cursor IDE ---

def _parse_workspace_folder(raw: str | None) -> Path | None:
    """Turn a workspace.json "folder" value (usually a file:// URI) into an absolute Path."""
    if not raw:
        return None
    parsed = urlparse(raw)
    # Fall back to the raw string when the value isn't URI-shaped.
    candidate = parsed.path if parsed.path else raw
    # Strip a UNC-style leading double slash left by some URI forms.
    candidate = candidate[2:] if candidate.startswith("//") else candidate
    return Path(unquote(candidate)).expanduser().resolve(strict=False)


def _find_cursor_workspaces(workspace_storage: Path, cwd: str) -> list[tuple[Path, Path]]:
    """Return (workspace_dir, folder_path) pairs whose folder equals *cwd* exactly.

    Parent and child workspaces are deliberately excluded — only an exact
    folder match counts as this project's workspace.
    """
    found: list[tuple[Path, Path]] = []
    if not workspace_storage.is_dir():
        return found
    wanted = Path(cwd).resolve()
    for candidate in workspace_storage.iterdir():
        meta_file = candidate / "workspace.json"
        if not candidate.is_dir() or not meta_file.exists():
            continue
        try:
            data = json.loads(meta_file.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            continue
        folder = _parse_workspace_folder(data.get("folder"))
        if folder and folder == wanted:
            found.append((candidate, folder))
    return found


def _collect_composer_ids(workspace_db: Path) -> set[str]:
    """Return every composerId string found in the workspace's composer.composerData.

    Walks the JSON payload iteratively (dicts and lists), so arbitrarily
    nested structures are covered. Returns an empty set on any DB or JSON
    problem.
    """
    ids: set[str] = set()
    if not workspace_db.exists():
        return ids
    try:
        # closing() is required: sqlite3's context manager only manages the
        # transaction and left the workspace DB handle open (a leak).
        with closing(sqlite3.connect(f"file:{workspace_db}?mode=ro", uri=True)) as conn:
            row = conn.execute("SELECT value FROM ItemTable WHERE key = ?",
                               ("composer.composerData",)).fetchone()
    except sqlite3.Error:
        return ids
    if not row:
        return ids
    payload = _load_json_from_db(row[0])
    if not payload:
        return ids
    # Iterative depth-first walk over the JSON structure.
    stack = [payload]
    while stack:
        current = stack.pop()
        if isinstance(current, dict):
            cid = current.get("composerId")
            if isinstance(cid, str):
                ids.add(cid)
            stack.extend(current.values())
        elif isinstance(current, list):
            stack.extend(current)
    return ids


def _extract_cursor_session(composer_id: str, cursor: sqlite3.Cursor) -> dict | None:
    """Fetch a composer record and all of its bubbles from the global cursorDiskKV store.

    Returns None when the record is missing, malformed, or has no
    resolvable bubble references.
    """
    row = cursor.execute("SELECT value FROM cursorDiskKV WHERE key = ?",
                         (f"composerData:{composer_id}",)).fetchone()
    if row is None:
        return None
    composer_data = _load_json_from_db(row[0])
    if not isinstance(composer_data, dict):
        return None
    # Newer records use "bubbles"; older ones "fullConversationHeadersOnly".
    refs = composer_data.get("bubbles") or composer_data.get("fullConversationHeadersOnly")
    if not isinstance(refs, list) or not refs:
        return None
    bubble_ids = [bid for ref in refs
                  if isinstance(ref, dict) and (bid := ref.get("id") or ref.get("bubbleId"))]
    if not bubble_ids:
        return None
    bubbles = []
    for bid in bubble_ids:
        brow = cursor.execute("SELECT value FROM cursorDiskKV WHERE key = ?",
                              (f"bubbleId:{composer_id}:{bid}",)).fetchone()
        if brow is None:
            continue
        bdata = _load_json_from_db(brow[0])
        if bdata:
            bubbles.append({"bubble_id": bid, "raw": bdata})
    return {"composer_id": composer_id, "composer_data_raw": composer_data, "bubbles_raw": bubbles}


def _find_cursor_transcript(cwd: str, composer_id: str) -> Path | None:
    """Locate the flat agent-transcript JSONL for *composer_id*, if one exists.

    Newer Cursor versions write these to
    ~/.cursor/projects/<slug>/agent-transcripts/<id>/<id>.jsonl.
    Tiny files (< 50 bytes) are treated as empty sessions and ignored.
    """
    slug = cwd.lstrip("/").replace("/", "-")
    candidate = (_cursor_home() / "projects" / slug / "agent-transcripts"
                 / composer_id / f"{composer_id}.jsonl")
    if not candidate.exists():
        return None
    return candidate if candidate.stat().st_size >= 50 else None


def _write_cursor_session_jsonl(dest_file: Path, composer_id: str, folder_path: Path,
                                 ws_dir: Path, session: dict,
                                 transcript_path: Path | None = None) -> None:
    """Serialize a cursor-ide session to JSONL.

    Layout: one metadata line, one line per raw bubble, then (when a flat
    transcript exists) every parseable transcript line tagged "transcript".
    Unparseable or blank transcript lines are dropped.
    """
    lines: list[str] = [json.dumps({"type": "metadata", "composer_id": composer_id,
                                    "workspace_path": str(folder_path),
                                    "workspace_hash": ws_dir.name,
                                    "composer_data_raw": session["composer_data_raw"]},
                                   ensure_ascii=False)]
    for bubble in session["bubbles_raw"]:
        lines.append(json.dumps({"type": "bubble", **bubble}, ensure_ascii=False))
    if transcript_path:
        for raw_line in transcript_path.read_text(encoding="utf-8", errors="replace").splitlines():
            raw_line = raw_line.strip()
            if not raw_line:
                continue
            try:
                payload = json.loads(raw_line)
            except json.JSONDecodeError:
                continue
            if isinstance(payload, dict):
                payload["type"] = "transcript"
                lines.append(json.dumps(payload, ensure_ascii=False))
    with dest_file.open("w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")


def collect_cursor_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect logs from cursor-ide.

    Sessions come from two places:
      1. composer sessions in Cursor's global state.vscdb (bubble records),
         optionally merged with a flat agent-transcript JSONL, and
      2. transcript-only sessions that have no cursorDiskKV data.

    NOTE(review): the storage location used here is the macOS path
    (~/Library/Application Support/Cursor); other platforms are not handled —
    confirm whether that is intentional.
    """
    base = Path.home() / "Library" / "Application Support" / "Cursor"
    workspace_storage = base / "User" / "workspaceStorage"
    global_db = base / "User" / "globalStorage" / "state.vscdb"
    stats = {"agent": "cursor-ide", "found": 0, "copied": 0, "updated": 0, "skipped": 0}

    if not global_db.exists():
        return stats

    matches = _find_cursor_workspaces(workspace_storage, cwd)
    if not matches:
        return stats

    dest = dest_dir / "cursor-ide"
    if not dry_run:
        dest.mkdir(parents=True, exist_ok=True)

    # Sessions found via cursorDiskKV, so the transcript-only pass below
    # does not duplicate them.
    seen_ids: set[str] = set()

    try:
        # closing() is required: sqlite3's context manager only manages the
        # transaction and would leave the global DB handle open (a leak).
        with closing(sqlite3.connect(f"file:{global_db}?mode=ro", uri=True)) as global_conn:
            cursor = global_conn.cursor()
            for ws_dir, folder_path in matches:
                composer_ids = _collect_composer_ids(ws_dir / "state.vscdb")
                for composer_id in sorted(composer_ids):
                    session = _extract_cursor_session(composer_id, cursor)
                    if not session:
                        continue
                    seen_ids.add(composer_id)
                    stats["found"] += 1
                    dest_file = dest / f"{composer_id}.jsonl"
                    transcript = _find_cursor_transcript(cwd, composer_id)
                    if dest_file.exists():
                        # Render to a temp file so unchanged sessions can be
                        # skipped; clean the temp file up even on error.
                        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tmp:
                            tmp_path = Path(tmp.name)
                        try:
                            _write_cursor_session_jsonl(tmp_path, composer_id, folder_path, ws_dir,
                                                        session, transcript)
                            same = get_file_hash(tmp_path) == get_file_hash(dest_file)
                        finally:
                            tmp_path.unlink(missing_ok=True)
                        if same:
                            stats["skipped"] += 1
                            continue
                        if not dry_run:
                            _write_cursor_session_jsonl(dest_file, composer_id, folder_path, ws_dir,
                                                        session, transcript)
                        stats["updated"] += 1
                    else:
                        if not dry_run:
                            _write_cursor_session_jsonl(dest_file, composer_id, folder_path, ws_dir,
                                                        session, transcript)
                        stats["copied"] += 1
    except sqlite3.Error:
        pass

    # Collect transcript-only sessions (present on disk but absent from
    # cursorDiskKV).
    slug = cwd.lstrip("/").replace("/", "-")
    transcripts_dir = _cursor_home() / "projects" / slug / "agent-transcripts"
    if transcripts_dir.is_dir():
        for session_dir in sorted(transcripts_dir.iterdir()):
            if not session_dir.is_dir():
                continue
            composer_id = session_dir.name
            if composer_id in seen_ids:
                continue
            jsonl_file = session_dir / f"{composer_id}.jsonl"
            # Ignore missing or trivially small (empty-session) transcripts.
            if not jsonl_file.exists() or jsonl_file.stat().st_size < 50:
                continue
            stats["found"] += 1
            dest_file = dest / f"{composer_id}.jsonl"
            # Build the output in memory: metadata line + transcript lines
            # (no bubbles for transcript-only sessions).
            out_lines = [json.dumps({"type": "metadata", "composer_id": composer_id,
                                     "workspace_path": cwd}, ensure_ascii=False)]
            for line in jsonl_file.read_text(encoding="utf-8", errors="replace").splitlines():
                line = line.strip()
                if not line:
                    continue
                try:
                    payload = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if isinstance(payload, dict):
                    payload["type"] = "transcript"
                    out_lines.append(json.dumps(payload, ensure_ascii=False))
            content = "\n".join(out_lines) + "\n"
            # Proper change accounting: the original always rewrote the file
            # and always counted "copied" (both branches of its final if/else
            # were identical), even for unchanged or pre-existing files.
            existed = dest_file.exists()
            if existed and dest_file.read_text(encoding="utf-8") == content:
                stats["skipped"] += 1
                continue
            if not dry_run:
                dest_file.write_text(content, encoding="utf-8")
            stats["updated" if existed else "copied"] += 1

    return stats


# --- Main ---

# Supported agent names, in default collection order.
AGENTS = ["claude-code", "codex", "gemini", "cursor-ide", "cursor-agent"]

# Maps each agent name to its collector function; all collectors share the
# signature (cwd, dest_dir, dry_run) -> stats dict.
COLLECTORS = {
    "claude-code": collect_claude_logs,
    "codex": collect_codex_logs,
    "gemini": collect_gemini_logs,
    "cursor-ide": collect_cursor_logs,
    "cursor-agent": collect_cursor_agent_logs,
}


def _project_name(cwd: str) -> str:
    """Project name for *cwd*: the final path component (basename)."""
    project_path = Path(cwd)
    return project_path.name


def _resolve_raw_root(cwd: str, project: str | None, analysis_dir: str | None = None) -> Path:
    """Destination root for raw logs: <base>/<project>/raw.

    *base* is *analysis_dir* when given, otherwise <cwd>/.tessl/logs.
    """
    name = project if project else _project_name(cwd)
    if analysis_dir:
        base = Path(analysis_dir)
    else:
        base = Path(cwd) / ".tessl" / "logs"
    return base / name / "raw"


def main():
    """CLI entry point: parse args, run each requested collector, print a summary."""
    parser = argparse.ArgumentParser(description="Collect coding agent logs for a project")
    parser.add_argument("--cwd", default=os.getcwd(), help="Project directory (default: cwd)")
    parser.add_argument("--project", default=None,
                        help="Project name (default: basename of --cwd)")
    parser.add_argument("--analysis-dir", default=None,
                        help="Output directory for analysis data (default: <cwd>/.tessl/logs)")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be copied")
    parser.add_argument("--agents", nargs="+", choices=AGENTS, default=AGENTS,
                        help="Which agents to collect from (default: all)")
    args = parser.parse_args()

    # Resolve symlinks so the cwd matches what agents recorded at runtime.
    cwd = os.path.realpath(args.cwd)
    project = args.project or _project_name(cwd)
    dest_dir = _resolve_raw_root(cwd, args.project, args.analysis_dir)

    print(f"Collecting logs for: {cwd}")
    print(f"Project: {project}")
    print(f"Destination: {dest_dir}")
    if args.dry_run:
        print("(dry run)")
    print()

    all_stats = []
    for agent in args.agents:
        result = COLLECTORS[agent](cwd, dest_dir, args.dry_run)
        all_stats.append(result)
        print(f"{agent}: {result['found']} found, {result['copied']} new, "
              f"{result['updated']} updated, {result['skipped']} unchanged")

    new_total = sum(s["copied"] for s in all_stats)
    upd_total = sum(s["updated"] for s in all_stats)
    if new_total + upd_total > 0:
        print(f"\nTotal: {new_total} new, {upd_total} updated")
    else:
        print("\nNo new or updated logs found.")


# Script entry point (no side effects on import).
if __name__ == "__main__":
    main()

README.md

tile.json