Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.
88
86%
Does it follow best practices?
Impact
97%
2.93x — average score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Run the full analysis pipeline in a single invocation.
collect → normalize → discover → prepare → extract → dispatch → merge → analyze → report
With --friction, also runs friction analysis in parallel:
collect → normalize → discover → prepare ─┬─ extract → dispatch → merge ─┬─ synthesize → analyze
└─ friction dispatch → merge ──┘
This is the main entry point. It orchestrates the individual scripts via
subprocess so each step's output streams to the console. Judge dispatch
uses ``review_session.py`` which calls ``claude -p --model haiku`` — no
API key needed, just the claude CLI.
No external dependencies for this script itself.
Usage:
python3 run_pipeline.py [--project-dir /path/to/project ...] [--tiles-dir .tessl/tiles] [--recent-days 7] [--no-friction]
--project-dir defaults to cwd and accepts multiple paths. Each path gets
its own analysis dir (~/.tessl/session-analyses/<slug>). Collection, normalization,
preparation, and judge dispatch run per path. Merge and aggregation span
all paths so the final report covers sessions from every directory.
This is useful when the same repo has multiple checkout paths (worktrees,
separate clones, renamed directories) — pass them all and the pipeline
keeps each path's data in its own lane, then groups across them at
reporting time.
When --tiles-dir is omitted, auto-detects tiles in both <cwd>/.tessl/tiles
and ~/.tessl/tiles (local tiles take priority over global).
"""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# Directory layout, resolved relative to this file so the script works from any CWD:
SCRIPTS_DIR = Path(__file__).resolve().parent  # skills/analyze-sessions/scripts/
TILE_ROOT = SCRIPTS_DIR.parent.parent.parent  # agent-quality/ (tile root)
TILE_SCRIPTS_DIR = TILE_ROOT / "scripts"  # agent-quality/scripts/
FRICTION_SCRIPTS_DIR = TILE_ROOT / "skills" / "review-friction" / "scripts"  # sibling skill's scripts
def _analysis_dir_from_project(project_dir: str) -> str:
"""Derive the analysis data directory from a project directory path.
Produces ~/.tessl/session-analyses/<slug> where slug is the absolute path with
``/`` replaced by ``-`` (same convention used by collect_logs).
"""
slug = project_dir.replace("/", "-")
return os.path.join(os.path.expanduser("~"), ".tessl", "session-analyses", slug)
def _run(cmd: list[str], check: bool = True, capture: bool = False) -> subprocess.CompletedProcess:
"""Run a command, streaming output."""
print(f"\n{'─' * 60}")
print(f" {' '.join(cmd)}")
print(f"{'─' * 60}", flush=True)
return subprocess.run(
cmd,
check=check,
capture_output=capture,
text=True if capture else None,
)
def _py_run(
    script: str,
    *args: str,
    capture: bool = False,
    check: bool = True,
) -> subprocess.CompletedProcess:
    """Run *script* with the same interpreter that is running this pipeline."""
    return _run([sys.executable, script, *args], capture=capture, check=check)
def main() -> None:
    """Parse CLI arguments and orchestrate the whole pipeline.

    Steps: collect → normalize → (search mode short-circuit) → discover
    verifiers → prepare per-path run dirs → dispatch judges (and friction
    reviewers unless --no-friction) → merge → synthesize → trend analysis.
    Exits non-zero on argument errors or partial pipeline failure.
    """
    parser = argparse.ArgumentParser(description="Run the full analysis pipeline")
    parser.add_argument("--project-dir", nargs="+", default=[os.getcwd()],
                        help="Project directory(ies) to analyze (default: cwd). "
                        "Pass multiple paths to analyze across worktrees or separate checkouts.")
    parser.add_argument("--analysis-dir", default=None,
                        help="Analysis data directory (default: ~/.tessl/session-analyses/<project-slug>). "
                        "Only valid with a single --project-dir.")
    parser.add_argument("--tiles-dir", default=None, help="Tiles directory (default: auto-detect local + global)")
    parser.add_argument("--recent-days", type=int, default=7, help="Trend window in days (default: 7)")
    parser.add_argument("--model", default="haiku", help="Judge model (default: haiku)")
    # NOTE(review): --project-label is parsed but never referenced below — confirm
    # whether it should be forwarded to a report step or removed.
    parser.add_argument("--project-label", default=None, help="Label for the report")
    parser.add_argument("--agents", nargs="+", default=None, help="Filter to specific agents")
    parser.add_argument("--max-sessions", type=int, default=None, help="Only process the N most recent sessions per project path (default: all)")
    parser.add_argument("--refresh", action="store_true", help="Re-prepare sessions even if cached")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done without running judges")
    parser.add_argument("--no-friction", action="store_true", help="Skip friction analysis (friction runs by default)")
    parser.add_argument("--search", nargs="+", default=None, metavar="QUERY",
                        help="Search mode: collect, normalize, prepare, then grep for QUERY terms. No judges dispatched.")
    parser.add_argument("--sessions", nargs="+", default=None, metavar="ID",
                        help="Only analyze specific sessions (agent/session_id). Use with search results.")
    parser.add_argument("--tiles", nargs="+", default=None, metavar="NAME",
                        help="Only analyze specific tiles (e.g. amyh/my-tile). Default: all tiles with verifiers.")
    parser.add_argument("--confirmed", action="store_true", default=False,
                        help=argparse.SUPPRESS)  # intentionally undocumented — used to bypass session safety limit
    args = parser.parse_args()

    # realpath so worktree symlinks / relative paths produce stable slugs.
    project_dirs = [os.path.realpath(d) for d in args.project_dir]
    primary_dir = project_dirs[0]

    if args.analysis_dir and len(project_dirs) > 1:
        print("Error: --analysis-dir cannot be used with multiple --project-dir values", file=sys.stderr)
        print("Each project path gets its own analysis dir automatically.", file=sys.stderr)
        sys.exit(1)

    # Each project path gets its own analysis dir to keep lanes separate.
    if args.analysis_dir:
        analysis_dir_map = {primary_dir: os.path.realpath(args.analysis_dir)}
    else:
        analysis_dir_map = {d: _analysis_dir_from_project(d) for d in project_dirs}
    primary_analysis_dir = analysis_dir_map[primary_dir]

    # Friction is also disabled in search mode (search dispatches no judges).
    run_friction = not args.no_friction and not args.search
    tiles_dir = os.path.realpath(args.tiles_dir) if args.tiles_dir else None
    scripts = str(SCRIPTS_DIR)
    tile_scripts = str(TILE_SCRIPTS_DIR)

    if len(project_dirs) == 1:
        print(f"Project dir: {primary_dir}")
        print(f"Analysis dir: {primary_analysis_dir}")
    else:
        print(f"Project dirs ({len(project_dirs)}):")
        for pd in project_dirs:
            print(f" {pd} → {analysis_dir_map[pd]}")

    # Common argument fragments reused by several sub-script invocations.
    tiles_dir_args = ["--tiles-dir", tiles_dir] if tiles_dir else []
    agent_args = []
    if args.agents:
        agent_args = ["--agents"] + args.agents

    # ── Step 1: Collect & Normalize (per project path) ───────────────────
    print("\n== Step 1: Collect & Normalize ==")
    for project_dir in project_dirs:
        analysis_dir = analysis_dir_map[project_dir]
        if len(project_dirs) > 1:
            print(f"\n── {project_dir} ──")
        _py_run(
            os.path.join(scripts, "collect_logs.py"),
            "--analysis-dir", analysis_dir,
            "--cwd", project_dir,
            *agent_args,
        )
        _py_run(
            os.path.join(scripts, "normalize_logs.py"),
            "--analysis-dir", analysis_dir,
            "--cwd", project_dir,
            *agent_args,
        )

    # ── Search mode: collect & normalize, then grep and exit ─────────────
    if args.search:
        print("\n== Search Mode: Preparing & Searching ==")
        search_run_ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
        # First pass: prepare every path; second pass: search every path.
        for project_dir in project_dirs:
            analysis_dir = analysis_dir_map[project_dir]
            search_run_dir = os.path.join(analysis_dir, "runs", search_run_ts)
            os.makedirs(search_run_dir, exist_ok=True)
            prepare_args = ["--analysis-dir", analysis_dir, "--out-dir", search_run_dir]
            prepare_args += agent_args
            if args.refresh:
                prepare_args.append("--refresh")
            _py_run(os.path.join(scripts, "prepare_sessions.py"), *prepare_args)
        for project_dir in project_dirs:
            analysis_dir = analysis_dir_map[project_dir]
            if len(project_dirs) > 1:
                print(f"\n── Searching: {project_dir} ──")
            _py_run(
                os.path.join(scripts, "search_sessions.py"),
                "--analysis-dir", analysis_dir,
                "--query", *args.search,
            )
        return

    # ── Step 2: Discover Verifiers ───────────────────────────────────────
    print("\n== Step 2: Discover Verifiers ==")
    result = _py_run(
        os.path.join(scripts, "discover_verifiers.py"),
        *tiles_dir_args,
        capture=True,
    )
    try:
        discovery = json.loads(result.stdout)
        tile_names = [t["name"] for t in discovery.get("tiles", [])]
    except (json.JSONDecodeError, KeyError):
        # Malformed discovery output is treated the same as "no verifiers".
        tile_names = []
        print(f" Warning: could not parse discover_verifiers output", flush=True)
    if not tile_names:
        print("\nNo verifiers found in any installed tiles.")
        print("To create verifiers, use the `create-verifiers` skill included in this tile.")
        print("It can extract verifiers from skills, CLAUDE.md, AGENTS.md, or your own description.")
        return

    # Optional --tiles filter: keep only requested tiles, warn about unknowns.
    if args.tiles:
        requested = set(args.tiles)
        filtered = [t for t in tile_names if t in requested]
        skipped = requested - set(tile_names)
        if skipped:
            print(f" Warning: tiles not found: {', '.join(sorted(skipped))}")
        tile_names = filtered
        if not tile_names:
            print("\nNone of the requested tiles have verifiers.")
            return

    print(f"\nFound verifiers in {len(tile_names)} tile(s): {', '.join(tile_names)}")

    # ── Step 3: Create Run Dirs & Prepare (per project path) ─────────────
    print("\n== Step 3: Create Run Dirs & Prepare ==")
    run_ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    run_dir_map: dict[str, str] = {}
    for project_dir in project_dirs:
        analysis_dir = analysis_dir_map[project_dir]
        run_dir = os.path.join(analysis_dir, "runs", run_ts)
        run_dir_map[project_dir] = run_dir
        os.makedirs(run_dir, exist_ok=True)
        # Best-effort "latest" symlink; never fatal (e.g. filesystems
        # without symlink support, or "latest" existing as a real dir).
        latest_link = os.path.join(analysis_dir, "latest")
        try:
            if os.path.islink(latest_link):
                os.unlink(latest_link)
            os.symlink(run_dir, latest_link)
        except OSError:
            pass
        if len(project_dirs) > 1:
            print(f"\n── {project_dir} ──")
        prepare_args = ["--analysis-dir", analysis_dir, "--out-dir", run_dir]
        if args.max_sessions:
            prepare_args += ["--max-sessions", str(args.max_sessions)]
        if args.sessions:
            prepare_args += ["--sessions"] + args.sessions
        if args.refresh:
            prepare_args.append("--refresh")
        prepare_args += agent_args
        _py_run(os.path.join(scripts, "prepare_sessions.py"), *prepare_args)

    primary_run_dir = run_dir_map[primary_dir]
    all_run_dirs = list(run_dir_map.values())
    all_analysis_dirs = list(analysis_dir_map.values())

    # ── Step 4: Extract & Dispatch (verifiers + optional friction) ──────
    #
    # Rules are extracted once per tile. Judges and friction reviewers are
    # dispatched per project path (each has its own prepared/ and cache).
    # Merging spans all run dirs so aggregated results cover all paths.

    friction_scripts = str(FRICTION_SCRIPTS_DIR)

    def run_verifier_pipeline():
        """Run the verifier extract → dispatch → merge pipeline.

        Closes over tile_names, run_dir_map, and args from main; merging
        is skipped under --dry-run since no verdicts are produced.
        """
        print("\n== Step 4a: Extract Rules & Dispatch Judges ==")
        for tile_name in tile_names:
            tile_slug = tile_name.replace("/", "-")
            # Rules are written once into the primary run dir and shared
            # by every project path's dispatch below.
            rules_path = os.path.join(primary_run_dir, f"rules-{tile_slug}.json")
            print(f"\n── Tile: {tile_name} ──")
            _py_run(
                os.path.join(scripts, "extract_checklist.py"),
                *tiles_dir_args,
                "--tile", tile_name,
                "--out", rules_path,
            )
            for project_dir in project_dirs:
                run_dir = run_dir_map[project_dir]
                analysis_dir = analysis_dir_map[project_dir]
                cache_dir = os.path.join(analysis_dir, "verdict-cache")
                if len(project_dirs) > 1:
                    print(f"\n ▸ {project_dir}")
                dispatch_args = [
                    "--dir", run_dir,
                    "--rules", rules_path,
                    "--model", args.model,
                    "--cache-dir", cache_dir,
                    "--tile", tile_name,
                    "--analysis-dir", analysis_dir,
                ]
                if args.dry_run:
                    dispatch_args.append("--dry-run")
                if args.confirmed:
                    dispatch_args.append("--confirmed")
                _py_run(
                    os.path.join(scripts, "dispatch_judges.py"),
                    *dispatch_args,
                )
        if not args.dry_run:
            print("\n== Step 5a: Merge Verdicts ==")
            merge_args = ["--dir"] + all_run_dirs
            if len(all_run_dirs) > 1:
                merge_args += ["--out", os.path.join(primary_run_dir, "verdicts-aggregate.json")]
            _py_run(os.path.join(scripts, "merge_verdicts.py"), *merge_args)

    def run_friction_pipeline():
        """Run the friction dispatch → merge pipeline (per project path)."""
        print("\n== Step 4b: Dispatch Friction Reviewers ==")
        for project_dir in project_dirs:
            run_dir = run_dir_map[project_dir]
            analysis_dir = analysis_dir_map[project_dir]
            friction_cache_dir = os.path.join(analysis_dir, "friction-cache")
            if len(project_dirs) > 1:
                print(f"\n ▸ {project_dir}")
            dispatch_args = [
                "--dir", run_dir,
                "--model", args.model,
                "--cache-dir", friction_cache_dir,
            ]
            if args.dry_run:
                dispatch_args.append("--dry-run")
            if args.confirmed:
                dispatch_args.append("--confirmed")
            _py_run(
                os.path.join(friction_scripts, "dispatch_friction.py"),
                *dispatch_args,
            )
        if not args.dry_run:
            print("\n== Step 5b: Merge Friction ==")
            merge_args = ["--dir"] + all_run_dirs
            if len(all_run_dirs) > 1:
                merge_args += ["--out", os.path.join(primary_run_dir, "friction-summary.json")]
            _py_run(
                os.path.join(friction_scripts, "merge_friction.py"),
                *merge_args,
            )

    if run_friction:
        # Run both pipelines sequentially to keep true max concurrency at
        # --max-parallel (default 3). Running them in parallel would double the
        # concurrent claude processes beyond what we report to the user.
        print("\n== Step 4: Dispatch Verifier Judges + Friction Reviewers ==")
        errors = []
        for pipeline, name in [
            (run_verifier_pipeline, "verifier"),
            (run_friction_pipeline, "friction"),
        ]:
            # A failure in one pipeline does not stop the other; errors are
            # collected and reported together before synthesis.
            try:
                pipeline()
            except Exception as e:
                errors.append(f"{name}: {e}")
                print(f"\n ERROR in {name} pipeline: {e}")
        if errors:
            print(f"\n Pipeline errors: {'; '.join(errors)}")
            print(" Skipping synthesis — cannot synthesize from incomplete results.")
            sys.exit(1)

        # ── Step 6: Synthesize Findings ───────────────────────────────────
        print("\n== Step 6: Synthesize Findings ==")
        synthesize_script = os.path.join(tile_scripts, "synthesize_findings.py")
        if os.path.exists(synthesize_script):
            # check=False: synthesis is best-effort and must not kill the run.
            _py_run(
                synthesize_script,
                "--run-dir", *all_run_dirs,
                "--analysis-dir", *all_analysis_dirs,
                check=False,
            )
        else:
            print(f" Warning: synthesize_findings.py not found at {synthesize_script}")
    else:
        # Verifier pipeline only (original behavior)
        run_verifier_pipeline()

    # ── Step 7: Analyze Trends ────────────────────────────────────────────
    print("\n== Step 7: Analyze Trends ==")
    analysis_path = os.path.join(primary_run_dir, "analysis.json")
    analyze_script = os.path.join(tile_scripts, "analyze_trends.py")
    if os.path.exists(analyze_script):
        result = _py_run(
            analyze_script,
            "--analysis-dir", primary_analysis_dir,
            "--recent-days", str(args.recent_days),
            capture=True,
        )
        # The analyzer prints JSON to stdout; persist it into the run dir.
        if result.stdout:
            Path(analysis_path).write_text(result.stdout)
            print(f" Analysis → {analysis_path}")
    else:
        print(f" Warning: analyze_trends.py not found at {analyze_script}")
        print(" Skipping trend analysis.")

    # ── Done ─────────────────────────────────────────────────────────────
    print(f"\n{'=' * 60}")
    print(" Pipeline complete!")
    if len(all_run_dirs) == 1:
        print(f" Run dir: {primary_run_dir}")
    else:
        print(f" Run dirs ({len(all_run_dirs)}):")
        for rd in all_run_dirs:
            print(f" {rd}")
        print(f" Aggregated results: {primary_run_dir}")
    if run_friction:
        print(" Friction: enabled (see friction-summary.json and synthesis.json)")
    print(f"{'=' * 60}")
# Script entry point — run the pipeline only when executed directly.
if __name__ == "__main__":
    main()