CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

try-tessl/agent-quality

Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.

88

2.93x
Quality

86%

Does it follow best practices?

Impact

97%

2.93x

Average score across 3 eval scenarios

Security by Snyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

skills/analyze-sessions/scripts/synthesize_findings.py

#!/usr/bin/env python3
"""
Synthesize verifier adherence results with friction analysis.

Reads verdicts-aggregate.json and friction-summary.json from a run directory,
correlates friction events with verifier results per session, and classifies
each friction event into one of four skill relationships:

  - preventable: skill has instructions to avoid this, agent didn't follow
  - introduced: agent followed skill instructions and they caused the problem
  - adjacent: friction in the skill's domain but not covered by verifiers
  - unrelated: nothing to do with any installed skill

Uses normalized logs to determine which skills were active at each friction
event's turn range, then cross-references verifier pass/fail data.

Usage:
    python3 synthesize_findings.py --run-dir <path> --analysis-dir <path>

No external dependencies.
"""

from __future__ import annotations

import argparse
import json
import sys
from collections import defaultdict
from pathlib import Path


# ─── Skill activation from normalized logs ────────────────────────────────


def extract_skill_activations(normalized_dir: Path, agent: str, session_id: str) -> list[dict]:
    """Find skill activation events in a normalized session log.

    Returns list of {skill_name, turn, timestamp} for each activation.
    """
    log_path = normalized_dir / agent / f"{session_id}.jsonl"
    found: list[dict] = []
    if not log_path.exists():
        return found

    def record(name: str, event: dict) -> None:
        # Shared shape for every activation entry.
        found.append({
            "skill_name": name,
            "turn": event.get("turn", 0),
            "timestamp": event.get("timestamp", ""),
        })

    try:
        raw_lines = log_path.read_text(errors="replace").splitlines()
    except OSError:
        return found

    for raw in raw_lines:
        if not raw.strip():
            continue
        try:
            event = json.loads(raw)
        except json.JSONDecodeError:
            continue

        # Only tool calls can signal a skill activation.
        if event.get("kind") != "tool_call":
            continue

        name_of_tool = event.get("tool", {}).get("name", "")

        if name_of_tool == "skill" or event.get("action") == "skill_activate":
            # Explicit activation: the skill name lives in a JSON segment.
            # When several segments carry a name, the last one wins.
            skill = None
            for seg in event.get("segments", []):
                if seg.get("type") != "json":
                    continue
                payload = seg.get("data", "")
                if isinstance(payload, dict):
                    skill = payload.get("skill") or payload.get("name")
                elif isinstance(payload, str):
                    try:
                        decoded = json.loads(payload)
                        skill = decoded.get("skill") or decoded.get("name")
                    except (json.JSONDecodeError, ValueError):
                        pass
            if skill:
                record(skill, event)

        elif event.get("action") == "file_read":
            # Implicit activation: the agent read a SKILL.md under /skills/.
            # Segments of any type are inspected here.
            for seg in event.get("segments", []):
                payload = seg.get("data", "")
                if isinstance(payload, dict):
                    path_str = payload.get("file_path", "") or payload.get("path", "")
                elif isinstance(payload, str):
                    try:
                        decoded = json.loads(payload)
                        path_str = decoded.get("file_path", "") or decoded.get("path", "")
                    except (json.JSONDecodeError, ValueError):
                        path_str = payload
                else:
                    path_str = ""
                path_str = str(path_str)
                if "SKILL.md" in path_str and "/skills/" in path_str:
                    # Skill name is the first path component after /skills/.
                    tail = path_str.split("/skills/")
                    if len(tail) > 1:
                        record(tail[-1].split("/")[0], event)

    return found


def find_active_skill_at_turns(
    activations: list[dict],
    friction_turns: list[int],
) -> str | None:
    """Determine which skill (if any) was active during friction turns.

    A skill is considered 'active' if it was activated before or during the
    friction event's turn range. The most recently activated skill wins.

    Args:
        activations: {skill_name, turn, timestamp} dicts, as produced by
            extract_skill_activations().
        friction_turns: turn numbers the friction event spans.

    Returns:
        Name of the most recently activated skill at or before the last
        friction turn, or None when no activation qualifies.
    """
    if not activations or not friction_turns:
        return None

    # An activation is eligible if it happened at or before the last
    # friction turn.  (The previous version also computed min(friction_turns)
    # but never used it.)
    cutoff = max(friction_turns)
    relevant = [a for a in activations if a["turn"] <= cutoff]
    if not relevant:
        return None

    # Most recent activation wins; max() avoids sorting the whole list and,
    # like the stable reverse sort it replaces, returns the first entry on ties.
    return max(relevant, key=lambda a: a["turn"])["skill_name"]


# ─── Verifier data per session ────────────────────────────────────────────


def load_session_verdicts(verdicts_dir: Path) -> dict[str, list[dict]]:
    """Load per-session verdict data.

    Walks ``verdicts_dir`` handling both layouts:

      - tile-namespaced: <verdicts>/<tile>/<agent>/<session>.verdict.json
      - flat:            <verdicts>/<agent>/<session>.verdict.json

    Unreadable or malformed verdict files are skipped silently.

    Returns:
        {session_key: [instruction_entries]} where session_key is agent/session_id.
    """
    session_verdicts: dict[str, list[dict]] = {}

    if not verdicts_dir.exists():
        return session_verdicts

    for top_dir in sorted(verdicts_dir.iterdir()):
        if not top_dir.is_dir() or top_dir.name.startswith((".", "_")):
            continue

        # Handle both tile-namespaced and flat layouts: verdict files directly
        # here mean top_dir is itself an agent dir; otherwise descend one level.
        has_verdict_files = any(top_dir.glob("*.verdict.json"))
        agent_dirs = [top_dir] if has_verdict_files else [
            d for d in top_dir.iterdir()
            if d.is_dir() and not d.name.startswith((".", "_"))
        ]

        for agent_dir in agent_dirs:
            agent = agent_dir.name
            for vf in agent_dir.glob("*.verdict.json"):
                # vf.stem has already dropped ".json" and — given the glob —
                # is guaranteed to end with ".verdict".  Slice only that
                # suffix off; str.replace would also mangle a ".verdict"
                # appearing mid-name.
                session_id = vf.stem[: -len(".verdict")]
                key = f"{agent}/{session_id}"
                try:
                    data = json.loads(vf.read_text(encoding="utf-8"))
                except (json.JSONDecodeError, OSError):
                    continue
                instructions = data.get("instructions", [])
                session_verdicts.setdefault(key, []).extend(instructions)

    return session_verdicts


def get_tile_for_skill(session_instructions: list[dict], skill_name: str) -> str | None:
    """Find which tile a skill belongs to from verdict data.

    First tries a case-insensitive substring match of the skill name against
    each instruction's tile; failing that, falls back to the single tile
    marked relevant — if exactly one exists.
    """
    if skill_name:
        needle = skill_name.lower()
        for inst in session_instructions:
            candidate = inst.get("tile", "")
            if needle in candidate.lower():
                return candidate

    # No direct match: accept an unambiguous relevant tile as the answer.
    relevant_tiles = {
        inst.get("tile", "")
        for inst in session_instructions
        if inst.get("relevant")
    }
    return relevant_tiles.pop() if len(relevant_tiles) == 1 else None


def check_verifier_failures_for_tile(
    session_instructions: list[dict],
    tile_name: str,
) -> dict:
    """Check verifier pass/fail status for a specific tile in a session.

    Only relevant instructions belonging to the tile are inspected, and only
    applicable checks are counted; a check whose 'passed' is neither True nor
    False contributes to neither list.

    Returns {has_failures: bool, failed_checks: [...], passed_checks: [...]}
    """
    failed_checks: list = []
    passed_checks: list = []

    relevant_for_tile = (
        inst for inst in session_instructions
        if inst.get("tile", "") == tile_name and inst.get("relevant", False)
    )
    for inst in relevant_for_tile:
        for check in inst.get("checks", []):
            if not check.get("applicable", False):
                continue
            verdict = check.get("passed")
            check_name = check.get("name", "unknown")
            if verdict is True:
                passed_checks.append(check_name)
            elif verdict is False:
                failed_checks.append(check_name)

    return {
        "has_failures": bool(failed_checks),
        "failed_checks": failed_checks,
        "passed_checks": passed_checks,
    }


# ─── Relationship classification ──────────────────────────────────────────


def classify_friction_event(
    event: dict,
    activations: list[dict],
    session_instructions: list[dict],
    tile_names: set[str],
) -> dict:
    """Classify a friction event's relationship to skills/tiles.

    Returns the event with an added 'skill_relation' field.
    """

    def with_relation(relation: dict) -> dict:
        # Attach the classification without mutating the input event.
        return {**event, "skill_relation": relation}

    active_skill = find_active_skill_at_turns(activations, event.get("turns", []))

    if not active_skill:
        return with_relation({
            "tile": None,
            "relationship": "unrelated",
            "explanation": "No skill was active during this friction event",
        })

    # Resolve the tile this skill belongs to — first via verdict data,
    # then by scanning known tile names for a case-insensitive substring match.
    tile = get_tile_for_skill(session_instructions, active_skill)
    if not tile:
        lowered = active_skill.lower()
        tile = next((tn for tn in tile_names if lowered in tn.lower()), None)

    if not tile:
        return with_relation({
            "tile": None,
            "skill": active_skill,
            "relationship": "unrelated",
            "explanation": f"Skill '{active_skill}' was active but no matching tile found in verifiers",
        })

    # Cross-reference the tile's verifier results for this session.
    verifier_status = check_verifier_failures_for_tile(session_instructions, tile)

    if verifier_status["has_failures"]:
        # Verifiers failed: the skill covers this area but wasn't followed.
        return with_relation({
            "tile": tile,
            "skill": active_skill,
            "relationship": "preventable",
            "explanation": (
                f"Skill '{active_skill}' has instructions covering this area "
                f"but verifier(s) failed: {', '.join(verifier_status['failed_checks'])}"
            ),
            "failed_checks": verifier_status["failed_checks"],
        })

    if verifier_status["passed_checks"]:
        # Verifiers passed yet friction occurred: the instructions themselves
        # are the suspect.
        return with_relation({
            "tile": tile,
            "skill": active_skill,
            "relationship": "introduced",
            "explanation": (
                f"Agent followed skill '{active_skill}' instructions "
                f"(checks passed: {', '.join(verifier_status['passed_checks'])}) "
                f"but friction still occurred — skill instructions may have caused this"
            ),
            "passed_checks": verifier_status["passed_checks"],
        })

    # No applicable verifiers at all: a coverage gap in the skill.
    return with_relation({
        "tile": tile,
        "skill": active_skill,
        "relationship": "adjacent",
        "explanation": (
            f"Friction in the domain of '{active_skill}' but no verifier "
            f"covers this specific area — a gap in skill coverage"
        ),
    })


# ─── Main synthesis ───────────────────────────────────────────────────────


def synthesize(
    run_dirs: list[Path],
    analysis_dirs: list[Path],
) -> dict:
    """Synthesize verifier and friction data into correlated findings.

    Accepts multiple run dirs and analysis dirs to support analyzing across
    multiple project paths (e.g. worktrees or separate checkouts).  The
    aggregated verdicts-aggregate.json and friction-summary.json are
    expected in the *first* run dir (the primary).  Per-session verdicts
    and normalized logs are searched across all dirs.

    Args:
        run_dirs: run directories; run_dirs[0] is the primary.  Missing
            aggregate/friction files fall back to empty defaults rather
            than raising.
        analysis_dirs: directories whose "normalized/" subdirectories hold
            the normalized session logs used to detect skill activations.

    Returns:
        dict with keys "tiles" (per-tile adherence + friction grouped by
        relationship), "unrelated_friction", and "session_overview" — the
        structure later written to synthesis.json.
    """
    primary_run_dir = run_dirs[0]

    # Load verifier aggregate (from primary run dir where merge wrote it)
    aggregate_path = primary_run_dir / "verdicts-aggregate.json"
    if aggregate_path.exists():
        aggregate = json.loads(aggregate_path.read_text(encoding="utf-8"))
    else:
        aggregate = {"tiles": {}}

    # Load friction summary (from primary run dir)
    friction_path = primary_run_dir / "friction-summary.json"
    if friction_path.exists():
        friction_summary = json.loads(friction_path.read_text(encoding="utf-8"))
    else:
        friction_summary = {"friction_events": []}

    # Load per-session verdicts from ALL run dirs; entries for the same
    # session key are concatenated across dirs.
    session_verdicts: dict[str, list[dict]] = {}
    for rd in run_dirs:
        verdicts_dir = rd / "verdicts"
        for key, instructions in load_session_verdicts(verdicts_dir).items():
            session_verdicts.setdefault(key, []).extend(instructions)

    # Collect all normalized log directories — search all analysis dirs
    normalized_dirs = [ad / "normalized" for ad in analysis_dirs if (ad / "normalized").exists()]

    # Known tile names from verifier data
    tile_names = set(aggregate.get("tiles", {}).keys())

    # Classify each friction event
    classified_events = []
    for event in friction_summary.get("friction_events", []):
        agent = event.get("agent", "unknown")
        session_id = event.get("session_id", "unknown")
        session_key = f"{agent}/{session_id}"

        # Get skill activations from normalized logs (search all analysis dirs)
        # — first dir that yields any activations for this session wins.
        activations = []
        for nd in normalized_dirs:
            activations = extract_skill_activations(nd, agent, session_id)
            if activations:
                break

        # Get verifier results for this session
        instructions = session_verdicts.get(session_key, [])

        classified = classify_friction_event(
            event, activations, instructions, tile_names,
        )
        classified_events.append(classified)

    # Group by tile and relationship
    tiles_friction: dict[str, dict[str, list]] = defaultdict(lambda: defaultdict(list))
    unrelated_events = []

    for event in classified_events:
        rel = event.get("skill_relation", {})
        relationship = rel.get("relationship", "unrelated")
        tile = rel.get("tile")

        if tile and relationship != "unrelated":
            tiles_friction[tile][relationship].append(event)
        else:
            unrelated_events.append(event)

    # Build per-tile synthesis
    tiles_synthesis = {}
    for tile_name, tile_data in aggregate.get("tiles", {}).items():
        tile_synth: dict = {
            "adherence": {
                "overall_pass_rate": tile_data.get("overall_pass_rate"),
                "failing_checks": [],
            },
            "friction_by_relationship": {},
        }

        # Collect failing checks — a check counts as "failing" when its
        # pass rate across sessions is below the 0.8 threshold.
        for inst_file, inst_data in tile_data.get("instructions", {}).items():
            for check_name, stats in inst_data.get("checks", {}).items():
                if stats.get("pass_rate") is not None and stats["pass_rate"] < 0.8:
                    tile_synth["adherence"]["failing_checks"].append({
                        "name": check_name,
                        "instruction": inst_file,
                        "pass_rate": stats["pass_rate"],
                        "applicable_count": stats.get("applicable_count", 0),
                    })

        # Add friction by relationship ("unrelated" events are reported
        # separately below, so only the three tile-tied relationships appear).
        tile_fr = tiles_friction.get(tile_name, {})
        for relationship in ("preventable", "introduced", "adjacent"):
            events = tile_fr.get(relationship, [])
            if events:
                tile_synth["friction_by_relationship"][relationship] = {
                    "count": len(events),
                    "events": [
                        {
                            "session_id": e.get("session_id"),
                            "agent": e.get("agent"),
                            "type": e.get("type"),
                            "description": e.get("description"),
                            "turns": e.get("turns"),
                            "impact": e.get("impact"),
                            "explanation": e.get("skill_relation", {}).get("explanation", ""),
                        }
                        for e in events
                    ],
                    "action": _action_for_relationship(relationship),
                }

        # Generate summary
        tile_synth["summary"] = _generate_tile_summary(tile_synth)
        tiles_synthesis[tile_name] = tile_synth

    # Build output
    result = {
        "tiles": tiles_synthesis,
        "unrelated_friction": {
            "count": len(unrelated_events),
            "events": [
                {
                    "session_id": e.get("session_id"),
                    "agent": e.get("agent"),
                    "type": e.get("type"),
                    "description": e.get("description"),
                    "turns": e.get("turns"),
                    "impact": e.get("impact"),
                }
                for e in unrelated_events
            ],
            "action": "General agent/environment issues — not addressable through tiles",
        },
        "session_overview": {
            "total_sessions": friction_summary.get("sessions_count", 0),
            "sessions_with_friction": friction_summary.get("sessions_with_friction", 0),
            "friction_rate": friction_summary.get("friction_rate", 0),
            "outcomes": friction_summary.get("outcomes", {}),
            "satisfaction": friction_summary.get("satisfaction", {}),
        },
    }

    return result


def _action_for_relationship(relationship: str) -> str:
    """Get actionable guidance for a friction-skill relationship.

    Unknown relationship values map to an empty string.
    """
    if relationship == "preventable":
        return (
            "Strengthen activation — skill has the right instructions "
            "but agent isn't following them"
        )
    if relationship == "introduced":
        return "Fix skill instructions — they're causing problems"
    if relationship == "adjacent":
        return "Consider extending skill to cover this area"
    return ""


def _generate_tile_summary(tile_synth: dict) -> str:
    """Generate a human-readable summary for a tile."""
    fragments: list[str] = []

    pass_rate = tile_synth.get("adherence", {}).get("overall_pass_rate")
    if pass_rate is not None:
        # Three adherence bands: >=0.9 good, >=0.7 mixed, below that poor.
        if pass_rate >= 0.9:
            fragments.append("Skill instructions are well followed")
        elif pass_rate >= 0.7:
            fragments.append("Most instructions followed but some gaps")
        else:
            fragments.append("Significant adherence issues")

    friction = tile_synth.get("friction_by_relationship", {})
    if friction:
        per_relationship = ", ".join(
            f"{data['count']} {rel}" for rel, data in friction.items()
        )
        fragments.append(f"friction: {per_relationship}")
    else:
        fragments.append("no friction detected")

    return "; ".join(fragments) if fragments else "No data"


# ─── CLI ──────────────────────────────────────────────────────────────────


def main():
    """CLI entry point.

    Parses arguments, runs synthesize(), writes the result JSON to the
    output path (default: <first-run-dir>/synthesis.json), and prints a
    human-readable summary to stdout.
    """
    parser = argparse.ArgumentParser(
        description="Synthesize verifier adherence + friction findings"
    )
    parser.add_argument(
        "--run-dir",
        nargs="+",
        required=True,
        help="Run directory(ies) — first is primary (has aggregated verdicts/friction)",
    )
    parser.add_argument(
        "--analysis-dir",
        nargs="+",
        required=True,
        help="Analysis directory(ies) containing normalized/ logs",
    )
    parser.add_argument(
        "--out",
        default=None,
        help="Output path (default: <first-run-dir>/synthesis.json)",
    )
    args = parser.parse_args()

    run_dirs = [Path(d) for d in args.run_dir]
    analysis_dirs = [Path(d) for d in args.analysis_dir]
    out_path = Path(args.out) if args.out else run_dirs[0] / "synthesis.json"

    result = synthesize(run_dirs, analysis_dirs)

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, indent=2), encoding="utf-8")

    # Console summary mirrors the structure of the JSON output.
    # (The previous version also read tile_data["summary"] into a local
    # that was never used; that dead assignment is removed.)
    print("Synthesis complete")
    for tile_name, tile_data in result["tiles"].items():
        friction_rels = tile_data.get("friction_by_relationship", {})
        friction_total = sum(d["count"] for d in friction_rels.values())
        pass_rate = tile_data.get("adherence", {}).get("overall_pass_rate")
        rate_str = f"{pass_rate:.0%}" if pass_rate is not None else "N/A"
        print(f"\n  {tile_name}: adherence {rate_str}, {friction_total} friction events")
        for rel, data in friction_rels.items():
            print(f"    {rel} × {data['count']}: {data['action']}")

    unrelated = result.get("unrelated_friction", {})
    if unrelated.get("count", 0) > 0:
        print(f"\n  Unrelated friction: {unrelated['count']} events")

    overview = result.get("session_overview", {})
    print(f"\n  Sessions: {overview.get('total_sessions', 0)}, "
          f"friction rate: {overview.get('friction_rate', 0):.0%}")
    print(f"  Output: {out_path}")

# Allow running as a script: python3 synthesize_findings.py --run-dir ... --analysis-dir ...
if __name__ == "__main__":
    main()

README.md

tile.json