
tessleng/skill-insights

Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Four analytical phases then run in parallel against the inventory: staleness + git provenance (history, broken refs, contributors), quality (Tessl `skill review`), duplicates (similarity + LLM judgement), and registry search (per-standalone-skill registry suggestions, HTTP only). The results are rendered as a self-contained interactive HTML report with a top-of-report health overview, a top-issues panel, a recently-changed list, and a per-tessl.json manifests view.
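As a rough sketch of the orchestration this describes, the four phases can be fanned out over the shared inventory with a thread pool. Only the staleness analyzer (`analyze_staleness.py`, shown below) appears on this page; the other phase script names here are hypothetical placeholders:

import subprocess
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sibling phase scripts; only analyze_staleness.py is shown on this page.
PHASES = [
    ["python", "analyze_staleness.py", "--discovery", "discovery.json"],
    ["python", "analyze_quality.py", "--discovery", "discovery.json"],
    ["python", "analyze_duplicates.py", "--discovery", "discovery.json"],
    ["python", "registry_search.py", "--discovery", "discovery.json"],
]

def run_phase(cmd: list[str]) -> int:
    # Each phase reads the same inventory and writes its own JSON artifact.
    return subprocess.run(cmd, capture_output=True, text=True).returncode

with ThreadPoolExecutor(max_workers=len(PHASES)) as pool:
    exit_codes = list(pool.map(run_phase, PHASES))
print("all phases ok" if all(c == 0 for c in exit_codes) else f"exit codes: {exit_codes}")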

Quality: 90% (Does it follow best practices?)
Impact: 97%, 1.44x (average score across 2 eval scenarios)
Security by Snyk: Advisory (suggest reviewing before use)


skills/analyze-skill-staleness/scripts/analyze_staleness.py

#!/usr/bin/env python3
"""Skill staleness + git provenance analyzer.

Reads a discovery.json produced by discover-skills, runs `git log` once per
skill (parsed for date / SHA / author / subject in a single pass), computes
broken-reference + age signals, and derives per-skill provenance (created_by,
last_modified_by, top contributors, recent commits) from the same stream.

Output conforms to references/schemas/staleness.schema.json.

Fully deterministic — no LLM, no agent judgement. Stdlib + git only.
`jsonschema` is used to validate input/output at the IO boundary when
available; otherwise the script skips validation and emits a single
stderr warning.

Usage:
    analyze_staleness.py --discovery <path> [--output <path>]
"""
from __future__ import annotations

import argparse
import json
import os
import re
import statistics
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

# Resolve the bundled schema directory and load the shared validation helper.
# Tile layout: <tile-root>/skills/<phase>/scripts/<script>.py
#              <tile-root>/references/schemas/{_validate.py, *.schema.json}
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
if str(_SCHEMA_DIR_PATH) not in sys.path:
    sys.path.insert(0, str(_SCHEMA_DIR_PATH))
from _validate import SCHEMA_DIR, validate_against_schema  # noqa: E402

TOOL_VERSION = "skill-insights@0.10.0"
SCHEMA_VERSION = "1.1"

THRESHOLDS = {
    "fresh_days": 30,
    "warm_days": 90,
    "stale_days": 180,
    "ancient_days": 365,
}
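# Note: THRESHOLDS is emitted in the output metadata for consumers; the scoring
# and bucketing functions below hardcode the same day cutoffs inline rather
# than reading this dict.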

PROVENANCE_TOP_CONTRIBUTORS = 10
PROVENANCE_RECENT_COMMITS = 5


# ── Helpers ────────────────────────────────────────────────────────────────


def run_git(cwd: Path, args: list[str], timeout: int = 10) -> str | None:
    try:
        result = subprocess.run(
            ["git", *args], cwd=str(cwd),
            capture_output=True, text=True, timeout=timeout,
        )
        if result.returncode != 0:
            return None
        return result.stdout.strip() or None
    except Exception:
        return None


def parse_iso(s: str | None) -> datetime | None:
    if not s:
        return None
    try:
        return datetime.fromisoformat(s.replace("Z", "+00:00"))
    except Exception:
        return None


def days_between(a: datetime | None, now: datetime) -> int | None:
    if a is None:
        return None
    return max(0, (now - a).days)


# ── Per-skill computation ──────────────────────────────────────────────────


def _git_signals_for_path(repo_path: Path, rel_path: str) -> dict | None:
    """Try git log for a single path. Returns signals + provenance, or None.

    A single `git log` call gives us every commit touching this path with date,
    author and subject. We derive last/first/count, last_modified_by,
    created_by, top contributors, and the most recent commits from that one
    stream: one subprocess call instead of the separate last/first/count
    queries a per-signal split would need.

    Format `%H\\t%aI\\t%an\\t%ae\\t%s` is tab-separated; the subject (last
    field) may itself contain tabs, so we cap split() at 4 to keep the rest.
    """
    full_path = repo_path / rel_path
    if not full_path.exists() and not full_path.is_symlink():
        return None

    raw = run_git(
        repo_path,
        ["log", "--format=%H%x09%aI%x09%an%x09%ae%x09%s", "--", rel_path],
    )
    if not raw:
        return None

    rows: list[dict] = []
    for line in raw.split("\n"):
        if not line:
            continue
        parts = line.split("\t", 4)
        if len(parts) != 5:
            continue
        sha, date, name, email, subject = parts
        rows.append({
            "sha": sha, "date": date,
            "name": name, "email": email,
            "subject": subject,
        })
    if not rows:
        return None

    last_row = rows[0]
    first_row = rows[-1]

    contrib_counts: dict[tuple[str, str], int] = {}
    for r in rows:
        key = (r["name"], r["email"])
        contrib_counts[key] = contrib_counts.get(key, 0) + 1
    contributors = sorted(
        (
            {"name": name, "email": email, "commits": n}
            for (name, email), n in contrib_counts.items()
        ),
        key=lambda c: (-c["commits"], c["name"].lower()),
    )[:PROVENANCE_TOP_CONTRIBUTORS]

    recent_commits = [
        {
            "sha": r["sha"][:12],
            "date": r["date"],
            "author": r["name"],
            "subject": r["subject"],
        }
        for r in rows[:PROVENANCE_RECENT_COMMITS]
    ]

    return {
        "last_modified": last_row["date"],
        "first_seen": first_row["date"],
        "commit_count": min(len(rows), 100),
        "tracked_path": rel_path,
        "git_provenance": {
            "created_by": {"name": first_row["name"], "email": first_row["email"]},
            "last_modified_by": {"name": last_row["name"], "email": last_row["email"]},
            "contributors": contributors,
            "recent_commits": recent_commits,
        },
    }


# Path priority for git tracking lookups: try the actual tracked source first.
# Vendored copies in .claude/skills/, .agents/skills/, .cursor/skills/ etc. are
# typically gitignored in monorepo-style repos; the real tracked file is under
# tiles/ or .tessl/tiles/ (or wherever the authored source lives).
_TRACKED_PATH_PRIORITY = [
    "tiles/",      # authored source (highest priority — real edit history lives here)
    "tile/",       # singular variant
    ".tessl/tiles/",  # installed materialisation (often gitignored, but check)
    ".claude/skills/",
    ".agents/skills/",
    ".cursor/skills/",
]


def get_git_signals(repo_path: Path, all_paths: list[str], primary_path: str) -> dict:
    """Try git log against each path in priority order. Returns the first that has history.

    Falls back through all_paths because vendored copies (.claude/skills/, .agents/skills/, etc.)
    are usually gitignored — the tracked source is under tiles/ or similar. We try the most
    likely-tracked locations first.
    """
    def rank(p: str) -> tuple[int, str]:
        for i, prefix in enumerate(_TRACKED_PATH_PRIORITY):
            if prefix in "/" + p.replace("\\", "/"):
                return (i, p)
        return (999, p)

    ordered = sorted(all_paths, key=rank)
    if primary_path in ordered:
        # Promote the primary path when it already lives in a likely-tracked
        # location (tiles/, tile/, .tessl/tiles/); otherwise keep the priority order.
        if rank(primary_path)[0] <= 2:  # tiles/, tile/, or .tessl/tiles/
            ordered = [primary_path] + [p for p in ordered if p != primary_path]

    for path in ordered:
        result = _git_signals_for_path(repo_path, path)
        if result is not None:
            return result

    return {
        "last_modified": None,
        "first_seen": None,
        "commit_count": 0,
        "tracked_path": None,
        "git_provenance": None,
    }


def extract_broken_refs(skill: dict, discovery_warnings: list[str]) -> list[dict]:
    """Pick out broken-reference warnings that match this skill's paths.

    discovery.warnings entries look like:
       'broken link in <repo_id>/<path>: <target>'
    """
    refs: list[dict] = []
    seen: set[tuple[str, str]] = set()
    paths = set(skill["all_paths"])
    repo = skill["repo"]
    for w in discovery_warnings:
        if not w.startswith("broken link in "):
            continue
        # Format: 'broken link in <repo>/<path>: <target>'
        try:
            after = w[len("broken link in "):]
            location, target = after.rsplit(": ", 1)
        except ValueError:
            continue
        # Strip leading repo_id/
        prefix = repo + "/"
        if not location.startswith(prefix):
            continue
        rel_path = location[len(prefix):]
        if rel_path not in paths:
            continue
        target = target.strip()
        # Determine a coarse kind (used for grouping/display only)
        ext = os.path.splitext(target)[1].lower()
        if ext in (".sh", ".py", ".js", ".mjs", ".ts", ".tsx", ".jsx",
                   ".go", ".rs", ".rb", ".java", ".kt", ".swift", ".c",
                   ".cc", ".cpp", ".h", ".hpp", ".cs", ".php", ".pl",
                   ".lua", ".scala", ".groovy") or "dockerfile" in target.lower():
            kind = "code"
        elif ext in (".md", ".mdc", ".txt", ".rst", ".adoc"):
            kind = "doc"
        elif ext in (".json", ".yaml", ".yml", ".toml", ".ini",
                     ".tf", ".tfvars", ".env"):
            kind = "config"
        else:
            kind = "other"
        key = (rel_path, target)
        if key in seen:
            continue
        seen.add(key)
        refs.append({
            "target": target,
            "kind": kind,
            "source": rel_path,
        })
    return refs


def compute_staleness_score(
    days_since_modified: int | None,
    broken_refs: list[dict],
    commit_count: int,
    repo_median_days: int | None,
    tile_update_available: bool = False,
) -> int:
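    """Additive 0-100 score; higher means staler. Age bands are cumulative.

    Worked example (hypothetical skill): 200 days since modified with two
    broken refs, several commits, and no registry update scores
    (5 + 15 + 25) for the age bands plus min(2 * 10, 30) = 20 for the
    broken refs, i.e. 65 total.
    """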
    score = 0

    if days_since_modified is None:
        score += 20  # unknown-age penalty
    else:
        if days_since_modified > 30:
            score += 5
        if days_since_modified > 90:
            score += 15
        if days_since_modified > 180:
            score += 25
        if days_since_modified > 365:
            score += 25

    score += min(len(broken_refs) * 10, 30)

    if repo_median_days is not None and repo_median_days > 180:
        if days_since_modified is not None and days_since_modified > 90:
            score += 5

    if commit_count == 1:
        score += 10  # never updated since first commit

    # An installed tile with an available registry update is implicitly stale
    # versus the registry — regardless of when the local copy was last edited.
    if tile_update_available:
        score += 15

    return min(100, score)


def compute_factors(
    days_since_modified: int | None,
    broken_refs: list[dict],
    commit_count: int,
    repo_median_days: int | None,
    tile_update_available: bool = False,
) -> list[str]:
    factors: list[str] = []
    if days_since_modified is None:
        factors.append("no_git_history")
    else:
        if days_since_modified > 365:
            factors.append("older_than_365_days")
        elif days_since_modified > 180:
            factors.append("older_than_180_days")
        if (
            repo_median_days is not None
            and repo_median_days > 0
            and days_since_modified > 1.5 * repo_median_days
        ):
            factors.append("older_than_repo_median")
    if broken_refs:
        factors.append("broken_references")
    if commit_count == 1:
        factors.append("never_modified")
    if tile_update_available:
        factors.append("registry_update_available")
    return factors


def compute_bucket(score: int, days_since_modified: int | None) -> str:
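    """Bucket a skill by score + age: fresh / warm / stale / ancient / unknown.

    unknown: no git history; ancient: score >= 70 or older than a year;
    fresh: under 30 days and score < 20; warm: under 90 days and score < 40;
    everything else: stale.
    """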
    if days_since_modified is None:
        return "unknown"
    if score >= 70 or days_since_modified > 365:
        return "ancient"
    if 40 <= score <= 70 or 90 <= days_since_modified <= 365:
        return "stale"
    if days_since_modified < 90 and score < 40:
        if days_since_modified < 30 and score < 20:
            return "fresh"
        return "warm"
    return "stale"


# ── Main ───────────────────────────────────────────────────────────────────


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Compute per-skill staleness signals from a discovery.json")
    p.add_argument("--discovery", required=True, help="Path to discovery.json")
    p.add_argument("--output", default=None, help="Output staleness.json path")
    return p.parse_args()


def main() -> int:
    args = parse_args()
    discovery_path = Path(args.discovery).resolve()
    if not discovery_path.exists():
        print(f"ERROR: discovery file not found: {discovery_path}", file=sys.stderr)
        return 2

    output_path = Path(args.output) if args.output else discovery_path.parent / "staleness.json"

    discovery = json.loads(discovery_path.read_text())
    validate_against_schema(
        discovery,
        SCHEMA_DIR / "discovery.schema.json",
        role="input (discovery.json)",
        source="analyze_staleness.py",
    )
    repos = {r["repo_id"]: r for r in discovery.get("metadata", {}).get("repos", [])}
    warnings_in: list[str] = list(discovery.get("warnings", []))
    new_warnings: list[str] = []  # warnings raised by this pass, emitted in the output

    # Discovery 1.2+ adds tiles[] with `outdated` and `registry` enrichment.
    # We use those as additional staleness signals: a tile with an available
    # update is implicitly stale relative to the registry, regardless of git.
    tile_lookup_by_id: dict[str, dict] = {}
    tile_lookup_by_name: dict[tuple[str, str], list[dict]] = {}
    for t in discovery.get("tiles", []):
        tile_lookup_by_id[t["tile_id"]] = t
        tile_lookup_by_name.setdefault((t["repo"], t["name"]), []).append(t)

    # Read the canonical flat discovery.skills[].
    # Project to the minimal shape staleness needs; keep tile/declared metadata
    # for downstream display, plus outdated/last-scored signals when available.
    skills: list[dict] = []
    for s in discovery.get("skills", []):
        owning = s.get("owning_package") or {}
        declared = s.get("declared_in") or []
        tile_name = owning.get("name") if owning.get("kind") == "tessl_tile" else None
        tile_id = s.get("tile_id")
        owning_tile = tile_lookup_by_id.get(tile_id) if tile_id else None
        if owning_tile is None and tile_name:
            candidates = tile_lookup_by_name.get((s["repo"], tile_name), [])
            owning_tile = candidates[0] if len(candidates) == 1 else None
            tile_id = (owning_tile or {}).get("tile_id") or tile_id
        outdated = (owning_tile or {}).get("outdated") or {}
        registry = (owning_tile or {}).get("registry") or {}
        registry_scores = registry.get("scores") or {}
        skills.append({
            "skill_id": s["skill_id"],
            "primary_path": s["primary_path"],
            "all_paths": s.get("all_paths", [s["primary_path"]]),
            "repo": s["repo"],
            "tile_id": tile_id,
            "tile_name": tile_name,
            "tier": s.get("tier", "non_tile"),
            "is_declared": bool(declared),
            "tile_update_available": bool(outdated.get("update_available")),
            "tile_current_version": outdated.get("current"),
            "tile_latest_version": outdated.get("latest"),
            "tile_last_scored_at": registry_scores.get("lastScoredAt"),
        })

    now = datetime.now(timezone.utc)

    # Per-repo median computed in two passes (need all skill ages first)
    raw_per_skill: list[dict] = []
    repo_skill_ages: dict[str, list[int]] = {}

    for s in skills:
        repo_id = s["repo"]
        repo_meta = repos.get(repo_id)
        if not repo_meta:
            new_warnings.append(f"skill '{s['skill_id']}' references unknown repo '{repo_id}'")
            git = {
                "last_modified": None, "first_seen": None,
                "commit_count": 0, "tracked_path": None,
                "git_provenance": None,
            }
        else:
            repo_path = Path(repo_meta["path"])
            git = get_git_signals(repo_path, s["all_paths"], s["primary_path"])

        last_dt = parse_iso(git["last_modified"])
        first_dt = parse_iso(git["first_seen"])
        days_modified = days_between(last_dt, now)
        days_first = days_between(first_dt, now)

        broken = extract_broken_refs(s, warnings_in)

        if days_modified is not None:
            repo_skill_ages.setdefault(repo_id, []).append(days_modified)

        raw_per_skill.append({
            "skill": s,
            "git": git,
            "days_modified": days_modified,
            "days_first": days_first,
            "broken": broken,
        })

    repo_medians = {
        rid: int(statistics.median(ages)) if ages else None
        for rid, ages in repo_skill_ages.items()
    }

    per_skill: list[dict] = []
    for raw in raw_per_skill:
        s = raw["skill"]
        days_modified = raw["days_modified"]
        commit_count = raw["git"]["commit_count"]
        repo_median = repo_medians.get(s["repo"])

        update_available = bool(s.get("tile_update_available"))
        score = compute_staleness_score(
            days_modified, raw["broken"], commit_count, repo_median, update_available,
        )
        factors = compute_factors(
            days_modified, raw["broken"], commit_count, repo_median, update_available,
        )
        bucket = compute_bucket(score, days_modified)

        per_skill.append({
            "skill_id": s["skill_id"],
            "primary_path": s["primary_path"],
            "tracked_path": raw["git"].get("tracked_path"),
            "repo": s["repo"],
            "tile_id": s.get("tile_id"),
            "tile_name": s.get("tile_name"),
            "tier": s.get("tier"),
            "is_declared": s.get("is_declared", False),
            "tile_update_available": update_available,
            "tile_current_version": s.get("tile_current_version"),
            "tile_latest_version": s.get("tile_latest_version"),
            "tile_last_scored_at": s.get("tile_last_scored_at"),
            "last_modified": raw["git"]["last_modified"],
            "days_since_modified": days_modified,
            "first_seen": raw["git"]["first_seen"],
            "days_since_first_seen": raw["days_first"],
            "commit_count": commit_count,
            "git_provenance": raw["git"].get("git_provenance"),
            "broken_references": raw["broken"],
            "staleness_score": score,
            "staleness_bucket": bucket,
            "factors": factors,
        })

    # Estate summary
    all_ages = [p["days_since_modified"] for p in per_skill if p["days_since_modified"] is not None]
    median_days = int(statistics.median(all_ages)) if all_ages else None
    bucket_counts = {"fresh": 0, "warm": 0, "stale": 0, "ancient": 0, "unknown": 0}
    for p in per_skill:
        bucket_counts[p["staleness_bucket"]] += 1
    skills_with_broken = sum(1 for p in per_skill if p["broken_references"])
    skills_unknown = bucket_counts["unknown"]

    top_offenders = sorted(per_skill, key=lambda p: -p["staleness_score"])[:5]
    top_offender_summary = []
    for p in top_offenders:
        if p["staleness_score"] == 0:
            continue
        reason_bits = []
        if p["days_since_modified"] is not None and p["days_since_modified"] > 180:
            reason_bits.append(f"{p['days_since_modified']}d since modified")
        if p["broken_references"]:
            reason_bits.append(f"{len(p['broken_references'])} broken refs")
        if "no_git_history" in p["factors"]:
            reason_bits.append("no git history")
        top_offender_summary.append({
            "skill_id": p["skill_id"],
            "staleness_score": p["staleness_score"],
            "reason": "; ".join(reason_bits) or "high overall score",
        })

    output = {
        "schema_version": SCHEMA_VERSION,
        "metadata": {
            "scan_id": discovery.get("metadata", {}).get("scan_id"),
            "scanned_at": now.isoformat(),
            "tool_version": TOOL_VERSION,
            "skill_count": len(per_skill),
            "thresholds": THRESHOLDS,
        },
        "per_skill": per_skill,
        "estate_summary": {
            "median_days_since_modified": median_days,
            "skills_with_broken_refs": skills_with_broken,
            "skills_with_unknown_age": skills_unknown,
            "buckets": bucket_counts,
            "top_offenders": top_offender_summary,
        },
        "warnings": [],
    }

    validate_against_schema(
        output,
        SCHEMA_DIR / "staleness.schema.json",
        role="output",
        source="analyze_staleness.py",
    )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(output, indent=2))

    top_str = ""
    if top_offender_summary:
        top_str = f"  Top offender:  {top_offender_summary[0]['skill_id']} (score {top_offender_summary[0]['staleness_score']})\n"

    print(
        f"Staleness analysis complete.\n"
        f"  Skills:        {len(per_skill)}\n"
        f"  Median age:    {median_days if median_days is not None else 'unknown'} days\n"
        f"  Broken refs:   {skills_with_broken} skills affected\n"
        f"  Buckets:       fresh={bucket_counts['fresh']}, warm={bucket_counts['warm']}, "
        f"stale={bucket_counts['stale']}, ancient={bucket_counts['ancient']}, unknown={bucket_counts['unknown']}\n"
        f"{top_str}"
        f"  Output:        {output_path}",
        file=sys.stderr,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())

skills/analyze-skill-staleness/
  README.md
  tile.json