
tessleng/skill-insights

Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Then run four analytical phases in parallel against the inventory — staleness + git provenance (history, broken refs, contributors), quality (Tessl `skill review`), duplicates (similarity + LLM judgement), registry-search (per-standalone-skill registry suggestions, HTTP only) — and render a self-contained interactive HTML report with a top-of-report health overview, top-issues panel, recently-changed list, and per-tessl.json manifests view.
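
To give a sense of the inventory's per-skill shape, here is a hypothetical record; the field names mirror those consumed by finalize_duplicates.py below, but the actual shape is fixed by the tile's bundled JSON Schemas, so treat this as a sketch only.

{
  "skill_id": "repo-a/skills/release-notes",
  "name": "release-notes",
  "primary_path": "skills/release-notes/SKILL.md",
  "agent_harnesses": ["claude-code"],
  "owning_package": null,
  "repo": "repo-a"
}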

84 · 1.44x

Quality: 90% (Does it follow best practices?)
Impact: 97%, 1.44x (average score across 2 eval scenarios)
Security (by Snyk): Advisory. Suggest reviewing before use.


skills/detect-skill-duplicates/scripts/finalize_duplicates.py

#!/usr/bin/env python3
"""Cluster duplicate verdicts + emit the final duplicates.json.

Reads the index.json from the prompts directory produced by
prepare_duplicates.py and a verdicts directory populated by the
orchestrator (one JSON file per pair, named by the pair's idx). Builds an undirected
graph from `duplicate` verdicts, finds connected components, picks dominant
skills, computes severity, and emits duplicates.json conforming to
references/schemas/duplicates.schema.json.

The prompts index, every per-pair verdict, and the final output are all
validated against their schemas at the IO boundary (best-effort: skipped
when `jsonschema` is not installed). Verdict validation is non-strict: a malformed
verdict from one subagent is recorded in `metadata.failed_pairs[]` and the
rest of the run continues.

Usage:
    finalize_duplicates.py --prompts <dir> --verdicts <dir> [--output <path>]
"""
from __future__ import annotations

import argparse
import json
import sys
from datetime import datetime, timezone
from pathlib import Path

# Resolve the bundled schema directory and load the shared validation helper.
# Tile layout: <tile-root>/skills/<phase>/scripts/<script>.py
#              <tile-root>/references/schemas/{_validate.py, *.schema.json}
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
if str(_SCHEMA_DIR_PATH) not in sys.path:
    sys.path.insert(0, str(_SCHEMA_DIR_PATH))
from _validate import SCHEMA_DIR, validate_against_schema  # noqa: E402

TOOL_VERSION = "skill-insights@0.10.0"
SCHEMA_VERSION = "1.0"


# ── Union-find for transitive clustering ──────────────────────────────────


class UnionFind:
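    """Disjoint-set over skill ids.

    Example: union("a", "b") then union("b", "c") merges all three into
    one component, so find("a") == find("c"); this is how pairwise
    `duplicate` verdicts become transitive clusters in main().
    """
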
    def __init__(self) -> None:
        self.parent: dict[str, str] = {}

    def find(self, x: str) -> str:
        if x not in self.parent:
            self.parent[x] = x
            return x
        # Path halving: point each visited node at its grandparent on the
        # way up, keeping the trees shallow without a second pass.
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]
            x = self.parent[x]
        return x

    def union(self, a: str, b: str) -> None:
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            self.parent[ra] = rb


# ── Cluster construction ──────────────────────────────────────────────────


def pick_dominant(skill_ids: list[str], pair_records: list[dict], skills_meta: dict) -> str:
    """Vote-then-fall-back. skills_meta maps skill_id -> Skill object from work."""
    votes: dict[str, int] = {}
    for r in pair_records:
        d = r.get("dominant")
        if d in skill_ids:
            votes[d] = votes.get(d, 0) + 1
    if votes:
        return max(votes, key=votes.get)

    # Fall back to the skill with the largest SKILL.md body. (Eval-based
    # selection would need discovery data that isn't available here, and
    # skill_md_content is not populated in directory mode, so in practice
    # this resolves to the first skill id in sorted order.)
    by_size = sorted(
        skill_ids,
        key=lambda sid: -len(skills_meta.get(sid, {}).get("skill_md_content", "")),
    )
    return by_size[0]


def cluster_severity(
    cluster_size: int,
    skill_ids: list[str],
    skills_meta: dict,
) -> str:
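    """Severity ladder, highest precedence first:

    critical: 4+ skills, or any cluster spanning 2+ repos
    high:     3 skills, or 2 skills sharing an agent harness
    medium:   everything else
    """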
    if cluster_size >= 4:
        return "critical"

    repos = {skills_meta.get(sid, {}).get("repo") for sid in skill_ids}
    repos.discard(None)
    if cluster_size >= 2 and len(repos) >= 2:
        return "critical"

    if cluster_size == 3:
        return "high"

    if cluster_size == 2:
        a, b = skill_ids
        harness_a = set(skills_meta.get(a, {}).get("agent_harnesses") or [])
        harness_b = set(skills_meta.get(b, {}).get("agent_harnesses") or [])
        if harness_a & harness_b:
            return "high"

    return "medium"


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Finalize duplicate detection from per-pair verdicts")
    p.add_argument("--prompts", required=True, help="Path to duplicates-prompts/ directory")
    p.add_argument("--verdicts", required=True,
                   help="Path to duplicates-verdicts/ directory")
    p.add_argument("--output", default=None)
    p.add_argument(
        "--keep-intermediates",
        action="store_true",
        help="Don't delete the prompts/ and verdicts/ directories after finalizing.",
    )
    return p.parse_args()


def _cleanup_intermediates(prompts_arg: Path, verdicts_arg: Path) -> None:
    """Remove the prompts/ and verdicts/ directories after a successful finalize."""
    import shutil
    for p in (prompts_arg, verdicts_arg):
        try:
            shutil.rmtree(p)
        except Exception as e:
            print(f"WARN: could not clean up {p}: {e}", file=sys.stderr)


def _load_index(prompts_arg: Path) -> tuple[dict, list[dict]]:
    """Return (metadata, items[]) from a prompts directory."""
    idx_path = prompts_arg / "index.json"
    if not idx_path.exists():
        raise FileNotFoundError(f"index.json not found in {prompts_arg}")
    idx = json.loads(idx_path.read_text())
    validate_against_schema(
        idx,
        SCHEMA_DIR / "duplicates-prompts-index.schema.json",
        role="input (index.json)",
        source="finalize_duplicates.py",
    )
    return idx.get("metadata", {}), idx.get("items", [])


def _load_verdicts(verdicts_arg: Path, items: list[dict]) -> dict[str, dict]:
    """Return pair_id -> verdict from a verdicts directory."""
    out: dict[str, dict] = {}
    idx_to_pair = {item["idx"]: item["pair_id"] for item in items if "idx" in item}
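    # Files with non-numeric stems, idx values missing from the index, or
    # malformed JSON are skipped; the affected pairs surface in main() as
    # "no verdict in verdicts file" entries in metadata.failed_pairs[].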
    for f in sorted(verdicts_arg.glob("*.json")):
        try:
            stem = int(f.stem)
        except ValueError:
            continue
        pid = idx_to_pair.get(stem)
        if not pid:
            continue
        try:
            out[pid] = json.loads(f.read_text())
        except Exception:
            continue
    return out


def main() -> int:
    args = parse_args()
    prompts_arg = Path(args.prompts).resolve()
    verdicts_arg = Path(args.verdicts).resolve()
    if not prompts_arg.is_dir():
        print(f"ERROR: prompts directory not found: {prompts_arg}", file=sys.stderr)
        return 2
    if not verdicts_arg.is_dir():
        print(f"ERROR: verdicts directory not found: {verdicts_arg}", file=sys.stderr)
        return 2

    output_path = Path(args.output) if args.output else prompts_arg.parent / "duplicates.json"

    work_metadata, items = _load_index(prompts_arg)
    verdicts = _load_verdicts(verdicts_arg, items)

    # Build skills_meta from the index (for size-based dominant selection)
    skills_meta: dict[str, dict] = {}
    for item in items:
        for side in ("skill_a", "skill_b"):
            s = item[side]
            sid = s["skill_id"]
            skills_meta.setdefault(sid, {
                "skill_id": sid,
                "name": s.get("name") or "",
                "primary_path": s.get("primary_path"),
                "agent_harnesses": s.get("agent_harnesses") or [],
                "owning_package": s.get("owning_package"),
                "skill_md_content": "",  # not stored in directory mode (would re-load if needed)
                "repo": s.get("repo"),
            })

    # Process verdicts
    uf = UnionFind()
    duplicate_pair_records: list[dict] = []
    overlapping_pair_records: list[dict] = []
    failed_pairs: list[dict] = []
    by_pair_id = {item["pair_id"]: item for item in items}
    work = {"metadata": work_metadata, "candidate_pairs": items}

    for pair_id, entry in by_pair_id.items():
        verdict = verdicts.get(pair_id)
        if verdict is None:
            failed_pairs.append({
                "skill_a": entry["skill_a"]["skill_id"],
                "skill_b": entry["skill_b"]["skill_id"],
                "reason": "no verdict in verdicts file",
            })
            continue
        # Per-pair verdicts come from independent subagents; one bad shape
        # shouldn't abort the whole finalize. Record it as a failed pair and
        # move on, matching how a missing-verdict file is handled.
        verdict_valid = validate_against_schema(
            verdict,
            SCHEMA_DIR / "duplicate-verdict.schema.json",
            role=f"verdict ({pair_id})",
            source="finalize_duplicates.py",
            strict=False,
        )
        if not verdict_valid:
            failed_pairs.append({
                "skill_a": entry["skill_a"]["skill_id"],
                "skill_b": entry["skill_b"]["skill_id"],
                "reason": "verdict failed schema validation",
            })
            continue
        v = verdict.get("verdict")
        if v == "duplicate":
            a, b = entry["skill_a"]["skill_id"], entry["skill_b"]["skill_id"]
            uf.union(a, b)
            duplicate_pair_records.append({
                "pair_id": pair_id,
                "skill_a": a,
                "skill_b": b,
                "verdict": "duplicate",
                "reason": verdict.get("reason", ""),
                "dominant": verdict.get("dominant"),
                "similarity_score": entry["similarity_score"],
            })
        elif v == "overlapping":
            overlapping_pair_records.append({
                "skill_a": entry["skill_a"]["skill_id"],
                "skill_b": entry["skill_b"]["skill_id"],
                "verdict": "overlapping",
                "reason": verdict.get("reason", ""),
                "similarity_score": entry["similarity_score"],
                "severity": (
                    "high" if entry["similarity_score"] >= 0.7
                    else "medium" if entry["similarity_score"] >= 0.5
                    else "low"
                ),
            })
        elif v == "independent":
            pass  # drop
        else:
            failed_pairs.append({
                "skill_a": entry["skill_a"]["skill_id"],
                "skill_b": entry["skill_b"]["skill_id"],
                "reason": f"unrecognised verdict '{v}'",
            })

    # Build clusters from union-find
    components: dict[str, list[str]] = {}
    for sid in {p["skill_a"] for p in duplicate_pair_records} | {p["skill_b"] for p in duplicate_pair_records}:
        root = uf.find(sid)
        components.setdefault(root, []).append(sid)

    clusters = []
    for i, (root, ids) in enumerate(sorted(components.items(), key=lambda kv: -len(kv[1]))):
        if len(ids) < 2:
            continue
        ids_sorted = sorted(ids)
        cluster_pairs = [
            r for r in duplicate_pair_records
            if r["skill_a"] in ids_sorted and r["skill_b"] in ids_sorted
        ]
        dominant = pick_dominant(ids_sorted, cluster_pairs, skills_meta)
        severity = cluster_severity(len(ids_sorted), ids_sorted, skills_meta)
        # Use the first non-empty pair reason as the cluster summary
        reasons = [r["reason"] for r in cluster_pairs if r.get("reason")]
        cluster_reason = reasons[0] if reasons else "Multiple skills converge on the same workflow."

        clusters.append({
            "cluster_id": f"cluster-{i + 1:03d}",
            "skill_ids": ids_sorted,
            "dominant_skill_id": dominant,
            "verdict": "duplicate",
            "reason": cluster_reason,
            "severity": severity,
            "confirmed_pairs": [
                {k: r[k] for k in ("skill_a", "skill_b", "verdict", "reason", "similarity_score")}
                for r in cluster_pairs
            ],
        })

    # Sort overlapping_pairs by similarity desc
    overlapping_pair_records.sort(key=lambda r: -r["similarity_score"])

    skills_in_clusters = {sid for c in clusters for sid in c["skill_ids"]}
    skills_in_overlapping = {p["skill_a"] for p in overlapping_pair_records} | {p["skill_b"] for p in overlapping_pair_records}
    total_implicated = skills_in_clusters | skills_in_overlapping

    # Each n-skill cluster can collapse onto its dominant skill, so up to
    # n - 1 skills per cluster could be retired.
    reduction_potential = sum(len(c["skill_ids"]) - 1 for c in clusters)

    output = {
        "schema_version": SCHEMA_VERSION,
        "metadata": {
            "scan_id": work.get("metadata", {}).get("scan_id"),
            "scanned_at": datetime.now(timezone.utc).isoformat(),
            "tool_version": TOOL_VERSION,
            "skill_count": work.get("metadata", {}).get("skill_count", 0),
            "candidate_pairs_generated": work.get("metadata", {}).get("candidate_pairs_generated", 0),
            "candidate_pairs_evaluated": len(by_pair_id),
            "max_pairs_cap": work.get("metadata", {}).get("max_pairs_cap"),
            "allow_cross_repo": work.get("metadata", {}).get("allow_cross_repo", False),
            "llm_calls": len([v for v in verdicts.values() if isinstance(v, dict)]),
            "failed_pairs": failed_pairs,
        },
        "clusters": clusters,
        "overlapping_pairs": overlapping_pair_records,
        "estate_summary": {
            "duplicate_clusters": len(clusters),
            "skills_in_clusters": len(skills_in_clusters),
            "overlapping_pair_count": len(overlapping_pair_records),
            "total_skills_implicated": len(total_implicated),
            "estimated_skill_reduction_potential": reduction_potential,
        },
        "warnings": [],
    }

    validate_against_schema(
        output,
        SCHEMA_DIR / "duplicates.schema.json",
        role="output",
        source="finalize_duplicates.py",
    )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(output, indent=2))

    if not args.keep_intermediates:
        _cleanup_intermediates(prompts_arg, verdicts_arg)

    print(
        f"Duplicate finalize complete.\n"
        f"  Pairs evaluated:        {len(by_pair_id)}\n"
        f"  Confirmed duplicates:   {len(duplicate_pair_records)}\n"
        f"  Overlapping pairs:      {len(overlapping_pair_records)}\n"
        f"  Clusters:               {len(clusters)} (skills implicated: {len(skills_in_clusters)})\n"
        f"  Reduction potential:    {reduction_potential}\n"
        f"  Failed pairs:           {len(failed_pairs)}\n"
        f"  Output:                 {output_path}",
        file=sys.stderr,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())
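
A minimal run sketch, assuming the directory layout implied by the argument help text above; the verdict body is illustrative only, since the real shapes are fixed by the schemas in references/schemas/.

duplicates-prompts/index.json   # {"metadata": {...}, "items": [{"idx": 1, "pair_id": ..., "skill_a": ..., "skill_b": ..., "similarity_score": ...}]}
duplicates-verdicts/1.json      # one verdict per item, named by idx

An illustrative verdict (1.json), using only the fields main() reads:

{
  "verdict": "duplicate",
  "reason": "Both skills wrap the same release-notes workflow.",
  "dominant": "repo-a/skills/release-notes"
}

python3 finalize_duplicates.py --prompts duplicates-prompts --verdicts duplicates-verdicts

Output defaults to duplicates.json next to the prompts directory; add --keep-intermediates to preserve the two input directories after finalizing.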
