Scan a directory or workspace for SKILL.md files across all agents and repos, capture supporting files (references, scripts, linked docs), dedupe vendored copies, enrich each Tessl tile with registry signals, and emit a canonical JSON inventory validated by JSON Schema. Then run four analytical phases in parallel against the inventory — staleness + git provenance (history, broken refs, contributors), quality (Tessl `skill review`), duplicates (similarity + LLM judgement), registry-search (per-standalone-skill registry suggestions, HTTP only) — and render a self-contained interactive HTML report with a top-of-report health overview, top-issues panel, recently-changed list, and per-tessl.json manifests view.
84
90%
Does it follow best practices?
Impact
97%
1.44x Average score across 2 eval scenarios
Advisory
Suggest reviewing before use
#!/usr/bin/env python3
"""Fetch org-wide skill / MCP / session usage from PostHog.
Issues a fixed set of HogQL queries against the agent-signals event stream
emitted by the Tessl CLI (`cli:agent-signals:skill-activation`,
`cli:agent-signals:mcp-tool-activation`, `cli:agent-signals:session-processed`)
and writes a self-contained `org_usage.json` describing what every Tessl user
is doing across the organisation.
Deliberately uncoupled from `discovery.json` and the rest of the
skill-insights pipeline. No filtering by which skills are installed in any
particular repo. The cross-reference (which skills in our scan are also used
elsewhere, which popular org-wide skills aren't installed here, etc.) happens
in a downstream step that reads this output alongside discovery, staleness,
quality, and duplicates.
Output conforms to references/schemas/org-usage.schema.json.
Usage:
fetch_org_usage.py --output <path> [--windows 7,30,90] [...]
Requires:
- A PostHog personal API key with `query:read` on the project. Read from
$POSTHOG_PERSONAL_API_KEY first, otherwise from --posthog-key-file
(default ~/.tessl/posthog/personal-api-key).
- Stdlib only (`urllib`, `json`). `jsonschema` used for IO validation when
available.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
# Bundled schema validator — same pattern as the other skill-insights phases.
# Tile layout: <tile-root>/skills/<phase>/scripts/<script>.py
# <tile-root>/references/schemas/{_validate.py, *.schema.json}
_SCHEMA_DIR_PATH = Path(__file__).resolve().parent.parent.parent.parent / "references" / "schemas"
if str(_SCHEMA_DIR_PATH) not in sys.path:
sys.path.insert(0, str(_SCHEMA_DIR_PATH))
from _validate import SCHEMA_DIR, validate_against_schema # noqa: E402
# Version stamps written into the output for provenance checks downstream.
TOOL_VERSION = "skill-insights@0.11.0"
SCHEMA_VERSION = "1.4"
# PostHog connection defaults (all overridable via CLI flags).
DEFAULT_HOST = "https://us.posthog.com"
DEFAULT_PROJECT_ID = 57574
# Recorded in the output's `source` block for reference only; never queried.
DEFAULT_DASHBOARD_ID = 1358856
DEFAULT_KEY_FILE = "~/.tessl/posthog/personal-api-key"
# Rolling day windows queried for every rollup, and result-size caps.
DEFAULT_WINDOWS = [7, 30, 90]
DEFAULT_TOP_TILES_DETAIL = 50
DEFAULT_TOP_UNTILED = 100
DEFAULT_TOP_LOADED = 5000
# Cap on the per-repo views. Tessl-internal usage today sees ~25-50 distinct
# repos in 30d; bumping it does not change the top-line totals (those are
# always all-repos), it only changes how many rows the renderer can offer
# in chip-style filters and the Repos section.
DEFAULT_TOP_REPOS = 200
DEFAULT_PRIMARY_WINDOW = 30
# Default (repo-prefix OR email-domain) filter scoping events to the org.
DEFAULT_FILTER_REPOS = "github.com/tesslio"
DEFAULT_FILTER_EMAIL_DOMAINS = "tessl.io"
# Sync-query HTTP timeout and the sleep schedule for transient 5xx retries.
SYNC_TIMEOUT_S = 60
SYNC_RETRY_BACKOFF_S = (5, 15)
# ── PostHog client ─────────────────────────────────────────────────────────
class PostHog:
    """Minimal HogQL client.

    Submits queries synchronously. PostHog's sync `/query/` endpoint runs the
    HogQL query inline and returns the rows in the response. ClickHouse caps
    each query at 10s server-side, which our queries are tuned to stay under
    (per-skill detail is filtered to the top N tiles, etc).

    Async/polling was an option but ran into production overload (`Queries
    are a little too busy right now`) because the async work pool queues
    project-wide. Sync queries skip that pool entirely.

    On transient 504s (rare) we retry a couple of times with backoff before
    giving up.
    """

    def __init__(self, host: str, project_id: int, api_key: str):
        # Normalise the host so the URL assembly below never doubles a slash.
        self.host = host.rstrip("/")
        self.project_id = project_id
        self.api_key = api_key

    def hogql(self, query: str) -> list:
        """Run one HogQL query and return its result rows.

        Retries on HTTP 502/503/504 following SYNC_RETRY_BACKOFF_S; raises
        RuntimeError on any other HTTP error, on a PostHog-reported query
        error, or when retries are exhausted.
        """
        body = json.dumps({"query": {"kind": "HogQLQuery", "query": query}}).encode()
        last_err: Exception | None = None
        # Leading 0 means "no sleep before the first attempt"; subsequent
        # attempts sleep per the backoff schedule.
        for attempt, backoff in enumerate((0,) + SYNC_RETRY_BACKOFF_S):
            if backoff:
                time.sleep(backoff)
            req = urllib.request.Request(
                f"{self.host}/api/projects/{self.project_id}/query/",
                data=body, method="POST",
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                },
            )
            try:
                with urllib.request.urlopen(req, timeout=SYNC_TIMEOUT_S) as resp:
                    d = json.loads(resp.read())
            except urllib.error.HTTPError as e:
                # Transient gateway errors: retry while attempts remain;
                # anything else (or exhausted retries) raises immediately.
                if e.code in (502, 503, 504) and attempt < len(SYNC_RETRY_BACKOFF_S):
                    last_err = e
                    continue
                self._raise_with_body(e, query)
            # Reaching here means the request itself succeeded; now check
            # the PostHog-level response envelope.
            if d.get("error"):
                raise RuntimeError(f"PostHog query failed: {d.get('error')}\n query: {query.strip()[:120]}…")
            if d.get("results") is None:
                raise RuntimeError(
                    f"PostHog returned no results field; keys={list(d.keys())}\n"
                    f" query: {query.strip()[:120]}…",
                )
            return d["results"]
        # Every attempt hit a retryable 5xx.
        raise RuntimeError(f"PostHog query failed after retries: {last_err}")

    @staticmethod
    def _raise_with_body(e: urllib.error.HTTPError, query: str) -> None:
        """Re-raise an HTTPError as RuntimeError, including the response
        body's `detail`/`error` (when parseable) and a query snippet.

        Always raises; never returns normally.
        """
        try:
            body = json.loads(e.read())
            detail = body.get("detail") or body.get("error") or body
        except Exception: # noqa: BLE001
            # Body unreadable / not JSON / not a dict — fall back to the code.
            detail = f"HTTP {e.code}"
        raise RuntimeError(
            f"PostHog query failed (HTTP {e.code}): {detail}\n query: {query.strip()[:120]}…",
        ) from None

    def auth_check(self) -> dict:
        """GET the project endpoint to verify key + project access up front;
        returns the project JSON on success."""
        req = urllib.request.Request(
            f"{self.host}/api/projects/{self.project_id}/",
            headers={"Authorization": f"Bearer {self.api_key}"},
        )
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read())
# ── Helpers ────────────────────────────────────────────────────────────────
def _load_api_key(key_file: Path | None) -> str:
env_key = os.environ.get("POSTHOG_PERSONAL_API_KEY")
if env_key:
return env_key.strip()
if key_file is None:
key_file = Path(DEFAULT_KEY_FILE).expanduser()
if not key_file.exists():
raise SystemExit(
f"ERROR: PostHog key not found.\n"
f" Set $POSTHOG_PERSONAL_API_KEY, or write the key to {key_file}.\n"
f" Get a key at https://us.posthog.com/settings/user-api-keys "
f"with at least 'project:read' and 'query:read' scopes.",
)
try:
return key_file.read_text().strip()
except OSError as e:
raise SystemExit(f"ERROR: failed reading {key_file}: {e}")
# ── Query templates ────────────────────────────────────────────────────────
#
# `toString(properties.X)` and `properties.X` both return bare Python strings
# (or None when the property is missing). HogQL does NOT JSON-encode the
# returned value. Comparisons therefore use the bare form: `properties.x =
# 'value'` and `properties.x IS NULL` / `IS NOT NULL`. `toString()` is
# kept on SELECT only as a defensive cast so `properties.X` of varying
# underlying types renders consistently.
#
# Filtering: events are kept when they match the *repo* filter OR the *email*
# filter. Both filters are optional. `combined_filter` is a SQL fragment like
# "AND ((properties.gitRepo = 'github.com/tesslio'
# OR properties.gitRepo LIKE 'github.com/tesslio/%')
# OR ifNull(toString(person.properties.email), '') LIKE '%@tessl.io')"
# It's pasted into every query that scopes events to the configured filters.
# When neither filter is configured the fragment is the empty string. The
# session-aggregates query uses `sessionGitRepo` for the repo half because
# `cli:agent-signals:session-processed` events carry that property; the
# email half is unchanged because email is on the person record.
def _repo_filter_body(prefixes: list[str], property_name: str = "properties.gitRepo") -> str:
"""The OR-joined body of a repo filter, suitable for use inside countIf().
Returns "" when no prefixes are configured.
"""
if not prefixes:
return ""
clauses: list[str] = []
for p in prefixes:
safe = p.replace("'", "\\'")
clauses.append(f"{property_name} = '{safe}'")
clauses.append(f"{property_name} LIKE '{safe}/%'")
return "(" + " OR ".join(clauses) + ")"
def _email_filter_body(domains: list[str]) -> str:
"""The OR-joined body of an email-domain filter (matched against
`person.properties.email` via the implicit persons join).
`ifNull(... , '')` works around HogQL's `Nullable` strictness on `LIKE`.
Returns "" when no domains are configured.
"""
if not domains:
return ""
clauses: list[str] = []
for d in domains:
safe = d.lstrip("@").replace("'", "\\'")
clauses.append(f"ifNull(toString(person.properties.email), '') LIKE '%@{safe}'")
return "(" + " OR ".join(clauses) + ")"
def _combined_filter_body(
prefixes: list[str],
email_domains: list[str],
repo_property: str = "properties.gitRepo",
) -> str:
"""The (repo OR email) body. Empty if no filter is configured."""
parts = [b for b in (
_repo_filter_body(prefixes, repo_property),
_email_filter_body(email_domains),
) if b]
if not parts:
return ""
return "(" + " OR ".join(parts) + ")"
def _build_combined_filter(
prefixes: list[str],
email_domains: list[str],
repo_property: str = "properties.gitRepo",
) -> str:
body = _combined_filter_body(prefixes, email_domains, repo_property)
return f"AND {body}" if body else ""
def _q_filter_meta(days: int, prefixes: list[str], email_domains: list[str]) -> str:
    """Counts that show how the filter selected vs dropped events.

    Always queries the unfiltered slice so consumers can see:
    - events_total           in the window, ignoring filter
    - events_no_gitrepo      with `properties.gitRepo IS NULL`
    - events_with_gitrepo    with non-null gitRepo
    - events_matched_filter  events that matched (repo OR email)
    - events_matched_by_repo events whose gitRepo matched (subset)
    - events_matched_by_email events whose person.email matched (subset)

    When neither filter is configured every event "matches" — that's the
    user-visible semantics: no filter = pull all events.
    """
    # Literal true/false keep countIf() valid SQL when a filter half is
    # unconfigured: combined defaults to "everything matches", each
    # per-source count defaults to zero.
    combined = _combined_filter_body(prefixes, email_domains) or "true"
    repo_only = _repo_filter_body(prefixes) or "false"
    email_only = _email_filter_body(email_domains) or "false"
    return f"""
    SELECT count() AS total,
           countIf(properties.gitRepo IS NULL) AS no_attribution,
           countIf(properties.gitRepo IS NOT NULL) AS with_attribution,
           countIf({combined}) AS matched,
           countIf({repo_only}) AS matched_by_repo,
           countIf({email_only}) AS matched_by_email
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
    """
def _q_totals(days: int, where_filter: str) -> str:
    """All-repos top-line rollup for one window: activations, unique users
    and sessions, tiled/untiled split, and per-provider breakdowns."""
    return f"""
    SELECT count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions,
           countIf(properties.skillTile IS NOT NULL) AS tiled,
           countIf(properties.skillTile IS NULL) AS untiled,
           countIf(properties.provider = 'claude-code') AS cc_act,
           countIf(properties.provider = 'cursor-ide') AS ci_act,
           uniqIf(distinct_id, properties.provider = 'claude-code') AS cc_users,
           uniqIf(distinct_id, properties.provider = 'cursor-ide') AS ci_users
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
    {where_filter}
    """
def _q_tile_rollup(days: int, where_filter: str, limit: int = 500) -> str:
    """Per-tile activation rollup for one window (tiled events only),
    ordered by activations descending."""
    return f"""
    SELECT toString(properties.skillTile) AS tile,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.skillTile IS NOT NULL
    {where_filter}
    GROUP BY tile
    ORDER BY act DESC
    LIMIT {limit}
    """
def _q_skill_detail(days: int, tiles: list[str], where_filter: str, limit: int = 1000) -> str:
    """Per-(tile, skill) detail restricted to `tiles` — first/last seen and
    per-provider counts included.

    Returns "" when `tiles` is empty (caller skips the query).
    """
    # Escape single quotes for the SQL IN-list.
    in_clause = ", ".join("'" + t.replace("'", "\\'") + "'" for t in tiles)
    if not in_clause:
        return ""
    return f"""
    SELECT toString(properties.skillTile) AS tile,
           toString(properties.skillName) AS skill,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions,
           min(timestamp) AS first_seen,
           max(timestamp) AS last_seen,
           countIf(properties.provider = 'claude-code') AS cc,
           countIf(properties.provider = 'cursor-ide') AS ci
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.skillTile IN ({in_clause})
    {where_filter}
    GROUP BY tile, skill
    ORDER BY act DESC
    LIMIT {limit}
    """
def _q_untiled(days: int, where_filter: str, limit: int = 200) -> str:
    """Per-skill rollup for activations with no skillTile (third-party /
    private SKILL.md files), keyed by skillName."""
    return f"""
    SELECT toString(properties.skillName) AS skill,
           count() AS act,
           uniq(distinct_id) AS users
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.skillTile IS NULL
      AND properties.skillName IS NOT NULL
    {where_filter}
    GROUP BY skill
    ORDER BY act DESC
    LIMIT {limit}
    """
def _q_mcp(days: int, where_filter: str) -> str:
    """Per-tool rollup of MCP tool activations for one window."""
    return f"""
    SELECT toString(properties.tool) AS tool,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions
    FROM events
    WHERE event = 'cli:agent-signals:mcp-tool-activation'
      AND timestamp > now() - INTERVAL {days} DAY
    {where_filter}
    GROUP BY tool
    ORDER BY act DESC
    """
def _q_session_aggregates(days: int, where_filter_session: str) -> str:
    """Window-wide sums of the per-session counters on session-processed
    events (messages, tool/skill calls, Tessl-specific calls)."""
    # `cli:agent-signals:session-processed` events carry sessionGitRepo (not
    # gitRepo). Caller passes a filter built against that property name; the
    # email half (when configured) doesn't change.
    return f"""
    SELECT count() AS sessions,
           uniq(distinct_id) AS users,
           sum(toFloatOrZero(toString(properties.totalMessages))) AS messages,
           sum(toFloatOrZero(toString(properties.totalToolCalls))) AS tool_calls,
           sum(toFloatOrZero(toString(properties.totalSkillCalls))) AS skill_calls,
           sum(toFloatOrZero(toString(properties.tesslSkillCalls))) AS tessl_skill_calls,
           sum(toFloatOrZero(toString(properties.tesslMcpCalls))) AS tessl_mcp_calls,
           sum(toFloatOrZero(toString(properties.tesslToolCalls))) AS tessl_cli_calls
    FROM events
    WHERE event = 'cli:agent-signals:session-processed'
      AND timestamp > now() - INTERVAL {days} DAY
    {where_filter_session}
    """
def _q_loaded_skills(days: int, where_filter: str, limit: int) -> str:
    """Unroll properties.installedSkills and count distinct users per skill,
    scoped to events the configured filter selects.

    installedSkills is a JSON array of objects carrying at least `tile`,
    `name`, and `scope` keys (extracted below).
    """
    return f"""
    WITH unrolled AS (
        SELECT distinct_id,
               arrayJoin(JSONExtractArrayRaw(coalesce(toString(properties.installedSkills), '[]'))) AS s_json
        FROM events
        WHERE event = 'cli:agent-signals:skill-activation'
          AND timestamp > now() - INTERVAL {days} DAY
          AND properties.installedSkills IS NOT NULL
        {where_filter}
    )
    SELECT JSONExtractString(s_json, 'tile') AS tile,
           JSONExtractString(s_json, 'name') AS name,
           JSONExtractString(s_json, 'scope') AS scope,
           uniq(distinct_id) AS loaded_users
    FROM unrolled
    GROUP BY tile, name, scope
    ORDER BY loaded_users DESC
    LIMIT {limit}
    """
# ── Per-repo query templates ───────────────────────────────────────────────
#
# These mirror the all-repos rollups above but add `properties.gitRepo` to the
# GROUP BY. They power the report's per-repo views and client-side repo
# filtering (so the user can untick repos they don't care about and see
# everything re-aggregate without another PostHog round-trip).
#
# Events with a NULL `gitRepo` are dropped here — they cannot be attributed to
# any repo, so they have nowhere to live in the per-repo views. Their share is
# still visible via `filter.events_per_window.<w>.events_no_gitrepo` and they
# still contribute to the all-repos `totals` and rollups.
def _q_repo_totals(days: int, where_filter: str, limit: int) -> str:
    """Same shape as `_q_totals` but grouped by gitRepo; events with no
    gitRepo are excluded (they cannot be attributed to a repo)."""
    return f"""
    SELECT toString(properties.gitRepo) AS repo,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions,
           countIf(properties.skillTile IS NOT NULL) AS tiled,
           countIf(properties.skillTile IS NULL) AS untiled,
           countIf(properties.provider = 'claude-code') AS cc_act,
           countIf(properties.provider = 'cursor-ide') AS ci_act,
           uniqIf(distinct_id, properties.provider = 'claude-code') AS cc_users,
           uniqIf(distinct_id, properties.provider = 'cursor-ide') AS ci_users
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.gitRepo IS NOT NULL
    {where_filter}
    GROUP BY repo
    ORDER BY act DESC
    LIMIT {limit}
    """
def _q_tiles_by_repo(days: int, where_filter: str, repos: list[str], limit: int) -> str:
    """Per-(tile, repo). `repos` filters to the top-N repos kept in the report.

    Returns "" when `repos` is empty (caller skips the query).
    """
    if not repos:
        return ""
    in_clause = ", ".join("'" + r.replace("'", "\\'") + "'" for r in repos)
    return f"""
    SELECT toString(properties.skillTile) AS tile,
           toString(properties.gitRepo) AS repo,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.skillTile IS NOT NULL
      AND properties.gitRepo IN ({in_clause})
    {where_filter}
    GROUP BY tile, repo
    ORDER BY act DESC
    LIMIT {limit}
    """
def _q_skills_by_repo(
    days: int, tiles: list[str], repos: list[str], where_filter: str, limit: int,
) -> str:
    """Per-(tile, skill, repo) detail, restricted to both the top tiles and
    the top repos. Returns "" when either list is empty."""
    if not tiles or not repos:
        return ""
    tile_in = ", ".join("'" + t.replace("'", "\\'") + "'" for t in tiles)
    repo_in = ", ".join("'" + r.replace("'", "\\'") + "'" for r in repos)
    return f"""
    SELECT toString(properties.skillTile) AS tile,
           toString(properties.skillName) AS skill,
           toString(properties.gitRepo) AS repo,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.skillTile IN ({tile_in})
      AND properties.gitRepo IN ({repo_in})
    {where_filter}
    GROUP BY tile, skill, repo
    ORDER BY act DESC
    LIMIT {limit}
    """
def _q_untiled_by_repo(days: int, repos: list[str], where_filter: str, limit: int) -> str:
    """Per-(skillName, repo) rollup of untiled activations, restricted to
    the top repos. Returns "" when `repos` is empty."""
    if not repos:
        return ""
    repo_in = ", ".join("'" + r.replace("'", "\\'") + "'" for r in repos)
    return f"""
    SELECT toString(properties.skillName) AS skill,
           toString(properties.gitRepo) AS repo,
           count() AS act,
           uniq(distinct_id) AS users
    FROM events
    WHERE event = 'cli:agent-signals:skill-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.skillTile IS NULL
      AND properties.skillName IS NOT NULL
      AND properties.gitRepo IN ({repo_in})
    {where_filter}
    GROUP BY skill, repo
    ORDER BY act DESC
    LIMIT {limit}
    """
def _q_mcp_by_repo(days: int, repos: list[str], where_filter: str, limit: int) -> str:
    """Per-(tool, repo) rollup of MCP tool activations, restricted to the
    top repos. Returns "" when `repos` is empty."""
    if not repos:
        return ""
    repo_in = ", ".join("'" + r.replace("'", "\\'") + "'" for r in repos)
    return f"""
    SELECT toString(properties.tool) AS tool,
           toString(properties.gitRepo) AS repo,
           count() AS act,
           uniq(distinct_id) AS users,
           uniq(toString(properties.sessionId)) AS sessions
    FROM events
    WHERE event = 'cli:agent-signals:mcp-tool-activation'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.gitRepo IN ({repo_in})
    {where_filter}
    GROUP BY tool, repo
    ORDER BY act DESC
    LIMIT {limit}
    """
def _q_session_aggregates_by_repo(
    days: int, repos: list[str], where_filter_session: str, limit: int,
) -> str:
    """Same sums as `_q_session_aggregates` but grouped by sessionGitRepo
    and restricted to the top repos. Returns "" when `repos` is empty."""
    if not repos:
        return ""
    repo_in = ", ".join("'" + r.replace("'", "\\'") + "'" for r in repos)
    return f"""
    SELECT toString(properties.sessionGitRepo) AS repo,
           count() AS sessions,
           uniq(distinct_id) AS users,
           sum(toFloatOrZero(toString(properties.totalMessages))) AS messages,
           sum(toFloatOrZero(toString(properties.totalToolCalls))) AS tool_calls,
           sum(toFloatOrZero(toString(properties.totalSkillCalls))) AS skill_calls,
           sum(toFloatOrZero(toString(properties.tesslSkillCalls))) AS tessl_skill_calls,
           sum(toFloatOrZero(toString(properties.tesslMcpCalls))) AS tessl_mcp_calls,
           sum(toFloatOrZero(toString(properties.tesslToolCalls))) AS tessl_cli_calls
    FROM events
    WHERE event = 'cli:agent-signals:session-processed'
      AND timestamp > now() - INTERVAL {days} DAY
      AND properties.sessionGitRepo IN ({repo_in})
    {where_filter_session}
    GROUP BY repo
    ORDER BY sessions DESC
    LIMIT {limit}
    """
# ── Fetch + assemble ───────────────────────────────────────────────────────
def fetch(
    posthog: PostHog,
    windows: list[int],
    top_tiles_detail: int,
    top_untiled: int,
    top_loaded: int,
    top_repos: int,
    primary_window: int,
    filter_repos: list[str],
    filter_email_domains: list[str],
) -> dict:
    """Run every query phase against PostHog and assemble the complete
    org-usage document: filter metadata, per-window totals, tile/skill/
    untiled/loaded/MCP rollups, session aggregates, and the per-repo
    variant of each.

    The returned dict is the output document (schema_version SCHEMA_VERSION);
    validation and writing are the caller's responsibility.
    """
    # Ensure the primary window is always queried, even when the caller's
    # --windows list omits it.
    if primary_window not in windows:
        windows = sorted({primary_window, *windows})

    def w_key(d: int) -> str:
        # 30 -> "30d": the key shape used by every per-window mapping below.
        return f"{d}d"

    where_filter = _build_combined_filter(filter_repos, filter_email_domains)
    # Session-processed events carry sessionGitRepo instead of gitRepo.
    where_filter_session = _build_combined_filter(
        filter_repos, filter_email_domains, "properties.sessionGitRepo",
    )
    # ── Filter metadata per window ─────────────────────────────────────────
    # Always reports the unfiltered slice so consumers can see how much was
    # dropped by the filter vs how much had no gitRepo to begin with, and
    # what share of matches came from each filter source.
    filter_active = bool(filter_repos) or bool(filter_email_domains)
    filter_meta: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_filter_meta(d, filter_repos, filter_email_domains))
        if rows and rows[0][0] is not None:
            total, no_attr, with_attr, matched, by_repo, by_email = (int(x) for x in rows[0])
        else:
            # Empty window: normalise to zeros rather than omitting keys.
            total = no_attr = with_attr = matched = by_repo = by_email = 0
        filter_meta[w_key(d)] = {
            "events_total": total,
            "events_no_gitrepo": no_attr,
            "events_with_gitrepo": with_attr,
            # Semantics: when no filter is configured, every event "matches"
            # (the entire window is selected) and zero events are excluded.
            # `events_matched_by_repo` and `events_matched_by_email` are
            # subsets of `events_matched_filter` and overlap when both
            # filters are configured (an event can match both).
            "events_matched_filter": matched,
            "events_matched_by_repo": by_repo,
            "events_matched_by_email": by_email,
            "events_excluded_by_filter": (total - matched) if filter_active else 0,
        }
    # ── Totals per window ──────────────────────────────────────────────────
    totals: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_totals(d, where_filter))
        if not rows:
            # No events in the window at all — emit an all-zero shape so the
            # schema's required keys are still present.
            totals[w_key(d)] = {
                "activations": 0, "users": 0, "sessions": 0,
                "tiled_activations": 0, "untiled_activations": 0,
                "providers": {"claude-code": {"activations": 0, "users": 0},
                              "cursor-ide": {"activations": 0, "users": 0}},
            }
            continue
        act, users, sessions, tiled, untiled, cc, ci, ccu, ciu = rows[0]
        totals[w_key(d)] = {
            "activations": int(act),
            "users": int(users),
            "sessions": int(sessions),
            "tiled_activations": int(tiled),
            "untiled_activations": int(untiled),
            "providers": {
                "claude-code": {"activations": int(cc), "users": int(ccu)},
                "cursor-ide": {"activations": int(ci), "users": int(ciu)},
            },
        }
    # ── Tile rollup per window ─────────────────────────────────────────────
    # Query already filters `properties.skillTile IS NOT NULL`; defensive
    # check here just in case PostHog returns a stray empty/None tile.
    tiles_by_id: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_tile_rollup(d, where_filter))
        for tile, act, users, sessions in rows:
            if not tile:
                continue
            entry = tiles_by_id.setdefault(tile, {"tile": tile, "windows": {}})
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }
    # Order tiles by primary-window activations (descending). Tiles with no
    # primary-window data go to the end, ranked by their largest-window
    # activations as a tiebreaker.
    def tile_sort_key(t: dict) -> tuple[int, int, int]:
        primary = t["windows"].get(w_key(primary_window)) or {"activations": 0, "users": 0}
        # 0 = has primary-window data, 1 = doesn't (sorts last)
        bucket = 0 if primary["activations"] > 0 else 1
        # Fallback: largest activations seen across any window, used only for
        # ordering items that lack primary-window data.
        any_max = max(
            (w["activations"] for w in t["windows"].values()),
            default=0,
        )
        return (bucket, -primary["activations"], -any_max)
    tiles_sorted = sorted(tiles_by_id.values(), key=tile_sort_key)
    # ── Per-skill detail for top tiles, all windows ────────────────────────
    top_tiles = [t["tile"] for t in tiles_sorted[:top_tiles_detail]]
    skills_by_id: dict[tuple[str, str], dict] = {}
    for d in windows:
        if not top_tiles:
            break
        query = _q_skill_detail(d, top_tiles, where_filter)
        if not query:
            continue
        rows = posthog.hogql(query)
        for tile, skill, act, users, sessions, first_seen, last_seen, cc, ci in rows:
            if not tile or not skill:
                continue
            key = (tile, skill)
            entry = skills_by_id.setdefault(key, {
                "tile": tile, "name": skill,
                "windows": {}, "providers": {}, "first_seen": None, "last_seen": None,
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }
            # Providers + first/last seen captured against the primary window.
            if d == primary_window:
                entry["providers"] = {
                    "claude-code": int(cc),
                    "cursor-ide": int(ci),
                }
                entry["first_seen"] = first_seen
                entry["last_seen"] = last_seen
    # Same ordering scheme as tile_sort_key, applied to per-skill entries.
    def skill_sort_key(s: dict) -> tuple[int, int, int]:
        primary = s["windows"].get(w_key(primary_window)) or {"activations": 0, "users": 0}
        bucket = 0 if primary["activations"] > 0 else 1
        any_max = max(
            (w["activations"] for w in s["windows"].values()),
            default=0,
        )
        return (bucket, -primary["activations"], -any_max)
    skills_sorted = sorted(skills_by_id.values(), key=skill_sort_key)
    # ── Untiled (no skillTile) — third-party / private SKILL.md files ──────
    untiled_by_name: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_untiled(d, where_filter, top_untiled))
        for skill, act, users in rows:
            if not skill:
                continue
            entry = untiled_by_name.setdefault(skill, {
                "name": skill, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
            }
    untiled_sorted = sorted(
        untiled_by_name.values(),
        key=lambda s: -(s["windows"].get(w_key(primary_window), {}).get("activations", 0)),
    )
    # ── MCP tools ──────────────────────────────────────────────────────────
    mcp_by_tool: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_mcp(d, where_filter))
        for tool, act, users, sessions in rows:
            if not tool:
                continue
            entry = mcp_by_tool.setdefault(tool, {
                "tool": tool, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }
    mcp_sorted = sorted(
        mcp_by_tool.values(),
        key=lambda m: -(m["windows"].get(w_key(primary_window), {}).get("activations", 0)),
    )
    # ── Session aggregates ─────────────────────────────────────────────────
    session_aggregates: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_session_aggregates(d, where_filter_session))
        if not rows or rows[0][0] is None:
            session_aggregates[w_key(d)] = {
                "sessions": 0, "users": 0,
                "messages": 0, "tool_calls": 0, "skill_calls": 0,
                "tessl_skill_calls": 0, "tessl_mcp_calls": 0, "tessl_cli_calls": 0,
            }
            continue
        sessions, users, messages, tool_calls, skill_calls, tessl_skill, tessl_mcp, tessl_cli = rows[0]
        session_aggregates[w_key(d)] = {
            "sessions": int(sessions or 0),
            "users": int(users or 0),
            "messages": int(messages or 0),
            "tool_calls": int(tool_calls or 0),
            "skill_calls": int(skill_calls or 0),
            "tessl_skill_calls": int(tessl_skill or 0),
            "tessl_mcp_calls": int(tessl_mcp or 0),
            "tessl_cli_calls": int(tessl_cli or 0),
        }
    # ── Loaded skills (from installedSkills array on activation events) ────
    # Same key shape as `skills` (tile, name) so downstream can join them.
    # We separately retain `scope` because the same (tile, name) can appear
    # both as a project install and a global install for different users.
    loaded_by_id: dict[tuple[str, str, str], dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_loaded_skills(d, where_filter, top_loaded))
        for tile, name, scope, users in rows:
            if not name:
                continue
            key = (tile or "", name, scope or "")
            entry = loaded_by_id.setdefault(key, {
                "tile": tile or None,
                "name": name,
                "scope": scope or None,
                "windows": {},
            })
            entry["windows"][w_key(d)] = {"users": int(users)}
    loaded_sorted = sorted(
        loaded_by_id.values(),
        key=lambda s: -(s["windows"].get(w_key(primary_window), {}).get("users", 0)),
    )
    # ── Per-repo views ─────────────────────────────────────────────────────
    # Power the report's Repos section and the client-side repo chip filter.
    # Strategy:
    #   1. Pull per-(repo) totals across all windows; assemble `repos[]`.
    #   2. Pick the top N repos by primary-window activations and use that set
    #      as the IN-list for every `*_by_repo` follow-up query. This keeps
    #      long-tail repos (1–2 activations from random external orgs) out of
    #      the report without dropping their volume from the all-repos
    #      aggregates above.
    #   3. Emit empty arrays / objects when no repos are present. Schema treats
    #      these fields as required so we never write `null`.
    repos_by_id: dict[str, dict] = {}
    for d in windows:
        rows = posthog.hogql(_q_repo_totals(d, where_filter, top_repos))
        for repo, act, users, sessions, tiled, untiled, cc, ci, ccu, ciu in rows:
            if not repo:
                continue
            entry = repos_by_id.setdefault(repo, {"repo": repo, "windows": {}})
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
                "tiled_activations": int(tiled),
                "untiled_activations": int(untiled),
                "providers": {
                    "claude-code": {"activations": int(cc), "users": int(ccu)},
                    "cursor-ide": {"activations": int(ci), "users": int(ciu)},
                },
            }
    # Same ordering scheme again, applied to repos.
    def repo_sort_key(r: dict) -> tuple[int, int, int]:
        primary = r["windows"].get(w_key(primary_window)) or {"activations": 0}
        bucket = 0 if primary.get("activations", 0) > 0 else 1
        any_max = max(
            (w["activations"] for w in r["windows"].values()),
            default=0,
        )
        return (bucket, -primary.get("activations", 0), -any_max)
    repos_sorted = sorted(repos_by_id.values(), key=repo_sort_key)[:top_repos]
    top_repos_list = [r["repo"] for r in repos_sorted]
    # ── Per-(tile, repo) ───────────────────────────────────────────────────
    tiles_by_repo: dict[tuple[str, str], dict] = {}
    for d in windows:
        q = _q_tiles_by_repo(d, where_filter, top_repos_list, limit=10000)
        if not q:
            continue
        rows = posthog.hogql(q)
        for tile, repo, act, users, sessions in rows:
            if not tile or not repo:
                continue
            key = (tile, repo)
            entry = tiles_by_repo.setdefault(key, {
                "tile": tile, "repo": repo, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }
    # ── Per-(tile, skill, repo) ────────────────────────────────────────────
    # Same top-tile gate as `skills[]` so we don't blow up the query budget.
    skills_by_repo: dict[tuple[str, str, str], dict] = {}
    for d in windows:
        q = _q_skills_by_repo(d, top_tiles, top_repos_list, where_filter, limit=10000)
        if not q:
            continue
        rows = posthog.hogql(q)
        for tile, skill, repo, act, users, sessions in rows:
            if not tile or not skill or not repo:
                continue
            key = (tile, skill, repo)
            entry = skills_by_repo.setdefault(key, {
                "tile": tile, "name": skill, "repo": repo, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }
    # ── Per-(name, repo) untiled ───────────────────────────────────────────
    untiled_by_repo: dict[tuple[str, str], dict] = {}
    for d in windows:
        q = _q_untiled_by_repo(d, top_repos_list, where_filter, limit=10000)
        if not q:
            continue
        rows = posthog.hogql(q)
        for skill, repo, act, users in rows:
            if not skill or not repo:
                continue
            key = (skill, repo)
            entry = untiled_by_repo.setdefault(key, {
                "name": skill, "repo": repo, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
            }
    # ── Per-(tool, repo) MCP ───────────────────────────────────────────────
    mcp_by_repo: dict[tuple[str, str], dict] = {}
    for d in windows:
        q = _q_mcp_by_repo(d, top_repos_list, where_filter, limit=5000)
        if not q:
            continue
        rows = posthog.hogql(q)
        for tool, repo, act, users, sessions in rows:
            if not tool or not repo:
                continue
            key = (tool, repo)
            entry = mcp_by_repo.setdefault(key, {
                "tool": tool, "repo": repo, "windows": {},
            })
            entry["windows"][w_key(d)] = {
                "activations": int(act),
                "users": int(users),
                "sessions": int(sessions),
            }
    # ── Per-(repo) session aggregates per window ───────────────────────────
    session_aggregates_by_repo: dict[str, list[dict]] = {}
    for d in windows:
        q = _q_session_aggregates_by_repo(
            d, top_repos_list, where_filter_session, limit=top_repos,
        )
        per_window: list[dict] = []
        if q:
            rows = posthog.hogql(q)
            for row in rows:
                repo, sessions, users, messages, tc, sc, tsc, tmc, tcli = row
                if not repo:
                    continue
                per_window.append({
                    "repo": repo,
                    "sessions": int(sessions or 0),
                    "users": int(users or 0),
                    "messages": int(messages or 0),
                    "tool_calls": int(tc or 0),
                    "skill_calls": int(sc or 0),
                    "tessl_skill_calls": int(tsc or 0),
                    "tessl_mcp_calls": int(tmc or 0),
                    "tessl_cli_calls": int(tcli or 0),
                })
        session_aggregates_by_repo[w_key(d)] = per_window
    # ── Assemble the output document ───────────────────────────────────────
    return {
        "schema_version": SCHEMA_VERSION,
        # RFC 3339 / "Z"-suffixed UTC timestamp.
        "fetched_at": datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z"),
        "source": {
            "kind": "posthog",
            "host": posthog.host,
            "project_id": posthog.project_id,
            "dashboard_id_for_reference": DEFAULT_DASHBOARD_ID,
        },
        "windows": windows,
        "primary_window_days": primary_window,
        "top_tiles_detail": top_tiles_detail,
        "top_untiled": top_untiled,
        "top_loaded": top_loaded,
        "top_repos": top_repos,
        "tool_version": TOOL_VERSION,
        "filter": {
            "repos": filter_repos,
            "email_domains": filter_email_domains,
            "match_kind": "prefix-or-email-domain",
            "events_per_window": filter_meta,
        },
        "totals": totals,
        "tiles": tiles_sorted,
        "skills": skills_sorted,
        "untiled_skills": untiled_sorted,
        "loaded_skills": loaded_sorted,
        "mcp_tools": mcp_sorted,
        "session_aggregates": session_aggregates,
        "repos": repos_sorted,
        "tiles_by_repo": list(tiles_by_repo.values()),
        "skills_by_repo": list(skills_by_repo.values()),
        "untiled_skills_by_repo": list(untiled_by_repo.values()),
        "mcp_tools_by_repo": list(mcp_by_repo.values()),
        "session_aggregates_by_repo": session_aggregates_by_repo,
    }
# ── CLI ────────────────────────────────────────────────────────────────────
def parse_windows(s: str) -> list[int]:
    """Parse a comma-separated list of day windows into sorted unique ints.

    Blank tokens (e.g. from a trailing comma) are skipped. Non-integer or
    non-positive tokens, and an effectively empty list, abort via SystemExit.
    """
    days: set[int] = set()
    for token in (piece.strip() for piece in s.split(",")):
        if not token:
            continue
        try:
            value = int(token)
        except ValueError:
            raise SystemExit(f"--windows: '{token}' is not an integer")
        if value <= 0:
            raise SystemExit(f"--windows: {value} must be > 0")
        days.add(value)
    if not days:
        raise SystemExit("--windows: must contain at least one positive integer")
    return sorted(days)
def parse_filter_repos(s: str) -> list[str]:
    """Parse the comma-separated list of repo prefixes for the filter.

    Each token is whitespace-stripped and trailing slashes are removed.
    An empty or whitespace-only input means "no filter" — returns [].
    """
    if not s or not s.strip():
        return []
    cleaned = (piece.strip().rstrip("/") for piece in s.split(","))
    return [prefix for prefix in cleaned if prefix]
def parse_filter_email_domains(s: str) -> list[str]:
    """Parse the comma-separated list of email domains.

    Leading `@` characters are dropped from each token so users may write
    either `tessl.io` or `@tessl.io`. An empty or whitespace-only input
    means "no filter" — returns [].
    """
    if not s or not s.strip():
        return []
    cleaned = (piece.strip().lstrip("@") for piece in s.split(","))
    return [domain for domain in cleaned if domain]
def main() -> int:
    """CLI entry point.

    Parses arguments, sanity-checks PostHog auth, fetches org-wide usage,
    validates the result against the output schema, writes it to --output,
    and prints a human-readable summary to stderr.

    Returns:
        0 on success. Bad arguments or failed auth abort via SystemExit.
    """
    p = argparse.ArgumentParser(
        description="Fetch org-wide skill / MCP / session usage from PostHog.",
    )
    p.add_argument("--output", required=True,
                   help="Path to write org_usage.json")
    p.add_argument("--posthog-host", default=DEFAULT_HOST,
                   help=f"PostHog host (default: {DEFAULT_HOST})")
    p.add_argument("--posthog-project", type=int, default=DEFAULT_PROJECT_ID,
                   help=f"PostHog project ID (default: {DEFAULT_PROJECT_ID})")
    p.add_argument("--posthog-key-file", default=None,
                   help=f"Path to API key file (default: {DEFAULT_KEY_FILE}). "
                        f"$POSTHOG_PERSONAL_API_KEY takes precedence.")
    p.add_argument("--windows", default=",".join(str(d) for d in DEFAULT_WINDOWS),
                   help=f"Comma-separated day windows (default: {','.join(str(d) for d in DEFAULT_WINDOWS)})")
    p.add_argument("--primary-window", type=int, default=DEFAULT_PRIMARY_WINDOW,
                   help=f"Window used for sort + provider breakdown (default: {DEFAULT_PRIMARY_WINDOW})")
    p.add_argument("--top-tiles-detail", type=int, default=DEFAULT_TOP_TILES_DETAIL,
                   help=f"Per-skill detail fetched only for top N tiles (default: {DEFAULT_TOP_TILES_DETAIL})")
    p.add_argument("--top-untiled", type=int, default=DEFAULT_TOP_UNTILED,
                   help=f"Number of untiled skills to include (default: {DEFAULT_TOP_UNTILED})")
    p.add_argument("--top-loaded", type=int, default=DEFAULT_TOP_LOADED,
                   help=f"Max loaded-skill rows to fetch per window (default: {DEFAULT_TOP_LOADED})")
    p.add_argument("--top-repos", type=int, default=DEFAULT_TOP_REPOS,
                   help=f"Max distinct repos retained in `repos[]` and `*_by_repo[]` per window "
                        f"(default: {DEFAULT_TOP_REPOS}). Long-tail repos beyond this cap are "
                        f"dropped from per-repo views; their events still feed the all-repos totals.")
    p.add_argument("--filter-repos", default=DEFAULT_FILTER_REPOS,
                   help=f"Comma-separated repo prefixes to filter to "
                        f"(default: {DEFAULT_FILTER_REPOS!r}). Each prefix matches "
                        f"properties.gitRepo as exact-equal OR `prefix/`-prefixed. "
                        f"Pass an empty string ('') to disable repo filtering.")
    p.add_argument("--filter-email-domains", default=DEFAULT_FILTER_EMAIL_DOMAINS,
                   help=f"Comma-separated email domains for the persons-side filter "
                        f"(default: {DEFAULT_FILTER_EMAIL_DOMAINS!r}). Matched as "
                        f"`%@<domain>` against person.properties.email. The `@` is "
                        f"optional. Empty string disables email filtering. "
                        f"Repo and email filters combine with OR — an event passes "
                        f"if either filter matches.")
    args = p.parse_args()

    # Ensure the primary window is always one of the fetched windows, so the
    # `totals[f"{primary}d"]` / filter-meta lookups below cannot come up empty.
    windows = parse_windows(args.windows)
    if args.primary_window not in windows:
        windows = sorted({args.primary_window, *windows})
    filter_repos = parse_filter_repos(args.filter_repos)
    filter_email_domains = parse_filter_email_domains(args.filter_email_domains)

    # $POSTHOG_PERSONAL_API_KEY takes precedence inside _load_api_key;
    # the file path here is only the fallback source.
    key_file = Path(args.posthog_key_file).expanduser() if args.posthog_key_file else None
    api_key = _load_api_key(key_file)
    posthog = PostHog(args.posthog_host, args.posthog_project, api_key)

    # Sanity-check auth before doing real work
    try:
        proj = posthog.auth_check()
    except urllib.error.HTTPError as e:
        # Best-effort capture of the (truncated) response body for the error
        # message; reading it can itself fail, hence the broad swallow.
        body = ""
        try:
            body = e.read().decode()[:300]
        except Exception:  # noqa: BLE001
            pass
        raise SystemExit(
            f"ERROR: PostHog auth failed (HTTP {e.code}). {body}\n"
            f"  Verify the key has 'project:read' and 'query:read' scopes for project {args.posthog_project}.",
        )
    print(
        f"PostHog auth OK — project {proj.get('id')} ({proj.get('name')!r})",
        file=sys.stderr,
    )

    # Fetch all usage data (timed for the summary line below).
    t0 = time.time()
    output = fetch(
        posthog,
        windows=windows,
        top_tiles_detail=args.top_tiles_detail,
        top_untiled=args.top_untiled,
        top_loaded=args.top_loaded,
        top_repos=args.top_repos,
        primary_window=args.primary_window,
        filter_repos=filter_repos,
        filter_email_domains=filter_email_domains,
    )
    elapsed = time.time() - t0

    # Validate before writing so a malformed result never reaches disk.
    validate_against_schema(
        output,
        SCHEMA_DIR / "org-usage.schema.json",
        role="output",
        source="fetch_org_usage.py",
    )
    output_path = Path(args.output).expanduser()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(output, indent=2))

    # ── Human-readable stderr summary ──────────────────────────────────
    # All lookups use .get() defaults: the primary-window slices should
    # always exist (windows was normalised above), but stay defensive.
    primary = output["totals"].get(f"{args.primary_window}d", {})
    fmeta = output["filter"]["events_per_window"].get(f"{args.primary_window}d", {})
    filter_parts: list[str] = []
    if filter_repos:
        filter_parts.append("repos=" + ",".join(filter_repos))
    if filter_email_domains:
        filter_parts.append("emails=@" + ",@".join(filter_email_domains))
    filter_str = " OR ".join(filter_parts) if filter_parts else "(none — pulling all events)"
    matched = fmeta.get("events_matched_filter", 0)
    by_repo = fmeta.get("events_matched_by_repo", 0)
    by_email = fmeta.get("events_matched_by_email", 0)
    total = fmeta.get("events_total", 0)
    no_attr = fmeta.get("events_no_gitrepo", 0)
    excluded_other = fmeta.get("events_excluded_by_filter", 0)
    if filter_repos or filter_email_domains:
        # Per-mechanism breakdown only mentions the filters actually in use.
        breakdown = []
        if filter_repos: breakdown.append(f"{by_repo} via repo")
        if filter_email_domains: breakdown.append(f"{by_email} via email")
        slice_str = (
            f"{matched} of {total} events selected "
            f"({', '.join(breakdown)}; "
            f"{excluded_other} excluded; {no_attr} had no gitRepo)"
        )
    else:
        slice_str = f"{total} events (no filter applied; {no_attr} had no gitRepo for reference)"
    print(
        f"Org usage fetched in {elapsed:.1f}s.\n"
        f"  Filter:  {filter_str}\n"
        f"  {args.primary_window}d slice: {slice_str}\n"
        f"  Windows: {windows}\n"
        f"  Primary {args.primary_window}d: "
        f"{primary.get('activations', 0)} activations, "
        f"{primary.get('users', 0)} users, "
        f"{primary.get('sessions', 0)} sessions\n"
        f"  Tiles seen:   {len(output['tiles'])}\n"
        f"  Skills seen:  {len(output['skills'])} (top {args.top_tiles_detail} tiles, all windows)\n"
        f"  Untiled:      {len(output['untiled_skills'])} (no skillTile attribution)\n"
        f"  Loaded skills: {len(output['loaded_skills'])} (from installedSkills snapshots)\n"
        f"  MCP tools:    {len(output['mcp_tools'])}\n"
        f"  Repos seen:   {len(output['repos'])} (capped at top {args.top_repos})\n"
        f"  By-repo rows: {len(output['tiles_by_repo'])} tile×repo, "
        f"{len(output['skills_by_repo'])} skill×repo, "
        f"{len(output['untiled_skills_by_repo'])} untiled×repo, "
        f"{len(output['mcp_tools_by_repo'])} mcp×repo\n"
        f"  Output: {output_path}",
        file=sys.stderr,
    )
    return 0
if __name__ == "__main__":
    # Script entry point: propagate main()'s status code to the shell.
    raise SystemExit(main())