try-tessl/agent-quality

Analyzes agent sessions against verifier checklists, detects friction points, and creates structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.

Quality: 86% (does it follow best practices?)

Impact: 97%, 2.93x (average score across 3 eval scenarios)

Security (by Snyk): Passed, no known issues

skills/analyze-sessions/scripts/analyze_trends.py

#!/usr/bin/env python3
"""
Analyze verdict trends from the verdict cache.

Reads all cached verdicts across tiles, extracts session timestamps,
and produces three analysis views:

  1. Latest session — how did the most recent session score vs baseline?
  2. Recent vs prior — are things getting better? (configurable window)
  3. Timeseries — per-check pass rates over time (for charting)

Usage:
    python3 analyze_trends.py --analysis-dir ~/.tessl/session-analyses/<slug> [--recent-days 7]

No external dependencies.
"""

from __future__ import annotations

import argparse
import json
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path


# ─── Loading verdicts from cache ──────────────────────────────────────────
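#
# Cache layout these loaders expect (inferred from the code below; directory
# names are illustrative):
#
#   <analysis-dir>/verdict-cache/
#     <tile--name>/                  # "--" encodes "/" in the tile name
#       <agent>/
#         <session_id>.verdict.json
#
# Each verdict JSON carries an "instructions" list; every instruction holds a
# "checks" list whose entries have "applicable"/"passed" flags plus optional
# "evidence" and "confidence" fields.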


def _resolve_session_timestamp(data: dict, audit_dir: Path | None = None) -> str:
    """Get the actual session timestamp, avoiding judge-dispatch time.

    Priority: _session_timestamp > normalized log first event > _meta.started_at
    """
    ts = data.get("_session_timestamp")
    if ts:
        return ts

    # Try to read from normalized log
    if audit_dir:
        agent = data.get("_cache_agent", "")
        session_id = data.get("_cache_session_id", "")
        if agent and session_id:
            for project_dir in audit_dir.iterdir():
                if not project_dir.is_dir() or project_dir.name.startswith((".", "_")):
                    continue
                candidate = project_dir / "normalized" / agent / f"{session_id}.jsonl"
                if candidate.exists():
                    try:
                        with open(candidate, encoding="utf-8") as f:
                            for line in f:
                                line = line.strip()
                                if not line:
                                    continue
                                event = json.loads(line)
                                if event.get("timestamp"):
                                    return event["timestamp"]
                    except (json.JSONDecodeError, OSError):
                        pass
                    break

    # Last resort: judge dispatch time (less accurate for trending)
    meta = data.get("_meta", {})
    return meta.get("started_at", "")


def load_all_cached_verdicts(cache_dir: Path, audit_dir: Path | None = None) -> list[dict]:
    """Load all verdicts from verdict-cache across all tiles."""
    verdicts = []
    if not cache_dir.exists():
        return verdicts

    for tile_dir in sorted(cache_dir.iterdir()):
        if not tile_dir.is_dir() or tile_dir.name.startswith("."):
            continue
        tile_name = tile_dir.name.replace("--", "/")

        for agent_dir in sorted(tile_dir.iterdir()):
            if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
                continue
            for vf in sorted(agent_dir.glob("*.verdict.json")):
                try:
                    data = json.loads(vf.read_text(encoding="utf-8"))
                    data["_cache_tile"] = tile_name
                    data["_cache_agent"] = agent_dir.name
                    data["_cache_session_id"] = vf.stem.replace(".verdict", "")
                    data["_sort_ts"] = _resolve_session_timestamp(data, audit_dir)

                    verdicts.append(data)
                except (json.JSONDecodeError, OSError):
                    continue

    return verdicts


def load_run_verdicts(run_dir: Path) -> list[dict]:
    """Load verdicts from a run directory (fallback when no cache)."""
    verdicts = []
    verdicts_dir = run_dir / "verdicts"
    if not verdicts_dir.exists():
        return verdicts

    for agent_dir in sorted(verdicts_dir.iterdir()):
        if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
            continue
        for vf in sorted(agent_dir.glob("*.verdict.json")):
            try:
                data = json.loads(vf.read_text(encoding="utf-8"))
                data["_cache_agent"] = agent_dir.name
                data["_cache_session_id"] = vf.stem.replace(".verdict", "")
                session_ts = data.get("_session_timestamp")
                if not session_ts:
                    meta = data.get("_meta", {})
                    session_ts = meta.get("started_at")
                data["_sort_ts"] = session_ts or ""
                verdicts.append(data)
            except (json.JSONDecodeError, OSError):
                continue

    return verdicts


# ─── Scoring helpers ──────────────────────────────────────────────────────


def score_session(verdict: dict) -> dict:
    """Score a single session verdict into pass/fail/na counts."""
    total_applicable = 0
    total_passed = 0
    total_failed = 0
    failures = []

    for inst in verdict.get("instructions", []):
        if not inst.get("relevant", True):
            continue
        for check in inst.get("checks", []):
            if not check.get("applicable", False):
                continue
            total_applicable += 1
            if check.get("passed") is True:
                total_passed += 1
            elif check.get("passed") is False:
                total_failed += 1
                failures.append({
                    "check": check.get("name", "?"),
                    "instruction": inst.get("instruction", ""),
                    "tile": inst.get("tile", verdict.get("_cache_tile", "?")),
                    "evidence": check.get("evidence", ""),
                    "confidence": check.get("confidence", ""),
                })

    rate = round(total_passed / total_applicable, 2) if total_applicable > 0 else None
    return {
        "applicable": total_applicable,
        "passed": total_passed,
        "failed": total_failed,
        "pass_rate": rate,
        "failures": failures,
    }
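
# A quick illustration on made-up data: one relevant instruction with two
# applicable checks, one passing and one failing, scores as
#
#   score_session({"instructions": [{"relevant": True, "checks": [
#       {"name": "a", "applicable": True, "passed": True},
#       {"name": "b", "applicable": True, "passed": False}]}]})
#   -> {"applicable": 2, "passed": 1, "failed": 1, "pass_rate": 0.5,
#       "failures": [...]}   # one failure entry, for check "b"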


def aggregate_checks(verdicts: list[dict]) -> dict[str, dict]:
    """Aggregate per-check stats across a list of verdicts."""
    checks: dict[str, dict] = {}
    for verdict in verdicts:
        for inst in verdict.get("instructions", []):
            if not inst.get("relevant", True):
                continue
            tile = inst.get("tile", verdict.get("_cache_tile", "unknown"))
            for check in inst.get("checks", []):
                name = check.get("name", "unknown")
                if name not in checks:
                    checks[name] = {
                        "instruction": inst.get("instruction", ""),
                        "tile": tile,
                        "applicable": 0,
                        "passed": 0,
                        "failed": 0,
                    }
                if not check.get("applicable", False):
                    continue
                checks[name]["applicable"] += 1
                if check.get("passed") is True:
                    checks[name]["passed"] += 1
                elif check.get("passed") is False:
                    checks[name]["failed"] += 1

    for stats in checks.values():
        if stats["applicable"] > 0:
            stats["pass_rate"] = round(stats["passed"] / stats["applicable"], 2)
        else:
            stats["pass_rate"] = None

    return checks
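
# Illustrative output shape ("uses-tests" is a made-up check name): a check
# applicable in two verdicts, passing once, aggregates to
#   {"uses-tests": {"instruction": "...", "tile": "...", "applicable": 2,
#                   "passed": 1, "failed": 1, "pass_rate": 0.5}}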


# ─── Analysis views ───────────────────────────────────────────────────────


def view_latest_session(verdicts: list[dict]) -> dict:
    """View 1: How did the most recent session score vs the baseline?"""
    if not verdicts:
        return {"has_data": False}

    sorted_v = sorted(verdicts, key=lambda v: v["_sort_ts"], reverse=True)

    # Find the most recent session (by session_id — may have multiple tile verdicts)
    latest_session_id = sorted_v[0].get("_cache_session_id", "")
    latest_agent = sorted_v[0].get("_cache_agent", "")

    # Gather all verdicts for this session (across tiles)
    latest_verdicts = [
        v for v in verdicts
        if v.get("_cache_session_id") == latest_session_id
        and v.get("_cache_agent") == latest_agent
    ]

    # Merge into one combined verdict for scoring
    combined = {"instructions": []}
    for v in latest_verdicts:
        combined["instructions"].extend(v.get("instructions", []))

    latest_score = score_session(combined)

    # Baseline: all other sessions
    other_verdicts = [
        v for v in verdicts
        if not (v.get("_cache_session_id") == latest_session_id
                and v.get("_cache_agent") == latest_agent)
    ]
    baseline = aggregate_checks(other_verdicts)

    # Compare latest checks against baseline
    latest_checks = aggregate_checks(latest_verdicts)
    comparisons = []
    for name, stats in latest_checks.items():
        base = baseline.get(name)
        base_rate = base["pass_rate"] if base else None
        rate = stats.get("pass_rate")
        if rate is None or base_rate is None:
            note = "no baseline data"
        elif rate > base_rate:
            note = "above baseline"
        elif rate < base_rate:
            note = "below baseline"
        else:
            note = "matches baseline"
        comparisons.append({
            "check": name,
            "tile": stats.get("tile", "?"),
            "passed": stats["passed"] > 0 if stats["applicable"] > 0 else None,
            "baseline_rate": base_rate,
            "note": note,
        })

    return {
        "has_data": True,
        "session": f"{latest_agent}/{latest_session_id}",
        "session_timestamp": sorted_v[0].get("_sort_ts"),
        "score": latest_score,
        "check_comparisons": comparisons,
    }


def view_recent_vs_prior(verdicts: list[dict], recent_days: int = 7) -> dict:
    """View 2: Recent window vs everything before it."""
    if not verdicts:
        return {"has_data": False}

    cutoff = (datetime.now(timezone.utc) - timedelta(days=recent_days)).isoformat()
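    # cutoff and _sort_ts are both UTC ISO-8601 strings, so the plain string
    # comparisons below track chronological order.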

    recent = [v for v in verdicts if v["_sort_ts"] >= cutoff]
    prior = [v for v in verdicts if v["_sort_ts"] < cutoff]

    if not recent:
        return {"has_data": False, "reason": "no sessions in recent window"}

    recent_checks = aggregate_checks(recent)
    prior_checks = aggregate_checks(prior)

    comparisons = {}
    all_names = set(recent_checks) | set(prior_checks)
    for name in sorted(all_names):
        r = recent_checks.get(name, {})
        p = prior_checks.get(name, {})
        r_rate = r.get("pass_rate")
        p_rate = p.get("pass_rate")

        if r_rate is not None and p_rate is not None:
            delta = round(r_rate - p_rate, 2)
            trend = "improved" if delta > 0.05 else "degraded" if delta < -0.05 else "stable"
        elif r_rate is not None:
            delta = None
            trend = "new"
        else:
            delta = None
            trend = "no_recent_data"

        comparisons[name] = {
            "tile": r.get("tile", p.get("tile", "?")),
            "instruction": r.get("instruction", p.get("instruction", "")),
            "recent_rate": r_rate,
            "recent_applicable": r.get("applicable", 0),
            "prior_rate": p_rate,
            "prior_applicable": p.get("applicable", 0),
            "delta": delta,
            "trend": trend,
        }

    return {
        "has_data": True,
        "recent_days": recent_days,
        "recent_sessions": len(set(
            f"{v.get('_cache_agent')}/{v.get('_cache_session_id')}" for v in recent
        )),
        "prior_sessions": len(set(
            f"{v.get('_cache_agent')}/{v.get('_cache_session_id')}" for v in prior
        )),
        "checks": comparisons,
    }


def _auto_granularity(timestamps: list[str]) -> str:
    """Pick bucket granularity based on the time span of the data.

    Returns one of: "hourly", "daily", "weekly", "monthly".
    """
    if not timestamps:
        return "daily"
    dates = sorted(timestamps)
    try:
        first = datetime.fromisoformat(dates[0].replace("Z", "+00:00"))
        last = datetime.fromisoformat(dates[-1].replace("Z", "+00:00"))
    except (ValueError, TypeError):
        return "daily"
    span_days = (last - first).total_seconds() / 86400
    if span_days < 2:
        return "hourly"
    if span_days <= 30:
        return "daily"
    if span_days <= 180:
        return "weekly"
    return "monthly"


def _bucket_key(ts: str, granularity: str) -> str:
    """Convert an ISO timestamp to a bucket key at the given granularity."""
    if granularity == "hourly":
        return ts[:13]  # YYYY-MM-DDTHH
    if granularity == "weekly":
        try:
            dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
            # ISO week: Monday-based, format as YYYY-Www
            iso_year, iso_week, _ = dt.isocalendar()
            return f"{iso_year}-W{iso_week:02d}"
        except (ValueError, TypeError):
            return ts[:10]
    if granularity == "monthly":
        return ts[:7]  # YYYY-MM
    return ts[:10]  # daily: YYYY-MM-DD


def _bucket_label(key: str, granularity: str) -> str:
    """Human-readable label for a bucket key."""
    if granularity == "hourly":
        # YYYY-MM-DDTHH -> "Mar 11 14:00"
        try:
            dt = datetime.fromisoformat(key + ":00:00+00:00")
            return dt.strftime("%b %d %H:%M")
        except (ValueError, TypeError):
            return key
    if granularity == "weekly":
        # YYYY-Www -> "Week of Mar 3"
        try:
            year, week = key.split("-W")
            dt = datetime.strptime(f"{year} {week} 1", "%G %V %u")
            return f"Week of {dt.strftime('%b %d')}"
        except (ValueError, TypeError):
            return key
    if granularity == "monthly":
        # YYYY-MM -> "Mar 2026"
        try:
            dt = datetime.strptime(key, "%Y-%m")
            return dt.strftime("%b %Y")
        except (ValueError, TypeError):
            return key
    # daily: YYYY-MM-DD -> "Mar 11"
    try:
        dt = datetime.strptime(key, "%Y-%m-%d")
        return dt.strftime("%b %d")
    except (ValueError, TypeError):
        return key


def _bucket_timeseries(verdicts: list[dict], granularity: str) -> dict:
    """Bucket verdicts and compute per-check pass rates at given granularity."""
    buckets: dict[str, list[dict]] = {}
    for v in verdicts:
        ts = v.get("_sort_ts", "")
        if not ts:
            continue
        key = _bucket_key(ts, granularity)
        if key not in buckets:
            buckets[key] = []
        buckets[key].append(v)

    if not buckets:
        return {"has_data": False, "reason": "no timestamped sessions"}

    series: dict[str, list[dict]] = {}
    sorted_keys = sorted(buckets.keys())

    for key in sorted_keys:
        bucket_verdicts = buckets[key]
        label = _bucket_label(key, granularity)
        checks = aggregate_checks(bucket_verdicts)
        for name, stats in checks.items():
            if name not in series:
                series[name] = []
            series[name].append({
                "date": key,
                "label": label,
                "pass_rate": stats["pass_rate"],
                "applicable": stats["applicable"],
                "passed": stats["passed"],
            })

    return {
        "has_data": True,
        "granularity": granularity,
        "bucket_count": len(sorted_keys),
        "date_range": {
            "start": sorted_keys[0],
            "end": sorted_keys[-1],
            "total_buckets": len(sorted_keys),
        },
        "labels": {k: _bucket_label(k, granularity) for k in sorted_keys},
        "series": series,
    }


def view_timeseries(verdicts: list[dict], granularity: str | None = None) -> dict:
    """View 3: Per-check pass rates bucketed by time period.

    If granularity is None, auto-selects based on data span.
    Always returns all granularities so the report can switch client-side.
    """
    if not verdicts:
        return {"has_data": False}

    timestamps = [v.get("_sort_ts", "") for v in verdicts if v.get("_sort_ts")]
    if not timestamps:
        return {"has_data": False, "reason": "no timestamped sessions"}

    auto = _auto_granularity(timestamps)
    selected = granularity or auto

    # Compute all granularities for client-side switching
    all_granularities = {}
    for g in ("hourly", "daily", "weekly", "monthly"):
        result = _bucket_timeseries(verdicts, g)
        if result.get("has_data"):
            all_granularities[g] = result

    # Build top-level result — selected granularity's fields + metadata
    selected_data = all_granularities.get(selected, {})
    return {
        "has_data": True,
        "auto_granularity": auto,
        "selected_granularity": selected,
        "granularity": selected_data.get("granularity", selected),
        "bucket_count": selected_data.get("bucket_count", 0),
        "date_range": selected_data.get("date_range", {}),
        "labels": selected_data.get("labels", {}),
        "series": selected_data.get("series", {}),
        "all_granularities": all_granularities,
    }


# ─── Top-level recent sessions view ──────────────────────────────────────


def view_recent_sessions(verdicts: list[dict], n: int = 5) -> list[dict]:
    """Get the N most recent individual sessions with scores."""
    # Deduplicate: group by session_id + agent, merge instructions across tiles
    sessions: dict[str, dict] = {}
    for v in verdicts:
        key = f"{v.get('_cache_agent', '')}/{v.get('_cache_session_id', '')}"
        if key not in sessions:
            sessions[key] = {
                "session": key,
                "timestamp": v.get("_sort_ts", ""),
                "instructions": [],
            }
        sessions[key]["instructions"].extend(v.get("instructions", []))
        # Use latest timestamp
        ts = v.get("_sort_ts", "")
        if ts > sessions[key]["timestamp"]:
            sessions[key]["timestamp"] = ts

    # Sort by timestamp, take top N
    sorted_sessions = sorted(sessions.values(), key=lambda s: s["timestamp"], reverse=True)[:n]

    results = []
    for sess in sorted_sessions:
        scores = score_session(sess)
        results.append({
            "session": sess["session"],
            "timestamp": sess["timestamp"],
            **scores,
        })
    return results


# ─── CLI ──────────────────────────────────────────────────────────────────


def main():
    parser = argparse.ArgumentParser(
        description="Analyze verdict trends from cache"
    )
    parser.add_argument(
        "--analysis-dir",
        required=True,
        help="Analysis directory (e.g. ~/.tessl/session-analyses/<slug>)",
    )
    parser.add_argument(
        "--recent-days",
        type=int,
        default=7,
        help="Window for 'recent' in recent-vs-prior view (default: 7)",
    )
    parser.add_argument(
        "--recent-sessions",
        type=int,
        default=5,
        help="Number of recent sessions to show (default: 5)",
    )
    parser.add_argument(
        "--view",
        choices=["all", "latest", "recent-vs-prior", "timeseries", "sessions"],
        default="all",
        help="Which analysis view to produce (default: all)",
    )
    parser.add_argument(
        "--bucket",
        choices=["auto", "hourly", "daily", "weekly", "monthly"],
        default="auto",
        help="Timeseries bucket granularity (default: auto based on data span)",
    )
    parser.add_argument(
        "--out",
        default=None,
        help="Output path (default: stdout)",
    )
    args = parser.parse_args()

    analysis_dir = Path(args.analysis_dir)
    cache_dir = analysis_dir / "verdict-cache"

    # Load verdicts from cache, fall back to latest run
    verdicts = load_all_cached_verdicts(cache_dir, analysis_dir)
    if not verdicts:
        # Fall back to latest run
        runs_dir = analysis_dir / "runs"
        if runs_dir.exists():
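            # Run directories are assumed to be timestamp-named, so a reverse
            # lexical sort puts the most recent run first.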
            run_dirs = sorted(
                [d for d in runs_dir.iterdir() if d.is_dir() and not d.is_symlink()],
                reverse=True,
            )
            if run_dirs:
                verdicts = load_run_verdicts(run_dirs[0])
                print(f"No verdict cache found, loaded {len(verdicts)} verdicts from latest run",
                      file=sys.stderr)

    if not verdicts:
        print("No verdicts found", file=sys.stderr)
        sys.exit(1)

    print(f"Loaded {len(verdicts)} verdicts", file=sys.stderr)

    result = {}
    view = args.view

    if view in ("all", "sessions"):
        result["recent_sessions"] = view_recent_sessions(verdicts, args.recent_sessions)

    if view in ("all", "latest"):
        result["latest_session"] = view_latest_session(verdicts)

    if view in ("all", "recent-vs-prior"):
        result["recent_vs_prior"] = view_recent_vs_prior(verdicts, args.recent_days)

    if view in ("all", "timeseries"):
        bucket = None if args.bucket == "auto" else args.bucket
        result["timeseries"] = view_timeseries(verdicts, granularity=bucket)

    output = json.dumps(result, indent=2)
    if args.out:
        out_path = Path(args.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(output, encoding="utf-8")
        print(f"Analysis written to {out_path}", file=sys.stderr)
    else:
        print(output)


if __name__ == "__main__":
    main()
