Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Then run four analytical phases in parallel against the inventory: staleness and git provenance (history, broken refs, contributors); quality (Tessl `skill review`); duplicates (similarity plus LLM judgement); and registry search (per-standalone-skill registry suggestions, HTTP only). Finally, render a self-contained interactive HTML report with a top-of-report health overview, a top-issues panel, a recently-changed list, and a per-tessl.json manifests view.
[Registry score panel: 84; 90% "Does it follow best practices?"; Impact 97%; 1.44x average score across 2 eval scenarios; Advisory: suggest reviewing before use]
#!/usr/bin/env python3
"""Skill discovery — emits discovery.json per the skill-insights schema.
Scans the target directory for SKILL.md files, dedupes vendored copies by
content hash, captures supporting files (references/, scripts/, linked docs,
bundled directories), and writes a canonical JSON inventory.
Output conforms to references/schemas/discovery.schema.json (validated at
the IO boundary when `jsonschema` is installed; falls back to no validation
with a single stderr warning otherwise).
Usage:
discover_skills.py [--scan-root PATH] [--output PATH] [--scan-id ID]
discover_skills.py --help
Environment variables (fall-back if flags not passed):
SCAN_ROOT Directory to scan (default: $(pwd))
OUTPUT_PATH Where to write discovery.json (default: $SCAN_ROOT/.skill-insights/discovery.json)
SCAN_ID Opaque ID for this scan (default: discovery-YYYYMMDD-HHMMSS)
No external dependencies are required. PyYAML is used for frontmatter
parsing when available (regex fallback otherwise); jsonschema is used for
IO contract validation when available (skipped with a warning otherwise).
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# Resolve the bundled schema directory and load the shared validation helper.
# Tile layout: <tile-root>/skills/discover-skills/scripts/discover_skills.py
# <tile-root>/references/schemas/{_validate.py, *.schema.json}
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
if str(_SCHEMA_DIR_PATH) not in sys.path:
sys.path.insert(0, str(_SCHEMA_DIR_PATH))
from _validate import SCHEMA_DIR, validate_against_schema # noqa: E402
TOOL_VERSION = "skill-insights@0.11.0"
SCHEMA_VERSION = "1.4"
# ── Registry / CLI enrichment configuration ────────────────────────────────
TESSL_API_BASE_URL = os.environ.get("TESSL_API_BASE_URL", "https://api.tessl.io")
TESSL_AUTH_FILE = Path(os.environ.get(
"TESSL_AUTH_FILE", str(Path.home() / ".tessl" / "api-credentials.json"),
))
ENRICHMENT_HTTP_TIMEOUT_SEC = 10
ENRICHMENT_HTTP_CONCURRENCY = 8
ENRICHMENT_LINT_CONCURRENCY = 4
# ── Scan-time configuration (populated in main) ────────────────────────────
SCAN_ROOT: Path
OUTPUT_PATH: Path
SCAN_ID: str
# ── Exclusion sets ─────────────────────────────────────────────────────────
# Directory basenames to skip entirely during walk. Handles the 90% case;
# EXCLUDE_PATH_PATTERNS handles more nuanced matches.
EXCLUDE_DIR_NAMES = {
"node_modules", ".git", "dist", "build", "out", ".next", ".vercel",
".turbo", "vendor", "target", "coverage", "__pycache__",
".tessl",
}
# Directory basename prefixes to skip entirely during walk. Tessl-vendored
# skill copies live in directories like `.claude/skills/tessl__<name>/` —
# they're duplicates of skills sourced elsewhere (typically a tile installed
# under `.tessl/`), so excluding them keeps the inventory focused on
# first-party skills authored in the scanned repo.
EXCLUDE_DIR_PREFIXES = ("tessl__",)
# Patterns applied to the repo-relative path (with leading `/`) to decide
# whether a discovered SKILL.md should be skipped. Deliberately restricted
# to test-fixture locations and other paths that are clearly not part of
# the repo's real skill surface.
EXCLUDE_PATH_PATTERNS = [
re.compile(r"/tests?/__fixtures__(/|$)"),
re.compile(r"/tests?/fixtures(/|$)"),
re.compile(r"/test/__fixtures__(/|$)"),
re.compile(r"/__fixtures__(/|$)"),
re.compile(r"/local/repos/"),
re.compile(r"/evals/[^/]+/resources/"),
]
# Sibling directory names we do NOT include in bundled_directories (because
# they're captured separately or are just noise).
BUNDLED_EXCLUDE = {
"references", "reference", "scripts", "node_modules", "__pycache__",
".git", "dist", "build", "target",
}
# Note on link-target classification: we intentionally don't filter file
# references by extension. A path is "real" if it's tracked in git history
# (currently or previously) — the repo itself is the source of truth, not a
# hardcoded allowlist.
# Hard safety cap on the walk recursion depth. Should never be hit under
# reasonable repo layouts; serves as a backstop if cycle detection fails.
MAX_WALK_DEPTH = 60
# Harness classification priority when picking primary_path. Earlier wins.
PRIMARY_PRIORITY = [
".claude/skills/",
".agents/skills/",
".cursor/skills/",
".tessl/tiles/",
"tiles/",
"tile/",
]
# ── Shared state (populated during scan) ───────────────────────────────────
warnings_list: list[str] = []
skipped_counts: dict[str, int] = {}
# ── Helpers ────────────────────────────────────────────────────────────────
def run_git(cwd: Path, args: list[str]) -> str | None:
try:
result = subprocess.run(
["git", *args], cwd=str(cwd),
capture_output=True, text=True, timeout=10,
)
if result.returncode != 0:
return None
return result.stdout.strip() or None
except Exception:
return None
def is_excluded(repo_rel_path: str) -> str | None:
"""Return the reason string if this path should be skipped, else None."""
parts = repo_rel_path.split(os.sep)
for seg in parts:
if seg in EXCLUDE_DIR_NAMES:
return seg + "/"
for prefix in EXCLUDE_DIR_PREFIXES:
if seg.startswith(prefix):
return prefix + "*/"
for pat in EXCLUDE_PATH_PATTERNS:
if pat.search("/" + repo_rel_path):
return pat.pattern
return None
def parse_frontmatter(content: str) -> tuple[dict | None, str | None, str]:
"""Return (parsed_dict_or_None, parse_error_or_None, body_str).
A `None` raw with `None` error means "no frontmatter present" (not an
error). A non-`None` error means "frontmatter present but unparseable".
"""
if not content.startswith("---"):
return None, None, content.lstrip("\n")
lines = content.split("\n")
if lines[0].strip() != "---":
return None, None, content
end_idx = None
for i in range(1, len(lines)):
if lines[i].strip() == "---":
end_idx = i
break
if end_idx is None:
return None, None, content
fm_text = "\n".join(lines[1:end_idx])
body = "\n".join(lines[end_idx + 1:]).lstrip("\n")
# Prefer PyYAML when available. The fallback parser is lossy for edge
# cases (nested structures, anchors) but handles name/description well.
try:
import yaml # type: ignore
parsed = yaml.safe_load(fm_text)
if parsed is None:
return None, None, body
if not isinstance(parsed, dict):
return None, f"frontmatter not a dict: {type(parsed).__name__}", body
return parsed, None, body
except ImportError:
return _fallback_parse_frontmatter(fm_text, body)
except Exception as e:
return None, f"{type(e).__name__}: {e}", body
def _fallback_parse_frontmatter(fm_text: str, body: str) -> tuple[dict | None, str | None, str]:
"""Minimal YAML subset parser. Handles single-line scalars and block
scalars (`>`, `>-`, `|`, `|-`) well enough for Tessl skill frontmatter.
"""
parsed: dict = {}
cur_key: str | None = None
cur_mode: str | None = None # 'folded' | 'literal' | None
buf: list[str] = []
def flush() -> None:
nonlocal cur_key, cur_mode, buf
if cur_key is None:
return
if cur_mode == "folded":
parsed[cur_key] = re.sub(r"\s+", " ", " ".join(buf)).strip()
elif cur_mode == "literal":
parsed[cur_key] = "\n".join(buf).rstrip()
else:
parsed[cur_key] = "\n".join(buf).strip() if buf else parsed.get(cur_key, "")
cur_key = None
cur_mode = None
buf = []
for line in fm_text.split("\n"):
m = re.match(r"^([a-zA-Z0-9_-]+):\s*(.*)$", line)
if m:
flush()
cur_key = m.group(1)
val = m.group(2).strip()
if val in (">", ">-"):
cur_mode = "folded"
parsed[cur_key] = ""
elif val in ("|", "|-"):
cur_mode = "literal"
parsed[cur_key] = ""
elif val == "":
cur_mode = None
else:
parsed[cur_key] = val.strip('"').strip("'")
cur_key = None
cur_mode = None
buf = []
        elif cur_key is not None and (line.startswith((" ", "\t")) or line == ""):
            # Continuation line (block-scalar content or a wrapped plain value).
            buf.append(line.strip())
flush()
return parsed, None, body
def word_count(body: str) -> int:
return len(re.findall(r"\S+", body))
def compute_body_preview(body: str, limit: int = 400) -> str:
s = body.strip()
if len(s) <= limit:
return s
cut = s[:limit]
last_space = cut.rfind(" ")
if last_space > limit * 0.5:
cut = cut[:last_space]
return cut
def file_line_count(p: Path) -> int | None:
try:
with p.open("rb") as f:
data = f.read()
if b"\x00" in data:
return None
if not data:
return 0
return data.count(b"\n") + (0 if data.endswith(b"\n") else 1)
except Exception:
return None
def find_owning_package(skill_dir: Path, repo_root: Path) -> dict | None:
"""Walk up from skill_dir towards repo_root looking for a package manifest.
Returns an owning_package dict or None.
"""
cur = skill_dir
try:
if not str(cur).startswith(str(repo_root)):
return None
except Exception:
return None
while True:
tile_json = cur / "tile.json"
if tile_json.exists():
try:
data = json.loads(tile_json.read_text())
return {
"kind": "tessl_tile",
"name": data.get("name") or tile_json.parent.name,
"version": data.get("version"),
"manifest_path": str(tile_json.relative_to(repo_root)),
}
except Exception as e:
warnings_list.append(f"unparseable tile.json at {tile_json.relative_to(repo_root)}: {e}")
for plugin_candidate in (cur / ".claude-plugin" / "plugin.json", cur / "plugin.json"):
if plugin_candidate.exists():
try:
data = json.loads(plugin_candidate.read_text())
return {
"kind": "claude_plugin",
"name": data.get("name") or plugin_candidate.parent.name,
"version": data.get("version"),
"manifest_path": str(plugin_candidate.relative_to(repo_root)),
}
except Exception as e:
warnings_list.append(
f"unparseable plugin.json at {plugin_candidate.relative_to(repo_root)}: {e}"
)
if cur == repo_root:
return None
parent = cur.parent
if parent == cur:
return None
cur = parent
# ── Symlink-aware walker ───────────────────────────────────────────────────
def walk_collecting_skills(
start_dir: Path,
repo_root: Path,
collected: list[Path],
skipped: dict[str, int],
chain_realpaths: set[str] | None = None,
depth: int = 0,
) -> None:
"""Recursively walk start_dir, following symlinks, collecting SKILL.md paths.
Cycle detection uses per-chain realpath tracking (only prevents recursion
INTO an ancestor, so a vendored symlink pointing to a dir we've visited
elsewhere in the tree is still followed).
Does NOT use os.walk(followlinks=True) with a global visited set — that
would miss vendored symlink paths, which are exactly the paths we want
to record for Tessl-style installs.
"""
if depth > MAX_WALK_DEPTH:
warnings_list.append(f"walk depth cap hit at {start_dir.relative_to(repo_root)}")
return
try:
real = os.path.realpath(start_dir)
except OSError:
return
if chain_realpaths is None:
chain_realpaths = set()
if real in chain_realpaths:
# Ancestor cycle — symlink points to a directory we're currently inside.
return
chain_realpaths = chain_realpaths | {real}
try:
entries = list(os.scandir(start_dir))
except (PermissionError, OSError):
return
for entry in entries:
name = entry.name
full = Path(entry.path)
try:
if entry.is_file(follow_symlinks=True):
if name == "SKILL.md":
try:
rel = str(full.relative_to(repo_root))
except ValueError:
continue
reason = is_excluded(rel)
if reason:
skipped[reason] = skipped.get(reason, 0) + 1
continue
collected.append(full)
continue
if entry.is_dir(follow_symlinks=True):
if name in EXCLUDE_DIR_NAMES:
                    # Register the exclusion reason with a zero count; files
                    # under this directory are never enumerated, and zero-count
                    # entries are filtered out by the caller.
                    skipped.setdefault(name + "/", 0)
continue
if any(name.startswith(p) for p in EXCLUDE_DIR_PREFIXES):
continue
walk_collecting_skills(
full, repo_root, collected, skipped,
chain_realpaths=chain_realpaths, depth=depth + 1,
)
except OSError:
# broken symlink or permission denied — ignore
continue
def walk_dir_files(d: Path) -> list[Path]:
"""Enumerate files under `d` (recursive). Does NOT follow symlinks into
parent chains; only used for reading references/, scripts/, etc. where
cycles are not expected.
"""
out: list[Path] = []
if not d.exists() or not d.is_dir():
return out
for root, dirs, files in os.walk(d, followlinks=False):
dirs[:] = [dd for dd in dirs if dd not in {".git", "node_modules", "__pycache__"}]
for f in files:
out.append(Path(root) / f)
return out
# ── Link extraction ────────────────────────────────────────────────────────
MD_LINK_RE = re.compile(r"\[([^\]]*)\]\(([^)]+)\)")
INLINE_CODE_RE = re.compile(r"`([^`\n]+)`")
def looks_like_path_candidate(target: str) -> bool:
"""Cheap pre-filter: is this *plausibly* a path-shaped string?
Used to exclude obvious non-paths (URLs, code symbols, placeholders) before
the more expensive git-index lookup. We don't filter on file extension —
git history is the source of truth.
"""
if not target:
return False
if target.startswith(("http://", "https://", "mailto:", "tel:", "ftp:", "file://", "//", "#")):
return False
if "<" in target or ">" in target:
return False
# Must contain a slash (single-segment things are almost always code symbols
# like `getAuth()`, never repo paths) AND must not contain whitespace.
if "/" not in target or any(c.isspace() for c in target):
return False
return True
def extract_body_links(body: str) -> list[tuple[str, str]]:
"""Yield (kind, target) tuples for path-like references in the SKILL.md body.
Catches three patterns, all outside fenced/indented code blocks:
- Markdown links: [text](path/to/file)
- @imports: @path/to/file (line-leading)
- Inline backticks: `path/to/file` (the most common in-prose form)
Targets are emitted unfiltered by extension — the caller validates each
against git history and existence so the repo itself decides what's real.
"""
results: list[tuple[str, str]] = []
in_fence = False
for line in body.split("\n"):
stripped = line.strip()
if stripped.startswith("```") or stripped.startswith("~~~"):
in_fence = not in_fence
continue
if in_fence:
continue
if line.startswith(" ") or line.startswith("\t"):
continue
# Markdown links: [text](target)
for m in MD_LINK_RE.finditer(line):
target = m.group(2).strip().split(" ")[0].strip()
if looks_like_path_candidate(target):
results.append(("markdown_link", target))
# Inline backtick paths — strip line-number suffixes like `path:42`
for m in INLINE_CODE_RE.finditer(line):
text = m.group(1).strip()
# Strip trailing punctuation glued to the closing backtick
text = text.rstrip(",.;:)]}\"'")
# Strip line-number / column suffixes (`foo.ts:12`, `foo.ts:12:5`)
text = re.sub(r":\d+(:\d+)?$", "", text)
if looks_like_path_candidate(text):
results.append(("inline_code", text))
# @imports — only at line start
lstripped = line.lstrip()
if lstripped.startswith("@") and len(lstripped) > 1:
tok = lstripped[1:].split()[0].rstrip(",.;:)]}")
# Skip npm-style scoped packages like @scope/package (1 slash, no dot)
if tok and "/" in tok and not (tok.count("/") == 1 and "." not in tok):
if looks_like_path_candidate(tok):
results.append(("at_import", tok))
return results
# ── Per-repo git-history index for reference validation ────────────────────
def build_repo_path_indices(repo_root: Path) -> tuple[set[str], set[str]]:
"""Return (currently_tracked, ever_tracked) sets of repo-relative paths.
`currently_tracked` — files in HEAD's git tree (what's in the repo now).
`ever_tracked` — files that have ever appeared in any commit on any
ref. Used to detect references to files that were
deleted or renamed since the skill was authored.
Falls back to a filesystem walk if git isn't available — in that case
`ever_tracked` collapses to `currently_tracked` (no broken-link signal).
"""
if not (repo_root / ".git").exists():
# Non-git repo: filesystem-walk fallback
currently: set[str] = set()
for root, _dirs, files in os.walk(repo_root, followlinks=False):
for f in files:
p = Path(root) / f
try:
currently.add(str(p.relative_to(repo_root)))
except ValueError:
pass
return currently, currently
currently_tracked: set[str] = set()
try:
result = subprocess.run(
["git", "ls-tree", "-r", "--name-only", "HEAD"],
cwd=str(repo_root), capture_output=True, text=True, timeout=30,
)
if result.returncode == 0:
currently_tracked = {p for p in result.stdout.split("\n") if p}
except Exception as e:
warnings_list.append(f"git ls-tree failed for {repo_root.name}: {e}")
ever_tracked: set[str] = set(currently_tracked)
try:
# All paths that ever appeared in any commit on any ref — captures
# files that were deleted or renamed.
result = subprocess.run(
["git", "log", "--all", "--pretty=format:", "--name-only", "--no-renames"],
cwd=str(repo_root), capture_output=True, text=True, timeout=120,
)
if result.returncode == 0:
ever_tracked.update(p for p in result.stdout.split("\n") if p)
except Exception as e:
warnings_list.append(f"git log --name-only failed for {repo_root.name}: {e}")
return currently_tracked, ever_tracked
def normalise_link_target(target: str, kind: str, skill_dir: Path, repo_root: Path) -> str | None:
"""Return a repo-root-relative path string for a candidate link target.
Resolution semantics differ by kind:
- markdown_link / at_import: relative to skill_dir (standard markdown
link semantics) when not absolute, falling back to repo_root if that
doesn't make sense.
- inline_code: usually a fully-qualified repo path, so resolve from
repo_root first; fall back to skill_dir if that misses.
"""
if not target or target.startswith("/"):
return None
    # Strip leading "./" segments; lstrip("./") would also mangle "../" and
    # ".hidden/" prefixes, so peel the prefix explicitly.
    while target.startswith("./"):
        target = target[2:]
if not target:
return None
candidates: list[Path] = []
if kind == "inline_code":
candidates.append(repo_root / target)
candidates.append(skill_dir / target)
else:
candidates.append(skill_dir / target)
candidates.append(repo_root / target)
for cand in candidates:
try:
rel = cand.resolve().relative_to(repo_root)
except (ValueError, OSError):
continue
return str(rel)
return None
# ── Classification ─────────────────────────────────────────────────────────
def classify_harness(rel_path: str) -> str:
lp = "/" + rel_path.lower().replace("\\", "/")
if "/.claude/skills/" in lp:
return "claude"
if "/.agents/skills/" in lp:
return "agents"
if "/.cursor/skills/" in lp:
return "cursor"
if "/.github/skills/" in lp or "/.vscode/skills/" in lp:
return "agents"
if "/.tessl/tiles/" in lp:
return "tessl_tile"
    if "/tiles/" in lp or "/tile/" in lp:
return "tessl_tile"
return "standalone"
def primary_rank(rel_path: str) -> tuple[int, str]:
lp = "/" + rel_path.lower().replace("\\", "/")
for i, tag in enumerate(PRIMARY_PRIORITY):
if f"/{tag}" in lp:
return (i, rel_path)
return (999, rel_path)
def source_type_for(skill: dict) -> str:
"""Per the discovery schema:
1. owning_package.kind == tessl_tile OR any path is in .tessl/tiles/ OR authored tile layout → tessl_tile_skill
2. owning_package.kind == claude_plugin → claude_plugin_skill
3-5. based on primary_path's harness
6. standalone
"""
op = skill.get("owning_package") or {}
paths_lower = ["/" + p.lower() for p in skill["all_paths"]]
if op.get("kind") == "tessl_tile":
return "tessl_tile_skill"
if any("/.tessl/tiles/" in p for p in paths_lower):
return "tessl_tile_skill"
for p in paths_lower:
# authored-tile layout: /tiles/<anything>/SKILL.md or /tile/<anything>/SKILL.md
if re.search(r"(^|/)(tile|tiles)/.*/skill\.md$", p):
return "tessl_tile_skill"
if op.get("kind") == "claude_plugin":
return "claude_plugin_skill"
pp_lower = "/" + skill["primary_path"].lower()
if "/.claude/skills/" in pp_lower:
return "claude_skill"
if "/.agents/skills/" in pp_lower or "/.github/skills/" in pp_lower or "/.vscode/skills/" in pp_lower:
return "agents_skill"
if "/.cursor/skills/" in pp_lower:
return "cursor_skill"
return "standalone"
def make_tile_instance_id(
repo: str,
tile_name: str,
manifest_path: str | None,
version: str | None,
) -> str:
"""Stable ID for one tile materialisation inside one repo.
A repo can contain the same tile name more than once: for example a local
authored source under `tile/` and an installed copy under `.tessl/tiles/`.
Downstream phases need to keep those instances separate.
"""
instance_source = manifest_path or f"version:{version or 'unknown'}"
suffix = re.sub(r"[^A-Za-z0-9._-]+", "__", instance_source).strip("_")
return f"{repo}::{tile_name}::{suffix or 'unknown'}"
# ── tessl.json manifest resolution ─────────────────────────────────────────
# Dependency keys with these prefixes are package-equivalents (npm / pypi /
# similar) — not skill-bearing. Skip when resolving manifest deps to skills.
NON_SKILL_DEP_PREFIXES = ("tessl/npm-", "tessl/pypi-")
def find_tessl_manifests(repo_root: Path) -> list[tuple[Path, dict]]:
"""Walk repo for tessl.json files. Respects EXCLUDE_DIR_NAMES / prefixes."""
manifests: list[tuple[Path, dict]] = []
for root, dirs, files in os.walk(repo_root, followlinks=False):
dirs[:] = [
d for d in dirs
if d not in EXCLUDE_DIR_NAMES
and not any(d.startswith(p) for p in EXCLUDE_DIR_PREFIXES)
]
if "tessl.json" not in files:
continue
path = Path(root) / "tessl.json"
try:
parsed = json.loads(path.read_text())
except Exception as e:
warnings_list.append(
f"unparseable tessl.json at {path.relative_to(repo_root)}: {e}"
)
continue
if isinstance(parsed, dict):
manifests.append((path, parsed))
return manifests
def resolve_tile_dir(
manifest_path: Path, dep_key: str, dep_info: dict, repo_root: Path,
) -> Path | None:
"""Resolve a tessl.json dependency to its tile directory.
`source: "file:..."` → relative to the manifest's parent dir.
Otherwise (version-pinned) → `<repo_root>/.tessl/tiles/<dep_key>/`.
"""
src = dep_info.get("source")
if isinstance(src, str) and src.startswith("file:"):
rel = src[len("file:"):]
return (manifest_path.parent / rel).resolve()
return (repo_root / ".tessl" / "tiles" / dep_key).resolve()
def collect_manifest_skill_paths(
manifests: list[tuple[Path, dict]], repo_root: Path, repo_id: str,
) -> tuple[dict[Path, list[dict]], list[dict]]:
"""For each manifest, resolve every skill-bearing dep to its SKILL.md paths.
Returns:
- declared_by: { absolute SKILL.md path → [{manifest_path, dep_key, version}, ...] }
- manifest_summaries: per-manifest stats for metadata.
"""
declared_by: dict[Path, list[dict]] = {}
summaries: list[dict] = []
for manifest_path, parsed in manifests:
manifest_rel = str(manifest_path.relative_to(repo_root))
deps = parsed.get("dependencies") or {}
if not isinstance(deps, dict):
deps = {}
resolved = unresolved = skipped = 0
for dep_key, dep_info in deps.items():
if not isinstance(dep_info, dict):
continue
if any(dep_key.startswith(p) for p in NON_SKILL_DEP_PREFIXES):
skipped += 1
continue
tile_dir = resolve_tile_dir(manifest_path, dep_key, dep_info, repo_root)
tile_json_path = tile_dir / "tile.json" if tile_dir else None
if not tile_json_path or not tile_json_path.exists():
warnings_list.append(
f"tile '{dep_key}' declared in {manifest_rel} but no tile.json found"
)
unresolved += 1
continue
try:
tile_data = json.loads(tile_json_path.read_text())
except Exception as e:
warnings_list.append(
f"unparseable tile.json for '{dep_key}' (declared in {manifest_rel}): {e}"
)
unresolved += 1
continue
tile_skills = tile_data.get("skills") or {}
if not isinstance(tile_skills, dict):
tile_skills = {}
version = dep_info.get("version") or tile_data.get("version")
for skill_key, skill_info in tile_skills.items():
if not isinstance(skill_info, dict):
continue
skill_path_rel = skill_info.get("path")
if not skill_path_rel:
continue
skill_md = (tile_dir / skill_path_rel).resolve()
if not skill_md.exists():
warnings_list.append(
f"skill '{skill_key}' of tile '{dep_key}' (declared in {manifest_rel}) "
f"not found at expected path"
)
continue
# Only count skills inside the repo — out-of-repo declarations
# exist but can't be inventoried by this scan.
try:
skill_md.relative_to(repo_root)
except ValueError:
warnings_list.append(
f"skill '{skill_key}' of tile '{dep_key}' resolves outside repo {repo_id} — skipped"
)
continue
declared_by.setdefault(skill_md, []).append({
"manifest_path": manifest_rel,
"dep_key": dep_key,
"version": version,
})
resolved += 1
summaries.append({
"path": manifest_rel,
"name": parsed.get("name"),
"dependencies_total": len(deps),
"dependencies_resolved": resolved,
"dependencies_unresolved": unresolved,
"dependencies_skipped_non_skill": skipped,
})
return declared_by, summaries
# ── Repo-level scan ────────────────────────────────────────────────────────
def scan_repo(
repo_path: Path, repo_id: str, name: str, is_git: bool,
head_sha: str | None, head_branch: str | None, remote_url: str | None,
) -> tuple[dict, list[dict], list[tuple[Path, dict]]]:
"""Return (repo_metadata, [skills], parsed_manifests). Dedupes vendored copies by content hash.
parsed_manifests is the raw `find_tessl_manifests` output for this repo —
the tile-enrichment pass uses it to look up `dependencies.<key>.source`
when classifying tiles into published / github / authored tiers.
"""
repo_root = repo_path
repo_meta = {
"repo_id": repo_id,
"name": name,
"path": str(repo_root),
"is_git_repo": is_git,
"head_sha": head_sha,
"head_branch": head_branch,
"remote_url": remote_url,
}
found: list[Path] = []
repo_skipped: dict[str, int] = {}
walk_collecting_skills(repo_root, repo_root, found, repo_skipped)
for reason, count in repo_skipped.items():
if count > 0:
skipped_counts[reason] = skipped_counts.get(reason, 0) + count
# Build path indices once per repo. Used downstream to validate every
# path-like reference in skill bodies against git history.
currently_tracked, ever_tracked = build_repo_path_indices(repo_root)
# Resolve tessl.json manifests → declared SKILL.md paths. We use this to
# populate per-skill `declared_in` and to power tile aggregation. We do NOT
# re-add SKILL.md files that resolve inside `.tessl/` to the inventory —
# that directory is the Tessl CLI's installed-tile cache, surfaced via
# `tiles[]` (with `source: "tessl_json"`) rather than `skills[]`.
manifests = find_tessl_manifests(repo_root)
declared_by, manifest_summaries = collect_manifest_skill_paths(
manifests, repo_root, repo_id,
)
repo_meta["tessl_manifests"] = manifest_summaries
found_set = {p.resolve() for p in found}
for skill_md in declared_by:
try:
rel = skill_md.relative_to(repo_root).as_posix()
except ValueError:
continue
if rel.startswith(".tessl/"):
continue
if skill_md not in found_set and skill_md.exists():
found.append(skill_md)
found_set.add(skill_md)
per_file_records: list[dict] = []
for skill_md in found:
rel_path = str(skill_md.relative_to(repo_root))
try:
raw_bytes = skill_md.read_bytes()
except Exception as e:
warnings_list.append(f"cannot read {repo_id}/{rel_path}: {e}")
continue
content_hash = "sha256:" + hashlib.sha256(raw_bytes).hexdigest()
try:
content_str = raw_bytes.decode("utf-8")
except UnicodeDecodeError:
content_str = raw_bytes.decode("utf-8", errors="replace")
fm_raw, fm_err, body = parse_frontmatter(content_str)
if fm_err:
warnings_list.append(f"unparseable frontmatter in {repo_id}/{rel_path}: {fm_err}")
skill_dir = skill_md.parent
name_val = ""
desc_val = ""
if fm_raw and isinstance(fm_raw, dict):
name_val = str(fm_raw.get("name") or "").strip()
d = fm_raw.get("description")
if d is not None:
desc_val = re.sub(r"\s+", " ", str(d)).strip()
if not name_val:
name_val = skill_dir.name
line_count = content_str.count("\n") + (0 if content_str.endswith("\n") or not content_str else 1)
body_wc = word_count(body)
body_preview = compute_body_preview(body)
has_refs = (skill_dir / "references").is_dir() or (skill_dir / "reference").is_dir()
has_scripts = (skill_dir / "scripts").is_dir()
owning_package = find_owning_package(skill_dir, repo_root)
# ── Supporting files ──
supporting: list[dict] = []
seen_paths: set[str] = set()
def add_supporting(path_obj: Path, kind: str, discovered_via: str) -> None:
try:
rp = str(path_obj.relative_to(repo_root))
except ValueError:
return
if rp in seen_paths:
return
seen_paths.add(rp)
try:
sz = path_obj.stat().st_size
except OSError:
sz = 0
supporting.append({
"path": rp,
"kind": kind,
"discovered_via": discovered_via,
"line_count": file_line_count(path_obj),
"size_bytes": sz,
})
for refname in ("references", "reference"):
for f in walk_dir_files(skill_dir / refname):
add_supporting(f, "reference", "references_dir")
for f in walk_dir_files(skill_dir / "scripts"):
add_supporting(f, "script", "scripts_dir")
# bundled_directories — sibling dirs not captured above
bundled: list[dict] = []
try:
for child in sorted(skill_dir.iterdir()):
if not child.is_dir():
continue
if child.name.startswith(".") or child.name in BUNDLED_EXCLUDE:
continue
files_in_dir = walk_dir_files(child)
bundled.append({
"path": str(child.relative_to(repo_root)),
"file_count": len(files_in_dir),
})
except Exception:
pass
# Path-like references in body (markdown links, @imports, inline
# backticks). Validation is git-history backed: a candidate counts as
# a real reference iff it's currently tracked OR was ever tracked.
# Anything else (code symbols, package names, external paths, etc.)
# is silently ignored — no extension allowlist, no false positives.
seen_targets: set[str] = set()
for kind, target in extract_body_links(body):
if "#" in target:
target = target.split("#", 1)[0]
if "?" in target:
target = target.split("?", 1)[0]
rel = normalise_link_target(target, kind, skill_dir, repo_root)
if not rel or rel in seen_targets:
continue
seen_targets.add(rel)
in_current = rel in currently_tracked
in_history = rel in ever_tracked
if in_current:
# Valid reference — add to supporting files.
resolved = repo_root / rel
try:
if resolved.is_file():
sf_kind = "nested_skill" if resolved.name == "SKILL.md" else "linked_doc"
sf_via = {
"markdown_link": "markdown_link",
"at_import": "at_import",
"inline_code": "inline_code",
}.get(kind, "markdown_link")
add_supporting(resolved, sf_kind, sf_via)
except Exception:
pass
elif in_history:
# Was tracked, no longer present → stale reference.
warnings_list.append(f"broken link in {repo_id}/{rel_path}: {rel}")
# else: not a repo file, ignore.
per_file_records.append({
"_rel_path": rel_path,
"_content_hash": content_hash,
"_harness": classify_harness(rel_path),
"_name": name_val,
"_description": desc_val,
"_line_count": line_count,
"_word_count": body_wc,
"_body_preview": body_preview,
"_has_refs": has_refs,
"_has_scripts": has_scripts,
"_owning_package": owning_package,
"_supporting_files": supporting,
"_bundled_directories": bundled,
"_frontmatter_raw": fm_raw,
"_frontmatter_err": fm_err,
"_declared_in": declared_by.get(skill_md.resolve(), []),
})
# ── Dedup by content_hash within this repo ──
by_hash: dict[str, list[dict]] = {}
for r in per_file_records:
by_hash.setdefault(r["_content_hash"], []).append(r)
dedup_skills: list[dict] = []
for ch, group in by_hash.items():
sorted_group = sorted(group, key=lambda x: primary_rank(x["_rel_path"]))
primary = sorted_group[0]
all_paths = sorted([g["_rel_path"] for g in group], key=primary_rank)
harnesses = sorted({g["_harness"] for g in group})
pp_slug = primary["_rel_path"].replace("/", "__")
if pp_slug.endswith("__SKILL.md"):
pp_slug = pp_slug[:-len("__SKILL.md")]
elif pp_slug.endswith("SKILL.md"):
pp_slug = pp_slug[:-len("SKILL.md")].rstrip("_")
# Merge declared_in across all paths in the dedup group, deduping by
# (manifest_path, dep_key) so the same declaration isn't listed twice.
merged_declared: list[dict] = []
seen_decls: set[tuple[str, str]] = set()
for g in sorted_group:
for d in g["_declared_in"]:
key = (d["manifest_path"], d["dep_key"])
if key in seen_decls:
continue
seen_decls.add(key)
merged_declared.append(d)
skill_obj = {
"skill_id": f"{repo_id}::{pp_slug}",
"name": primary["_name"],
"description": primary["_description"],
"repo": repo_id,
"primary_path": primary["_rel_path"],
"all_paths": all_paths,
"agent_harnesses": harnesses,
"source_type": "", # set below
"content_hash": ch,
"owning_package": primary["_owning_package"],
"declared_in": merged_declared,
"supporting_files": primary["_supporting_files"],
"bundled_directories": primary["_bundled_directories"],
"frontmatter": {
"raw": primary["_frontmatter_raw"],
"parse_error": primary["_frontmatter_err"],
},
"content": {
"line_count": primary["_line_count"],
"word_count": primary["_word_count"],
"body_preview": primary["_body_preview"],
"has_references_dir": primary["_has_refs"],
"has_scripts_dir": primary["_has_scripts"],
},
}
skill_obj["source_type"] = source_type_for(skill_obj)
dedup_skills.append(skill_obj)
return repo_meta, dedup_skills, manifests
# ── Tile aggregation + tier classification ────────────────────────────────
def aggregate_tiles(
skills: list[dict],
repo_metas: list[dict],
manifests_by_repo: dict[str, list[tuple[Path, dict]]],
) -> list[dict]:
"""Build the tiles[] inventory.
Two sources contribute, with `tessl_json` superseding `filesystem` on
collision:
- **filesystem**: skills with `owning_package.kind == "tessl_tile"`
contribute a tile entry derived from their owning `tile.json`. This is
how authored tiles under `tiles/<name>/` get surfaced.
- **tessl_json**: every dependency in a `tessl.json` manifest that
resolves to a real `tile.json` contributes a tile entry. This is the
only source for tiles installed under `.tessl/tiles/...` since those
SKILL.md files are intentionally excluded from `skills[]`.
Loose skills (T4 / non_tile) don't appear here — they stay in `skills[]`
with `tier = "non_tile"`.
Tier rules (see `classify_tile_tier`):
- `authored_tile` — declared in tessl.json with `source: "file:..."`,
OR the owning tile.json lives directly under
`<repo>/tiles/...` / `<repo>/tile/` (authored
layout) and not under `.tessl/`.
- `github_tile` — declared with `source: "https://github.com/..."`,
`source: "github:..."`, or any non-`file:` /
non-registry source.
- `published_tile` — installed from the registry (no `source` field, or
source is the registry URL). Fallback for tiles
present in `.tessl/tiles/` without an explicit
file/github source.
"""
repo_lookup = {r["repo_id"]: r for r in repo_metas}
repo_path_by_id = {r["repo_id"]: Path(r["path"]) for r in repo_metas}
tiles: dict[tuple[str, str, str | None], dict] = {}
# ── Step A: tiles from authored skills (source: filesystem) ──
for s in skills:
pkg = s.get("owning_package") or {}
if pkg.get("kind") != "tessl_tile":
continue
tile_name = pkg.get("name")
if not tile_name:
continue
repo = s["repo"]
manifest_path = pkg.get("manifest_path")
version = pkg.get("version")
key = (repo, tile_name, manifest_path)
if key not in tiles:
tiles[key] = {
"tile_id": make_tile_instance_id(repo, tile_name, manifest_path, version),
"name": tile_name,
"repo": repo,
"tier": None, # filled below
"source": "filesystem",
"version_installed": version,
"manifest_path": manifest_path,
"skill_ids": [],
}
tiles[key]["skill_ids"].append(s["skill_id"])
# ── Step B: tiles from tessl.json declarations (source: tessl_json) ──
# Every declared dep that resolves to a real tile.json gets a tile entry.
# If the same tile already exists from Step A, this overrides its source
# to tessl_json (per "tessl.json supersedes filesystem").
for repo, manifests in manifests_by_repo.items():
repo_path = repo_path_by_id.get(repo)
if repo_path is None:
continue
for manifest_path_p, parsed in manifests:
deps = parsed.get("dependencies") or {}
if not isinstance(deps, dict):
continue
for dep_key, dep_info in deps.items():
if not isinstance(dep_info, dict):
continue
if any(dep_key.startswith(p) for p in NON_SKILL_DEP_PREFIXES):
continue
tile_dir = resolve_tile_dir(manifest_path_p, dep_key, dep_info, repo_path)
if tile_dir is None:
continue
tile_json_path = tile_dir / "tile.json"
if not tile_json_path.exists():
continue
try:
tile_data = json.loads(tile_json_path.read_text())
except Exception:
continue
tile_name = tile_data.get("name") or dep_key
version = dep_info.get("version") or tile_data.get("version")
try:
tile_manifest_rel = str(tile_json_path.relative_to(repo_path))
except ValueError:
continue
key = (repo, tile_name, tile_manifest_rel)
if key in tiles:
tiles[key]["source"] = "tessl_json"
if tiles[key].get("version_installed") is None:
tiles[key]["version_installed"] = version
else:
tiles[key] = {
"tile_id": make_tile_instance_id(repo, tile_name, tile_manifest_rel, version),
"name": tile_name,
"repo": repo,
"tier": None,
"source": "tessl_json",
"version_installed": version,
"manifest_path": tile_manifest_rel,
"skill_ids": [],
}
# Classify tier per tile by inspecting tessl.json `source` fields and
# the owning tile.json's location.
for (repo, _tile_name, _manifest_path), tile in tiles.items():
tile["tier"] = classify_tile_tier(tile, manifests_by_repo.get(repo, []), repo_lookup.get(repo))
return sorted(tiles.values(), key=lambda t: (t["repo"], t["name"], t["tile_id"]))
def classify_tile_tier(
tile: dict,
manifests: list[tuple[Path, dict]],
repo_meta: dict | None,
) -> str:
"""Decide tier for one tile based on tessl.json source + manifest_path."""
repo_root = Path(repo_meta["path"]) if repo_meta and repo_meta.get("path") else None
# Look up source field across all manifests in this repo
source_str: str | None = None
for _, parsed in manifests:
deps = parsed.get("dependencies") or {}
if not isinstance(deps, dict):
continue
dep = deps.get(tile["name"])
if isinstance(dep, dict):
src = dep.get("source")
if isinstance(src, str):
source_str = src
break
if source_str:
if source_str.startswith("file:"):
return "authored_tile"
if source_str.startswith("https://github.com/") or source_str.startswith("github:"):
return "github_tile"
# Any other source string (e.g. registry URL) — treat as published
return "published_tile"
# No source declared in any tessl.json — either an orphan install, an
# authored-only tile, or a tile.json sitting in some research/ corner.
# `.tessl/tiles/...` paths are produced by `tessl install` from a
# registry source, so we treat them as published. Everything else is
# authored — registry-published tiles always have a tessl.json
# declaration *somewhere*, so absence implies the tile is local-only.
manifest_path = tile.get("manifest_path") or ""
if manifest_path.startswith(".tessl/"):
return "published_tile"
return "authored_tile"
# ── Registry enrichment ────────────────────────────────────────────────────
def read_auth_token() -> str | None:
"""Read access token from ~/.tessl/api-credentials.json. Returns None if unavailable."""
if not TESSL_AUTH_FILE.exists():
return None
try:
data = json.loads(TESSL_AUTH_FILE.read_text())
token = data.get("accessToken")
return token if isinstance(token, str) and token else None
except Exception:
return None
def fetch_tile_version_from_registry(
workspace: str, name: str, version: str, token: str,
) -> dict | None:
"""Fetch tile version data from /v1/tiles/{ws}/{name}/versions/{ver}.
Returns the parsed `attributes` block (with scores), or a sentinel
{"_error": "...", "_status": <int>} on failure.
"""
import urllib.request
import urllib.error
url = f"{TESSL_API_BASE_URL}/v1/tiles/{workspace}/{name}/versions/{version}"
req = urllib.request.Request(
url, headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
)
try:
with urllib.request.urlopen(req, timeout=ENRICHMENT_HTTP_TIMEOUT_SEC) as resp:
body = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
return {"_error": f"{e.code} {e.reason}", "_status": e.code}
except Exception as e:
return {"_error": str(e), "_status": None}
attrs = (body.get("data") or {}).get("attributes") or {}
# Project just the fields we want into a clean shape
scores = attrs.get("scores") or {}
return {
"version": attrs.get("version"),
"fingerprint": attrs.get("fingerprint"),
"summary": attrs.get("summary"),
"moderationPassed": attrs.get("moderationPassed"),
"moderationError": attrs.get("moderationError"),
"archived": attrs.get("archived"),
"archivedReason": attrs.get("archivedReason"),
"archivedAt": attrs.get("archivedAt"),
"hasDocs": attrs.get("hasDocs"),
"hasSkills": attrs.get("hasSkills"),
"hasSteering": attrs.get("hasSteering"),
"evalScore": attrs.get("evalScore"),
"evalBaselineScore": attrs.get("evalBaselineScore"),
"evalImprovement": attrs.get("evalImprovement"),
"evalImprovementMultiplier": attrs.get("evalImprovementMultiplier"),
"scores": {
"aggregate": scores.get("aggregate"),
"quality": scores.get("quality"),
"impact": scores.get("impact"),
"security": scores.get("security"),
"evalAvg": scores.get("evalAvg"),
"evalCount": scores.get("evalCount"),
"lastScoredAt": scores.get("lastScoredAt"),
"validationErrors": scores.get("validationErrors") or [],
},
}
def enrich_tiles_from_registry(
tiles: list[dict], token: str,
) -> None:
"""Attach a `registry` block to every tile whose name resolves on the registry.
We try every tile regardless of tier — locally-authored tiles (`file:`
source) are often *also* published, and a github-source tile may have a
registry shadow too. The API returns 404 for tiles that aren't published;
that's reflected as `registry: { _error: "404 ...", _status: 404 }`.
Each tile then gains an explicit `published_to_registry: bool`.
"""
from concurrent.futures import ThreadPoolExecutor
targets = [t for t in tiles if t.get("version_installed") and "/" in t["name"]]
def _do(t: dict) -> tuple[dict, dict | None]:
ws, nm = t["name"].split("/", 1)
return t, fetch_tile_version_from_registry(ws, nm, t["version_installed"], token)
with ThreadPoolExecutor(max_workers=ENRICHMENT_HTTP_CONCURRENCY) as pool:
for t, result in pool.map(_do, targets):
if result is None:
t["published_to_registry"] = None
continue
if result.get("_error"):
t["registry"] = result
t["published_to_registry"] = False
else:
t["registry"] = result
t["published_to_registry"] = True
# ── Outdated check ─────────────────────────────────────────────────────────
def run_tessl_outdated(repo_path: Path, repo_id: str) -> tuple[dict[str, dict], bool]:
"""Run `tessl outdated --json` for one repo.
Returns (map from "<ws>/<name>" → outdated record, command_succeeded).
The success flag matters because an empty outdated list means "all current"
only when the command itself succeeded.
"""
try:
result = subprocess.run(
["tessl", "outdated", "--json"],
cwd=str(repo_path),
capture_output=True, text=True, timeout=30,
)
except Exception as e:
warnings_list.append(f"`tessl outdated` failed in {repo_id}: {e}")
return {}, False
if result.returncode != 0:
warnings_list.append(
f"`tessl outdated` exit {result.returncode} in {repo_id}: "
f"{result.stderr.strip()[:200]}"
)
return {}, False
try:
data = json.loads(result.stdout)
except Exception as e:
warnings_list.append(f"`tessl outdated` returned non-JSON in {repo_id}: {e}")
return {}, False
out: dict[str, dict] = {}
for entry in data.get("outdated") or []:
cur = (entry.get("current") or {}).get("tile") or {}
ws = cur.get("workspaceName")
nm = cur.get("tileName")
if not (ws and nm):
continue
full = f"{ws}/{nm}"
out[full] = {
"current": cur.get("version"),
"latest": (entry.get("latest") or {}).get("version"),
"update": (entry.get("update") or {}).get("version"),
"isLatestPinned": entry.get("isLatestPinned"),
}
return out, True
def attach_outdated_to_tiles(
tiles: list[dict],
outdated_by_repo: dict[str, dict[str, dict]],
successful_repos: set[str],
) -> None:
"""Attach `outdated` block to each tile that the registry knows about."""
for t in tiles:
# Only meaningful for tiles that resolve on the registry
if not t.get("published_to_registry"):
continue
repo = t.get("repo")
if repo not in successful_repos:
continue
info = outdated_by_repo.get(repo, {}).get(t["name"])
if info:
t["outdated"] = {**info, "update_available": True}
else:
# Tile is registered and we got a successful outdated check
# without it appearing → installed version is current
t["outdated"] = {
"current": t.get("version_installed"),
"latest": t.get("version_installed"),
"update": None,
"update_available": False,
}
# ── Context-cost via `tessl tile lint` ─────────────────────────────────────
# `tessl tile lint` text output samples:
# Skills
# - my-skill: 113 front-loaded, 2k-35.1k on-demand tokens (multi-skill: range)
# - my-skill: 93 front-loaded, 477 on-demand tokens (single-skill: scalar)
# Total: 568 front-loaded, 6k-53.1k on-demand tokens
# Total: 393 front-loaded, 477 on-demand, 109 content tokens (with docs)
_TOK = r"[\d.]+[kKmM]?" # token count: '113', '2k', '35.1k', '1.2m'
_LINT_SKILL_RE = re.compile(
rf"^\s*-\s+(?P<name>[^:]+?):\s+(?P<fl>{_TOK})\s+front-loaded,\s+"
rf"(?P<od_min>{_TOK})(?:-(?P<od_max>{_TOK}))?\s+on-demand"
)
_LINT_TOTAL_RE = re.compile(
rf"^\s*Total:\s+(?P<fl>{_TOK})\s+front-loaded,\s+"
rf"(?P<od_min>{_TOK})(?:-(?P<od_max>{_TOK}))?\s+on-demand"
rf"(?:,\s+(?P<content>{_TOK})\s+content)?"
)
def _parse_token_count(s: str) -> int:
"""Parse '113' / '2k' / '35.1k' / '1.2m' into an integer token count."""
s = s.strip().lower()
mult = 1
if s.endswith("k"):
mult = 1_000
s = s[:-1]
elif s.endswith("m"):
mult = 1_000_000
s = s[:-1]
try:
return int(float(s) * mult)
except ValueError:
return 0
def parse_tile_lint_output(text: str) -> dict:
"""Parse `tessl tile lint` text output into a context-cost dict.
Handles both single-value ('477 on-demand') and range
('6k-53.1k on-demand') forms, plus the optional 'content tokens' suffix
when the tile bundles docs.
"""
per_skill: dict[str, dict] = {}
totals: dict | None = None
valid = "is valid" in text
for line in text.splitlines():
m = _LINT_SKILL_RE.match(line)
if m:
od_min = _parse_token_count(m.group("od_min"))
od_max = _parse_token_count(m.group("od_max")) if m.group("od_max") else od_min
per_skill[m.group("name").strip()] = {
"front_loaded": _parse_token_count(m.group("fl")),
"on_demand_min": od_min,
"on_demand_max": od_max,
}
continue
m = _LINT_TOTAL_RE.match(line)
if m:
od_min = _parse_token_count(m.group("od_min"))
od_max = _parse_token_count(m.group("od_max")) if m.group("od_max") else od_min
totals = {
"front_loaded": _parse_token_count(m.group("fl")),
"on_demand_min": od_min,
"on_demand_max": od_max,
"content_tokens": _parse_token_count(m.group("content")) if m.group("content") else 0,
}
return {
"per_skill": per_skill,
"front_loaded_total": (totals or {}).get("front_loaded", 0),
"on_demand_min_total": (totals or {}).get("on_demand_min", 0),
"on_demand_max_total": (totals or {}).get("on_demand_max", 0),
"content_tokens_total": (totals or {}).get("content_tokens", 0),
"lint_valid": valid,
}
def run_tile_lint(tile_path: Path) -> dict | None:
"""Invoke `tessl tile lint <path>`, parse output. Returns None on failure."""
try:
result = subprocess.run(
["tessl", "tile", "lint", str(tile_path)],
capture_output=True, text=True, timeout=30,
)
except Exception:
return None
# Lint may exit non-zero on warnings; we still try to parse output.
text = (result.stdout or "") + "\n" + (result.stderr or "")
parsed = parse_tile_lint_output(text)
if not parsed["per_skill"] and not parsed["front_loaded_total"]:
return None
parsed["exit_code"] = result.returncode
return parsed
def enrich_tiles_context_cost(tiles: list[dict]) -> None:
"""For every tile with a resolvable manifest path, run `tessl tile lint`."""
from concurrent.futures import ThreadPoolExecutor
targets: list[tuple[dict, Path]] = []
for t in tiles:
manifest_path = t.get("manifest_path")
if not manifest_path:
continue
repo_path = t.get("_repo_path")
if not repo_path:
continue
tile_dir = (Path(repo_path) / manifest_path).parent
if not tile_dir.exists():
continue
targets.append((t, tile_dir))
def _do(item):
t, path = item
return t, run_tile_lint(path)
with ThreadPoolExecutor(max_workers=ENRICHMENT_LINT_CONCURRENCY) as pool:
for t, result in pool.map(_do, targets):
if result is not None:
t["context_cost"] = result
t.pop("_repo_path", None)
# ── Skill-level tier stamping ──────────────────────────────────────────────
def stamp_skill_tiers(skills: list[dict], tiles: list[dict]) -> None:
"""Stamp `tier` on each skill, derived from its owning tile (if any)."""
tile_by_instance = {
(t["repo"], t["name"], t.get("manifest_path")): t
for t in tiles
}
tiles_by_name: dict[tuple[str, str], list[dict]] = {}
for t in tiles:
tiles_by_name.setdefault((t["repo"], t["name"]), []).append(t)
for s in skills:
pkg = s.get("owning_package") or {}
if pkg.get("kind") == "tessl_tile" and pkg.get("name"):
tile = tile_by_instance.get((s["repo"], pkg["name"], pkg.get("manifest_path")))
if tile is None:
candidates = tiles_by_name.get((s["repo"], pkg["name"]), [])
tile = candidates[0] if len(candidates) == 1 else None
s["tile_name"] = pkg["name"]
if tile:
s["tile_id"] = tile["tile_id"]
s["tier"] = tile["tier"]
else:
s["tile_id"] = None
s["tier"] = "published_tile"
elif pkg.get("kind") == "claude_plugin":
s["tier"] = "claude_plugin"
else:
s["tier"] = "non_tile"
# ── Main ───────────────────────────────────────────────────────────────────
def identify_repos(scan_root: Path) -> list[Path]:
"""Return the list of repo roots to scan.
- If scan_root is itself a git repo → [scan_root]
- Else: each immediate child that contains .git/ → those children
- Else (no git children): [scan_root] treated as a single non-git repo
"""
if (scan_root / ".git").exists():
return [scan_root]
children = []
try:
for child in sorted(scan_root.iterdir()):
if not child.is_dir() or child.name.startswith("."):
continue
if (child / ".git").exists():
children.append(child)
except Exception:
pass
if children:
return children
return [scan_root]
def derive_repo_name(repo_path: Path, remote_url: str | None) -> str:
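    # Illustrative: "git@github.com:acme/tools.git" and
    # "https://github.com/acme/tools" both derive "acme/tools".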
if remote_url:
m = re.search(r"[:/]([^/]+?/[^/]+?)(?:\.git)?/?$", remote_url)
if m:
return m.group(1)
return repo_path.name
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="Skill discovery — inventories SKILL.md files and supporting content.",
)
p.add_argument("--scan-root", dest="scan_root", type=str, default=None,
help="Directory to scan (default: $SCAN_ROOT or $(pwd))")
p.add_argument("--repo", dest="repos", type=str, action="append", default=None,
help=(
"Repo path to include (repeatable). When set, replaces auto-"
"discovery — only the specified repos are scanned. Each can be "
"absolute or relative to --scan-root."
))
p.add_argument("--output", dest="output", type=str, default=None,
help="Where to write discovery.json (default: $OUTPUT_PATH or <scan-root>/.skill-insights/discovery.json)")
p.add_argument("--scan-id", dest="scan_id", type=str, default=None,
help="Scan ID (default: $SCAN_ID or discovery-YYYYMMDD-HHMMSS)")
return p.parse_args()
def main() -> int:
global SCAN_ROOT, OUTPUT_PATH, SCAN_ID
args = parse_args()
scan_root_str = args.scan_root or os.environ.get("SCAN_ROOT") or os.getcwd()
SCAN_ROOT = Path(scan_root_str).resolve()
default_output = SCAN_ROOT / ".skill-insights" / "discovery.json"
output_str = args.output or os.environ.get("OUTPUT_PATH") or str(default_output)
OUTPUT_PATH = Path(output_str)
SCAN_ID = args.scan_id or os.environ.get("SCAN_ID") or \
f"discovery-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
if not SCAN_ROOT.exists():
print(f"ERROR: scan root does not exist: {SCAN_ROOT}", file=sys.stderr)
return 2
if not SCAN_ROOT.is_dir():
print(f"ERROR: scan root is not a directory: {SCAN_ROOT}", file=sys.stderr)
return 2
print(f"Scan root: {SCAN_ROOT}", file=sys.stderr)
print(f"Output: {OUTPUT_PATH}", file=sys.stderr)
print(f"Scan ID: {SCAN_ID}", file=sys.stderr)
# If --repo is provided, use the explicit list (resolves relative paths
# against scan-root); otherwise fall back to auto-discovery from scan-root.
if args.repos:
repos = []
for r in args.repos:
p_abs = Path(r)
if not p_abs.is_absolute():
p_abs = SCAN_ROOT / r
p_abs = p_abs.resolve()
if not p_abs.exists():
print(f"ERROR: --repo path does not exist: {p_abs}", file=sys.stderr)
return 2
if not p_abs.is_dir():
print(f"ERROR: --repo path is not a directory: {p_abs}", file=sys.stderr)
return 2
repos.append(p_abs)
print(f"Mode: explicit repos ({len(repos)})", file=sys.stderr)
else:
repos = identify_repos(SCAN_ROOT)
print(
f"Mode: auto-discovery ({'single repo' if len(repos) == 1 and repos[0] == SCAN_ROOT else f'{len(repos)} workspace child repos'})",
file=sys.stderr,
)
repo_metas: list[dict] = []
all_skills: list[dict] = []
manifests_by_repo: dict[str, list[tuple[Path, dict]]] = {}
repo_path_by_id: dict[str, Path] = {}
used_ids: dict[str, int] = {}
for repo_path in repos:
basename = repo_path.name
# Disambiguate duplicate basenames: first dupe gets __1, next __2, etc.
if basename in used_ids:
used_ids[basename] += 1
repo_id = f"{basename}__{used_ids[basename]}"
warnings_list.append(f"duplicate repo basename '{basename}' disambiguated as '{repo_id}'")
else:
used_ids[basename] = 0
repo_id = basename
is_git = (repo_path / ".git").exists()
head_sha = head_branch = remote_url = None
if is_git:
head_sha = run_git(repo_path, ["rev-parse", "HEAD"])
head_branch = run_git(repo_path, ["rev-parse", "--abbrev-ref", "HEAD"])
remote_url = run_git(repo_path, ["remote", "get-url", "origin"])
name = derive_repo_name(repo_path, remote_url)
print(f" scanning {repo_id} @ {repo_path}", file=sys.stderr)
meta, skills, manifests = scan_repo(
repo_path, repo_id, name, is_git,
head_sha, head_branch, remote_url,
)
repo_metas.append(meta)
all_skills.extend(skills)
manifests_by_repo[repo_id] = manifests
repo_path_by_id[repo_id] = repo_path
# ── Tile aggregation + enrichment ──
print(" aggregating tiles + classifying tiers...", file=sys.stderr)
tiles = aggregate_tiles(all_skills, repo_metas, manifests_by_repo)
# Stamp absolute repo paths on tiles for the lint pass (popped before output)
for t in tiles:
t["_repo_path"] = str(repo_path_by_id.get(t["repo"], ""))
auth_token = read_auth_token()
if auth_token:
candidates = sum(1 for t in tiles if t.get("version_installed") and "/" in t["name"])
if candidates:
print(f" querying registry for {candidates} tiles (some may 404)...", file=sys.stderr)
enrich_tiles_from_registry(tiles, auth_token)
print(" checking for outdated tiles...", file=sys.stderr)
outdated_by_repo: dict[str, dict[str, dict]] = {}
successful_outdated_repos: set[str] = set()
for repo_meta in repo_metas:
repo_id = repo_meta["repo_id"]
repo_path = repo_path_by_id.get(repo_id)
if repo_path is None:
continue
outdated_map, ok = run_tessl_outdated(repo_path, repo_id)
outdated_by_repo[repo_id] = outdated_map
if ok:
successful_outdated_repos.add(repo_id)
attach_outdated_to_tiles(tiles, outdated_by_repo, successful_outdated_repos)
else:
warnings_list.append("no Tessl auth token found — registry/outdated enrichment skipped")
# Context cost via `tessl tile lint`. Works for any tile (T1/T2/T3).
print(f" running `tessl tile lint` on {len(tiles)} tiles for context cost...", file=sys.stderr)
enrich_tiles_context_cost(tiles)
# Stamp tier onto each skill (derived from owning tile)
stamp_skill_tiers(all_skills, tiles)
for reason, count in skipped_counts.items():
if count:
warnings_list.append(f"skipped {count} SKILL.md files inside {reason}")
by_source = {
"claude_skill": 0, "cursor_skill": 0, "agents_skill": 0,
"tessl_tile_skill": 0, "standalone": 0, "claude_plugin_skill": 0,
}
by_repo: dict[str, int] = {}
by_tier: dict[str, int] = {}
for s in all_skills:
by_source[s["source_type"]] = by_source.get(s["source_type"], 0) + 1
by_repo[s["repo"]] = by_repo.get(s["repo"], 0) + 1
by_tier[s.get("tier", "non_tile")] = by_tier.get(s.get("tier", "non_tile"), 0) + 1
by_tier_tiles: dict[str, int] = {}
tiles_by_source = {"tessl_json": 0, "filesystem": 0}
tiles_published_to_registry = 0
tiles_authored_only = 0
tiles_with_security_concern = 0
tiles_with_update_available = 0
for t in tiles:
by_tier_tiles[t["tier"]] = by_tier_tiles.get(t["tier"], 0) + 1
src = t.get("source")
if src in tiles_by_source:
tiles_by_source[src] += 1
if t.get("published_to_registry") is True:
tiles_published_to_registry += 1
sec = (t.get("registry", {}).get("scores") or {}).get("security")
if sec in ("MEDIUM", "HIGH", "CRITICAL"):
tiles_with_security_concern += 1
elif t.get("published_to_registry") is False:
tiles_authored_only += 1
if t.get("outdated", {}).get("update_available"):
tiles_with_update_available += 1
stats = {
"total_skills": len(all_skills),
"total_skill_files": sum(len(s["all_paths"]) for s in all_skills),
"total_repos": len(repo_metas),
"total_tiles": len(tiles),
"by_source_type": by_source,
"by_repo": by_repo,
"by_tier": by_tier,
"by_tile_tier": by_tier_tiles,
"tiles_by_source": tiles_by_source,
"tiles_published_to_registry": tiles_published_to_registry,
"tiles_authored_only": tiles_authored_only,
"tiles_with_security_concern": tiles_with_security_concern,
"tiles_with_update_available": tiles_with_update_available,
"skills_with_supporting_files": sum(1 for s in all_skills if s["supporting_files"]),
"total_supporting_files": sum(len(s["supporting_files"]) for s in all_skills),
}
output = {
"schema_version": SCHEMA_VERSION,
"metadata": {
"scan_id": SCAN_ID,
"scan_root": str(SCAN_ROOT),
"scanned_at": datetime.now(timezone.utc).isoformat(),
"tool_version": TOOL_VERSION,
"repos": repo_metas,
},
"tiles": tiles,
"skills": all_skills,
"stats": stats,
"warnings": warnings_list,
}
validate_against_schema(
output,
SCHEMA_DIR / "discovery.schema.json",
role="output",
source="discover_skills.py",
)
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
OUTPUT_PATH.write_text(json.dumps(output, indent=2))
print(
f"wrote {OUTPUT_PATH} (skills={len(all_skills)}, "
f"paths={stats['total_skill_files']}, repos={len(repo_metas)}, "
f"warnings={len(warnings_list)})",
file=sys.stderr,
)
return 0
if __name__ == "__main__":
sys.exit(main())