Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.
88
86%
Does it follow best practices?
Impact
97%
2.93x — Average score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Synthesize verifier adherence results with friction analysis.
Reads verdicts-aggregate.json and friction-summary.json from a run directory,
correlates friction events with verifier results per session, and classifies
each friction event into one of four skill relationships:
- preventable: skill has instructions to avoid this, agent didn't follow
- introduced: agent followed skill instructions and they caused the problem
- adjacent: friction in the skill's domain but not covered by verifiers
- unrelated: nothing to do with any installed skill
Uses normalized logs to determine which skills were active at each friction
event's turn range, then cross-references verifier pass/fail data.
Usage:
python3 synthesize_findings.py --run-dir <path> --analysis-dir <path>
No external dependencies.
"""
from __future__ import annotations
import argparse
import json
import sys
from collections import defaultdict
from pathlib import Path
# ─── Skill activation from normalized logs ────────────────────────────────
def extract_skill_activations(normalized_dir: Path, agent: str, session_id: str) -> list[dict]:
    """Find skill activation events in a normalized session log.

    Scans <normalized_dir>/<agent>/<session_id>.jsonl for two kinds of
    evidence that a skill became active:
      1. a tool call to the "skill" tool (or an explicit skill_activate action)
      2. a file_read of a SKILL.md living under a /skills/ directory

    Returns a list of {skill_name, turn, timestamp} dicts, one per activation,
    in log order. Missing files, unreadable lines, and malformed JSON are
    silently skipped — this is best-effort extraction, not validation.
    """
    activations: list[dict] = []
    session_file = normalized_dir / agent / f"{session_id}.jsonl"
    if not session_file.exists():
        return activations
    try:
        for line in session_file.read_text(errors="replace").splitlines():
            if not line.strip():
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Skill activation via tool call
            if event.get("kind") == "tool_call":
                tool = event.get("tool", {})
                tool_name = tool.get("name", "")
                # Direct skill tool activation
                if tool_name == "skill" or event.get("action") == "skill_activate":
                    skill_name = None
                    # Try to get skill name from the call's payload segments
                    for seg in event.get("segments", []):
                        if seg.get("type") != "json":
                            continue
                        data = seg.get("data", "")
                        if isinstance(data, str):
                            try:
                                parsed = json.loads(data)
                            except (json.JSONDecodeError, ValueError):
                                continue
                            # BUGFIX: json.loads can yield a non-dict (list,
                            # number, string); guard before calling .get()
                            if isinstance(parsed, dict):
                                skill_name = parsed.get("skill") or parsed.get("name")
                        elif isinstance(data, dict):
                            skill_name = data.get("skill") or data.get("name")
                    if skill_name:
                        activations.append({
                            "skill_name": skill_name,
                            "turn": event.get("turn", 0),
                            "timestamp": event.get("timestamp", ""),
                        })
                # File read of SKILL.md (agent reading skill content)
                elif event.get("action") == "file_read":
                    for seg in event.get("segments", []):
                        data = seg.get("data", "")
                        if isinstance(data, str):
                            try:
                                parsed = json.loads(data)
                            except (json.JSONDecodeError, ValueError):
                                fp = data
                            else:
                                # BUGFIX: same non-dict guard as above; fall
                                # back to treating the raw string as a path
                                if isinstance(parsed, dict):
                                    fp = parsed.get("file_path", "") or parsed.get("path", "")
                                else:
                                    fp = data
                        elif isinstance(data, dict):
                            fp = data.get("file_path", "") or data.get("path", "")
                        else:
                            fp = ""
                        if "SKILL.md" in str(fp) and "/skills/" in str(fp):
                            # Extract skill name from path: .../skills/<name>/SKILL.md
                            parts = str(fp).split("/skills/")
                            if len(parts) > 1:
                                skill_name = parts[-1].split("/")[0]
                                activations.append({
                                    "skill_name": skill_name,
                                    "turn": event.get("turn", 0),
                                    "timestamp": event.get("timestamp", ""),
                                })
    except OSError:
        pass
    return activations
def find_active_skill_at_turns(
    activations: list[dict],
    friction_turns: list[int],
) -> str | None:
    """Determine which skill (if any) was active during friction turns.

    A skill is considered 'active' if it was activated at or before the
    latest turn of the friction event's range; the most recently activated
    skill wins. Returns the skill name, or None if no activation qualifies
    or either input is empty.

    FIX: removed a dead `min_turn = min(friction_turns)` local that was
    computed but never used — the cutoff is intentionally the max turn.
    """
    if not activations or not friction_turns:
        return None
    cutoff = max(friction_turns)
    # Activations that happened before or during the friction window
    relevant = [a for a in activations if a["turn"] <= cutoff]
    if not relevant:
        return None
    # Most recently activated skill wins (first-listed on turn ties,
    # matching the original stable reverse sort)
    return max(relevant, key=lambda a: a["turn"])["skill_name"]
# ─── Verifier data per session ────────────────────────────────────────────
def load_session_verdicts(verdicts_dir: Path) -> dict[str, list[dict]]:
    """Load per-session verdict data.

    Returns {session_key: [instruction_entries]} where session_key is
    "agent/session_id". Supports both layouts: flat (verdict files directly
    under each top-level directory) and tile-namespaced (one extra directory
    level above the agent dirs). Unreadable or malformed verdict files are
    skipped silently.
    """
    out: dict[str, list[dict]] = {}
    if not verdicts_dir.exists():
        return out
    for entry in sorted(verdicts_dir.iterdir()):
        if not entry.is_dir() or entry.name.startswith((".", "_")):
            continue
        # Flat layout: verdict files live directly in this directory;
        # otherwise descend one level to the agent directories.
        if any(entry.glob("*.verdict.json")):
            candidates = [entry]
        else:
            candidates = [
                sub for sub in entry.iterdir()
                if sub.is_dir() and not sub.name.startswith((".", "_"))
            ]
        for agent_dir in candidates:
            for verdict_file in agent_dir.glob("*.verdict.json"):
                session_id = verdict_file.stem.replace(".verdict", "")
                key = f"{agent_dir.name}/{session_id}"
                try:
                    payload = json.loads(verdict_file.read_text(encoding="utf-8"))
                except (json.JSONDecodeError, OSError):
                    continue
                out.setdefault(key, []).extend(payload.get("instructions", []))
    return out
def get_tile_for_skill(session_instructions: list[dict], skill_name: str) -> str | None:
    """Find which tile a skill belongs to from verdict data.

    First looks for a tile whose name contains the skill name
    (case-insensitive); failing that, falls back to the single relevant
    tile if exactly one exists, else None.
    """
    if skill_name:
        needle = skill_name.lower()
        for inst in session_instructions:
            tile = inst.get("tile", "")
            # Match by tile name containing the skill name
            if needle in tile.lower():
                return tile
    # No direct match — use the sole relevant tile, if unambiguous
    relevant_tiles = {
        inst.get("tile", "") for inst in session_instructions if inst.get("relevant")
    }
    return relevant_tiles.pop() if len(relevant_tiles) == 1 else None
def check_verifier_failures_for_tile(
    session_instructions: list[dict],
    tile_name: str,
) -> dict:
    """Check verifier pass/fail status for a specific tile in a session.

    Only instructions marked relevant and checks marked applicable are
    counted; a check whose `passed` field is neither True nor False
    (e.g. None) lands in neither bucket.
    Returns {has_failures: bool, failed_checks: [...], passed_checks: [...]}.
    """
    buckets: dict[bool, list[str]] = {True: [], False: []}
    for inst in session_instructions:
        if inst.get("tile", "") != tile_name or not inst.get("relevant", False):
            continue
        for check in inst.get("checks", []):
            if not check.get("applicable", False):
                continue
            verdict = check.get("passed")
            # Strict bool check mirrors the original `is True` / `is False`
            if isinstance(verdict, bool):
                buckets[verdict].append(check.get("name", "unknown"))
    return {
        "has_failures": bool(buckets[False]),
        "failed_checks": buckets[False],
        "passed_checks": buckets[True],
    }
# ─── Relationship classification ──────────────────────────────────────────
def classify_friction_event(
    event: dict,
    activations: list[dict],
    session_instructions: list[dict],
    tile_names: set[str],
) -> dict:
    """Classify a friction event's relationship to skills/tiles.

    Returns a shallow copy of the event with an added 'skill_relation'
    field whose 'relationship' is one of: preventable, introduced,
    adjacent, unrelated.
    """
    def with_relation(relation: dict) -> dict:
        # Never mutate the caller's event; return an augmented copy.
        return {**event, "skill_relation": relation}

    active_skill = find_active_skill_at_turns(activations, event.get("turns", []))
    if not active_skill:
        return with_relation({
            "tile": None,
            "relationship": "unrelated",
            "explanation": "No skill was active during this friction event",
        })

    # Find which tile this skill belongs to
    tile = get_tile_for_skill(session_instructions, active_skill)
    if not tile:
        # Skill was active but no verifier data names it directly — fall
        # back to a substring match against known tile names.
        needle = active_skill.lower()
        tile = next((tn for tn in tile_names if needle in tn.lower()), None)
    if not tile:
        return with_relation({
            "tile": None,
            "skill": active_skill,
            "relationship": "unrelated",
            "explanation": f"Skill '{active_skill}' was active but no matching tile found in verifiers",
        })

    # Check verifier results for this tile in this session
    status = check_verifier_failures_for_tile(session_instructions, tile)
    if status["has_failures"]:
        # Skill has verifiers that failed — the friction was likely preventable
        return with_relation({
            "tile": tile,
            "skill": active_skill,
            "relationship": "preventable",
            "explanation": (
                f"Skill '{active_skill}' has instructions covering this area "
                f"but verifier(s) failed: {', '.join(status['failed_checks'])}"
            ),
            "failed_checks": status["failed_checks"],
        })
    if status["passed_checks"]:
        # Verifiers passed but friction still occurred — skill may have introduced it
        return with_relation({
            "tile": tile,
            "skill": active_skill,
            "relationship": "introduced",
            "explanation": (
                f"Agent followed skill '{active_skill}' instructions "
                f"(checks passed: {', '.join(status['passed_checks'])}) "
                f"but friction still occurred — skill instructions may have caused this"
            ),
            "passed_checks": status["passed_checks"],
        })
    # Skill was active, tile exists, but no verifiers cover this specific area
    return with_relation({
        "tile": tile,
        "skill": active_skill,
        "relationship": "adjacent",
        "explanation": (
            f"Friction in the domain of '{active_skill}' but no verifier "
            f"covers this specific area — a gap in skill coverage"
        ),
    })
# ─── Main synthesis ───────────────────────────────────────────────────────
def synthesize(
    run_dirs: list[Path],
    analysis_dirs: list[Path],
) -> dict:
    """Synthesize verifier and friction data into correlated findings.

    Accepts multiple run dirs and analysis dirs to support analyzing across
    multiple project paths (e.g. worktrees or separate checkouts). The
    aggregated verdicts-aggregate.json and friction-summary.json are
    expected in the *first* run dir (the primary). Per-session verdicts
    and normalized logs are searched across all dirs.

    Returns a dict with three top-level keys:
      - "tiles": per-tile adherence stats plus friction events grouped by
        relationship (preventable / introduced / adjacent)
      - "unrelated_friction": events not attributable to any tile
      - "session_overview": headline counts copied from the friction summary

    NOTE(review): assumes run_dirs is non-empty — main() guarantees this
    via argparse nargs="+"; confirm if called from elsewhere.
    """
    primary_run_dir = run_dirs[0]
    # Load verifier aggregate (from primary run dir where merge wrote it);
    # fall back to an empty tile map so the rest of the pipeline still runs.
    aggregate_path = primary_run_dir / "verdicts-aggregate.json"
    if aggregate_path.exists():
        aggregate = json.loads(aggregate_path.read_text(encoding="utf-8"))
    else:
        aggregate = {"tiles": {}}
    # Load friction summary (from primary run dir); same empty fallback.
    friction_path = primary_run_dir / "friction-summary.json"
    if friction_path.exists():
        friction_summary = json.loads(friction_path.read_text(encoding="utf-8"))
    else:
        friction_summary = {"friction_events": []}
    # Load per-session verdicts from ALL run dirs, merging entries that
    # share the same agent/session_id key.
    session_verdicts: dict[str, list[dict]] = {}
    for rd in run_dirs:
        verdicts_dir = rd / "verdicts"
        for key, instructions in load_session_verdicts(verdicts_dir).items():
            session_verdicts.setdefault(key, []).extend(instructions)
    # Collect all normalized log directories — search all analysis dirs
    normalized_dirs = [ad / "normalized" for ad in analysis_dirs if (ad / "normalized").exists()]
    # Known tile names from verifier data
    tile_names = set(aggregate.get("tiles", {}).keys())
    # Classify each friction event
    classified_events = []
    for event in friction_summary.get("friction_events", []):
        agent = event.get("agent", "unknown")
        session_id = event.get("session_id", "unknown")
        session_key = f"{agent}/{session_id}"
        # Get skill activations from normalized logs; the first analysis dir
        # that yields any activations for this session wins.
        activations = []
        for nd in normalized_dirs:
            activations = extract_skill_activations(nd, agent, session_id)
            if activations:
                break
        # Get verifier results for this session
        instructions = session_verdicts.get(session_key, [])
        classified = classify_friction_event(
            event, activations, instructions, tile_names,
        )
        classified_events.append(classified)
    # Group classified events by tile and relationship; anything without a
    # tile (or explicitly unrelated) goes to the unrelated bucket.
    tiles_friction: dict[str, dict[str, list]] = defaultdict(lambda: defaultdict(list))
    unrelated_events = []
    for event in classified_events:
        rel = event.get("skill_relation", {})
        relationship = rel.get("relationship", "unrelated")
        tile = rel.get("tile")
        if tile and relationship != "unrelated":
            tiles_friction[tile][relationship].append(event)
        else:
            unrelated_events.append(event)
    # Build per-tile synthesis
    tiles_synthesis = {}
    for tile_name, tile_data in aggregate.get("tiles", {}).items():
        tile_synth: dict = {
            "adherence": {
                "overall_pass_rate": tile_data.get("overall_pass_rate"),
                "failing_checks": [],
            },
            "friction_by_relationship": {},
        }
        # Collect failing checks — those with a pass rate below 0.8
        for inst_file, inst_data in tile_data.get("instructions", {}).items():
            for check_name, stats in inst_data.get("checks", {}).items():
                if stats.get("pass_rate") is not None and stats["pass_rate"] < 0.8:
                    tile_synth["adherence"]["failing_checks"].append({
                        "name": check_name,
                        "instruction": inst_file,
                        "pass_rate": stats["pass_rate"],
                        "applicable_count": stats.get("applicable_count", 0),
                    })
        # Add friction by relationship (only relationships with events)
        tile_fr = tiles_friction.get(tile_name, {})
        for relationship in ("preventable", "introduced", "adjacent"):
            events = tile_fr.get(relationship, [])
            if events:
                tile_synth["friction_by_relationship"][relationship] = {
                    "count": len(events),
                    "events": [
                        {
                            "session_id": e.get("session_id"),
                            "agent": e.get("agent"),
                            "type": e.get("type"),
                            "description": e.get("description"),
                            "turns": e.get("turns"),
                            "impact": e.get("impact"),
                            "explanation": e.get("skill_relation", {}).get("explanation", ""),
                        }
                        for e in events
                    ],
                    "action": _action_for_relationship(relationship),
                }
        # Generate human-readable summary from the assembled tile data
        tile_synth["summary"] = _generate_tile_summary(tile_synth)
        tiles_synthesis[tile_name] = tile_synth
    # Build output
    result = {
        "tiles": tiles_synthesis,
        "unrelated_friction": {
            "count": len(unrelated_events),
            "events": [
                {
                    "session_id": e.get("session_id"),
                    "agent": e.get("agent"),
                    "type": e.get("type"),
                    "description": e.get("description"),
                    "turns": e.get("turns"),
                    "impact": e.get("impact"),
                }
                for e in unrelated_events
            ],
            "action": "General agent/environment issues — not addressable through tiles",
        },
        "session_overview": {
            "total_sessions": friction_summary.get("sessions_count", 0),
            "sessions_with_friction": friction_summary.get("sessions_with_friction", 0),
            "friction_rate": friction_summary.get("friction_rate", 0),
            "outcomes": friction_summary.get("outcomes", {}),
            "satisfaction": friction_summary.get("satisfaction", {}),
        },
    }
    return result
def _action_for_relationship(relationship: str) -> str:
"""Get actionable guidance for a friction-skill relationship."""
actions = {
"preventable": (
"Strengthen activation — skill has the right instructions "
"but agent isn't following them"
),
"introduced": (
"Fix skill instructions — they're causing problems"
),
"adjacent": (
"Consider extending skill to cover this area"
),
}
return actions.get(relationship, "")
def _generate_tile_summary(tile_synth: dict) -> str:
"""Generate a human-readable summary for a tile."""
parts = []
adherence = tile_synth.get("adherence", {})
pass_rate = adherence.get("overall_pass_rate")
if pass_rate is not None:
if pass_rate >= 0.9:
parts.append("Skill instructions are well followed")
elif pass_rate >= 0.7:
parts.append("Most instructions followed but some gaps")
else:
parts.append("Significant adherence issues")
friction = tile_synth.get("friction_by_relationship", {})
if not friction:
parts.append("no friction detected")
else:
counts = []
for rel, data in friction.items():
counts.append(f"{data['count']} {rel}")
parts.append(f"friction: {', '.join(counts)}")
return "; ".join(parts) if parts else "No data"
# ─── CLI ──────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: parse args, run synthesis, write JSON, print a recap.

    FIX: removed a dead local (`summary = tile_data.get("summary", "")`)
    that was assigned per tile but never printed or used.
    """
    parser = argparse.ArgumentParser(
        description="Synthesize verifier adherence + friction findings"
    )
    parser.add_argument(
        "--run-dir",
        nargs="+",
        required=True,
        help="Run directory(ies) — first is primary (has aggregated verdicts/friction)",
    )
    parser.add_argument(
        "--analysis-dir",
        nargs="+",
        required=True,
        help="Analysis directory(ies) containing normalized/ logs",
    )
    parser.add_argument(
        "--out",
        default=None,
        help="Output path (default: <first-run-dir>/synthesis.json)",
    )
    args = parser.parse_args()
    run_dirs = [Path(d) for d in args.run_dir]
    analysis_dirs = [Path(d) for d in args.analysis_dir]
    out_path = Path(args.out) if args.out else run_dirs[0] / "synthesis.json"
    result = synthesize(run_dirs, analysis_dirs)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, indent=2), encoding="utf-8")
    # Console summary: one line per tile, then friction/relationship details
    print("Synthesis complete")
    for tile_name, tile_data in result["tiles"].items():
        friction_rels = tile_data.get("friction_by_relationship", {})
        friction_total = sum(d["count"] for d in friction_rels.values())
        pass_rate = tile_data.get("adherence", {}).get("overall_pass_rate")
        rate_str = f"{pass_rate:.0%}" if pass_rate is not None else "N/A"
        print(f"\n {tile_name}: adherence {rate_str}, {friction_total} friction events")
        for rel, data in friction_rels.items():
            print(f" {rel} × {data['count']}: {data['action']}")
    unrelated = result.get("unrelated_friction", {})
    if unrelated.get("count", 0) > 0:
        print(f"\n Unrelated friction: {unrelated['count']} events")
    overview = result.get("session_overview", {})
    print(f"\n Sessions: {overview.get('total_sessions', 0)}, "
          f"friction rate: {overview.get('friction_rate', 0):.0%}")
    print(f" Output: {out_path}")
if __name__ == "__main__":
    main()