Comprehensive Python SDK for the AT Protocol, providing client interfaces, authentication, and real-time streaming for decentralized social networks.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
JSON Web Token parsing, payload extraction, validation, and signature verification for AT Protocol authentication and authorization. These operations handle JWT lifecycle management including creation, validation, and verification of tokens used in AT Protocol sessions.
AT Protocol JWT payloads follow RFC 7519 standards with additional AT Protocol-specific fields.
class JwtPayload:
"""
JWT payload structure based on RFC 7519 with AT Protocol extensions.
Attributes:
iss (Optional[str]): Issuer (DID) - who issued the token
sub (Optional[str]): Subject (DID) - who the token is about
aud (Optional[Union[str, List[str]]]): Audience (DID) - intended recipients
jti (Optional[str]): JWT ID - unique identifier for this token
nbf (Optional[int]): Not Before - earliest time token is valid (Unix timestamp)
exp (Optional[int]): Expiration Time - when token expires (Unix timestamp)
iat (Optional[int]): Issued At - when token was issued (Unix timestamp)
scope (Optional[str]): ATProto-specific scope for permissions
"""
iss: Optional[str]
sub: Optional[str]
aud: Optional[Union[str, List[str]]]
jti: Optional[str]
nbf: Optional[int]
exp: Optional[int]
iat: Optional[int]
scope: Optional[str]
def is_expired(self, current_time: Optional[int] = None) -> bool:
"""
Check if token is expired.
Args:
current_time (int, optional): Current Unix timestamp (default: now)
Returns:
bool: True if token is expired
"""
def is_valid_at_time(self, timestamp: int) -> bool:
"""
Check if token is valid at specific timestamp.
Args:
timestamp (int): Unix timestamp to check
Returns:
bool: True if token is valid at given time
"""
def time_until_expiration(self) -> Optional[int]:
"""
Get seconds until token expires.
Returns:
Optional[int]: Seconds until expiration or None if no expiration
"""
def validate_audience(self, expected_audience: Union[str, List[str]]) -> bool:
"""
Validate token audience against expected values.
Args:
expected_audience (Union[str, List[str]]): Expected audience values
Returns:
bool: True if audience matches expectations
"""Parse JWT tokens into their component parts for inspection and validation.
def parse_jwt(jwt: Union[str, bytes]) -> Tuple[bytes, bytes, Dict[str, Any], bytes]:
"""
Parse JWT into components without verification.
Args:
jwt (Union[str, bytes]): JWT token to parse
Returns:
Tuple[bytes, bytes, Dict[str, Any], bytes]:
(payload_bytes, signing_input, header_dict, signature_bytes)
Raises:
JwtParsingError: If JWT format is invalid
"""
def get_jwt_payload(jwt: Union[str, bytes]) -> JwtPayload:
"""
Extract and validate JWT payload.
Args:
jwt (Union[str, bytes]): JWT token
Returns:
JwtPayload: Decoded and validated payload
Raises:
JwtParsingError: If JWT format is invalid
JwtValidationError: If payload validation fails
"""
def decode_jwt_payload(jwt: Union[str, bytes]) -> JwtPayload:
"""
Decode JWT payload without validation.
Args:
jwt (Union[str, bytes]): JWT token
Returns:
JwtPayload: Decoded payload (may be invalid/expired)
Raises:
JwtParsingError: If JWT format is invalid
"""Usage examples:
from atproto import parse_jwt, get_jwt_payload, decode_jwt_payload, JwtPayload
# Example JWT token (access token from AT Protocol)
jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ.eyJpc3MiOiJkaWQ6cGxjOmFsaWNlMTIzIiwic3ViIjoiZGlkOnBsYzphbGljZTEyMyIsImF1ZCI6ImRpZDp3ZWI6YnNreS5zb2NpYWwiLCJpYXQiOjE2ODAwMDAwMDAsImV4cCI6MTY4MDAwMzYwMCwic2NvcGUiOiJjb20uYXRwcm90by5hY2Nlc3MifQ.signature"
# Parse JWT components
payload_bytes, signing_input, header, signature = parse_jwt(jwt_token)
print(f"Header: {header}")
print(f"Payload bytes length: {len(payload_bytes)}")
print(f"Signing input length: {len(signing_input)}")
# Get validated payload
try:
payload = get_jwt_payload(jwt_token)
print(f"Issuer: {payload.iss}")
print(f"Subject: {payload.sub}")
print(f"Audience: {payload.aud}")
print(f"Scope: {payload.scope}")
print(f"Expires at: {payload.exp}")
# Check expiration
if payload.is_expired():
print("⚠️ Token is expired")
else:
print(f"✓ Token valid for {payload.time_until_expiration()} more seconds")
except JwtValidationError as e:
print(f"JWT validation failed: {e}")
# Decode without validation (useful for debugging)
payload_unvalidated = decode_jwt_payload(jwt_token)
print(f"Raw payload (unvalidated): {payload_unvalidated}")Validate JWT payload claims according to AT Protocol requirements.
def validate_jwt_payload(payload: JwtPayload,
expected_audience: Optional[Union[str, List[str]]] = None,
expected_issuer: Optional[str] = None,
current_time: Optional[int] = None) -> bool:
"""
Validate JWT payload claims.
Args:
payload (JwtPayload): Payload to validate
expected_audience (Union[str, List[str]], optional): Expected audience
expected_issuer (str, optional): Expected issuer DID
current_time (int, optional): Current timestamp for time validation
Returns:
bool: True if payload is valid
Raises:
JwtValidationError: If validation fails with details
"""
def validate_access_token_payload(payload: JwtPayload, pds_did: str) -> bool:
"""
Validate access token payload for AT Protocol.
Args:
payload (JwtPayload): Access token payload
pds_did (str): Expected PDS DID as audience
Returns:
bool: True if valid access token
Raises:
JwtValidationError: If access token validation fails
"""
def validate_refresh_token_payload(payload: JwtPayload) -> bool:
"""
Validate refresh token payload for AT Protocol.
Args:
payload (JwtPayload): Refresh token payload
Returns:
bool: True if valid refresh token
Raises:
JwtValidationError: If refresh token validation fails
"""Usage examples:
from atproto import (
validate_jwt_payload, validate_access_token_payload,
validate_refresh_token_payload, get_jwt_payload
)
import time
# Validate general JWT payload
jwt_token = "..." # Your JWT token
payload = get_jwt_payload(jwt_token)
# Basic validation
try:
is_valid = validate_jwt_payload(
payload,
expected_audience="did:web:bsky.social",
expected_issuer="did:plc:alice123",
current_time=int(time.time())
)
print(f"JWT is valid: {is_valid}")
except JwtValidationError as e:
print(f"Validation failed: {e}")
# Validate AT Protocol access token
access_token = "..." # Access token JWT
access_payload = get_jwt_payload(access_token)
try:
is_valid_access = validate_access_token_payload(
access_payload,
pds_did="did:web:alice.pds.example.com"
)
print(f"Access token is valid: {is_valid_access}")
except JwtValidationError as e:
print(f"Access token validation failed: {e}")
# Validate refresh token
refresh_token = "..." # Refresh token JWT
refresh_payload = get_jwt_payload(refresh_token)
try:
is_valid_refresh = validate_refresh_token_payload(refresh_payload)
print(f"Refresh token is valid: {is_valid_refresh}")
except JwtValidationError as e:
print(f"Refresh token validation failed: {e}")Verify JWT signatures using cryptographic keys to ensure token authenticity.
def verify_jwt(jwt: Union[str, bytes], signing_key: str) -> bool:
"""
Verify JWT signature synchronously.
Args:
jwt (Union[str, bytes]): JWT token to verify
signing_key (str): DID key or public key for verification
Returns:
bool: True if signature is valid
Raises:
JwtVerificationError: If signature verification fails
UnsupportedAlgorithmError: If JWT algorithm is not supported
"""
def verify_jwt_async(jwt: Union[str, bytes], signing_key: str) -> bool:
"""
Verify JWT signature asynchronously.
Args:
jwt (Union[str, bytes]): JWT token to verify
signing_key (str): DID key or public key for verification
Returns:
bool: True if signature is valid
Raises:
JwtVerificationError: If signature verification fails
"""Usage examples:
from atproto import verify_jwt, verify_jwt_async, get_jwt_payload
import asyncio
# Synchronous JWT verification
jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
signing_key = "did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK"
try:
is_signature_valid = verify_jwt(jwt_token, signing_key)
if is_signature_valid:
print("✓ JWT signature is valid")
# Now safe to trust the payload
payload = get_jwt_payload(jwt_token)
print(f"Verified token from: {payload.iss}")
else:
print("✗ JWT signature is invalid")
except JwtVerificationError as e:
print(f"Signature verification failed: {e}")
except UnsupportedAlgorithmError as e:
print(f"Unsupported algorithm: {e}")
# Asynchronous JWT verification
async def verify_jwt_async_example():
try:
is_valid = await verify_jwt_async(jwt_token, signing_key)
if is_valid:
print("✓ Async JWT signature verification successful")
else:
print("✗ Async JWT signature verification failed")
except Exception as e:
print(f"Async verification error: {e}")
asyncio.run(verify_jwt_async_example())Comprehensive JWT verification combining parsing, validation, and signature verification.
def verify_and_validate_jwt(jwt: Union[str, bytes],
signing_key: str,
expected_audience: Optional[Union[str, List[str]]] = None,
expected_issuer: Optional[str] = None) -> JwtPayload:
"""
Complete JWT verification and validation workflow.
Args:
jwt (Union[str, bytes]): JWT token
signing_key (str): DID key for signature verification
expected_audience (Union[str, List[str]], optional): Expected audience
expected_issuer (str, optional): Expected issuer
Returns:
JwtPayload: Verified and validated payload
Raises:
JwtVerificationError: If signature verification fails
JwtValidationError: If payload validation fails
JwtParsingError: If JWT parsing fails
"""
def verify_at_protocol_session_token(access_token: Union[str, bytes],
user_did: str,
pds_did: str,
signing_key: str) -> JwtPayload:
"""
Verify AT Protocol session access token.
Args:
access_token (Union[str, bytes]): Access token to verify
user_did (str): Expected user DID (subject)
pds_did (str): Expected PDS DID (audience)
signing_key (str): User's signing key for verification
Returns:
JwtPayload: Verified access token payload
Raises:
JwtVerificationError: If token verification fails
"""Usage examples:
from atproto import (
verify_and_validate_jwt, verify_at_protocol_session_token,
JwtVerificationError, JwtValidationError
)
# Complete JWT verification workflow
def authenticate_request(jwt_token, user_signing_key):
"""Example request authentication using JWT."""
try:
# Verify signature and validate claims
payload = verify_and_validate_jwt(
jwt_token,
signing_key=user_signing_key,
expected_audience="did:web:bsky.social",
expected_issuer="did:plc:alice123"
)
print(f"✓ Authenticated user: {payload.sub}")
print(f"✓ Token scope: {payload.scope}")
print(f"✓ Valid until: {payload.exp}")
return {'authenticated': True, 'user_did': payload.sub, 'scope': payload.scope}
except JwtVerificationError:
print("✗ JWT signature verification failed")
return {'authenticated': False, 'error': 'invalid_signature'}
except JwtValidationError as e:
print(f"✗ JWT validation failed: {e}")
return {'authenticated': False, 'error': 'invalid_claims'}
except Exception as e:
print(f"✗ Authentication error: {e}")
return {'authenticated': False, 'error': 'authentication_failed'}
# AT Protocol session verification
def verify_session_token(access_token, user_did, pds_did, signing_key):
"""Verify AT Protocol session access token."""
try:
payload = verify_at_protocol_session_token(
access_token=access_token,
user_did=user_did,
pds_did=pds_did,
signing_key=signing_key
)
print(f"✓ Valid session for user: {payload.sub}")
print(f"✓ Session scope: {payload.scope}")
return payload
except JwtVerificationError as e:
print(f"✗ Session token verification failed: {e}")
raise
# Example usage
jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
user_signing_key = "did:key:z6MkhaXgBZDv..."
# Authenticate request
auth_result = authenticate_request(jwt_token, user_signing_key)
if auth_result['authenticated']:
print(f"User {auth_result['user_did']} authenticated with scope {auth_result['scope']}")
# Verify session token
try:
session_payload = verify_session_token(
access_token=jwt_token,
user_did="did:plc:alice123",
pds_did="did:web:alice.pds.example.com",
signing_key=user_signing_key
)
print("Session verification successful")
except Exception as e:
print(f"Session verification failed: {e}")Utility functions for working with JWT tokens in AT Protocol contexts.
def extract_did_from_jwt(jwt: Union[str, bytes]) -> Optional[str]:
"""
Extract DID from JWT issuer claim.
Args:
jwt (Union[str, bytes]): JWT token
Returns:
Optional[str]: Issuer DID or None if not found
"""
def get_jwt_algorithm(jwt: Union[str, bytes]) -> str:
"""
Get algorithm from JWT header.
Args:
jwt (Union[str, bytes]): JWT token
Returns:
str: Algorithm identifier (e.g., 'ES256K', 'EdDSA')
Raises:
JwtParsingError: If header cannot be parsed
"""
def is_access_token(jwt: Union[str, bytes]) -> bool:
"""
Check if JWT is an AT Protocol access token.
Args:
jwt (Union[str, bytes]): JWT token
Returns:
bool: True if appears to be access token
"""
def is_refresh_token(jwt: Union[str, bytes]) -> bool:
"""
Check if JWT is an AT Protocol refresh token.
Args:
jwt (Union[str, bytes]): JWT token
Returns:
bool: True if appears to be refresh token
"""
def get_token_expiration_time(jwt: Union[str, bytes]) -> Optional[int]:
"""
Get token expiration timestamp.
Args:
jwt (Union[str, bytes]): JWT token
Returns:
Optional[int]: Unix timestamp of expiration or None
"""Usage examples:
from atproto import (
extract_did_from_jwt, get_jwt_algorithm, is_access_token,
is_refresh_token, get_token_expiration_time
)
from datetime import datetime
# JWT utility functions
access_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
refresh_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
# Extract information from tokens
issuer_did = extract_did_from_jwt(access_token)
print(f"Token issued by: {issuer_did}")
algorithm = get_jwt_algorithm(access_token)
print(f"Token algorithm: {algorithm}")
# Check token types
print(f"Is access token: {is_access_token(access_token)}")
print(f"Is refresh token: {is_refresh_token(access_token)}")
# Get expiration time
exp_timestamp = get_token_expiration_time(access_token)
if exp_timestamp:
exp_datetime = datetime.fromtimestamp(exp_timestamp)
print(f"Token expires: {exp_datetime}")class JwtError(Exception):
"""Base exception for JWT operations."""
class JwtParsingError(JwtError):
"""Raised when JWT parsing fails."""
class JwtValidationError(JwtError):
"""Raised when JWT validation fails."""
class JwtVerificationError(JwtError):
"""Raised when JWT signature verification fails."""
class UnsupportedAlgorithmError(JwtError):
"""Raised when JWT uses unsupported algorithm."""
class ExpiredTokenError(JwtValidationError):
"""Raised when JWT is expired."""
class InvalidAudienceError(JwtValidationError):
"""Raised when JWT audience is invalid."""
class InvalidIssuerError(JwtValidationError):
"""Raised when JWT issuer is invalid."""Comprehensive error handling example:
from atproto import (
verify_and_validate_jwt,
JwtParsingError, JwtValidationError, JwtVerificationError,
ExpiredTokenError, InvalidAudienceError, UnsupportedAlgorithmError
)
def robust_jwt_verification(jwt_token, signing_key, expected_audience=None):
"""Robust JWT verification with detailed error handling."""
try:
payload = verify_and_validate_jwt(
jwt_token,
signing_key,
expected_audience=expected_audience
)
return {
'success': True,
'payload': payload,
'user_did': payload.sub,
'expires_at': payload.exp
}
except JwtParsingError as e:
return {
'success': False,
'error': 'malformed_token',
'message': f'Token parsing failed: {e}'
}
except ExpiredTokenError as e:
return {
'success': False,
'error': 'token_expired',
'message': 'Token has expired'
}
except InvalidAudienceError as e:
return {
'success': False,
'error': 'invalid_audience',
'message': f'Token audience mismatch: {e}'
}
except JwtVerificationError as e:
return {
'success': False,
'error': 'signature_invalid',
'message': 'Token signature verification failed'
}
except UnsupportedAlgorithmError as e:
return {
'success': False,
'error': 'unsupported_algorithm',
'message': f'Token uses unsupported algorithm: {e}'
}
except JwtValidationError as e:
return {
'success': False,
'error': 'validation_failed',
'message': f'Token validation failed: {e}'
}
except Exception as e:
return {
'success': False,
'error': 'unknown_error',
'message': f'Unexpected error: {e}'
}
# Usage with error handling
jwt_token = "..."
signing_key = "did:key:..."
result = robust_jwt_verification(jwt_token, signing_key, "did:web:bsky.social")
if result['success']:
print(f"✓ JWT verified for user: {result['user_did']}")
else:
print(f"✗ JWT verification failed: {result['error']} - {result['message']}")Install with Tessl CLI
npx tessl i tessl/pypi-atproto