CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessleng/skill-insights

Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Then run four analytical phases in parallel against the inventory — staleness + git provenance (history, broken refs, contributors), quality (Tessl `skill review`), duplicates (similarity + LLM judgement), registry-search (per-standalone-skill registry suggestions, HTTP only) — and render a self-contained interactive HTML report with a top-of-report health overview, top-issues panel, recently-changed list, and per-tessl.json manifests view.

84

1.44x
Quality

90%

Does it follow best practices?

Impact

97%

1.44x

Average score across 2 eval scenarios

Security by Snyk

Advisory

Suggest reviewing before use

Overview
Quality
Evals
Security
Files

fetch_org_usage.py — skills/posthog-skill-query/scripts/

#!/usr/bin/env python3
"""Fetch org-wide skill / MCP / session usage from PostHog.

Issues a fixed set of HogQL queries against the agent-signals event stream
emitted by the Tessl CLI (`cli:agent-signals:skill-activation`,
`cli:agent-signals:mcp-tool-activation`, `cli:agent-signals:session-processed`)
and writes a self-contained `org_usage.json` describing what every Tessl user
is doing across the organisation.

Deliberately uncoupled from `discovery.json` and the rest of the
skill-insights pipeline. No filtering by which skills are installed in any
particular repo. The cross-reference (which skills in our scan are also used
elsewhere, which popular org-wide skills aren't installed here, etc.) happens
in a downstream step that reads this output alongside discovery, staleness,
quality, and duplicates.

Output conforms to references/schemas/org-usage.schema.json.

Usage:
    fetch_org_usage.py --output <path> [--windows 7,30,90] [...]

Requires:
- A PostHog personal API key with `query:read` on the project. Read from
  $POSTHOG_PERSONAL_API_KEY first, otherwise from --posthog-key-file
  (default ~/.tessl/posthog/personal-api-key).
- Stdlib only (`urllib`, `json`). `jsonschema` used for IO validation when
  available.
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

# Bundled schema validator — same pattern as the other skill-insights phases.
# Tile layout: <tile-root>/skills/<phase>/scripts/<script>.py
#              <tile-root>/references/schemas/{_validate.py, *.schema.json}
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
if str(_SCHEMA_DIR_PATH) not in sys.path:
    sys.path.insert(0, str(_SCHEMA_DIR_PATH))
from _validate import SCHEMA_DIR, validate_against_schema  # noqa: E402

# Version identifiers for this tool and for the output schema.
# NOTE(review): presumably stamped into org_usage.json — the writer is not
# visible in this part of the file; confirm.
TOOL_VERSION = "skill-insights@0.11.0"
SCHEMA_VERSION = "1.4"

# Defaults for the PostHog connection and the query fan-out.
# NOTE(review): these look like argparse defaults; the CLI wiring is outside
# this part of the file, so treat the per-constant notes below as hints.
DEFAULT_HOST = "https://us.posthog.com"
DEFAULT_PROJECT_ID = 57574
DEFAULT_DASHBOARD_ID = 1358856
# `~` is expanded with Path.expanduser() when the key is loaded.
DEFAULT_KEY_FILE = "~/.tessl/posthog/personal-api-key"
# Look-back windows in days (the module usage line shows `--windows 7,30,90`).
DEFAULT_WINDOWS = [7, 30, 90]
DEFAULT_TOP_TILES_DETAIL = 50
DEFAULT_TOP_UNTILED = 100
DEFAULT_TOP_LOADED = 5000
# Cap on the per-repo views. Tessl-internal usage today sees ~25-50 distinct
# repos in 30d; bumping it does not change the top-line totals (those are
# always all-repos), it only changes how many rows the renderer can offer
# in chip-style filters and the Repos section.
DEFAULT_TOP_REPOS = 200
# Window (in days) used for ranking and for provider / first-seen / last-seen
# details when results are assembled.
DEFAULT_PRIMARY_WINDOW = 30
DEFAULT_FILTER_REPOS = "github.com/tesslio"
DEFAULT_FILTER_EMAIL_DOMAINS = "tessl.io"

# Per-request timeout (seconds) passed to urlopen for sync HogQL queries.
SYNC_TIMEOUT_S = 60
# Sleep (seconds) before each retry of a query that hit HTTP 502/503/504;
# the tuple length is also the maximum number of retries.
SYNC_RETRY_BACKOFF_S = (5, 15)


# ── PostHog client ─────────────────────────────────────────────────────────


class PostHog:
    """Thin synchronous HogQL client for a single PostHog project.

    Every query is POSTed to the project's sync `/query/` endpoint, which
    runs the HogQL inline and hands the rows back in the response body.
    ClickHouse caps each query at 10s server-side; the queries issued by this
    script are tuned to stay under that (per-skill detail is limited to the
    top N tiles, etc).

    The async/polling API was tried and ran into production overload
    (`Queries are a little too busy right now`) because the async work pool
    queues project-wide. Sync queries skip that pool entirely.

    Gateway-style failures (HTTP 502, 503, 504) are retried with the delays
    listed in `SYNC_RETRY_BACKOFF_S` before giving up.
    """

    def __init__(self, host: str, project_id: int, api_key: str):
        # Trailing slashes are dropped so URL templates can append "/api/...".
        self.host = host.rstrip("/")
        self.project_id = project_id
        self.api_key = api_key

    def hogql(self, query: str) -> list:
        """Run one HogQL query and return the `results` rows.

        Raises:
            RuntimeError: on a non-retryable HTTP error, when retries are
                exhausted, when the response carries an `error` field, or
                when it has no `results` field.
        """
        payload = json.dumps({"query": {"kind": "HogQLQuery", "query": query}}).encode()
        max_retries = len(SYNC_RETRY_BACKOFF_S)
        last_err: Exception | None = None
        # First attempt runs immediately; each later attempt sleeps first.
        for attempt, delay in enumerate((0,) + SYNC_RETRY_BACKOFF_S):
            if delay:
                time.sleep(delay)
            request = urllib.request.Request(
                f"{self.host}/api/projects/{self.project_id}/query/",
                data=payload,
                method="POST",
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                },
            )
            try:
                with urllib.request.urlopen(request, timeout=SYNC_TIMEOUT_S) as resp:
                    decoded = json.loads(resp.read())
            except urllib.error.HTTPError as e:
                retryable = e.code in (502, 503, 504) and attempt < max_retries
                if retryable:
                    last_err = e
                    continue
                # Always raises; surfaces the server's error detail if any.
                self._raise_with_body(e, query)
            failure = decoded.get("error")
            if failure:
                raise RuntimeError(f"PostHog query failed: {failure}\n  query: {query.strip()[:120]}…")
            rows = decoded.get("results")
            if rows is None:
                raise RuntimeError(
                    f"PostHog returned no results field; keys={list(decoded.keys())}\n"
                    f"  query: {query.strip()[:120]}…",
                )
            return rows
        raise RuntimeError(f"PostHog query failed after retries: {last_err}")

    @staticmethod
    def _raise_with_body(e: urllib.error.HTTPError, query: str) -> None:
        """Raise RuntimeError describing `e`, preferring the JSON error body.

        Falls back to a bare "HTTP <code>" detail when the body cannot be
        read or is not a JSON object. The original exception is deliberately
        dropped from the traceback chain (`from None`).
        """
        try:
            parsed = json.loads(e.read())
            detail = parsed.get("detail") or parsed.get("error") or parsed
        except Exception:  # noqa: BLE001
            detail = f"HTTP {e.code}"
        snippet = query.strip()[:120]
        raise RuntimeError(
            f"PostHog query failed (HTTP {e.code}): {detail}\n  query: {snippet}…",
        ) from None

    def auth_check(self) -> dict:
        """GET the project endpoint with the API key and return the decoded JSON."""
        url = f"{self.host}/api/projects/{self.project_id}/"
        request = urllib.request.Request(
            url,
            headers={"Authorization": f"Bearer {self.api_key}"},
        )
        with urllib.request.urlopen(request, timeout=15) as resp:
            raw = resp.read()
        return json.loads(raw)


# ── Helpers ────────────────────────────────────────────────────────────────


def _load_api_key(key_file: Path | None) -> str:
    """Return the PostHog personal API key, stripped of surrounding whitespace.

    Lookup order:
    1. `$POSTHOG_PERSONAL_API_KEY`, if it contains anything besides whitespace.
    2. `key_file`, or `DEFAULT_KEY_FILE` (with `~` expanded) when `key_file`
       is None.

    Args:
        key_file: Path to a file holding the key, or None for the default.

    Raises:
        SystemExit: when no key source exists, the key file cannot be read,
            or the key file is empty.
    """
    # Strip before testing: a blank/whitespace-only env var must not shadow
    # a valid key file by yielding an empty key.
    env_key = os.environ.get("POSTHOG_PERSONAL_API_KEY", "").strip()
    if env_key:
        return env_key
    if key_file is None:
        key_file = Path(DEFAULT_KEY_FILE).expanduser()
    if not key_file.exists():
        raise SystemExit(
            f"ERROR: PostHog key not found.\n"
            f"  Set $POSTHOG_PERSONAL_API_KEY, or write the key to {key_file}.\n"
            f"  Get a key at https://us.posthog.com/settings/user-api-keys "
            f"with at least 'project:read' and 'query:read' scopes.",
        )
    try:
        key = key_file.read_text().strip()
    except OSError as e:
        raise SystemExit(f"ERROR: failed reading {key_file}: {e}") from e
    if not key:
        # An empty key would only surface later as an opaque auth failure.
        raise SystemExit(f"ERROR: PostHog key file {key_file} is empty.")
    return key


# ── Query templates ────────────────────────────────────────────────────────
#
# `toString(properties.X)` and `properties.X` both return bare Python strings
# (or None when the property is missing). HogQL does NOT JSON-encode the
# returned value. Comparisons therefore use the bare form: `properties.x =
# 'value'` and `properties.x IS NULL` / `IS NOT NULL`. `toString()` is
# kept on SELECT only as a defensive cast so `properties.X` of varying
# underlying types renders consistently.
#
# Filtering: events are kept when they match the *repo* filter OR the *email*
# filter. Both filters are optional. `combined_filter` is a SQL fragment like
#   "AND ((properties.gitRepo = 'github.com/tesslio'
#          OR properties.gitRepo LIKE 'github.com/tesslio/%')
#         OR ifNull(toString(person.properties.email), '') LIKE '%@tessl.io')"
# It's pasted into every query that scopes events to the configured filters.
# When neither filter is configured the fragment is the empty string. The
# session-aggregates query uses `sessionGitRepo` for the repo half because
# `cli:agent-signals:session-processed` events carry that property; the
# email half is unchanged because email is on the person record.


def _repo_filter_body(prefixes: list[str], property_name: str = "properties.gitRepo") -> str:
    """The OR-joined body of a repo filter, suitable for use inside countIf().

    Each prefix matches either the property exactly or anything below it
    (`<prefix>/...`). The result is wrapped in parentheses.

    Args:
        prefixes: Repo prefixes such as "github.com/tesslio".
        property_name: HogQL expression holding the repo of an event.

    Returns:
        The parenthesised predicate, or "" when no prefixes are configured.
    """
    if not prefixes:
        return ""
    clauses: list[str] = []
    for p in prefixes:
        # Escape backslashes BEFORE quotes: otherwise a value ending in `\`
        # would turn the closing quote into an escaped quote and break out
        # of the string literal.
        safe = p.replace("\\", "\\\\").replace("'", "\\'")
        clauses.append(f"{property_name} = '{safe}'")
        clauses.append(f"{property_name} LIKE '{safe}/%'")
    return "(" + " OR ".join(clauses) + ")"


def _email_filter_body(domains: list[str]) -> str:
    """The OR-joined body of an email-domain filter (matched against
    `person.properties.email` via the implicit persons join).

    Leading "@" characters on a domain are ignored, so "tessl.io" and
    "@tessl.io" are equivalent. `ifNull(... , '')` works around HogQL's
    `Nullable` strictness on `LIKE`.

    Args:
        domains: Email domains to keep.

    Returns:
        The parenthesised predicate, or "" when no domains are configured.
    """
    if not domains:
        return ""
    clauses: list[str] = []
    for d in domains:
        # Escape backslashes BEFORE quotes so a trailing `\` cannot swallow
        # the closing quote of the string literal.
        safe = d.lstrip("@").replace("\\", "\\\\").replace("'", "\\'")
        clauses.append(f"ifNull(toString(person.properties.email), '') LIKE '%@{safe}'")
    return "(" + " OR ".join(clauses) + ")"


def _combined_filter_body(
    prefixes: list[str],
    email_domains: list[str],
    repo_property: str = "properties.gitRepo",
) -> str:
    """Build the parenthesised `(repo OR email)` predicate.

    Either half is omitted when its filter is not configured; the result is
    "" when neither is.
    """
    repo_part = _repo_filter_body(prefixes, repo_property)
    email_part = _email_filter_body(email_domains)
    active = [part for part in (repo_part, email_part) if part]
    if active:
        return "(" + " OR ".join(active) + ")"
    return ""


def _build_combined_filter(
    prefixes: list[str],
    email_domains: list[str],
    repo_property: str = "properties.gitRepo",
) -> str:
    """Return the combined filter as an `AND (...)` fragment for a WHERE clause.

    Yields "" when no filter is configured, so the fragment can always be
    pasted into a query unconditionally.
    """
    predicate = _combined_filter_body(prefixes, email_domains, repo_property)
    if not predicate:
        return ""
    return f"AND {predicate}"


def _q_filter_meta(days: int, prefixes: list[str], email_domains: list[str]) -> str:
    """Build the query that reports how the filter splits the event stream.

    The query never applies the filter in its WHERE clause; it counts over
    every skill-activation event in the window and returns, in order:
    - total                 all events in the window
    - no_attribution        events with `properties.gitRepo IS NULL`
    - with_attribution      events with a non-null gitRepo
    - matched               events matching (repo OR email)
    - matched_by_repo       events whose gitRepo matched (subset)
    - matched_by_email      events whose person email matched (subset)

    With no filter configured the combined predicate becomes `true` (every
    event "matches") while the per-source predicates become `false`.
    """
    combined = _combined_filter_body(prefixes, email_domains)
    if not combined:
        combined = "true"
    repo_only = _repo_filter_body(prefixes)
    if not repo_only:
        repo_only = "false"
    email_only = _email_filter_body(email_domains)
    if not email_only:
        email_only = "false"
    sql = f"""
    SELECT count() AS total,
           countIf(properties.gitRepo IS NULL)     AS no_attribution,
           countIf(properties.gitRepo IS NOT NULL) AS with_attribution,
           countIf({combined})                     AS matched,
           countIf({repo_only})                    AS matched_by_repo,
           countIf({email_only})                   AS matched_by_email
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
    """
    return sql


def _q_totals(days: int, where_filter: str) -> str:
    """Build the single-row totals query for skill-activation events.

    Columns, in order: activations, distinct users, distinct sessions, tiled
    and untiled activation counts, then activations and distinct users for
    the `claude-code` and `cursor-ide` providers.

    Args:
        days: Look-back window in days.
        where_filter: Extra `AND ...` fragment, or "" for none.
    """
    event_name = "cli:agent-signals:skill-activation"
    since = f"now() - INTERVAL {days} DAY"
    sql = f"""
    SELECT count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions,
           countIf(properties.skillTile IS NOT NULL) AS tiled,
           countIf(properties.skillTile IS NULL) AS untiled,
           countIf(properties.provider = 'claude-code') AS cc_act,
           countIf(properties.provider = 'cursor-ide') AS ci_act,
           uniqIf(distinct_id, properties.provider = 'claude-code') AS cc_users,
           uniqIf(distinct_id, properties.provider = 'cursor-ide') AS ci_users
    FROM events
    WHERE event = '{event_name}'
      AND timestamp > {since}
      {where_filter}
    """
    return sql


def _q_tile_rollup(days: int, where_filter: str, limit: int = 500) -> str:
    """Build the per-tile rollup query (activations, users, sessions).

    Only events with a non-null `skillTile` are counted; rows come back
    ordered by activations, capped at `limit`.

    Args:
        days: Look-back window in days.
        where_filter: Extra `AND ...` fragment, or "" for none.
        limit: Maximum number of tiles returned.
    """
    event_name = "cli:agent-signals:skill-activation"
    since = f"now() - INTERVAL {days} DAY"
    sql = f"""
    SELECT toString(properties.skillTile) AS tile,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions
    FROM events
    WHERE event = '{event_name}'
      AND timestamp > {since}
      AND properties.skillTile IS NOT NULL
      {where_filter}
    GROUP BY tile
    ORDER BY act DESC
    LIMIT {limit}
    """
    return sql


def _q_skill_detail(days: int, tiles: list[str], where_filter: str, limit: int = 1000) -> str:
    """Build the per-(tile, skill) detail query restricted to `tiles`.

    Columns, in order: tile, skill, activations, distinct users, distinct
    sessions, first and last event timestamp, then activation counts for the
    `claude-code` and `cursor-ide` providers.

    Args:
        days: Look-back window in days.
        tiles: Tile names to include (used as an IN-list).
        where_filter: Extra `AND ...` fragment, or "" for none.
        limit: Maximum number of rows returned.

    Returns:
        The HogQL text, or "" when `tiles` is empty (nothing to query).
    """
    if not tiles:
        return ""
    # Tile names originate from event properties, so quote them defensively.
    # Backslashes are escaped BEFORE quotes; otherwise a trailing `\` would
    # un-escape the closing quote of the literal.
    in_clause = ", ".join(
        "'" + t.replace("\\", "\\\\").replace("'", "\\'") + "'" for t in tiles
    )
    return f"""
    SELECT toString(properties.skillTile) AS tile,
           toString(properties.skillName) AS skill,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions,
           min(timestamp) AS first_seen,
           max(timestamp) AS last_seen,
           countIf(properties.provider = 'claude-code') AS cc,
           countIf(properties.provider = 'cursor-ide') AS ci
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.skillTile IN ({in_clause})
      {where_filter}
    GROUP BY tile, skill
    ORDER BY act DESC
    LIMIT {limit}
    """


def _q_untiled(days: int, where_filter: str, limit: int = 200) -> str:
    """Build the rollup query for skills that have a name but no tile.

    Returns activations and distinct users per `skillName`, ordered by
    activations and capped at `limit`.

    Args:
        days: Look-back window in days.
        where_filter: Extra `AND ...` fragment, or "" for none.
        limit: Maximum number of skills returned.
    """
    event_name = "cli:agent-signals:skill-activation"
    since = f"now() - INTERVAL {days} DAY"
    sql = f"""
    SELECT toString(properties.skillName) AS skill,
           count() AS act,
           uniq(distinct_id) AS users
    FROM events
    WHERE event = '{event_name}'
      AND timestamp > {since}
      AND properties.skillTile IS NULL
      AND properties.skillName IS NOT NULL
      {where_filter}
    GROUP BY skill
    ORDER BY act DESC
    LIMIT {limit}
    """
    return sql


def _q_mcp(days: int, where_filter: str) -> str:
    """Build the per-tool rollup query for MCP tool activations.

    Returns activations, distinct users and distinct sessions per tool,
    ordered by activations. The query carries no LIMIT clause.

    Args:
        days: Look-back window in days.
        where_filter: Extra `AND ...` fragment, or "" for none.
    """
    event_name = "cli:agent-signals:mcp-tool-activation"
    since = f"now() - INTERVAL {days} DAY"
    sql = f"""
    SELECT toString(properties.tool) AS tool,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions
    FROM events
    WHERE event = '{event_name}'
      AND timestamp > {since}
      {where_filter}
    GROUP BY tool
    ORDER BY act DESC
    """
    return sql


def _q_session_aggregates(days: int, where_filter_session: str) -> str:
    """Build the single-row aggregate query over session-processed events.

    `cli:agent-signals:session-processed` events carry `sessionGitRepo` (not
    `gitRepo`), so the caller must pass a filter built against that property
    name; the email half of the filter (when configured) is unchanged.

    Columns, in order: sessions, distinct users, then summed message, tool
    call, skill call, Tessl skill call, Tessl MCP call and Tessl CLI call
    counters.

    Args:
        days: Look-back window in days.
        where_filter_session: Extra `AND ...` fragment, or "" for none.
    """
    event_name = "cli:agent-signals:session-processed"
    since = f"now() - INTERVAL {days} DAY"
    sql = f"""
    SELECT count() AS sessions,
           uniq(distinct_id) AS users,
           sum(toFloatOrZero(toString(properties.totalMessages))) AS messages,
           sum(toFloatOrZero(toString(properties.totalToolCalls))) AS tool_calls,
           sum(toFloatOrZero(toString(properties.totalSkillCalls))) AS skill_calls,
           sum(toFloatOrZero(toString(properties.tesslSkillCalls))) AS tessl_skill_calls,
           sum(toFloatOrZero(toString(properties.tesslMcpCalls))) AS tessl_mcp_calls,
           sum(toFloatOrZero(toString(properties.tesslToolCalls))) AS tessl_cli_calls
    FROM events
    WHERE event = '{event_name}'
      AND timestamp > {since}
      {where_filter_session}
    """
    return sql


def _q_loaded_skills(days: int, where_filter: str, limit: int) -> str:
    """Build the query counting distinct users per installed skill.

    The inner CTE unrolls the `properties.installedSkills` JSON array of
    each selected skill-activation event; the outer query groups the
    unrolled entries by their `tile`, `name` and `scope` fields.

    Args:
        days: Look-back window in days.
        where_filter: Extra `AND ...` fragment, or "" for none.
        limit: Maximum number of (tile, name, scope) rows returned.
    """
    event_name = "cli:agent-signals:skill-activation"
    since = f"now() - INTERVAL {days} DAY"
    sql = f"""
    WITH unrolled AS (
      SELECT distinct_id,
             arrayJoin(JSONExtractArrayRaw(coalesce(toString(properties.installedSkills), '[]'))) AS s_json
      FROM events
      WHERE event = '{event_name}'
        AND timestamp > {since}
        AND properties.installedSkills IS NOT NULL
        {where_filter}
    )
    SELECT JSONExtractString(s_json, 'tile') AS tile,
           JSONExtractString(s_json, 'name') AS name,
           JSONExtractString(s_json, 'scope') AS scope,
           uniq(distinct_id) AS loaded_users
    FROM unrolled
    GROUP BY tile, name, scope
    ORDER BY loaded_users DESC
    LIMIT {limit}
    """
    return sql


# ── Per-repo query templates ───────────────────────────────────────────────
#
# These mirror the all-repos rollups above but add `properties.gitRepo` to the
# GROUP BY. They power the report's per-repo views and client-side repo
# filtering (so the user can untick repos they don't care about and see
# everything re-aggregate without another PostHog round-trip).
#
# Events with a NULL `gitRepo` are dropped here — they cannot be attributed to
# any repo, so they have nowhere to live in the per-repo views. Their share is
# still visible via `filter.events_per_window.<w>.events_no_gitrepo` and they
# still contribute to the all-repos `totals` and rollups.


def _q_repo_totals(days: int, where_filter: str, limit: int) -> str:
    """Build the per-repo totals query for skill-activation events.

    Same measures as the all-repos totals, grouped by `properties.gitRepo`.
    Events with a NULL `gitRepo` are excluded; rows are ordered by
    activations and capped at `limit`.

    Args:
        days: Look-back window in days.
        where_filter: Extra `AND ...` fragment, or "" for none.
        limit: Maximum number of repos returned.
    """
    event_name = "cli:agent-signals:skill-activation"
    since = f"now() - INTERVAL {days} DAY"
    sql = f"""
    SELECT toString(properties.gitRepo) AS repo,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions,
           countIf(properties.skillTile IS NOT NULL) AS tiled,
           countIf(properties.skillTile IS NULL) AS untiled,
           countIf(properties.provider = 'claude-code') AS cc_act,
           countIf(properties.provider = 'cursor-ide') AS ci_act,
           uniqIf(distinct_id, properties.provider = 'claude-code') AS cc_users,
           uniqIf(distinct_id, properties.provider = 'cursor-ide') AS ci_users
    FROM events
    WHERE event = '{event_name}'
      AND timestamp > {since}
      AND properties.gitRepo IS NOT NULL
      {where_filter}
    GROUP BY repo
    ORDER BY act DESC
    LIMIT {limit}
    """
    return sql


def _q_tiles_by_repo(days: int, where_filter: str, repos: list[str], limit: int) -> str:
    """Build the per-(tile, repo) rollup query.

    Args:
        days: Look-back window in days.
        where_filter: Extra `AND ...` fragment, or "" for none.
        repos: Repos to include (used as an IN-list on `properties.gitRepo`).
        limit: Maximum number of rows returned.

    Returns:
        The HogQL text, or "" when `repos` is empty (nothing to query).
    """
    if not repos:
        return ""
    # Repo names come from event data. Escape backslashes BEFORE quotes so a
    # trailing `\` cannot un-escape the closing quote of the literal.
    in_clause = ", ".join(
        "'" + r.replace("\\", "\\\\").replace("'", "\\'") + "'" for r in repos
    )
    return f"""
    SELECT toString(properties.skillTile) AS tile,
           toString(properties.gitRepo) AS repo,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.skillTile IS NOT NULL
      AND properties.gitRepo IN ({in_clause})
      {where_filter}
    GROUP BY tile, repo
    ORDER BY act DESC
    LIMIT {limit}
    """


def _q_skills_by_repo(
    days: int, tiles: list[str], repos: list[str], where_filter: str, limit: int,
) -> str:
    """Build the per-(tile, skill, repo) rollup query.

    Args:
        days: Look-back window in days.
        tiles: Tile names to include (IN-list on `properties.skillTile`).
        repos: Repos to include (IN-list on `properties.gitRepo`).
        where_filter: Extra `AND ...` fragment, or "" for none.
        limit: Maximum number of rows returned.

    Returns:
        The HogQL text, or "" when either `tiles` or `repos` is empty.
    """
    if not tiles or not repos:
        return ""
    # Both lists come from event data. Escape backslashes BEFORE quotes so a
    # trailing `\` cannot un-escape the closing quote of a literal.
    tile_in = ", ".join(
        "'" + t.replace("\\", "\\\\").replace("'", "\\'") + "'" for t in tiles
    )
    repo_in = ", ".join(
        "'" + r.replace("\\", "\\\\").replace("'", "\\'") + "'" for r in repos
    )
    return f"""
    SELECT toString(properties.skillTile) AS tile,
           toString(properties.skillName) AS skill,
           toString(properties.gitRepo) AS repo,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.skillTile IN ({tile_in})
      AND properties.gitRepo IN ({repo_in})
      {where_filter}
    GROUP BY tile, skill, repo
    ORDER BY act DESC
    LIMIT {limit}
    """


def _q_untiled_by_repo(days: int, repos: list[str], where_filter: str, limit: int) -> str:
    """Build the per-(skill, repo) rollup query for skills with no tile.

    Args:
        days: Look-back window in days.
        repos: Repos to include (IN-list on `properties.gitRepo`).
        where_filter: Extra `AND ...` fragment, or "" for none.
        limit: Maximum number of rows returned.

    Returns:
        The HogQL text, or "" when `repos` is empty (nothing to query).
    """
    if not repos:
        return ""
    # Repo names come from event data. Escape backslashes BEFORE quotes so a
    # trailing `\` cannot un-escape the closing quote of the literal.
    repo_in = ", ".join(
        "'" + r.replace("\\", "\\\\").replace("'", "\\'") + "'" for r in repos
    )
    return f"""
    SELECT toString(properties.skillName) AS skill,
           toString(properties.gitRepo) AS repo,
           count() AS act,
           uniq(distinct_id) AS users
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.skillTile IS NULL
      AND properties.skillName IS NOT NULL
      AND properties.gitRepo IN ({repo_in})
      {where_filter}
    GROUP BY skill, repo
    ORDER BY act DESC
    LIMIT {limit}
    """


def _q_mcp_by_repo(days: int, repos: list[str], where_filter: str, limit: int) -> str:
    """Build the per-(tool, repo) rollup query for MCP tool activations.

    Args:
        days: Look-back window in days.
        repos: Repos to include (IN-list on `properties.gitRepo`).
        where_filter: Extra `AND ...` fragment, or "" for none.
        limit: Maximum number of rows returned.

    Returns:
        The HogQL text, or "" when `repos` is empty (nothing to query).
    """
    if not repos:
        return ""
    # Repo names come from event data. Escape backslashes BEFORE quotes so a
    # trailing `\` cannot un-escape the closing quote of the literal.
    repo_in = ", ".join(
        "'" + r.replace("\\", "\\\\").replace("'", "\\'") + "'" for r in repos
    )
    return f"""
    SELECT toString(properties.tool) AS tool,
           toString(properties.gitRepo) AS repo,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions
    FROM events
    WHERE event = 'cli:agent-signals:mcp-tool-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.gitRepo IN ({repo_in})
      {where_filter}
    GROUP BY tool, repo
    ORDER BY act DESC
    LIMIT {limit}
    """


def _q_session_aggregates_by_repo(
    days: int, repos: list[str], where_filter_session: str, limit: int,
) -> str:
    """Build the per-repo aggregate query over session-processed events.

    Groups by `properties.sessionGitRepo` (the repo property these events
    carry) and returns sessions, distinct users and the summed per-session
    counters for each repo in `repos`.

    Args:
        days: Look-back window in days.
        repos: Repos to include (IN-list on `properties.sessionGitRepo`).
        where_filter_session: Extra `AND ...` fragment built against
            `sessionGitRepo`, or "" for none.
        limit: Maximum number of rows returned.

    Returns:
        The HogQL text, or "" when `repos` is empty (nothing to query).
    """
    if not repos:
        return ""
    # Repo names come from event data. Escape backslashes BEFORE quotes so a
    # trailing `\` cannot un-escape the closing quote of the literal.
    repo_in = ", ".join(
        "'" + r.replace("\\", "\\\\").replace("'", "\\'") + "'" for r in repos
    )
    return f"""
    SELECT toString(properties.sessionGitRepo) AS repo,
           count() AS sessions,
           uniq(distinct_id) AS users,
           sum(toFloatOrZero(toString(properties.totalMessages))) AS messages,
           sum(toFloatOrZero(toString(properties.totalToolCalls))) AS tool_calls,
           sum(toFloatOrZero(toString(properties.totalSkillCalls))) AS skill_calls,
           sum(toFloatOrZero(toString(properties.tesslSkillCalls))) AS tessl_skill_calls,
           sum(toFloatOrZero(toString(properties.tesslMcpCalls))) AS tessl_mcp_calls,
           sum(toFloatOrZero(toString(properties.tesslToolCalls))) AS tessl_cli_calls
    FROM events
    WHERE event = 'cli:agent-signals:session-processed'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.sessionGitRepo IN ({repo_in})
      {where_filter_session}
    GROUP BY repo
    ORDER BY sessions DESC
    LIMIT {limit}
    """


# ── Fetch + assemble ───────────────────────────────────────────────────────


def fetch(
    posthog: PostHog,
    windows: list[int],
    top_tiles_detail: int,
    top_untiled: int,
    top_loaded: int,
    top_repos: int,
    primary_window: int,
    filter_repos: list[str],
    filter_email_domains: list[str],
) -> dict:
    if primary_window not in windows:
        windows = sorted({primary_window, *windows})

    def w_key(d: int) -> str:
        return f"{d}d"

    where_filter = _build_combined_filter(filter_repos, filter_email_domains)
    where_filter_session = _build_combined_filter(
        filter_repos, filter_email_domains, "properties.sessionGitRepo",
    )

    # ── Filter metadata per window ─────────────────────────────────────────
    # Always reports the unfiltered slice so consumers can see how much was
    # dropped by the filter vs how much had no gitRepo to begin with, and
    # what share of matches came from each filter source.
    filter_active = bool(filter_repos) or bool(filter_email_domains)
    filter_meta: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_filter_meta(d, filter_repos, filter_email_domains))
        if rows and rows[0][0] is not None:
            total, no_attr, with_attr, matched, by_repo, by_email = (int(x) for x in rows[0])
        else:
            total = no_attr = with_attr = matched = by_repo = by_email = 0
        filter_meta[w_key(d)] = {
            "events_total": total,
            "events_no_gitrepo": no_attr,
            "events_with_gitrepo": with_attr,
            # Semantics: when no filter is configured, every event "matches"
            # (the entire window is selected) and zero events are excluded.
            # `events_matched_by_repo` and `events_matched_by_email` are
            # subsets of `events_matched_filter` and overlap when both
            # filters are configured (an event can match both).
            "events_matched_filter": matched,
            "events_matched_by_repo": by_repo,
            "events_matched_by_email": by_email,
            "events_excluded_by_filter": (total - matched) if filter_active else 0,
        }

    # ── Totals per window ──────────────────────────────────────────────────
    totals: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_totals(d, where_filter))
        if not rows:
            totals[w_key(d)] = {
                "activations": 0, "users": 0, "sessions": 0,
                "tiled_activations": 0, "untiled_activations": 0,
                "providers": {"claude-code": {"activations": 0, "users": 0},
                              "cursor-ide": {"activations": 0, "users": 0}},
            }
            continue
        act, users, sessions, tiled, untiled, cc, ci, ccu, ciu = rows[0]
        totals[w_key(d)] = {
            "activations": int(act),
            "users": int(users),
            "sessions": int(sessions),
            "tiled_activations": int(tiled),
            "untiled_activations": int(untiled),
            "providers": {
                "claude-code": {"activations": int(cc), "users": int(ccu)},
                "cursor-ide": {"activations": int(ci), "users": int(ciu)},
            },
        }

    # ── Tile rollup per window ─────────────────────────────────────────────
    # Query already filters `properties.skillTile IS NOT NULL`; defensive
    # check here just in case PostHog returns a stray empty/None tile.
    tiles_by_id: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_tile_rollup(d, where_filter))
        for tile, act, users, sessions in rows:
            if not tile:
                continue
            entry = tiles_by_id.setdefault(tile, {"tile": tile, "windows": {}})
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }

    # Order tiles by primary-window activations (descending). Tiles with no
    # primary-window data go to the end, ranked by their largest-window
    # activations as a tiebreaker.
    def tile_sort_key(t: dict) -> tuple[int, int, int]:
        primary = t["windows"].get(w_key(primary_window)) or {"activations": 0, "users": 0}
        # 0 = has primary-window data, 1 = doesn't (sorts last)
        bucket = 0 if primary["activations"] > 0 else 1
        # Fallback: largest activations seen across any window, used only for
        # ordering items that lack primary-window data.
        any_max = max(
            (w["activations"] for w in t["windows"].values()),
            default=0,
        )
        return (bucket, -primary["activations"], -any_max)

    tiles_sorted = sorted(tiles_by_id.values(), key=tile_sort_key)

    # ── Per-skill detail for top tiles, all windows ────────────────────────
    top_tiles = [t["tile"] for t in tiles_sorted[:top_tiles_detail]]
    skills_by_id: dict[tuple[str, str], dict] = {}
    for d in windows:
        if not top_tiles:
            break
        query = _q_skill_detail(d, top_tiles, where_filter)
        if not query:
            continue
        rows = posthog.hogql(query)
        for tile, skill, act, users, sessions, first_seen, last_seen, cc, ci in rows:
            if not tile or not skill:
                continue
            key = (tile, skill)
            entry = skills_by_id.setdefault(key, {
                "tile": tile, "name": skill,
                "windows": {}, "providers": {}, "first_seen": None, "last_seen": None,
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }
            # Providers + first/last seen captured against the primary window.
            if d == primary_window:
                entry["providers"] = {
                    "claude-code": int(cc),
                    "cursor-ide": int(ci),
                }
                entry["first_seen"] = first_seen
                entry["last_seen"] = last_seen

    def skill_sort_key(s: dict) -> tuple[int, int, int]:
        primary = s["windows"].get(w_key(primary_window)) or {"activations": 0, "users": 0}
        bucket = 0 if primary["activations"] > 0 else 1
        any_max = max(
            (w["activations"] for w in s["windows"].values()),
            default=0,
        )
        return (bucket, -primary["activations"], -any_max)

    skills_sorted = sorted(skills_by_id.values(), key=skill_sort_key)

    # ── Untiled (no skillTile) — third-party / private SKILL.md files ──────
    untiled_by_name: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_untiled(d, where_filter, top_untiled))
        for skill, act, users in rows:
            if not skill:
                continue
            entry = untiled_by_name.setdefault(skill, {
                "name": skill, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
            }

    untiled_sorted = sorted(
        untiled_by_name.values(),
        key=lambda s: -(s["windows"].get(w_key(primary_window), {}).get("activations", 0)),
    )

    # ── MCP tools ──────────────────────────────────────────────────────────
    mcp_by_tool: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_mcp(d, where_filter))
        for tool, act, users, sessions in rows:
            if not tool:
                continue
            entry = mcp_by_tool.setdefault(tool, {
                "tool": tool, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }
    mcp_sorted = sorted(
        mcp_by_tool.values(),
        key=lambda m: -(m["windows"].get(w_key(primary_window), {}).get("activations", 0)),
    )

    # ── Session aggregates ─────────────────────────────────────────────────
    session_aggregates: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_session_aggregates(d, where_filter_session))
        if not rows or rows[0][0] is None:
            session_aggregates[w_key(d)] = {
                "sessions": 0, "users": 0,
                "messages": 0, "tool_calls": 0, "skill_calls": 0,
                "tessl_skill_calls": 0, "tessl_mcp_calls": 0, "tessl_cli_calls": 0,
            }
            continue
        sessions, users, messages, tool_calls, skill_calls, tessl_skill, tessl_mcp, tessl_cli = rows[0]
        session_aggregates[w_key(d)] = {
            "sessions": int(sessions or 0),
            "users": int(users or 0),
            "messages": int(messages or 0),
            "tool_calls": int(tool_calls or 0),
            "skill_calls": int(skill_calls or 0),
            "tessl_skill_calls": int(tessl_skill or 0),
            "tessl_mcp_calls": int(tessl_mcp or 0),
            "tessl_cli_calls": int(tessl_cli or 0),
        }

    # ── Loaded skills (from installedSkills array on activation events) ────
    # Same key shape as `skills` (tile, name) so downstream can join them.
    # We separately retain `scope` because the same (tile, name) can appear
    # both as a project install and a global install for different users.
    loaded_by_id: dict[tuple[str, str, str], dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_loaded_skills(d, where_filter, top_loaded))
        for tile, name, scope, users in rows:
            if not name:
                continue
            key = (tile or "", name, scope or "")
            entry = loaded_by_id.setdefault(key, {
                "tile": tile or None,
                "name": name,
                "scope": scope or None,
                "windows": {},
            })
            entry["windows"][w_key(d)] = {"users": int(users)}

    loaded_sorted = sorted(
        loaded_by_id.values(),
        key=lambda s: -(s["windows"].get(w_key(primary_window), {}).get("users", 0)),
    )

    # ── Per-repo views ─────────────────────────────────────────────────────
    # Power the report's Repos section and the client-side repo chip filter.
    # Strategy:
    # 1. Pull per-(repo) totals across all windows; assemble `repos[]`.
    # 2. Pick the top N repos by primary-window activations and use that set
    #    as the IN-list for every `*_by_repo` follow-up query. This keeps
    #    long-tail repos (1–2 activations from random external orgs) out of
    #    the report without dropping their volume from the all-repos
    #    aggregates above.
    # 3. Emit empty arrays / objects when no repos are present. Schema treats
    #    these fields as required so we never write `null`.
    repos_by_id: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_repo_totals(d, where_filter, top_repos))
        for repo, act, users, sessions, tiled, untiled, cc, ci, ccu, ciu in rows:
            if not repo:
                continue
            entry = repos_by_id.setdefault(repo, {"repo": repo, "windows": {}})
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
                "tiled_activations": int(tiled),
                "untiled_activations": int(untiled),
                "providers": {
                    "claude-code": {"activations": int(cc), "users": int(ccu)},
                    "cursor-ide": {"activations": int(ci), "users": int(ciu)},
                },
            }

    def repo_sort_key(r: dict) -> tuple[int, int, int]:
        primary = r["windows"].get(w_key(primary_window)) or {"activations": 0}
        bucket = 0 if primary.get("activations", 0) > 0 else 1
        any_max = max(
            (w["activations"] for w in r["windows"].values()),
            default=0,
        )
        return (bucket, -primary.get("activations", 0), -any_max)

    repos_sorted = sorted(repos_by_id.values(), key=repo_sort_key)[:top_repos]
    top_repos_list = [r["repo"] for r in repos_sorted]

    # ── Per-(tile, repo) ───────────────────────────────────────────────────
    tiles_by_repo: dict[tuple[str, str], dict] = {}
    for d in windows:
        q = _q_tiles_by_repo(d, where_filter, top_repos_list, limit=10000)
        if not q:
            continue
        rows = posthog.hogql(q)
        for tile, repo, act, users, sessions in rows:
            if not tile or not repo:
                continue
            key = (tile, repo)
            entry = tiles_by_repo.setdefault(key, {
                "tile": tile, "repo": repo, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }

    # ── Per-(tile, skill, repo) ────────────────────────────────────────────
    # Same top-tile gate as `skills[]` so we don't blow up the query budget.
    skills_by_repo: dict[tuple[str, str, str], dict] = {}
    for d in windows:
        q = _q_skills_by_repo(d, top_tiles, top_repos_list, where_filter, limit=10000)
        if not q:
            continue
        rows = posthog.hogql(q)
        for tile, skill, repo, act, users, sessions in rows:
            if not tile or not skill or not repo:
                continue
            key = (tile, skill, repo)
            entry = skills_by_repo.setdefault(key, {
                "tile": tile, "name": skill, "repo": repo, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }

    # ── Per-(name, repo) untiled ───────────────────────────────────────────
    untiled_by_repo: dict[tuple[str, str], dict] = {}
    for d in windows:
        q = _q_untiled_by_repo(d, top_repos_list, where_filter, limit=10000)
        if not q:
            continue
        rows = posthog.hogql(q)
        for skill, repo, act, users in rows:
            if not skill or not repo:
                continue
            key = (skill, repo)
            entry = untiled_by_repo.setdefault(key, {
                "name": skill, "repo": repo, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
            }

    # ── Per-(tool, repo) MCP ───────────────────────────────────────────────
    mcp_by_repo: dict[tuple[str, str], dict] = {}
    for d in windows:
        q = _q_mcp_by_repo(d, top_repos_list, where_filter, limit=5000)
        if not q:
            continue
        rows = posthog.hogql(q)
        for tool, repo, act, users, sessions in rows:
            if not tool or not repo:
                continue
            key = (tool, repo)
            entry = mcp_by_repo.setdefault(key, {
                "tool": tool, "repo": repo, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }

    # ── Per-(repo) session aggregates per window ───────────────────────────
    session_aggregates_by_repo: dict[str, list[dict]] = {}
    for d in windows:
        q = _q_session_aggregates_by_repo(
            d, top_repos_list, where_filter_session, limit=top_repos,
        )
        per_window: list[dict] = []
        if q:
            rows = posthog.hogql(q)
            for row in rows:
                repo, sessions, users, messages, tc, sc, tsc, tmc, tcli = row
                if not repo:
                    continue
                per_window.append({
                    "repo": repo,
                    "sessions": int(sessions or 0),
                    "users": int(users or 0),
                    "messages": int(messages or 0),
                    "tool_calls": int(tc or 0),
                    "skill_calls": int(sc or 0),
                    "tessl_skill_calls": int(tsc or 0),
                    "tessl_mcp_calls": int(tmc or 0),
                    "tessl_cli_calls": int(tcli or 0),
                })
        session_aggregates_by_repo[w_key(d)] = per_window

    return {
        "schema_version": SCHEMA_VERSION,
        "fetched_at": datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z"),
        "source": {
            "kind": "posthog",
            "host": posthog.host,
            "project_id": posthog.project_id,
            "dashboard_id_for_reference": DEFAULT_DASHBOARD_ID,
        },
        "windows": windows,
        "primary_window_days": primary_window,
        "top_tiles_detail": top_tiles_detail,
        "top_untiled": top_untiled,
        "top_loaded": top_loaded,
        "top_repos": top_repos,
        "tool_version": TOOL_VERSION,
        "filter": {
            "repos": filter_repos,
            "email_domains": filter_email_domains,
            "match_kind": "prefix-or-email-domain",
            "events_per_window": filter_meta,
        },
        "totals": totals,
        "tiles": tiles_sorted,
        "skills": skills_sorted,
        "untiled_skills": untiled_sorted,
        "loaded_skills": loaded_sorted,
        "mcp_tools": mcp_sorted,
        "session_aggregates": session_aggregates,
        "repos": repos_sorted,
        "tiles_by_repo": list(tiles_by_repo.values()),
        "skills_by_repo": list(skills_by_repo.values()),
        "untiled_skills_by_repo": list(untiled_by_repo.values()),
        "mcp_tools_by_repo": list(mcp_by_repo.values()),
        "session_aggregates_by_repo": session_aggregates_by_repo,
    }


# ── CLI ────────────────────────────────────────────────────────────────────


def parse_windows(s: str) -> list[int]:
    """Turn the `--windows` value into a sorted, de-duplicated list of day counts.

    The value is split on commas; whitespace around each token is ignored and
    empty tokens are skipped. Exits via `SystemExit` when a token is not an
    integer, when a value is not strictly positive, or when no usable token
    remains.
    """
    days: set[int] = set()
    for raw in s.split(","):
        piece = raw.strip()
        if piece:
            try:
                value = int(piece)
            except ValueError:
                raise SystemExit(f"--windows: '{piece}' is not an integer")
            if value <= 0:
                raise SystemExit(f"--windows: {value} must be > 0")
            days.add(value)
    if not days:
        raise SystemExit("--windows: must contain at least one positive integer")
    return sorted(days)


def parse_filter_repos(s: str) -> list[str]:
    """Split the comma-separated repo-prefix filter into a list of prefixes.

    Each token has surrounding whitespace removed and then any trailing `/`
    characters stripped; tokens that end up empty are dropped. A falsy or
    whitespace-only input means "no filter" and yields an empty list.
    """
    if not (s and s.strip()):
        return []
    cleaned = (piece.strip().rstrip("/") for piece in s.split(","))
    return [prefix for prefix in cleaned if prefix]


def parse_filter_email_domains(s: str) -> list[str]:
    """Split the comma-separated email-domain filter into a list of domains.

    Each token has surrounding whitespace removed and then any leading `@`
    characters stripped, so `tessl.io` and `@tessl.io` are equivalent; tokens
    that end up empty are dropped. A falsy or whitespace-only input means
    "no filter" and yields an empty list.
    """
    if not (s and s.strip()):
        return []
    cleaned = (piece.strip().lstrip("@") for piece in s.split(","))
    return [domain for domain in cleaned if domain]


def main() -> int:
    """Command-line entry point: fetch org usage from PostHog and write it as JSON.

    Parses the CLI flags, checks PostHog authentication, runs `fetch`, passes
    the result to `validate_against_schema` with `org-usage.schema.json`,
    writes it to `--output` (creating parent directories as needed) and
    prints a run summary to stderr.

    Returns:
        0 on success.

    Raises:
        SystemExit: if `--windows` or `--primary-window` is invalid, or if the
            PostHog auth check fails with an HTTP error.
    """
    p = argparse.ArgumentParser(
        description="Fetch org-wide skill / MCP / session usage from PostHog.",
    )
    p.add_argument("--output", required=True,
                   help="Path to write org_usage.json")
    p.add_argument("--posthog-host", default=DEFAULT_HOST,
                   help=f"PostHog host (default: {DEFAULT_HOST})")
    p.add_argument("--posthog-project", type=int, default=DEFAULT_PROJECT_ID,
                   help=f"PostHog project ID (default: {DEFAULT_PROJECT_ID})")
    p.add_argument("--posthog-key-file", default=None,
                   help=f"Path to API key file (default: {DEFAULT_KEY_FILE}). "
                        f"$POSTHOG_PERSONAL_API_KEY takes precedence.")
    p.add_argument("--windows", default=",".join(str(d) for d in DEFAULT_WINDOWS),
                   help=f"Comma-separated day windows (default: {','.join(str(d) for d in DEFAULT_WINDOWS)})")
    p.add_argument("--primary-window", type=int, default=DEFAULT_PRIMARY_WINDOW,
                   help=f"Window used for sort + provider breakdown (default: {DEFAULT_PRIMARY_WINDOW})")
    p.add_argument("--top-tiles-detail", type=int, default=DEFAULT_TOP_TILES_DETAIL,
                   help=f"Per-skill detail fetched only for top N tiles (default: {DEFAULT_TOP_TILES_DETAIL})")
    p.add_argument("--top-untiled", type=int, default=DEFAULT_TOP_UNTILED,
                   help=f"Number of untiled skills to include (default: {DEFAULT_TOP_UNTILED})")
    p.add_argument("--top-loaded", type=int, default=DEFAULT_TOP_LOADED,
                   help=f"Max loaded-skill rows to fetch per window (default: {DEFAULT_TOP_LOADED})")
    p.add_argument("--top-repos", type=int, default=DEFAULT_TOP_REPOS,
                   help=f"Max distinct repos retained in `repos[]` and `*_by_repo[]` per window "
                        f"(default: {DEFAULT_TOP_REPOS}). Long-tail repos beyond this cap are "
                        f"dropped from per-repo views; their events still feed the all-repos totals.")
    p.add_argument("--filter-repos", default=DEFAULT_FILTER_REPOS,
                   help=f"Comma-separated repo prefixes to filter to "
                        f"(default: {DEFAULT_FILTER_REPOS!r}). Each prefix matches "
                        f"properties.gitRepo as exact-equal OR `prefix/`-prefixed. "
                        f"Pass an empty string ('') to disable repo filtering.")
    p.add_argument("--filter-email-domains", default=DEFAULT_FILTER_EMAIL_DOMAINS,
                   help=f"Comma-separated email domains for the persons-side filter "
                        f"(default: {DEFAULT_FILTER_EMAIL_DOMAINS!r}). Matched as "
                        f"`%@<domain>` against person.properties.email. The `@` is "
                        f"optional. Empty string disables email filtering. "
                        f"Repo and email filters combine with OR — an event passes "
                        f"if either filter matches.")
    args = p.parse_args()

    windows = parse_windows(args.windows)
    # `parse_windows` rejects non-positive day counts; apply the same rule to
    # the primary window, which is merged into `windows` just below and would
    # otherwise reach the queries unchecked.
    if args.primary_window <= 0:
        raise SystemExit(f"--primary-window: {args.primary_window} must be > 0")
    # The primary window must always be one of the fetched windows.
    if args.primary_window not in windows:
        windows = sorted({args.primary_window, *windows})
    filter_repos = parse_filter_repos(args.filter_repos)
    filter_email_domains = parse_filter_email_domains(args.filter_email_domains)

    key_file = Path(args.posthog_key_file).expanduser() if args.posthog_key_file else None
    api_key = _load_api_key(key_file)

    posthog = PostHog(args.posthog_host, args.posthog_project, api_key)

    # Sanity-check auth before doing real work
    try:
        proj = posthog.auth_check()
    except urllib.error.HTTPError as e:
        body = ""
        try:
            # Best effort: include the start of the error body in the message.
            body = e.read().decode()[:300]
        except Exception:  # noqa: BLE001
            pass
        raise SystemExit(
            f"ERROR: PostHog auth failed (HTTP {e.code}). {body}\n"
            f"  Verify the key has 'project:read' and 'query:read' scopes for project {args.posthog_project}.",
        )
    print(
        f"PostHog auth OK — project {proj.get('id')} ({proj.get('name')!r})",
        file=sys.stderr,
    )

    # perf_counter is monotonic, so the reported duration cannot be skewed by
    # wall-clock adjustments during the fetch.
    t0 = time.perf_counter()
    output = fetch(
        posthog,
        windows=windows,
        top_tiles_detail=args.top_tiles_detail,
        top_untiled=args.top_untiled,
        top_loaded=args.top_loaded,
        top_repos=args.top_repos,
        primary_window=args.primary_window,
        filter_repos=filter_repos,
        filter_email_domains=filter_email_domains,
    )
    elapsed = time.perf_counter() - t0

    validate_against_schema(
        output,
        SCHEMA_DIR / "org-usage.schema.json",
        role="output",
        source="fetch_org_usage.py",
    )

    output_path = Path(args.output).expanduser()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(output, indent=2))

    # ── Human-readable run summary (stderr) ────────────────────────────────
    primary = output["totals"].get(f"{args.primary_window}d", {})
    fmeta = output["filter"]["events_per_window"].get(f"{args.primary_window}d", {})
    filter_parts: list[str] = []
    if filter_repos:
        filter_parts.append("repos=" + ",".join(filter_repos))
    if filter_email_domains:
        filter_parts.append("emails=@" + ",@".join(filter_email_domains))
    filter_str = " OR ".join(filter_parts) if filter_parts else "(none — pulling all events)"

    matched = fmeta.get("events_matched_filter", 0)
    by_repo = fmeta.get("events_matched_by_repo", 0)
    by_email = fmeta.get("events_matched_by_email", 0)
    total = fmeta.get("events_total", 0)
    no_attr = fmeta.get("events_no_gitrepo", 0)
    excluded_other = fmeta.get("events_excluded_by_filter", 0)
    if filter_repos or filter_email_domains:
        breakdown = []
        if filter_repos:
            breakdown.append(f"{by_repo} via repo")
        if filter_email_domains:
            breakdown.append(f"{by_email} via email")
        slice_str = (
            f"{matched} of {total} events selected "
            f"({', '.join(breakdown)}; "
            f"{excluded_other} excluded; {no_attr} had no gitRepo)"
        )
    else:
        slice_str = f"{total} events (no filter applied; {no_attr} had no gitRepo for reference)"
    print(
        f"Org usage fetched in {elapsed:.1f}s.\n"
        f"  Filter:        {filter_str}\n"
        f"  {args.primary_window}d slice:    {slice_str}\n"
        f"  Windows:       {windows}\n"
        f"  Primary {args.primary_window}d: "
        f"{primary.get('activations', 0)} activations, "
        f"{primary.get('users', 0)} users, "
        f"{primary.get('sessions', 0)} sessions\n"
        f"  Tiles seen:    {len(output['tiles'])}\n"
        f"  Skills seen:   {len(output['skills'])} (top {args.top_tiles_detail} tiles, all windows)\n"
        f"  Untiled:       {len(output['untiled_skills'])} (no skillTile attribution)\n"
        f"  Loaded skills: {len(output['loaded_skills'])} (from installedSkills snapshots)\n"
        f"  MCP tools:     {len(output['mcp_tools'])}\n"
        f"  Repos seen:    {len(output['repos'])} (capped at top {args.top_repos})\n"
        f"  By-repo rows:  {len(output['tiles_by_repo'])} tile×repo, "
        f"{len(output['skills_by_repo'])} skill×repo, "
        f"{len(output['untiled_skills_by_repo'])} untiled×repo, "
        f"{len(output['mcp_tools_by_repo'])} mcp×repo\n"
        f"  Output:        {output_path}",
        file=sys.stderr,
    )

    return 0


# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

skills

README.md

tile.json