Run PromQL queries, inspect alert state, and troubleshoot OAuth2 or OIDC client-credentials access to Prometheus-compatible APIs.
95
Quality
95%
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
#!/usr/bin/env python3
"""Prometheus query helper with OAuth2 client-credentials auth."""
from __future__ import annotations

import argparse
import http.client
import json
import math
import os
import ssl
import sys
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib import error, parse, request
# Sub-directory of the user's cache home that holds the token cache file.
CACHE_DIRECTORY = "prometheus-oidc-query"
# File name of the JSON token cache inside CACHE_DIRECTORY.
CACHE_FILENAME = "token-cache.json"
# Request timeout, in seconds, used when PROM_QUERY_TIMEOUT is not set.
DEFAULT_TIMEOUT_SECONDS = 30.0
# A cached token is treated as expired this many seconds before its recorded
# expiry time.
REFRESH_SKEW_SECONDS = 60
class ConfigError(Exception):
    """Raised when configuration is incomplete or invalid.

    The message is printed to stderr by ``main`` and the process exits with
    status 1.
    """
class HttpRequestError(Exception):
    """Raised when an HTTP request fails.

    Also used for responses that arrive but cannot be used, such as a
    non-JSON body or a token response without ``access_token``.
    """
@dataclass
class Settings:
    """Runtime configuration gathered by ``load_settings``.

    The optional string fields are ``None`` when the corresponding
    environment variable is not set.
    """

    # Base URL of the Prometheus-compatible API (PROM_QUERY_PROMETHEUS_URL).
    prometheus_url: str | None
    # OAuth2 token endpoint URL (PROM_QUERY_TOKEN_URL).
    token_url: str | None
    # OAuth2 client identifier (PROM_QUERY_CLIENT_ID).
    client_id: str | None
    # OAuth2 client secret (PROM_QUERY_CLIENT_SECRET).
    client_secret: str | None
    # Optional scope sent with the token request (PROM_QUERY_SCOPE).
    scope: str | None
    # Optional CA bundle file used for TLS verification (PROM_QUERY_CA_BUNDLE).
    ca_bundle: str | None
    # Per-request timeout in seconds (PROM_QUERY_TIMEOUT).
    timeout: float
    # Location of the JSON token cache file.
    cache_path: Path
def cache_path() -> Path:
    """Return the location of the token cache file.

    ``XDG_CACHE_HOME`` is used as the cache home when it is set to a
    non-empty value; otherwise the cache home is ``~/.cache``.
    """
    xdg_cache_home = os.environ.get("XDG_CACHE_HOME")
    if xdg_cache_home:
        cache_home = Path(xdg_cache_home)
    else:
        cache_home = Path.home() / ".cache"
    return cache_home.joinpath(CACHE_DIRECTORY, CACHE_FILENAME)
def load_settings() -> Settings:
    """Build a ``Settings`` instance from ``PROM_QUERY_*`` environment variables.

    Only the timeout is validated here; URL and credential checks are done by
    ``build_validation_report``.

    Raises:
        ConfigError: if ``PROM_QUERY_TIMEOUT`` is not a number, is not finite,
            or is not greater than zero.
    """
    timeout_text = os.environ.get("PROM_QUERY_TIMEOUT", str(DEFAULT_TIMEOUT_SECONDS))
    try:
        timeout = float(timeout_text)
    except ValueError as exc:
        raise ConfigError("PROM_QUERY_TIMEOUT must be a number") from exc
    # float() accepts "nan" and "inf"; neither compares <= 0, so without this
    # check they would be handed to the socket layer as a timeout.
    if not math.isfinite(timeout):
        raise ConfigError("PROM_QUERY_TIMEOUT must be a finite number")
    if timeout <= 0:
        raise ConfigError("PROM_QUERY_TIMEOUT must be greater than zero")
    return Settings(
        prometheus_url=os.environ.get("PROM_QUERY_PROMETHEUS_URL"),
        token_url=os.environ.get("PROM_QUERY_TOKEN_URL"),
        client_id=os.environ.get("PROM_QUERY_CLIENT_ID"),
        client_secret=os.environ.get("PROM_QUERY_CLIENT_SECRET"),
        scope=os.environ.get("PROM_QUERY_SCOPE"),
        ca_bundle=os.environ.get("PROM_QUERY_CA_BUNDLE"),
        timeout=timeout,
        cache_path=cache_path(),
    )
def validate_url(value: str | None, field_name: str) -> str | None:
    """Check that *value* is an absolute http(s) URL.

    Args:
        value: The URL text to check; ``None`` or empty means "not provided".
        field_name: Name used as the prefix of the returned problem message.

    Returns:
        A human-readable problem description, or ``None`` when the URL is
        acceptable.
    """
    if not value:
        return f"{field_name} is required"
    invalid = f"{field_name} must be an absolute http or https URL"
    try:
        parsed = parse.urlparse(value)
        has_host = bool(parsed.hostname)
        # Accessing .port raises ValueError for a non-numeric or out-of-range
        # port, which would otherwise only surface when the request is sent.
        _ = parsed.port
    except ValueError:
        # urlparse itself raises ValueError for malformed IPv6 brackets.
        return invalid
    if parsed.scheme not in {"http", "https"} or not parsed.netloc or not has_host:
        return invalid
    return None
def build_validation_report(settings: Settings) -> dict[str, Any]:
    """Describe the resolved configuration and every problem found in it.

    The client secret itself is never included; only whether it is set.

    Returns:
        A dict with the keys ``valid``, ``errors``, ``resolved_config``,
        ``required_env``, ``optional_env`` and ``cache``.
    """
    url_checks = (
        validate_url(settings.prometheus_url, "PROM_QUERY_PROMETHEUS_URL"),
        validate_url(settings.token_url, "PROM_QUERY_TOKEN_URL"),
    )
    problems: list[str] = [message for message in url_checks if message]

    has_client_id = bool(settings.client_id)
    has_client_secret = bool(settings.client_secret)
    if not has_client_id:
        problems.append("PROM_QUERY_CLIENT_ID is required")
    if not has_client_secret:
        problems.append("PROM_QUERY_CLIENT_SECRET is required")

    # Empty strings are reported as None, the same as unset values.
    scope = settings.scope or None
    ca_bundle = settings.ca_bundle or None
    if ca_bundle is not None and not Path(ca_bundle).is_file():
        problems.append("PROM_QUERY_CA_BUNDLE must point to an existing file")

    cache_file = settings.cache_path
    report: dict[str, Any] = {"valid": not problems, "errors": problems}
    report["resolved_config"] = {
        "prometheus_url": settings.prometheus_url,
        "token_url": settings.token_url,
        "client_id": settings.client_id,
        "client_secret_set": has_client_secret,
        "scope": scope,
        "ca_bundle": ca_bundle,
        "timeout": settings.timeout,
    }
    report["required_env"] = {
        "PROM_QUERY_PROMETHEUS_URL": bool(settings.prometheus_url),
        "PROM_QUERY_TOKEN_URL": bool(settings.token_url),
        "PROM_QUERY_CLIENT_ID": has_client_id,
        "PROM_QUERY_CLIENT_SECRET": has_client_secret,
    }
    report["optional_env"] = {
        "PROM_QUERY_SCOPE": scope,
        "PROM_QUERY_CA_BUNDLE": ca_bundle,
        "PROM_QUERY_TIMEOUT": settings.timeout,
    }
    report["cache"] = {
        "path": str(cache_file),
        "parent_exists": cache_file.parent.exists(),
        "exists": cache_file.exists(),
    }
    return report
def ensure_valid_settings(settings: Settings) -> None:
    """Raise ``ConfigError`` if *settings* fails validation.

    The error message joins every reported problem with ``"; "``.
    """
    outcome = build_validation_report(settings)
    if not outcome["valid"]:
        raise ConfigError("; ".join(outcome["errors"]))
def build_ssl_context(settings: Settings) -> ssl.SSLContext:
    """Create the TLS context used for all outgoing requests.

    A configured CA bundle is passed as ``cafile``; with no bundle the
    default context is created without one.
    """
    # An empty string is treated like "not configured".
    cafile = settings.ca_bundle or None
    return ssl.create_default_context(cafile=cafile)
def print_json(payload: dict[str, Any]) -> None:
    """Write *payload* to stdout as indented, key-sorted JSON plus a newline."""
    out = sys.stdout
    json.dump(payload, out, sort_keys=True, indent=2)
    out.write("\n")
def read_json_response(response: Any) -> dict[str, Any]:
    """Read *response* fully and decode its body as a JSON object.

    Args:
        response: Any object whose ``read()`` returns the body as bytes.

    Returns:
        The decoded JSON object.

    Raises:
        HttpRequestError: if the body is not valid UTF-8, is not valid JSON,
            or decodes to something other than a JSON object.
    """
    try:
        payload = json.loads(response.read().decode("utf-8"))
    except ValueError as exc:
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError.
        raise HttpRequestError("received a non-JSON response") from exc
    # Callers use dict methods on the result, so a list or scalar body is as
    # unusable as a non-JSON one.
    if not isinstance(payload, dict):
        raise HttpRequestError("received a JSON response that is not an object")
    return payload
def request_json(
    url: str,
    *,
    method: str,
    timeout: float,
    context: ssl.SSLContext,
    headers: dict[str, str] | None = None,
    form: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Send one HTTP request and return the decoded JSON response body.

    Args:
        url: Absolute URL to request.
        method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
        timeout: Timeout in seconds passed to ``urlopen``.
        context: TLS context passed to ``urlopen``.
        headers: Extra request headers; the mapping is copied, not mutated.
        form: When given, sent url-encoded as the request body. A
            ``Content-Type`` of ``application/x-www-form-urlencoded`` is
            added unless *headers* already sets one.

    Raises:
        HttpRequestError: on an HTTP error status (the response body is
            appended to the message when present), on any connection-level
            failure, or when the body cannot be decoded.
    """
    encoded_form = None
    request_headers = dict(headers or {})
    if form is not None:
        encoded_form = parse.urlencode(form).encode("utf-8")
        request_headers.setdefault("Content-Type", "application/x-www-form-urlencoded")
    req = request.Request(url, data=encoded_form, headers=request_headers, method=method)
    try:
        with request.urlopen(req, timeout=timeout, context=context) as response:
            return read_json_response(response)
    except error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace").strip()
        message = f"{method} {url} failed with HTTP {exc.code}"
        if body:
            message = f"{message}: {body}"
        raise HttpRequestError(message) from exc
    except error.URLError as exc:
        raise HttpRequestError(f"{method} {url} failed: {exc.reason}") from exc
    except (OSError, http.client.HTTPException) as exc:
        # Failures while waiting for or reading the response (timeouts,
        # connection resets, truncated or malformed replies) are not wrapped
        # in URLError by urllib, so they are converted here.
        detail = str(exc) or type(exc).__name__
        raise HttpRequestError(f"{method} {url} failed: {detail}") from exc
def read_cached_token(path: Path) -> dict[str, Any] | None:
    """Load a still-usable token from the cache file at *path*.

    Returns:
        The cached payload, or ``None`` when the file is missing, unreadable,
        not a JSON object, lacks a string ``access_token`` or numeric
        ``expires_at``, or expires within ``REFRESH_SKEW_SECONDS``.
    """
    try:
        # Read directly rather than checking exists() first, so a file that
        # vanishes between the check and the read is handled the same way.
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers invalid JSON and bytes that are not valid UTF-8.
        return None
    # Valid JSON that is not an object (e.g. a list) has no .get().
    if not isinstance(payload, dict):
        return None
    expires_at = payload.get("expires_at")
    access_token = payload.get("access_token")
    if not isinstance(expires_at, (int, float)) or not isinstance(access_token, str):
        return None
    if expires_at - REFRESH_SKEW_SECONDS <= time.time():
        return None
    return payload
def write_cached_token(path: Path, payload: dict[str, Any]) -> None:
    """Atomically write *payload* as JSON to *path*.

    Missing parent directories are created. The data is written to a uniquely
    named temporary file in the same directory and then renamed over *path*.

    Raises:
        OSError: if the directory or file cannot be created or written.
        TypeError: if *payload* is not JSON-serializable.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Serialize first so a bad payload never leaves a temporary file behind.
    serialized = json.dumps(payload, indent=2, sort_keys=True)
    # mkstemp creates the file readable and writable only by the current user,
    # so the token is never on disk with wider permissions, and the unique
    # name keeps concurrent runs from writing to the same temporary file.
    fd, temp_name = tempfile.mkstemp(
        prefix=f"{path.name}.", suffix=".tmp", dir=path.parent
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(serialized)
        os.replace(temp_name, path)
    except BaseException:
        # Do not leave a stray temporary file behind on failure.
        try:
            os.unlink(temp_name)
        except OSError:
            pass
        raise
def fetch_access_token(settings: Settings, context: ssl.SSLContext) -> dict[str, Any]:
    """Request a new token from the token endpoint and cache it.

    A ``client_credentials`` grant is POSTed as a form to
    ``settings.token_url``, with ``scope`` included when configured.

    Returns:
        A dict with ``access_token``, ``token_type`` (default ``"Bearer"``),
        ``scope`` (falls back to the configured scope) and ``expires_at``
        (epoch seconds; ``expires_in`` defaults to 3600 when absent).

    Raises:
        HttpRequestError: if the request fails, the response has no
            non-empty string ``access_token``, or ``expires_in`` cannot be
            converted to an integer.
    """
    form = {
        "grant_type": "client_credentials",
        "client_id": settings.client_id or "",
        "client_secret": settings.client_secret or "",
    }
    if settings.scope:
        form["scope"] = settings.scope
    token_response = request_json(
        settings.token_url or "",
        method="POST",
        timeout=settings.timeout,
        context=context,
        form=form,
    )
    access_token = token_response.get("access_token")
    if not isinstance(access_token, str) or not access_token:
        raise HttpRequestError("token endpoint response did not include access_token")
    try:
        expires_in_seconds = int(token_response.get("expires_in", 3600))
    except (TypeError, ValueError, OverflowError) as exc:
        raise HttpRequestError("token endpoint response included an invalid expires_in") from exc
    payload = {
        "access_token": access_token,
        "token_type": token_response.get("token_type", "Bearer"),
        "scope": token_response.get("scope", settings.scope),
        "expires_at": int(time.time()) + expires_in_seconds,
    }
    try:
        write_cached_token(settings.cache_path, payload)
    except OSError as exc:
        # The cache is an optimisation: the token just obtained is still
        # valid, so a read-only or full cache directory must not abort the
        # command. Warn on stderr to keep stdout clean.
        print(
            f"warning: could not write token cache {settings.cache_path}: {exc}",
            file=sys.stderr,
        )
    return payload
def get_access_token(
    settings: Settings,
    context: ssl.SSLContext,
    *,
    force_refresh: bool = False,
) -> tuple[str, dict[str, Any]]:
    """Return a token payload together with a label saying where it came from.

    Args:
        settings: Supplies the cache location and the token endpoint details.
        context: TLS context used if a new token has to be fetched.
        force_refresh: When true, the cache is not consulted.

    Returns:
        ``("cache", payload)`` for a usable cached token, otherwise
        ``("token_endpoint", payload)`` for a newly fetched one.
    """
    cached = None if force_refresh else read_cached_token(settings.cache_path)
    if cached:
        return "cache", cached
    fresh = fetch_access_token(settings, context)
    return "token_endpoint", fresh
def token_metadata(source: str, token_payload: dict[str, Any], settings: Settings) -> dict[str, Any]:
    """Summarise a token payload without exposing the token itself.

    Args:
        source: Label describing where the token came from.
        token_payload: Token dict; must contain ``expires_at``.
        settings: Supplies the cache path reported in the summary.

    Returns:
        A dict with ``cache_path``, ``expires_at``, ``expires_in_seconds``
        (never negative), ``scope``, ``source`` and ``token_type``.
    """
    expiry = int(token_payload["expires_at"])
    summary: dict[str, Any] = {}
    summary["cache_path"] = str(settings.cache_path)
    summary["expires_at"] = expiry
    remaining = expiry - int(time.time())
    summary["expires_in_seconds"] = remaining if remaining > 0 else 0
    summary["scope"] = token_payload.get("scope")
    summary["source"] = source
    summary["token_type"] = token_payload.get("token_type", "Bearer")
    return summary
def perform_query(settings: Settings, context: ssl.SSLContext, expression: str) -> dict[str, Any]:
    """Run an instant query against ``/api/v1/query`` with bearer auth.

    Args:
        settings: Supplies the Prometheus base URL and the timeout.
        context: TLS context for the token and query requests.
        expression: PromQL expression sent as the ``query`` parameter.

    Returns:
        A dict with ``auth_source`` (where the token came from), ``query``
        (the expression as given) and ``response`` (the decoded JSON body).
    """
    source, token_payload = get_access_token(settings, context)
    # Trailing slashes on the base URL are dropped before the API path is
    # appended.
    base_url = settings.prometheus_url.rstrip("/")
    encoded_query = parse.urlencode({"query": expression})
    auth_headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {token_payload['access_token']}",
    }
    response = request_json(
        f"{base_url}/api/v1/query?{encoded_query}",
        method="GET",
        timeout=settings.timeout,
        context=context,
        headers=auth_headers,
    )
    return dict(auth_source=source, query=expression, response=response)
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser.

    Exactly one sub-command is required: ``query``, ``alerts``, ``config``
    or ``token``. The chosen name is stored as ``command``.
    """
    root = argparse.ArgumentParser(description=__doc__)
    commands = root.add_subparsers(dest="command", required=True)

    query_cmd = commands.add_parser("query", help="Run an instant PromQL query")
    query_cmd.add_argument("--expr", required=True, help="PromQL expression")

    alerts_cmd = commands.add_parser("alerts", help="Query ALERTS by state")
    alert_states = ["firing", "pending", "inactive"]
    alerts_cmd.add_argument(
        "--state", default="firing", choices=alert_states, help="Alert state to query"
    )

    commands.add_parser("config", help="Print redacted configuration")

    token_cmd = commands.add_parser("token", help="Inspect token cache metadata")
    token_cmd.add_argument(
        "--refresh",
        action="store_true",
        help="Ignore any cached token and fetch a fresh one",
    )
    return root
def run_command(args: argparse.Namespace, settings: Settings) -> dict[str, Any]:
    """Execute the parsed sub-command and return its JSON-serializable result.

    ``config`` returns the validation report even when the configuration is
    invalid. Every other command requires valid settings.

    Raises:
        ConfigError: if the settings are invalid (non-``config`` commands) or
            the command name is not recognised.
        HttpRequestError: if a token or query request fails.
    """
    if args.command == "config":
        return build_validation_report(settings)
    # Validation builds its own report, so none is built up front here; that
    # avoids doing the work twice for every non-config command.
    ensure_valid_settings(settings)
    context = build_ssl_context(settings)
    if args.command == "query":
        return perform_query(settings, context, args.expr)
    if args.command == "alerts":
        expression = f'ALERTS{{alertstate="{args.state}"}}'
        payload = perform_query(settings, context, expression)
        payload["state"] = args.state
        return payload
    if args.command == "token":
        source, token_payload = get_access_token(settings, context, force_refresh=args.refresh)
        return token_metadata(source, token_payload, settings)
    raise ConfigError(f"unknown command: {args.command}")
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point; returns the process exit status.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Returns:
        ``0`` on success. ``1`` when a configuration or HTTP error occurred
        (its message is printed to stderr), or when ``config`` reports an
        invalid configuration (the report is still printed).
    """
    args = build_parser().parse_args(argv)
    try:
        payload = run_command(args, load_settings())
    except (ConfigError, HttpRequestError) as failure:
        print(str(failure), file=sys.stderr)
        return 1
    print_json(payload)
    config_invalid = args.command == "config" and not payload["valid"]
    return 1 if config_invalid else 0
# Run the CLI only when executed as a script, passing main()'s return value
# to the shell as the exit status.
if __name__ == "__main__":
    sys.exit(main())

# Install with Tessl CLI:
#   npx tessl i jobe-skills/prometheus-oidc-query