Collect and normalize agent logs, discover installed verifiers, and dispatch LLM judges to evaluate adherence. Produces per-session verdicts and aggregated reports.
Evaluation summary: score 91, 90% best-practices adherence, 96% impact, 3.09x average score across 3 eval scenarios — passed, no known issues.
#!/usr/bin/env python3
"""
Dispatch friction reviewers to analyze agent sessions for friction points.
Reads prepared transcripts and invokes review_friction.py for each session.
Uses claude -p --model haiku — no API key needed.
Supports caching keyed on agent/session id (no rules file to key on).
No external dependencies (calls review_friction.py via subprocess).
"""
import argparse
import concurrent.futures
import hashlib
import json
import os
import subprocess
from pathlib import Path
SCRIPTS_DIR = Path(__file__).resolve().parent
REVIEW_FRICTION_PY = SCRIPTS_DIR / "review_friction.py"
FRICTION_PROMPT_PATH = SCRIPTS_DIR.parent / "references" / "friction-prompt.md"
# ─── Caching ───────────────────────────────────────────────────────────────
def hash_transcript(transcript_path: Path) -> str:
    """Return the first 16 hex chars of the SHA-256 of the transcript bytes.

    NOTE(review): not referenced anywhere in this file — the cache is keyed
    on agent/session id (see get_cached_sessions); confirm before removing.
    """
    digest = hashlib.sha256(transcript_path.read_bytes())
    return digest.hexdigest()[:16]
def get_cached_sessions(cache_dir: Path) -> dict[str, Path]:
    """Map "agent/session_id" keys to cached *.friction.json review files.

    Returns an empty mapping when the cache directory does not exist.
    Hidden ("." prefix) and private ("_" prefix) directories are skipped.
    """
    if not cache_dir.exists():
        return {}
    return {
        f"{agent_dir.name}/{review_file.stem.replace('.friction', '')}": review_file
        for agent_dir in cache_dir.iterdir()
        if agent_dir.is_dir() and not agent_dir.name.startswith((".", "_"))
        for review_file in agent_dir.glob("*.friction.json")
    }
# ─── Session dispatch ─────────────────────────────────────────────────────
def find_sessions(prepared_dir: Path) -> list[dict]:
    """Enumerate prepared transcripts as {agent, session_id, file} records.

    Scans each non-hidden, non-private agent directory for *.txt transcripts,
    in sorted order. Returns [] when the prepared directory is missing.
    """
    if not prepared_dir.exists():
        return []
    agent_dirs = [
        d for d in sorted(prepared_dir.iterdir())
        if d.is_dir() and not d.name.startswith((".", "_"))
    ]
    found: list[dict] = []
    for agent_dir in agent_dirs:
        for transcript in sorted(agent_dir.glob("*.txt")):
            record = {
                "agent": agent_dir.name,
                "session_id": transcript.stem,
                "file": transcript,
            }
            found.append(record)
    return found
def dispatch_session(
    session: dict,
    out_dir: Path,
    model: str = "haiku",
    cache_dir: Path | None = None,
    timeout_s: int = 300,
) -> dict:
    """Dispatch a single session friction review and write the result.

    Invokes review_friction.py via `uv run` as a subprocess, then reads the
    JSON review it wrote to out_dir/friction/<agent>/<session_id>.friction.json.

    Args:
        session: Mapping with "agent", "session_id", and "file" (transcript Path).
        out_dir: Run output directory; reviews land under out_dir/friction/.
        model: Model name forwarded to review_friction.py.
        cache_dir: Optional cache root; on success the review is copied there.
        timeout_s: Subprocess timeout in seconds (default 300, matching the
            previously hard-coded limit; now also used in the timeout message
            so the two can no longer drift apart).

    Returns:
        A result dict with "session" and "status" keys; on "ok" it also
        carries friction/token/duration/cost metadata and the review path,
        otherwise an "error" message.
    """
    agent = session["agent"]
    session_id = session["session_id"]
    label = f"{agent}/{session_id}"
    try:
        friction_dir = out_dir / "friction" / agent
        friction_dir.mkdir(parents=True, exist_ok=True)
        friction_path = friction_dir / f"{session_id}.friction.json"
        cmd = [
            "uv", "run", "python3", str(REVIEW_FRICTION_PY),
            "--transcript", str(session["file"]),
            "--output", str(friction_path),
            "--friction-prompt", str(FRICTION_PROMPT_PATH),
            "--model", model,
        ]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout_s,
        )
        if result.returncode != 0:
            stderr = result.stderr.strip()
            return {"session": label, "status": "error", "error": f"exit {result.returncode}: {stderr}"}
        review = json.loads(friction_path.read_text(encoding="utf-8"))
        # Copy the successful review into the cache so future runs skip it.
        if cache_dir:
            cache_agent_dir = cache_dir / agent
            cache_agent_dir.mkdir(parents=True, exist_ok=True)
            cache_path = cache_agent_dir / f"{session_id}.friction.json"
            cache_path.write_text(json.dumps(review, indent=2), encoding="utf-8")
        meta = review.get("_meta", {})
        friction_count = len(review.get("friction", []))
        return {
            "session": label,
            "status": "ok",
            "friction_count": friction_count,
            "input_tokens": meta.get("input_tokens", 0),
            "output_tokens": meta.get("output_tokens", 0),
            "duration_ms": meta.get("duration_ms", 0),
            "cost_usd": meta.get("cost_usd", 0),
            "friction_path": str(friction_path),
        }
    except subprocess.TimeoutExpired:
        # Message derived from the configured limit, not a hard-coded "300s".
        return {"session": label, "status": "timeout", "error": f"review_friction.py timed out after {timeout_s}s"}
    except json.JSONDecodeError as e:
        return {"session": label, "status": "json_error", "error": str(e)}
    except Exception as e:
        return {"session": label, "status": "error", "error": str(e)}
def dispatch_all(
    prepared_dir: Path,
    out_dir: Path,
    model: str = "haiku",
    max_parallel: int = 5,
    dry_run: bool = False,
    cache_dir: Path | None = None,
) -> list[dict]:
    """Dispatch friction reviewers for all prepared sessions.

    When cache_dir is given, sessions with a cached review are skipped and
    their cached JSON is copied into out_dir instead. Remaining sessions are
    reviewed concurrently via a thread pool of at most max_parallel workers.

    Returns the per-session result dicts for dispatched sessions (empty on
    dry runs, when nothing is found, or when everything is cached).

    Fixes vs. previous version: the dry-run size is labeled "bytes" (st_size
    is a byte count, not characters), and the summed per-session durations
    are labeled "Total review time" (with parallel workers the sum is not
    wall-clock time).
    """
    all_sessions = find_sessions(prepared_dir)
    if not all_sessions:
        print(f"No sessions found in {prepared_dir}")
        return []
    # Cache logic: partition sessions into cached vs. to-dispatch.
    cached_count = 0
    sessions_to_dispatch = all_sessions
    if cache_dir:
        cached = get_cached_sessions(cache_dir)
        sessions_to_dispatch = [
            s for s in all_sessions
            if f"{s['agent']}/{s['session_id']}" not in cached
        ]
        cached_count = len(all_sessions) - len(sessions_to_dispatch)
        # Copy cached results to the output dir so downstream steps see them.
        if cached_count > 0:
            dispatched_keys = {f"{s['agent']}/{s['session_id']}" for s in sessions_to_dispatch}
            for s in all_sessions:
                key = f"{s['agent']}/{s['session_id']}"
                if key not in dispatched_keys and key in cached:
                    src = cached[key]
                    if src.exists():
                        dst_dir = out_dir / "friction" / s["agent"]
                        dst_dir.mkdir(parents=True, exist_ok=True)
                        dst = dst_dir / f"{s['session_id']}.friction.json"
                        if not dst.exists():
                            dst.write_text(src.read_text(encoding="utf-8"), encoding="utf-8")
    print(f"Model: {model}")
    print(f"Sessions: {len(all_sessions)} total, {len(sessions_to_dispatch)} to review")
    print(f"Output: {out_dir}")
    if cached_count > 0:
        print(f"Cache: {cached_count} sessions cached, {len(sessions_to_dispatch)} new")
    if not sessions_to_dispatch:
        print("\nAll sessions cached — nothing to dispatch.")
        return []
    if dry_run:
        print("\nDry run — sessions that would be dispatched:")
        for s in sessions_to_dispatch:
            size = s["file"].stat().st_size
            # st_size is a byte count — label it as such.
            print(f" {s['agent']}/{s['session_id']} ({size:,} bytes)")
        return []
    print(f"\nDispatching {len(sessions_to_dispatch)} friction reviewers via claude CLI (max {max_parallel} parallel)...\n")
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel) as pool:
        futures = {
            pool.submit(
                dispatch_session,
                session,
                out_dir,
                model,
                cache_dir,
            ): session
            for session in sessions_to_dispatch
        }
        # Print each result as it completes rather than in submission order.
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            results.append(result)
            status = result["status"]
            label = result["session"]
            if status == "ok":
                friction = result.get("friction_count", 0)
                tokens = result.get("input_tokens", 0) + result.get("output_tokens", 0)
                ms = result.get("duration_ms", 0)
                cost = result.get("cost_usd", 0)
                cost_str = f", ${cost:.4f}" if cost else ""
                friction_str = f", {friction} friction" if friction else ""
                print(f" ok {label} — {tokens:,} tokens, {ms:,}ms{cost_str}{friction_str}")
            else:
                print(f" FAIL {label} — {status}: {result.get('error', '')}")
    # Summary
    ok = [r for r in results if r["status"] == "ok"]
    failed = [r for r in results if r["status"] != "ok"]
    total_friction = sum(r.get("friction_count", 0) for r in ok)
    total_input = sum(r.get("input_tokens", 0) for r in ok)
    total_output = sum(r.get("output_tokens", 0) for r in ok)
    total_ms = sum(r.get("duration_ms", 0) for r in ok)
    total_cost = sum(r.get("cost_usd", 0) for r in ok)
    print("\n── Summary ──")
    print(f" {len(ok)} dispatched, {cached_count} cached, {len(failed)} failed")
    print(f" Friction events found: {total_friction}")
    print(f" Total tokens: {total_input + total_output:,} "
          f"({total_input:,} in / {total_output:,} out)")
    # Summed per-session durations; with parallelism this exceeds wall clock.
    print(f" Total review time: {total_ms / 1000:.1f}s")
    if total_cost > 0:
        print(f" Total cost: ${total_cost:.4f}")
    return results
# ─── CLI ────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: parse arguments and dispatch friction reviewers."""
    ap = argparse.ArgumentParser(
        description="Dispatch friction reviewers for session analysis via claude CLI"
    )
    ap.add_argument("--dir", required=True, help="Run directory (contains prepared/)")
    ap.add_argument(
        "--model",
        default="haiku",
        choices=["haiku", "sonnet", "opus"],
        help="Model to use (default: haiku)",
    )
    ap.add_argument("--out-dir", default=None, help="Output directory (default: same as --dir)")
    ap.add_argument(
        "--max-parallel",
        type=int,
        default=3,
        help="Max concurrent claude CLI calls (default: 3)",
    )
    ap.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be dispatched without calling claude",
    )
    ap.add_argument("--cache-dir", default=None, help="Friction cache directory")
    opts = ap.parse_args()

    run_dir = Path(opts.dir)
    prepared_dir = run_dir / "prepared"
    # Bail out early with a hint when the prepare step hasn't run yet.
    if not prepared_dir.exists():
        print(f"Error: {prepared_dir} not found. Run prepare_sessions.py first.")
        return

    dispatch_all(
        prepared_dir=prepared_dir,
        out_dir=Path(opts.out_dir) if opts.out_dir else run_dir,
        model=opts.model,
        max_parallel=opts.max_parallel,
        dry_run=opts.dry_run,
        cache_dir=Path(opts.cache_dir) if opts.cache_dir else None,
    )
# Allow use both as a script and as an importable module.
if __name__ == "__main__":
    main()