Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Then run four analytical phases in parallel against the inventory: staleness and git provenance (history, broken refs, contributors); quality (Tessl `skill review`); duplicates (similarity plus LLM judgement); and registry search (per-standalone-skill registry suggestions, HTTP only). Finally, render a self-contained interactive HTML report with a top-of-report health overview, a top-issues panel, a recently-changed list, and a per-tessl.json manifests view.
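A minimal sketch of the fan-out step, assuming each phase is a sibling script that accepts the same --discovery flag as analyze_quality.py; every script name except analyze_quality.py is illustrative:

import asyncio
import sys

# Hypothetical phase scripts; only analyze_quality.py is reproduced below.
PHASE_SCRIPTS = [
    "analyze_staleness.py",
    "analyze_quality.py",
    "analyze_duplicates.py",
    "analyze_registry_search.py",
]

async def run_phase(script: str, discovery: str) -> int:
    """Run one analysis phase as a subprocess; return its exit code."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, script, "--discovery", discovery,
    )
    return await proc.wait()

async def run_all(discovery: str = "discovery.json") -> list[int]:
    # The phases only read the inventory, so they can run concurrently.
    return await asyncio.gather(*[run_phase(s, discovery) for s in PHASE_SCRIPTS])

if __name__ == "__main__":
    print(asyncio.run(run_all()))

The quality phase, analyze_quality.py, is reproduced in full below.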
#!/usr/bin/env python3
"""Skill quality analysis — Tessl-CLI driven.
Reads discovery.json, then for each skill invokes `tessl skill review --json`
in parallel batches to get the canonical Tessl quality assessment (validation
checks, description judge, content judge, review score).
Tile-level quality is pulled directly from `discovery.tiles[].registry.scores`
when available — no extra work needed for tiles that are already scored on
the registry.
Output conforms to references/schemas/quality.schema.json (v2.0). No
subagents. No custom rubric. `jsonschema` is used to validate input/output
at the IO boundary when available; otherwise the script falls back to no
validation with a single stderr warning.
Usage:
analyze_quality.py --discovery <path> [--output <path>]
[--max-skills N] [--concurrency N]
[--skip-published-skills]
"""
from __future__ import annotations
import argparse
import asyncio
import json
import statistics
import sys
from datetime import datetime, timezone
from pathlib import Path
# Resolve the bundled schema directory and load the shared validation helper.
# Tile layout: <tile-root>/skills/<phase>/scripts/<script>.py
# <tile-root>/references/schemas/{_validate.py, *.schema.json}
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
if str(_SCHEMA_DIR_PATH) not in sys.path:
sys.path.insert(0, str(_SCHEMA_DIR_PATH))
from _validate import SCHEMA_DIR, validate_against_schema # noqa: E402
TOOL_VERSION = "skill-insights@0.10.0"
SCHEMA_VERSION = "2.0"
DEFAULT_CONCURRENCY = 8
SKILL_REVIEW_TIMEOUT_SEC = 60
def verdict_for_score(score: int | None) -> str:
"""Map a 0-100 review score to a coarse verdict band."""
if score is None:
return "unknown"
if score >= 85:
return "good"
if score >= 70:
return "acceptable"
if score >= 50:
return "needs_work"
return "poor"
def make_unscored_record(skill_meta: dict, status: str, error: str | None = None) -> dict:
"""Project a skill into the quality output when no review score exists."""
record = {
"skill_id": skill_meta["skill_id"],
"name": skill_meta["name"],
"repo": skill_meta["repo"],
"primary_path": skill_meta["primary_path"],
"tile_id": skill_meta.get("tile_id"),
"tile_name": skill_meta.get("tile_name"),
"tier": skill_meta.get("tier"),
"is_declared": skill_meta.get("is_declared", False),
"review_score": None,
"verdict": "unknown",
"validation": None,
"description_judge": None,
"content_judge": None,
"_status": status,
}
if error:
record["_error"] = error
return record
def make_passthrough_record(skill_meta: dict, score_pct: int) -> dict:
record = make_unscored_record(skill_meta, "passthrough")
record["review_score"] = score_pct
record["verdict"] = verdict_for_score(score_pct)
record["_passthrough"] = "registry_tile_score"
return record
# ── tessl skill review invocation ──────────────────────────────────────────
async def review_one_skill(
sem: asyncio.Semaphore, skill_meta: dict,
) -> dict:
"""Invoke `tessl skill review --json <abs_path>`. Returns a normalized record."""
abs_path = skill_meta["abs_path"]
async with sem:
try:
proc = await asyncio.create_subprocess_exec(
"tessl", "skill", "review", "--json", abs_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=SKILL_REVIEW_TIMEOUT_SEC,
)
            except asyncio.TimeoutError:
                proc.kill()
                await proc.wait()  # reap the killed process before returning
                record = make_unscored_record(
                    skill_meta,
                    "failed",
                    f"timed out after {SKILL_REVIEW_TIMEOUT_SEC}s",
                )
                record["_failed"] = True
                return record
except FileNotFoundError:
record = make_unscored_record(skill_meta, "failed", "`tessl` CLI not found in PATH")
record["_failed"] = True
return record
except Exception as e:
record = make_unscored_record(skill_meta, "failed", str(e))
record["_failed"] = True
return record
if proc.returncode not in (0, 1):
# 0 = pass, 1 = validation failed but JSON still emitted; anything else is an error
record = make_unscored_record(
skill_meta,
"failed",
f"exit {proc.returncode}: {stderr.decode()[:300]}",
)
record["_failed"] = True
return record
try:
data = json.loads(stdout.decode())
except Exception as e:
record = make_unscored_record(skill_meta, "failed", f"non-JSON output: {e}")
record["_failed"] = True
return record
return _normalize_review_response(skill_meta, data)
def _normalize_review_response(skill_meta: dict, data: dict) -> dict:
"""Project a raw skill-review response into our quality-schema shape."""
review = data.get("review") or {}
validation = data.get("validation") or {}
desc = data.get("descriptionJudge") or {}
cont = data.get("contentJudge") or {}
failed_checks = [
{"name": c["name"], "status": c["status"], "message": c.get("message", "")}
for c in validation.get("checks", [])
if c.get("status") in ("error", "warning")
]
return {
"skill_id": skill_meta["skill_id"],
"name": skill_meta["name"],
"repo": skill_meta["repo"],
"primary_path": skill_meta["primary_path"],
"tile_id": skill_meta.get("tile_id"),
"tile_name": skill_meta.get("tile_name"),
"tier": skill_meta.get("tier"),
"is_declared": skill_meta.get("is_declared", False),
"review_score": review.get("reviewScore"),
"verdict": verdict_for_score(review.get("reviewScore")),
"validation": {
"passed": validation.get("overallPassed"),
"error_count": validation.get("errorCount", 0),
"warning_count": validation.get("warningCount", 0),
"failed_checks": failed_checks,
},
"description_judge": {
"model": (desc.get("judgeConfig") or {}).get("model"),
"success": desc.get("success"),
"normalized_score": desc.get("normalizedScore"),
"scores": (desc.get("evaluation") or {}).get("scores") or {},
"overall_assessment": (desc.get("evaluation") or {}).get("overall_assessment"),
"suggestions": (desc.get("evaluation") or {}).get("suggestions") or [],
},
"content_judge": {
"model": (cont.get("judgeConfig") or {}).get("model"),
"success": cont.get("success"),
"normalized_score": cont.get("normalizedScore"),
"scores": (cont.get("evaluation") or {}).get("scores") or {},
"overall_assessment": (cont.get("evaluation") or {}).get("overall_assessment"),
"suggestions": (cont.get("evaluation") or {}).get("suggestions") or [],
},
"_status": "reviewed",
}
# ── Skill collection from discovery ────────────────────────────────────────
def collect_skill_targets(
discovery: dict, skip_published_skills: bool, max_skills: int | None,
) -> tuple[list[dict], list[dict], list[dict]]:
"""Return (targets, passthrough, skipped_due_to_cap).
targets — skills that need a `tessl skill review` invocation
passthrough — skills we're skipping (because their tile already has a
registry quality score we'll attach instead). Each carries
enough info to be projected into per_skill output without
a review call.
`skip_published_skills` (default False) controls whether skills whose
owning tile is published-to-registry get a passthrough — saving N LLM
calls at the cost of per-skill detail in those tiles.
"""
repos = {r["repo_id"]: r for r in discovery.get("metadata", {}).get("repos", [])}
tile_lookup_by_id = {t["tile_id"]: t for t in discovery.get("tiles", [])}
tile_lookup_by_name: dict[tuple[str, str], list[dict]] = {}
for t in discovery.get("tiles", []):
tile_lookup_by_name.setdefault((t["repo"], t["name"]), []).append(t)
targets: list[dict] = []
passthrough: list[dict] = []
review_candidates: list[dict] = []
for s in discovery.get("skills", []):
owning = s.get("owning_package") or {}
tile_name = owning.get("name") if owning.get("kind") == "tessl_tile" else None
tile_id = s.get("tile_id")
repo = repos.get(s["repo"])
owning_tile = tile_lookup_by_id.get(tile_id) if tile_id else None
if owning_tile is None and tile_name:
candidates = tile_lookup_by_name.get((s["repo"], tile_name), [])
owning_tile = candidates[0] if len(candidates) == 1 else None
tile_id = (owning_tile or {}).get("tile_id") or tile_id
registry_quality = None
if owning_tile:
registry_quality = ((owning_tile.get("registry") or {}).get("scores") or {}).get("quality")
meta = {
"skill_id": s["skill_id"],
"name": s.get("name") or "",
"repo": s["repo"],
"primary_path": s["primary_path"],
"tile_id": tile_id,
"tile_name": tile_name,
"tier": s.get("tier", "non_tile"),
"is_declared": bool(s.get("declared_in")),
"registry_quality": registry_quality,
}
if repo:
meta["abs_path"] = str(Path(repo["path"]) / s["primary_path"])
else:
meta["abs_path"] = s["primary_path"]
if skip_published_skills and registry_quality is not None:
# Treat the tile-level score as the per-skill score; no review run.
score_pct = round(registry_quality * 100)
passthrough.append(make_passthrough_record(meta, score_pct))
continue
review_candidates.append(meta)
if max_skills is not None:
targets = review_candidates[:max_skills]
skipped = [
make_unscored_record(meta, "skipped_max_skills")
for meta in review_candidates[max_skills:]
]
else:
targets = review_candidates
skipped = []
return targets, passthrough, skipped
# ── Per-tile rollup ────────────────────────────────────────────────────────
def build_per_tile(
discovery: dict, per_skill: list[dict],
) -> list[dict]:
"""One row per tile. Pulls registry.scores.quality where available; else
falls back to mean of the tile's per-skill review scores."""
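    # Worked example (illustrative numbers): registry quality 0.87 -> score 87
    # with score_source "registry"; no registry score but per-skill reviews of
    # [80, 90] -> round(mean) == 85 with score_source "computed_avg".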
tile_to_skills: dict[str, list[dict]] = {}
for ps in per_skill:
if ps.get("tile_id"):
tile_to_skills.setdefault(ps["tile_id"], []).append(ps)
rows: list[dict] = []
for t in discovery.get("tiles", []):
skill_records = tile_to_skills.get(t["tile_id"]) or []
review_scores = [
ps["review_score"] for ps in skill_records
if isinstance(ps.get("review_score"), int)
]
registry_q = ((t.get("registry") or {}).get("scores") or {}).get("quality")
if registry_q is not None:
tile_score = round(registry_q * 100)
score_source = "registry"
elif review_scores:
tile_score = round(statistics.mean(review_scores))
score_source = "computed_avg"
else:
tile_score = None
score_source = None
        # Filter to scored records first: `x.get("review_score") or 999` would
        # mis-rank a legitimate review score of 0 as 999 (and `or -1` likewise).
        scored = [ps for ps in skill_records if isinstance(ps.get("review_score"), int)]
        weakest = min(scored, key=lambda x: x["review_score"]) if scored else None
        strongest = max(scored, key=lambda x: x["review_score"]) if scored else None
rows.append({
"tile_id": t["tile_id"],
"name": t["name"],
"repo": t["repo"],
"tier": t.get("tier"),
"published_to_registry": t.get("published_to_registry"),
"score": tile_score,
"score_source": score_source,
"verdict": verdict_for_score(tile_score),
"skill_count": len(skill_records),
"weakest_skill_id": (weakest or {}).get("skill_id") if weakest and weakest.get("review_score") is not None else None,
"strongest_skill_id": (strongest or {}).get("skill_id") if strongest and strongest.get("review_score") is not None else None,
})
return rows
# ── Main ───────────────────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Skill quality analysis driven by `tessl skill review`")
p.add_argument("--discovery", required=True, help="Path to discovery.json")
p.add_argument("--output", default=None, help="Output quality.json path")
p.add_argument("--max-skills", type=int, default=None,
help="Cap number of skills reviewed (for fast iteration)")
p.add_argument("--concurrency", type=int, default=DEFAULT_CONCURRENCY,
help=f"Parallel `tessl skill review` calls (default: {DEFAULT_CONCURRENCY})")
p.add_argument("--skip-published-skills", action="store_true",
help="Skip per-skill review for skills whose owning tile already has "
"a registry quality score; use the tile score as a passthrough.")
return p.parse_args()
async def _run(args) -> int:
discovery_path = Path(args.discovery).resolve()
if not discovery_path.exists():
print(f"ERROR: discovery file not found: {discovery_path}", file=sys.stderr)
return 2
output_path = Path(args.output) if args.output else discovery_path.parent / "quality.json"
discovery = json.loads(discovery_path.read_text())
validate_against_schema(
discovery,
SCHEMA_DIR / "discovery.schema.json",
role="input (discovery.json)",
source="analyze_quality.py",
)
targets, passthrough, skipped = collect_skill_targets(
discovery, args.skip_published_skills, args.max_skills,
)
print(
f"Quality analysis: {len(targets)} skills to review via tessl, "
f"{len(passthrough)} passthrough from registry, "
f"{len(skipped)} skipped by cap",
file=sys.stderr,
)
sem = asyncio.Semaphore(max(1, args.concurrency))
started_at = datetime.now(timezone.utc)
results = await asyncio.gather(*[review_one_skill(sem, m) for m in targets])
finished_at = datetime.now(timezone.utc)
succeeded = [r for r in results if not r.get("_failed")]
failed = [r for r in results if r.get("_failed")]
failed_output = [
{k: v for k, v in r.items() if k != "_failed"}
for r in failed
]
per_skill = succeeded + passthrough + skipped + failed_output
# Estate summary
review_scores = [r["review_score"] for r in per_skill if isinstance(r.get("review_score"), int)]
avg_score = round(statistics.mean(review_scores), 1) if review_scores else None
by_verdict = {"good": 0, "acceptable": 0, "needs_work": 0, "poor": 0, "unknown": 0}
for r in per_skill:
by_verdict[r.get("verdict", "unknown")] = by_verdict.get(r.get("verdict", "unknown"), 0) + 1
per_tile = build_per_tile(discovery, per_skill)
output = {
"schema_version": SCHEMA_VERSION,
"metadata": {
"scan_id": (discovery.get("metadata") or {}).get("scan_id"),
"scanned_at": finished_at.isoformat(),
"tool_version": TOOL_VERSION,
"skill_count_reviewed": len(succeeded),
"skill_count_passthrough": len(passthrough),
"skill_count_skipped": len(skipped),
"skill_count_failed": len(failed),
"skill_count_total": len(per_skill),
"duration_sec": round((finished_at - started_at).total_seconds(), 1),
"failed_skills": [
{"skill_id": f["skill_id"], "error": f.get("_error", "")}
for f in failed
],
},
"per_skill": per_skill,
"per_tile": per_tile,
"estate_summary": {
"avg_review_score": avg_score,
"by_verdict": by_verdict,
"skills_with_validation_failures": sum(
1 for r in succeeded if (r.get("validation") or {}).get("error_count", 0) > 0
),
"tiles_with_registry_score": sum(1 for t in per_tile if t.get("score_source") == "registry"),
"tiles_with_computed_avg": sum(1 for t in per_tile if t.get("score_source") == "computed_avg"),
},
"warnings": [],
}
validate_against_schema(
output,
SCHEMA_DIR / "quality.schema.json",
role="output",
source="analyze_quality.py",
)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(output, indent=2))
print(
f"Quality analysis complete.\n"
f" Reviewed: {len(succeeded)} (failed: {len(failed)})\n"
f" Passthrough: {len(passthrough)}\n"
f" Skipped: {len(skipped)}\n"
f" Avg score: {avg_score}\n"
f" By verdict: good={by_verdict['good']}, accept={by_verdict['acceptable']}, "
f"needs_work={by_verdict['needs_work']}, poor={by_verdict['poor']}, unknown={by_verdict['unknown']}\n"
f" Duration: {output['metadata']['duration_sec']}s\n"
f" Output: {output_path}",
file=sys.stderr,
)
return 0
def main() -> int:
args = parse_args()
return asyncio.run(_run(args))
if __name__ == "__main__":
sys.exit(main())
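A typical invocation (paths illustrative):

    python analyze_quality.py --discovery out/discovery.json --concurrency 4 --skip-published-skills

This reviews every skill whose owning tile lacks a registry quality score,
records passthrough scores for the rest, and writes quality.json next to the
discovery file unless --output says otherwise.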