Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Then run four analytical phases in parallel against the inventory — staleness + git provenance (history, broken refs, contributors), quality (Tessl `skill review`), duplicates (similarity + LLM judgement), registry-search (per-standalone-skill registry suggestions, HTTP only) — and render a self-contained interactive HTML report with a top-of-report health overview, top-issues panel, recently-changed list, and per-tessl.json manifests view.
84
90%
Does it follow best practices?
Impact
97%
1.44x — Average score across 2 eval scenarios
Advisory
Suggest reviewing before use
#!/usr/bin/env python3
"""Cluster duplicate verdicts + emit the final duplicates.json.
Reads the work file produced by prepare_duplicates.py and a verdicts file
populated by the orchestrator (one entry per pair_id). Builds an undirected
graph from `duplicate` verdicts, finds connected components, picks dominant
skills, computes severity, and emits duplicates.json conforming to
references/schemas/duplicates.schema.json.
The prompts index, every per-pair verdict, and the final output are all
validated against their schemas at the IO boundary (best-effort if
`jsonschema` is installed). Verdict validation is non-strict: a malformed
verdict from one subagent is recorded in `metadata.failed_pairs[]` and the
rest of the run continues.
Usage:
finalize_duplicates.py --prompts <path> --verdicts <path> [--output <path>]
"""
from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
# Resolve the bundled schema directory and load the shared validation helper.
# Tile layout: <tile-root>/skills/<phase>/scripts/<script>.py
#              <tile-root>/references/schemas/{_validate.py, *.schema.json}
# Four .parent hops climb <script>.py -> scripts/ -> <phase>/ -> skills/ -> <tile-root>.
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
# Prepend (not append) so the bundled helpers win over any same-named module,
# and guard so repeated imports don't grow sys.path.
if str(_SCHEMA_DIR_PATH) not in sys.path:
    sys.path.insert(0, str(_SCHEMA_DIR_PATH))
from _validate import SCHEMA_DIR, validate_against_schema  # noqa: E402
# Stamped into the output's metadata block for provenance of generated files.
TOOL_VERSION = "skill-insights@0.10.0"
SCHEMA_VERSION = "1.0"
# ── Union-find for transitive clustering ──────────────────────────────────
class UnionFind:
    """Disjoint-set forest over string ids, used for transitive clustering."""

    def __init__(self) -> None:
        # Maps each known id to its parent; a root points at itself.
        self.parent: dict[str, str] = {}

    def find(self, x: str) -> str:
        """Return the root of *x*, registering unseen ids as their own root."""
        parent = self.parent
        if x not in parent:
            parent[x] = x
            return x
        # Walk toward the root, halving the path as we go so later
        # lookups get shorter.
        while True:
            p = parent[x]
            if p == x:
                return x
            grandparent = parent[p]
            parent[x] = grandparent
            x = grandparent

    def union(self, a: str, b: str) -> None:
        """Merge the sets containing *a* and *b* (b's root absorbs a's)."""
        root_a = self.find(a)
        root_b = self.find(b)
        if root_a == root_b:
            return
        self.parent[root_a] = root_b
# ── Cluster construction ──────────────────────────────────────────────────
def pick_dominant(skill_ids: list[str], pair_records: list[dict], skills_meta: dict) -> str:
    """Choose the cluster's dominant skill.

    Prefer the skill most often named as `dominant` across the per-pair
    verdicts; with no usable votes, fall back to the skill whose SKILL.md
    body is longest. `skills_meta` maps skill_id -> Skill object from work.
    """
    tally: dict[str, int] = {}
    for record in pair_records:
        nominee = record.get("dominant")
        if nominee in skill_ids:
            tally[nominee] = tally.get(nominee, 0) + 1
    if tally:
        return max(tally, key=tally.get)

    # No votes. Eval-based tie-breaking would need discovery data we don't
    # have here, so prefer the skill with the most SKILL.md content.
    def content_length(sid: str) -> int:
        return len(skills_meta.get(sid, {}).get("skill_md_content", ""))

    return max(skill_ids, key=content_length)
def cluster_severity(
    cluster_size: int,
    skill_ids: list[str],
    skills_meta: dict,
) -> str:
    """Grade a duplicate cluster as critical, high, or medium.

    Critical: 4+ skills, or duplication spanning two or more repos.
    High:     exactly 3 skills, or a 2-skill pair sharing an agent harness.
    Medium:   everything else.
    """
    if cluster_size >= 4:
        return "critical"
    # Distinct repos among the cluster's skills; unknown repos don't count.
    distinct_repos = {skills_meta.get(sid, {}).get("repo") for sid in skill_ids} - {None}
    if cluster_size >= 2 and len(distinct_repos) >= 2:
        return "critical"
    if cluster_size == 3:
        return "high"
    if cluster_size == 2:
        first, second = skill_ids
        harnesses_first = set(skills_meta.get(first, {}).get("agent_harnesses") or [])
        harnesses_second = set(skills_meta.get(second, {}).get("agent_harnesses") or [])
        # Same harness means both skills compete for the same agent: worse.
        if harnesses_first & harnesses_second:
            return "high"
    return "medium"
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Finalize duplicate detection from per-pair verdicts")
p.add_argument("--prompts", required=True, help="Path to duplicates-prompts/ directory")
p.add_argument("--verdicts", required=True,
help="Path to duplicates-verdicts/ directory")
p.add_argument("--output", default=None)
p.add_argument(
"--keep-intermediates",
action="store_true",
help="Don't delete the prompts/ and verdicts/ directories after finalizing.",
)
return p.parse_args()
def _cleanup_intermediates(prompts_arg: Path, verdicts_arg: Path) -> None:
"""Remove the prompts/ and verdicts/ directories after a successful finalize."""
import shutil
for p in (prompts_arg, verdicts_arg):
try:
shutil.rmtree(p)
except Exception as e:
print(f"WARN: could not clean up {p}: {e}", file=sys.stderr)
def _load_index(prompts_arg: Path) -> tuple[dict, list[dict]]:
    """Load and schema-validate index.json; return (metadata, items[]).

    Raises FileNotFoundError when the prompts directory has no index.json.
    """
    index_file = prompts_arg / "index.json"
    if not index_file.exists():
        raise FileNotFoundError(f"index.json not found in {prompts_arg}")
    index = json.loads(index_file.read_text())
    # Validate at the IO boundary (best-effort when jsonschema is absent).
    validate_against_schema(
        index,
        SCHEMA_DIR / "duplicates-prompts-index.schema.json",
        role="input (index.json)",
        source="finalize_duplicates.py",
    )
    return index.get("metadata", {}), index.get("items", [])
def _load_verdicts(verdicts_arg: Path, items: list[dict]) -> dict[str, dict]:
"""Return pair_id -> verdict from a verdicts directory."""
out: dict[str, dict] = {}
idx_to_pair = {item["idx"]: item["pair_id"] for item in items if "idx" in item}
for f in sorted(verdicts_arg.glob("*.json")):
try:
stem = int(f.stem)
except ValueError:
continue
pid = idx_to_pair.get(stem)
if not pid:
continue
try:
out[pid] = json.loads(f.read_text())
except Exception:
continue
return out
def main() -> int:
    """Orchestrate the finalize step: load inputs, cluster, emit duplicates.json.

    Returns 0 on success, 2 when an input directory is missing. Side
    effects: writes the output JSON and (unless --keep-intermediates)
    deletes the prompts/ and verdicts/ directories.
    """
    args = parse_args()
    prompts_arg = Path(args.prompts).resolve()
    verdicts_arg = Path(args.verdicts).resolve()
    if not prompts_arg.is_dir():
        print(f"ERROR: prompts directory not found: {prompts_arg}", file=sys.stderr)
        return 2
    if not verdicts_arg.is_dir():
        print(f"ERROR: verdicts directory not found: {verdicts_arg}", file=sys.stderr)
        return 2
    # Default output location sits next to the prompts directory.
    output_path = Path(args.output) if args.output else prompts_arg.parent / "duplicates.json"
    work_metadata, items = _load_index(prompts_arg)
    verdicts = _load_verdicts(verdicts_arg, items)
    # Build skills_meta from the index (for size-based dominant selection)
    skills_meta: dict[str, dict] = {}
    for item in items:
        for side in ("skill_a", "skill_b"):
            s = item[side]
            sid = s["skill_id"]
            # setdefault: a skill can appear in many pairs; first sighting wins.
            skills_meta.setdefault(sid, {
                "skill_id": sid,
                "name": s.get("name") or "",
                "primary_path": s.get("primary_path"),
                "agent_harnesses": s.get("agent_harnesses") or [],
                "owning_package": s.get("owning_package"),
                "skill_md_content": "",  # not stored in directory mode (would re-load if needed)
                "repo": s.get("repo"),
            })
    # Process verdicts
    uf = UnionFind()
    duplicate_pair_records: list[dict] = []
    overlapping_pair_records: list[dict] = []
    failed_pairs: list[dict] = []
    by_pair_id = {item["pair_id"]: item for item in items}
    work = {"metadata": work_metadata, "candidate_pairs": items}
    for pair_id, entry in by_pair_id.items():
        verdict = verdicts.get(pair_id)
        if verdict is None:
            # Missing verdict file: record as failed, keep processing.
            failed_pairs.append({
                "skill_a": entry["skill_a"]["skill_id"],
                "skill_b": entry["skill_b"]["skill_id"],
                "reason": "no verdict in verdicts file",
            })
            continue
        # Per-pair verdicts come from independent subagents; one bad shape
        # shouldn't abort the whole finalize. Record it as a failed pair and
        # move on, matching how a missing-verdict file is handled.
        verdict_valid = validate_against_schema(
            verdict,
            SCHEMA_DIR / "duplicate-verdict.schema.json",
            role=f"verdict ({pair_id})",
            source="finalize_duplicates.py",
            strict=False,
        )
        if not verdict_valid:
            failed_pairs.append({
                "skill_a": entry["skill_a"]["skill_id"],
                "skill_b": entry["skill_b"]["skill_id"],
                "reason": "verdict failed schema validation",
            })
            continue
        v = verdict.get("verdict")
        if v == "duplicate":
            # Confirmed duplicate: feed the union-find so transitively
            # connected pairs end up in one cluster.
            a, b = entry["skill_a"]["skill_id"], entry["skill_b"]["skill_id"]
            uf.union(a, b)
            duplicate_pair_records.append({
                "pair_id": pair_id,
                "skill_a": a,
                "skill_b": b,
                "verdict": "duplicate",
                "reason": verdict.get("reason", ""),
                "dominant": verdict.get("dominant"),
                "similarity_score": entry["similarity_score"],
            })
        elif v == "overlapping":
            # Overlapping pairs are reported flat (no clustering); severity
            # derives from the similarity score alone.
            overlapping_pair_records.append({
                "skill_a": entry["skill_a"]["skill_id"],
                "skill_b": entry["skill_b"]["skill_id"],
                "verdict": "overlapping",
                "reason": verdict.get("reason", ""),
                "similarity_score": entry["similarity_score"],
                "severity": (
                    "high" if entry["similarity_score"] >= 0.7
                    else "medium" if entry["similarity_score"] >= 0.5
                    else "low"
                ),
            })
        elif v == "independent":
            pass  # drop
        else:
            # Unknown verdict string: treat like a malformed verdict.
            failed_pairs.append({
                "skill_a": entry["skill_a"]["skill_id"],
                "skill_b": entry["skill_b"]["skill_id"],
                "reason": f"unrecognised verdict '{v}'",
            })
    # Build clusters from union-find
    components: dict[str, list[str]] = {}
    for sid in {p["skill_a"] for p in duplicate_pair_records} | {p["skill_b"] for p in duplicate_pair_records}:
        root = uf.find(sid)
        components.setdefault(root, []).append(sid)
    clusters = []
    # Largest components first; cluster ids are assigned in that order.
    for i, (root, ids) in enumerate(sorted(components.items(), key=lambda kv: -len(kv[1]))):
        if len(ids) < 2:
            continue
        ids_sorted = sorted(ids)
        # Pairs fully inside this component (both endpoints in the cluster).
        cluster_pairs = [
            r for r in duplicate_pair_records
            if r["skill_a"] in ids_sorted and r["skill_b"] in ids_sorted
        ]
        dominant = pick_dominant(ids_sorted, cluster_pairs, skills_meta)
        severity = cluster_severity(len(ids_sorted), ids_sorted, skills_meta)
        # Synthesize a one-line reason from the first non-empty pair reason
        reasons = [r["reason"] for r in cluster_pairs if r.get("reason")]
        cluster_reason = reasons[0] if reasons else "Multiple skills converge on the same workflow."
        clusters.append({
            "cluster_id": f"cluster-{i + 1:03d}",
            "skill_ids": ids_sorted,
            "dominant_skill_id": dominant,
            "verdict": "duplicate",
            "reason": cluster_reason,
            "severity": severity,
            "confirmed_pairs": [
                {k: r[k] for k in ("skill_a", "skill_b", "verdict", "reason", "similarity_score")}
                for r in cluster_pairs
            ],
        })
    # Sort overlapping_pairs by similarity desc
    overlapping_pair_records.sort(key=lambda r: -r["similarity_score"])
    skills_in_clusters = {sid for c in clusters for sid in c["skill_ids"]}
    skills_in_overlapping = {p["skill_a"] for p in overlapping_pair_records} | {p["skill_b"] for p in overlapping_pair_records}
    total_implicated = skills_in_clusters | skills_in_overlapping
    # Each N-skill cluster could in principle collapse to one skill: N-1 removable.
    reduction_potential = sum(len(c["skill_ids"]) - 1 for c in clusters)
    output = {
        "schema_version": SCHEMA_VERSION,
        "metadata": {
            "scan_id": work.get("metadata", {}).get("scan_id"),
            "scanned_at": datetime.now(timezone.utc).isoformat(),
            "tool_version": TOOL_VERSION,
            "skill_count": work.get("metadata", {}).get("skill_count", 0),
            "candidate_pairs_generated": work.get("metadata", {}).get("candidate_pairs_generated", 0),
            "candidate_pairs_evaluated": len(by_pair_id),
            "max_pairs_cap": work.get("metadata", {}).get("max_pairs_cap"),
            "allow_cross_repo": work.get("metadata", {}).get("allow_cross_repo", False),
            "llm_calls": len([v for v in verdicts.values() if isinstance(v, dict)]),
            "failed_pairs": failed_pairs,
        },
        "clusters": clusters,
        "overlapping_pairs": overlapping_pair_records,
        "estate_summary": {
            "duplicate_clusters": len(clusters),
            "skills_in_clusters": len(skills_in_clusters),
            "overlapping_pair_count": len(overlapping_pair_records),
            "total_skills_implicated": len(total_implicated),
            "estimated_skill_reduction_potential": reduction_potential,
        },
        "warnings": [],
    }
    # Validate the final output at the IO boundary before writing it.
    validate_against_schema(
        output,
        SCHEMA_DIR / "duplicates.schema.json",
        role="output",
        source="finalize_duplicates.py",
    )
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(output, indent=2))
    if not args.keep_intermediates:
        _cleanup_intermediates(prompts_arg, verdicts_arg)
    # Human-readable summary goes to stderr so stdout stays machine-clean.
    print(
        f"Duplicate finalize complete.\n"
        f" Pairs evaluated: {len(by_pair_id)}\n"
        f" Confirmed duplicates: {len(duplicate_pair_records)}\n"
        f" Overlapping pairs: {len(overlapping_pair_records)}\n"
        f" Clusters: {len(clusters)} (skills implicated: {len(skills_in_clusters)})\n"
        f" Reduction potential: {reduction_potential}\n"
        f" Failed pairs: {len(failed_pairs)}\n"
        f" Output: {output_path}",
        file=sys.stderr,
    )
    return 0
# Script entry point: propagate main()'s status code to the shell.
if __name__ == "__main__":
    sys.exit(main())