CtrlK
BlogDocsLog inGet started
Tessl Logo

try-tessl/agent-quality

Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.

88

2.93x
Quality

86%

Does it follow best practices?

Impact

97%

2.93x

Average score across 3 eval scenarios

Security by Snyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

dispatch_judges.py — skills/analyze-sessions/scripts/

#!/usr/bin/env python3
"""
Dispatch LLM judges to evaluate agent sessions against checklist-based verifiers.

Reads prepared transcripts and rules, then invokes review_session.py for each
session. The shell script calls `claude -p --model haiku` so no API key is
needed — the user's existing claude CLI credentials are used.

Supports per-tile verdict caching: when --cache-dir and --tile are provided,
checks for existing cached verdicts and only dispatches uncached sessions.
New verdicts are written to both the cache and the run output directory.

Supports model selection (haiku, sonnet, opus) and parallel dispatch.

No external dependencies (calls review_session.py via subprocess).

Security: Transcripts may contain untrusted content from prior agent sessions
(tool outputs, web page text, user messages). Judges are invoked via
``claude -p`` with no tool access, transcript content is wrapped in
<transcript> tags and the judge prompt explicitly instructs the model to treat
it as data to evaluate (not instructions to follow) and to ignore any embedded
instructions or prompt overrides. Output is a structured JSON verdict — judges
cannot take actions, write files, or execute code.
"""

from __future__ import annotations

import argparse
import concurrent.futures
import hashlib
import json
import os
import shutil
import subprocess
import sys
import time
from pathlib import Path

SCRIPTS_DIR = Path(__file__).resolve().parent
REVIEW_SESSION_PY = SCRIPTS_DIR / "review_session.py"
REVIEW_PROMPT_PATH = SCRIPTS_DIR.parent / "references" / "review-prompt.md"

# Safety limit: refuse to dispatch more than this many sessions without explicit
# confirmation. This prevents runaway resource usage when an agent ignores the
# SKILL.md guidance to start small. The --confirmed flag bypasses this check.
UNCONFIRMED_SESSION_LIMIT = 10


# ─── Caching ───────────────────────────────────────────────────────────────


def hash_rules(rules_path: Path) -> str:
    """Return a 16-char SHA256 prefix of the rules file, used for cache invalidation."""
    return hashlib.sha256(rules_path.read_bytes()).hexdigest()[:16]


def tile_cache_dir(cache_dir: Path, tile_name: str) -> Path:
    """Resolve the per-tile cache directory (slashes in the name become '--')."""
    return cache_dir / tile_name.replace("/", "--")


def check_cache(cache_dir: Path, tile_name: str, rules_hash: str) -> tuple[Path, bool]:
    """Check whether the tile cache was built from these rules.

    Returns (tile_cache_path, is_valid); the cache is valid only when the
    stored hash matches *rules_hash*.
    """
    tile_dir = tile_cache_dir(cache_dir, tile_name)
    try:
        stored = (tile_dir / "_rules-hash.txt").read_text(encoding="utf-8")
    except FileNotFoundError:
        return tile_dir, False
    return tile_dir, stored.strip() == rules_hash


def get_cached_sessions(tile_cache: Path) -> set[str]:
    """Collect 'agent/session_id' keys for every cached verdict under *tile_cache*.

    Skips hidden and underscore-prefixed directories (e.g. the hash marker file's
    siblings) and anything that is not a directory.
    """
    if not tile_cache.exists():
        return set()
    return {
        f"{agent_dir.name}/{vf.stem.replace('.verdict', '')}"
        for agent_dir in tile_cache.iterdir()
        if agent_dir.is_dir() and not agent_dir.name.startswith((".", "_"))
        for vf in agent_dir.glob("*.verdict.json")
    }


def write_cache_hash(tile_cache: Path, rules_hash: str) -> None:
    """Persist *rules_hash* into the tile cache directory, creating it if needed."""
    tile_cache.mkdir(parents=True, exist_ok=True)
    marker = tile_cache / "_rules-hash.txt"
    marker.write_text(rules_hash, encoding="utf-8")


# ─── Session dispatch via review_session.py ───────────────────────────────


def find_sessions(prepared_dir: Path) -> list[dict]:
    """Enumerate prepared session transcripts.

    Returns one dict per .txt transcript, sorted by agent directory then
    filename, each with keys: agent, session_id, file.
    """
    if not prepared_dir.exists():
        return []
    return [
        {"agent": agent_dir.name, "session_id": txt.stem, "file": txt}
        for agent_dir in sorted(prepared_dir.iterdir())
        if agent_dir.is_dir() and not agent_dir.name.startswith((".", "_"))
        for txt in sorted(agent_dir.glob("*.txt"))
    ]


def dispatch_session(
    session: dict,
    rules_path: Path,
    out_dir: Path,
    model: str = "haiku",
    tile_name: str | None = None,
    tile_cache: Path | None = None,
) -> dict:
    """Run review_session.py for one session and persist its verdict.

    Writes the verdict JSON under out_dir/verdicts/<tile>/<agent>/, optionally
    mirrors it into the per-tile cache, and returns a result dict: status plus
    token/duration/cost metadata on success, or an error description.
    """
    agent = session["agent"]
    session_id = session["session_id"]
    label = f"{agent}/{session_id}"

    try:
        # Namespace verdicts by tile so two tiles judged against the same
        # session don't overwrite each other's output.
        slug = tile_name.replace("/", "--") if tile_name else "_default"
        verdict_dir = out_dir / "verdicts" / slug / agent
        verdict_dir.mkdir(parents=True, exist_ok=True)
        verdict_path = verdict_dir / f"{session_id}.verdict.json"

        # Delegate the actual LLM call to review_session.py in a subprocess.
        proc = subprocess.run(
            [
                sys.executable, str(REVIEW_SESSION_PY),
                "--transcript", str(session["file"]),
                "--rules", str(rules_path),
                "--output", str(verdict_path),
                "--review-prompt", str(REVIEW_PROMPT_PATH),
                "--model", model,
            ],
            capture_output=True,
            text=True,
            timeout=300,  # hard cap: 5 minutes per session
        )

        if proc.returncode != 0:
            stderr = proc.stderr.strip()
            return {"session": label, "status": "error", "error": f"exit {proc.returncode}: {stderr}"}

        # Read the verdict back so the caller's summary can report meta.
        verdict = json.loads(verdict_path.read_text(encoding="utf-8"))

        # Stamp tile provenance onto the verdict and each instruction entry.
        if tile_name:
            verdict["_tile"] = tile_name
            for inst in verdict.get("instructions", []):
                inst["tile"] = tile_name
            verdict_path.write_text(json.dumps(verdict, indent=2), encoding="utf-8")

        # Mirror into the per-tile verdict cache when caching is enabled.
        if tile_cache:
            cache_agent_dir = tile_cache / agent
            cache_agent_dir.mkdir(parents=True, exist_ok=True)
            (cache_agent_dir / f"{session_id}.verdict.json").write_text(
                json.dumps(verdict, indent=2), encoding="utf-8"
            )

        meta = verdict.get("_meta", {})
        return {
            "session": label,
            "status": "ok",
            "input_tokens": meta.get("input_tokens", 0),
            "output_tokens": meta.get("output_tokens", 0),
            "duration_ms": meta.get("duration_ms", 0),
            "cost_usd": meta.get("cost_usd", 0),
            "verdict_path": str(verdict_path),
        }

    except subprocess.TimeoutExpired:
        return {"session": label, "status": "timeout", "error": "review_session.py timed out after 300s"}
    except json.JSONDecodeError as e:
        return {"session": label, "status": "json_error", "error": str(e)}
    except Exception as e:
        return {"session": label, "status": "error", "error": str(e)}


def dispatch_all(
    prepared_dir: Path,
    rules_path: Path,
    out_dir: Path,
    model: str = "haiku",
    max_parallel: int = 5,
    dry_run: bool = False,
    cache_dir: Path | None = None,
    tile_name: str | None = None,
    confirmed: bool = False,
) -> list[dict]:
    """Dispatch judges for all prepared sessions, with optional caching.

    Args mirror the CLI flags. Returns the per-session result dicts from
    dispatch_session; empty when nothing was found or nothing needed judging.
    Exits the process (exit code 1) when the unconfirmed-session safety limit
    is exceeded.
    """
    rules = json.loads(rules_path.read_text(encoding="utf-8"))

    all_sessions = find_sessions(prepared_dir)
    if not all_sessions:
        print(f"No sessions found in {prepared_dir}")
        return []

    # Cache logic: skip sessions that already have a valid cached verdict.
    tile_cache = None
    cached_count = 0
    sessions_to_dispatch = all_sessions

    if cache_dir and tile_name:
        rules_h = hash_rules(rules_path)
        tile_cache, cache_valid = check_cache(cache_dir, tile_name, rules_h)

        if cache_valid:
            cached_ids = get_cached_sessions(tile_cache)
            sessions_to_dispatch = [
                s for s in all_sessions
                if f"{s['agent']}/{s['session_id']}" not in cached_ids
            ]
            cached_count = len(all_sessions) - len(sessions_to_dispatch)
        elif tile_cache.exists():
            # Rules changed — stale verdicts are meaningless, so wipe the
            # cache contents (the hash marker is rewritten below anyway).
            for child in tile_cache.iterdir():
                if child.name == "_rules-hash.txt":
                    continue
                if child.is_dir():
                    shutil.rmtree(child)
                else:
                    child.unlink()

        # Record the current rules hash exactly once. (The original wrote it
        # twice on the invalid-cache path.)
        write_cache_hash(tile_cache, rules_h)

    # Surface cached verdicts in the run output dir so downstream aggregation
    # sees the complete set, not just the freshly-judged sessions.
    tile_slug = tile_name.replace("/", "--") if tile_name else "_default"
    if cached_count > 0 and tile_cache:
        print(f"Cache: {cached_count} sessions cached, {len(sessions_to_dispatch)} new")
        dispatched_keys = {f"{s['agent']}/{s['session_id']}" for s in sessions_to_dispatch}
        for s in all_sessions:
            if f"{s['agent']}/{s['session_id']}" in dispatched_keys:
                continue
            src = tile_cache / s["agent"] / f"{s['session_id']}.verdict.json"
            if not src.exists():
                continue
            dst_dir = out_dir / "verdicts" / tile_slug / s["agent"]
            dst_dir.mkdir(parents=True, exist_ok=True)
            dst = dst_dir / f"{s['session_id']}.verdict.json"
            if not dst.exists():
                dst.write_text(src.read_text(encoding="utf-8"), encoding="utf-8")

    total_checks = rules.get("total_checks", "?")
    total_instructions = rules.get("total_instructions", "?")
    tile_label = f" [{tile_name}]" if tile_name else ""
    print(f"Model: {model}{tile_label}")
    print(f"Sessions: {len(all_sessions)} total, {len(sessions_to_dispatch)} to judge")
    print(f"Instructions: {total_instructions}, Checks: {total_checks}")
    print(f"Output: {out_dir}")

    if not sessions_to_dispatch:
        print("\nAll sessions cached — nothing to dispatch.")
        return []

    # Safety check: refuse large dispatches without explicit confirmation.
    n_dispatch = len(sessions_to_dispatch)
    if n_dispatch > UNCONFIRMED_SESSION_LIMIT and not confirmed:
        print(f"\n!! SAFETY LIMIT: {n_dispatch} sessions to dispatch "
              f"exceeds the limit of {UNCONFIRMED_SESSION_LIMIT}.")
        print(f"   This would launch {n_dispatch} claude CLI processes "
              f"which can consume significant RAM and API quota.")
        print(f"\n   ACTION REQUIRED: Ask the user to confirm they want to analyze "
              f"{n_dispatch} sessions.")
        print("   To proceed after confirmation, re-run with the --confirmed flag.")
        sys.exit(1)

    if dry_run:
        print("\nDry run — sessions that would be dispatched:")
        for s in sessions_to_dispatch:
            size = s["file"].stat().st_size
            print(f"  {s['agent']}/{s['session_id']} ({size:,} chars)")
        return []

    print(f"\nDispatching {n_dispatch} judges via claude CLI (max {max_parallel} parallel)...\n")

    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel) as pool:
        futures = {
            pool.submit(
                dispatch_session,
                session,
                rules_path,
                out_dir,
                model,
                tile_name,
                tile_cache,
            ): session
            for session in sessions_to_dispatch
        }

        # Report each verdict as it lands rather than in submission order.
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            results.append(result)

            status = result["status"]
            label = result["session"]
            if status == "ok":
                tokens = result.get("input_tokens", 0) + result.get("output_tokens", 0)
                ms = result.get("duration_ms", 0)
                cost = result.get("cost_usd", 0)
                cost_str = f", ${cost:.4f}" if cost else ""
                print(f"  ok {label} — {tokens:,} tokens, {ms:,}ms{cost_str}")
            else:
                print(f"  FAIL {label} — {status}: {result.get('error', '')}")

    # Aggregate token/cost totals over the successful dispatches only.
    ok = [r for r in results if r["status"] == "ok"]
    failed = [r for r in results if r["status"] != "ok"]
    total_input = sum(r.get("input_tokens", 0) for r in ok)
    total_output = sum(r.get("output_tokens", 0) for r in ok)
    total_ms = sum(r.get("duration_ms", 0) for r in ok)
    total_cost = sum(r.get("cost_usd", 0) for r in ok)

    print("\n── Summary ──")
    print(f"  {len(ok)} dispatched, {cached_count} cached, {len(failed)} failed")
    print(f"  Total tokens: {total_input + total_output:,} "
          f"({total_input:,} in / {total_output:,} out)")
    print(f"  Total wall time: {total_ms / 1000:.1f}s")
    if total_cost > 0:
        print(f"  Total cost: ${total_cost:.4f}")

    return results


# ─── CLI ────────────────────────────────────────────────────────────────────


def main():
    """Parse CLI arguments and run dispatch_all.

    Fix vs. original: missing prepared/ or rules.json previously printed to
    stdout and returned, exiting with status 0 — now errors go to stderr and
    the process exits 1 so callers/scripts can detect the failure.
    """
    parser = argparse.ArgumentParser(
        description="Dispatch LLM judges for session evaluation via claude CLI"
    )
    parser.add_argument(
        "--dir",
        required=True,
        help="Run directory (contains prepared/ and rules.json)",
    )
    parser.add_argument(
        "--rules",
        default=None,
        help="Path to rules JSON (default: <dir>/rules.json)",
    )
    parser.add_argument(
        "--model",
        default="haiku",
        choices=["haiku", "sonnet", "opus"],
        help="Model to use (default: haiku)",
    )
    parser.add_argument(
        "--out-dir",
        default=None,
        help="Output directory (default: same as --dir)",
    )
    parser.add_argument(
        "--max-parallel",
        type=int,
        default=3,
        help="Max concurrent claude CLI calls (default: 3)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be dispatched without calling claude",
    )
    parser.add_argument(
        "--cache-dir",
        default=None,
        help="Verdict cache directory (e.g. ~/.tessl/session-analyses/<slug>/verdict-cache)",
    )
    parser.add_argument(
        "--tile",
        default=None,
        help="Tile name for cache keying (e.g. amyh/research-best-practice)",
    )
    parser.add_argument(
        "--analysis-dir",
        default=None,
        help="Analysis directory (unused, kept for CLI compatibility)",
    )
    parser.add_argument(
        "--confirmed",
        action="store_true",
        default=False,
        help=argparse.SUPPRESS,  # intentionally undocumented
    )
    args = parser.parse_args()

    run_dir = Path(args.dir)
    prepared_dir = run_dir / "prepared"

    # Fail fast (and loudly, with a nonzero exit) when prerequisites from
    # earlier pipeline stages are missing.
    if not prepared_dir.exists():
        print(f"Error: {prepared_dir} not found. Run prepare_sessions.py first.", file=sys.stderr)
        sys.exit(1)

    rules_path = Path(args.rules) if args.rules else run_dir / "rules.json"
    if not rules_path.exists():
        print(f"Error: {rules_path} not found. Run extract_checklist.py first.", file=sys.stderr)
        sys.exit(1)

    out_dir = Path(args.out_dir) if args.out_dir else run_dir
    cache_dir = Path(args.cache_dir) if args.cache_dir else None

    dispatch_all(
        prepared_dir=prepared_dir,
        rules_path=rules_path,
        out_dir=out_dir,
        model=args.model,
        max_parallel=args.max_parallel,
        dry_run=args.dry_run,
        cache_dir=cache_dir,
        tile_name=args.tile,
        confirmed=args.confirmed,
    )


if __name__ == "__main__":
    main()

README.md

tile.json