CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

try-tessl/agent-quality

Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.

88

2.93x
Quality

86%

Does it follow best practices?

Impact

97%

2.93x

Average score across 3 eval scenarios

SecuritybySnyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

prepare_sessions.py — skills/analyze-sessions/scripts/

#!/usr/bin/env python3
"""
Prepare condensed session transcripts for judge-based analysis.

Reads normalized JSONL session files and produces condensed text transcripts
optimized for LLM consumption. Keeps user messages, assistant messages, tool
calls and results, with turn numbers for citation.

Output: per-session .txt files + manifest.json in prepared/

No external dependencies.
"""

from __future__ import annotations

import argparse
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any


# ─── Helpers ────────────────────────────────────────────────────────────────


def _text(event: dict) -> str | None:
    segments = event.get("segments", [])
    texts = [
        s.get("data", "")
        for s in segments
        if isinstance(s, dict) and s.get("type") == "text"
    ]
    return " ".join(texts).strip() if texts else None


def _json_data(event: dict) -> dict | None:
    """Extract first JSON segment as parsed dict."""
    for s in event.get("segments", []):
        if isinstance(s, dict) and s.get("type") == "json":
            data = s.get("data", "")
            if isinstance(data, str):
                try:
                    return json.loads(data)
                except (json.JSONDecodeError, ValueError):
                    pass
            elif isinstance(data, dict):
                return data
    return None


def _tool_name(event: dict) -> str:
    tool = event.get("tool", {})
    return tool.get("display_name") or tool.get("name") or "?"


def _tool_status(event: dict) -> str | None:
    return event.get("tool", {}).get("status")


def _trunc(s: str, n: int) -> str:
    if len(s) <= n:
        return s
    return s[:n] + "..."


def _trunc_multiline(s: str, max_lines: int = 100, keep: int = 20) -> str:
    """Truncate content with many lines, keeping first/last `keep` lines."""
    lines = s.splitlines()
    if len(lines) <= max_lines:
        return s
    head = lines[:keep]
    tail = lines[-keep:]
    omitted = len(lines) - 2 * keep
    return "\n".join(head + [f"[... {omitted} lines omitted ...]"] + tail)


def _summarize_tool_args(event: dict) -> str:
    """Build a short comma-joined ``key=value`` summary of a tool call's arguments.

    Prefers a fixed set of well-known argument keys; when none are present,
    falls back to the first two keys of the JSON payload, and finally to the
    call's plain-text content.
    """
    jdata = _json_data(event)
    text = _text(event)

    parts: list[str] = []
    if jdata and isinstance(jdata, dict):
        preferred = (
            "file_path",
            "path",
            "command",
            "pattern",
            "query",
            "url",
            "skill",
        )
        parts = [
            f"{key}={_trunc(str(jdata[key]), 80)}"
            for key in preferred
            if key in jdata
        ]
        if not parts:
            # Nothing recognizable — show the first two keys present instead.
            parts = [
                f"{key}={_trunc(str(jdata[key]), 60)}"
                for key in list(jdata.keys())[:2]
            ]
    elif text:
        parts = [_trunc(text, 100)]

    return ", ".join(parts) if parts else ""


def _extract_file_content(event: dict) -> str | None:
    """Extract written/edited file content from a tool_call event.

    Works across agents:
      - Claude Write/Edit: JSON segment has ``content`` or ``new_string``
      - Codex cat-redirect: JSON segment has ``cmd`` with heredoc content
      - Cursor/Gemini: similar ``content`` field in JSON segment

    Returns None for events that are not writes/edits or carry no content.
    """
    if event.get("action", "") not in ("file_write", "file_edit"):
        return None

    args = _json_data(event)
    if not args or not isinstance(args, dict):
        return None

    # Direct content fields (Claude Write, Cursor write, Gemini)
    for field in ("content", "contents", "new_string"):
        if args.get(field):
            return args[field]

    # Codex: content embedded in cmd after heredoc marker (cat > file <<'TAG'\n...)
    cmd = args.get("cmd", "")
    if not cmd or "<<" not in cmd:
        return None

    # Everything after the first newline (skip the "cat > ... <<'TAG'" line).
    newline_at = cmd.find("\n")
    if newline_at < 0:
        return None
    body = cmd[newline_at + 1:]

    # Strip a trailing heredoc delimiter line if one is present.
    known_delims = (
        "HTML", "EOF", "HEREDOC", "END", "DOC", "CSS", "JS", "PY",
        "'HTML'", "'EOF'", "'HEREDOC'", "'END'",
    )
    pieces = body.rsplit("\n", 1)
    if len(pieces) == 2 and pieces[1].strip() in known_delims:
        body = pieces[0]
    return body


# ─── Timestamp fallback ────────────────────────────────────────────────────


def _extract_raw_timestamp(normalized_path: str) -> str:
    """Try to get a timestamp from the raw JSONL log when normalized events lack one.

    Reads the first line of the normalized file looking for any timestamp field,
    then falls back to the file's mtime as ISO 8601.
    """
    if not normalized_path:
        return ""
    p = Path(normalized_path)
    if not p.exists():
        return ""

    # Try first few lines for any timestamp-like field
    try:
        with open(p, encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                    for key in ("timestamp", "ts", "created_at", "time", "date"):
                        val = event.get(key, "")
                        if val and isinstance(val, str) and len(val) >= 10:
                            return val
                except (json.JSONDecodeError, ValueError):
                    pass
                break  # only check first non-empty line
    except OSError:
        pass

    # Last resort: file modification time
    try:
        mtime = p.stat().st_mtime
        return datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
    except OSError:
        return ""


# ─── Condensation ──────────────────────────────────────────────────────────


def condense_session(events: list[dict], max_chars: int = 150000) -> str:
    """Convert normalized events into a condensed text transcript.

    Limits are generous to preserve evidence the judge needs — especially
    file write content and tool results.  The overall max_chars cap is the
    main safety valve; per-element limits just keep individual items sane.

    Args:
        events: Normalized session events.  ``usage`` and ``system`` events
            are dropped; everything else is rendered with a ``[turn N]``
            prefix so the judge can cite turns.
        max_chars: Hard cap on the transcript length.  When exceeded, the
            middle is cut, keeping 60% from the start and 40% from the end.

    Returns:
        The newline-joined transcript string.
    """
    lines: list[str] = []
    # Track the action of the most recent tool_call so we can
    # collapse read results (tool_results don't carry the action).
    last_call_action: str = ""
    # Turn id of the previously rendered event; used to insert a visual
    # separator line whenever the turn number changes.
    prev_turn = None

    for ev in events:
        kind = ev.get("kind", "")
        actor = ev.get("actor", "")
        turn = ev.get("turn", "?")

        # Bookkeeping events carry no evidence for the judge — skip them.
        if kind in ("usage", "system"):
            continue

        # Visual separator between turns
        if turn != prev_turn and prev_turn is not None:
            lines.append(f"{'~' * 40}")
        prev_turn = turn

        prefix = f"[turn {turn}]"

        if kind == "message" and actor == "user":
            # User messages are kept in full — they define the task.
            text = _text(ev) or ""
            lines.append(f"{prefix} USER: {text}")

        elif kind == "message" and actor == "assistant":
            # Long assistant messages keep their opening and closing; the
            # middle is usually boilerplate narration.
            text = _text(ev) or ""
            if len(text) > 3000:
                text = text[:2000] + " [...] " + text[-800:]
            lines.append(f"{prefix} ASSISTANT: {text}")

        elif kind == "tool_call":
            name = _tool_name(ev)
            action = ev.get("action", "")
            last_call_action = action
            args_summary = _summarize_tool_args(ev)

            # For shell/bash tools, annotate with the semantic action so
            # the judge can distinguish reads from writes (especially for
            # Codex where all operations go through shell()).
            action_label = ""
            if action == "file_write":
                action_label = " [WRITE]"
            elif action == "file_read":
                action_label = " [READ]"
            elif action == "file_search":
                action_label = " [SEARCH]"

            # Include file content for writes/edits — keep generously,
            # since this is the primary evidence the judge needs to
            # verify code-level rules (fonts, CSS variables, animations…).
            # Only truncate truly large files; anything under 25k chars
            # is kept in full since it easily fits the token budget.
            file_content = _extract_file_content(ev) or ""
            content_note = ""
            if file_content:
                if len(file_content) <= 25000:
                    truncated = file_content
                else:
                    truncated = _trunc_multiline(file_content, max_lines=600, keep=200)
                    # Second-stage cap: line-based truncation can still leave a
                    # huge blob when individual lines are very long.
                    if len(truncated) > 40000:
                        truncated = truncated[:28000] + "\n[... content truncated ...]\n" + truncated[-10000:]
                content_note = f"\n  FILE_CONTENT:\n{truncated}"

            call_line = f"{prefix} [TOOL] {name}{action_label}"
            if args_summary:
                call_line += f"({_trunc(args_summary, 150)})"
            call_line += content_note
            lines.append(call_line)

        elif kind == "tool_result":
            name = _tool_name(ev)
            status = _tool_status(ev)
            text = _text(ev) or ""
            # A result counts as a "read" either by tool name or because the
            # preceding tool_call was classified as file_read.
            is_read = (
                name.lower() in ("read", "read_file", "read_file_v2")
                or last_call_action == "file_read"
            )

            if status == "error":
                lines.append(f"{prefix} [ERROR] {name}: {_trunc(text, 500)}")
            elif is_read:
                # Collapse file read results to just metadata.
                # The judge only needs to know what was read, not its
                # content — showing content causes false positives when
                # the judge confuses read code with agent-authored code.
                line_count = text.count("\n") + 1 if text else 0
                lines.append(
                    f"{prefix} [RESULT] {name}: ({line_count} lines read)"
                )
            elif name.lower() in ("glob", "grep", "search") or last_call_action == "file_search":
                # Search results — keep short, truncate long lists
                if len(text) <= 500:
                    lines.append(f"{prefix} [RESULT] {name}: {text}")
                else:
                    result_lines = text.splitlines()
                    kept = result_lines[:10]
                    if len(result_lines) > 10:
                        kept.append(f"[... {len(result_lines) - 10} more results ...]")
                    # chr(10) is "\n" — avoids a backslash inside the f-string.
                    lines.append(
                        f"{prefix} [RESULT] {name}: {chr(10).join(kept)}"
                    )
            elif len(text) <= 500:
                lines.append(f"{prefix} [RESULT] {name}: {text}")
            else:
                # Generic long result: line-based truncation first, then a
                # character cap with the original size noted.
                truncated = _trunc_multiline(text, max_lines=120, keep=40)
                if len(truncated) > 3000:
                    truncated = truncated[:2500] + "... ({} chars)".format(len(text))
                lines.append(
                    f"{prefix} [RESULT] {name}: {truncated}"
                )

        elif kind == "error":
            text = _text(ev) or ""
            lines.append(f"{prefix} [ERROR] {_trunc(text, 500)}")

    result = "\n".join(lines)
    if len(result) > max_chars:
        # Smart truncation: keep the beginning (context, skill activation)
        # and end (final output, verification) with more weight on the start
        head_chars = int(max_chars * 0.6)
        tail_chars = max_chars - head_chars
        result = (
            result[:head_chars]
            + "\n\n[... transcript truncated — middle turns omitted ...]\n\n"
            + result[-tail_chars:]
        )

    return result


# ─── Session loading ───────────────────────────────────────────────────────


def load_sessions(
    logs_dir: Path, agents: list[str] | None = None
) -> list[dict[str, Any]]:
    """Load all normalized sessions with metadata.

    Scans ``logs_dir/<agent>/*.jsonl``, parsing each line as JSON and
    silently skipping malformed lines.  Sessions with no parseable events
    are dropped.  When *agents* is given, other agents' directories are
    ignored.
    """
    loaded: list[dict[str, Any]] = []
    for agent_dir in sorted(logs_dir.iterdir()):
        if not agent_dir.is_dir():
            continue
        agent_name = agent_dir.name
        if agents and agent_name not in agents:
            continue
        for session_file in sorted(agent_dir.glob("*.jsonl")):
            parsed = []
            for raw_line in session_file.read_text(errors="replace").splitlines():
                try:
                    parsed.append(json.loads(raw_line))
                except json.JSONDecodeError:
                    pass  # tolerate corrupt lines rather than losing the session
            if not parsed:
                continue
            loaded.append(
                {
                    "agent": agent_name,
                    "file": str(session_file),
                    "session_id": session_file.stem,
                    "events": parsed,
                }
            )
    return loaded


def prepare_session(session: dict, max_chars: int = 150000) -> dict | None:
    """Prepare a single session. Returns metadata + condensed text, or None if too short.

    Sessions with fewer than 3 events carry no useful signal and are skipped.

    Args:
        session: Dict from ``load_sessions`` with "agent", "file",
            "session_id" and "events" keys.
        max_chars: Cap passed through to ``condense_session``.

    Returns:
        Manifest entry dict including ``full_text`` (header + transcript),
        or None when the session was skipped.
    """
    events = session["events"]
    if len(events) < 3:
        return None

    transcript = condense_session(events, max_chars)
    max_turn = max((e.get("turn", 0) for e in events), default=0)

    timestamps = [e.get("timestamp", "") for e in events if e.get("timestamp")]
    first_ts = timestamps[0] if timestamps else ""
    last_ts = timestamps[-1] if timestamps else ""

    # If no timestamps in normalized events, try to extract from raw log.
    # BUG FIX: this must happen BEFORE the header is built — previously the
    # header's DATE field stayed empty in that case even though the manifest
    # received the fallback timestamp.
    if not first_ts:
        first_ts = _extract_raw_timestamp(session.get("file", ""))
        if first_ts and not last_ts:
            last_ts = first_ts

    # Classify session type based on whether the agent wrote/edited files
    has_writes = any(
        e.get("action") in ("file_write", "file_edit")
        for e in events
        if e.get("kind") == "tool_call"
    )
    session_type = "modifying" if has_writes else "read-only"

    header = (
        f"SESSION: {session['session_id']}\n"
        f"AGENT: {session['agent']}\n"
        f"SOURCE: {session['file']}\n"
        f"EVENTS: {len(events)}\n"
        f"TURNS: {max_turn}\n"
        f"DATE: {first_ts}\n"
        f"SESSION_TYPE: {session_type}\n"
        f"---\n"
    )

    return {
        "session_id": session["session_id"],
        "agent": session["agent"],
        "source_file": session["file"],
        "events": len(events),
        "turns": max_turn,
        "session_timestamp": first_ts,
        "first_timestamp": first_ts,
        "last_timestamp": last_ts,
        "condensed_chars": len(transcript),
        "full_text": header + transcript,
    }


# ─── Project helpers ───────────────────────────────────────────────────────


def _discover_projects(base_dir: Path) -> list[str]:
    if not base_dir.exists():
        return []
    return [
        d.name
        for d in sorted(base_dir.iterdir())
        if d.is_dir()
        and not d.name.startswith((".", "_"))
        and (d / "normalized").is_dir()
    ]


def prepare_project(
    logs_dir: Path,
    out_dir: Path,
    max_chars: int = 150000,
    agents: list[str] | None = None,
    refresh: bool = False,
    max_sessions: int | None = None,
    session_ids: list[str] | None = None,
) -> dict:
    """Prepare all sessions for a single project. Returns manifest data.

    If *max_sessions* is set, only the N most recent sessions (by timestamp)
    are prepared — useful for quick-check mode.

    If *session_ids* is set, only those specific sessions are prepared.
    Format: ``agent/session_id`` (e.g. ``claude-code/abc123``).

    Writes one ``<agent>/<session_id>.txt`` per session plus a
    ``manifest.json`` under ``out_dir/prepared/``.
    """
    prep_dir = out_dir / "prepared"
    manifest_path = prep_dir / "manifest.json"

    # Short-circuit: reuse a previously written manifest unless --refresh.
    if not refresh and manifest_path.exists():
        manifest = json.loads(manifest_path.read_text())
        print(
            f"  Already prepared ({manifest.get('total_sessions', '?')} sessions). "
            f"Use --refresh to redo."
        )
        return manifest

    sessions = load_sessions(logs_dir, agents)

    # When --sessions is set, filter to only those specific session IDs
    if session_ids:
        id_set = set(session_ids)
        sessions = [
            s for s in sessions
            if f"{s['agent']}/{s['session_id']}" in id_set
        ]
        print(f"  Filtering to {len(sessions)} of {len(id_set)} requested session(s)")

    # When --max-sessions is set, sort by most recent first and take top N.
    # We peek at each session's first timestamp (or file mtime as fallback)
    # to determine recency.
    if max_sessions and max_sessions > 0 and len(sessions) > max_sessions:
        def _session_sort_key(sess: dict) -> str:
            # First event timestamp wins; ISO 8601 strings sort correctly.
            for ev in sess["events"]:
                ts = ev.get("timestamp", "")
                if ts:
                    return ts
            # Fallback: file mtime
            try:
                mtime = Path(sess["file"]).stat().st_mtime
                return datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
            except OSError:
                return ""

        sessions.sort(key=_session_sort_key, reverse=True)
        sessions = sessions[:max_sessions]
        print(f"  Limiting to {max_sessions} most recent session(s)")

    prep_dir.mkdir(parents=True, exist_ok=True)

    prepared = []
    skipped = 0
    for sess in sessions:
        result = prepare_session(sess, max_chars)
        # None means the session had fewer than 3 events.
        if result is None:
            skipped += 1
            continue

        # Write condensed transcript
        agent_dir = prep_dir / result["agent"]
        agent_dir.mkdir(parents=True, exist_ok=True)
        txt_file = agent_dir / f"{result['session_id']}.txt"
        txt_file.write_text(result["full_text"], encoding="utf-8")

        # Add to manifest (without full_text)
        entry = {k: v for k, v in result.items() if k != "full_text"}
        entry["prepared_file"] = str(txt_file.relative_to(out_dir))
        prepared.append(entry)

    manifest = {
        "prepared_at": datetime.now(timezone.utc).isoformat(),
        "total_sessions": len(prepared),
        "sessions_skipped": skipped,
        "total_condensed_chars": sum(p["condensed_chars"] for p in prepared),
        "sessions": prepared,
    }
    manifest_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")

    print(f"  Prepared {len(prepared)} sessions ({skipped} skipped, <3 events)")
    print(f"  Total condensed: {manifest['total_condensed_chars']:,} chars")
    print(f"  Written to {prep_dir}/")

    return manifest


# ─── CLI ────────────────────────────────────────────────────────────────────


def main():
    """CLI entry point: parse arguments, resolve projects, and prepare each one."""
    parser = argparse.ArgumentParser(
        description="Prepare condensed session transcripts for judge-based analysis"
    )
    parser.add_argument(
        "--project", default=None, help="Project name (default: auto-discover)"
    )
    parser.add_argument(
        "--cwd", default=os.getcwd(), help="Project directory (default: cwd)"
    )
    parser.add_argument(
        "--analysis-dir",
        default=None,
        help="Base logs directory (default: <cwd>/.tessl/logs)",
    )
    parser.add_argument(
        "--label",
        default=None,
        help="Label for output directory (default: today's date)",
    )
    parser.add_argument(
        "--agents",
        nargs="+",
        default=None,
        help="Filter to specific agents (e.g. claude-code codex)",
    )
    parser.add_argument(
        "--max-transcript-chars",
        type=int,
        default=150000,
        help="Max chars per condensed transcript (default: 150000)",
    )
    parser.add_argument(
        "--max-sessions",
        type=int,
        default=None,
        help="Only prepare the N most recent sessions (default: all)",
    )
    parser.add_argument(
        "--refresh", action="store_true", help="Re-prepare even if already done"
    )
    parser.add_argument(
        "--sessions",
        nargs="+",
        default=None,
        help="Only prepare specific sessions (agent/session_id format)",
    )
    parser.add_argument(
        "--out-dir",
        default=None,
        help="Output directory for prepared transcripts (default: <analysis-dir>/<project>/results/<label>)",
    )
    args = parser.parse_args()

    # Resolve the logs base directory and the label used for the output dir.
    cwd = os.path.realpath(args.cwd)
    base_dir = Path(args.analysis_dir) if args.analysis_dir else Path(cwd) / ".tessl" / "logs"
    label = args.label or datetime.now().strftime("%Y-%m-%d")

    # Either a single explicit project, or every discovered project.
    if args.project:
        projects = [args.project]
    else:
        projects = _discover_projects(base_dir)
        if not projects:
            print(f"No projects found under {base_dir}/")
            return

    for project in projects:
        if len(projects) > 1:
            print(f"── {project} ──")
        # Security: reads only from normalized/ where secrets have already been
        # redacted by normalize_logs.py — raw logs are never used past that stage.
        logs_dir = base_dir / project / "normalized"
        if not logs_dir.exists():
            print(f"  Normalized logs not found: {logs_dir}")
            continue
        if args.out_dir:
            out_dir = Path(args.out_dir)
        else:
            out_dir = base_dir / project / "results" / label
        prepare_project(
            logs_dir, out_dir, args.max_transcript_chars, args.agents, args.refresh,
            max_sessions=args.max_sessions,
            session_ids=args.sessions,
        )


# Script entry point — run the CLI only when executed directly.
if __name__ == "__main__":
    main()

README.md

tile.json