Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Then run four analytical phases in parallel against the inventory — staleness + git provenance (history, broken refs, contributors), quality (Tessl `skill review`), duplicates (similarity + LLM judgement), registry-search (per-standalone-skill registry suggestions, HTTP only) — and render a self-contained interactive HTML report with a top-of-report health overview, top-issues panel, recently-changed list, and per-tessl.json manifests view.
84
90%
Does it follow best practices?
Impact
97%
1.44x Average score across 2 eval scenarios
Advisory
Suggest reviewing before use
#!/usr/bin/env python3
"""Prepare candidate pairs for duplicate detection.
Reads a discovery.json, computes Jaccard similarity over tokenised name +
description (and body preview), generates candidate pairs, loads both skills'
SKILL.md content, composes the judgement prompt, and emits a work file the
orchestrator can hand to subagents.
No LLM calls in this script — purely deterministic pre-screen.
Usage:
prepare_duplicates.py --discovery <path> [--prompts-dir <path>]
[--max-pairs N] [--allow-cross-repo]
(default --max-pairs is 10)
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from itertools import combinations
from pathlib import Path
# Resolve the bundled schema directory and load the shared validation helper.
# Tile layout: <tile-root>/skills/<phase>/scripts/<script>.py
# <tile-root>/references/schemas/{_validate.py, *.schema.json}
# Four .parent hops climb from this script to the tile root; the schemas
# directory is then prepended to sys.path so `_validate` (a module shipped
# alongside the *.schema.json files, not an installed package) is importable.
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
if str(_SCHEMA_DIR_PATH) not in sys.path:
    sys.path.insert(0, str(_SCHEMA_DIR_PATH))
# E402 suppressed deliberately: this import can only happen after the
# sys.path mutation above.
from _validate import SCHEMA_DIR, validate_against_schema  # noqa: E402
# Tool/schema identifiers stamped into the emitted index.json metadata.
TOOL_VERSION = "skill-insights@0.10.0"
SCHEMA_VERSION = "1.0"

# Filler words that carry no signal for name/description similarity.
STOPWORDS = set(
    "and the a an of to for in on with "
    "skill skills tile use when this that is "
    "are or as by be it from".split()
)

# Skills living under these path fragments (test fixtures, vendored repo
# checkouts) are excluded from duplicate comparison entirely.
NEVER_COMPARE_PATH_PATTERNS = [
    re.compile(expr)
    for expr in (
        r"(^|/)tests?/__fixtures__(/|$)",
        r"(^|/)tests?/fixtures(/|$)",
        r"(^|/)__fixtures__(/|$)",
        r"(^|/)local/repos/",
    )
]


def tokenise(text: str) -> set[str]:
    """Lower-case *text* and return its alphanumeric tokens, minus stopwords
    and single-character tokens. Empty/None-ish input yields an empty set."""
    if not text:
        return set()
    words = re.findall(r"[a-z0-9]+", text.lower())
    return {w for w in words if len(w) > 1 and w not in STOPWORDS}
def jaccard(a: set[str], b: set[str]) -> float:
    """Jaccard similarity |a ∩ b| / |a ∪ b|, defined as 0.0 when both sets
    are empty (rather than raising on division by zero)."""
    union = a | b
    if not union:
        return 0.0
    return len(a & b) / len(union)
def common_significant_word(a: str, b: str) -> bool:
    """Both names share a meaningful noun/verb (lint, sync, review, analyze, etc.)."""
    significant = frozenset((
        "lint", "sync", "review", "analyze", "discover", "audit", "inspect",
        "create", "build", "generate", "publish", "deploy", "test", "validate",
        "verify", "check", "scan", "find", "detect", "fix", "format",
        "summarize", "summarise", "summary", "compare", "diff", "merge",
        "search", "query",
    ))
    shared = tokenise(a) & tokenise(b)
    return not shared.isdisjoint(significant)
def is_never_compare_path(path: str) -> bool:
    """True when *path* falls under a fixture/vendored directory that must be
    excluded from comparison. A leading "/" is prepended so the (^|/)-anchored
    patterns also match at the start of a relative path."""
    candidate = "/" + path
    for pattern in NEVER_COMPARE_PATH_PATTERNS:
        if pattern.search(candidate):
            return True
    return False
def load_skill_md_content(repo_path: Path, primary_path: str) -> str:
    """Read the SKILL.md content from disk. Returns empty string on failure.

    Deliberately best-effort: a missing or unreadable file yields "" so the
    prep run keeps going, and undecodable bytes are replaced rather than
    raising.
    """
    try:
        return (repo_path / primary_path).read_text(encoding="utf-8", errors="replace")
    except Exception:
        return ""
# Prompt handed verbatim to each LLM judge (one per candidate pair), filled in
# by compose_prompt(). The literal JSON braces in the "# Output" example are
# doubled ({{ }}) to survive str.format(). The judge must answer with a JSON
# object: pair_id (echoed), verdict (duplicate|overlapping|independent), a
# one-sentence reason, and dominant (a verbatim skill_id or null).
JUDGEMENT_PROMPT_TEMPLATE = """You are comparing two skills to decide if they are duplicates, overlapping, or independent.
# Skill A: {name_a}
**id**: {skill_id_a}
**path**: {primary_path_a}
```
{content_a}
```
# Skill B: {name_b}
**id**: {skill_id_b}
**path**: {primary_path_b}
```
{content_b}
```
# Definitions
- **duplicate**: the two skills cover the same workflow; either could be deleted with no loss of capability. Body content is functionally equivalent or near-equivalent.
- **overlapping**: meaningful shared scope, but each has unique value. Worth flagging for refactor (extract shared content, or merge with a wider scope).
- **independent**: separate concerns despite name/description similarity. The pre-screen flagged a similarity that doesn't reflect real semantic overlap.
# Output
Return ONLY this JSON object (no prose, no markdown):
{{
"pair_id": "{pair_id}",
"verdict": "duplicate" | "overlapping" | "independent",
"reason": "<one sentence explaining your call>",
"dominant": "<skill_id of the better-quality / more-tested / more-recent skill, OR null if genuinely interchangeable>"
}}
Use:
- `dominant = "{skill_id_a}"` or `dominant = "{skill_id_b}"` (verbatim) — not the name
- `dominant = null` only if the two are truly equivalent
Be conservative — only return `duplicate` if a careful reader would conclude that keeping both is redundant.
"""
def compose_prompt(pair_id: str, a: dict, b: dict) -> str:
    """Fill JUDGEMENT_PROMPT_TEMPLATE for one candidate pair.

    *a* and *b* are the per-skill summary dicts (skill_id, name, primary_path,
    skill_md_content). Each skill's body is capped at 8000 characters so the
    combined prompt stays bounded.
    """
    fields = {
        "pair_id": pair_id,
        "skill_id_a": a["skill_id"],
        "skill_id_b": b["skill_id"],
        "name_a": a["name"],
        "name_b": b["name"],
        "primary_path_a": a["primary_path"],
        "primary_path_b": b["primary_path"],
        "content_a": a["skill_md_content"][:8000],  # cap at 8KB per skill
        "content_b": b["skill_md_content"][:8000],
    }
    return JUDGEMENT_PROMPT_TEMPLATE.format(**fields)
def parse_args() -> argparse.Namespace:
    """Parse the CLI flags for the duplicate-prep pre-screen."""
    parser = argparse.ArgumentParser(
        description="Prepare duplicate-detection prompts (one file per pair)."
    )
    parser.add_argument("--discovery", required=True, help="Path to discovery.json")
    parser.add_argument(
        "--prompts-dir",
        default=None,
        help=(
            "Directory to write per-pair prompt files "
            "(default: <dirname(discovery)>/duplicates-prompts/)"
        ),
    )
    parser.add_argument(
        "--max-pairs",
        type=int,
        default=10,
        help=(
            "Maximum candidate pairs to send to LLM judges (default: 10). "
            "Prep ranks by similarity score and keeps the top N."
        ),
    )
    parser.add_argument("--allow-cross-repo", action="store_true", default=False)
    return parser.parse_args()
def main() -> int:
    """Run the deterministic duplicate-detection pre-screen.

    Pipeline:
      1. Load discovery.json and validate it against discovery.schema.json.
      2. Flatten discovery.skills[] and drop never-compare (fixture/vendored)
         paths.
      3. Generate candidate pairs via Jaccard similarity over name+description
         and body-preview tokens, rank them, keep the top --max-pairs.
      4. Write one judgement prompt file per kept pair plus an index.json
         manifest (validated against duplicates-prompts-index.schema.json)
         into the prompts directory.

    Returns:
        Process exit code: 0 on success, 2 when the discovery file does not
        exist. Schema-validation failures are handled inside
        validate_against_schema.
    """
    args = parse_args()
    discovery_path = Path(args.discovery).resolve()
    if not discovery_path.exists():
        print(f"ERROR: discovery file not found: {discovery_path}", file=sys.stderr)
        return 2
    prompts_dir = Path(args.prompts_dir) if args.prompts_dir else discovery_path.parent / "duplicates-prompts"
    # Explicit UTF-8: don't depend on the platform's locale default encoding
    # (previously this used the default, which is not UTF-8 on some systems).
    discovery = json.loads(discovery_path.read_text(encoding="utf-8"))
    validate_against_schema(
        discovery,
        SCHEMA_DIR / "discovery.schema.json",
        role="input (discovery.json)",
        source="prepare_duplicates.py",
    )
    repos = {r["repo_id"]: r for r in discovery.get("metadata", {}).get("repos", [])}
    # Read the canonical flat discovery.skills[] (schema 1.1).
    skills: list[dict] = []
    for s in discovery.get("skills", []):
        owning = s.get("owning_package") or {}
        skills.append({
            "skill_id": s["skill_id"],
            "name": s.get("name") or "",
            "description": s.get("description") or "",
            "repo": s["repo"],
            "primary_path": s["primary_path"],
            "content": s.get("content") or {},
            "content_hash": s.get("content_hash"),
            "owning_package": s.get("owning_package"),
            "agent_harnesses": s.get("agent_harnesses") or [],
            "tile_id": owning.get("name") if owning.get("kind") == "tessl_tile" else None,
            "is_declared": bool(s.get("declared_in")),
        })
    # Compute token sets once per skill. The never-compare check runs first
    # so excluded skills pay no tokenisation cost.
    enriched: list[dict] = []
    for s in skills:
        if is_never_compare_path(s.get("primary_path") or ""):
            continue
        name = s.get("name") or ""
        desc = s.get("description") or ""
        body_preview = (s.get("content") or {}).get("body_preview") or ""
        enriched.append({
            "skill": s,
            "nd_tokens": tokenise(name + " " + desc),
            "body_tokens": tokenise(body_preview[:800]),
        })
    # Generate candidate pairs.
    candidates: list[dict] = []
    for a, b in combinations(enriched, 2):
        sa = a["skill"]
        sb = b["skill"]
        if sa["skill_id"] == sb["skill_id"]:
            continue
        # Byte-identical content means a vendored copy, not a semantic
        # duplicate — skip. Guard on presence: two *missing* hashes
        # (None == None) must not be treated as identical content.
        if sa["content_hash"] is not None and sa["content_hash"] == sb["content_hash"]:
            continue
        if not args.allow_cross_repo and sa["repo"] != sb["repo"]:
            continue
        nd_jac = jaccard(a["nd_tokens"], b["nd_tokens"])
        body_jac = jaccard(a["body_tokens"], b["body_tokens"])
        same_name = sa["name"] == sb["name"] and sa["name"] != ""
        word_overlap = common_significant_word(sa["name"], sb["name"])
        # Any one trigger is enough to send the pair to an LLM judge.
        is_candidate = nd_jac >= 0.4 or same_name or body_jac >= 0.3 or word_overlap
        if not is_candidate:
            continue
        # Combined score for ranking when the --max-pairs cap kicks in.
        score = nd_jac * 0.6 + body_jac * 0.4
        if same_name:
            score = max(score, 0.95)
        # Prefer pairs where both skills are agent-loaded.
        a_active = bool(sa.get("agent_harnesses"))
        b_active = bool(sb.get("agent_harnesses"))
        both_active = a_active and b_active
        candidates.append({
            "skill_a": sa,
            "skill_b": sb,
            "similarity_score": round(score, 4),
            "name_desc_jaccard": round(nd_jac, 4),
            "body_jaccard": round(body_jac, 4),
            "same_name": same_name,
            "both_active": both_active,
        })
    # Sort: prefer (both_active, score), both descending.
    candidates.sort(key=lambda c: (-int(c["both_active"]), -c["similarity_score"]))
    total_generated = len(candidates)
    selected = candidates[: args.max_pairs]
    # Compose work entries (full SKILL.md content + judgement prompt per pair).
    work_entries: list[dict] = []
    for i, c in enumerate(selected):
        sa = c["skill_a"]
        sb = c["skill_b"]
        repo_a = repos.get(sa["repo"])
        repo_b = repos.get(sb["repo"])
        repo_path_a = Path(repo_a["path"]) if repo_a else None
        repo_path_b = Path(repo_b["path"]) if repo_b else None
        content_a = (
            load_skill_md_content(repo_path_a, sa["primary_path"]) if repo_path_a else ""
        )
        content_b = (
            load_skill_md_content(repo_path_b, sb["primary_path"]) if repo_path_b else ""
        )
        # NOTE: pair ids are 1-based ("p001"); prompt/verdict filenames below
        # are 0-based ("000.txt") — an existing convention consumers rely on.
        pair_id = f"p{i + 1:03d}"
        a_summary = {
            "skill_id": sa["skill_id"],
            "name": sa["name"],
            "repo": sa["repo"],
            "primary_path": sa["primary_path"],
            "agent_harnesses": sa.get("agent_harnesses") or [],
            "tile_id": sa.get("tile_id"),
            "owning_package": sa.get("owning_package"),
            "skill_md_content": content_a,
        }
        b_summary = {
            "skill_id": sb["skill_id"],
            "name": sb["name"],
            "repo": sb["repo"],
            "primary_path": sb["primary_path"],
            "agent_harnesses": sb.get("agent_harnesses") or [],
            "tile_id": sb.get("tile_id"),
            "owning_package": sb.get("owning_package"),
            "skill_md_content": content_b,
        }
        prompt = compose_prompt(pair_id, a_summary, b_summary)
        work_entries.append({
            "pair_id": pair_id,
            "skill_a": a_summary,
            "skill_b": b_summary,
            "similarity_score": c["similarity_score"],
            "judgement_prompt": prompt,
        })
    # Write per-pair prompts to disk + an index.json manifest. Clear stale
    # .txt/.json files from a previous run so the directory exactly mirrors
    # the current selection.
    prompts_dir.mkdir(parents=True, exist_ok=True)
    for f in prompts_dir.iterdir():
        if f.is_file() and (f.suffix in (".txt", ".json")):
            f.unlink()
    index_entries: list[dict] = []
    for i, entry in enumerate(work_entries):
        prompt_filename = f"{i:03d}.txt"
        prompt_path = prompts_dir / prompt_filename
        prompt_path.write_text(entry["judgement_prompt"], encoding="utf-8")
        # Index keeps everything except the fully-composed prompt (which is
        # on disk) and the raw SKILL.md bodies.
        index_entries.append({
            "idx": i,
            "pair_id": entry["pair_id"],
            "skill_a": {k: v for k, v in entry["skill_a"].items() if k != "skill_md_content"},
            "skill_b": {k: v for k, v in entry["skill_b"].items() if k != "skill_md_content"},
            "similarity_score": entry["similarity_score"],
            "name_desc_jaccard": selected[i].get("name_desc_jaccard"),
            "body_jaccard": selected[i].get("body_jaccard"),
            "same_name": selected[i].get("same_name"),
            "both_active": selected[i].get("both_active"),
            "prompt_filename": prompt_filename,
            "prompt_path": str(prompt_path),
            "verdict_filename": f"{i:03d}.json",
        })
    index_doc = {
        "schema_version": SCHEMA_VERSION,
        "metadata": {
            "scan_id": discovery.get("metadata", {}).get("scan_id"),
            "scanned_at": datetime.now(timezone.utc).isoformat(),
            "tool_version": TOOL_VERSION,
            "skill_count": len(skills),
            "candidate_pairs_generated": total_generated,
            "candidate_pairs_evaluated": len(work_entries),
            "max_pairs_cap": args.max_pairs,
            "allow_cross_repo": args.allow_cross_repo,
            "prompts_dir": str(prompts_dir),
        },
        "items": index_entries,
    }
    validate_against_schema(
        index_doc,
        SCHEMA_DIR / "duplicates-prompts-index.schema.json",
        role="output (index.json)",
        source="prepare_duplicates.py",
    )
    (prompts_dir / "index.json").write_text(json.dumps(index_doc, indent=2), encoding="utf-8")
    # Human-readable summary on stderr (stdout stays clean for pipelines).
    prompt_range = (
        f"{prompts_dir}/{{000..{len(work_entries)-1:03d}}}.txt"
        if work_entries else f"{prompts_dir}/ (no prompt files; cap selected 0 pairs)"
    )
    verdict_range = (
        f"<output-dir>/duplicates-verdicts/{{000..{len(work_entries)-1:03d}}}.json"
        if work_entries else "<output-dir>/duplicates-verdicts/ (no verdict files needed)"
    )
    print(
        f"Duplicate prep complete.\n"
        f" Skills considered: {len(skills)}\n"
        f" Candidate pairs (raw): {total_generated}\n"
        f" Candidate pairs (kept): {len(work_entries)} (cap {args.max_pairs})\n"
        f" Cross-repo: {args.allow_cross_repo}\n"
        f" Prompts: {prompt_range}\n"
        f" Index: {prompts_dir}/index.json\n"
        f" Verdicts go to: {verdict_range}",
        file=sys.stderr,
    )
    return 0
if __name__ == "__main__":
sys.exit(main())