Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.
88
86%
Does it follow best practices?
Impact
97%
2.93x Average score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Analyze verdict trends from the verdict cache.
Reads all cached verdicts across tiles, extracts session timestamps,
and produces three analysis views:
1. Latest session — how did the most recent session score vs baseline?
2. Recent vs prior — are things getting better? (configurable window)
3. Timeseries — per-check pass rates over time (for charting)
Usage:
python3 analyze_trends.py --analysis-dir ~/.tessl/session-analyses/<slug> [--recent-days 7]
No external dependencies.
"""
from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
# ─── Loading verdicts from cache ──────────────────────────────────────────
def _resolve_session_timestamp(data: dict, audit_dir: Path | None = None) -> str:
"""Get the actual session timestamp, avoiding judge-dispatch time.
Priority: _session_timestamp > normalized log first event > _meta.started_at
"""
ts = data.get("_session_timestamp")
if ts:
return ts
# Try to read from normalized log
if audit_dir:
agent = data.get("_cache_agent", "")
session_id = data.get("_cache_session_id", "")
if agent and session_id:
for project_dir in audit_dir.iterdir():
if not project_dir.is_dir() or project_dir.name.startswith((".", "_")):
continue
candidate = project_dir / "normalized" / agent / f"{session_id}.jsonl"
if candidate.exists():
try:
with open(candidate, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
if event.get("timestamp"):
return event["timestamp"]
except (json.JSONDecodeError, OSError):
pass
break
# Last resort: judge dispatch time (less accurate for trending)
meta = data.get("_meta", {})
return meta.get("started_at", "")
def load_all_cached_verdicts(cache_dir: Path, audit_dir: Path | None = None) -> list[dict]:
    """Load every verdict from the verdict cache, across all tiles.

    Each loaded verdict is annotated in place with ``_cache_tile``,
    ``_cache_agent``, ``_cache_session_id`` and a resolved ``_sort_ts``
    used downstream for chronological ordering. Unreadable or malformed
    files are skipped silently.
    """
    results: list[dict] = []
    if not cache_dir.exists():
        return results
    for tile_dir in sorted(cache_dir.iterdir()):
        if not tile_dir.is_dir() or tile_dir.name.startswith("."):
            continue
        # Directory names encode "/" as "--"; undo that for display.
        tile = tile_dir.name.replace("--", "/")
        for agent_dir in sorted(tile_dir.iterdir()):
            if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
                continue
            for verdict_file in sorted(agent_dir.glob("*.verdict.json")):
                try:
                    verdict = json.loads(verdict_file.read_text(encoding="utf-8"))
                    verdict["_cache_tile"] = tile
                    verdict["_cache_agent"] = agent_dir.name
                    verdict["_cache_session_id"] = verdict_file.stem.replace(".verdict", "")
                    verdict["_sort_ts"] = _resolve_session_timestamp(verdict, audit_dir)
                    results.append(verdict)
                except (json.JSONDecodeError, OSError):
                    continue
    return results
def load_run_verdicts(run_dir: Path) -> list[dict]:
    """Load verdicts from one run directory (fallback when no cache exists).

    Mirrors load_all_cached_verdicts but reads ``<run_dir>/verdicts/<agent>/``
    and has no tile level. Timestamp resolution falls back from
    ``_session_timestamp`` to ``_meta.started_at`` to "".
    """
    results: list[dict] = []
    verdicts_root = run_dir / "verdicts"
    if not verdicts_root.exists():
        return results
    for agent_dir in sorted(verdicts_root.iterdir()):
        if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
            continue
        for verdict_file in sorted(agent_dir.glob("*.verdict.json")):
            try:
                verdict = json.loads(verdict_file.read_text(encoding="utf-8"))
                verdict["_cache_agent"] = agent_dir.name
                verdict["_cache_session_id"] = verdict_file.stem.replace(".verdict", "")
                stamp = verdict.get("_session_timestamp")
                if not stamp:
                    stamp = verdict.get("_meta", {}).get("started_at")
                verdict["_sort_ts"] = stamp or ""
                results.append(verdict)
            except (json.JSONDecodeError, OSError):
                continue
    return results
# ─── Scoring helpers ──────────────────────────────────────────────────────
def score_session(verdict: dict) -> dict:
    """Summarize one session verdict into pass/fail/applicable counts.

    Instructions marked not relevant and checks marked not applicable are
    excluded. Returns counts, a 2-decimal pass rate (None when nothing was
    applicable), and a list of failure detail records.
    """
    applicable = passed = failed = 0
    failures: list[dict] = []
    for inst in verdict.get("instructions", []):
        if not inst.get("relevant", True):
            continue
        for check in inst.get("checks", []):
            if not check.get("applicable", False):
                continue
            applicable += 1
            outcome = check.get("passed")
            if outcome is True:
                passed += 1
            elif outcome is False:
                # Explicit False only; a missing/None verdict counts neither way.
                failed += 1
                failures.append({
                    "check": check.get("name", "?"),
                    "instruction": inst.get("instruction", ""),
                    "tile": inst.get("tile", verdict.get("_cache_tile", "?")),
                    "evidence": check.get("evidence", ""),
                    "confidence": check.get("confidence", ""),
                })
    return {
        "applicable": applicable,
        "passed": passed,
        "failed": failed,
        "pass_rate": round(passed / applicable, 2) if applicable else None,
        "failures": failures,
    }
def aggregate_checks(verdicts: list[dict]) -> dict[str, dict]:
    """Aggregate per-check pass/fail statistics across many verdicts.

    Keyed by check name. Each entry keeps the first-seen instruction text
    and tile, counts of applicable/passed/failed occurrences, and a
    2-decimal ``pass_rate`` (None when the check was never applicable).
    """
    stats_by_name: dict[str, dict] = {}
    for verdict in verdicts:
        for inst in verdict.get("instructions", []):
            if not inst.get("relevant", True):
                continue
            tile = inst.get("tile", verdict.get("_cache_tile", "unknown"))
            for check in inst.get("checks", []):
                name = check.get("name", "unknown")
                entry = stats_by_name.setdefault(name, {
                    "instruction": inst.get("instruction", ""),
                    "tile": tile,
                    "applicable": 0,
                    "passed": 0,
                    "failed": 0,
                })
                if not check.get("applicable", False):
                    continue
                entry["applicable"] += 1
                outcome = check.get("passed")
                if outcome is True:
                    entry["passed"] += 1
                elif outcome is False:
                    entry["failed"] += 1
    for entry in stats_by_name.values():
        count = entry["applicable"]
        entry["pass_rate"] = round(entry["passed"] / count, 2) if count else None
    return stats_by_name
# ─── Analysis views ───────────────────────────────────────────────────────
def view_latest_session(verdicts: list[dict]) -> dict:
    """View 1: score the most recent session against the baseline of all others.

    The "latest session" is the agent/session_id pair with the newest
    ``_sort_ts``; all of its tile verdicts are merged into one combined
    verdict for scoring. Every other verdict forms the baseline.
    """
    if not verdicts:
        return {"has_data": False}

    newest_first = sorted(verdicts, key=lambda v: v["_sort_ts"], reverse=True)
    session_id = newest_first[0].get("_cache_session_id", "")
    agent = newest_first[0].get("_cache_agent", "")

    def _belongs_to_latest(v: dict) -> bool:
        return (v.get("_cache_session_id") == session_id
                and v.get("_cache_agent") == agent)

    latest_verdicts = [v for v in verdicts if _belongs_to_latest(v)]
    baseline_verdicts = [v for v in verdicts if not _belongs_to_latest(v)]

    # Merge all of this session's per-tile verdicts for a single score.
    merged = {"instructions": [inst
                               for v in latest_verdicts
                               for inst in v.get("instructions", [])]}
    baseline = aggregate_checks(baseline_verdicts)
    latest_checks = aggregate_checks(latest_verdicts)

    comparisons = []
    for name, stats in latest_checks.items():
        base = baseline.get(name)
        rate = stats.get("pass_rate")
        # A comparison is only meaningful when both sides have a rate.
        if base and rate is not None and base["pass_rate"] is not None:
            if (rate or 0) > (base["pass_rate"] or 0):
                note = "above baseline"
            elif (rate or 0) < (base["pass_rate"] or 0):
                note = "below baseline"
            else:
                note = "matches baseline"
        else:
            note = "matches baseline"
        comparisons.append({
            "check": name,
            "tile": stats.get("tile", "?"),
            "passed": stats["passed"] > 0 if stats["applicable"] > 0 else None,
            "baseline_rate": base["pass_rate"] if base else None,
            "note": note,
        })

    return {
        "has_data": True,
        "session": f"{agent}/{session_id}",
        "session_timestamp": newest_first[0].get("_sort_ts"),
        "score": score_session(merged),
        "check_comparisons": comparisons,
    }
def view_recent_vs_prior(verdicts: list[dict], recent_days: int = 7) -> dict:
    """View 2: compare the recent window against everything before it.

    A verdict is "recent" when its session timestamp falls within the last
    *recent_days*. Timestamps are parsed into timezone-aware datetimes
    before comparison — the previous lexicographic string comparison
    misclassified timestamps carrying non-UTC offsets (e.g. "+05:30") and
    ordered "Z" and "+00:00" suffixes inconsistently. Verdicts with
    missing or unparsable timestamps fall into the prior bucket, matching
    the old behavior for empty strings.
    """
    if not verdicts:
        return {"has_data": False}

    cutoff = datetime.now(timezone.utc) - timedelta(days=recent_days)

    def _is_recent(v: dict) -> bool:
        raw = v.get("_sort_ts", "")
        if not raw:
            return False
        try:
            stamp = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except (ValueError, TypeError):
            return False
        if stamp.tzinfo is None:
            # Naive timestamps are assumed UTC — TODO confirm against producers.
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp >= cutoff

    recent = [v for v in verdicts if _is_recent(v)]
    prior = [v for v in verdicts if not _is_recent(v)]
    if not recent:
        return {"has_data": False, "reason": "no sessions in recent window"}

    recent_checks = aggregate_checks(recent)
    prior_checks = aggregate_checks(prior)

    comparisons = {}
    for name in sorted(set(recent_checks) | set(prior_checks)):
        r = recent_checks.get(name, {})
        p = prior_checks.get(name, {})
        r_rate = r.get("pass_rate")
        p_rate = p.get("pass_rate")
        if r_rate is not None and p_rate is not None:
            delta = round(r_rate - p_rate, 2)
            # +/-0.05 dead zone so tiny rate wobbles read as "stable".
            trend = "improved" if delta > 0.05 else "degraded" if delta < -0.05 else "stable"
        elif p_rate is None:
            delta = None
            trend = "new"
        else:
            delta = None
            trend = "no_recent_data"
        comparisons[name] = {
            "tile": r.get("tile", p.get("tile", "?")),
            "instruction": r.get("instruction", p.get("instruction", "")),
            "recent_rate": r_rate,
            "recent_applicable": r.get("applicable", 0),
            "prior_rate": p_rate,
            "prior_applicable": p.get("applicable", 0),
            "delta": delta,
            "trend": trend,
        }

    return {
        "has_data": True,
        "recent_days": recent_days,
        "recent_sessions": len(set(
            f"{v.get('_cache_agent')}/{v.get('_cache_session_id')}" for v in recent
        )),
        "prior_sessions": len(set(
            f"{v.get('_cache_agent')}/{v.get('_cache_session_id')}" for v in prior
        )),
        "checks": comparisons,
    }
def _auto_granularity(timestamps: list[str]) -> str:
"""Pick bucket granularity based on the time span of the data.
Returns one of: "hourly", "daily", "weekly", "monthly".
"""
if not timestamps:
return "daily"
dates = sorted(timestamps)
try:
first = datetime.fromisoformat(dates[0].replace("Z", "+00:00"))
last = datetime.fromisoformat(dates[-1].replace("Z", "+00:00"))
except (ValueError, TypeError):
return "daily"
span_days = (last - first).total_seconds() / 86400
if span_days < 2:
return "hourly"
if span_days <= 30:
return "daily"
if span_days <= 180:
return "weekly"
return "monthly"
def _bucket_key(ts: str, granularity: str) -> str:
"""Convert an ISO timestamp to a bucket key at the given granularity."""
if granularity == "hourly":
return ts[:13] # YYYY-MM-DDTHH
if granularity == "weekly":
try:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
# ISO week: Monday-based, format as YYYY-Www
iso_year, iso_week, _ = dt.isocalendar()
return f"{iso_year}-W{iso_week:02d}"
except (ValueError, TypeError):
return ts[:10]
if granularity == "monthly":
return ts[:7] # YYYY-MM
return ts[:10] # daily: YYYY-MM-DD
def _bucket_label(key: str, granularity: str) -> str:
"""Human-readable label for a bucket key."""
if granularity == "hourly":
# YYYY-MM-DDTHH -> "Mar 11 14:00"
try:
dt = datetime.fromisoformat(key + ":00:00+00:00")
return dt.strftime("%b %d %H:%M")
except (ValueError, TypeError):
return key
if granularity == "weekly":
# YYYY-Www -> "Week of Mar 3"
try:
year, week = key.split("-W")
dt = datetime.strptime(f"{year} {week} 1", "%G %V %u")
return f"Week of {dt.strftime('%b %d')}"
except (ValueError, TypeError):
return key
if granularity == "monthly":
# YYYY-MM -> "Mar 2026"
try:
dt = datetime.strptime(key, "%Y-%m")
return dt.strftime("%b %Y")
except (ValueError, TypeError):
return key
# daily: YYYY-MM-DD -> "Mar 11"
try:
dt = datetime.strptime(key, "%Y-%m-%d")
return dt.strftime("%b %d")
except (ValueError, TypeError):
return key
def _bucket_timeseries(verdicts: list[dict], granularity: str) -> dict:
"""Bucket verdicts and compute per-check pass rates at given granularity."""
buckets: dict[str, list[dict]] = {}
for v in verdicts:
ts = v.get("_sort_ts", "")
if not ts:
continue
key = _bucket_key(ts, granularity)
if key not in buckets:
buckets[key] = []
buckets[key].append(v)
if not buckets:
return {"has_data": False, "reason": "no timestamped sessions"}
series: dict[str, list[dict]] = {}
sorted_keys = sorted(buckets.keys())
for key in sorted_keys:
bucket_verdicts = buckets[key]
label = _bucket_label(key, granularity)
checks = aggregate_checks(bucket_verdicts)
for name, stats in checks.items():
if name not in series:
series[name] = []
series[name].append({
"date": key,
"label": label,
"pass_rate": stats["pass_rate"],
"applicable": stats["applicable"],
"passed": stats["passed"],
})
return {
"has_data": True,
"granularity": granularity,
"bucket_count": len(sorted_keys),
"date_range": {
"start": sorted_keys[0],
"end": sorted_keys[-1],
"total_buckets": len(sorted_keys),
},
"labels": {k: _bucket_label(k, granularity) for k in sorted_keys},
"series": series,
}
def view_timeseries(verdicts: list[dict], granularity: str | None = None) -> dict:
    """View 3: per-check pass rates bucketed by time period.

    When *granularity* is None, one is auto-selected from the data span.
    All granularities with data are always computed and returned under
    ``all_granularities`` so a report can switch between them client-side;
    the selected granularity's fields are also hoisted to the top level.
    """
    if not verdicts:
        return {"has_data": False}
    stamped = [v.get("_sort_ts", "") for v in verdicts if v.get("_sort_ts")]
    if not stamped:
        return {"has_data": False, "reason": "no timestamped sessions"}

    auto = _auto_granularity(stamped)
    selected = granularity or auto

    per_granularity = {}
    for gran in ("hourly", "daily", "weekly", "monthly"):
        bucketed = _bucket_timeseries(verdicts, gran)
        if bucketed.get("has_data"):
            per_granularity[gran] = bucketed

    chosen = per_granularity.get(selected, {})
    return {
        "has_data": True,
        "auto_granularity": auto,
        "selected_granularity": selected,
        "granularity": chosen.get("granularity", selected),
        "bucket_count": chosen.get("bucket_count", 0),
        "date_range": chosen.get("date_range", {}),
        "labels": chosen.get("labels", {}),
        "series": chosen.get("series", {}),
        "all_granularities": per_granularity,
    }
# ─── Top-level recent sessions view ──────────────────────────────────────
def view_recent_sessions(verdicts: list[dict], n: int = 5) -> list[dict]:
    """Return scores for the *n* most recent individual sessions.

    Verdicts sharing an agent/session_id pair (one per tile) are merged:
    their instructions are concatenated and the newest ``_sort_ts`` among
    them becomes the session timestamp.
    """
    merged: dict[str, dict] = {}
    for verdict in verdicts:
        key = f"{verdict.get('_cache_agent', '')}/{verdict.get('_cache_session_id', '')}"
        entry = merged.setdefault(key, {
            "session": key,
            "timestamp": verdict.get("_sort_ts", ""),
            "instructions": [],
        })
        entry["instructions"].extend(verdict.get("instructions", []))
        stamp = verdict.get("_sort_ts", "")
        if stamp > entry["timestamp"]:
            entry["timestamp"] = stamp

    newest = sorted(merged.values(), key=lambda s: s["timestamp"], reverse=True)[:n]
    return [
        {
            "session": sess["session"],
            "timestamp": sess["timestamp"],
            **score_session(sess),
        }
        for sess in newest
    ]
# ─── CLI ──────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: load verdicts, build the requested views, emit JSON.

    Loads from the verdict cache, falling back to the most recent run
    directory. Output goes to --out (file) or stdout; all status messages
    go to stderr. Exits 1 when no verdicts can be found.
    """
    parser = argparse.ArgumentParser(
        description="Analyze verdict trends from cache"
    )
    parser.add_argument(
        "--analysis-dir",
        required=True,
        help="Analysis directory (e.g. ~/.tessl/session-analyses/<slug>)",
    )
    parser.add_argument(
        "--recent-days",
        type=int,
        default=7,
        help="Window for 'recent' in recent-vs-prior view (default: 7)",
    )
    parser.add_argument(
        "--recent-sessions",
        type=int,
        default=5,
        help="Number of recent sessions to show (default: 5)",
    )
    parser.add_argument(
        "--view",
        choices=["all", "latest", "recent-vs-prior", "timeseries", "sessions"],
        default="all",
        help="Which analysis view to produce (default: all)",
    )
    parser.add_argument(
        "--bucket",
        choices=["auto", "hourly", "daily", "weekly", "monthly"],
        default="auto",
        help="Timeseries bucket granularity (default: auto based on data span)",
    )
    parser.add_argument(
        "--out",
        default=None,
        help="Output path (default: stdout)",
    )
    args = parser.parse_args()

    analysis_dir = Path(args.analysis_dir)
    verdicts = load_all_cached_verdicts(analysis_dir / "verdict-cache", analysis_dir)

    # No cache? Fall back to the newest run directory, if any.
    if not verdicts:
        runs_dir = analysis_dir / "runs"
        if runs_dir.exists():
            candidates = sorted(
                (d for d in runs_dir.iterdir() if d.is_dir() and not d.is_symlink()),
                reverse=True,
            )
            if candidates:
                verdicts = load_run_verdicts(candidates[0])
                print(f"No verdict cache found, loaded {len(verdicts)} verdicts from latest run",
                      file=sys.stderr)

    if not verdicts:
        print("No verdicts found", file=sys.stderr)
        sys.exit(1)
    print(f"Loaded {len(verdicts)} verdicts", file=sys.stderr)

    result = {}
    if args.view in ("all", "sessions"):
        result["recent_sessions"] = view_recent_sessions(verdicts, args.recent_sessions)
    if args.view in ("all", "latest"):
        result["latest_session"] = view_latest_session(verdicts)
    if args.view in ("all", "recent-vs-prior"):
        result["recent_vs_prior"] = view_recent_vs_prior(verdicts, args.recent_days)
    if args.view in ("all", "timeseries"):
        result["timeseries"] = view_timeseries(
            verdicts,
            granularity=None if args.bucket == "auto" else args.bucket,
        )

    rendered = json.dumps(result, indent=2)
    if args.out:
        destination = Path(args.out)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered, encoding="utf-8")
        print(f"Analysis written to {destination}", file=sys.stderr)
    else:
        print(rendered)


if __name__ == "__main__":
    main()