CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessleng/skill-insights

Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Then run four analytical phases in parallel against the inventory — staleness + git provenance (history, broken refs, contributors), quality (Tessl `skill review`), duplicates (similarity + LLM judgement), registry-search (per-standalone-skill registry suggestions, HTTP only) — and render a self-contained interactive HTML report with a top-of-report health overview, top-issues panel, recently-changed list, and per-tessl.json manifests view.

84

1.44x
Quality

90%

Does it follow best practices?

Impact

97%

1.44x

Average score across 2 eval scenarios

Security by Snyk

Advisory

Suggest reviewing before use

Overview
Quality
Evals
Security
Files

registry_search.py — skills/registry-search/scripts/

#!/usr/bin/env python3
"""Skill registry-search analyzer.

Reads a discovery.json produced by discover-skills, filters down to standalone
skills, queries the Tessl registry's hybrid search endpoint twice per skill
(once filtered to skills, once filtered to tiles), and writes the top match
for each into registry-search.json.

Output conforms to references/schemas/registry-search.schema.json.

No LLM, no agent judgement. Stdlib + HTTP only. The `/experimental/search`
endpoint is called anonymously — public results are sufficient for surfacing
registry suggestions, and skipping auth keeps the script free of credential
state and stale-token failure modes. `jsonschema` is a soft dep used for IO
contract validation.

Usage:
    registry_search.py --discovery <path> [--output <path>]
                       [--registry-base-url URL] [--concurrency N]
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path

# Resolve the bundled schema directory and load the shared validation helper.
# Tile layout: <tile-root>/skills/<phase>/scripts/<script>.py
#              <tile-root>/references/schemas/{_validate.py, *.schema.json}
# Four `.parent` hops climb from this file: scripts/ -> <phase>/ -> skills/ ->
# <tile-root>. The schema directory is then put at the front of sys.path so the
# project-local `_validate` module can be imported by bare name; the import
# must therefore stay below the sys.path manipulation (hence the E402 waiver).
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
if str(_SCHEMA_DIR_PATH) not in sys.path:
    sys.path.insert(0, str(_SCHEMA_DIR_PATH))
from _validate import SCHEMA_DIR, validate_against_schema  # noqa: E402

# Recorded verbatim in the output as metadata.tool_version.
TOOL_VERSION = "skill-insights@0.11.0"
# Emitted as the top-level schema_version of registry-search.json.
SCHEMA_VERSION = "1.2"
# Sent as the `searchMode` query parameter and echoed in metadata.search_mode.
SEARCH_MODE = "hybrid"

# Default for --registry-base-url. The TESSL_API_BASE_URL environment variable
# is read once, at import time; an explicit CLI flag still takes precedence.
DEFAULT_REGISTRY_BASE_URL = os.environ.get("TESSL_API_BASE_URL", "https://api.tessl.io")
# Timeout (seconds) passed to urlopen for each individual search request.
HTTP_TIMEOUT_SEC = 10
# Default for --concurrency, i.e. the thread pool's max_workers.
DEFAULT_CONCURRENCY = 8


# ── Search ─────────────────────────────────────────────────────────────────


def _build_search_url(base_url: str, query: str, type_filter: str) -> str:
    """Return the full `/experimental/search` URL for one query.

    The request always asks for a single result (page size 1, page number 1)
    of the given ``type_filter`` using the module-wide ``SEARCH_MODE``. Any
    trailing slashes on ``base_url`` are dropped before the path is appended.

    NOTE(review): ``includePrivate=true`` is sent even though the request is
    made without credentials — confirm this is intended.
    """
    query_string = urllib.parse.urlencode({
        "q": query,
        "searchMode": SEARCH_MODE,
        "filter[type][eq]": type_filter,
        "page[size]": "1",
        "page[number]": "1",
        "includePrivate": "true",
    })
    endpoint = base_url.rstrip("/") + "/experimental/search"
    return f"{endpoint}?{query_string}"


def _do_search(base_url: str, query: str, type_filter: str) -> tuple[dict | None, dict | None]:
    """Hit /experimental/search anonymously once. Returns (top_result, error_dict).

    Args:
        base_url: Registry API base URL.
        query: Free-text search query.
        type_filter: Result type to filter on; also reported as the
            ``target`` of any error dict.

    Returns:
        ``(top_result, None)`` when the response carries a usable first item,
        ``(None, error)`` when the request or the JSON decoding failed (the
        error dict has ``target``, ``status`` and ``message`` keys), and
        ``(None, None)`` when the response was received but holds no usable
        result. Request, HTTP and decoding failures are reported through the
        error dict rather than raised, so one bad search cannot abort a batch.
    """
    url = _build_search_url(base_url, query, type_filter)
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT_SEC) as resp:
            body = json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        # HTTPError must be caught before the generic handler so the status
        # code is preserved in the error record.
        return None, {"target": type_filter, "status": e.code, "message": f"{e.code} {e.reason}"}
    except Exception as e:
        # Deliberately broad: this is the per-request best-effort boundary
        # (timeouts, DNS failures, malformed JSON, bad encodings, ...).
        # Some exceptions stringify to "", so fall back to the class name to
        # keep the recorded message informative.
        return None, {"target": type_filter, "status": None, "message": str(e) or type(e).__name__}

    data = body.get("data") if isinstance(body, dict) else None
    if not isinstance(data, list) or not data:
        return None, None
    top = data[0]
    # A non-object first item cannot be projected; treat it like any other
    # malformed payload (no result) instead of handing it to the projection
    # code, which would raise AttributeError on `.get`.
    if not isinstance(top, dict):
        return None, None
    return top, None


# ── Result projection ──────────────────────────────────────────────────────


def _project_scores(scores: dict | None) -> dict:
    s = scores or {}
    eval_count = s.get("evalCount")
    return {
        "aggregate": s.get("aggregate"),
        "quality": s.get("quality"),
        "impact": s.get("impact"),
        "security": s.get("security"),
        "evalAvg": s.get("evalAvg"),
        "evalBaseline": s.get("evalBaseline"),
        "evalImprovement": s.get("evalImprovement"),
        "evalImprovementMultiplier": s.get("evalImprovementMultiplier"),
        "evalCount": eval_count if isinstance(eval_count, int) else None,
        "lastScoredAt": s.get("lastScoredAt"),
    }


def _project_skill_match(raw: dict) -> dict | None:
    attrs = raw.get("attributes") or {}
    name = attrs.get("name")
    if not isinstance(name, str):
        return None
    description = attrs.get("description") or ""
    path = attrs.get("path") or ""
    source_url = attrs.get("sourceUrl") or None
    if isinstance(source_url, str) and source_url == "":
        source_url = None
    return {
        "kind": "skill",
        "id": str(raw.get("id") or ""),
        "name": name,
        "description": description,
        "path": path,
        "source_url": source_url,
        "tile_workspace": None,
        "tile_name": None,
        "scores": _project_scores(attrs.get("scores")),
    }


def _project_tile_match(raw: dict) -> dict | None:
    attrs = raw.get("attributes") or {}
    name = attrs.get("name")
    full_name = attrs.get("fullName")
    if not isinstance(name, str) or not isinstance(full_name, str):
        return None
    workspace_name = None
    rel = raw.get("relationships") or {}
    workspace = (rel.get("workspace") or {}).get("data") or {}
    ws_attrs = workspace.get("attributes") or {}
    ws_name = ws_attrs.get("name")
    if isinstance(ws_name, str):
        workspace_name = ws_name

    versions = attrs.get("versions") or []
    latest = None
    if isinstance(versions, list) and versions:
        v = versions[0]
        if isinstance(v, dict) and isinstance(v.get("version"), str):
            latest = {
                "version": v["version"],
                "summary": v.get("summary") or "",
                "has_skills": v.get("hasSkills"),
                "has_docs": v.get("hasDocs"),
                "has_steering": v.get("hasSteering"),
            }

    return {
        "kind": "tile",
        "id": str(raw.get("id") or ""),
        "name": name,
        "full_name": full_name,
        "describes": attrs.get("describes"),
        "featured": attrs.get("featured"),
        "is_private": attrs.get("isPrivate"),
        "workspace_name": workspace_name,
        "latest_version": latest,
        "scores": _project_scores(attrs.get("scores")),
    }


def _aggregate(match: dict | None) -> float:
    """Numeric aggregate score for ranking; missing → -1 so any real score wins."""
    if not match:
        return -1.0
    score = (match.get("scores") or {}).get("aggregate")
    return float(score) if isinstance(score, (int, float)) else -1.0


def _pick_best(skill_match: dict | None, tile_match: dict | None) -> dict | None:
    if skill_match is None and tile_match is None:
        return None
    if _aggregate(tile_match) > _aggregate(skill_match):
        return tile_match
    return skill_match


# ── Per-skill candidate selection ──────────────────────────────────────────


def _build_query(name: str, description: str | None) -> str:
    body = (description or "").strip()
    name = (name or "").strip()
    if name and body:
        combined = f"{name} {body}"
    else:
        combined = name or body
    # Endpoint accepts up to 500 chars; trim conservatively to leave headroom.
    return combined[:480].strip()


def _select_candidates(discovery: dict) -> tuple[list[dict], int]:
    """Pick the skills to query the registry for.

    Only `source_type: "standalone"` skills are searched — anything inside a
    tile or an agent-harness directory has already been chosen by the user
    (either authored as a tile or installed via tessl.json), so suggesting a
    registry alternative would just be noise. Standalone skills are the ones
    that don't yet belong to anything, which is exactly when a registry-side
    suggestion is useful.
    """
    candidates: list[dict] = []
    skipped_non_standalone = 0

    for s in discovery.get("skills", []):
        if s.get("source_type") != "standalone":
            skipped_non_standalone += 1
            continue

        owning = s.get("owning_package") or {}
        tile_name = owning.get("name") if owning.get("kind") == "tessl_tile" else None

        candidates.append({
            "skill_id": s["skill_id"],
            "name": s.get("name") or "",
            "description": s.get("description") or "",
            "tile_name": tile_name,
        })

    return candidates, skipped_non_standalone


# ── Per-skill orchestration ────────────────────────────────────────────────


def _search_one(candidate: dict, base_url: str) -> dict:
    """Run the skill and tile searches for one candidate and build its entry.

    The query is derived from the candidate's name and description. With an
    empty query no request is made and the entry carries a single "skipped"
    error. Otherwise the registry is queried for the top ``skill`` and then
    the top ``tile``; request errors are collected in that order and the
    better of the two projected matches becomes ``best_match``.
    """
    search_text = _build_query(candidate["name"], candidate["description"])

    def _entry(source_query: str, best_match: dict | None, errors: list[dict]) -> dict:
        # Every output entry has the same keys, in the same order.
        return {
            "source_skill_id": candidate["skill_id"],
            "source_skill_name": candidate["name"],
            "source_tile_name": candidate["tile_name"],
            "source_query": source_query,
            "best_match": best_match,
            "search_errors": errors,
        }

    if search_text == "":
        return _entry("", None, [{
            "target": "skill",
            "status": None,
            "message": "skipped: empty query (no name and no description)",
        }])

    errors: list[dict] = []
    projected: dict = {}
    for target, project in (("skill", _project_skill_match), ("tile", _project_tile_match)):
        raw, failure = _do_search(base_url, search_text, target)
        if failure:
            errors.append(failure)
        projected[target] = project(raw) if raw else None

    return _entry(search_text, _pick_best(projected["skill"], projected["tile"]), errors)


# ── Main ───────────────────────────────────────────────────────────────────


def parse_args() -> argparse.Namespace:
    """Parse the command line (``sys.argv``) into a namespace.

    Options: ``--discovery`` (required), ``--output``, ``--registry-base-url``
    and ``--concurrency`` (int); the latter two default to the module-level
    ``DEFAULT_*`` constants.
    """
    parser = argparse.ArgumentParser(
        description="Search the Tessl registry for matches against locally-authored skills.",
    )
    option_specs = (
        ("--discovery", {"required": True, "help": "Path to discovery.json"}),
        ("--output", {"default": None, "help": "Output registry-search.json path"}),
        ("--registry-base-url", {
            "default": DEFAULT_REGISTRY_BASE_URL,
            "help": "Tessl registry API base URL",
        }),
        ("--concurrency", {
            "type": int,
            "default": DEFAULT_CONCURRENCY,
            "help": "Parallel HTTP workers",
        }),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args()


def main() -> int:
    """Run the registry-search phase end to end.

    Reads and validates discovery.json, searches the registry for every
    standalone skill using a thread pool, assembles the result document with
    summary statistics, validates it against the output schema, writes it to
    disk and prints a summary to stderr.

    Returns:
        ``0`` on success, ``2`` when the discovery file does not exist.
        Invalid JSON propagates as an exception. How schema violations are
        reported depends on ``validate_against_schema``, which is defined
        outside this file.
    """
    args = parse_args()
    discovery_path = Path(args.discovery).resolve()
    if not discovery_path.exists():
        print(f"ERROR: discovery file not found: {discovery_path}", file=sys.stderr)
        return 2

    # Default output location: next to the discovery file.
    output_path = Path(args.output) if args.output else discovery_path.parent / "registry-search.json"

    # JSON interchange is UTF-8; decode explicitly rather than relying on the
    # locale's preferred encoding, which mis-decodes non-ASCII skill names and
    # descriptions on non-UTF-8 locales.
    discovery = json.loads(discovery_path.read_text(encoding="utf-8"))
    validate_against_schema(
        discovery,
        SCHEMA_DIR / "discovery.schema.json",
        role="input (discovery.json)",
        source="registry_search.py",
    )

    candidates, skipped_non_standalone = _select_candidates(discovery)
    now = datetime.now(timezone.utc)
    warnings: list[str] = []

    # The work is I/O-bound, so threads overlap the HTTP waits. pool.map
    # keeps results in candidate order; max(1, ...) guards against a zero or
    # negative --concurrency.
    with ThreadPoolExecutor(max_workers=max(1, args.concurrency)) as pool:
        matches = list(pool.map(
            lambda c: _search_one(c, args.registry_base_url),
            candidates,
        ))

    skills_with_match = sum(1 for m in matches if m["best_match"] is not None)
    skills_with_no_match = len(matches) - skills_with_match
    # Count best matches per kind; both keys are always present in the output.
    match_kinds = {"skill": 0, "tile": 0}
    for m in matches:
        bm = m.get("best_match")
        if bm and bm.get("kind") in match_kinds:
            match_kinds[bm["kind"]] += 1
    # Total number of individual error records, not of skills with errors.
    search_error_count = sum(len(m["search_errors"]) for m in matches)

    output = {
        "schema_version": SCHEMA_VERSION,
        "metadata": {
            "scan_id": discovery.get("metadata", {}).get("scan_id"),
            "scanned_at": now.isoformat(),
            "tool_version": TOOL_VERSION,
            "registry_base_url": args.registry_base_url,
            "search_mode": SEARCH_MODE,
            "skills_searched": len(candidates),
            "skills_skipped_non_standalone": skipped_non_standalone,
        },
        "matches": matches,
        "stats": {
            "total_skills_searched": len(candidates),
            "skills_with_match": skills_with_match,
            "skills_with_no_match": skills_with_no_match,
            "match_kinds": match_kinds,
            "search_errors": search_error_count,
        },
        "warnings": warnings,
    }

    # Validate before writing so an off-contract document never reaches disk
    # (assuming the validator raises or exits on failure — see _validate).
    validate_against_schema(
        output,
        SCHEMA_DIR / "registry-search.schema.json",
        role="output",
        source="registry_search.py",
    )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(output, indent=2), encoding="utf-8")

    # The human-readable summary goes to stderr; stdout is left untouched.
    print(
        f"Registry search complete.\n"
        f"  Skills searched:        {len(candidates)} (source_type=standalone)\n"
        f"  Skipped (non-standalone): {skipped_non_standalone}\n"
        f"  With match:             {skills_with_match} (skill={match_kinds['skill']}, tile={match_kinds['tile']})\n"
        f"  No match:               {skills_with_no_match}\n"
        f"  Search errors:          {search_error_count}\n"
        f"  Output:                 {output_path}",
        file=sys.stderr,
    )
    return 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())

skills

README.md

tile.json