CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl-labs/audit-logs

Collect and normalize agent logs, discover installed verifiers, and dispatch LLM judges to evaluate adherence. Produces per-session verdicts and aggregated reports.

91

3.09x
Quality

90%

Does it follow best practices?

Impact

96%

3.09x

Average score across 3 eval scenarios

Security by Snyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

dispatch_friction.py — skills/friction-review/scripts/

#!/usr/bin/env python3
"""
Dispatch friction reviewers to analyze agent sessions for friction points.

Reads prepared transcripts and invokes review_friction.py for each session.
Uses claude -p --model haiku — no API key needed.

Supports caching keyed on transcript content hash (no rules file to key on).

No external dependencies (calls review_friction.py via subprocess).
"""

import argparse
import concurrent.futures
import hashlib
import json
import os
import subprocess
from pathlib import Path

SCRIPTS_DIR = Path(__file__).resolve().parent
REVIEW_FRICTION_PY = SCRIPTS_DIR / "review_friction.py"
FRICTION_PROMPT_PATH = SCRIPTS_DIR.parent / "references" / "friction-prompt.md"


# ─── Caching ───────────────────────────────────────────────────────────────


def hash_transcript(transcript_path: Path) -> str:
    """Return the first 16 hex chars of the SHA256 of the transcript bytes."""
    digest = hashlib.sha256(transcript_path.read_bytes())
    return digest.hexdigest()[:16]


def get_cached_sessions(cache_dir: Path) -> dict[str, Path]:
    """Map "agent/session_id" keys to cached friction-review file paths.

    Scans each non-hidden agent subdirectory of *cache_dir* for
    ``*.friction.json`` files. Returns an empty mapping when the cache
    directory does not exist.
    """
    if not cache_dir.exists():
        return {}
    mapping: dict[str, Path] = {}
    for subdir in cache_dir.iterdir():
        # Skip files and hidden/private directories (".foo", "_foo").
        if not subdir.is_dir() or subdir.name.startswith((".", "_")):
            continue
        for review_file in subdir.glob("*.friction.json"):
            # "abc.friction.json" -> stem "abc.friction" -> session id "abc"
            session_id = review_file.stem.replace(".friction", "")
            mapping[f"{subdir.name}/{session_id}"] = review_file
    return mapping


# ─── Session dispatch ─────────────────────────────────────────────────────


def find_sessions(prepared_dir: Path) -> list[dict]:
    """Collect prepared session transcripts.

    Walks agent subdirectories of *prepared_dir* in sorted order and returns
    one ``{"agent", "session_id", "file"}`` dict per ``*.txt`` transcript.
    Returns an empty list when *prepared_dir* does not exist.
    """
    if not prepared_dir.exists():
        return []
    found: list[dict] = []
    for agent_dir in sorted(prepared_dir.iterdir()):
        # Only real agent directories; skip hidden/private entries.
        if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
            continue
        found.extend(
            {"agent": agent_dir.name, "session_id": transcript.stem, "file": transcript}
            for transcript in sorted(agent_dir.glob("*.txt"))
        )
    return found


def dispatch_session(
    session: dict,
    out_dir: Path,
    model: str = "haiku",
    cache_dir: Path | None = None,
    timeout: int = 300,
) -> dict:
    """Run review_friction.py for one session and write its result.

    Args:
        session: Dict with "agent", "session_id", and "file" (transcript
            path), as produced by find_sessions().
        out_dir: Run output directory; the review is written to
            out_dir/friction/<agent>/<session_id>.friction.json.
        model: Model name passed through to review_friction.py.
        cache_dir: Optional cache root; on success the parsed review is also
            copied to cache_dir/<agent>/<session_id>.friction.json.
        timeout: Seconds to wait for the subprocess before giving up
            (default 300, matching the previous hard-coded value).

    Returns:
        A result dict with "session" and "status" keys. On status "ok" it
        also carries friction_count, token/duration/cost stats pulled from
        the review's "_meta" block, and the friction_path written.
    """
    agent = session["agent"]
    session_id = session["session_id"]
    label = f"{agent}/{session_id}"

    try:
        friction_dir = out_dir / "friction" / agent
        friction_dir.mkdir(parents=True, exist_ok=True)
        friction_path = friction_dir / f"{session_id}.friction.json"

        cmd = [
            "uv", "run", "python3", str(REVIEW_FRICTION_PY),
            "--transcript", str(session["file"]),
            "--output", str(friction_path),
            "--friction-prompt", str(FRICTION_PROMPT_PATH),
            "--model", model,
        ]

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )

        if result.returncode != 0:
            stderr = result.stderr.strip()
            return {"session": label, "status": "error", "error": f"exit {result.returncode}: {stderr}"}

        # The subprocess writes the review JSON to friction_path; parse it to
        # validate the output and to extract summary stats.
        review = json.loads(friction_path.read_text(encoding="utf-8"))

        # Copy to cache only after a successful parse, so the cache never
        # holds a malformed review.
        if cache_dir:
            cache_agent_dir = cache_dir / agent
            cache_agent_dir.mkdir(parents=True, exist_ok=True)
            cache_path = cache_agent_dir / f"{session_id}.friction.json"
            cache_path.write_text(json.dumps(review, indent=2), encoding="utf-8")

        meta = review.get("_meta", {})
        friction_count = len(review.get("friction", []))
        return {
            "session": label,
            "status": "ok",
            "friction_count": friction_count,
            "input_tokens": meta.get("input_tokens", 0),
            "output_tokens": meta.get("output_tokens", 0),
            "duration_ms": meta.get("duration_ms", 0),
            "cost_usd": meta.get("cost_usd", 0),
            "friction_path": str(friction_path),
        }

    except subprocess.TimeoutExpired:
        # Message reflects the configured timeout instead of a hard-coded "300s".
        return {"session": label, "status": "timeout", "error": f"review_friction.py timed out after {timeout}s"}
    except json.JSONDecodeError as e:
        return {"session": label, "status": "json_error", "error": str(e)}
    except Exception as e:
        # Broad catch is deliberate: one bad session must not abort the pool.
        return {"session": label, "status": "error", "error": str(e)}


def dispatch_all(
    prepared_dir: Path,
    out_dir: Path,
    model: str = "haiku",
    max_parallel: int = 5,
    dry_run: bool = False,
    cache_dir: Path | None = None,
) -> list[dict]:
    """Dispatch friction reviewers for all prepared sessions.

    Finds transcripts under prepared_dir, skips sessions already present in
    cache_dir (copying their cached reviews into out_dir), then runs
    dispatch_session for the remainder in a thread pool and prints per-session
    progress plus a final summary.

    Args:
        prepared_dir: Directory of agent subdirectories containing *.txt
            transcripts (see find_sessions).
        out_dir: Output root; reviews land under out_dir/friction/<agent>/.
        model: Model name forwarded to each dispatch_session call.
        max_parallel: Thread-pool size, i.e. max concurrent CLI calls.
        dry_run: If True, list what would be dispatched and return [].
        cache_dir: Optional cache root used both to skip cached sessions and
            to store fresh results (handled inside dispatch_session).

    Returns:
        List of per-session result dicts from dispatch_session; [] when there
        is nothing to dispatch (no sessions, all cached, or dry run).
    """
    all_sessions = find_sessions(prepared_dir)
    if not all_sessions:
        print(f"No sessions found in {prepared_dir}")
        return []

    # Cache logic: determine which sessions still need a fresh review.
    cached_count = 0
    sessions_to_dispatch = all_sessions

    if cache_dir:
        cached = get_cached_sessions(cache_dir)
        # Keep only sessions whose "agent/session_id" key has no cached review.
        sessions_to_dispatch = [
            s for s in all_sessions
            if f"{s['agent']}/{s['session_id']}" not in cached
        ]
        cached_count = len(all_sessions) - len(sessions_to_dispatch)

        # Copy cached results to output dir so downstream consumers see a
        # complete out_dir/friction/ tree regardless of cache hits.
        if cached_count > 0:
            dispatched_keys = {f"{s['agent']}/{s['session_id']}" for s in sessions_to_dispatch}
            for s in all_sessions:
                key = f"{s['agent']}/{s['session_id']}"
                if key not in dispatched_keys and key in cached:
                    src = cached[key]
                    if src.exists():
                        dst_dir = out_dir / "friction" / s["agent"]
                        dst_dir.mkdir(parents=True, exist_ok=True)
                        dst = dst_dir / f"{s['session_id']}.friction.json"
                        # Never clobber an existing output file.
                        if not dst.exists():
                            dst.write_text(src.read_text(encoding="utf-8"), encoding="utf-8")

    print(f"Model: {model}")
    print(f"Sessions: {len(all_sessions)} total, {len(sessions_to_dispatch)} to review")
    print(f"Output: {out_dir}")

    if cached_count > 0:
        print(f"Cache: {cached_count} sessions cached, {len(sessions_to_dispatch)} new")

    if not sessions_to_dispatch:
        print("\nAll sessions cached — nothing to dispatch.")
        return []

    if dry_run:
        print("\nDry run — sessions that would be dispatched:")
        for s in sessions_to_dispatch:
            # NOTE(review): st_size is bytes; the label says "chars" — these
            # only coincide for ASCII transcripts.
            size = s["file"].stat().st_size
            print(f"  {s['agent']}/{s['session_id']} ({size:,} chars)")
        return []

    print(f"\nDispatching {len(sessions_to_dispatch)} friction reviewers via claude CLI (max {max_parallel} parallel)...\n")

    results = []

    # Threads suffice here: each worker just blocks on a subprocess call.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel) as pool:
        futures = {
            pool.submit(
                dispatch_session,
                session,
                out_dir,
                model,
                cache_dir,
            ): session
            for session in sessions_to_dispatch
        }

        # Print progress in completion order, not submission order.
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            results.append(result)

            status = result["status"]
            label = result["session"]
            if status == "ok":
                friction = result.get("friction_count", 0)
                tokens = result.get("input_tokens", 0) + result.get("output_tokens", 0)
                ms = result.get("duration_ms", 0)
                cost = result.get("cost_usd", 0)
                cost_str = f", ${cost:.4f}" if cost else ""
                friction_str = f", {friction} friction" if friction else ""
                print(f"  ok {label} — {tokens:,} tokens, {ms:,}ms{cost_str}{friction_str}")
            else:
                print(f"  FAIL {label} — {status}: {result.get('error', '')}")

    # Summary: aggregate stats over successful dispatches only.
    ok = [r for r in results if r["status"] == "ok"]
    failed = [r for r in results if r["status"] != "ok"]
    total_friction = sum(r.get("friction_count", 0) for r in ok)
    total_input = sum(r.get("input_tokens", 0) for r in ok)
    total_output = sum(r.get("output_tokens", 0) for r in ok)
    total_ms = sum(r.get("duration_ms", 0) for r in ok)
    total_cost = sum(r.get("cost_usd", 0) for r in ok)

    print("\n── Summary ──")
    print(f"  {len(ok)} dispatched, {cached_count} cached, {len(failed)} failed")
    print(f"  Friction events found: {total_friction}")
    print(f"  Total tokens: {total_input + total_output:,} "
          f"({total_input:,} in / {total_output:,} out)")
    # NOTE(review): this sums per-session durations, so with parallelism it is
    # cumulative worker time, not actual wall time — confirm label intent.
    print(f"  Total wall time: {total_ms / 1000:.1f}s")
    if total_cost > 0:
        print(f"  Total cost: ${total_cost:.4f}")

    return results


# ─── CLI ────────────────────────────────────────────────────────────────────


def main():
    """CLI entry point: parse arguments and dispatch friction reviewers.

    Exits with status 1 (message on stderr) when the prepared/ directory is
    missing; previously this returned silently with exit code 0, which hid
    the failure from calling pipelines.
    """
    parser = argparse.ArgumentParser(
        description="Dispatch friction reviewers for session analysis via claude CLI"
    )
    parser.add_argument(
        "--dir",
        required=True,
        help="Run directory (contains prepared/)",
    )
    parser.add_argument(
        "--model",
        default="haiku",
        choices=["haiku", "sonnet", "opus"],
        help="Model to use (default: haiku)",
    )
    parser.add_argument(
        "--out-dir",
        default=None,
        help="Output directory (default: same as --dir)",
    )
    parser.add_argument(
        "--max-parallel",
        type=int,
        default=3,
        help="Max concurrent claude CLI calls (default: 3)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be dispatched without calling claude",
    )
    parser.add_argument(
        "--cache-dir",
        default=None,
        help="Friction cache directory",
    )
    args = parser.parse_args()

    run_dir = Path(args.dir)
    prepared_dir = run_dir / "prepared"

    if not prepared_dir.exists():
        # SystemExit with a message prints it to stderr and exits with code 1,
        # so failures are visible to shells and CI pipelines.
        raise SystemExit(f"Error: {prepared_dir} not found. Run prepare_sessions.py first.")

    out_dir = Path(args.out_dir) if args.out_dir else run_dir
    cache_dir = Path(args.cache_dir) if args.cache_dir else None

    dispatch_all(
        prepared_dir=prepared_dir,
        out_dir=out_dir,
        model=args.model,
        max_parallel=args.max_parallel,
        dry_run=args.dry_run,
        cache_dir=cache_dir,
    )


if __name__ == "__main__":
    main()

tile.json