The ultimate Python library for building OAuth and OpenID Connect servers and clients.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete OAuth 1.0 implementation following RFC 5849 with support for all signature methods (HMAC-SHA1, RSA-SHA1, PLAINTEXT) and signature types (header, query, body). Provides both client and server implementations with comprehensive support for temporary credentials and token credentials.
High-level OAuth 1.0 client implementation for performing the complete authorization flow with automatic signature generation and request signing.
class OAuth1Client:
    """OAuth 1.0 client implementation."""

    def __init__(
        self,
        client_key: str,
        client_secret: str = None,
        token: str = None,
        token_secret: str = None,
        redirect_uri: str = None,
        rsa_key=None,
        verifier: str = None,
        signature_method: str = 'HMAC-SHA1',
        signature_type: str = 'AUTH_HEADER',
        force_include_body: bool = False,
        **kwargs
    ) -> None:
        """
        Initialize OAuth 1.0 client.

        Args:
            client_key: Client identifier
            client_secret: Client secret (not needed for RSA-SHA1)
            token: OAuth token
            token_secret: OAuth token secret
            redirect_uri: Callback URI for authorization
            rsa_key: RSA private key for RSA-SHA1 signature method
            verifier: OAuth verifier from authorization callback
            signature_method: Signature method (HMAC-SHA1, RSA-SHA1, PLAINTEXT)
            signature_type: Where to include signature (AUTH_HEADER, QUERY, BODY)
            force_include_body: Whether to always include body in signature
        """

    def fetch_request_token(self, uri: str, realm: str = None) -> dict:
        """
        Fetch temporary credentials (request token).

        Args:
            uri: Request token endpoint URI
            realm: Optional authorization realm

        Returns:
            Dictionary containing oauth_token and oauth_token_secret
        """

    def create_authorization_url(self, uri: str, **request_kwargs) -> str:
        """
        Create authorization URL for user to visit.

        Args:
            uri: Authorization endpoint URI
            **request_kwargs: Additional parameters for authorization

        Returns:
            Authorization URL with embedded token
        """

    def fetch_access_token(self, uri: str, verifier: str = None) -> dict:
        """
        Exchange authorization verifier for access token.

        Args:
            uri: Access token endpoint URI
            verifier: OAuth verifier from authorization callback

        Returns:
            Dictionary containing oauth_token and oauth_token_secret
        """

    def get_request_token_authorization_url(self, url: str, **request_kwargs) -> str:
        """Get authorization URL (alias for create_authorization_url)."""

    def parse_authorization_response(self, url: str) -> dict:
        """
        Parse authorization callback response.

        Args:
            url: Callback URL with parameters

        Returns:
            Dictionary with parsed parameters
        """


# Server-side components for implementing OAuth 1.0 authorization and resource servers.
class AuthorizationServer:
    """OAuth 1.0 authorization server."""

    def __init__(
        self,
        query_client: callable,
        query_token: callable,
        save_request_token: callable,
        save_verifier: callable,
        save_access_token: callable,
    ) -> None:
        """
        Initialize authorization server.

        Args:
            query_client: Function to query client by client_key
            query_token: Function to query token by token value
            save_request_token: Function to save temporary credentials
            save_verifier: Function to save authorization verifier
            save_access_token: Function to save access token
        """

    def create_request_token_response(
        self,
        uri: str,
        http_method: str = 'POST',
        body: str = None,
        headers: dict = None,
    ) -> tuple:
        """
        Create response for request token endpoint.

        Args:
            uri: Request URI
            http_method: HTTP method
            body: Request body
            headers: Request headers

        Returns:
            Tuple of (headers, body, status_code)
        """

    def create_authorization_response(
        self,
        uri: str,
        http_method: str = 'GET',
        body: str = None,
        headers: dict = None,
        credentials: dict = None,
    ) -> tuple:
        """
        Create response for authorization endpoint.

        Args:
            uri: Request URI
            http_method: HTTP method
            body: Request body
            headers: Request headers
            credentials: User credentials dictionary

        Returns:
            Tuple of (headers, body, status_code)
        """

    def create_access_token_response(
        self,
        uri: str,
        http_method: str = 'POST',
        body: str = None,
        headers: dict = None,
        credentials: dict = None,
    ) -> tuple:
        """
        Create response for access token endpoint.

        Args:
            uri: Request URI
            http_method: HTTP method
            body: Request body
            headers: Request headers
            credentials: Optional user credentials

        Returns:
            Tuple of (headers, body, status_code)
        """

    # ``OAuth1Request`` is declared further down the listing, so it is
    # written as a string (forward reference) to keep this class importable.
    def validate_request_token_request(self, request: "OAuth1Request") -> None:
        """Validate request token request."""

    def validate_authorization_request(self, request: "OAuth1Request") -> None:
        """Validate authorization request."""

    def validate_access_token_request(self, request: "OAuth1Request") -> None:
        """Validate access token request."""
class ResourceProtector:
    """OAuth 1.0 resource server protection."""

    def __init__(self, query_client: callable, query_token: callable) -> None:
        """
        Initialize resource protector.

        Args:
            query_client: Function to query client by client_key
            query_token: Function to query token by token value
        """

    # The return annotation is a string because ``OAuth1Request`` is declared
    # after this class in the listing.
    def validate_request(
        self,
        uri: str,
        http_method: str,
        body: str = None,
        headers: dict = None,
    ) -> "OAuth1Request":
        """
        Validate OAuth 1.0 signed request.

        Args:
            uri: Request URI
            http_method: HTTP method
            body: Request body
            headers: Request headers

        Returns:
            Validated OAuth1Request object
        """


# OAuth 1.0 request representation with signature validation capabilities.
class OAuth1Request:
    """OAuth 1.0 request wrapper."""

    def __init__(
        self,
        uri: str,
        http_method: str = 'GET',
        body: str = None,
        headers: dict = None,
    ) -> None:
        """
        Initialize OAuth 1.0 request.

        Args:
            uri: Request URI
            http_method: HTTP method
            body: Request body
            headers: Request headers
        """

    @property
    def client_key(self) -> str:
        """Get client key from request."""

    @property
    def signature(self) -> str:
        """Get signature from request."""

    @property
    def signature_method(self) -> str:
        """Get signature method from request."""

    @property
    def token(self) -> str:
        """Get token from request."""

    def validate_timestamp_and_nonce(self, timestamp_lifetime: int = 600) -> None:
        """
        Validate timestamp and nonce.

        Args:
            timestamp_lifetime: Maximum age of timestamp in seconds
        """

    def validate_signature(self, client_secret: str, token_secret: str = '', rsa_key=None) -> None:
        """
        Validate request signature.

        Args:
            client_secret: Client secret
            token_secret: Token secret
            rsa_key: RSA public key for RSA-SHA1 verification
        """


# OAuth 1.0 client authentication for use with HTTP libraries.
class ClientAuth:
    """OAuth 1.0 client authentication."""

    def __init__(
        self,
        client_key: str,
        client_secret: str = None,
        token: str = None,
        token_secret: str = None,
        signature_method: str = 'HMAC-SHA1',
        signature_type: str = 'AUTH_HEADER',
        rsa_key=None,
        verifier: str = None,
        **kwargs
    ) -> None:
        """
        Initialize client authentication.

        Args:
            client_key: Client identifier
            client_secret: Client secret
            token: OAuth token
            token_secret: OAuth token secret
            signature_method: Signature method
            signature_type: Signature type
            rsa_key: RSA private key
            verifier: OAuth verifier
        """

    def __call__(self, request) -> object:
        """
        Apply OAuth 1.0 signature to request.

        Args:
            request: HTTP request object

        Returns:
            Modified request with OAuth signature
        """


# Mixins for database models to store OAuth 1.0 credentials.
class ClientMixin:
    """Mixin for OAuth 1.0 client model."""

    client_key: str  # Client identifier
    client_secret: str  # Client secret (may be None for RSA-SHA1)
    default_redirect_uri: str  # Default callback URI
    default_realms: list  # Default authorization realms

    def get_default_redirect_uri(self) -> str:
        """Get default redirect URI for this client."""

    def get_default_realms(self) -> list:
        """Get default realms for this client."""

    def validate_redirect_uri(self, redirect_uri: str) -> bool:
        """Validate if redirect URI is allowed for this client."""
class TemporaryCredentialMixin:
    """Mixin for temporary credential (request token) model."""

    client_key: str  # Client identifier
    oauth_token: str  # Temporary token
    oauth_token_secret: str  # Temporary token secret
    oauth_callback: str  # Callback URI
    oauth_verifier: str  # Authorization verifier

    def get_redirect_uri(self) -> str:
        """Get callback URI for this temporary credential."""

    def get_oauth_verifier(self) -> str:
        """Get OAuth verifier for this temporary credential."""
class TokenCredentialMixin:
    """Mixin for token credential (access token) model."""

    client_key: str  # Client identifier
    oauth_token: str  # Access token
    oauth_token_secret: str  # Access token secret
    user_id: str  # User identifier

    def get_user_id(self) -> str:
        """Get user ID associated with this token."""
class TemporaryCredential:
    """Temporary credential representation."""

    def __init__(
        self,
        client_key: str,
        oauth_token: str,
        oauth_token_secret: str,
        oauth_callback: str = None,
    ) -> None:
        """
        Initialize temporary credential.

        Args:
            client_key: Client identifier
            oauth_token: Temporary token
            oauth_token_secret: Temporary token secret
            oauth_callback: Callback URI
        """


# OAuth 1.0 signature methods and types.
# Signature Methods
# These match the values listed for the ``signature_method`` argument of
# OAuth1Client (HMAC-SHA1, RSA-SHA1, PLAINTEXT).
SIGNATURE_HMAC_SHA1: str = 'HMAC-SHA1'
SIGNATURE_RSA_SHA1: str = 'RSA-SHA1'
SIGNATURE_PLAINTEXT: str = 'PLAINTEXT'
# Signature Types
# These match the values listed for the ``signature_type`` argument of
# OAuth1Client, i.e. where the signature is placed in the request.
SIGNATURE_TYPE_HEADER: str = 'AUTH_HEADER'
SIGNATURE_TYPE_QUERY: str = 'QUERY'
SIGNATURE_TYPE_BODY: str = 'BODY'
from authlib.oauth1 import OAuth1Client
# Step 1: Initialize client
# The key and secret below are placeholders.
client = OAuth1Client(
client_key='your-client-key',
client_secret='your-client-secret',
signature_method='HMAC-SHA1'
)
# Step 2: Get request token
# The result is indexed below by 'oauth_token' and 'oauth_token_secret'.
request_token = client.fetch_request_token('https://provider.com/oauth/request_token')
print(f"Request token: {request_token['oauth_token']}")
# Step 3: Get authorization URL
# The temporary credentials are set on the client before building the URL.
client.token = request_token['oauth_token']
client.token_secret = request_token['oauth_token_secret']
auth_url = client.create_authorization_url('https://provider.com/oauth/authorize')
print(f"Visit: {auth_url}")
# Step 4: After user authorization, exchange for access token
# 'verifier-from-callback' is a placeholder for the verifier received on the
# callback request.
client.verifier = 'verifier-from-callback'
access_token = client.fetch_access_token('https://provider.com/oauth/access_token')
print(f"Access token: {access_token['oauth_token']}")
# Step 5: Make authenticated requests
# The temporary credentials on the client are replaced with the access token.
client.token = access_token['oauth_token']
client.token_secret = access_token['oauth_token_secret']
# Use client with requests library or make manual signed requests
from authlib.oauth1 import AuthorizationServer, ResourceProtector
# Define callback functions
def query_client(client_key):
    """Look up a client by ``client_key``.

    Delegates to ``get_client_by_key``, which this example does not define;
    it stands in for the application's own storage lookup.
    """
    # Return client object or None
    return get_client_by_key(client_key)
def query_token(token):
    """Look up a stored token by its value.

    Delegates to ``get_token_by_value``, which this example does not define;
    it stands in for the application's own storage lookup.
    """
    # Return token object or None
    return get_token_by_value(token)
def save_request_token(token, request):
    """Persist a newly issued temporary credential.

    ``request`` is accepted but not used by this example. ``store_request_token``
    is an application-provided helper that the example does not define.
    """
    # Save temporary credential
    store_request_token(token)
def save_verifier(token, verifier, request):
    """Record the authorization verifier for a token.

    ``request`` is accepted but not used by this example. ``update_token_verifier``
    is an application-provided helper that the example does not define.
    """
    # Save authorization verifier
    update_token_verifier(token, verifier)
def save_access_token(token, request):
    """Persist a newly issued access token.

    ``request`` is accepted but not used by this example. ``store_access_token``
    is an application-provided helper that the example does not define.
    """
    # Save access token
    store_access_token(token)
# Create authorization server
# All five storage callbacks are passed by keyword.
authorization_server = AuthorizationServer(
query_client=query_client,
query_token=query_token,
save_request_token=save_request_token,
save_verifier=save_verifier,
save_access_token=save_access_token
)
# Handle request token endpoint
# NOTE(review): `request` is not defined in this snippet — presumably the web
# framework's current request object; confirm against the host application.
headers, body, status = authorization_server.create_request_token_response(
uri='/oauth/request_token',
http_method='POST',
headers=request.headers
)
# Handle authorization endpoint
# NOTE(review): `current_user` is not defined in this snippet — presumably the
# logged-in user supplied by the host application.
headers, body, status = authorization_server.create_authorization_response(
uri='/oauth/authorize',
http_method='GET',
credentials={'user_id': current_user.id}
)
# Handle access token endpoint
headers, body, status = authorization_server.create_access_token_response(
uri='/oauth/access_token',
http_method='POST',
headers=request.headers
)
from authlib.oauth1 import ResourceProtector
# Create resource protector
# Takes the same two lookup callbacks (client by key, token by value) that
# AuthorizationServer takes.
resource_protector = ResourceProtector(
query_client=query_client,
query_token=query_token
)
# Protect API endpoints
def protected_resource():
    """Example endpoint guarded by OAuth 1.0 request validation.

    Returns a greeting containing the request's token when validation
    succeeds, or an ``({'error': ...}, 401)`` pair when ``OAuth1Error`` is
    raised.

    NOTE(review): ``request``, ``resource_protector`` and ``OAuth1Error`` are
    not defined or imported inside this snippet; they must be provided by the
    surrounding application.
    """
    try:
        # Validate OAuth 1.0 request
        oauth_request = resource_protector.validate_request(
            uri=request.url,
            http_method=request.method,
            body=request.get_data(),
            headers=request.headers,
        )
        # Access token is valid, process request
        token = oauth_request.token
        return f"Hello, token: {token}"
    except OAuth1Error as error:
        return {'error': error.error}, 401


# Install with Tessl CLI
npx tessl i tessl/pypi-authlib