Collect and normalize agent logs, discover installed verifiers, and dispatch LLM judges to evaluate adherence. Produces per-session verdicts and aggregated reports.
91
90%
Does it follow best practices?
Impact
96%
3.09x — Average score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Merge individual verdict files into an aggregated summary.
Reads verdict JSON files from verdicts/<agent>/<session>.verdict.json,
aggregates pass rates per checklist item across sessions, and writes
verdicts-aggregate.json.
No external dependencies.
"""
import argparse
import json
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
def _read_verdict_files(directory: Path) -> list[dict]:
    """Parse every ``*.verdict.json`` file directly inside *directory*.

    Each parsed dict gets a ``_source_file`` key recording its origin path.
    Unreadable or malformed files are skipped with a warning on stderr
    (best-effort loading; one bad file must not abort the whole run).
    """
    results: list[dict] = []
    for verdict_file in sorted(directory.glob("*.verdict.json")):
        try:
            data = json.loads(verdict_file.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError) as e:
            print(f"Warning: failed to read {verdict_file}: {e}",
                  file=sys.stderr)
            continue
        data["_source_file"] = str(verdict_file)
        results.append(data)
    return results


def load_verdicts(verdicts_dir: Path) -> list[dict]:
    """Load all verdict JSON files.

    Supports both the tile-namespaced layout (verdicts/{tile}/{agent}/*.verdict.json)
    and the legacy flat layout (verdicts/{agent}/*.verdict.json) for backwards
    compatibility with older runs.

    Args:
        verdicts_dir: Root ``verdicts/`` directory; may be absent.

    Returns:
        List of parsed verdict dicts (each annotated with ``_source_file``),
        in sorted directory/file order. Empty if the directory is missing.
    """
    verdicts: list[dict] = []
    if not verdicts_dir.exists():
        return verdicts
    for top_dir in sorted(verdicts_dir.iterdir()):
        # Skip files and hidden/private directories at the top level.
        if not top_dir.is_dir() or top_dir.name.startswith((".", "_")):
            continue
        # A directory holding verdict files directly is a legacy agent dir;
        # otherwise treat it as a tile dir containing agent subdirs.
        if any(top_dir.glob("*.verdict.json")):
            verdicts.extend(_read_verdict_files(top_dir))
        else:
            for agent_dir in sorted(top_dir.iterdir()):
                if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
                    continue
                verdicts.extend(_read_verdict_files(agent_dir))
    return verdicts
def aggregate(
    verdicts: list[dict],
    *,
    input_cost_per_mtok: float = 0.80,
    output_cost_per_mtok: float = 4.0,
) -> dict:
    """Aggregate verdicts into per-tile, per-instruction, per-check stats.

    Args:
        verdicts: Parsed verdict dicts as produced by ``load_verdicts()``.
        input_cost_per_mtok: USD per million input tokens. Default matches
            the previous hard-coded haiku pricing.
        output_cost_per_mtok: USD per million output tokens. Default matches
            the previous hard-coded haiku pricing.

    Returns:
        Dict with ``timestamp`` (UTC ISO-8601), ``sessions_count``,
        ``tiles`` (nested stats including ``pass_rate`` per check and
        ``overall_pass_rate`` per tile), and ``cost`` (token totals plus
        estimated USD, rounded to 4 decimals).
    """
    # Structure: tiles -> instructions -> checks -> stats
    tiles: dict[str, dict] = {}
    total_input_tokens = 0
    total_output_tokens = 0
    for verdict in verdicts:
        meta = verdict.get("_meta", {})
        # Token counts may be absent or explicitly null; treat both as 0.
        total_input_tokens += meta.get("input_tokens", 0) or 0
        total_output_tokens += meta.get("output_tokens", 0) or 0
        for inst in verdict.get("instructions", []):
            tile_name = inst.get("tile", "unknown")
            inst_file = inst.get("file", "unknown")
            tile_data = tiles.setdefault(tile_name, {"instructions": {}})
            inst_data = tile_data["instructions"].setdefault(
                inst_file,
                {"instruction": inst.get("instruction", ""), "checks": {}},
            )
            # Irrelevant instructions still register (so they appear in the
            # report) but contribute no check counts.
            if not inst.get("relevant", True):
                continue
            for check in inst.get("checks", []):
                check_name = check.get("name", "unknown")
                stats = inst_data["checks"].setdefault(check_name, {
                    "applicable_count": 0,
                    "passed_count": 0,
                    "failed_count": 0,
                    "not_applicable_count": 0,
                    "confidence_breakdown": {"high": 0, "medium": 0, "low": 0},
                })
                if not check.get("applicable", False):
                    stats["not_applicable_count"] += 1
                    continue
                stats["applicable_count"] += 1
                passed = check.get("passed")
                # passed may be None (no determination): counts as applicable
                # but neither passed nor failed.
                if passed is True:
                    stats["passed_count"] += 1
                elif passed is False:
                    stats["failed_count"] += 1
                confidence = check.get("confidence", "low")
                # Unknown confidence labels are silently ignored.
                if confidence in stats["confidence_breakdown"]:
                    stats["confidence_breakdown"][confidence] += 1
    # Calculate pass rates per check and per tile.
    for tile_data in tiles.values():
        tile_applicable = 0
        tile_passed = 0
        for inst_data in tile_data["instructions"].values():
            for stats in inst_data["checks"].values():
                if stats["applicable_count"] > 0:
                    stats["pass_rate"] = round(
                        stats["passed_count"] / stats["applicable_count"], 2
                    )
                else:
                    stats["pass_rate"] = None
                tile_applicable += stats["applicable_count"]
                tile_passed += stats["passed_count"]
        tile_data["overall_pass_rate"] = (
            round(tile_passed / tile_applicable, 2)
            if tile_applicable > 0
            else None
        )
    # Cost estimate from the (possibly overridden) per-Mtok rates.
    estimated_cost = (
        total_input_tokens / 1_000_000 * input_cost_per_mtok
        + total_output_tokens / 1_000_000 * output_cost_per_mtok
    )
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "sessions_count": len(verdicts),
        "tiles": tiles,
        "cost": {
            "total_input_tokens": total_input_tokens,
            "total_output_tokens": total_output_tokens,
            "estimated_cost_usd": round(estimated_cost, 4),
        },
    }
def main():
    """CLI entry point: merge verdict files and print a human summary."""
    parser = argparse.ArgumentParser(
        description="Merge verdict files into aggregate summary"
    )
    parser.add_argument(
        "--dir",
        nargs="+",
        required=True,
        help="Run directory(ies) containing verdicts/",
    )
    parser.add_argument(
        "--out",
        default=None,
        help="Output path (default: <first-dir>/verdicts-aggregate.json)",
    )
    args = parser.parse_args()

    # Gather verdicts from every run directory, tolerating missing ones.
    all_verdicts: list[dict] = []
    for run_dir in args.dir:
        candidate = Path(run_dir) / "verdicts"
        if candidate.exists():
            all_verdicts.extend(load_verdicts(candidate))
        else:
            print(f"Warning: {candidate} not found, skipping", file=sys.stderr)
    if not all_verdicts:
        print("No verdict files found", file=sys.stderr)
        sys.exit(1)

    result = aggregate(all_verdicts)
    if args.out:
        out_path = Path(args.out)
    else:
        out_path = Path(args.dir[0]) / "verdicts-aggregate.json"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, indent=2), encoding="utf-8")

    # Human-readable summary on stdout.
    print(f"Aggregated {result['sessions_count']} sessions")
    for tile_name, tile_data in result["tiles"].items():
        rate = tile_data.get("overall_pass_rate")
        rate_str = "N/A" if rate is None else f"{rate:.0%}"
        inst_map = tile_data["instructions"]
        check_total = sum(len(entry["checks"]) for entry in inst_map.values())
        print(f" {tile_name}: {rate_str} ({len(inst_map)} instructions, {check_total} checks)")
    cost = result["cost"]
    token_total = cost["total_input_tokens"] + cost["total_output_tokens"]
    print(f"\nCost: ${cost['estimated_cost_usd']:.4f} "
          f"({token_total:,} tokens)")
    print(f"Output: {out_path}")


if __name__ == "__main__":
    main()