CtrlK
BlogDocsLog inGet started
Tessl Logo

try-tessl/agent-quality

Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.

88

2.93x
Quality

86%

Does it follow best practices?

Impact

97%

2.93x

Average score across 3 eval scenarios

SecuritybySnyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

compare_runs.pyskills/analyze-sessions/scripts/

#!/usr/bin/env python3
"""
Compare the current analysis run against the previous run to show trends.

Loads verdicts-aggregate.json from the current and previous runs,
computes per-check deltas, and classifies changes as improved, degraded,
new, or stable.

Also identifies the most recent sessions in the current run (by verdict-file
modification time) and reports their individual results so the user can see
how their latest work scored.

Usage:
    python3 compare_runs.py --analysis-dir ~/.tessl/session-analyses/<slug> [--run <timestamp>]

No external dependencies.
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path


def find_runs(audit_dir: Path) -> list[Path]:
    """Return run directories under ``<audit_dir>/runs``, newest first.

    A directory counts as a run only when it is a real (non-symlink)
    directory that contains a ``verdicts-aggregate.json`` file.
    Sorting the timestamp-named directory names in reverse yields
    newest-first order.
    """
    runs_dir = audit_dir / "runs"
    if not runs_dir.exists():
        return []
    candidates = sorted(runs_dir.iterdir(), reverse=True)
    return [
        entry
        for entry in candidates
        if entry.is_dir()
        and not entry.is_symlink()
        and (entry / "verdicts-aggregate.json").exists()
    ]


def load_aggregate(run_dir: Path) -> dict | None:
    """Load verdicts-aggregate.json from a run directory."""
    path = run_dir / "verdicts-aggregate.json"
    if not path.exists():
        return None
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        return None


def extract_check_rates(agg: dict) -> dict[str, dict]:
    """Flatten an aggregate report into a check_name -> stats mapping.

    Walks tiles -> instructions -> checks and records, per check: its
    source instruction file and text, pass/fail counts, pass rate, and
    owning tile. If the same check name appears under multiple tiles,
    the last occurrence (in tile iteration order) wins, as before.
    """
    flat: dict[str, dict] = {}
    for tile_name, tile in agg.get("tiles", {}).items():
        for inst_file, inst in tile.get("instructions", {}).items():
            inst_text = inst.get("instruction", "")
            for check_name, stats in inst.get("checks", {}).items():
                flat[check_name] = {
                    "instruction_file": inst_file,
                    "instruction": inst_text,
                    "pass_rate": stats.get("pass_rate"),
                    "applicable_count": stats.get("applicable_count", 0),
                    "passed_count": stats.get("passed_count", 0),
                    "failed_count": stats.get("failed_count", 0),
                    "tile": tile_name,
                }
    return flat


def find_recent_sessions(run_dir: Path, n: int = 5) -> list[dict]:
    """Return the *n* most recently modified session verdicts in a run.

    Scans ``<run_dir>/verdicts/<agent>/*.verdict.json``, skipping agent
    directories whose names start with "." or "_", and silently ignoring
    files that cannot be read or parsed. Entries carry the ``_meta``
    completion timestamp when present, but ordering is always by file
    mtime (newest first).
    """
    verdicts_dir = run_dir / "verdicts"
    if not verdicts_dir.exists():
        return []

    collected: list[dict] = []
    for agent_dir in verdicts_dir.iterdir():
        if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
            continue
        for path in agent_dir.glob("*.verdict.json"):
            try:
                payload = json.loads(path.read_text(encoding="utf-8"))
                mtime = path.stat().st_mtime
            except (json.JSONDecodeError, OSError):
                # Unreadable or malformed verdict: skip it, best-effort scan.
                continue
            # Prefer the _meta completion timestamp; fall back to None.
            collected.append({
                "file": path.name,
                "agent": agent_dir.name,
                "timestamp": payload.get("_meta", {}).get("completed_at") or None,
                "mtime": mtime,
                "data": payload,
            })

    # Newest first by modification time, truncated to the requested count.
    collected.sort(key=lambda entry: entry["mtime"], reverse=True)
    return collected[:n]


def score_session(verdict: dict) -> dict:
    """Summarize one session verdict into pass/fail counts and failures.

    Only checks on relevant instructions are counted, and only those
    marked applicable. Checks whose ``passed`` value is neither True nor
    False count toward the applicable total without being scored either
    way. ``pass_rate`` is None when nothing was applicable.
    """
    applicable = 0
    passed = 0
    failed = 0
    failures: list[dict] = []

    for inst in verdict.get("instructions", []):
        if not inst.get("relevant", True):
            continue  # irrelevant instruction: skip all its checks
        for chk in inst.get("checks", []):
            if not chk.get("applicable", False):
                continue
            applicable += 1
            outcome = chk.get("passed")
            if outcome is True:
                passed += 1
            elif outcome is False:
                failed += 1
                failures.append({
                    "check": chk.get("name", "?"),
                    "instruction": inst.get("instruction", ""),
                    "evidence": chk.get("evidence", ""),
                    "confidence": chk.get("confidence", ""),
                })

    return {
        "applicable": applicable,
        "passed": passed,
        "failed": failed,
        "pass_rate": round(passed / applicable, 2) if applicable else None,
        "failures": failures,
    }


def compare(current_agg: dict, previous_agg: dict | None) -> dict:
    """Build a per-check trend report of the current run vs the previous.

    Trend labels:
      - "new": the check has no usable previous pass rate
      - "no_data": the check has no current pass rate
      - "improved"/"degraded": rounded delta beyond +/-0.05
      - "stable": rounded delta within +/-0.05

    When *previous_agg* is None (first run), every check is "new" and
    no delta or previous_rate is reported.
    """
    current_checks = extract_check_rates(current_agg)

    if previous_agg is None:
        # First run ever: nothing to diff against.
        labelled = {}
        for name, info in current_checks.items():
            labelled[name] = {**info, "delta": None, "trend": "new"}
        return {
            "has_previous": False,
            "current_sessions": current_agg.get("sessions_count", 0),
            "checks": labelled,
        }

    previous_checks = extract_check_rates(previous_agg)
    report: dict[str, dict] = {}
    for name, curr in current_checks.items():
        prev = previous_checks.get(name)
        prev_rate = prev["pass_rate"] if prev is not None else None
        if prev_rate is None:
            entry = {**curr, "delta": None, "previous_rate": None, "trend": "new"}
        elif curr["pass_rate"] is None:
            entry = {**curr, "delta": None, "previous_rate": prev_rate, "trend": "no_data"}
        else:
            delta = round(curr["pass_rate"] - prev_rate, 2)
            if delta > 0.05:
                label = "improved"
            elif delta < -0.05:
                label = "degraded"
            else:
                label = "stable"
            entry = {**curr, "delta": delta, "previous_rate": prev_rate, "trend": label}
        report[name] = entry

    return {
        "has_previous": True,
        "current_sessions": current_agg.get("sessions_count", 0),
        "previous_sessions": previous_agg.get("sessions_count", 0),
        "checks": report,
    }


def main() -> None:
    """CLI entry point.

    Parses arguments, resolves the current and previous runs, builds the
    trend report (including per-session scores for the most recently
    written verdicts), and emits it as JSON to stdout or ``--out``.

    Exits with status 1 when no runs exist, the requested run is missing,
    or the current run has no loadable aggregate.
    """
    parser = argparse.ArgumentParser(
        description="Compare current analysis run against previous"
    )
    parser.add_argument(
        "--analysis-dir",
        required=True,
        help="Analysis directory (e.g. ~/.tessl/session-analyses/<slug>)",
    )
    parser.add_argument(
        "--run",
        default=None,
        help="Specific run timestamp (default: latest)",
    )
    parser.add_argument(
        "--recent",
        type=int,
        default=5,
        help="Number of recent sessions to highlight (default: 5)",
    )
    parser.add_argument(
        "--out",
        default=None,
        help="Output path (default: stdout)",
    )
    args = parser.parse_args()

    # expanduser() so a literal "~/..." (e.g. passed through quotes, which
    # the shell does not expand) still resolves to the home directory.
    analysis_dir = Path(args.analysis_dir).expanduser()
    runs = find_runs(analysis_dir)

    if not runs:
        print("No completed runs found", file=sys.stderr)
        sys.exit(1)

    # Resolve the current run: explicit --run timestamp, else the newest.
    if args.run:
        current_dir = analysis_dir / "runs" / args.run
        if not current_dir.exists():
            print(f"Run not found: {current_dir}", file=sys.stderr)
            sys.exit(1)
    else:
        current_dir = runs[0]

    current_agg = load_aggregate(current_dir)
    if not current_agg:
        print(f"No verdicts-aggregate.json in {current_dir}", file=sys.stderr)
        sys.exit(1)

    # Previous = newest run strictly OLDER than the current one. Run
    # directories are timestamp-named, so lexicographic name order is
    # assumed to match chronological order and runs[] is newest-first.
    # Fix: the old "first run that isn't current" logic could pick a run
    # NEWER than an explicitly selected older --run, and could report a
    # previous_run whose aggregate failed to load.
    previous_agg = None
    previous_dir = None
    for r in runs:
        if r.name < current_dir.name:
            agg = load_aggregate(r)
            if agg is not None:
                previous_agg = agg
                previous_dir = r
                break

    # Build the trend report and annotate it with run identifiers.
    trend = compare(current_agg, previous_agg)
    trend["current_run"] = current_dir.name
    trend["previous_run"] = previous_dir.name if previous_dir else None

    # Attach per-session scores for the N most recently written verdicts.
    recent = find_recent_sessions(current_dir, args.recent)
    trend["recent_sessions"] = []
    for sess in recent:
        scores = score_session(sess["data"])
        trend["recent_sessions"].append({
            "session": f"{sess['agent']}/{sess['file']}",
            **scores,
        })

    output = json.dumps(trend, indent=2)
    if args.out:
        out_path = Path(args.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(output, encoding="utf-8")
        print(f"Comparison written to {out_path}", file=sys.stderr)
    else:
        print(output)


if __name__ == "__main__":
    main()

README.md

tile.json