A comprehensive Python library for implementing OAuth 1.0 and OAuth 2.0 authentication protocols
Token creation, validation, and formatting with support for Bearer tokens, MAC tokens, and JWT tokens. Includes token generators, validators, and utility functions for comprehensive token lifecycle management.
OAuth 2.0 token object providing scope management, token metadata, and validation utilities. Extends Python dictionary with OAuth-specific functionality.
class OAuth2Token(dict[str, str]):
def __init__(
self,
params: dict[str, str],
old_scope: str | list[str] | None = None,
): ...
@property
def scope_changed(self) -> bool:
"""Whether scope changed from original token."""
@property
def old_scope(self) -> str | None:
"""Original token scope as string."""
@property
def old_scopes(self) -> list[str]:
"""Original token scopes as list."""
@property
def scope(self) -> str | None:
"""Current token scope as string."""
@property
def scopes(self) -> list[str]:
"""Current token scopes as list."""
@property
def missing_scopes(self) -> list[str]:
"""Scopes that were removed from original."""
@property
def additional_scopes(self) -> list[str]:
"""Scopes that were added to original."""Bearer token implementation providing token creation, validation, and request processing according to RFC 6750.
class BearerToken:
def __init__(
self,
request_validator=None,
token_generator=None,
expires_in: int | None = None,
refresh_token_generator=None,
):
"""
Bearer token handler.
Parameters:
- request_validator: Request validator instance
- token_generator: Function to generate access tokens
- expires_in: Token expiration time in seconds
- refresh_token_generator: Function to generate refresh tokens
"""
def create_token(
self,
request,
refresh_token: bool = False,
**kwargs,
) -> OAuth2Token:
"""
Create new bearer token.
Parameters:
- request: OAuth request object
- refresh_token: Whether to include refresh token
Returns:
OAuth2Token with access token and optional refresh token
"""
def validate_request(self, request) -> bool:
"""Validate bearer token request."""
def estimate_type(self, request) -> int:
"""Estimate confidence that request uses bearer tokens."""Utility functions for token preparation, extraction, and formatting across different authentication methods.
def prepare_bearer_uri(token: str, uri: str) -> str:
"""Add bearer token to URI query parameters."""
def prepare_bearer_headers(
token: str,
headers: dict[str, str] | None = None,
) -> dict[str, str]:
"""Add bearer token to Authorization header."""
def prepare_bearer_body(token: str, body: str = "") -> str:
"""Add bearer token to request body."""
def get_token_from_header(request) -> str | None:
"""Extract bearer token from Authorization header."""
def prepare_mac_header(
token: str,
uri: str,
key: str | bytes | bytearray,
http_method: str,
nonce: str | None = None,
headers: dict[str, str] | None = None,
body: str | None = None,
ext: str = "",
hash_algorithm: str = "hmac-sha-1",
issue_time: datetime | None = None,
draft: int = 0,
) -> dict[str, str]:
"""Prepare MAC authentication header."""Token generation functions for creating secure access tokens and refresh tokens.
def random_token_generator(request, refresh_token: bool = False) -> str:
"""Generate random token using cryptographically secure methods."""
def signed_token_generator(private_pem: str, **kwargs) -> callable:
"""
Create signed token generator using RSA private key.
Parameters:
- private_pem: RSA private key in PEM format
Returns:
Function that generates signed JWT tokens
"""from oauthlib.oauth2 import BearerToken, RequestValidator
class MyRequestValidator(RequestValidator):
def validate_bearer_token(self, token, scopes, request):
# Validate bearer token from database
token_data = get_token_from_db(token)
if not token_data or token_data.expired:
return False
# Check if token has required scopes
token_scopes = token_data.scopes.split()
return all(scope in token_scopes for scope in scopes)
def save_bearer_token(self, token, request, *args, **kwargs):
# Store bearer token in database
store_token_in_db(token, request.user_id, request.client_id)
# Create bearer token handler
validator = MyRequestValidator()
bearer_token = BearerToken(
request_validator=validator,
expires_in=3600, # 1 hour
)
# Create token for authorized request
token = bearer_token.create_token(request, refresh_token=True)
# Returns: {
# 'access_token': 'abc123...',
# 'token_type': 'Bearer',
# 'expires_in': 3600,
# 'refresh_token': 'def456...',
# 'scope': 'read write'
# }import jwt
from datetime import datetime, timedelta
from oauthlib.common import generate_token
def jwt_token_generator(request):
"""Generate JWT access tokens."""
payload = {
'sub': request.user_id,
'client_id': request.client_id,
'scopes': request.scopes,
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(hours=1),
'jti': generate_token(16) # JWT ID
}
return jwt.encode(payload, 'your-secret-key', algorithm='HS256')
# Use custom generator
bearer_token = BearerToken(
request_validator=validator,
token_generator=jwt_token_generator,
refresh_token_generator=lambda req: generate_token(32)
)from oauthlib.oauth2.rfc6749.tokens import (
prepare_bearer_headers,
prepare_bearer_uri,
prepare_bearer_body
)
access_token = 'abc123def456'
# Add token to Authorization header (recommended)
headers = prepare_bearer_headers(access_token)
# {'Authorization': 'Bearer abc123def456'}
# Add token to URI query (less secure)
uri = prepare_bearer_uri(access_token, 'https://api.example.com/data')
# 'https://api.example.com/data?access_token=abc123def456'
# Add token to request body (for POST requests)
body = prepare_bearer_body(access_token, 'data=value')
# 'data=value&access_token=abc123def456'from oauthlib.oauth2.rfc6749.tokens import get_token_from_header
def validate_api_request(request):
"""Validate API request with bearer token."""
# Extract token from request
token = get_token_from_header(request)
if not token:
return False, "Missing access token"
# Validate token
token_data = get_token_from_database(token)
if not token_data:
return False, "Invalid access token"
if token_data.expired:
return False, "Access token expired"
# Check scopes for this endpoint
required_scopes = ['api:read']
token_scopes = token_data.scopes.split()
if not all(scope in token_scopes for scope in required_scopes):
return False, "Insufficient scope"
return True, token_datafrom oauthlib.oauth2.rfc6749.tokens import OAuth2Token
# Create token from server response
response_data = {
'access_token': 'abc123',
'token_type': 'Bearer',
'expires_in': 3600,
'refresh_token': 'def456',
'scope': 'read write admin'
}
token = OAuth2Token(response_data, old_scope='read write')
# Check scope changes
print(token.scope_changed) # True (admin scope added)
print(token.scopes) # ['read', 'write', 'admin']
print(token.old_scopes) # ['read', 'write']
print(token.additional_scopes) # ['admin']
print(token.missing_scopes) # []
# Token acts like a dictionary
print(token['access_token']) # 'abc123'
print(token.get('expires_in')) # 3600from oauthlib.oauth2.rfc6749.tokens import prepare_mac_header
import datetime
# Prepare MAC authentication header
mac_header = prepare_mac_header(
token='h480djs93hd8',
uri='https://api.example.com/data',
key='secret-mac-key',
http_method='GET',
nonce='unique-nonce-123',
hash_algorithm='hmac-sha-256',
issue_time=datetime.datetime.utcnow()
)
print(mac_header)
# {
# 'Authorization': 'MAC id="h480djs93hd8", nonce="unique-nonce-123",
# ts="1234567890", mac="signature..."'
# }def refresh_access_token(refresh_token, client_id, client_secret):
"""Refresh an expired access token."""
from oauthlib.oauth2 import WebApplicationClient
import requests
client = WebApplicationClient(client_id)
# Prepare refresh request
token_url, headers, body = client.prepare_refresh_token_request(
'https://auth.example.com/token',
refresh_token=refresh_token,
scope=['read', 'write'] # Optional: request different scopes
)
# Add client authentication
headers['Authorization'] = f'Basic {base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()}'
# Make request
response = requests.post(token_url, headers=headers, data=body)
if response.status_code == 200:
return client.parse_request_body_response(response.text)
else:
raise Exception(f"Token refresh failed: {response.text}")def revoke_token(token, token_type_hint, client_id, client_secret):
"""Revoke an access or refresh token."""
from oauthlib.oauth2 import WebApplicationClient
import requests
client = WebApplicationClient(client_id)
# Prepare revocation request
revoke_url, headers, body = client.prepare_token_revocation_request(
'https://auth.example.com/revoke',
token=token,
token_type_hint=token_type_hint # 'access_token' or 'refresh_token'
)
# Add client authentication
headers['Authorization'] = f'Basic {base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()}'
# Make request
response = requests.post(revoke_url, headers=headers, data=body)
return response.status_code == 200Install with Tessl CLI
npx tessl i tessl/pypi-oauthlib