CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl-labs/audit-logs

Collect and normalize agent logs, discover installed verifiers, and dispatch LLM judges to evaluate adherence. Produces per-session verdicts and aggregated reports.

91

3.09x
Quality

90%

Does it follow best practices?

Impact

96%

3.09x

Average score across 3 eval scenarios

SecuritybySnyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

collect_logs.pyskills/audit-logs/scripts/

#!/usr/bin/env python3
"""
Collect coding agent logs for a project.

Copies logs from claude-code, codex, gemini, cursor-ide, and cursor-agent
to .tessl/logs/<project>/raw/<agent-name>/.
Only adds new files or updates changed files.

Security: This script only copies files from the user's own local agent log
directories. Collected logs are passed through secret redaction in
normalize_logs.py before any further processing. Logs may contain untrusted
content (tool outputs, web page text) — downstream stages treat all log
content as untrusted data, not instructions.
"""

import argparse
import hashlib
import json
import os
import shutil
import sqlite3
import sys
from pathlib import Path
from urllib.parse import unquote, urlparse


def get_file_hash(path: Path) -> str:
    """Return the MD5 hex digest of a file's contents (change detection only)."""
    digest = hashlib.md5()
    digest.update(path.read_bytes())
    return digest.hexdigest()


def normalize_path_for_claude(cwd: str) -> str:
    """Translate a working-directory path into Claude Code's project-dir name.

    Claude Code derives the project directory name by mapping both ``/``
    and ``_`` in the working-directory path to ``-``.
    """
    return cwd.translate(str.maketrans("/_", "--"))


def get_gemini_project_hash(cwd: str) -> str:
    """Return the SHA-256 hex digest Gemini uses to name a project directory."""
    return hashlib.sha256(cwd.encode("utf-8")).hexdigest()


def _copy_if_changed(src: Path, dest: Path, stats: dict, dry_run: bool) -> None:
    """Copy src to dest if new or changed. Updates stats dict."""
    stats["found"] += 1
    if dest.exists():
        if get_file_hash(src) == get_file_hash(dest):
            stats["skipped"] += 1
            return
        if not dry_run:
            shutil.copy2(src, dest)
        stats["updated"] += 1
    else:
        if not dry_run:
            shutil.copy2(src, dest)
        stats["copied"] += 1


def collect_claude_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect Claude Code session logs (``*.jsonl``) for *cwd*."""
    stats = {"agent": "claude-code", "found": 0, "copied": 0, "updated": 0, "skipped": 0}

    source_dir = Path.home() / ".claude" / "projects" / normalize_path_for_claude(cwd)
    if not source_dir.exists():
        return stats

    target = dest_dir / "claude-code"
    if not dry_run:
        target.mkdir(parents=True, exist_ok=True)

    for src in source_dir.glob("*.jsonl"):
        _copy_if_changed(src, target / src.name, stats, dry_run)

    return stats


def extract_cwd_from_codex_log(log_path: Path) -> str | None:
    """Extract cwd from a codex log file.

    Supports two formats:
      - New (codex_cli_rs): ``session_meta`` event with ``payload.cwd``
      - Legacy: ``<cwd>…</cwd>`` tag inside a user message ``input_text``
    """
    import re
    try:
        with open(log_path, "r") as f:
            for line in f:
                try:
                    data = json.loads(line)
                    # New format: session_meta with payload.cwd
                    if data.get("type") == "session_meta":
                        payload = data.get("payload", {})
                        cwd = payload.get("cwd")
                        if cwd:
                            return cwd
                    # Legacy format: <cwd> tag in user message
                    payload = data.get("payload", data)
                    if payload.get("type") == "message" and payload.get("role") == "user":
                        for item in payload.get("content", []):
                            if isinstance(item, dict) and item.get("type") == "input_text":
                                match = re.search(r"<cwd>([^<]+)</cwd>", item.get("text", ""))
                                if match:
                                    return match.group(1)
                except json.JSONDecodeError:
                    continue
    except Exception:
        pass
    return None


def collect_codex_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect codex session logs whose recorded cwd matches *cwd*."""
    stats = {"agent": "codex", "found": 0, "copied": 0, "updated": 0, "skipped": 0}

    sessions_root = Path.home() / ".codex" / "sessions"
    if not sessions_root.exists():
        return stats

    target = dest_dir / "codex"
    if not dry_run:
        target.mkdir(parents=True, exist_ok=True)

    # rglob: session files may be nested in subdirectories.
    matching = (p for p in sessions_root.rglob("*.jsonl")
                if extract_cwd_from_codex_log(p) == cwd)
    for src in matching:
        _copy_if_changed(src, target / src.name, stats, dry_run)

    return stats


def collect_gemini_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect gemini logs for *cwd*.

    Sessions may live in two places under ``~/.gemini/tmp``:
      1. ``<sha256(cwd)>/`` — the project-hash directory (primary)
      2. ``gemini/`` — a generic fallback; its chat files are matched to
         the project via their ``projectHash`` field.
    """
    stats = {"agent": "gemini", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    gemini_tmp = Path.home() / ".gemini" / "tmp"
    project_hash = get_gemini_project_hash(cwd)
    dest = dest_dir / "gemini"

    project_dir = gemini_tmp / project_hash
    generic_dir = gemini_tmp / "gemini"
    sources: list[tuple[str, Path]] = []
    if project_dir.exists():
        sources.append(("project", project_dir))
    if generic_dir.exists() and generic_dir != project_dir:
        sources.append(("generic", generic_dir))
    if not sources:
        return stats

    if not dry_run:
        dest.mkdir(parents=True, exist_ok=True)

    copied_names: set[str] = set()
    for kind, source_dir in sources:
        # logs.json lives only in the project-hash directory.
        if kind == "project":
            logs_file = source_dir / "logs.json"
            if logs_file.exists():
                _copy_if_changed(logs_file, dest / "logs.json", stats, dry_run)

        chats_dir = source_dir / "chats"
        if not chats_dir.exists():
            continue
        for chat_file in chats_dir.glob("*.json"):
            if chat_file.name in copied_names:
                continue
            if kind == "generic":
                # Generic-dir sessions span many projects; keep only those
                # whose projectHash matches ours.
                try:
                    with open(chat_file) as f:
                        data = json.load(f)
                except (json.JSONDecodeError, OSError):
                    continue
                if data.get("projectHash") != project_hash:
                    continue
            copied_names.add(chat_file.name)
            _copy_if_changed(chat_file, dest / chat_file.name, stats, dry_run)

    return stats


# --- Cursor helpers ---

def _cursor_home() -> Path:
    return Path.home() / ".cursor"


def _load_json_file(path: Path) -> dict | list | None:
    if not path.exists():
        return None
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None


def _load_json_from_db(raw: bytes | str | None) -> dict | list | None:
    if raw is None:
        return None
    text = raw.decode("utf-8", errors="ignore") if isinstance(raw, bytes) else str(raw)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None


# --- Cursor Agent ---

def _extract_cursor_agent_session(store_db: Path) -> dict | None:
    """Extract raw data from cursor-agent session store.db."""
    if not store_db.exists():
        return None
    try:
        with sqlite3.connect(f"file:{store_db}?mode=ro", uri=True) as conn:
            meta_row = conn.execute("SELECT value FROM meta WHERE key = '0'").fetchone()
            if not meta_row:
                return None
            meta_json = _load_json_from_db(bytes.fromhex(meta_row[0]))
            if not isinstance(meta_json, dict):
                return None

            blobs = []
            for blob_id, blob_data in conn.execute("SELECT id, data FROM blobs"):
                entry = {"blob_id": blob_id, "size": len(blob_data) if blob_data else 0}
                if blob_data and blob_data[0:1] == b'{':
                    parsed = _load_json_from_db(blob_data)
                    if parsed:
                        entry["format"] = "json"
                        entry["raw"] = parsed
                    else:
                        entry["format"] = "binary"
                else:
                    entry["format"] = "binary"
                blobs.append(entry)

            return {"meta_raw": meta_json, "blobs": blobs}
    except sqlite3.Error:
        return None


def _write_cursor_agent_jsonl(dest_file: Path, session_id: str, workspace_path: str,
                               workspace_hash: str, session: dict) -> None:
    with dest_file.open("w", encoding="utf-8") as f:
        f.write(json.dumps({"type": "metadata", "session_id": session_id,
                             "workspace_path": workspace_path, "workspace_hash": workspace_hash,
                             "meta_raw": session["meta_raw"]}, ensure_ascii=False) + "\n")
        for blob in session["blobs"]:
            f.write(json.dumps({"type": "blob", **blob}, ensure_ascii=False) + "\n")


def collect_cursor_agent_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect logs from cursor-agent session databases.

    Each session's store.db is re-serialized to JSONL (same format as
    ``_write_cursor_agent_jsonl``) and compared in memory against any
    previously collected file, so unchanged sessions are skipped without
    writing throwaway temp files.
    """
    stats = {"agent": "cursor-agent", "found": 0, "copied": 0, "updated": 0, "skipped": 0}
    chats_dir = _cursor_home() / "chats"
    if not chats_dir.exists():
        return stats

    # cursor-agent names the workspace directory after the MD5 of the cwd.
    cwd_hash = hashlib.md5(cwd.encode()).hexdigest()
    workspace_dir = chats_dir / cwd_hash
    if not workspace_dir.is_dir():
        return stats

    session_dirs = [d for d in workspace_dir.iterdir() if d.is_dir() and (d / "store.db").exists()]
    if not session_dirs:
        return stats

    dest = dest_dir / "cursor-agent"
    if not dry_run:
        dest.mkdir(parents=True, exist_ok=True)

    for session_dir in sorted(session_dirs):
        session_data = _extract_cursor_agent_session(session_dir / "store.db")
        if not session_data:
            continue

        stats["found"] += 1
        dest_file = dest / f"{session_dir.name}.jsonl"

        # Serialize once, in memory — avoids the old temp-file round trip
        # (per-iteration `import tempfile`, leak on error, double write).
        lines = [json.dumps({"type": "metadata", "session_id": session_dir.name,
                             "workspace_path": cwd, "workspace_hash": cwd_hash,
                             "meta_raw": session_data["meta_raw"]}, ensure_ascii=False)]
        lines.extend(json.dumps({"type": "blob", **blob}, ensure_ascii=False)
                     for blob in session_data["blobs"])
        content = "\n".join(lines) + "\n"

        if dest_file.exists():
            try:
                unchanged = dest_file.read_text(encoding="utf-8") == content
            except OSError:
                unchanged = False
            if unchanged:
                stats["skipped"] += 1
                continue
            if not dry_run:
                dest_file.write_text(content, encoding="utf-8")
            stats["updated"] += 1
        else:
            if not dry_run:
                dest_file.write_text(content, encoding="utf-8")
            stats["copied"] += 1

    return stats


# --- Cursor IDE ---

def _parse_workspace_folder(raw: str | None) -> Path | None:
    if not raw:
        return None
    parsed = urlparse(raw)
    path = parsed.path or raw
    if path.startswith("//"):
        path = path[2:]
    return Path(unquote(path)).expanduser().resolve(strict=False)


def _find_cursor_workspaces(workspace_storage: Path, cwd: str) -> list[tuple[Path, Path]]:
    matches = []
    if not workspace_storage.is_dir():
        return matches
    cwd_path = Path(cwd).resolve()
    for ws_dir in workspace_storage.iterdir():
        if not ws_dir.is_dir():
            continue
        workspace_json = ws_dir / "workspace.json"
        if not workspace_json.exists():
            continue
        try:
            data = json.loads(workspace_json.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            continue
        folder_path = _parse_workspace_folder(data.get("folder"))
        if not folder_path:
            continue
        # Exact match only — don't collect parent or child workspace sessions
        if folder_path == cwd_path:
            matches.append((ws_dir, folder_path))
    return matches


def _collect_composer_ids(workspace_db: Path) -> set[str]:
    ids: set[str] = set()
    if not workspace_db.exists():
        return ids
    try:
        with sqlite3.connect(f"file:{workspace_db}?mode=ro", uri=True) as conn:
            row = conn.execute("SELECT value FROM ItemTable WHERE key = ?",
                               ("composer.composerData",)).fetchone()
    except sqlite3.Error:
        return ids
    if not row:
        return ids
    payload = _load_json_from_db(row[0])
    if not payload:
        return ids
    stack = [payload]
    while stack:
        current = stack.pop()
        if isinstance(current, dict):
            if cid := current.get("composerId"):
                if isinstance(cid, str):
                    ids.add(cid)
            stack.extend(current.values())
        elif isinstance(current, list):
            stack.extend(current)
    return ids


def _extract_cursor_session(composer_id: str, cursor: sqlite3.Cursor) -> dict | None:
    row = cursor.execute("SELECT value FROM cursorDiskKV WHERE key = ?",
                         (f"composerData:{composer_id}",)).fetchone()
    if not row:
        return None
    composer_data = _load_json_from_db(row[0])
    if not isinstance(composer_data, dict):
        return None
    bubble_refs = composer_data.get("bubbles") or composer_data.get("fullConversationHeadersOnly")
    if not isinstance(bubble_refs, list) or not bubble_refs:
        return None
    bubble_ids = []
    for ref in bubble_refs:
        if isinstance(ref, dict):
            bid = ref.get("id") or ref.get("bubbleId")
            if bid:
                bubble_ids.append(bid)
    if not bubble_ids:
        return None
    raw_bubbles = []
    for bid in bubble_ids:
        brow = cursor.execute("SELECT value FROM cursorDiskKV WHERE key = ?",
                              (f"bubbleId:{composer_id}:{bid}",)).fetchone()
        if brow:
            bdata = _load_json_from_db(brow[0])
            if bdata:
                raw_bubbles.append({"bubble_id": bid, "raw": bdata})
    return {"composer_id": composer_id, "composer_data_raw": composer_data, "bubbles_raw": raw_bubbles}


def _find_cursor_transcript(cwd: str, composer_id: str) -> Path | None:
    """Locate a flat agent-transcript JSONL for a composer session.

    Newer Cursor versions write transcripts to
    ``~/.cursor/projects/<slug>/agent-transcripts/<id>/<id>.jsonl``.
    Tiny files (< 50 bytes) are treated as empty and ignored.
    """
    slug = cwd.lstrip("/").replace("/", "-")
    candidate = (_cursor_home() / "projects" / slug / "agent-transcripts"
                 / composer_id / f"{composer_id}.jsonl")
    if not candidate.exists():
        return None
    return candidate if candidate.stat().st_size >= 50 else None


def _write_cursor_session_jsonl(dest_file: Path, composer_id: str, folder_path: Path,
                                 ws_dir: Path, session: dict,
                                 transcript_path: Path | None = None) -> None:
    with dest_file.open("w", encoding="utf-8") as f:
        f.write(json.dumps({"type": "metadata", "composer_id": composer_id,
                             "workspace_path": str(folder_path), "workspace_hash": ws_dir.name,
                             "composer_data_raw": session["composer_data_raw"]},
                            ensure_ascii=False) + "\n")
        for bubble in session["bubbles_raw"]:
            f.write(json.dumps({"type": "bubble", **bubble}, ensure_ascii=False) + "\n")
        # Append transcript lines if available
        if transcript_path:
            for line in transcript_path.read_text(encoding="utf-8", errors="replace").splitlines():
                line = line.strip()
                if not line:
                    continue
                try:
                    payload = json.loads(line)
                    if isinstance(payload, dict):
                        payload["type"] = "transcript"
                        f.write(json.dumps(payload, ensure_ascii=False) + "\n")
                except json.JSONDecodeError:
                    continue


def collect_cursor_logs(cwd: str, dest_dir: Path, dry_run: bool = False) -> dict:
    """Collect logs from cursor-ide."""
    base = Path.home() / "Library" / "Application Support" / "Cursor"
    workspace_storage = base / "User" / "workspaceStorage"
    global_db = base / "User" / "globalStorage" / "state.vscdb"
    stats = {"agent": "cursor-ide", "found": 0, "copied": 0, "updated": 0, "skipped": 0}

    if not global_db.exists():
        return stats

    matches = _find_cursor_workspaces(workspace_storage, cwd)
    if not matches:
        return stats

    dest = dest_dir / "cursor-ide"
    if not dry_run:
        dest.mkdir(parents=True, exist_ok=True)

    # Track which sessions we found via cursorDiskKV (to detect transcript-only later)
    seen_ids: set[str] = set()

    try:
        with sqlite3.connect(f"file:{global_db}?mode=ro", uri=True) as global_conn:
            cursor = global_conn.cursor()
            for ws_dir, folder_path in matches:
                composer_ids = _collect_composer_ids(ws_dir / "state.vscdb")
                for composer_id in sorted(composer_ids):
                    session = _extract_cursor_session(composer_id, cursor)
                    if not session:
                        continue
                    seen_ids.add(composer_id)
                    stats["found"] += 1
                    dest_file = dest / f"{composer_id}.jsonl"
                    transcript = _find_cursor_transcript(cwd, composer_id)
                    if dest_file.exists():
                        import tempfile
                        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tmp:
                            tmp_path = Path(tmp.name)
                        _write_cursor_session_jsonl(tmp_path, composer_id, folder_path, ws_dir,
                                                     session, transcript)
                        same = get_file_hash(tmp_path) == get_file_hash(dest_file)
                        tmp_path.unlink()
                        if same:
                            stats["skipped"] += 1
                            continue
                        if not dry_run:
                            _write_cursor_session_jsonl(dest_file, composer_id, folder_path, ws_dir,
                                                         session, transcript)
                        stats["updated"] += 1
                    else:
                        if not dry_run:
                            _write_cursor_session_jsonl(dest_file, composer_id, folder_path, ws_dir,
                                                         session, transcript)
                        stats["copied"] += 1
    except sqlite3.Error:
        pass

    # Collect transcript-only sessions (no cursorDiskKV data)
    slug = cwd.lstrip("/").replace("/", "-")
    transcripts_dir = _cursor_home() / "projects" / slug / "agent-transcripts"
    if transcripts_dir.is_dir():
        for session_dir in sorted(transcripts_dir.iterdir()):
            if not session_dir.is_dir():
                continue
            composer_id = session_dir.name
            if composer_id in seen_ids:
                continue
            jsonl_file = session_dir / f"{composer_id}.jsonl"
            if not jsonl_file.exists() or jsonl_file.stat().st_size < 50:
                continue
            stats["found"] += 1
            dest_file = dest / f"{composer_id}.jsonl"
            # Write transcript-only file (metadata + transcript lines, no bubbles)
            if not dry_run:
                with dest_file.open("w", encoding="utf-8") as f:
                    f.write(json.dumps({"type": "metadata", "composer_id": composer_id,
                                         "workspace_path": cwd},
                                        ensure_ascii=False) + "\n")
                    for line in jsonl_file.read_text(encoding="utf-8", errors="replace").splitlines():
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            payload = json.loads(line)
                            if isinstance(payload, dict):
                                payload["type"] = "transcript"
                                f.write(json.dumps(payload, ensure_ascii=False) + "\n")
                        except json.JSONDecodeError:
                            continue
            if dest_file.exists():
                stats["copied"] += 1
            else:
                stats["copied"] += 1

    return stats


# --- Main ---

# Supported agent names, in collection order; also the valid --agents choices.
AGENTS = ["claude-code", "codex", "gemini", "cursor-ide", "cursor-agent"]

# Maps each agent name to its collector: (cwd, dest_dir, dry_run) -> stats dict.
COLLECTORS = {
    "claude-code": collect_claude_logs,
    "codex": collect_codex_logs,
    "gemini": collect_gemini_logs,
    "cursor-ide": collect_cursor_logs,
    "cursor-agent": collect_cursor_agent_logs,
}


def _project_name(cwd: str) -> str:
    """Derive a project name from a directory path (uses basename)."""
    return Path(cwd).name


def _resolve_raw_root(cwd: str, project: str | None, audit_dir: str | None = None) -> Path:
    project_name = project or _project_name(cwd)
    base = Path(audit_dir) if audit_dir else Path(cwd) / ".tessl" / "logs"
    return base / project_name / "raw"


def main():
    """CLI entry point: parse args, run the selected collectors, print totals."""
    parser = argparse.ArgumentParser(description="Collect coding agent logs for a project")
    parser.add_argument("--cwd", default=os.getcwd(), help="Project directory (default: cwd)")
    parser.add_argument("--project", default=None,
                        help="Project name (default: basename of --cwd)")
    parser.add_argument("--audit-dir", default=None,
                        help="Output directory for audit data (default: <cwd>/.tessl/logs)")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be copied")
    parser.add_argument("--agents", nargs="+", choices=AGENTS, default=AGENTS,
                        help="Which agents to collect from (default: all)")
    args = parser.parse_args()

    cwd = os.path.realpath(args.cwd)
    project = args.project or _project_name(cwd)
    dest_dir = _resolve_raw_root(cwd, args.project, args.audit_dir)

    header = [f"Collecting logs for: {cwd}", f"Project: {project}", f"Destination: {dest_dir}"]
    if args.dry_run:
        header.append("(dry run)")
    print("\n".join(header))
    print()

    results = []
    for agent in args.agents:
        stats = COLLECTORS[agent](cwd, dest_dir, args.dry_run)
        results.append(stats)
        print(f"{agent}: {stats['found']} found, {stats['copied']} new, "
              f"{stats['updated']} updated, {stats['skipped']} unchanged")

    total_new = sum(r["copied"] for r in results)
    total_upd = sum(r["updated"] for r in results)
    if total_new or total_upd:
        print(f"\nTotal: {total_new} new, {total_upd} updated")
    else:
        print("\nNo new or updated logs found.")


# Run the CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()

tile.json