CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-atproto

Comprehensive Python SDK for the AT Protocol, providing client interfaces, authentication, and real-time streaming for decentralized social networks.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

jwt-operations.mddocs/

JWT Operations

JSON Web Token parsing, payload extraction, validation, and signature verification for AT Protocol authentication and authorization. These operations handle JWT lifecycle management including creation, validation, and verification of tokens used in AT Protocol sessions.

Capabilities

JWT Payload Structure

AT Protocol JWT payloads follow RFC 7519 standards with additional AT Protocol-specific fields.

class JwtPayload:
    """
    JWT payload structure based on RFC 7519 with AT Protocol extensions.
    
    Attributes:
        iss (Optional[str]): Issuer (DID) - who issued the token
        sub (Optional[str]): Subject (DID) - who the token is about
        aud (Optional[Union[str, List[str]]]): Audience (DID) - intended recipients
        jti (Optional[str]): JWT ID - unique identifier for this token
        nbf (Optional[int]): Not Before - earliest time token is valid (Unix timestamp)
        exp (Optional[int]): Expiration Time - when token expires (Unix timestamp)
        iat (Optional[int]): Issued At - when token was issued (Unix timestamp)
        scope (Optional[str]): ATProto-specific scope for permissions
    """
    iss: Optional[str]
    sub: Optional[str]
    aud: Optional[Union[str, List[str]]]
    jti: Optional[str]
    nbf: Optional[int]
    exp: Optional[int]
    iat: Optional[int]
    scope: Optional[str]
    
    def is_expired(self, current_time: Optional[int] = None) -> bool:
        """
        Check if token is expired.
        
        Args:
            current_time (int, optional): Current Unix timestamp (default: now)
            
        Returns:
            bool: True if token is expired
        """
    
    def is_valid_at_time(self, timestamp: int) -> bool:
        """
        Check if token is valid at specific timestamp.
        
        Args:
            timestamp (int): Unix timestamp to check
            
        Returns:
            bool: True if token is valid at given time
        """
    
    def time_until_expiration(self) -> Optional[int]:
        """
        Get seconds until token expires.
        
        Returns:
            Optional[int]: Seconds until expiration or None if no expiration
        """
    
    def validate_audience(self, expected_audience: Union[str, List[str]]) -> bool:
        """
        Validate token audience against expected values.
        
        Args:
            expected_audience (Union[str, List[str]]): Expected audience values
            
        Returns:
            bool: True if audience matches expectations
        """

JWT Parsing

Parse JWT tokens into their component parts for inspection and validation.

def parse_jwt(jwt: Union[str, bytes]) -> Tuple[bytes, bytes, Dict[str, Any], bytes]:
    """
    Parse JWT into components without verification.
    
    Args:
        jwt (Union[str, bytes]): JWT token to parse
        
    Returns:
        Tuple[bytes, bytes, Dict[str, Any], bytes]: 
            (payload_bytes, signing_input, header_dict, signature_bytes)
            
    Raises:
        JwtParsingError: If JWT format is invalid
    """

def get_jwt_payload(jwt: Union[str, bytes]) -> JwtPayload:
    """
    Extract and validate JWT payload.
    
    Args:
        jwt (Union[str, bytes]): JWT token
        
    Returns:
        JwtPayload: Decoded and validated payload
        
    Raises:
        JwtParsingError: If JWT format is invalid
        JwtValidationError: If payload validation fails
    """

def decode_jwt_payload(jwt: Union[str, bytes]) -> JwtPayload:
    """
    Decode JWT payload without validation.
    
    Args:
        jwt (Union[str, bytes]): JWT token
        
    Returns:
        JwtPayload: Decoded payload (may be invalid/expired)
        
    Raises:
        JwtParsingError: If JWT format is invalid
    """

Usage examples:

from atproto import parse_jwt, get_jwt_payload, decode_jwt_payload, JwtPayload

# Example JWT token (access token from AT Protocol)
jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ.eyJpc3MiOiJkaWQ6cGxjOmFsaWNlMTIzIiwic3ViIjoiZGlkOnBsYzphbGljZTEyMyIsImF1ZCI6ImRpZDp3ZWI6YnNreS5zb2NpYWwiLCJpYXQiOjE2ODAwMDAwMDAsImV4cCI6MTY4MDAwMzYwMCwic2NvcGUiOiJjb20uYXRwcm90by5hY2Nlc3MifQ.signature"

# Parse JWT components
payload_bytes, signing_input, header, signature = parse_jwt(jwt_token)
print(f"Header: {header}")
print(f"Payload bytes length: {len(payload_bytes)}")
print(f"Signing input length: {len(signing_input)}")

# Get validated payload
try:
    payload = get_jwt_payload(jwt_token)
    print(f"Issuer: {payload.iss}")
    print(f"Subject: {payload.sub}")
    print(f"Audience: {payload.aud}")
    print(f"Scope: {payload.scope}")
    print(f"Expires at: {payload.exp}")
    
    # Check expiration
    if payload.is_expired():
        print("⚠️  Token is expired")
    else:
        print(f"✓ Token valid for {payload.time_until_expiration()} more seconds")
        
except JwtValidationError as e:
    print(f"JWT validation failed: {e}")

# Decode without validation (useful for debugging)
payload_unvalidated = decode_jwt_payload(jwt_token)
print(f"Raw payload (unvalidated): {payload_unvalidated}")

JWT Validation

Validate JWT payload claims according to AT Protocol requirements.

def validate_jwt_payload(payload: JwtPayload, 
                        expected_audience: Optional[Union[str, List[str]]] = None,
                        expected_issuer: Optional[str] = None,
                        current_time: Optional[int] = None) -> bool:
    """
    Validate JWT payload claims.
    
    Args:
        payload (JwtPayload): Payload to validate
        expected_audience (Union[str, List[str]], optional): Expected audience
        expected_issuer (str, optional): Expected issuer DID
        current_time (int, optional): Current timestamp for time validation
        
    Returns:
        bool: True if payload is valid
        
    Raises:
        JwtValidationError: If validation fails with details
    """

def validate_access_token_payload(payload: JwtPayload, pds_did: str) -> bool:
    """
    Validate access token payload for AT Protocol.
    
    Args:
        payload (JwtPayload): Access token payload
        pds_did (str): Expected PDS DID as audience
        
    Returns:
        bool: True if valid access token
        
    Raises:
        JwtValidationError: If access token validation fails
    """

def validate_refresh_token_payload(payload: JwtPayload) -> bool:
    """
    Validate refresh token payload for AT Protocol.
    
    Args:
        payload (JwtPayload): Refresh token payload
        
    Returns:
        bool: True if valid refresh token
        
    Raises:
        JwtValidationError: If refresh token validation fails
    """

Usage examples:

from atproto import (
    validate_jwt_payload, validate_access_token_payload,
    validate_refresh_token_payload, get_jwt_payload
)
import time

# Validate general JWT payload
jwt_token = "..."  # Your JWT token
payload = get_jwt_payload(jwt_token)

# Basic validation
try:
    is_valid = validate_jwt_payload(
        payload,
        expected_audience="did:web:bsky.social",
        expected_issuer="did:plc:alice123",
        current_time=int(time.time())
    )
    print(f"JWT is valid: {is_valid}")
except JwtValidationError as e:
    print(f"Validation failed: {e}")

# Validate AT Protocol access token
access_token = "..."  # Access token JWT
access_payload = get_jwt_payload(access_token)

try:
    is_valid_access = validate_access_token_payload(
        access_payload, 
        pds_did="did:web:alice.pds.example.com"
    )
    print(f"Access token is valid: {is_valid_access}")
except JwtValidationError as e:
    print(f"Access token validation failed: {e}")

# Validate refresh token
refresh_token = "..."  # Refresh token JWT  
refresh_payload = get_jwt_payload(refresh_token)

try:
    is_valid_refresh = validate_refresh_token_payload(refresh_payload)
    print(f"Refresh token is valid: {is_valid_refresh}")
except JwtValidationError as e:
    print(f"Refresh token validation failed: {e}")

JWT Signature Verification

Verify JWT signatures using cryptographic keys to ensure token authenticity.

def verify_jwt(jwt: Union[str, bytes], signing_key: str) -> bool:
    """
    Verify JWT signature synchronously.
    
    Args:
        jwt (Union[str, bytes]): JWT token to verify
        signing_key (str): DID key or public key for verification
        
    Returns:
        bool: True if signature is valid
        
    Raises:
        JwtVerificationError: If signature verification fails
        UnsupportedAlgorithmError: If JWT algorithm is not supported
    """

def verify_jwt_async(jwt: Union[str, bytes], signing_key: str) -> bool:
    """
    Verify JWT signature asynchronously.
    
    Args:
        jwt (Union[str, bytes]): JWT token to verify
        signing_key (str): DID key or public key for verification
        
    Returns:
        bool: True if signature is valid
        
    Raises:
        JwtVerificationError: If signature verification fails
    """

Usage examples:

from atproto import verify_jwt, verify_jwt_async, get_jwt_payload
import asyncio

# Synchronous JWT verification
jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
signing_key = "did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK"

try:
    is_signature_valid = verify_jwt(jwt_token, signing_key)
    if is_signature_valid:
        print("✓ JWT signature is valid")
        
        # Now safe to trust the payload
        payload = get_jwt_payload(jwt_token)
        print(f"Verified token from: {payload.iss}")
    else:
        print("✗ JWT signature is invalid")
        
except JwtVerificationError as e:
    print(f"Signature verification failed: {e}")
except UnsupportedAlgorithmError as e:
    print(f"Unsupported algorithm: {e}")

# Asynchronous JWT verification
async def verify_jwt_async_example():
    try:
        is_valid = await verify_jwt_async(jwt_token, signing_key)
        if is_valid:
            print("✓ Async JWT signature verification successful")
        else:
            print("✗ Async JWT signature verification failed")
    except Exception as e:
        print(f"Async verification error: {e}")

asyncio.run(verify_jwt_async_example())

Complete JWT Verification Workflow

Comprehensive JWT verification combining parsing, validation, and signature verification.

def verify_and_validate_jwt(jwt: Union[str, bytes], 
                           signing_key: str,
                           expected_audience: Optional[Union[str, List[str]]] = None,
                           expected_issuer: Optional[str] = None) -> JwtPayload:
    """
    Complete JWT verification and validation workflow.
    
    Args:
        jwt (Union[str, bytes]): JWT token
        signing_key (str): DID key for signature verification
        expected_audience (Union[str, List[str]], optional): Expected audience
        expected_issuer (str, optional): Expected issuer
        
    Returns:
        JwtPayload: Verified and validated payload
        
    Raises:
        JwtVerificationError: If signature verification fails
        JwtValidationError: If payload validation fails
        JwtParsingError: If JWT parsing fails
    """

def verify_at_protocol_session_token(access_token: Union[str, bytes],
                                   user_did: str,
                                   pds_did: str,
                                   signing_key: str) -> JwtPayload:
    """
    Verify AT Protocol session access token.
    
    Args:
        access_token (Union[str, bytes]): Access token to verify
        user_did (str): Expected user DID (subject)
        pds_did (str): Expected PDS DID (audience)
        signing_key (str): User's signing key for verification
        
    Returns:
        JwtPayload: Verified access token payload
        
    Raises:
        JwtVerificationError: If token verification fails
    """

Usage examples:

from atproto import (
    verify_and_validate_jwt, verify_at_protocol_session_token,
    JwtVerificationError, JwtValidationError
)

# Complete JWT verification workflow
def authenticate_request(jwt_token, user_signing_key):
    """Example request authentication using JWT."""
    try:
        # Verify signature and validate claims
        payload = verify_and_validate_jwt(
            jwt_token,
            signing_key=user_signing_key,
            expected_audience="did:web:bsky.social",
            expected_issuer="did:plc:alice123"
        )
        
        print(f"✓ Authenticated user: {payload.sub}")
        print(f"✓ Token scope: {payload.scope}")
        print(f"✓ Valid until: {payload.exp}")
        
        return {'authenticated': True, 'user_did': payload.sub, 'scope': payload.scope}
        
    except JwtVerificationError:
        print("✗ JWT signature verification failed")
        return {'authenticated': False, 'error': 'invalid_signature'}
        
    except JwtValidationError as e:
        print(f"✗ JWT validation failed: {e}")
        return {'authenticated': False, 'error': 'invalid_claims'}
        
    except Exception as e:
        print(f"✗ Authentication error: {e}")
        return {'authenticated': False, 'error': 'authentication_failed'}
        
# AT Protocol session verification
def verify_session_token(access_token, user_did, pds_did, signing_key):
    """Verify AT Protocol session access token."""
    try:
        payload = verify_at_protocol_session_token(
            access_token=access_token,
            user_did=user_did,
            pds_did=pds_did,
            signing_key=signing_key
        )
        
        print(f"✓ Valid session for user: {payload.sub}")
        print(f"✓ Session scope: {payload.scope}")
        
        return payload
        
    except JwtVerificationError as e:
        print(f"✗ Session token verification failed: {e}")
        raise
        
# Example usage
jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
user_signing_key = "did:key:z6MkhaXgBZDv..."

# Authenticate request
auth_result = authenticate_request(jwt_token, user_signing_key)
if auth_result['authenticated']:
    print(f"User {auth_result['user_did']} authenticated with scope {auth_result['scope']}")

# Verify session token
try:
    session_payload = verify_session_token(
        access_token=jwt_token,
        user_did="did:plc:alice123",
        pds_did="did:web:alice.pds.example.com", 
        signing_key=user_signing_key
    )
    print("Session verification successful")
except Exception as e:
    print(f"Session verification failed: {e}")

JWT Utilities

Utility functions for working with JWT tokens in AT Protocol contexts.

def extract_did_from_jwt(jwt: Union[str, bytes]) -> Optional[str]:
    """
    Extract DID from JWT issuer claim.
    
    Args:
        jwt (Union[str, bytes]): JWT token
        
    Returns:
        Optional[str]: Issuer DID or None if not found
    """

def get_jwt_algorithm(jwt: Union[str, bytes]) -> str:
    """
    Get algorithm from JWT header.
    
    Args:
        jwt (Union[str, bytes]): JWT token
        
    Returns:
        str: Algorithm identifier (e.g., 'ES256K', 'EdDSA')
        
    Raises:
        JwtParsingError: If header cannot be parsed
    """

def is_access_token(jwt: Union[str, bytes]) -> bool:
    """
    Check if JWT is an AT Protocol access token.
    
    Args:
        jwt (Union[str, bytes]): JWT token
        
    Returns:
        bool: True if appears to be access token
    """

def is_refresh_token(jwt: Union[str, bytes]) -> bool:
    """
    Check if JWT is an AT Protocol refresh token.
    
    Args:
        jwt (Union[str, bytes]): JWT token
        
    Returns:
        bool: True if appears to be refresh token
    """

def get_token_expiration_time(jwt: Union[str, bytes]) -> Optional[int]:
    """
    Get token expiration timestamp.
    
    Args:
        jwt (Union[str, bytes]): JWT token
        
    Returns:
        Optional[int]: Unix timestamp of expiration or None
    """

Usage examples:

from atproto import (
    extract_did_from_jwt, get_jwt_algorithm, is_access_token,
    is_refresh_token, get_token_expiration_time
)
from datetime import datetime

# JWT utility functions
access_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
refresh_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."

# Extract information from tokens
issuer_did = extract_did_from_jwt(access_token)
print(f"Token issued by: {issuer_did}")

algorithm = get_jwt_algorithm(access_token)
print(f"Token algorithm: {algorithm}")

# Check token types
print(f"Is access token: {is_access_token(access_token)}")
print(f"Is refresh token: {is_refresh_token(access_token)}")

# Get expiration time
exp_timestamp = get_token_expiration_time(access_token)
if exp_timestamp:
    exp_datetime = datetime.fromtimestamp(exp_timestamp)
    print(f"Token expires: {exp_datetime}")

Error Handling

class JwtError(Exception):
    """Base exception for JWT operations."""

class JwtParsingError(JwtError):
    """Raised when JWT parsing fails."""

class JwtValidationError(JwtError):
    """Raised when JWT validation fails."""

class JwtVerificationError(JwtError):
    """Raised when JWT signature verification fails."""

class UnsupportedAlgorithmError(JwtError):
    """Raised when JWT uses unsupported algorithm."""

class ExpiredTokenError(JwtValidationError):
    """Raised when JWT is expired."""

class InvalidAudienceError(JwtValidationError):
    """Raised when JWT audience is invalid."""

class InvalidIssuerError(JwtValidationError):
    """Raised when JWT issuer is invalid."""

Comprehensive error handling example:

from atproto import (
    verify_and_validate_jwt,
    JwtParsingError, JwtValidationError, JwtVerificationError,
    ExpiredTokenError, InvalidAudienceError, UnsupportedAlgorithmError
)

def robust_jwt_verification(jwt_token, signing_key, expected_audience=None):
    """Robust JWT verification with detailed error handling."""
    try:
        payload = verify_and_validate_jwt(
            jwt_token, 
            signing_key, 
            expected_audience=expected_audience
        )
        
        return {
            'success': True,
            'payload': payload,
            'user_did': payload.sub,
            'expires_at': payload.exp
        }
        
    except JwtParsingError as e:
        return {
            'success': False,
            'error': 'malformed_token',
            'message': f'Token parsing failed: {e}'
        }
        
    except ExpiredTokenError as e:
        return {
            'success': False,
            'error': 'token_expired',
            'message': 'Token has expired'
        }
        
    except InvalidAudienceError as e:
        return {
            'success': False,
            'error': 'invalid_audience',
            'message': f'Token audience mismatch: {e}'
        }
        
    except JwtVerificationError as e:
        return {
            'success': False,
            'error': 'signature_invalid',
            'message': 'Token signature verification failed'
        }
        
    except UnsupportedAlgorithmError as e:
        return {
            'success': False,
            'error': 'unsupported_algorithm',
            'message': f'Token uses unsupported algorithm: {e}'
        }
        
    except JwtValidationError as e:
        return {
            'success': False,
            'error': 'validation_failed',
            'message': f'Token validation failed: {e}'
        }
        
    except Exception as e:
        return {
            'success': False,
            'error': 'unknown_error',
            'message': f'Unexpected error: {e}'
        }

# Usage with error handling
jwt_token = "..."
signing_key = "did:key:..."

result = robust_jwt_verification(jwt_token, signing_key, "did:web:bsky.social")

if result['success']:
    print(f"✓ JWT verified for user: {result['user_did']}")
else:
    print(f"✗ JWT verification failed: {result['error']} - {result['message']}")

Install with Tessl CLI

npx tessl i tessl/pypi-atproto

docs

client-operations.md

core-functionality.md

cryptographic-operations.md

identity-resolution.md

index.md

jwt-operations.md

real-time-streaming.md

tile.json