Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.
88
86%
Does it follow best practices?
Impact
97%
2.93x — Average score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Dispatch LLM judges to evaluate agent sessions against checklist-based verifiers.
Reads prepared transcripts and rules, then invokes review_session.py for each
session. The shell script calls `claude -p --model haiku` so no API key is
needed — the user's existing claude CLI credentials are used.
Supports per-tile verdict caching: when --cache-dir and --tile are provided,
checks for existing cached verdicts and only dispatches uncached sessions.
New verdicts are written to both the cache and the run output directory.
Supports model selection (haiku, sonnet, opus) and parallel dispatch.
No external dependencies (calls review_session.py via subprocess).
Security: Transcripts may contain untrusted content from prior agent sessions
(tool outputs, web page text, user messages). Judges are invoked via
``claude -p`` with no tool access, transcript content is wrapped in
<transcript> tags and the judge prompt explicitly instructs the model to treat
it as data to evaluate (not instructions to follow) and to ignore any embedded
instructions or prompt overrides. Output is a structured JSON verdict — judges
cannot take actions, write files, or execute code.
"""
from __future__ import annotations
import argparse
import concurrent.futures
import hashlib
import json
import os
import subprocess
import sys
import time
from pathlib import Path
# Paths are resolved relative to this script's own directory so the skill
# works no matter what the current working directory is.
SCRIPTS_DIR = Path(__file__).resolve().parent
# Sibling helper invoked via subprocess for each session.
REVIEW_SESSION_PY = SCRIPTS_DIR / "review_session.py"
# Judge prompt template shared across sessions.
REVIEW_PROMPT_PATH = SCRIPTS_DIR.parent / "references" / "review-prompt.md"
# Safety limit: refuse to dispatch more than this many sessions without explicit
# confirmation. This prevents runaway resource usage when an agent ignores the
# SKILL.md guidance to start small. The --confirmed flag bypasses this check.
UNCONFIRMED_SESSION_LIMIT = 10
# ─── Caching ───────────────────────────────────────────────────────────────
def hash_rules(rules_path: Path) -> str:
    """Return a 16-char SHA256 prefix of the rules file, used for cache invalidation."""
    digest = hashlib.sha256(rules_path.read_bytes()).hexdigest()
    return digest[:16]
def tile_cache_dir(cache_dir: Path, tile_name: str) -> Path:
    """Return the per-tile cache directory (slashes in the tile name become '--')."""
    return cache_dir / tile_name.replace("/", "--")
def check_cache(cache_dir: Path, tile_name: str, rules_hash: str) -> tuple[Path, bool]:
    """Check whether the tile's cache matches *rules_hash*.

    Returns (tile_cache_path, is_valid). The cache is valid only when the
    stored hash marker file exists and its stripped content equals
    *rules_hash*; a missing marker means the cache must be rebuilt.
    """
    tile_dir = tile_cache_dir(cache_dir, tile_name)
    marker = tile_dir / "_rules-hash.txt"
    if not marker.exists():
        return tile_dir, False
    stored = marker.read_text(encoding="utf-8").strip()
    return tile_dir, stored == rules_hash
def get_cached_sessions(tile_cache: Path) -> set[str]:
    """Return "agent/session_id" keys that already have cached verdicts.

    Hidden and underscore-prefixed subdirectories (e.g. the hash marker's
    siblings) are skipped; a missing cache directory yields an empty set.
    """
    if not tile_cache.exists():
        return set()
    found: set[str] = set()
    for agent_dir in tile_cache.iterdir():
        if agent_dir.is_dir() and not agent_dir.name.startswith((".", "_")):
            for vf in agent_dir.glob("*.verdict.json"):
                session_id = vf.stem.replace(".verdict", "")
                found.add(f"{agent_dir.name}/{session_id}")
    return found
def write_cache_hash(tile_cache: Path, rules_hash: str) -> None:
    """Persist *rules_hash* into the tile cache directory, creating it if needed."""
    tile_cache.mkdir(parents=True, exist_ok=True)
    marker = tile_cache / "_rules-hash.txt"
    marker.write_text(rules_hash, encoding="utf-8")
# ─── Session dispatch via review_session.py ───────────────────────────────
def find_sessions(prepared_dir: Path) -> list[dict]:
    """List prepared transcripts as {"agent", "session_id", "file"} dicts.

    Hidden and underscore-prefixed agent directories are skipped. Results
    are ordered by agent directory name, then transcript filename. A missing
    *prepared_dir* yields an empty list.
    """
    if not prepared_dir.exists():
        return []
    found: list[dict] = []
    for agent_dir in sorted(prepared_dir.iterdir()):
        if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
            continue
        found.extend(
            {"agent": agent_dir.name, "session_id": txt.stem, "file": txt}
            for txt in sorted(agent_dir.glob("*.txt"))
        )
    return found
def dispatch_session(
    session: dict,
    rules_path: Path,
    out_dir: Path,
    model: str = "haiku",
    tile_name: str | None = None,
    tile_cache: Path | None = None,
) -> dict:
    """Dispatch a single session via review_session.py and write verdict.

    Parameters:
        session: {"agent": str, "session_id": str, "file": Path} from find_sessions().
        rules_path: Checklist rules JSON passed through to the judge.
        out_dir: Run output root; verdicts land under out_dir/verdicts/<tile>/<agent>/.
        model: Model alias forwarded to review_session.py.
        tile_name: Optional tile name stamped into the verdict and its path.
        tile_cache: Optional per-tile cache directory; new verdicts are mirrored there.

    Returns a status dict: {"session", "status"} where status is one of
    "ok", "error", "timeout", or "json_error"; on "ok" it also carries
    token/duration/cost metadata pulled from the verdict's "_meta" block.
    Never raises — all failures are folded into the returned dict.
    """
    agent = session["agent"]
    session_id = session["session_id"]
    # Human-readable "agent/session" key used in all result dicts.
    label = f"{agent}/{session_id}"
    try:
        # Verdict output path — namespaced by tile to avoid overwrites
        # when multiple tiles are evaluated against the same session.
        tile_slug = tile_name.replace("/", "--") if tile_name else "_default"
        verdict_dir = out_dir / "verdicts" / tile_slug / agent
        verdict_dir.mkdir(parents=True, exist_ok=True)
        verdict_path = verdict_dir / f"{session_id}.verdict.json"
        # Call review_session.py (it invokes `claude -p` itself; no API key needed here).
        cmd = [
            sys.executable, str(REVIEW_SESSION_PY),
            "--transcript", str(session["file"]),
            "--rules", str(rules_path),
            "--output", str(verdict_path),
            "--review-prompt", str(REVIEW_PROMPT_PATH),
            "--model", model,
        ]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=300,  # 5 minute timeout per session
        )
        if result.returncode != 0:
            stderr = result.stderr.strip()
            return {"session": label, "status": "error", "error": f"exit {result.returncode}: {stderr}"}
        # Read the verdict to extract meta for summary
        verdict = json.loads(verdict_path.read_text(encoding="utf-8"))
        # Add tile name to verdict
        if tile_name:
            verdict["_tile"] = tile_name
            for inst in verdict.get("instructions", []):
                inst["tile"] = tile_name
        # Re-persist so the on-disk verdict carries the tile annotations.
        # NOTE(review): when tile_name is None this rewrite is a no-op on
        # content but may reformat the JSON review_session.py wrote — confirm
        # downstream consumers don't depend on that exact formatting.
        verdict_path.write_text(json.dumps(verdict, indent=2), encoding="utf-8")
        # Copy to cache if provided
        if tile_cache:
            cache_agent_dir = tile_cache / agent
            cache_agent_dir.mkdir(parents=True, exist_ok=True)
            cache_path = cache_agent_dir / f"{session_id}.verdict.json"
            cache_path.write_text(json.dumps(verdict, indent=2), encoding="utf-8")
        meta = verdict.get("_meta", {})
        return {
            "session": label,
            "status": "ok",
            "input_tokens": meta.get("input_tokens", 0),
            "output_tokens": meta.get("output_tokens", 0),
            "duration_ms": meta.get("duration_ms", 0),
            "cost_usd": meta.get("cost_usd", 0),
            "verdict_path": str(verdict_path),
        }
    except subprocess.TimeoutExpired:
        return {"session": label, "status": "timeout", "error": "review_session.py timed out after 300s"}
    except json.JSONDecodeError as e:
        # review_session.py exited 0 but wrote malformed JSON.
        return {"session": label, "status": "json_error", "error": str(e)}
    except Exception as e:
        # Boundary catch-all: a worker must never propagate into the thread pool.
        return {"session": label, "status": "error", "error": str(e)}
def dispatch_all(
    prepared_dir: Path,
    rules_path: Path,
    out_dir: Path,
    model: str = "haiku",
    max_parallel: int = 5,
    dry_run: bool = False,
    cache_dir: Path | None = None,
    tile_name: str | None = None,
    confirmed: bool = False,
) -> list[dict]:
    """Dispatch judges for all prepared sessions, with optional caching.

    Workflow: discover prepared transcripts, subtract sessions with valid
    cached verdicts (when cache_dir + tile_name are given), enforce the
    unconfirmed-session safety limit, then fan out dispatch_session() calls
    across a thread pool and print a summary.

    Parameters:
        prepared_dir: Directory of per-agent transcript subdirectories.
        rules_path: Checklist rules JSON (also hashed for cache invalidation).
        out_dir: Run output root for verdicts.
        model: Model alias forwarded to each judge.
        max_parallel: Thread-pool width (each worker spawns one subprocess).
        dry_run: Print what would be dispatched and return without judging.
        cache_dir: Optional verdict cache root; enables cache reuse.
        tile_name: Tile used for cache keying and verdict namespacing.
        confirmed: Bypass the UNCONFIRMED_SESSION_LIMIT safety check.

    Returns the list of per-session result dicts from dispatch_session()
    (empty when nothing was dispatched). Exits the process via sys.exit(1)
    when the safety limit is exceeded without confirmation.
    """
    rules = json.loads(rules_path.read_text(encoding="utf-8"))
    all_sessions = find_sessions(prepared_dir)
    if not all_sessions:
        print(f"No sessions found in {prepared_dir}")
        return []

    # Cache logic: check for existing verdicts.
    tile_cache = None
    cached_count = 0
    sessions_to_dispatch = all_sessions
    if cache_dir and tile_name:
        rules_h = hash_rules(rules_path)
        tile_cache_path, cache_valid = check_cache(cache_dir, tile_name, rules_h)
        if cache_valid:
            # Reuse: only dispatch sessions without a cached verdict.
            cached_ids = get_cached_sessions(tile_cache_path)
            sessions_to_dispatch = [
                s for s in all_sessions
                if f"{s['agent']}/{s['session_id']}" not in cached_ids
            ]
            cached_count = len(all_sessions) - len(sessions_to_dispatch)
        else:
            # Cache invalid (rules changed) — wipe stale verdicts and rebuild.
            if tile_cache_path.exists():
                import shutil
                for child in tile_cache_path.iterdir():
                    if child.name != "_rules-hash.txt":
                        if child.is_dir():
                            shutil.rmtree(child)
                        else:
                            child.unlink()
        tile_cache = tile_cache_path
        # Fix: write the rules hash exactly once (the original called
        # write_cache_hash twice with identical arguments on the rebuild path).
        write_cache_hash(tile_cache, rules_h)

    # Copy cached verdicts into the run output dir so downstream aggregation
    # sees a complete verdict set regardless of cache hits.
    tile_slug = tile_name.replace("/", "--") if tile_name else "_default"
    if cached_count > 0 and tile_cache:
        print(f"Cache: {cached_count} sessions cached, {len(sessions_to_dispatch)} new")
        dispatched_keys = {f"{s['agent']}/{s['session_id']}" for s in sessions_to_dispatch}
        for s in all_sessions:
            key = f"{s['agent']}/{s['session_id']}"
            if key not in dispatched_keys:
                src = tile_cache / s["agent"] / f"{s['session_id']}.verdict.json"
                if src.exists():
                    dst_dir = out_dir / "verdicts" / tile_slug / s["agent"]
                    dst_dir.mkdir(parents=True, exist_ok=True)
                    dst = dst_dir / f"{s['session_id']}.verdict.json"
                    if not dst.exists():
                        dst.write_text(src.read_text(encoding="utf-8"), encoding="utf-8")

    total_checks = rules.get("total_checks", "?")
    total_instructions = rules.get("total_instructions", "?")
    tile_label = f" [{tile_name}]" if tile_name else ""
    print(f"Model: {model}{tile_label}")
    print(f"Sessions: {len(all_sessions)} total, {len(sessions_to_dispatch)} to judge")
    print(f"Instructions: {total_instructions}, Checks: {total_checks}")
    print(f"Output: {out_dir}")
    if not sessions_to_dispatch:
        print("\nAll sessions cached — nothing to dispatch.")
        return []

    # Safety check: refuse large dispatches without explicit confirmation.
    if len(sessions_to_dispatch) > UNCONFIRMED_SESSION_LIMIT and not confirmed:
        print(f"\n!! SAFETY LIMIT: {len(sessions_to_dispatch)} sessions to dispatch "
              f"exceeds the limit of {UNCONFIRMED_SESSION_LIMIT}.")
        print(f"   This would launch {len(sessions_to_dispatch)} claude CLI processes "
              f"which can consume significant RAM and API quota.")
        print(f"\n   ACTION REQUIRED: Ask the user to confirm they want to analyze "
              f"{len(sessions_to_dispatch)} sessions.")
        print("   To proceed after confirmation, re-run with the --confirmed flag.")
        sys.exit(1)

    if dry_run:
        print("\nDry run — sessions that would be dispatched:")
        for s in sessions_to_dispatch:
            size = s["file"].stat().st_size
            print(f"  {s['agent']}/{s['session_id']} ({size:,} chars)")
        return []

    print(f"\nDispatching {len(sessions_to_dispatch)} judges via claude CLI (max {max_parallel} parallel)...\n")
    results = []
    # Threads (not processes) suffice: each worker blocks on a subprocess.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel) as pool:
        futures = {
            pool.submit(
                dispatch_session,
                session,
                rules_path,
                out_dir,
                model,
                tile_name,
                tile_cache,
            ): session
            for session in sessions_to_dispatch
        }
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            results.append(result)
            status = result["status"]
            label = result["session"]
            if status == "ok":
                tokens = result.get("input_tokens", 0) + result.get("output_tokens", 0)
                ms = result.get("duration_ms", 0)
                cost = result.get("cost_usd", 0)
                cost_str = f", ${cost:.4f}" if cost else ""
                print(f"  ok   {label} — {tokens:,} tokens, {ms:,}ms{cost_str}")
            else:
                print(f"  FAIL {label} — {status}: {result.get('error', '')}")

    # Summary
    ok = [r for r in results if r["status"] == "ok"]
    failed = [r for r in results if r["status"] != "ok"]
    total_input = sum(r.get("input_tokens", 0) for r in ok)
    total_output = sum(r.get("output_tokens", 0) for r in ok)
    total_ms = sum(r.get("duration_ms", 0) for r in ok)
    total_cost = sum(r.get("cost_usd", 0) for r in ok)
    print("\n── Summary ──")
    print(f"  {len(ok)} dispatched, {cached_count} cached, {len(failed)} failed")
    print(f"  Total tokens: {total_input + total_output:,} "
          f"({total_input:,} in / {total_output:,} out)")
    print(f"  Total wall time: {total_ms / 1000:.1f}s")
    if total_cost > 0:
        print(f"  Total cost: ${total_cost:.4f}")
    return results
# ─── CLI ────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: parse arguments, validate inputs, hand off to dispatch_all()."""
    ap = argparse.ArgumentParser(
        description="Dispatch LLM judges for session evaluation via claude CLI"
    )
    ap.add_argument(
        "--dir",
        required=True,
        help="Run directory (contains prepared/ and rules.json)",
    )
    ap.add_argument(
        "--rules",
        default=None,
        help="Path to rules JSON (default: <dir>/rules.json)",
    )
    ap.add_argument(
        "--model",
        default="haiku",
        choices=["haiku", "sonnet", "opus"],
        help="Model to use (default: haiku)",
    )
    ap.add_argument(
        "--out-dir",
        default=None,
        help="Output directory (default: same as --dir)",
    )
    ap.add_argument(
        "--max-parallel",
        type=int,
        default=3,
        help="Max concurrent claude CLI calls (default: 3)",
    )
    ap.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be dispatched without calling claude",
    )
    ap.add_argument(
        "--cache-dir",
        default=None,
        help="Verdict cache directory (e.g. ~/.tessl/session-analyses/<slug>/verdict-cache)",
    )
    ap.add_argument(
        "--tile",
        default=None,
        help="Tile name for cache keying (e.g. amyh/research-best-practice)",
    )
    ap.add_argument(
        "--analysis-dir",
        default=None,
        help="Analysis directory (unused, kept for CLI compatibility)",
    )
    ap.add_argument(
        "--confirmed",
        action="store_true",
        default=False,
        help=argparse.SUPPRESS,  # intentionally undocumented
    )
    opts = ap.parse_args()

    run_dir = Path(opts.dir)
    prepared_dir = run_dir / "prepared"
    # Both inputs are produced by earlier pipeline steps; fail fast with a hint.
    if not prepared_dir.exists():
        print(f"Error: {prepared_dir} not found. Run prepare_sessions.py first.")
        return
    rules_path = Path(opts.rules) if opts.rules else run_dir / "rules.json"
    if not rules_path.exists():
        print(f"Error: {rules_path} not found. Run extract_checklist.py first.")
        return

    dispatch_all(
        prepared_dir=prepared_dir,
        rules_path=rules_path,
        out_dir=Path(opts.out_dir) if opts.out_dir else run_dir,
        model=opts.model,
        max_parallel=opts.max_parallel,
        dry_run=opts.dry_run,
        cache_dir=Path(opts.cache_dir) if opts.cache_dir else None,
        tile_name=opts.tile,
        confirmed=opts.confirmed,
    )
if __name__ == "__main__":
main()