Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Then run four analytical phases in parallel against the inventory: staleness and git provenance (history, broken refs, contributors), quality (Tessl `skill review`), duplicates (similarity plus LLM judgement), and registry search (per-standalone-skill registry suggestions, HTTP only). Finally, render a self-contained interactive HTML report with a top-of-report health overview, a top-issues panel, a recently-changed list, and a per-tessl.json manifests view.
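A typical standalone invocation of the staleness phase below, assuming the discovery phase has already written its inventory (paths are illustrative):

    python3 skills/staleness/scripts/analyze_staleness.py \
        --discovery out/discovery.json --output out/staleness.json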
#!/usr/bin/env python3
"""Skill staleness + git provenance analyzer.
Reads a discovery.json produced by discover-skills, runs `git log` once per
skill (parsed for date / SHA / author / subject in a single pass), computes
broken-reference + age signals, and derives per-skill provenance (created_by,
last_modified_by, top contributors, recent commits) from the same stream.
Output conforms to references/schemas/staleness.schema.json.
Fully deterministic — no LLM, no agent judgement. Stdlib + git only.
`jsonschema` is used to validate input/output at the IO boundary when
available; otherwise the script falls back to no validation with a single
stderr warning.
Usage:
analyze_staleness.py --discovery <path> [--output <path>]
"""
from __future__ import annotations
import argparse
import json
import os
import re
import statistics
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# Resolve the bundled schema directory and load the shared validation helper.
# Tile layout: <tile-root>/skills/<phase>/scripts/<script>.py
# <tile-root>/references/schemas/{_validate.py, *.schema.json}
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
if str(_SCHEMA_DIR_PATH) not in sys.path:
sys.path.insert(0, str(_SCHEMA_DIR_PATH))
from _validate import SCHEMA_DIR, validate_against_schema # noqa: E402
TOOL_VERSION = "skill-insights@0.10.0"
SCHEMA_VERSION = "1.1"
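# Day boundaries for the fresh/warm/stale/ancient buckets. These are surfaced
# in the output metadata; the scoring and bucketing logic below uses the same
# day values.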
THRESHOLDS = {
"fresh_days": 30,
"warm_days": 90,
"stale_days": 180,
"ancient_days": 365,
}
PROVENANCE_TOP_CONTRIBUTORS = 10
PROVENANCE_RECENT_COMMITS = 5
# ── Helpers ────────────────────────────────────────────────────────────────
def run_git(cwd: Path, args: list[str], timeout: int = 10) -> str | None:
try:
result = subprocess.run(
["git", *args], cwd=str(cwd),
capture_output=True, text=True, timeout=timeout,
)
if result.returncode != 0:
return None
return result.stdout.strip() or None
except Exception:
return None
def parse_iso(s: str | None) -> datetime | None:
if not s:
return None
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
def days_between(a: datetime | None, now: datetime) -> int | None:
if a is None:
return None
return max(0, (now - a).days)
# ── Per-skill computation ──────────────────────────────────────────────────
def _git_signals_for_path(repo_path: Path, rel_path: str) -> dict | None:
"""Try git log for a single path. Returns signals + provenance, or None.
A single `git log` call gives us every commit touching this path with date,
author and subject. We derive last/first/count, last_modified_by,
created_by, top contributors, and the most recent commits from that one
stream — fewer subprocess calls than the per-signal split would need, and
no extra cost compared to the previous (last + first + count) split.
Format `%H\\t%aI\\t%an\\t%ae\\t%s` is tab-separated; the subject (last
field) may itself contain tabs, so we cap split() at 4 to keep the rest.
"""
full_path = repo_path / rel_path
if not full_path.exists() and not full_path.is_symlink():
return None
raw = run_git(
repo_path,
["log", "--format=%H%x09%aI%x09%an%x09%ae%x09%s", "--", rel_path],
)
if not raw:
return None
rows: list[dict] = []
for line in raw.split("\n"):
if not line:
continue
parts = line.split("\t", 4)
if len(parts) != 5:
continue
sha, date, name, email, subject = parts
rows.append({
"sha": sha, "date": date,
"name": name, "email": email,
"subject": subject,
})
if not rows:
return None
last_row = rows[0]
first_row = rows[-1]
contrib_counts: dict[tuple[str, str], int] = {}
for r in rows:
key = (r["name"], r["email"])
contrib_counts[key] = contrib_counts.get(key, 0) + 1
contributors = sorted(
(
{"name": name, "email": email, "commits": n}
for (name, email), n in contrib_counts.items()
),
key=lambda c: (-c["commits"], c["name"].lower()),
)[:PROVENANCE_TOP_CONTRIBUTORS]
recent_commits = [
{
"sha": r["sha"][:12],
"date": r["date"],
"author": r["name"],
"subject": r["subject"],
}
for r in rows[:PROVENANCE_RECENT_COMMITS]
]
return {
"last_modified": last_row["date"],
"first_seen": first_row["date"],
"commit_count": min(len(rows), 100),
"tracked_path": rel_path,
"git_provenance": {
"created_by": {"name": first_row["name"], "email": first_row["email"]},
"last_modified_by": {"name": last_row["name"], "email": last_row["email"]},
"contributors": contributors,
"recent_commits": recent_commits,
},
}
# Path priority for git tracking lookups: try the actual tracked source first.
# Vendored copies in .claude/skills/, .agents/skills/, .cursor/skills/ etc. are
# typically gitignored in monorepo-style repos; the real tracked file is under
# tiles/ or .tessl/tiles/ (or wherever the authored source lives).
_TRACKED_PATH_PRIORITY = [
"tiles/", # authored source (highest priority — real edit history lives here)
"tile/", # singular variant
".tessl/tiles/", # installed materialisation (often gitignored, but check)
".claude/skills/",
".agents/skills/",
".cursor/skills/",
]
def get_git_signals(repo_path: Path, all_paths: list[str], primary_path: str) -> dict:
"""Try git log against each path in priority order. Returns the first that has history.
Falls back through all_paths because vendored copies (.claude/skills/, .agents/skills/, etc.)
are usually gitignored — the tracked source is under tiles/ or similar. We try the most
likely-tracked locations first.
"""
    def rank(p: str) -> tuple[int, str]:
        # Anchor each prefix at a path-component boundary so that e.g.
        # "tiles/" does not match a path under "subtiles/".
        normalized = "/" + p.replace("\\", "/")
        for i, prefix in enumerate(_TRACKED_PATH_PRIORITY):
            if "/" + prefix in normalized:
                return (i, p)
        return (999, p)
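    # Illustrative example: given all_paths of [".claude/skills/foo/SKILL.md",
    # "tiles/foo/SKILL.md"], this ordering tries the tiles/ copy first, where
    # the real edit history usually lives.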
ordered = sorted(all_paths, key=rank)
if primary_path in ordered:
        # Front-load the primary path only when it already lives in an
        # authored-source location (tiles/, tile/, or .tessl/tiles/);
        # otherwise rely on the priority ordering alone.
        if rank(primary_path)[0] <= 2:
ordered = [primary_path] + [p for p in ordered if p != primary_path]
for path in ordered:
result = _git_signals_for_path(repo_path, path)
if result is not None:
return result
return {
"last_modified": None,
"first_seen": None,
"commit_count": 0,
"tracked_path": None,
"git_provenance": None,
}
def extract_broken_refs(skill: dict, discovery_warnings: list[str]) -> list[dict]:
"""Pick out broken-reference warnings that match this skill's paths.
discovery.warnings entries look like:
'broken link in <repo_id>/<path>: <target>'
"""
refs: list[dict] = []
    seen: set[tuple[str, str]] = set()
paths = set(skill["all_paths"])
repo = skill["repo"]
for w in discovery_warnings:
if not w.startswith("broken link in "):
continue
# Format: 'broken link in <repo>/<path>: <target>'
try:
after = w[len("broken link in "):]
location, target = after.rsplit(": ", 1)
except ValueError:
continue
# Strip leading repo_id/
prefix = repo + "/"
if not location.startswith(prefix):
continue
rel_path = location[len(prefix):]
if rel_path not in paths:
continue
target = target.strip()
# Determine a coarse kind (used for grouping/display only)
ext = os.path.splitext(target)[1].lower()
if ext in (".sh", ".py", ".js", ".mjs", ".ts", ".tsx", ".jsx",
".go", ".rs", ".rb", ".java", ".kt", ".swift", ".c",
".cc", ".cpp", ".h", ".hpp", ".cs", ".php", ".pl",
".lua", ".scala", ".groovy") or "dockerfile" in target.lower():
kind = "code"
elif ext in (".md", ".mdc", ".txt", ".rst", ".adoc"):
kind = "doc"
elif ext in (".json", ".yaml", ".yml", ".toml", ".ini",
".tf", ".tfvars", ".env"):
kind = "config"
else:
kind = "other"
key = (rel_path, target)
if key in seen:
continue
seen.add(key)
refs.append({
"target": target,
"kind": kind,
"source": rel_path,
})
return refs
def compute_staleness_score(
days_since_modified: int | None,
broken_refs: list[dict],
commit_count: int,
repo_median_days: int | None,
tile_update_available: bool = False,
) -> int:
score = 0
if days_since_modified is None:
score += 20 # unknown-age penalty
else:
if days_since_modified > 30:
score += 5
if days_since_modified > 90:
score += 15
if days_since_modified > 180:
score += 25
if days_since_modified > 365:
score += 25
score += min(len(broken_refs) * 10, 30)
if repo_median_days is not None and repo_median_days > 180:
if days_since_modified is not None and days_since_modified > 90:
score += 5
if commit_count == 1:
score += 10 # never updated since first commit
# An installed tile with an available registry update is implicitly stale
# versus the registry — regardless of when the local copy was last edited.
if tile_update_available:
score += 15
return min(100, score)
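# Worked example (hypothetical skill): 200 days since modified, 2 broken refs,
# a single commit, repo median of 250 days, no registry update:
#   age: 5 + 15 + 25 = 45; broken refs: min(2 * 10, 30) = 20;
#   slow repo: +5; never updated: +10 → min(100, 80) = 80.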
def compute_factors(
days_since_modified: int | None,
broken_refs: list[dict],
commit_count: int,
repo_median_days: int | None,
tile_update_available: bool = False,
) -> list[str]:
factors: list[str] = []
if days_since_modified is None:
factors.append("no_git_history")
else:
if days_since_modified > 365:
factors.append("older_than_365_days")
elif days_since_modified > 180:
factors.append("older_than_180_days")
if (
repo_median_days is not None
and repo_median_days > 0
and days_since_modified > 1.5 * repo_median_days
):
factors.append("older_than_repo_median")
if broken_refs:
factors.append("broken_references")
if commit_count == 1:
factors.append("never_modified")
if tile_update_available:
factors.append("registry_update_available")
return factors
def compute_bucket(score: int, days_since_modified: int | None) -> str:
    if days_since_modified is None:
        return "unknown"
    if score >= 70 or days_since_modified > 365:
        return "ancient"
    # Past this point score < 70 and days <= 365, so the checks below need
    # no redundant upper bounds.
    if score >= 40 or days_since_modified >= 90:
        return "stale"
    if days_since_modified < 30 and score < 20:
        return "fresh"
    return "warm"
# ── Main ───────────────────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Compute per-skill staleness signals from a discovery.json")
p.add_argument("--discovery", required=True, help="Path to discovery.json")
p.add_argument("--output", default=None, help="Output staleness.json path")
return p.parse_args()
def main() -> int:
args = parse_args()
discovery_path = Path(args.discovery).resolve()
if not discovery_path.exists():
print(f"ERROR: discovery file not found: {discovery_path}", file=sys.stderr)
return 2
output_path = Path(args.output) if args.output else discovery_path.parent / "staleness.json"
discovery = json.loads(discovery_path.read_text())
validate_against_schema(
discovery,
SCHEMA_DIR / "discovery.schema.json",
role="input (discovery.json)",
source="analyze_staleness.py",
)
repos = {r["repo_id"]: r for r in discovery.get("metadata", {}).get("repos", [])}
    warnings_in: list[str] = list(discovery.get("warnings", []))
    new_warnings: list[str] = []  # warnings raised by this analyzer itself
# Discovery 1.2+ adds tiles[] with `outdated` and `registry` enrichment.
# We use those as additional staleness signals: a tile with an available
# update is implicitly stale relative to the registry, regardless of git.
tile_lookup_by_id: dict[str, dict] = {}
tile_lookup_by_name: dict[tuple[str, str], list[dict]] = {}
for t in discovery.get("tiles", []):
tile_lookup_by_id[t["tile_id"]] = t
tile_lookup_by_name.setdefault((t["repo"], t["name"]), []).append(t)
# Read the canonical flat discovery.skills[].
# Project to the minimal shape staleness needs; keep tile/declared metadata
# for downstream display, plus outdated/last-scored signals when available.
skills: list[dict] = []
for s in discovery.get("skills", []):
owning = s.get("owning_package") or {}
declared = s.get("declared_in") or []
tile_name = owning.get("name") if owning.get("kind") == "tessl_tile" else None
tile_id = s.get("tile_id")
owning_tile = tile_lookup_by_id.get(tile_id) if tile_id else None
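        # Fall back to a (repo, name) lookup, but only when the name is
        # unambiguous within the repo; guessing among duplicates could attach
        # the wrong registry signals.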
if owning_tile is None and tile_name:
candidates = tile_lookup_by_name.get((s["repo"], tile_name), [])
owning_tile = candidates[0] if len(candidates) == 1 else None
tile_id = (owning_tile or {}).get("tile_id") or tile_id
outdated = (owning_tile or {}).get("outdated") or {}
registry = (owning_tile or {}).get("registry") or {}
registry_scores = registry.get("scores") or {}
skills.append({
"skill_id": s["skill_id"],
"primary_path": s["primary_path"],
"all_paths": s.get("all_paths", [s["primary_path"]]),
"repo": s["repo"],
"tile_id": tile_id,
"tile_name": tile_name,
"tier": s.get("tier", "non_tile"),
"is_declared": bool(declared),
"tile_update_available": bool(outdated.get("update_available")),
"tile_current_version": outdated.get("current"),
"tile_latest_version": outdated.get("latest"),
"tile_last_scored_at": registry_scores.get("lastScoredAt"),
})
now = datetime.now(timezone.utc)
# Per-repo median computed in two passes (need all skill ages first)
raw_per_skill: list[dict] = []
repo_skill_ages: dict[str, list[int]] = {}
for s in skills:
repo_id = s["repo"]
repo_meta = repos.get(repo_id)
if not repo_meta:
            new_warnings.append(f"skill '{s['skill_id']}' references unknown repo '{repo_id}'")
git = {
"last_modified": None, "first_seen": None,
"commit_count": 0, "tracked_path": None,
"git_provenance": None,
}
else:
repo_path = Path(repo_meta["path"])
git = get_git_signals(repo_path, s["all_paths"], s["primary_path"])
last_dt = parse_iso(git["last_modified"])
first_dt = parse_iso(git["first_seen"])
days_modified = days_between(last_dt, now)
days_first = days_between(first_dt, now)
broken = extract_broken_refs(s, warnings_in)
if days_modified is not None:
repo_skill_ages.setdefault(repo_id, []).append(days_modified)
raw_per_skill.append({
"skill": s,
"git": git,
"days_modified": days_modified,
"days_first": days_first,
"broken": broken,
})
repo_medians = {
rid: int(statistics.median(ages)) if ages else None
for rid, ages in repo_skill_ages.items()
}
per_skill: list[dict] = []
for raw in raw_per_skill:
s = raw["skill"]
days_modified = raw["days_modified"]
commit_count = raw["git"]["commit_count"]
repo_median = repo_medians.get(s["repo"])
update_available = bool(s.get("tile_update_available"))
score = compute_staleness_score(
days_modified, raw["broken"], commit_count, repo_median, update_available,
)
factors = compute_factors(
days_modified, raw["broken"], commit_count, repo_median, update_available,
)
bucket = compute_bucket(score, days_modified)
per_skill.append({
"skill_id": s["skill_id"],
"primary_path": s["primary_path"],
"tracked_path": raw["git"].get("tracked_path"),
"repo": s["repo"],
"tile_id": s.get("tile_id"),
"tile_name": s.get("tile_name"),
"tier": s.get("tier"),
"is_declared": s.get("is_declared", False),
"tile_update_available": update_available,
"tile_current_version": s.get("tile_current_version"),
"tile_latest_version": s.get("tile_latest_version"),
"tile_last_scored_at": s.get("tile_last_scored_at"),
"last_modified": raw["git"]["last_modified"],
"days_since_modified": days_modified,
"first_seen": raw["git"]["first_seen"],
"days_since_first_seen": raw["days_first"],
"commit_count": commit_count,
"git_provenance": raw["git"].get("git_provenance"),
"broken_references": raw["broken"],
"staleness_score": score,
"staleness_bucket": bucket,
"factors": factors,
})
# Estate summary
all_ages = [p["days_since_modified"] for p in per_skill if p["days_since_modified"] is not None]
median_days = int(statistics.median(all_ages)) if all_ages else None
bucket_counts = {"fresh": 0, "warm": 0, "stale": 0, "ancient": 0, "unknown": 0}
for p in per_skill:
bucket_counts[p["staleness_bucket"]] += 1
skills_with_broken = sum(1 for p in per_skill if p["broken_references"])
skills_unknown = bucket_counts["unknown"]
top_offenders = sorted(per_skill, key=lambda p: -p["staleness_score"])[:5]
top_offender_summary = []
for p in top_offenders:
if p["staleness_score"] == 0:
continue
reason_bits = []
if p["days_since_modified"] is not None and p["days_since_modified"] > 180:
reason_bits.append(f"{p['days_since_modified']}d since modified")
if p["broken_references"]:
reason_bits.append(f"{len(p['broken_references'])} broken refs")
if "no_git_history" in p["factors"]:
reason_bits.append("no git history")
top_offender_summary.append({
"skill_id": p["skill_id"],
"staleness_score": p["staleness_score"],
"reason": "; ".join(reason_bits) or "high overall score",
})
output = {
"schema_version": SCHEMA_VERSION,
"metadata": {
"scan_id": discovery.get("metadata", {}).get("scan_id"),
"scanned_at": now.isoformat(),
"tool_version": TOOL_VERSION,
"skill_count": len(per_skill),
"thresholds": THRESHOLDS,
},
"per_skill": per_skill,
"estate_summary": {
"median_days_since_modified": median_days,
"skills_with_broken_refs": skills_with_broken,
"skills_with_unknown_age": skills_unknown,
"buckets": bucket_counts,
"top_offenders": top_offender_summary,
},
"warnings": [],
}
validate_against_schema(
output,
SCHEMA_DIR / "staleness.schema.json",
role="output",
source="analyze_staleness.py",
)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(output, indent=2))
top_str = ""
if top_offender_summary:
top_str = f" Top offender: {top_offender_summary[0]['skill_id']} (score {top_offender_summary[0]['staleness_score']})\n"
print(
f"Staleness analysis complete.\n"
f" Skills: {len(per_skill)}\n"
f" Median age: {median_days if median_days is not None else 'unknown'} days\n"
f" Broken refs: {skills_with_broken} skills affected\n"
f" Buckets: fresh={bucket_counts['fresh']}, warm={bucket_counts['warm']}, "
f"stale={bucket_counts['stale']}, ancient={bucket_counts['ancient']}, unknown={bucket_counts['unknown']}\n"
f"{top_str}"
f" Output: {output_path}",
file=sys.stderr,
)
return 0
if __name__ == "__main__":
sys.exit(main())