CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl-labs/audit-logs

Collect and normalize agent logs, discover installed verifiers, and dispatch LLM judges to evaluate adherence. Produces per-session verdicts and aggregated reports.

91

3.09x
Quality

90%

Does it follow best practices?

Impact

96%

3.09x

Average score across 3 eval scenarios

SecuritybySnyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

merge_verdicts.pyskills/audit-logs/scripts/

#!/usr/bin/env python3
"""
Merge individual verdict files into an aggregated summary.

Reads verdict JSON files from verdicts/<agent>/<session>.verdict.json,
aggregates pass rates per checklist item across sessions, and writes
verdicts-aggregate.json.

No external dependencies.
"""

import argparse
import json
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path


def load_verdicts(verdicts_dir: Path) -> list[dict]:
    """Load all verdict JSON files under *verdicts_dir*.

    Supports both the tile-namespaced layout
    (verdicts/{tile}/{agent}/*.verdict.json) and the legacy flat layout
    (verdicts/{agent}/*.verdict.json) for backwards compatibility with
    older runs.

    Returns a list of parsed verdict dicts, each annotated with a
    "_source_file" key recording the file it came from. Unreadable or
    malformed files are skipped with a warning on stderr.
    """
    verdicts: list[dict] = []
    if not verdicts_dir.exists():
        return verdicts

    for top_dir in sorted(verdicts_dir.iterdir()):
        if not top_dir.is_dir() or top_dir.name.startswith((".", "_")):
            continue

        # A directory holding *.verdict.json files directly is a legacy
        # agent dir; otherwise treat it as a tile dir of agent subdirs.
        if any(top_dir.glob("*.verdict.json")):
            _load_agent_dir(top_dir, verdicts)
        else:
            for agent_dir in sorted(top_dir.iterdir()):
                if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
                    continue
                _load_agent_dir(agent_dir, verdicts)

    return verdicts


def _load_agent_dir(agent_dir: Path, verdicts: list[dict]) -> None:
    """Append every parseable *.verdict.json in *agent_dir* to *verdicts*."""
    for verdict_file in sorted(agent_dir.glob("*.verdict.json")):
        try:
            data = json.loads(verdict_file.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError) as e:
            print(f"Warning: failed to read {verdict_file}: {e}",
                  file=sys.stderr)
            continue
        data["_source_file"] = str(verdict_file)
        verdicts.append(data)


def aggregate(verdicts: list[dict]) -> dict:
    """Aggregate verdicts into per-tile, per-instruction, per-check stats.

    The result maps tile -> instruction file -> check -> counters
    (applicable / passed / failed / not-applicable plus a confidence
    breakdown), with pass rates per check and per tile, and an estimated
    judge cost derived from the token counts in each verdict's "_meta".
    """
    tiles: dict[str, dict] = {}
    input_tokens = 0
    output_tokens = 0

    for verdict in verdicts:
        meta = verdict.get("_meta", {})
        # "or 0" guards against explicit nulls in the metadata.
        input_tokens += meta.get("input_tokens", 0) or 0
        output_tokens += meta.get("output_tokens", 0) or 0

        for inst in verdict.get("instructions", []):
            tile_name = inst.get("tile", "unknown")
            inst_file = inst.get("file", "unknown")

            tile_data = tiles.setdefault(tile_name, {"instructions": {}})
            inst_data = tile_data["instructions"].setdefault(
                inst_file,
                {"instruction": inst.get("instruction", ""), "checks": {}},
            )

            # Irrelevant instructions still get an entry above, but
            # contribute no check counts.
            if not inst.get("relevant", True):
                continue

            for check in inst.get("checks", []):
                check_name = check.get("name", "unknown")
                stats = inst_data["checks"].setdefault(check_name, {
                    "applicable_count": 0,
                    "passed_count": 0,
                    "failed_count": 0,
                    "not_applicable_count": 0,
                    "confidence_breakdown": {"high": 0, "medium": 0, "low": 0},
                })

                if not check.get("applicable", False):
                    stats["not_applicable_count"] += 1
                    continue

                stats["applicable_count"] += 1
                passed = check.get("passed")
                if passed is True:
                    stats["passed_count"] += 1
                elif passed is False:
                    stats["failed_count"] += 1

                breakdown = stats["confidence_breakdown"]
                confidence = check.get("confidence", "low")
                if confidence in breakdown:
                    breakdown[confidence] += 1

    # Derive pass rates per check and per tile.
    for tile_data in tiles.values():
        applicable_total = 0
        passed_total = 0

        for inst_data in tile_data["instructions"].values():
            for stats in inst_data["checks"].values():
                n_applicable = stats["applicable_count"]
                stats["pass_rate"] = (
                    round(stats["passed_count"] / n_applicable, 2)
                    if n_applicable > 0
                    else None
                )
                applicable_total += n_applicable
                passed_total += stats["passed_count"]

        if applicable_total > 0:
            tile_data["overall_pass_rate"] = round(passed_total / applicable_total, 2)
        else:
            tile_data["overall_pass_rate"] = None

    # Cost estimate (haiku pricing as default).
    estimated_cost = (
        input_tokens / 1_000_000 * 0.80
        + output_tokens / 1_000_000 * 4.0
    )

    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "sessions_count": len(verdicts),
        "tiles": tiles,
        "cost": {
            "total_input_tokens": input_tokens,
            "total_output_tokens": output_tokens,
            "estimated_cost_usd": round(estimated_cost, 4),
        },
    }


def main():
    """CLI entry point: gather verdicts from run dirs, aggregate, report."""
    parser = argparse.ArgumentParser(
        description="Merge verdict files into aggregate summary"
    )
    parser.add_argument(
        "--dir",
        nargs="+",
        required=True,
        help="Run directory(ies) containing verdicts/",
    )
    parser.add_argument(
        "--out",
        default=None,
        help="Output path (default: <first-dir>/verdicts-aggregate.json)",
    )
    args = parser.parse_args()

    all_verdicts: list[dict] = []
    for run_dir in args.dir:
        verdicts_dir = Path(run_dir) / "verdicts"
        if verdicts_dir.exists():
            all_verdicts.extend(load_verdicts(verdicts_dir))
        else:
            print(f"Warning: {verdicts_dir} not found, skipping", file=sys.stderr)

    if not all_verdicts:
        print("No verdict files found", file=sys.stderr)
        sys.exit(1)

    result = aggregate(all_verdicts)

    # Default output location is alongside the first run directory.
    if args.out:
        out_path = Path(args.out)
    else:
        out_path = Path(args.dir[0]) / "verdicts-aggregate.json"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, indent=2), encoding="utf-8")

    # Human-readable summary on stdout.
    print(f"Aggregated {result['sessions_count']} sessions")
    for tile_name, tile_data in result["tiles"].items():
        rate = tile_data.get("overall_pass_rate")
        rate_str = "N/A" if rate is None else f"{rate:.0%}"
        instructions = tile_data["instructions"]
        n_inst = len(instructions)
        n_checks = sum(len(inst["checks"]) for inst in instructions.values())
        print(f"  {tile_name}: {rate_str} ({n_inst} instructions, {n_checks} checks)")

    cost = result["cost"]
    total_tokens = cost["total_input_tokens"] + cost["total_output_tokens"]
    print(f"\nCost: ${cost['estimated_cost_usd']:.4f} "
          f"({total_tokens:,} tokens)")
    print(f"Output: {out_path}")


if __name__ == "__main__":
    main()

tile.json