JSON Web Token implementation in Python with support for JWT, JWS, JWK, and JWKS
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
HTTP client for fetching and caching JSON Web Key Sets from remote endpoints. Provides automatic key refresh, flexible caching strategies, and seamless integration with JWT verification workflows for dynamic key management.
Creates an HTTP client configured for fetching JWKS from remote endpoints with customizable caching and network options.
class PyJWKClient:
def __init__(self, uri: str, cache_keys: bool = False,
max_cached_keys: int = 16, cache_jwk_set: bool = True,
lifespan: int = 300, headers: dict = None,
timeout: int = 30, ssl_context = None):
"""
Initialize JWKS client for fetching remote key sets.
Args:
uri (str): JWKS endpoint URL
cache_keys (bool): Enable individual key caching
max_cached_keys (int): Maximum keys to cache (when cache_keys=True)
cache_jwk_set (bool): Enable JWK Set caching
lifespan (int): Cache lifespan in seconds (default 300/5min)
headers (dict): Additional HTTP headers
timeout (int): Request timeout in seconds
ssl_context: SSL context for HTTPS requests
Raises:
PyJWKClientError: Invalid configuration
"""
Usage examples:
import jwt
from jwt import PyJWKClient
# Basic JWKS client
jwks_client = PyJWKClient("https://example.com/.well-known/jwks.json")
# With custom caching
jwks_client = PyJWKClient(
"https://example.com/.well-known/jwks.json",
cache_keys=True, # Cache individual signing keys
max_cached_keys=50, # Increase cache size
lifespan=600 # 10 minute cache
)
# With custom headers and timeout
jwks_client = PyJWKClient(
"https://example.com/.well-known/jwks.json",
headers={"User-Agent": "MyApp/1.0"},
timeout=10 # 10 second timeout
)
# Disable JWK Set caching for always-fresh keys
jwks_client = PyJWKClient(
"https://example.com/.well-known/jwks.json",
cache_jwk_set=False
)
Methods for fetching and accessing signing keys from remote JWKS endpoints.
class PyJWKClient:
def get_signing_key(self, kid: str) -> PyJWK:
"""
Get signing key by key ID, with automatic refresh on cache miss.
Args:
kid (str): Key ID to retrieve
Returns:
PyJWK: Signing key matching the key ID
Raises:
PyJWKClientError: Key not found or network error
PyJWKClientConnectionError: Connection failed
"""
def get_signing_key_from_jwt(self, token: str) -> PyJWK:
"""
Extract key ID from JWT header and retrieve corresponding key.
Args:
token (str): JWT token containing 'kid' in header
Returns:
PyJWK: Signing key for the token
Raises:
PyJWKClientError: Missing kid or key not found
PyJWKClientConnectionError: Network error
"""
def get_signing_keys(self, refresh: bool = False) -> list:
"""
Get all signing keys from the JWKS endpoint.
Args:
refresh (bool): Force refresh from remote endpoint
Returns:
list: List of PyJWK signing keys (use='sig' or None)
Raises:
PyJWKClientError: No signing keys found
PyJWKClientConnectionError: Network error
"""
Usage examples:
import jwt
from jwt import PyJWKClient
from jwt.exceptions import PyJWKClientError
jwks_client = PyJWKClient("https://example.com/.well-known/jwks.json")
# Get key by ID
try:
signing_key = jwks_client.get_signing_key("my-key-id")
print(f"Found key: {signing_key.algorithm_name}")
except PyJWKClientError as e:
print(f"Key retrieval failed: {e}")
# Get key from JWT token
token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsImtpZCI6Im15LWtleS1pZCJ9..."
try:
signing_key = jwks_client.get_signing_key_from_jwt(token)
payload = jwt.decode(token, signing_key.key, algorithms=[signing_key.algorithm_name])
print(f"Token verified: {payload}")
except PyJWKClientError as e:
print(f"Token verification failed: {e}")
# Get all signing keys
signing_keys = jwks_client.get_signing_keys()
for key in signing_keys:
print(f"Key {key.key_id}: {key.algorithm_name}")
# Force refresh from remote
fresh_keys = jwks_client.get_signing_keys(refresh=True)
Methods for managing and accessing complete JWK Sets from remote endpoints.
class PyJWKClient:
def get_jwk_set(self, refresh: bool = False) -> PyJWKSet:
"""
Get complete JWK Set from endpoint with caching.
Args:
refresh (bool): Force refresh from remote endpoint
Returns:
PyJWKSet: Complete key set
Raises:
PyJWKClientError: Invalid JWKS format
PyJWKClientConnectionError: Network error
"""
def fetch_data(self):
"""
Fetch raw JWKS data from remote endpoint.
Returns:
dict: Raw JWKS data
Raises:
PyJWKClientConnectionError: Network or timeout error
"""
@staticmethod
def match_kid(signing_keys: list, kid: str) -> PyJWK | None:
"""
Find signing key by key ID from a list of keys.
Args:
signing_keys (list): List of PyJWK signing keys
kid (str): Key ID to match
Returns:
PyJWK | None: Matching key or None if not found
"""
Usage examples:
import jwt
from jwt import PyJWKClient
jwks_client = PyJWKClient("https://example.com/.well-known/jwks.json")
# Get cached JWK Set
jwk_set = jwks_client.get_jwk_set()
print(f"Loaded {len(jwk_set.keys)} keys")
# Force refresh for latest keys
fresh_jwk_set = jwks_client.get_jwk_set(refresh=True)
# Access individual keys
rsa_key = jwk_set['rsa-key-1']
ec_key = jwk_set['ec-key-1']
# Low-level: fetch raw data
raw_jwks = jwks_client.fetch_data()
print(f"Raw JWKS: {raw_jwks}")
Common patterns for integrating the JWKS client with JWT verification:
import jwt
from jwt import PyJWKClient
from jwt.exceptions import PyJWKClientError, InvalidTokenError
# One PyJWKClient per JWKS URL, reused across calls. Constructing a new client
# on every verification would throw away its JWK Set cache and refetch the key
# set from the network each time.
# NOTE(review): this dict is unbounded; it assumes jwks_url comes from a small,
# trusted set of endpoints rather than from untrusted input.
_JWKS_CLIENTS = {}


def _get_jwks_client(jwks_url: str) -> "PyJWKClient":
    """Return the shared PyJWKClient for ``jwks_url``, creating it on first use."""
    client = _JWKS_CLIENTS.get(jwks_url)
    if client is None:
        client = _JWKS_CLIENTS[jwks_url] = PyJWKClient(jwks_url)
    return client


def verify_jwt_with_jwks(token: str, jwks_url: str) -> dict:
    """
    Verify JWT using remote JWKS endpoint.

    Args:
        token (str): Encoded JWT whose header carries the 'kid' of its signing key
        jwks_url (str): URL of the JWKS endpoint that publishes the signing keys

    Returns:
        dict: Decoded token payload

    Raises:
        ValueError: Key retrieval failed (PyJWKClientError) or the token did not
            validate (InvalidTokenError); the original exception is attached
            as ``__cause__``
    """
    jwks_client = _get_jwks_client(jwks_url)
    try:
        # Get signing key from token
        signing_key = jwks_client.get_signing_key_from_jwt(token)
        # Verify and decode token
        payload = jwt.decode(
            token,
            signing_key.key,
            algorithms=[signing_key.algorithm_name],
            audience="my-app",
            issuer="https://my-auth-server.com",
        )
        return payload
    except PyJWKClientError as e:
        # Chain explicitly so the traceback shows the underlying JWKS failure.
        raise ValueError(f"Key retrieval failed: {e}") from e
    except InvalidTokenError as e:
        raise ValueError(f"Token validation failed: {e}") from e
# Usage
try:
payload = verify_jwt_with_jwks(
token="eyJ...",
jwks_url="https://auth.example.com/.well-known/jwks.json"
)
print(f"User: {payload.get('sub')}")
except ValueError as e:
print(f"Verification failed: {e}")
PyJWKClient provides two levels of caching:
Caches the entire JWKS response for the specified lifespan:
# Default: 5-minute JWK Set cache
jwks_client = PyJWKClient(uri, cache_jwk_set=True, lifespan=300)
# Custom cache duration
jwks_client = PyJWKClient(uri, lifespan=1800) # 30 minutes
# Disable JWK Set caching
jwks_client = PyJWKClient(uri, cache_jwk_set=False)
Caches individual signing key lookups using an LRU cache:
# Enable signing key caching
jwks_client = PyJWKClient(uri, cache_keys=True, max_cached_keys=16)
# Larger key cache
jwks_client = PyJWKClient(uri, cache_keys=True, max_cached_keys=100)
# Both caching levels enabled
jwks_client = PyJWKClient(
uri,
cache_jwk_set=True, # Cache full JWKS response
lifespan=600, # 10 minute JWKS cache
cache_keys=True, # Also cache individual key lookups
max_cached_keys=50 # LRU cache for 50 keys
)
JWKS-client-specific exceptions and error patterns:
import jwt
from jwt import PyJWKClient  # required: the snippet constructs a PyJWKClient below
from jwt.exceptions import (
    PyJWKClientError,
    PyJWKClientConnectionError,
    PyJWKError,
)

jwks_client = PyJWKClient("https://example.com/.well-known/jwks.json")
try:
    signing_key = jwks_client.get_signing_key("key-id")
except PyJWKClientConnectionError as e:
    # Network/timeout errors.
    # This handler must come before PyJWKClientError: the connection error is a
    # subclass of it, so the broader handler would otherwise catch it first.
    print(f"Connection failed: {e}")
    # Implement retry logic or fallback
except PyJWKClientError as e:
    # Key not found, invalid JWKS format, etc.
    print(f"JWKS error: {e}")
except Exception as e:
    # Last-resort boundary for anything not raised by the JWKS client itself.
    print(f"Unexpected error: {e}")
# Graceful degradation example
def get_key_with_fallback(client, kid, backup_key=None):
try:
return client.get_signing_key(kid)
except (PyJWKClientError, PyJWKClientConnectionError):
if backup_key:
return backup_key
raise ValueError("Primary and backup key retrieval failed")
SSL and network customization:
import ssl
from jwt import PyJWKClient
# Custom SSL context
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False # For testing only!
jwks_client = PyJWKClient(
"https://internal-auth.company.com/.well-known/jwks.json",
headers={
"Authorization": "Bearer internal-token",
"User-Agent": "MyApp/2.0"
},
timeout=5,
ssl_context=ssl_context
)
# Corporate proxy or special routing
jwks_client = PyJWKClient(
"https://auth.example.com/.well-known/jwks.json",
headers={"X-Forwarded-For": "internal.company.com"}
)
Install with Tessl CLI
npx tessl i tessl/pypi-pyjwt