CtrlK
BlogDocsLog inGet started
Tessl Logo

try-tessl/agent-quality

Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.

88

2.93x
Quality

86%

Does it follow best practices?

Impact

97%

2.93x

Average score across 3 eval scenarios

SecuritybySnyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

run_pipeline.pyskills/analyze-sessions/scripts/

#!/usr/bin/env python3
"""
Run the full analysis pipeline in a single invocation.

    collect → normalize → discover → prepare → extract → dispatch → merge → analyze → report

With --friction, also runs friction analysis in parallel:

    collect → normalize → discover → prepare ─┬─ extract → dispatch → merge ─┬─ synthesize → analyze
                                               └─ friction dispatch → merge ──┘

This is the main entry point. It orchestrates the individual scripts via
subprocess so each step's output streams to the console. Judge dispatch
uses ``review_session.py`` which calls ``claude -p --model haiku`` — no
API key needed, just the claude CLI.

No external dependencies for this script itself.

Usage:
    python3 run_pipeline.py [--project-dir /path/to/project ...] [--tiles-dir .tessl/tiles] [--recent-days 7] [--no-friction]

--project-dir defaults to cwd and accepts multiple paths. Each path gets
its own analysis dir (~/.tessl/session-analyses/<slug>). Collection, normalization,
preparation, and judge dispatch run per path. Merge and aggregation span
all paths so the final report covers sessions from every directory.

This is useful when the same repo has multiple checkout paths (worktrees,
separate clones, renamed directories) — pass them all and the pipeline
keeps each path's data in its own lane, then groups across them at
reporting time.

When --tiles-dir is omitted, auto-detects tiles in both <cwd>/.tessl/tiles
and ~/.tessl/tiles (local tiles take priority over global).
"""

import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path


# Filesystem anchors derived from this script's own location, so the pipeline
# resolves its sibling scripts correctly regardless of the caller's cwd.
SCRIPTS_DIR = Path(__file__).resolve().parent          # skills/analyze-sessions/scripts/
TILE_ROOT = SCRIPTS_DIR.parent.parent.parent           # agent-quality/ (tile root)
TILE_SCRIPTS_DIR = TILE_ROOT / "scripts"               # agent-quality/scripts/
FRICTION_SCRIPTS_DIR = TILE_ROOT / "skills" / "review-friction" / "scripts"  # friction-skill scripts


def _analysis_dir_from_project(project_dir: str) -> str:
    """Derive the analysis data directory from a project directory path.

    Produces ~/.tessl/session-analyses/<slug> where slug is the absolute path with
    ``/`` replaced by ``-`` (same convention used by collect_logs).
    """
    slug = project_dir.replace("/", "-")
    return os.path.join(os.path.expanduser("~"), ".tessl", "session-analyses", slug)


def _run(cmd: list[str], check: bool = True, capture: bool = False) -> subprocess.CompletedProcess:
    """Run a command, streaming output."""
    print(f"\n{'─' * 60}")
    print(f"  {' '.join(cmd)}")
    print(f"{'─' * 60}", flush=True)
    return subprocess.run(
        cmd,
        check=check,
        capture_output=capture,
        text=True if capture else None,
    )


def _py_run(
    script: str,
    *args: str,
    capture: bool = False,
    check: bool = True,
) -> subprocess.CompletedProcess:
    """Invoke *script* with the current Python interpreter via :func:`_run`.

    Using ``sys.executable`` guarantees child scripts run under the same
    interpreter as this pipeline. *capture* and *check* are forwarded.
    """
    return _run([sys.executable, script, *args], capture=capture, check=check)


def main() -> None:
    """Parse CLI arguments and drive the full analysis pipeline.

    Orchestration order: collect/normalize (per project path) → discover
    verifiers → prepare run dirs → dispatch verifier judges (and, by
    default, friction reviewers) → merge → synthesize → analyze trends.
    ``--search`` short-circuits after prepare + grep; ``--dry-run`` skips
    the merge/synthesis steps. Exits non-zero on pipeline errors.
    """
    parser = argparse.ArgumentParser(description="Run the full analysis pipeline")
    parser.add_argument("--project-dir", nargs="+", default=[os.getcwd()],
                        help="Project directory(ies) to analyze (default: cwd). "
                             "Pass multiple paths to analyze across worktrees or separate checkouts.")
    parser.add_argument("--analysis-dir", default=None,
                        help="Analysis data directory (default: ~/.tessl/session-analyses/<project-slug>). "
                             "Only valid with a single --project-dir.")
    parser.add_argument("--tiles-dir", default=None, help="Tiles directory (default: auto-detect local + global)")
    parser.add_argument("--recent-days", type=int, default=7, help="Trend window in days (default: 7)")
    parser.add_argument("--model", default="haiku", help="Judge model (default: haiku)")
    parser.add_argument("--project-label", default=None, help="Label for the report")
    parser.add_argument("--agents", nargs="+", default=None, help="Filter to specific agents")
    parser.add_argument("--max-sessions", type=int, default=None, help="Only process the N most recent sessions per project path (default: all)")
    parser.add_argument("--refresh", action="store_true", help="Re-prepare sessions even if cached")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done without running judges")
    parser.add_argument("--no-friction", action="store_true", help="Skip friction analysis (friction runs by default)")
    parser.add_argument("--search", nargs="+", default=None, metavar="QUERY",
                        help="Search mode: collect, normalize, prepare, then grep for QUERY terms. No judges dispatched.")
    parser.add_argument("--sessions", nargs="+", default=None, metavar="ID",
                        help="Only analyze specific sessions (agent/session_id). Use with search results.")
    parser.add_argument("--tiles", nargs="+", default=None, metavar="NAME",
                        help="Only analyze specific tiles (e.g. amyh/my-tile). Default: all tiles with verifiers.")
    parser.add_argument("--confirmed", action="store_true", default=False,
                        help=argparse.SUPPRESS)  # intentionally undocumented — used to bypass session safety limit
    args = parser.parse_args()

    # Canonicalize paths (realpath resolves symlinks) so each project dir
    # maps to one stable slug; the first path is the "primary" lane where
    # aggregated outputs land.
    project_dirs = [os.path.realpath(d) for d in args.project_dir]
    primary_dir = project_dirs[0]

    if args.analysis_dir and len(project_dirs) > 1:
        print("Error: --analysis-dir cannot be used with multiple --project-dir values", file=sys.stderr)
        print("Each project path gets its own analysis dir automatically.", file=sys.stderr)
        sys.exit(1)

    # Each project path gets its own analysis dir to keep lanes separate.
    if args.analysis_dir:
        analysis_dir_map = {primary_dir: os.path.realpath(args.analysis_dir)}
    else:
        analysis_dir_map = {d: _analysis_dir_from_project(d) for d in project_dirs}

    primary_analysis_dir = analysis_dir_map[primary_dir]
    # Friction analysis is skipped in search mode even when not explicitly
    # disabled via --no-friction.
    run_friction = not args.no_friction and not args.search
    tiles_dir = os.path.realpath(args.tiles_dir) if args.tiles_dir else None
    scripts = str(SCRIPTS_DIR)
    tile_scripts = str(TILE_SCRIPTS_DIR)

    if len(project_dirs) == 1:
        print(f"Project dir: {primary_dir}")
        print(f"Analysis dir:   {primary_analysis_dir}")
    else:
        print(f"Project dirs ({len(project_dirs)}):")
        for pd in project_dirs:
            print(f"  {pd} → {analysis_dir_map[pd]}")

    tiles_dir_args = ["--tiles-dir", tiles_dir] if tiles_dir else []

    agent_args = []
    if args.agents:
        agent_args = ["--agents"] + args.agents

    # ── Step 1: Collect & Normalize (per project path) ───────────────────
    print("\n== Step 1: Collect & Normalize ==")

    for project_dir in project_dirs:
        analysis_dir = analysis_dir_map[project_dir]
        if len(project_dirs) > 1:
            print(f"\n── {project_dir} ──")
        _py_run(
            os.path.join(scripts, "collect_logs.py"),
            "--analysis-dir", analysis_dir,
            "--cwd", project_dir,
            *agent_args,
        )
        _py_run(
            os.path.join(scripts, "normalize_logs.py"),
            "--analysis-dir", analysis_dir,
            "--cwd", project_dir,
            *agent_args,
        )

    # ── Search mode: collect & normalize, then grep and exit ─────────────
    if args.search:
        print("\n== Search Mode: Preparing & Searching ==")

        search_run_ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
        for project_dir in project_dirs:
            analysis_dir = analysis_dir_map[project_dir]
            search_run_dir = os.path.join(analysis_dir, "runs", search_run_ts)
            os.makedirs(search_run_dir, exist_ok=True)

            prepare_args = ["--analysis-dir", analysis_dir, "--out-dir", search_run_dir]
            prepare_args += agent_args
            if args.refresh:
                prepare_args.append("--refresh")
            _py_run(os.path.join(scripts, "prepare_sessions.py"), *prepare_args)

        for project_dir in project_dirs:
            analysis_dir = analysis_dir_map[project_dir]
            if len(project_dirs) > 1:
                print(f"\n── Searching: {project_dir} ──")
            _py_run(
                os.path.join(scripts, "search_sessions.py"),
                "--analysis-dir", analysis_dir,
                "--query", *args.search,
            )
        return

    # ── Step 2: Discover Verifiers ───────────────────────────────────────
    print("\n== Step 2: Discover Verifiers ==")

    # Capture stdout: discover_verifiers emits JSON listing tiles that
    # ship verifiers.
    result = _py_run(
        os.path.join(scripts, "discover_verifiers.py"),
        *tiles_dir_args,
        capture=True,
    )

    try:
        discovery = json.loads(result.stdout)
        tile_names = [t["name"] for t in discovery.get("tiles", [])]
    except (json.JSONDecodeError, KeyError):
        # Unparseable output is treated as "no verifiers" rather than a
        # hard failure.
        tile_names = []
        print(f"  Warning: could not parse discover_verifiers output", flush=True)

    if not tile_names:
        print("\nNo verifiers found in any installed tiles.")
        print("To create verifiers, use the `create-verifiers` skill included in this tile.")
        print("It can extract verifiers from skills, CLAUDE.md, AGENTS.md, or your own description.")
        return

    if args.tiles:
        # Restrict to explicitly requested tiles; warn about names that
        # were requested but not discovered.
        requested = set(args.tiles)
        filtered = [t for t in tile_names if t in requested]
        skipped = requested - set(tile_names)
        if skipped:
            print(f"  Warning: tiles not found: {', '.join(sorted(skipped))}")
        tile_names = filtered
        if not tile_names:
            print("\nNone of the requested tiles have verifiers.")
            return

    print(f"\nFound verifiers in {len(tile_names)} tile(s): {', '.join(tile_names)}")

    # ── Step 3: Create Run Dirs & Prepare (per project path) ─────────────
    print("\n== Step 3: Create Run Dirs & Prepare ==")

    # One shared timestamp so every project path's run dir has the same name.
    run_ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    run_dir_map: dict[str, str] = {}

    for project_dir in project_dirs:
        analysis_dir = analysis_dir_map[project_dir]
        run_dir = os.path.join(analysis_dir, "runs", run_ts)
        run_dir_map[project_dir] = run_dir
        os.makedirs(run_dir, exist_ok=True)

        # Best-effort "latest" symlink; failures (e.g. filesystems without
        # symlink support) are ignored.
        latest_link = os.path.join(analysis_dir, "latest")
        try:
            if os.path.islink(latest_link):
                os.unlink(latest_link)
            os.symlink(run_dir, latest_link)
        except OSError:
            pass

        if len(project_dirs) > 1:
            print(f"\n── {project_dir} ──")

        prepare_args = ["--analysis-dir", analysis_dir, "--out-dir", run_dir]
        if args.max_sessions:
            prepare_args += ["--max-sessions", str(args.max_sessions)]
        if args.sessions:
            prepare_args += ["--sessions"] + args.sessions
        if args.refresh:
            prepare_args.append("--refresh")
        prepare_args += agent_args

        _py_run(os.path.join(scripts, "prepare_sessions.py"), *prepare_args)

    primary_run_dir = run_dir_map[primary_dir]
    all_run_dirs = list(run_dir_map.values())
    all_analysis_dirs = list(analysis_dir_map.values())

    # ── Step 4: Extract & Dispatch (verifiers + optional friction) ──────
    #
    # Rules are extracted once per tile. Judges and friction reviewers are
    # dispatched per project path (each has its own prepared/ and cache).
    # Merging spans all run dirs so aggregated results cover all paths.

    friction_scripts = str(FRICTION_SCRIPTS_DIR)

    def run_verifier_pipeline():
        """Run the verifier extract → dispatch → merge pipeline."""
        print("\n== Step 4a: Extract Rules & Dispatch Judges ==")

        for tile_name in tile_names:
            tile_slug = tile_name.replace("/", "-")
            # Rules are written once into the primary run dir and shared by
            # every project path's dispatch below.
            rules_path = os.path.join(primary_run_dir, f"rules-{tile_slug}.json")

            print(f"\n── Tile: {tile_name} ──")

            _py_run(
                os.path.join(scripts, "extract_checklist.py"),
                *tiles_dir_args,
                "--tile", tile_name,
                "--out", rules_path,
            )

            for project_dir in project_dirs:
                run_dir = run_dir_map[project_dir]
                analysis_dir = analysis_dir_map[project_dir]
                cache_dir = os.path.join(analysis_dir, "verdict-cache")

                if len(project_dirs) > 1:
                    print(f"\n  ▸ {project_dir}")

                dispatch_args = [
                    "--dir", run_dir,
                    "--rules", rules_path,
                    "--model", args.model,
                    "--cache-dir", cache_dir,
                    "--tile", tile_name,
                    "--analysis-dir", analysis_dir,
                ]
                if args.dry_run:
                    dispatch_args.append("--dry-run")
                if args.confirmed:
                    dispatch_args.append("--confirmed")

                _py_run(
                    os.path.join(scripts, "dispatch_judges.py"),
                    *dispatch_args,
                )

        if not args.dry_run:
            print("\n== Step 5a: Merge Verdicts ==")
            merge_args = ["--dir"] + all_run_dirs
            if len(all_run_dirs) > 1:
                # Multi-path runs get an explicit aggregate file in the
                # primary run dir.
                merge_args += ["--out", os.path.join(primary_run_dir, "verdicts-aggregate.json")]
            _py_run(os.path.join(scripts, "merge_verdicts.py"), *merge_args)

    def run_friction_pipeline():
        """Run the friction dispatch → merge pipeline."""
        print("\n== Step 4b: Dispatch Friction Reviewers ==")

        for project_dir in project_dirs:
            run_dir = run_dir_map[project_dir]
            analysis_dir = analysis_dir_map[project_dir]
            friction_cache_dir = os.path.join(analysis_dir, "friction-cache")

            if len(project_dirs) > 1:
                print(f"\n  ▸ {project_dir}")

            dispatch_args = [
                "--dir", run_dir,
                "--model", args.model,
                "--cache-dir", friction_cache_dir,
            ]
            if args.dry_run:
                dispatch_args.append("--dry-run")
            if args.confirmed:
                dispatch_args.append("--confirmed")

            _py_run(
                os.path.join(friction_scripts, "dispatch_friction.py"),
                *dispatch_args,
            )

        if not args.dry_run:
            print("\n== Step 5b: Merge Friction ==")
            merge_args = ["--dir"] + all_run_dirs
            if len(all_run_dirs) > 1:
                merge_args += ["--out", os.path.join(primary_run_dir, "friction-summary.json")]
            _py_run(
                os.path.join(friction_scripts, "merge_friction.py"),
                *merge_args,
            )

    if run_friction:
        # Run both pipelines sequentially to keep true max concurrency at
        # --max-parallel (default 3). Running them in parallel would double the
        # concurrent claude processes beyond what we report to the user.
        print("\n== Step 4: Dispatch Verifier Judges + Friction Reviewers ==")

        # Run each pipeline even if the other fails, but refuse to
        # synthesize from partial results (sys.exit below).
        errors = []
        for pipeline, name in [
            (run_verifier_pipeline, "verifier"),
            (run_friction_pipeline, "friction"),
        ]:
            try:
                pipeline()
            except Exception as e:
                errors.append(f"{name}: {e}")
                print(f"\n  ERROR in {name} pipeline: {e}")

        if errors:
            print(f"\n  Pipeline errors: {'; '.join(errors)}")
            print("  Skipping synthesis — cannot synthesize from incomplete results.")
            sys.exit(1)

        # ── Step 6: Synthesize Findings ───────────────────────────────────
        print("\n== Step 6: Synthesize Findings ==")

        synthesize_script = os.path.join(tile_scripts, "synthesize_findings.py")
        if os.path.exists(synthesize_script):
            # check=False: synthesis failure should not abort trend analysis.
            _py_run(
                synthesize_script,
                "--run-dir", *all_run_dirs,
                "--analysis-dir", *all_analysis_dirs,
                check=False,
            )
        else:
            print(f"  Warning: synthesize_findings.py not found at {synthesize_script}")
    else:
        # Verifier pipeline only (original behavior)
        run_verifier_pipeline()

    # ── Step 7: Analyze Trends ────────────────────────────────────────────
    print("\n== Step 7: Analyze Trends ==")

    analysis_path = os.path.join(primary_run_dir, "analysis.json")
    analyze_script = os.path.join(tile_scripts, "analyze_trends.py")

    if os.path.exists(analyze_script):
        # Captured stdout is persisted as the run's analysis.json.
        result = _py_run(
            analyze_script,
            "--analysis-dir", primary_analysis_dir,
            "--recent-days", str(args.recent_days),
            capture=True,
        )
        if result.stdout:
            Path(analysis_path).write_text(result.stdout)
            print(f"  Analysis → {analysis_path}")
    else:
        print(f"  Warning: analyze_trends.py not found at {analyze_script}")
        print("  Skipping trend analysis.")

    # ── Done ─────────────────────────────────────────────────────────────
    print(f"\n{'=' * 60}")
    print("  Pipeline complete!")
    if len(all_run_dirs) == 1:
        print(f"  Run dir:  {primary_run_dir}")
    else:
        print(f"  Run dirs ({len(all_run_dirs)}):")
        for rd in all_run_dirs:
            print(f"    {rd}")
        print(f"  Aggregated results: {primary_run_dir}")
    if run_friction:
        print("  Friction: enabled (see friction-summary.json and synthesis.json)")
    print(f"{'=' * 60}")


# Standard entry guard: run the pipeline only when executed as a script.
if __name__ == "__main__":
    main()

README.md

tile.json