Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.
88
86%
Does it follow best practices?
Impact
97%
2.93x — Average score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Compare the current analysis run against the previous run to show trends.
Loads verdicts-aggregate.json from the current and previous runs,
computes per-check deltas, and classifies changes as improved, degraded,
new, or stable.
Also identifies the most recent sessions in the current run (by timestamp)
and reports their individual results so the user can see how their latest
work scored.
Usage:
python3 compare_runs.py --analysis-dir ~/.tessl/session-analyses/<slug> [--run <timestamp>]
No external dependencies.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
def find_runs(audit_dir: Path) -> list[Path]:
    """Return completed run directories under *audit_dir*, newest first.

    A run counts as completed when it is a real directory (not a symlink)
    that contains a verdicts-aggregate.json file. Ordering relies on the
    run directory names sorting chronologically (timestamp-named dirs).
    """
    runs_dir = audit_dir / "runs"
    if not runs_dir.exists():
        return []
    return [
        entry
        for entry in sorted(runs_dir.iterdir(), reverse=True)
        if entry.is_dir()
        and not entry.is_symlink()
        and (entry / "verdicts-aggregate.json").exists()
    ]
def load_aggregate(run_dir: Path) -> dict | None:
"""Load verdicts-aggregate.json from a run directory."""
path = run_dir / "verdicts-aggregate.json"
if not path.exists():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
def extract_check_rates(agg: dict) -> dict[str, dict]:
    """Flatten the nested tiles/instructions/checks aggregate into one map.

    Returns check_name -> summary dict carrying the owning tile and
    instruction plus the raw pass/fail counters. NOTE: if the same check
    name appears under more than one instruction or tile, the last one
    encountered wins (names are assumed unique across the aggregate).
    """
    flat: dict[str, dict] = {}
    for tile, tile_body in agg.get("tiles", {}).items():
        for inst_path, inst_body in tile_body.get("instructions", {}).items():
            inst_text = inst_body.get("instruction", "")
            for name, stats in inst_body.get("checks", {}).items():
                flat[name] = {
                    "instruction_file": inst_path,
                    "instruction": inst_text,
                    "pass_rate": stats.get("pass_rate"),
                    "applicable_count": stats.get("applicable_count", 0),
                    "passed_count": stats.get("passed_count", 0),
                    "failed_count": stats.get("failed_count", 0),
                    "tile": tile,
                }
    return flat
def find_recent_sessions(run_dir: Path, n: int = 5) -> list[dict]:
    """Return up to *n* session verdict records, newest first by file mtime.

    Scans run_dir/verdicts/<agent>/*.verdict.json, skipping hidden/underscore
    agent directories and unreadable files. Each record carries the parsed
    payload, agent and file names, the _meta.completed_at timestamp when
    present (informational only — ordering always uses file mtime), and the
    mtime itself.
    """
    verdicts_dir = run_dir / "verdicts"
    if not verdicts_dir.exists():
        return []
    records: list[dict] = []
    for agent_dir in verdicts_dir.iterdir():
        if not agent_dir.is_dir():
            continue
        if agent_dir.name.startswith((".", "_")):
            continue
        for verdict_file in agent_dir.glob("*.verdict.json"):
            try:
                payload = json.loads(verdict_file.read_text(encoding="utf-8"))
                mtime = verdict_file.stat().st_mtime
            except (json.JSONDecodeError, OSError):
                # Skip corrupt or unreadable verdict files.
                continue
            completed = payload.get("_meta", {}).get("completed_at") or None
            records.append({
                "file": verdict_file.name,
                "agent": agent_dir.name,
                "timestamp": completed,
                "mtime": mtime,
                "data": payload,
            })
    records.sort(key=lambda rec: rec["mtime"], reverse=True)
    return records[:n]
def score_session(verdict: dict) -> dict:
    """Tally one session verdict into pass/fail counts plus failure details.

    Instructions marked not relevant and checks marked not applicable are
    skipped entirely. pass_rate is rounded to 2 decimals and is None when
    nothing was applicable.
    """
    applicable = passed = failed = 0
    failures: list[dict] = []
    for inst in verdict.get("instructions", []):
        if not inst.get("relevant", True):
            continue
        for check in inst.get("checks", []):
            if not check.get("applicable", False):
                continue
            applicable += 1
            outcome = check.get("passed")
            if outcome is True:
                passed += 1
            elif outcome is False:
                failed += 1
                # Keep enough context for a human to triage the failure.
                failures.append({
                    "check": check.get("name", "?"),
                    "instruction": inst.get("instruction", ""),
                    "evidence": check.get("evidence", ""),
                    "confidence": check.get("confidence", ""),
                })
    return {
        "applicable": applicable,
        "passed": passed,
        "failed": failed,
        "pass_rate": round(passed / applicable, 2) if applicable else None,
        "failures": failures,
    }
def compare(current_agg: dict, previous_agg: dict | None) -> dict:
    """Build a trend report of the current run versus the previous one.

    Per-check trends: "new" when no usable previous rate exists, "no_data"
    when the current rate is missing, "improved"/"degraded" when the delta
    exceeds +/-0.05, otherwise "stable". With no previous run at all every
    check is reported as "new".
    """
    current_checks = extract_check_rates(current_agg)
    if previous_agg is None:
        # First ever run: there is nothing to diff against.
        return {
            "has_previous": False,
            "current_sessions": current_agg.get("sessions_count", 0),
            "checks": {
                name: dict(info, delta=None, trend="new")
                for name, info in current_checks.items()
            },
        }
    previous_checks = extract_check_rates(previous_agg)
    report_checks: dict[str, dict] = {}
    for name, curr in current_checks.items():
        prev = previous_checks.get(name)
        prev_rate = None if prev is None else prev["pass_rate"]
        if prev_rate is None:
            report_checks[name] = dict(curr, delta=None, previous_rate=None, trend="new")
            continue
        if curr["pass_rate"] is None:
            report_checks[name] = dict(curr, delta=None, previous_rate=prev_rate, trend="no_data")
            continue
        delta = round(curr["pass_rate"] - prev_rate, 2)
        trend = "improved" if delta > 0.05 else "degraded" if delta < -0.05 else "stable"
        report_checks[name] = dict(curr, delta=delta, previous_rate=prev_rate, trend=trend)
    return {
        "has_previous": True,
        "current_sessions": current_agg.get("sessions_count", 0),
        "previous_sessions": previous_agg.get("sessions_count", 0),
        "checks": report_checks,
    }
def main():
    """CLI entry point: compare the chosen run against the run before it.

    Loads the current run's aggregate, diffs it against the most recent
    strictly-older run, attaches the N most recent session scores, and
    emits the report as JSON to stdout or --out. Exits 1 on missing runs.
    """
    parser = argparse.ArgumentParser(
        description="Compare current analysis run against previous"
    )
    parser.add_argument(
        "--analysis-dir",
        required=True,
        help="Analysis directory (e.g. ~/.tessl/session-analyses/<slug>)",
    )
    parser.add_argument(
        "--run",
        default=None,
        help="Specific run timestamp (default: latest)",
    )
    parser.add_argument(
        "--recent",
        type=int,
        default=5,
        help="Number of recent sessions to highlight (default: 5)",
    )
    parser.add_argument(
        "--out",
        default=None,
        help="Output path (default: stdout)",
    )
    args = parser.parse_args()
    # expanduser() so a literal "~/..." (quoted in the shell, or passed
    # programmatically) still resolves to the home directory.
    analysis_dir = Path(args.analysis_dir).expanduser()
    runs = find_runs(analysis_dir)
    if not runs:
        print("No completed runs found", file=sys.stderr)
        sys.exit(1)
    # Resolve the "current" run: explicit --run timestamp, else the newest.
    if args.run:
        current_dir = analysis_dir / "runs" / args.run
        if not current_dir.exists():
            print(f"Run not found: {current_dir}", file=sys.stderr)
            sys.exit(1)
    else:
        current_dir = runs[0]
    current_agg = load_aggregate(current_dir)
    if not current_agg:
        print(f"No verdicts-aggregate.json in {current_dir}", file=sys.stderr)
        sys.exit(1)
    # Previous run = the newest run strictly OLDER than current. Runs are
    # timestamp-named and sorted newest-first, so name comparison gives age.
    # (Taking merely "the first run that isn't current" would pick a NEWER
    # run when --run selects an older one.)
    previous_agg = None
    previous_dir = None
    for r in runs:
        if r != current_dir and r.name < current_dir.name:
            previous_agg = load_aggregate(r)
            previous_dir = r
            break
    # Compare aggregates and record which runs were involved.
    trend = compare(current_agg, previous_agg)
    trend["current_run"] = current_dir.name
    trend["previous_run"] = previous_dir.name if previous_dir else None
    # Highlight the user's most recent sessions within the current run.
    recent = find_recent_sessions(current_dir, args.recent)
    trend["recent_sessions"] = [
        {"session": f"{sess['agent']}/{sess['file']}", **score_session(sess["data"])}
        for sess in recent
    ]
    output = json.dumps(trend, indent=2)
    if args.out:
        out_path = Path(args.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(output, encoding="utf-8")
        print(f"Comparison written to {out_path}", file=sys.stderr)
    else:
        print(output)
# Only run the CLI when executed directly, not when imported.
if __name__ == "__main__":
    main()