tessleng/skill-insights

Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Then run four analytical phases in parallel against the inventory — staleness + git provenance (history, broken refs, contributors), quality (Tessl `skill review`), duplicates (similarity + LLM judgement), registry-search (per-standalone-skill registry suggestions, HTTP only) — and render a self-contained interactive HTML report with a top-of-report health overview, top-issues panel, recently-changed list, and per-tessl.json manifests view.

Overall score: 84 (1.44x eval improvement)
Quality: 90% (does it follow best practices?)
Impact: 97% (1.44x average across 2 eval scenarios)
Security by Snyk: Advisory (suggest reviewing before use)

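For orientation before the source: a minimal sketch of one `skills[]` entry in the emitted discovery.json. Field names come from the `skill_obj` construction in `scan_repo()` below; every value here is hypothetical, and the surrounding top-level document layout is not shown on this page.

example_skill_entry = {
    "skill_id": "my-repo::.claude__skills__deploy",
    "name": "deploy",
    "description": "One-sentence summary from frontmatter",
    "repo": "my-repo",
    "primary_path": ".claude/skills/deploy/SKILL.md",
    "all_paths": [".claude/skills/deploy/SKILL.md"],
    "agent_harnesses": ["claude"],
    "source_type": "claude_skill",
    "content_hash": "sha256:…",
    "owning_package": None,
    "declared_in": [],
    "supporting_files": [{"path": ".claude/skills/deploy/scripts/run.py",
                          "kind": "script", "discovered_via": "scripts_dir",
                          "line_count": 120, "size_bytes": 4096}],
    "bundled_directories": [],
    "frontmatter": {"raw": {"name": "deploy"}, "parse_error": None},
    "content": {"line_count": 40, "word_count": 250, "body_preview": "…",
                "has_references_dir": False, "has_scripts_dir": True},
}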

skills/discover-skills/scripts/discover_skills.py

#!/usr/bin/env python3
"""Skill discovery — emits discovery.json per the skill-insights schema.

Scans the target directory for SKILL.md files, dedupes vendored copies by
content hash, captures supporting files (references/, scripts/, linked docs,
bundled directories), and writes a canonical JSON inventory.

Output conforms to references/schemas/discovery.schema.json (validated at
the IO boundary when `jsonschema` is installed; falls back to no validation
with a single stderr warning otherwise).

Usage:
    discover_skills.py [--scan-root PATH] [--output PATH] [--scan-id ID]
    discover_skills.py --help

Environment variables (fallbacks when flags are not passed):
    SCAN_ROOT       Directory to scan                    (default: $(pwd))
    OUTPUT_PATH     Where to write discovery.json        (default: $SCAN_ROOT/.skill-insights/discovery.json)
    SCAN_ID         Opaque ID for this scan              (default: discovery-YYYYMMDD-HHMMSS)

No external dependencies are required. PyYAML is used for frontmatter
parsing when available (regex fallback otherwise); jsonschema is used for
IO contract validation when available (skipped with a warning otherwise).
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

# Resolve the bundled schema directory and load the shared validation helper.
# Tile layout: <tile-root>/skills/discover-skills/scripts/discover_skills.py
#              <tile-root>/references/schemas/{_validate.py, *.schema.json}
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
if str(_SCHEMA_DIR_PATH) not in sys.path:
    sys.path.insert(0, str(_SCHEMA_DIR_PATH))
from _validate import SCHEMA_DIR, validate_against_schema  # noqa: E402

TOOL_VERSION = "skill-insights@0.11.0"
SCHEMA_VERSION = "1.4"

# ── Registry / CLI enrichment configuration ────────────────────────────────
TESSL_API_BASE_URL = os.environ.get("TESSL_API_BASE_URL", "https://api.tessl.io")
TESSL_AUTH_FILE = Path(os.environ.get(
    "TESSL_AUTH_FILE", str(Path.home() / ".tessl" / "api-credentials.json"),
))
ENRICHMENT_HTTP_TIMEOUT_SEC = 10
ENRICHMENT_HTTP_CONCURRENCY = 8
ENRICHMENT_LINT_CONCURRENCY = 4

# ── Scan-time configuration (populated in main) ────────────────────────────
SCAN_ROOT: Path
OUTPUT_PATH: Path
SCAN_ID: str

# ── Exclusion sets ─────────────────────────────────────────────────────────

# Directory basenames to skip entirely during walk. Handles the 90% case;
# EXCLUDE_PATH_PATTERNS handles more nuanced matches.
EXCLUDE_DIR_NAMES = {
    "node_modules", ".git", "dist", "build", "out", ".next", ".vercel",
    ".turbo", "vendor", "target", "coverage", "__pycache__",
    ".tessl",
}

# Directory basename prefixes to skip entirely during walk. Tessl-vendored
# skill copies live in directories like `.claude/skills/tessl__<name>/` —
# they're duplicates of skills sourced elsewhere (typically a tile installed
# under `.tessl/`), so excluding them keeps the inventory focused on
# first-party skills authored in the scanned repo.
EXCLUDE_DIR_PREFIXES = ("tessl__",)

# Patterns applied to the repo-relative path (with leading `/`) to decide
# whether a discovered SKILL.md should be skipped. Deliberately restricted
# to test-fixture locations and other paths that are clearly not part of
# the repo's real skill surface.
EXCLUDE_PATH_PATTERNS = [
    re.compile(r"/__fixtures__(/|$)"),      # subsumes tests/ and test/ fixture dirs
    re.compile(r"/tests?/fixtures(/|$)"),
    re.compile(r"/local/repos/"),
    re.compile(r"/evals/[^/]+/resources/"),
]
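# Examples (hypothetical paths): "pkg/tests/__fixtures__/demo/SKILL.md" and
# "pkg/evals/smoke/resources/SKILL.md" are excluded; "pkg/docs/SKILL.md" is kept.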

# Sibling directory names we do NOT include in bundled_directories (because
# they're captured separately or are just noise).
BUNDLED_EXCLUDE = {
    "references", "reference", "scripts", "node_modules", "__pycache__",
    ".git", "dist", "build", "target",
}

# Note on classifying link targets in SKILL.md bodies: we intentionally don't
# filter file references by extension. A path is "real" if it's tracked in git
# history (currently or previously) — the repo itself is the source of truth,
# not a hardcoded allowlist.

# Hard safety cap on the walk recursion depth. Should never be hit under
# reasonable repo layouts; serves as a backstop if cycle detection fails.
MAX_WALK_DEPTH = 60

# Harness classification priority when picking primary_path. Earlier wins.
PRIMARY_PRIORITY = [
    ".claude/skills/",
    ".agents/skills/",
    ".cursor/skills/",
    ".tessl/tiles/",
    "tiles/",
    "tile/",
]

# ── Shared state (populated during scan) ───────────────────────────────────

warnings_list: list[str] = []
skipped_counts: dict[str, int] = {}


# ── Helpers ────────────────────────────────────────────────────────────────


def run_git(cwd: Path, args: list[str]) -> str | None:
    try:
        result = subprocess.run(
            ["git", *args], cwd=str(cwd),
            capture_output=True, text=True, timeout=10,
        )
        if result.returncode != 0:
            return None
        return result.stdout.strip() or None
    except Exception:
        return None


def is_excluded(repo_rel_path: str) -> str | None:
    """Return the reason string if this path should be skipped, else None."""
    parts = repo_rel_path.split(os.sep)
    for seg in parts:
        if seg in EXCLUDE_DIR_NAMES:
            return seg + "/"
        for prefix in EXCLUDE_DIR_PREFIXES:
            if seg.startswith(prefix):
                return prefix + "*/"
    for pat in EXCLUDE_PATH_PATTERNS:
        if pat.search("/" + repo_rel_path):
            return pat.pattern
    return None


def parse_frontmatter(content: str) -> tuple[dict | None, str | None, str]:
    """Return (parsed_dict_or_None, parse_error_or_None, body_str).

    A `None` raw with `None` error means "no frontmatter present" (not an
    error). A non-`None` error means "frontmatter present but unparseable".
    """
    if not content.startswith("---"):
        return None, None, content.lstrip("\n")
    lines = content.split("\n")
    if lines[0].strip() != "---":
        return None, None, content
    end_idx = None
    for i in range(1, len(lines)):
        if lines[i].strip() == "---":
            end_idx = i
            break
    if end_idx is None:
        return None, None, content
    fm_text = "\n".join(lines[1:end_idx])
    body = "\n".join(lines[end_idx + 1:]).lstrip("\n")

    # Prefer PyYAML when available. The fallback parser is lossy for edge
    # cases (nested structures, anchors) but handles name/description well.
    try:
        import yaml  # type: ignore
        parsed = yaml.safe_load(fm_text)
        if parsed is None:
            return None, None, body
        if not isinstance(parsed, dict):
            return None, f"frontmatter not a dict: {type(parsed).__name__}", body
        return parsed, None, body
    except ImportError:
        return _fallback_parse_frontmatter(fm_text, body)
    except Exception as e:
        return None, f"{type(e).__name__}: {e}", body


def _fallback_parse_frontmatter(fm_text: str, body: str) -> tuple[dict | None, str | None, str]:
    """Minimal YAML subset parser. Handles single-line scalars and block
    scalars (`>`, `>-`, `|`, `|-`) well enough for Tessl skill frontmatter.
    """
    parsed: dict = {}
    cur_key: str | None = None
    cur_mode: str | None = None  # 'folded' | 'literal' | None
    buf: list[str] = []

    def flush() -> None:
        nonlocal cur_key, cur_mode, buf
        if cur_key is None:
            return
        if cur_mode == "folded":
            parsed[cur_key] = re.sub(r"\s+", " ", " ".join(buf)).strip()
        elif cur_mode == "literal":
            parsed[cur_key] = "\n".join(buf).rstrip()
        else:
            parsed[cur_key] = "\n".join(buf).strip() if buf else parsed.get(cur_key, "")
        cur_key = None
        cur_mode = None
        buf = []

    for line in fm_text.split("\n"):
        m = re.match(r"^([a-zA-Z0-9_-]+):\s*(.*)$", line)
        if m:
            flush()
            cur_key = m.group(1)
            val = m.group(2).strip()
            if val in (">", ">-"):
                cur_mode = "folded"
                parsed[cur_key] = ""
            elif val in ("|", "|-"):
                cur_mode = "literal"
                parsed[cur_key] = ""
            elif val == "":
                cur_mode = None
            else:
                parsed[cur_key] = val.strip('"').strip("'")
                cur_key = None
                cur_mode = None
                buf = []
        elif cur_key is not None and (line.startswith((" ", "\t")) or line == ""):
            buf.append(line.strip())
    flush()
    return parsed, None, body
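# Worked example for the fallback parser (hypothetical frontmatter):
#
#     name: deploy
#     description: >-
#       Folded over
#       two lines.
#
# parses to {"name": "deploy", "description": "Folded over two lines."}.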


def word_count(body: str) -> int:
    return len(re.findall(r"\S+", body))


def compute_body_preview(body: str, limit: int = 400) -> str:
    s = body.strip()
    if len(s) <= limit:
        return s
    cut = s[:limit]
    last_space = cut.rfind(" ")
    if last_space > limit * 0.5:
        cut = cut[:last_space]
    return cut


def file_line_count(p: Path) -> int | None:
    try:
        with p.open("rb") as f:
            data = f.read()
        if b"\x00" in data:
            return None
        if not data:
            return 0
        return data.count(b"\n") + (0 if data.endswith(b"\n") else 1)
    except Exception:
        return None


def find_owning_package(skill_dir: Path, repo_root: Path) -> dict | None:
    """Walk up from skill_dir towards repo_root looking for a package manifest.

    Returns an owning_package dict or None.
    """
    cur = skill_dir
    try:
        if not str(cur).startswith(str(repo_root)):
            return None
    except Exception:
        return None

    while True:
        tile_json = cur / "tile.json"
        if tile_json.exists():
            try:
                data = json.loads(tile_json.read_text())
                return {
                    "kind": "tessl_tile",
                    "name": data.get("name") or tile_json.parent.name,
                    "version": data.get("version"),
                    "manifest_path": str(tile_json.relative_to(repo_root)),
                }
            except Exception as e:
                warnings_list.append(f"unparseable tile.json at {tile_json.relative_to(repo_root)}: {e}")

        for plugin_candidate in (cur / ".claude-plugin" / "plugin.json", cur / "plugin.json"):
            if plugin_candidate.exists():
                try:
                    data = json.loads(plugin_candidate.read_text())
                    return {
                        "kind": "claude_plugin",
                        "name": data.get("name") or plugin_candidate.parent.name,
                        "version": data.get("version"),
                        "manifest_path": str(plugin_candidate.relative_to(repo_root)),
                    }
                except Exception as e:
                    warnings_list.append(
                        f"unparseable plugin.json at {plugin_candidate.relative_to(repo_root)}: {e}"
                    )

        if cur == repo_root:
            return None
        parent = cur.parent
        if parent == cur:
            return None
        cur = parent


# ── Symlink-aware walker ───────────────────────────────────────────────────


def walk_collecting_skills(
    start_dir: Path,
    repo_root: Path,
    collected: list[Path],
    skipped: dict[str, int],
    chain_realpaths: set[str] | None = None,
    depth: int = 0,
) -> None:
    """Recursively walk start_dir, following symlinks, collecting SKILL.md paths.

    Cycle detection uses per-chain realpath tracking (only prevents recursion
    INTO an ancestor, so a vendored symlink pointing to a dir we've visited
    elsewhere in the tree is still followed).

    Does NOT use os.walk(followlinks=True) with a global visited set — that
    would miss vendored symlink paths, which are exactly the paths we want
    to record for Tessl-style installs.
    """
    if depth > MAX_WALK_DEPTH:
        warnings_list.append(f"walk depth cap hit at {start_dir.relative_to(repo_root)}")
        return

    try:
        real = os.path.realpath(start_dir)
    except OSError:
        return
    if chain_realpaths is None:
        chain_realpaths = set()
    if real in chain_realpaths:
        # Ancestor cycle — symlink points to a directory we're currently inside.
        return
    chain_realpaths = chain_realpaths | {real}

    try:
        entries = list(os.scandir(start_dir))
    except (PermissionError, OSError):
        return

    for entry in entries:
        name = entry.name
        full = Path(entry.path)
        try:
            if entry.is_file(follow_symlinks=True):
                if name == "SKILL.md":
                    try:
                        rel = str(full.relative_to(repo_root))
                    except ValueError:
                        continue
                    reason = is_excluded(rel)
                    if reason:
                        skipped[reason] = skipped.get(reason, 0) + 1
                        continue
                    collected.append(full)
                continue
            if entry.is_dir(follow_symlinks=True):
                if name in EXCLUDE_DIR_NAMES:
                    # Skipped wholesale; zero-count entries would be filtered
                    # out downstream, so no skipped-file count is recorded here.
                    continue
                if any(name.startswith(p) for p in EXCLUDE_DIR_PREFIXES):
                    continue
                walk_collecting_skills(
                    full, repo_root, collected, skipped,
                    chain_realpaths=chain_realpaths, depth=depth + 1,
                )
        except OSError:
            # broken symlink or permission denied — ignore
            continue


def walk_dir_files(d: Path) -> list[Path]:
    """Enumerate files under `d` (recursive). Does NOT follow symlinks into
    parent chains; only used for reading references/, scripts/, etc. where
    cycles are not expected.
    """
    out: list[Path] = []
    if not d.exists() or not d.is_dir():
        return out
    for root, dirs, files in os.walk(d, followlinks=False):
        dirs[:] = [dd for dd in dirs if dd not in {".git", "node_modules", "__pycache__"}]
        for f in files:
            out.append(Path(root) / f)
    return out


# ── Link extraction ────────────────────────────────────────────────────────


MD_LINK_RE = re.compile(r"\[([^\]]*)\]\(([^)]+)\)")
INLINE_CODE_RE = re.compile(r"`([^`\n]+)`")


def looks_like_path_candidate(target: str) -> bool:
    """Cheap pre-filter: is this *plausibly* a path-shaped string?

    Used to exclude obvious non-paths (URLs, code symbols, placeholders) before
    the more expensive git-index lookup. We don't filter on file extension —
    git history is the source of truth.
    """
    if not target:
        return False
    if target.startswith(("http://", "https://", "mailto:", "tel:", "ftp:", "file://", "//", "#")):
        return False
    if "<" in target or ">" in target:
        return False
    # Must contain a slash (single-segment things are almost always code symbols
    # like `getAuth()`, never repo paths) AND must not contain whitespace.
    if "/" not in target or any(c.isspace() for c in target):
        return False
    return True
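# Examples: "scripts/run.py" → True; "getAuth()" → False (no slash);
# "https://tessl.io/docs" → False (URL); "docs/<topic>.md" → False (placeholder).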


def extract_body_links(body: str) -> list[tuple[str, str]]:
    """Yield (kind, target) tuples for path-like references in the SKILL.md body.

    Catches three patterns, all outside fenced/indented code blocks:
      - Markdown links:  [text](path/to/file)
      - @imports:        @path/to/file  (line-leading)
      - Inline backticks: `path/to/file`  (the most common in-prose form)

    Targets are emitted unfiltered by extension — the caller validates each
    against git history and existence so the repo itself decides what's real.
    """
    results: list[tuple[str, str]] = []
    in_fence = False
    for line in body.split("\n"):
        stripped = line.strip()
        if stripped.startswith("```") or stripped.startswith("~~~"):
            in_fence = not in_fence
            continue
        if in_fence:
            continue
        if line.startswith("    ") or line.startswith("\t"):
            continue

        # Markdown links: [text](target)
        for m in MD_LINK_RE.finditer(line):
            target = m.group(2).strip().split(" ")[0].strip()
            if looks_like_path_candidate(target):
                results.append(("markdown_link", target))

        # Inline backtick paths — strip line-number suffixes like `path:42`
        for m in INLINE_CODE_RE.finditer(line):
            text = m.group(1).strip()
            # Strip trailing punctuation glued to the closing backtick
            text = text.rstrip(",.;:)]}\"'")
            # Strip line-number / column suffixes (`foo.ts:12`, `foo.ts:12:5`)
            text = re.sub(r":\d+(:\d+)?$", "", text)
            if looks_like_path_candidate(text):
                results.append(("inline_code", text))

        # @imports — only at line start
        lstripped = line.lstrip()
        if lstripped.startswith("@") and len(lstripped) > 1:
            tok = lstripped[1:].split()[0].rstrip(",.;:)]}")
            # Skip npm-style scoped packages like @scope/package (1 slash, no dot)
            if tok and "/" in tok and not (tok.count("/") == 1 and "." not in tok):
                if looks_like_path_candidate(tok):
                    results.append(("at_import", tok))
    return results
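# Worked example (hypothetical body lines):
#     See [the schema](references/schemas/discovery.schema.json), then run
#     `scripts/discover_skills.py:42` first.
# yields [("markdown_link", "references/schemas/discovery.schema.json"),
#         ("inline_code", "scripts/discover_skills.py")]; the `:42` line
# suffix is stripped before the candidate check.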


# ── Per-repo git-history index for reference validation ────────────────────


def build_repo_path_indices(repo_root: Path) -> tuple[set[str], set[str]]:
    """Return (currently_tracked, ever_tracked) sets of repo-relative paths.

    `currently_tracked` — files in HEAD's git tree (what's in the repo now).
    `ever_tracked`      — files that have ever appeared in any commit on any
                          ref. Used to detect references to files that were
                          deleted or renamed since the skill was authored.

    Falls back to a filesystem walk if git isn't available — in that case
    `ever_tracked` collapses to `currently_tracked` (no broken-link signal).
    """
    if not (repo_root / ".git").exists():
        # Non-git repo: filesystem-walk fallback
        currently: set[str] = set()
        for root, _dirs, files in os.walk(repo_root, followlinks=False):
            for f in files:
                p = Path(root) / f
                try:
                    currently.add(str(p.relative_to(repo_root)))
                except ValueError:
                    pass
        return currently, currently

    currently_tracked: set[str] = set()
    try:
        result = subprocess.run(
            ["git", "ls-tree", "-r", "--name-only", "HEAD"],
            cwd=str(repo_root), capture_output=True, text=True, timeout=30,
        )
        if result.returncode == 0:
            currently_tracked = {p for p in result.stdout.split("\n") if p}
    except Exception as e:
        warnings_list.append(f"git ls-tree failed for {repo_root.name}: {e}")

    ever_tracked: set[str] = set(currently_tracked)
    try:
        # All paths that ever appeared in any commit on any ref — captures
        # files that were deleted or renamed.
        result = subprocess.run(
            ["git", "log", "--all", "--pretty=format:", "--name-only", "--no-renames"],
            cwd=str(repo_root), capture_output=True, text=True, timeout=120,
        )
        if result.returncode == 0:
            ever_tracked.update(p for p in result.stdout.split("\n") if p)
    except Exception as e:
        warnings_list.append(f"git log --name-only failed for {repo_root.name}: {e}")

    return currently_tracked, ever_tracked


def normalise_link_target(target: str, kind: str, skill_dir: Path, repo_root: Path) -> str | None:
    """Return a repo-root-relative path string for a candidate link target.

    Resolution semantics differ by kind:
      - markdown_link / at_import: relative to skill_dir (standard markdown
        link semantics) when not absolute, falling back to repo_root if that
        doesn't make sense.
      - inline_code: usually a fully-qualified repo path, so resolve from
        repo_root first; fall back to skill_dir if that misses.
    """
    if not target or target.startswith("/"):
        return None
    # Normalise leading "./" segments. lstrip("./") would also eat the dot in
    # dotfile paths like ".claude/skills/...", so strip the prefix explicitly.
    while target.startswith("./"):
        target = target[2:]
    if not target:
        return None

    candidates: list[Path] = []
    if kind == "inline_code":
        candidates.append(repo_root / target)
        candidates.append(skill_dir / target)
    else:
        candidates.append(skill_dir / target)
        candidates.append(repo_root / target)

    resolved: list[str] = []
    for cand in candidates:
        try:
            rel = str(cand.resolve().relative_to(repo_root))
        except (ValueError, OSError):
            continue
        # Prefer a candidate that exists on disk so the documented fallback
        # order has an effect; otherwise keep the first resolvable path
        # (deleted-but-tracked files still hit the git-history index).
        if (repo_root / rel).exists():
            return rel
        resolved.append(rel)
    return resolved[0] if resolved else None


# ── Classification ─────────────────────────────────────────────────────────


def classify_harness(rel_path: str) -> str:
    lp = "/" + rel_path.lower().replace("\\", "/")
    if "/.claude/skills/" in lp:
        return "claude"
    if "/.agents/skills/" in lp:
        return "agents"
    if "/.cursor/skills/" in lp:
        return "cursor"
    if "/.github/skills/" in lp or "/.vscode/skills/" in lp:
        return "agents"
    if "/.tessl/tiles/" in lp:
        return "tessl_tile"
    if lp.startswith("/tiles/") or lp.startswith("/tile/") or "/tiles/" in lp or "/tile/" in lp:
        return "tessl_tile"
    return "standalone"


def primary_rank(rel_path: str) -> tuple[int, str]:
    lp = "/" + rel_path.lower().replace("\\", "/")
    for i, tag in enumerate(PRIMARY_PRIORITY):
        if f"/{tag}" in lp:
            return (i, rel_path)
    return (999, rel_path)


def source_type_for(skill: dict) -> str:
    """Per the discovery schema:
      1. owning_package.kind == tessl_tile OR any path is in .tessl/tiles/ OR authored tile layout → tessl_tile_skill
      2. owning_package.kind == claude_plugin → claude_plugin_skill
      3-5. based on primary_path's harness
      6. standalone
    """
    op = skill.get("owning_package") or {}
    paths_lower = ["/" + p.lower() for p in skill["all_paths"]]

    if op.get("kind") == "tessl_tile":
        return "tessl_tile_skill"
    if any("/.tessl/tiles/" in p for p in paths_lower):
        return "tessl_tile_skill"
    for p in paths_lower:
        # authored-tile layout: /tiles/<anything>/SKILL.md or /tile/<anything>/SKILL.md
        if re.search(r"(^|/)(tile|tiles)/.*/skill\.md$", p):
            return "tessl_tile_skill"

    if op.get("kind") == "claude_plugin":
        return "claude_plugin_skill"

    pp_lower = "/" + skill["primary_path"].lower()
    if "/.claude/skills/" in pp_lower:
        return "claude_skill"
    if "/.agents/skills/" in pp_lower or "/.github/skills/" in pp_lower or "/.vscode/skills/" in pp_lower:
        return "agents_skill"
    if "/.cursor/skills/" in pp_lower:
        return "cursor_skill"
    return "standalone"


def make_tile_instance_id(
    repo: str,
    tile_name: str,
    manifest_path: str | None,
    version: str | None,
) -> str:
    """Stable ID for one tile materialisation inside one repo.

    A repo can contain the same tile name more than once: for example a local
    authored source under `tile/` and an installed copy under `.tessl/tiles/`.
    Downstream phases need to keep those instances separate.
    """
    instance_source = manifest_path or f"version:{version or 'unknown'}"
    suffix = re.sub(r"[^A-Za-z0-9._-]+", "__", instance_source).strip("_")
    return f"{repo}::{tile_name}::{suffix or 'unknown'}"


# ── tessl.json manifest resolution ─────────────────────────────────────────

# Dependency keys with these prefixes are package-equivalents (npm / pypi /
# similar) — not skill-bearing. Skip when resolving manifest deps to skills.
NON_SKILL_DEP_PREFIXES = ("tessl/npm-", "tessl/pypi-")


def find_tessl_manifests(repo_root: Path) -> list[tuple[Path, dict]]:
    """Walk repo for tessl.json files. Respects EXCLUDE_DIR_NAMES / prefixes."""
    manifests: list[tuple[Path, dict]] = []
    for root, dirs, files in os.walk(repo_root, followlinks=False):
        dirs[:] = [
            d for d in dirs
            if d not in EXCLUDE_DIR_NAMES
            and not any(d.startswith(p) for p in EXCLUDE_DIR_PREFIXES)
        ]
        if "tessl.json" not in files:
            continue
        path = Path(root) / "tessl.json"
        try:
            parsed = json.loads(path.read_text())
        except Exception as e:
            warnings_list.append(
                f"unparseable tessl.json at {path.relative_to(repo_root)}: {e}"
            )
            continue
        if isinstance(parsed, dict):
            manifests.append((path, parsed))
    return manifests


def resolve_tile_dir(
    manifest_path: Path, dep_key: str, dep_info: dict, repo_root: Path,
) -> Path | None:
    """Resolve a tessl.json dependency to its tile directory.

    `source: "file:..."` → relative to the manifest's parent dir.
    Otherwise (version-pinned) → `<repo_root>/.tessl/tiles/<dep_key>/`.
    """
    src = dep_info.get("source")
    if isinstance(src, str) and src.startswith("file:"):
        rel = src[len("file:"):]
        return (manifest_path.parent / rel).resolve()
    return (repo_root / ".tessl" / "tiles" / dep_key).resolve()
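# Examples (hypothetical layout): with the manifest at <repo>/apps/web/tessl.json,
# source "file:../../tiles/deploy" resolves to <repo>/tiles/deploy; a
# version-pinned dep keyed "acme/deploy" resolves to <repo>/.tessl/tiles/acme/deploy.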


def collect_manifest_skill_paths(
    manifests: list[tuple[Path, dict]], repo_root: Path, repo_id: str,
) -> tuple[dict[Path, list[dict]], list[dict]]:
    """For each manifest, resolve every skill-bearing dep to its SKILL.md paths.

    Returns:
        - declared_by: { absolute SKILL.md path → [{manifest_path, dep_key, version}, ...] }
        - manifest_summaries: per-manifest stats for metadata.
    """
    declared_by: dict[Path, list[dict]] = {}
    summaries: list[dict] = []

    for manifest_path, parsed in manifests:
        manifest_rel = str(manifest_path.relative_to(repo_root))
        deps = parsed.get("dependencies") or {}
        if not isinstance(deps, dict):
            deps = {}

        resolved = unresolved = skipped = 0

        for dep_key, dep_info in deps.items():
            if not isinstance(dep_info, dict):
                continue
            if any(dep_key.startswith(p) for p in NON_SKILL_DEP_PREFIXES):
                skipped += 1
                continue

            tile_dir = resolve_tile_dir(manifest_path, dep_key, dep_info, repo_root)
            tile_json_path = tile_dir / "tile.json" if tile_dir else None

            if not tile_json_path or not tile_json_path.exists():
                warnings_list.append(
                    f"tile '{dep_key}' declared in {manifest_rel} but no tile.json found"
                )
                unresolved += 1
                continue

            try:
                tile_data = json.loads(tile_json_path.read_text())
            except Exception as e:
                warnings_list.append(
                    f"unparseable tile.json for '{dep_key}' (declared in {manifest_rel}): {e}"
                )
                unresolved += 1
                continue

            tile_skills = tile_data.get("skills") or {}
            if not isinstance(tile_skills, dict):
                tile_skills = {}

            version = dep_info.get("version") or tile_data.get("version")
            for skill_key, skill_info in tile_skills.items():
                if not isinstance(skill_info, dict):
                    continue
                skill_path_rel = skill_info.get("path")
                if not skill_path_rel:
                    continue
                skill_md = (tile_dir / skill_path_rel).resolve()
                if not skill_md.exists():
                    warnings_list.append(
                        f"skill '{skill_key}' of tile '{dep_key}' (declared in {manifest_rel}) "
                        f"not found at expected path"
                    )
                    continue
                # Only count skills inside the repo — out-of-repo declarations
                # exist but can't be inventoried by this scan.
                try:
                    skill_md.relative_to(repo_root)
                except ValueError:
                    warnings_list.append(
                        f"skill '{skill_key}' of tile '{dep_key}' resolves outside repo {repo_id} — skipped"
                    )
                    continue
                declared_by.setdefault(skill_md, []).append({
                    "manifest_path": manifest_rel,
                    "dep_key": dep_key,
                    "version": version,
                })
            resolved += 1

        summaries.append({
            "path": manifest_rel,
            "name": parsed.get("name"),
            "dependencies_total": len(deps),
            "dependencies_resolved": resolved,
            "dependencies_unresolved": unresolved,
            "dependencies_skipped_non_skill": skipped,
        })

    return declared_by, summaries


# ── Repo-level scan ────────────────────────────────────────────────────────


def scan_repo(
    repo_path: Path, repo_id: str, name: str, is_git: bool,
    head_sha: str | None, head_branch: str | None, remote_url: str | None,
) -> tuple[dict, list[dict], list[tuple[Path, dict]]]:
    """Return (repo_metadata, [skills], parsed_manifests). Dedupes vendored copies by content hash.

    parsed_manifests is the raw `find_tessl_manifests` output for this repo —
    the tile-enrichment pass uses it to look up `dependencies.<key>.source`
    when classifying tiles into published / github / authored tiers.
    """
    repo_root = repo_path
    repo_meta = {
        "repo_id": repo_id,
        "name": name,
        "path": str(repo_root),
        "is_git_repo": is_git,
        "head_sha": head_sha,
        "head_branch": head_branch,
        "remote_url": remote_url,
    }

    found: list[Path] = []
    repo_skipped: dict[str, int] = {}
    walk_collecting_skills(repo_root, repo_root, found, repo_skipped)

    for reason, count in repo_skipped.items():
        if count > 0:
            skipped_counts[reason] = skipped_counts.get(reason, 0) + count

    # Build path indices once per repo. Used downstream to validate every
    # path-like reference in skill bodies against git history.
    currently_tracked, ever_tracked = build_repo_path_indices(repo_root)

    # Resolve tessl.json manifests → declared SKILL.md paths. We use this to
    # populate per-skill `declared_in` and to power tile aggregation. We do NOT
    # re-add SKILL.md files that resolve inside `.tessl/` to the inventory —
    # that directory is the Tessl CLI's installed-tile cache, surfaced via
    # `tiles[]` (with `source: "tessl_json"`) rather than `skills[]`.
    manifests = find_tessl_manifests(repo_root)
    declared_by, manifest_summaries = collect_manifest_skill_paths(
        manifests, repo_root, repo_id,
    )
    repo_meta["tessl_manifests"] = manifest_summaries

    found_set = {p.resolve() for p in found}
    for skill_md in declared_by:
        try:
            rel = skill_md.relative_to(repo_root).as_posix()
        except ValueError:
            continue
        if rel.startswith(".tessl/"):
            continue
        if skill_md not in found_set and skill_md.exists():
            found.append(skill_md)
            found_set.add(skill_md)

    per_file_records: list[dict] = []
    for skill_md in found:
        rel_path = str(skill_md.relative_to(repo_root))
        try:
            raw_bytes = skill_md.read_bytes()
        except Exception as e:
            warnings_list.append(f"cannot read {repo_id}/{rel_path}: {e}")
            continue
        content_hash = "sha256:" + hashlib.sha256(raw_bytes).hexdigest()

        try:
            content_str = raw_bytes.decode("utf-8")
        except UnicodeDecodeError:
            content_str = raw_bytes.decode("utf-8", errors="replace")

        fm_raw, fm_err, body = parse_frontmatter(content_str)
        if fm_err:
            warnings_list.append(f"unparseable frontmatter in {repo_id}/{rel_path}: {fm_err}")

        skill_dir = skill_md.parent
        name_val = ""
        desc_val = ""
        if fm_raw and isinstance(fm_raw, dict):
            name_val = str(fm_raw.get("name") or "").strip()
            d = fm_raw.get("description")
            if d is not None:
                desc_val = re.sub(r"\s+", " ", str(d)).strip()
        if not name_val:
            name_val = skill_dir.name

        line_count = content_str.count("\n") + (0 if content_str.endswith("\n") or not content_str else 1)
        body_wc = word_count(body)
        body_preview = compute_body_preview(body)

        has_refs = (skill_dir / "references").is_dir() or (skill_dir / "reference").is_dir()
        has_scripts = (skill_dir / "scripts").is_dir()

        owning_package = find_owning_package(skill_dir, repo_root)

        # ── Supporting files ──
        supporting: list[dict] = []
        seen_paths: set[str] = set()

        def add_supporting(path_obj: Path, kind: str, discovered_via: str) -> None:
            try:
                rp = str(path_obj.relative_to(repo_root))
            except ValueError:
                return
            if rp in seen_paths:
                return
            seen_paths.add(rp)
            try:
                sz = path_obj.stat().st_size
            except OSError:
                sz = 0
            supporting.append({
                "path": rp,
                "kind": kind,
                "discovered_via": discovered_via,
                "line_count": file_line_count(path_obj),
                "size_bytes": sz,
            })

        for refname in ("references", "reference"):
            for f in walk_dir_files(skill_dir / refname):
                add_supporting(f, "reference", "references_dir")

        for f in walk_dir_files(skill_dir / "scripts"):
            add_supporting(f, "script", "scripts_dir")

        # bundled_directories — sibling dirs not captured above
        bundled: list[dict] = []
        try:
            for child in sorted(skill_dir.iterdir()):
                if not child.is_dir():
                    continue
                if child.name.startswith(".") or child.name in BUNDLED_EXCLUDE:
                    continue
                files_in_dir = walk_dir_files(child)
                bundled.append({
                    "path": str(child.relative_to(repo_root)),
                    "file_count": len(files_in_dir),
                })
        except Exception:
            pass

        # Path-like references in body (markdown links, @imports, inline
        # backticks). Validation is git-history backed: a candidate counts as
        # a real reference iff it's currently tracked OR was ever tracked.
        # Anything else (code symbols, package names, external paths, etc.)
        # is silently ignored — no extension allowlist, no false positives.
        seen_targets: set[str] = set()
        for kind, target in extract_body_links(body):
            if "#" in target:
                target = target.split("#", 1)[0]
            if "?" in target:
                target = target.split("?", 1)[0]
            rel = normalise_link_target(target, kind, skill_dir, repo_root)
            if not rel or rel in seen_targets:
                continue
            seen_targets.add(rel)

            in_current = rel in currently_tracked
            in_history = rel in ever_tracked

            if in_current:
                # Valid reference — add to supporting files.
                resolved = repo_root / rel
                try:
                    if resolved.is_file():
                        sf_kind = "nested_skill" if resolved.name == "SKILL.md" else "linked_doc"
                        sf_via = {
                            "markdown_link": "markdown_link",
                            "at_import": "at_import",
                            "inline_code": "inline_code",
                        }.get(kind, "markdown_link")
                        add_supporting(resolved, sf_kind, sf_via)
                except Exception:
                    pass
            elif in_history:
                # Was tracked, no longer present → stale reference.
                warnings_list.append(f"broken link in {repo_id}/{rel_path}: {rel}")
            # else: not a repo file, ignore.

        per_file_records.append({
            "_rel_path": rel_path,
            "_content_hash": content_hash,
            "_harness": classify_harness(rel_path),
            "_name": name_val,
            "_description": desc_val,
            "_line_count": line_count,
            "_word_count": body_wc,
            "_body_preview": body_preview,
            "_has_refs": has_refs,
            "_has_scripts": has_scripts,
            "_owning_package": owning_package,
            "_supporting_files": supporting,
            "_bundled_directories": bundled,
            "_frontmatter_raw": fm_raw,
            "_frontmatter_err": fm_err,
            "_declared_in": declared_by.get(skill_md.resolve(), []),
        })

    # ── Dedup by content_hash within this repo ──
    by_hash: dict[str, list[dict]] = {}
    for r in per_file_records:
        by_hash.setdefault(r["_content_hash"], []).append(r)

    dedup_skills: list[dict] = []
    for ch, group in by_hash.items():
        sorted_group = sorted(group, key=lambda x: primary_rank(x["_rel_path"]))
        primary = sorted_group[0]
        all_paths = sorted([g["_rel_path"] for g in group], key=primary_rank)
        harnesses = sorted({g["_harness"] for g in group})

        pp_slug = primary["_rel_path"].replace("/", "__")
        if pp_slug.endswith("__SKILL.md"):
            pp_slug = pp_slug[:-len("__SKILL.md")]
        elif pp_slug.endswith("SKILL.md"):
            pp_slug = pp_slug[:-len("SKILL.md")].rstrip("_")

        # Merge declared_in across all paths in the dedup group, deduping by
        # (manifest_path, dep_key) so the same declaration isn't listed twice.
        merged_declared: list[dict] = []
        seen_decls: set[tuple[str, str]] = set()
        for g in sorted_group:
            for d in g["_declared_in"]:
                key = (d["manifest_path"], d["dep_key"])
                if key in seen_decls:
                    continue
                seen_decls.add(key)
                merged_declared.append(d)

        skill_obj = {
            "skill_id": f"{repo_id}::{pp_slug}",
            "name": primary["_name"],
            "description": primary["_description"],
            "repo": repo_id,
            "primary_path": primary["_rel_path"],
            "all_paths": all_paths,
            "agent_harnesses": harnesses,
            "source_type": "",  # set below
            "content_hash": ch,
            "owning_package": primary["_owning_package"],
            "declared_in": merged_declared,
            "supporting_files": primary["_supporting_files"],
            "bundled_directories": primary["_bundled_directories"],
            "frontmatter": {
                "raw": primary["_frontmatter_raw"],
                "parse_error": primary["_frontmatter_err"],
            },
            "content": {
                "line_count": primary["_line_count"],
                "word_count": primary["_word_count"],
                "body_preview": primary["_body_preview"],
                "has_references_dir": primary["_has_refs"],
                "has_scripts_dir": primary["_has_scripts"],
            },
        }
        skill_obj["source_type"] = source_type_for(skill_obj)
        dedup_skills.append(skill_obj)

    return repo_meta, dedup_skills, manifests


# ── Tile aggregation + tier classification ────────────────────────────────


def aggregate_tiles(
    skills: list[dict],
    repo_metas: list[dict],
    manifests_by_repo: dict[str, list[tuple[Path, dict]]],
) -> list[dict]:
    """Build the tiles[] inventory.

    Two sources contribute, with `tessl_json` superseding `filesystem` on
    collision:

      - **filesystem**: skills with `owning_package.kind == "tessl_tile"`
        contribute a tile entry derived from their owning `tile.json`. This is
        how authored tiles under `tiles/<name>/` get surfaced.
      - **tessl_json**: every dependency in a `tessl.json` manifest that
        resolves to a real `tile.json` contributes a tile entry. This is the
        only source for tiles installed under `.tessl/tiles/...` since those
        SKILL.md files are intentionally excluded from `skills[]`.

    Loose skills (T4 / non_tile) don't appear here — they stay in `skills[]`
    with `tier = "non_tile"`.

    Tier rules (see `classify_tile_tier`):
      - `authored_tile`  — declared in tessl.json with `source: "file:..."`,
                            OR the owning tile.json lives directly under
                            `<repo>/tiles/...` / `<repo>/tile/` (authored
                            layout) and not under `.tessl/`.
      - `github_tile`    — declared with `source: "https://github.com/..."`,
                            `source: "github:..."`, or any non-`file:` /
                            non-registry source.
      - `published_tile` — installed from the registry (no `source` field, or
                            source is the registry URL). Fallback for tiles
                            present in `.tessl/tiles/` without an explicit
                            file/github source.
    """
    repo_lookup = {r["repo_id"]: r for r in repo_metas}
    repo_path_by_id = {r["repo_id"]: Path(r["path"]) for r in repo_metas}
    tiles: dict[tuple[str, str, str | None], dict] = {}

    # ── Step A: tiles from authored skills (source: filesystem) ──
    for s in skills:
        pkg = s.get("owning_package") or {}
        if pkg.get("kind") != "tessl_tile":
            continue
        tile_name = pkg.get("name")
        if not tile_name:
            continue
        repo = s["repo"]
        manifest_path = pkg.get("manifest_path")
        version = pkg.get("version")
        key = (repo, tile_name, manifest_path)
        if key not in tiles:
            tiles[key] = {
                "tile_id": make_tile_instance_id(repo, tile_name, manifest_path, version),
                "name": tile_name,
                "repo": repo,
                "tier": None,  # filled below
                "source": "filesystem",
                "version_installed": version,
                "manifest_path": manifest_path,
                "skill_ids": [],
            }
        tiles[key]["skill_ids"].append(s["skill_id"])

    # ── Step B: tiles from tessl.json declarations (source: tessl_json) ──
    # Every declared dep that resolves to a real tile.json gets a tile entry.
    # If the same tile already exists from Step A, this overrides its source
    # to tessl_json (per "tessl.json supersedes filesystem").
    for repo, manifests in manifests_by_repo.items():
        repo_path = repo_path_by_id.get(repo)
        if repo_path is None:
            continue
        for manifest_path_p, parsed in manifests:
            deps = parsed.get("dependencies") or {}
            if not isinstance(deps, dict):
                continue
            for dep_key, dep_info in deps.items():
                if not isinstance(dep_info, dict):
                    continue
                if any(dep_key.startswith(p) for p in NON_SKILL_DEP_PREFIXES):
                    continue
                tile_dir = resolve_tile_dir(manifest_path_p, dep_key, dep_info, repo_path)
                if tile_dir is None:
                    continue
                tile_json_path = tile_dir / "tile.json"
                if not tile_json_path.exists():
                    continue
                try:
                    tile_data = json.loads(tile_json_path.read_text())
                except Exception:
                    continue
                tile_name = tile_data.get("name") or dep_key
                version = dep_info.get("version") or tile_data.get("version")
                try:
                    tile_manifest_rel = str(tile_json_path.relative_to(repo_path))
                except ValueError:
                    continue
                key = (repo, tile_name, tile_manifest_rel)
                if key in tiles:
                    tiles[key]["source"] = "tessl_json"
                    if tiles[key].get("version_installed") is None:
                        tiles[key]["version_installed"] = version
                else:
                    tiles[key] = {
                        "tile_id": make_tile_instance_id(repo, tile_name, tile_manifest_rel, version),
                        "name": tile_name,
                        "repo": repo,
                        "tier": None,
                        "source": "tessl_json",
                        "version_installed": version,
                        "manifest_path": tile_manifest_rel,
                        "skill_ids": [],
                    }

    # Classify tier per tile by inspecting tessl.json `source` fields and
    # the owning tile.json's location.
    for (repo, _tile_name, _manifest_path), tile in tiles.items():
        tile["tier"] = classify_tile_tier(tile, manifests_by_repo.get(repo, []), repo_lookup.get(repo))

    return sorted(tiles.values(), key=lambda t: (t["repo"], t["name"], t["tile_id"]))


def classify_tile_tier(
    tile: dict,
    manifests: list[tuple[Path, dict]],
    repo_meta: dict | None,
) -> str:
    """Decide tier for one tile based on tessl.json source + manifest_path."""
    repo_root = Path(repo_meta["path"]) if repo_meta and repo_meta.get("path") else None

    # Look up source field across all manifests in this repo
    source_str: str | None = None
    for _, parsed in manifests:
        deps = parsed.get("dependencies") or {}
        if not isinstance(deps, dict):
            continue
        dep = deps.get(tile["name"])
        if isinstance(dep, dict):
            src = dep.get("source")
            if isinstance(src, str):
                source_str = src
                break

    if source_str:
        if source_str.startswith("file:"):
            return "authored_tile"
        if source_str.startswith("https://github.com/") or source_str.startswith("github:"):
            return "github_tile"
        # Any other source string (e.g. registry URL) — treat as published
        return "published_tile"

    # No source declared in any tessl.json — either an orphan install, an
    # authored-only tile, or a tile.json sitting in some research/ corner.
    # `.tessl/tiles/...` paths are produced by `tessl install` from a
    # registry source, so we treat them as published. Everything else is
    # authored — registry-published tiles always have a tessl.json
    # declaration *somewhere*, so absence implies the tile is local-only.
    manifest_path = tile.get("manifest_path") or ""
    if manifest_path.startswith(".tessl/"):
        return "published_tile"
    return "authored_tile"


# ── Registry enrichment ────────────────────────────────────────────────────


def read_auth_token() -> str | None:
    """Read access token from ~/.tessl/api-credentials.json. Returns None if unavailable."""
    if not TESSL_AUTH_FILE.exists():
        return None
    try:
        data = json.loads(TESSL_AUTH_FILE.read_text())
        token = data.get("accessToken")
        return token if isinstance(token, str) and token else None
    except Exception:
        return None


def fetch_tile_version_from_registry(
    workspace: str, name: str, version: str, token: str,
) -> dict | None:
    """Fetch tile version data from /v1/tiles/{ws}/{name}/versions/{ver}.

    Returns the parsed `attributes` block (with scores), or a sentinel
    {"_error": "...", "_status": <int>} on failure.
    """
    import urllib.request
    import urllib.error
    url = f"{TESSL_API_BASE_URL}/v1/tiles/{workspace}/{name}/versions/{version}"
    req = urllib.request.Request(
        url, headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=ENRICHMENT_HTTP_TIMEOUT_SEC) as resp:
            body = json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        return {"_error": f"{e.code} {e.reason}", "_status": e.code}
    except Exception as e:
        return {"_error": str(e), "_status": None}

    attrs = (body.get("data") or {}).get("attributes") or {}
    # Project just the fields we want into a clean shape
    scores = attrs.get("scores") or {}
    return {
        "version": attrs.get("version"),
        "fingerprint": attrs.get("fingerprint"),
        "summary": attrs.get("summary"),
        "moderationPassed": attrs.get("moderationPassed"),
        "moderationError": attrs.get("moderationError"),
        "archived": attrs.get("archived"),
        "archivedReason": attrs.get("archivedReason"),
        "archivedAt": attrs.get("archivedAt"),
        "hasDocs": attrs.get("hasDocs"),
        "hasSkills": attrs.get("hasSkills"),
        "hasSteering": attrs.get("hasSteering"),
        "evalScore": attrs.get("evalScore"),
        "evalBaselineScore": attrs.get("evalBaselineScore"),
        "evalImprovement": attrs.get("evalImprovement"),
        "evalImprovementMultiplier": attrs.get("evalImprovementMultiplier"),
        "scores": {
            "aggregate": scores.get("aggregate"),
            "quality": scores.get("quality"),
            "impact": scores.get("impact"),
            "security": scores.get("security"),
            "evalAvg": scores.get("evalAvg"),
            "evalCount": scores.get("evalCount"),
            "lastScoredAt": scores.get("lastScoredAt"),
            "validationErrors": scores.get("validationErrors") or [],
        },
    }


def enrich_tiles_from_registry(
    tiles: list[dict], token: str,
) -> None:
    """Attach a `registry` block to every tile whose name resolves on the registry.

    We try every tile regardless of tier — locally-authored tiles (`file:`
    source) are often *also* published, and a github-source tile may have a
    registry shadow too. The API returns 404 for tiles that aren't published;
    that's reflected as `registry: { _error: "404 ...", _status: 404 }`.
    Each tile then gains an explicit `published_to_registry: bool`.
    """
    from concurrent.futures import ThreadPoolExecutor

    targets = [t for t in tiles if t.get("version_installed") and "/" in t["name"]]

    def _do(t: dict) -> tuple[dict, dict | None]:
        ws, nm = t["name"].split("/", 1)
        return t, fetch_tile_version_from_registry(ws, nm, t["version_installed"], token)

    with ThreadPoolExecutor(max_workers=ENRICHMENT_HTTP_CONCURRENCY) as pool:
        for t, result in pool.map(_do, targets):
            if result is None:
                t["published_to_registry"] = None
                continue
            if result.get("_error"):
                t["registry"] = result
                t["published_to_registry"] = False
            else:
                t["registry"] = result
                t["published_to_registry"] = True


# ── Outdated check ─────────────────────────────────────────────────────────


def run_tessl_outdated(repo_path: Path, repo_id: str) -> tuple[dict[str, dict], bool]:
    """Run `tessl outdated --json` for one repo.

    Returns (map from "<ws>/<name>" → outdated record, command_succeeded).
    The success flag matters because an empty outdated list means "all current"
    only when the command itself succeeded.
    """
    try:
        result = subprocess.run(
            ["tessl", "outdated", "--json"],
            cwd=str(repo_path),
            capture_output=True, text=True, timeout=30,
        )
    except Exception as e:
        warnings_list.append(f"`tessl outdated` failed in {repo_id}: {e}")
        return {}, False

    if result.returncode != 0:
        warnings_list.append(
            f"`tessl outdated` exit {result.returncode} in {repo_id}: "
            f"{result.stderr.strip()[:200]}"
        )
        return {}, False

    try:
        data = json.loads(result.stdout)
    except Exception as e:
        warnings_list.append(f"`tessl outdated` returned non-JSON in {repo_id}: {e}")
        return {}, False

    out: dict[str, dict] = {}
    for entry in data.get("outdated") or []:
        cur = (entry.get("current") or {}).get("tile") or {}
        ws = cur.get("workspaceName")
        nm = cur.get("tileName")
        if not (ws and nm):
            continue
        full = f"{ws}/{nm}"
        out[full] = {
            "current": cur.get("version"),
            "latest": (entry.get("latest") or {}).get("version"),
            "update": (entry.get("update") or {}).get("version"),
            "isLatestPinned": entry.get("isLatestPinned"),
        }
    return out, True


def attach_outdated_to_tiles(
    tiles: list[dict],
    outdated_by_repo: dict[str, dict[str, dict]],
    successful_repos: set[str],
) -> None:
    """Attach `outdated` block to each tile that the registry knows about."""
    for t in tiles:
        # Only meaningful for tiles that resolve on the registry
        if not t.get("published_to_registry"):
            continue
        repo = t.get("repo")
        if repo not in successful_repos:
            continue
        info = outdated_by_repo.get(repo, {}).get(t["name"])
        if info:
            t["outdated"] = {**info, "update_available": True}
        else:
            # Tile is registered and we got a successful outdated check
            # without it appearing → installed version is current
            t["outdated"] = {
                "current": t.get("version_installed"),
                "latest": t.get("version_installed"),
                "update": None,
                "update_available": False,
            }


# ── Context-cost via `tessl tile lint` ─────────────────────────────────────


# `tessl tile lint` text output samples:
#   Skills
#     - my-skill: 113 front-loaded, 2k-35.1k on-demand tokens   (multi-skill: range)
#     - my-skill: 93 front-loaded, 477 on-demand tokens          (single-skill: scalar)
#   Total: 568 front-loaded, 6k-53.1k on-demand tokens
#   Total: 393 front-loaded, 477 on-demand, 109 content tokens   (with docs)

_TOK = r"[\d.]+[kKmM]?"  # token count: '113', '2k', '35.1k', '1.2m'
_LINT_SKILL_RE = re.compile(
    rf"^\s*-\s+(?P<name>[^:]+?):\s+(?P<fl>{_TOK})\s+front-loaded,\s+"
    rf"(?P<od_min>{_TOK})(?:-(?P<od_max>{_TOK}))?\s+on-demand"
)
_LINT_TOTAL_RE = re.compile(
    rf"^\s*Total:\s+(?P<fl>{_TOK})\s+front-loaded,\s+"
    rf"(?P<od_min>{_TOK})(?:-(?P<od_max>{_TOK}))?\s+on-demand"
    rf"(?:,\s+(?P<content>{_TOK})\s+content)?"
)


def _parse_token_count(s: str) -> int:
    """Parse '113' / '2k' / '35.1k' / '1.2m' into an integer token count."""
    s = s.strip().lower()
    mult = 1
    if s.endswith("k"):
        mult = 1_000
        s = s[:-1]
    elif s.endswith("m"):
        mult = 1_000_000
        s = s[:-1]
    try:
        return int(float(s) * mult)
    except ValueError:
        return 0
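# Examples: _parse_token_count("113") → 113; "35.1k" → 35_100; "1.2m" → 1_200_000.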


def parse_tile_lint_output(text: str) -> dict:
    """Parse `tessl tile lint` text output into a context-cost dict.

    Handles both single-value ('477 on-demand') and range
    ('6k-53.1k on-demand') forms, plus the optional 'content tokens' suffix
    when the tile bundles docs.
    """
    per_skill: dict[str, dict] = {}
    totals: dict | None = None
    valid = "is valid" in text
    for line in text.splitlines():
        m = _LINT_SKILL_RE.match(line)
        if m:
            od_min = _parse_token_count(m.group("od_min"))
            od_max = _parse_token_count(m.group("od_max")) if m.group("od_max") else od_min
            per_skill[m.group("name").strip()] = {
                "front_loaded": _parse_token_count(m.group("fl")),
                "on_demand_min": od_min,
                "on_demand_max": od_max,
            }
            continue
        m = _LINT_TOTAL_RE.match(line)
        if m:
            od_min = _parse_token_count(m.group("od_min"))
            od_max = _parse_token_count(m.group("od_max")) if m.group("od_max") else od_min
            totals = {
                "front_loaded": _parse_token_count(m.group("fl")),
                "on_demand_min": od_min,
                "on_demand_max": od_max,
                "content_tokens": _parse_token_count(m.group("content")) if m.group("content") else 0,
            }
    return {
        "per_skill": per_skill,
        "front_loaded_total": (totals or {}).get("front_loaded", 0),
        "on_demand_min_total": (totals or {}).get("on_demand_min", 0),
        "on_demand_max_total": (totals or {}).get("on_demand_max", 0),
        "content_tokens_total": (totals or {}).get("content_tokens", 0),
        "lint_valid": valid,
    }
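
# For the single-skill sample above ("93 front-loaded, 477 on-demand tokens"
# plus the docs-bearing Total line), this parses to roughly:
#   {"per_skill": {"my-skill": {"front_loaded": 93, "on_demand_min": 477,
#                               "on_demand_max": 477}},
#    "front_loaded_total": 393, "on_demand_min_total": 477,
#    "on_demand_max_total": 477, "content_tokens_total": 109,
#    "lint_valid": False}   # no "is valid" line in the sample text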


def run_tile_lint(tile_path: Path) -> dict | None:
    """Invoke `tessl tile lint <path>`, parse output. Returns None on failure."""
    try:
        result = subprocess.run(
            ["tessl", "tile", "lint", str(tile_path)],
            capture_output=True, text=True, timeout=30,
        )
    except Exception:
        return None
    # Lint may exit non-zero on warnings; we still try to parse output.
    text = (result.stdout or "") + "\n" + (result.stderr or "")
    parsed = parse_tile_lint_output(text)
    if not parsed["per_skill"] and not parsed["front_loaded_total"]:
        return None
    parsed["exit_code"] = result.returncode
    return parsed


def enrich_tiles_context_cost(tiles: list[dict]) -> None:
    """For every tile with a resolvable manifest path, run `tessl tile lint`."""
    from concurrent.futures import ThreadPoolExecutor

    targets: list[tuple[dict, Path]] = []
    for t in tiles:
        manifest_path = t.get("manifest_path")
        if not manifest_path:
            continue
        repo_path = t.get("_repo_path")
        if not repo_path:
            continue
        tile_dir = (Path(repo_path) / manifest_path).parent
        if not tile_dir.exists():
            continue
        targets.append((t, tile_dir))

    def _do(item):
        t, path = item
        return t, run_tile_lint(path)

    with ThreadPoolExecutor(max_workers=ENRICHMENT_LINT_CONCURRENCY) as pool:
        for t, result in pool.map(_do, targets):
            if result is not None:
                t["context_cost"] = result
            t.pop("_repo_path", None)


# ── Skill-level tier stamping ──────────────────────────────────────────────


def stamp_skill_tiers(skills: list[dict], tiles: list[dict]) -> None:
    """Stamp `tier` on each skill, derived from its owning tile (if any)."""
    tile_by_instance = {
        (t["repo"], t["name"], t.get("manifest_path")): t
        for t in tiles
    }
    tiles_by_name: dict[tuple[str, str], list[dict]] = {}
    for t in tiles:
        tiles_by_name.setdefault((t["repo"], t["name"]), []).append(t)

    for s in skills:
        pkg = s.get("owning_package") or {}
        if pkg.get("kind") == "tessl_tile" and pkg.get("name"):
            tile = tile_by_instance.get((s["repo"], pkg["name"], pkg.get("manifest_path")))
            if tile is None:
                candidates = tiles_by_name.get((s["repo"], pkg["name"]), [])
                tile = candidates[0] if len(candidates) == 1 else None
            s["tile_name"] = pkg["name"]
            if tile:
                s["tile_id"] = tile["tile_id"]
                s["tier"] = tile["tier"]
            else:
                # No matching aggregated tile instance: the owning tile is
                # presumably registry-published but not locally authored.
                s["tile_id"] = None
                s["tier"] = "published_tile"
        elif pkg.get("kind") == "claude_plugin":
            s["tier"] = "claude_plugin"
        else:
            s["tier"] = "non_tile"


# ── Main ───────────────────────────────────────────────────────────────────


def identify_repos(scan_root: Path) -> list[Path]:
    """Return the list of repo roots to scan.

    - If scan_root is itself a git repo → [scan_root]
    - Else: each immediate child that contains .git/ → those children
    - Else (no git children): [scan_root] treated as a single non-git repo
    """
    if (scan_root / ".git").exists():
        return [scan_root]
    children = []
    try:
        for child in sorted(scan_root.iterdir()):
            if not child.is_dir() or child.name.startswith("."):
                continue
            if (child / ".git").exists():
                children.append(child)
    except Exception:
        pass
    if children:
        return children
    return [scan_root]
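
# Illustrative layouts (hypothetical paths):
#   /work/my-repo with .git/                -> [/work/my-repo]
#   /work/ws containing a/.git and b/.git   -> [/work/ws/a, /work/ws/b]
#   /tmp/plain with no git anywhere         -> [/tmp/plain]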


def derive_repo_name(repo_path: Path, remote_url: str | None) -> str:
    """Derive an 'org/repo' name from the remote URL; fall back to the dir basename."""
    if remote_url:
        m = re.search(r"[:/]([^/]+?/[^/]+?)(?:\.git)?/?$", remote_url)
        if m:
            return m.group(1)
    return repo_path.name
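
# Illustrative extractions (hypothetical remotes):
#   git@github.com:acme/widgets.git   -> "acme/widgets"
#   https://github.com/acme/widgets   -> "acme/widgets"
#   no remote / no match              -> repo directory basename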


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="Skill discovery — inventories SKILL.md files and supporting content.",
    )
    p.add_argument("--scan-root", dest="scan_root", type=str, default=None,
                   help="Directory to scan (default: $SCAN_ROOT or $(pwd))")
    p.add_argument("--repo", dest="repos", type=str, action="append", default=None,
                   help=(
                       "Repo path to include (repeatable). When set, replaces auto-"
                       "discovery — only the specified repos are scanned. Each can be "
                       "absolute or relative to --scan-root."
                   ))
    p.add_argument("--output", dest="output", type=str, default=None,
                   help="Where to write discovery.json (default: $OUTPUT_PATH or <scan-root>/.skill-insights/discovery.json)")
    p.add_argument("--scan-id", dest="scan_id", type=str, default=None,
                   help="Scan ID (default: $SCAN_ID or discovery-YYYYMMDD-HHMMSS)")
    return p.parse_args()


def main() -> int:
    global SCAN_ROOT, OUTPUT_PATH, SCAN_ID
    args = parse_args()

    scan_root_str = args.scan_root or os.environ.get("SCAN_ROOT") or os.getcwd()
    SCAN_ROOT = Path(scan_root_str).resolve()

    default_output = SCAN_ROOT / ".skill-insights" / "discovery.json"
    output_str = args.output or os.environ.get("OUTPUT_PATH") or str(default_output)
    OUTPUT_PATH = Path(output_str)

    SCAN_ID = (
        args.scan_id
        or os.environ.get("SCAN_ID")
        or f"discovery-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    )

    if not SCAN_ROOT.exists():
        print(f"ERROR: scan root does not exist: {SCAN_ROOT}", file=sys.stderr)
        return 2
    if not SCAN_ROOT.is_dir():
        print(f"ERROR: scan root is not a directory: {SCAN_ROOT}", file=sys.stderr)
        return 2

    print(f"Scan root: {SCAN_ROOT}", file=sys.stderr)
    print(f"Output:    {OUTPUT_PATH}", file=sys.stderr)
    print(f"Scan ID:   {SCAN_ID}", file=sys.stderr)

    # If --repo is provided, use the explicit list (resolves relative paths
    # against scan-root); otherwise fall back to auto-discovery from scan-root.
    if args.repos:
        repos = []
        for r in args.repos:
            p_abs = Path(r)
            if not p_abs.is_absolute():
                p_abs = SCAN_ROOT / r
            p_abs = p_abs.resolve()
            if not p_abs.exists():
                print(f"ERROR: --repo path does not exist: {p_abs}", file=sys.stderr)
                return 2
            if not p_abs.is_dir():
                print(f"ERROR: --repo path is not a directory: {p_abs}", file=sys.stderr)
                return 2
            repos.append(p_abs)
        print(f"Mode:      explicit repos ({len(repos)})", file=sys.stderr)
    else:
        repos = identify_repos(SCAN_ROOT)
        mode = (
            "single repo"
            if len(repos) == 1 and repos[0] == SCAN_ROOT
            else f"{len(repos)} workspace child repos"
        )
        print(f"Mode:      auto-discovery ({mode})", file=sys.stderr)

    repo_metas: list[dict] = []
    all_skills: list[dict] = []
    manifests_by_repo: dict[str, list[tuple[Path, dict]]] = {}
    repo_path_by_id: dict[str, Path] = {}
    used_ids: dict[str, int] = {}

    for repo_path in repos:
        basename = repo_path.name
        # Disambiguate duplicate basenames: first dupe gets __1, next __2, etc.
        if basename in used_ids:
            used_ids[basename] += 1
            repo_id = f"{basename}__{used_ids[basename]}"
            warnings_list.append(f"duplicate repo basename '{basename}' disambiguated as '{repo_id}'")
        else:
            used_ids[basename] = 0
            repo_id = basename

        is_git = (repo_path / ".git").exists()
        head_sha = head_branch = remote_url = None
        if is_git:
            head_sha = run_git(repo_path, ["rev-parse", "HEAD"])
            head_branch = run_git(repo_path, ["rev-parse", "--abbrev-ref", "HEAD"])
            remote_url = run_git(repo_path, ["remote", "get-url", "origin"])

        name = derive_repo_name(repo_path, remote_url)
        print(f"  scanning {repo_id} @ {repo_path}", file=sys.stderr)
        meta, skills, manifests = scan_repo(
            repo_path, repo_id, name, is_git,
            head_sha, head_branch, remote_url,
        )
        repo_metas.append(meta)
        all_skills.extend(skills)
        manifests_by_repo[repo_id] = manifests
        repo_path_by_id[repo_id] = repo_path

    # ── Tile aggregation + enrichment ──
    print("  aggregating tiles + classifying tiers...", file=sys.stderr)
    tiles = aggregate_tiles(all_skills, repo_metas, manifests_by_repo)
    # Stamp absolute repo paths on tiles for the lint pass (popped before output)
    for t in tiles:
        t["_repo_path"] = str(repo_path_by_id.get(t["repo"], ""))

    auth_token = read_auth_token()
    if auth_token:
        candidates = sum(1 for t in tiles if t.get("version_installed") and "/" in t["name"])
        if candidates:
            print(f"  querying registry for {candidates} tiles (some may 404)...", file=sys.stderr)
            enrich_tiles_from_registry(tiles, auth_token)
        print("  checking for outdated tiles...", file=sys.stderr)
        outdated_by_repo: dict[str, dict[str, dict]] = {}
        successful_outdated_repos: set[str] = set()
        for repo_meta in repo_metas:
            repo_id = repo_meta["repo_id"]
            repo_path = repo_path_by_id.get(repo_id)
            if repo_path is None:
                continue
            outdated_map, ok = run_tessl_outdated(repo_path, repo_id)
            outdated_by_repo[repo_id] = outdated_map
            if ok:
                successful_outdated_repos.add(repo_id)
        attach_outdated_to_tiles(tiles, outdated_by_repo, successful_outdated_repos)
    else:
        warnings_list.append("no Tessl auth token found — registry/outdated enrichment skipped")

    # Context cost via `tessl tile lint`. Works for any tile (T1/T2/T3).
    print(f"  running `tessl tile lint` on {len(tiles)} tiles for context cost...", file=sys.stderr)
    enrich_tiles_context_cost(tiles)

    # Stamp tier onto each skill (derived from owning tile)
    stamp_skill_tiers(all_skills, tiles)

    for reason, count in skipped_counts.items():
        if count:
            warnings_list.append(f"skipped {count} SKILL.md files inside {reason}")

    by_source = {
        "claude_skill": 0, "cursor_skill": 0, "agents_skill": 0,
        "tessl_tile_skill": 0, "standalone": 0, "claude_plugin_skill": 0,
    }
    by_repo: dict[str, int] = {}
    by_tier: dict[str, int] = {}
    for s in all_skills:
        by_source[s["source_type"]] = by_source.get(s["source_type"], 0) + 1
        by_repo[s["repo"]] = by_repo.get(s["repo"], 0) + 1
        by_tier[s.get("tier", "non_tile")] = by_tier.get(s.get("tier", "non_tile"), 0) + 1

    by_tier_tiles: dict[str, int] = {}
    tiles_by_source = {"tessl_json": 0, "filesystem": 0}
    tiles_published_to_registry = 0
    tiles_authored_only = 0
    tiles_with_security_concern = 0
    tiles_with_update_available = 0
    for t in tiles:
        by_tier_tiles[t["tier"]] = by_tier_tiles.get(t["tier"], 0) + 1
        src = t.get("source")
        if src in tiles_by_source:
            tiles_by_source[src] += 1
        if t.get("published_to_registry") is True:
            tiles_published_to_registry += 1
            sec = (t.get("registry", {}).get("scores") or {}).get("security")
            if sec in ("MEDIUM", "HIGH", "CRITICAL"):
                tiles_with_security_concern += 1
        elif t.get("published_to_registry") is False:
            tiles_authored_only += 1
        if t.get("outdated", {}).get("update_available"):
            tiles_with_update_available += 1

    stats = {
        "total_skills": len(all_skills),
        "total_skill_files": sum(len(s["all_paths"]) for s in all_skills),
        "total_repos": len(repo_metas),
        "total_tiles": len(tiles),
        "by_source_type": by_source,
        "by_repo": by_repo,
        "by_tier": by_tier,
        "by_tile_tier": by_tier_tiles,
        "tiles_by_source": tiles_by_source,
        "tiles_published_to_registry": tiles_published_to_registry,
        "tiles_authored_only": tiles_authored_only,
        "tiles_with_security_concern": tiles_with_security_concern,
        "tiles_with_update_available": tiles_with_update_available,
        "skills_with_supporting_files": sum(1 for s in all_skills if s["supporting_files"]),
        "total_supporting_files": sum(len(s["supporting_files"]) for s in all_skills),
    }

    output = {
        "schema_version": SCHEMA_VERSION,
        "metadata": {
            "scan_id": SCAN_ID,
            "scan_root": str(SCAN_ROOT),
            "scanned_at": datetime.now(timezone.utc).isoformat(),
            "tool_version": TOOL_VERSION,
            "repos": repo_metas,
        },
        "tiles": tiles,
        "skills": all_skills,
        "stats": stats,
        "warnings": warnings_list,
    }

    validate_against_schema(
        output,
        SCHEMA_DIR / "discovery.schema.json",
        role="output",
        source="discover_skills.py",
    )

    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    OUTPUT_PATH.write_text(json.dumps(output, indent=2))

    print(
        f"wrote {OUTPUT_PATH}  (skills={len(all_skills)}, "
        f"paths={stats['total_skill_files']}, repos={len(repo_metas)}, "
        f"warnings={len(warnings_list)})",
        file=sys.stderr,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())
