The ultimate Python library in building OAuth and OpenID Connect servers and clients.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive OAuth 2.0 implementation following RFC 6749 and related RFCs. Supports all standard grant types: authorization code, implicit, resource owner password credentials, client credentials, and refresh token. Includes advanced features like PKCE (RFC 7636), device flow (RFC 8628), and bearer tokens (RFC 6750).
High-level OAuth 2.0 client implementation for performing authorization flows with automatic token management.
class OAuth2Client:
    """Client-side entry point for OAuth 2.0 flows."""

    def __init__(
        self,
        client_id: str,
        client_secret: str = None,
        token_endpoint_auth_method: str = 'client_secret_basic',
        revocation_endpoint_auth_method: str = None,
        scope: str = None,
        redirect_uri: str = None,
        token: dict = None,
        token_placement: str = 'header',
        update_token: callable = None,
        **kwargs,
    ) -> None:
        """Set up the OAuth 2.0 client.

        Args:
            client_id: Identifier of the client.
            client_secret: Secret of the client.
            token_endpoint_auth_method: Authentication method used at the
                token endpoint.
            revocation_endpoint_auth_method: Authentication method used at
                the revocation endpoint.
            scope: Scope used by default.
            redirect_uri: Redirect URI used by default.
            token: Dictionary holding the access token.
            token_placement: Location of the token in requests
                (header, body, uri).
            update_token: Callback invoked for token updates.
        """

    def create_authorization_url(
        self,
        authorization_endpoint: str,
        state: str = None,
        code_challenge: str = None,
        code_challenge_method: str = None,
        **kwargs,
    ) -> tuple:
        """Build the authorization URL for the authorization code flow.

        Args:
            authorization_endpoint: Authorization endpoint of the
                authorization server.
            state: State parameter used for CSRF protection.
            code_challenge: PKCE code challenge.
            code_challenge_method: PKCE code challenge method.
            **kwargs: Extra authorization parameters.

        Returns:
            Tuple of (authorization_url, state).
        """

    def fetch_token(
        self,
        token_endpoint: str,
        code: str = None,
        authorization_response: str = None,
        body: str = '',
        auth: tuple = None,
        username: str = None,
        password: str = None,
        **kwargs,
    ) -> dict:
        """Obtain an access token from the authorization server.

        Args:
            token_endpoint: URL of the token endpoint.
            code: Authorization code.
            authorization_response: Complete authorization response URL.
            body: Extra request body.
            auth: Tuple for HTTP basic auth.
            username: Username for the password grant.
            password: Password for the password grant.
            **kwargs: Extra token parameters.

        Returns:
            Token dictionary containing access_token, token_type, etc.
        """

    def refresh_token(
        self,
        token_endpoint: str,
        refresh_token: str = None,
        body: str = '',
        auth: tuple = None,
        **kwargs,
    ) -> dict:
        """Obtain a new access token by presenting a refresh token.

        Args:
            token_endpoint: URL of the token endpoint.
            refresh_token: Refresh token; the stored token is used when None.
            body: Extra request body.
            auth: Tuple for HTTP basic auth.
            **kwargs: Extra parameters.

        Returns:
            The new token dictionary.
        """

    def revoke_token(
        self,
        revocation_endpoint: str,
        token: str = None,
        token_type_hint: str = None,
        body: str = '',
        auth: tuple = None,
        **kwargs,
    ) -> dict:
        """Revoke an access token or a refresh token.

        Args:
            revocation_endpoint: URL of the revocation endpoint.
            token: Token to revoke; the stored token is used when None.
            token_type_hint: Hint describing the token type.
            body: Extra request body.
            auth: Tuple for HTTP basic auth.
            **kwargs: Extra parameters.

        Returns:
            Response dictionary.
        """

    def introspect_token(
        self,
        introspection_endpoint: str,
        token: str = None,
        token_type_hint: str = None,
        **kwargs,
    ) -> dict:
        """Introspect a token at the authorization server.

        Args:
            introspection_endpoint: URL of the introspection endpoint.
            token: Token to introspect.
            token_type_hint: Hint describing the token type.
            **kwargs: Extra parameters.

        Returns:
            Dictionary with the introspection response.
        """

    def register_client(self, registration_endpoint: str, **kwargs) -> dict:
        """Dynamically register a client.

        Args:
            registration_endpoint: URL of the registration endpoint.
            **kwargs: Client metadata.

        Returns:
            Registration response including the client credentials.
        """

    @property
    def access_token(self) -> str:
        """Current access token."""

    @property
    def token_type(self) -> str:
        """Current token type."""


# Server-side components for implementing OAuth 2.0 authorization servers.
class AuthorizationServer:
    """OAuth 2.0 authorization server."""

    # NOTE: ``OAuth2Request`` is declared further down in this file, so it is
    # referenced as a string annotation here; an unquoted name would be
    # evaluated when the class body runs and fail with NameError.

    def __init__(self, query_client: callable, save_token: callable = None) -> None:
        """Initialize the authorization server.

        Args:
            query_client: Function that queries a client by client_id.
            save_token: Function that saves issued tokens.
        """

    def register_grant(self, grant_cls: type, extensions: list = None) -> None:
        """Register a grant type.

        Args:
            grant_cls: Grant class to register.
            extensions: List of grant extensions.
        """

    def register_endpoint(self, endpoint: object) -> None:
        """Register an endpoint.

        Args:
            endpoint: Endpoint instance.
        """

    def validate_consent_request(self, request: 'OAuth2Request', end_user: object = None) -> None:
        """Validate a consent request at the authorization endpoint.

        Args:
            request: OAuth2Request object.
            end_user: End user object.
        """

    def create_authorization_response(self, request: 'OAuth2Request', grant_user: callable) -> tuple:
        """Create the authorization response.

        Args:
            request: OAuth2Request object.
            grant_user: Function to grant authorization to user.

        Returns:
            Tuple of (status_code, body, headers).
        """

    def create_token_response(self, request: 'OAuth2Request') -> tuple:
        """Create the token response.

        Args:
            request: OAuth2Request object.

        Returns:
            Tuple of (status_code, body, headers).
        """

    def create_revocation_response(self, request: 'OAuth2Request') -> tuple:
        """Create the revocation response.

        Args:
            request: OAuth2Request object.

        Returns:
            Tuple of (status_code, body, headers).
        """
class ResourceProtector:
    """OAuth 2.0 resource server protection."""

    # NOTE: ``OAuth2Request`` is declared after this class in the file, so it
    # is quoted like the other forward references used in these signatures.

    def __init__(self) -> None:
        """Initialize the resource protector."""

    def register_token_validator(self, validator: 'TokenValidator') -> None:
        """Register a token validator.

        Args:
            validator: Token validator instance.
        """

    def validate_request(self, scopes: list, request: 'OAuth2Request') -> 'OAuth2Token':
        """Validate an OAuth 2.0 request.

        Args:
            scopes: Required scopes.
            request: OAuth2Request object.

        Returns:
            OAuth2Token object if valid.
        """

    def acquire_token(self, scopes: list = None, request: 'OAuth2Request' = None) -> 'OAuth2Token':
        """Acquire the token from a request.

        Args:
            scopes: Required scopes.
            request: OAuth2Request object.

        Returns:
            OAuth2Token object.
        """


# Core objects for representing OAuth 2.0 requests and tokens.
class OAuth2Request:
    """Wrapper object around an OAuth 2.0 request."""

    def __init__(
        self,
        method: str,
        uri: str,
        body: str = None,
        headers: dict = None,
    ) -> None:
        """Set up the OAuth 2.0 request.

        Args:
            method: HTTP method of the request.
            uri: URI of the request.
            body: Body of the request.
            headers: Headers of the request.
        """

    @property
    def client_id(self) -> str:
        """Client ID taken from the request."""

    @property
    def client_secret(self) -> str:
        """Client secret taken from the request."""

    @property
    def redirect_uri(self) -> str:
        """Redirect URI taken from the request."""

    @property
    def scope(self) -> str:
        """Scope taken from the request."""

    @property
    def state(self) -> str:
        """State taken from the request."""

    @property
    def response_type(self) -> str:
        """Response type taken from the request."""

    @property
    def grant_type(self) -> str:
        """Grant type taken from the request."""
class OAuth2Token:
    """Representation of an OAuth 2.0 token."""

    def __init__(self, params: dict = None) -> None:
        """Set up the OAuth 2.0 token.

        Args:
            params: Dictionary of token parameters.
        """

    @property
    def access_token(self) -> str:
        """Access token."""

    @property
    def token_type(self) -> str:
        """Token type."""

    @property
    def refresh_token(self) -> str:
        """Refresh token."""

    @property
    def expires_in(self) -> int:
        """Token expiration time."""

    @property
    def scope(self) -> str:
        """Token scope."""

    def is_expired(self) -> bool:
        """Report whether the token is expired."""

    def get_expires_at(self) -> int:
        """Return the expiration timestamp."""


# Implementation of standard OAuth 2.0 grant types.
class BaseGrant:
    """Common base class of the OAuth 2.0 grants."""

    def __init__(self, request: OAuth2Request, server: AuthorizationServer) -> None:
        """Set up the grant.

        Args:
            request: OAuth2Request object.
            server: AuthorizationServer instance.
        """

    def validate_request(self) -> None:
        """Validate the request made for this grant."""

    def create_authorization_response(self, redirect_uri: str, grant_user: callable) -> dict:
        """Build the authorization response."""

    def create_token_response(self) -> dict:
        """Build the token response."""
class AuthorizationCodeGrant(BaseGrant):
    """Grant implementing the authorization code flow."""

    GRANT_TYPE: str = 'authorization_code'
    RESPONSE_TYPES: list = ['code']

    def create_authorization_code(
        self,
        client: object,
        grant_user: callable,
        request: OAuth2Request,
    ) -> str:
        """Produce an authorization code.

        Args:
            client: Client object.
            grant_user: Grant user function.
            request: OAuth2Request object.

        Returns:
            The authorization code as a string.
        """

    def parse_authorization_code(self, code: str, client: object) -> object:
        """Parse an authorization code and validate it.

        Args:
            code: Authorization code.
            client: Client object.

        Returns:
            Authorization code object.
        """

    def delete_authorization_code(self, authorization_code: object) -> None:
        """Remove an authorization code once it has been used.

        Args:
            authorization_code: Authorization code object.
        """

    def authenticate_user(self, authorization_code: object) -> object:
        """Authenticate the user behind an authorization code.

        Args:
            authorization_code: Authorization code object.

        Returns:
            User object.
        """
class ImplicitGrant(BaseGrant):
    """Grant implementing the implicit flow."""

    GRANT_TYPE: str = None
    RESPONSE_TYPES: list = ['token']
class ResourceOwnerPasswordCredentialsGrant(BaseGrant):
    """Grant implementing the resource owner password credentials flow."""

    GRANT_TYPE: str = 'password'

    def authenticate_user(
        self,
        username: str,
        password: str,
        client: object,
        request: OAuth2Request,
    ) -> object:
        """Authenticate a user from a username and password.

        Args:
            username: Username.
            password: Password.
            client: Client object.
            request: OAuth2Request object.

        Returns:
            User object when authentication succeeds.
        """
class ClientCredentialsGrant(BaseGrant):
    """Grant implementing the client credentials flow."""

    GRANT_TYPE: str = 'client_credentials'
class RefreshTokenGrant(BaseGrant):
    """Grant implementing the refresh token flow."""

    GRANT_TYPE: str = 'refresh_token'

    def authenticate_refresh_token(
        self,
        refresh_token: str,
        client: object,
        request: OAuth2Request,
    ) -> object:
        """Authenticate a refresh token.

        Args:
            refresh_token: Refresh token string.
            client: Client object.
            request: OAuth2Request object.

        Returns:
            Token object when the refresh token is valid.
        """

    def authenticate_user(self, credential: object) -> object:
        """Authenticate the user behind a refresh token credential.

        Args:
            credential: Token credential object.

        Returns:
            User object.
        """

    def revoke_old_credential(self, credential: object) -> None:
        """Revoke the old refresh token.

        Args:
            credential: Token credential to revoke.
        """


# RFC 6750 Bearer Token implementation.
class BearerTokenGenerator:
    """Generator of bearer tokens."""

    def __init__(
        self,
        access_token_generator: callable = None,
        refresh_token_generator: callable = None,
        expires_generator: callable = None,
    ) -> None:
        """Set up the bearer token generator.

        Args:
            access_token_generator: Function generating access tokens.
            refresh_token_generator: Function generating refresh tokens.
            expires_generator: Function generating expiration times.
        """

    def generate(
        self,
        client: object,
        grant_type: str,
        user: object = None,
        scope: str = None,
        expires_in: int = None,
        include_refresh_token: bool = True,
    ) -> dict:
        """Produce a bearer token.

        Args:
            client: Client object.
            grant_type: Grant type that was used.
            user: User object.
            scope: Scope of the token.
            expires_in: Lifetime of the token in seconds.
            include_refresh_token: Whether a refresh token is included.

        Returns:
            Token dictionary.
        """
class BearerTokenValidator:
    """Validator of bearer tokens."""

    def authenticate_token(self, token_string: str) -> object:
        """Authenticate a bearer token.

        Args:
            token_string: Bearer token string.

        Returns:
            Token object when the token is valid.
        """

    def request_invalid(self, request: OAuth2Request) -> bool:
        """Tell whether a request is invalid.

        Args:
            request: OAuth2Request object.

        Returns:
            True when the request is invalid.
        """

    def token_revoked(self, token: object) -> bool:
        """Tell whether a token has been revoked.

        Args:
            token: Token object.

        Returns:
            True when the token is revoked.
        """
def add_bearer_token(
    uri: str,
    http_method: str,
    body: str,
    headers: dict,
    token: dict,
) -> tuple:
    """Attach a bearer token to a request.

    Args:
        uri: URI of the request.
        http_method: HTTP method of the request.
        body: Body of the request.
        headers: Headers of the request.
        token: Token dictionary.

    Returns:
        Tuple of (uri, headers, body).
    """


# RFC 7636 Proof Key for Code Exchange implementation.
class CodeChallenge:
    """Code challenge used by PKCE."""

    def __init__(self, code_verifier: str, code_challenge_method: str = 'S256') -> None:
        """Set up the code challenge.

        Args:
            code_verifier: Code verifier string.
            code_challenge_method: Method of the challenge (plain or S256).
        """

    def get_code_challenge(self) -> str:
        """Return the code challenge string."""

    def get_code_challenge_method(self) -> str:
        """Return the code challenge method."""

    def verify_code_verifier(self, code_verifier: str) -> bool:
        """Check a code verifier against the challenge.

        Args:
            code_verifier: Code verifier to check.

        Returns:
            True when the verifier is valid.
        """
def create_s256_code_challenge(code_verifier: str) -> str:
    """Derive the S256 code challenge from a verifier.

    Args:
        code_verifier: Code verifier string.

    Returns:
        The S256 code challenge string.
    """


# RFC 8628 Device Authorization Grant implementation.
class DeviceAuthorizationEndpoint:
    """Endpoint for device authorization."""

    def __init__(self, server: AuthorizationServer) -> None:
        """Set up the device authorization endpoint.

        Args:
            server: AuthorizationServer instance.
        """

    def create_endpoint_response(self, request: OAuth2Request) -> tuple:
        """Build the device authorization response.

        Args:
            request: OAuth2Request object.

        Returns:
            Tuple of (status_code, body, headers).
        """
class DeviceCodeGrant(BaseGrant):
    """Grant implementing the device code flow."""

    GRANT_TYPE: str = 'urn:ietf:params:oauth:grant-type:device_code'

    def query_device_credential(self, device_code: str, client: object) -> object:
        """Look up a device credential from its device code.

        Args:
            device_code: Device code.
            client: Client object.

        Returns:
            Device credential object.
        """

    def should_slow_down(self, credential: object, now: int) -> bool:
        """Tell whether the client has to slow down its polling.

        Args:
            credential: Device credential object.
            now: Current timestamp.

        Returns:
            True when the client should slow down.
        """
# Device flow constants
# Same grant type string as DeviceCodeGrant.GRANT_TYPE.
DEVICE_CODE_GRANT_TYPE: str = 'urn:ietf:params:oauth:grant-type:device_code'
# Device flow errors
class AuthorizationPendingError(OAuth2Error):
    """Raised while the authorization still awaits user action."""

    error: str = 'authorization_pending'
class SlowDownError(OAuth2Error):
    """Raised when the client polls too frequently."""

    error: str = 'slow_down'


# Mixins for database models to store OAuth 2.0 data.
class ClientMixin:
    """Mixin for the model that stores an OAuth 2.0 client."""

    # Identifier of the client
    client_id: str
    # Secret of the client (may be None for public clients)
    client_secret: str
    # Timestamp at which the client ID was issued
    client_id_issued_at: int
    # Timestamp at which the client secret expires
    client_secret_expires_at: int

    def get_client_id(self) -> str:
        """Return the client ID."""

    def get_default_redirect_uri(self) -> str:
        """Return the default redirect URI."""

    def get_allowed_scope(self, scope: str) -> str:
        """Return the scope allowed for the client."""

    def check_redirect_uri(self, redirect_uri: str) -> bool:
        """Tell whether the redirect URI is allowed."""

    def has_client_secret(self) -> bool:
        """Tell whether the client has a secret."""

    def check_client_secret(self, client_secret: str) -> bool:
        """Verify the client secret."""

    def check_token_endpoint_auth_method(self, method: str) -> bool:
        """Tell whether the token endpoint auth method is supported."""

    def check_response_type(self, response_type: str) -> bool:
        """Tell whether the response type is supported."""

    def check_grant_type(self, grant_type: str) -> bool:
        """Tell whether the grant type is supported."""
class AuthorizationCodeMixin:
    """Mixin for the model that stores an authorization code."""

    # The authorization code itself
    code: str
    # Identifier of the client
    client_id: str
    # Redirect URI
    redirect_uri: str
    # Scope that was authorized
    scope: str
    # Identifier of the user
    user_id: str
    # PKCE code challenge
    code_challenge: str
    # PKCE challenge method
    code_challenge_method: str

    def is_expired(self) -> bool:
        """Tell whether the authorization code is expired."""

    def get_redirect_uri(self) -> str:
        """Return the redirect URI."""

    def get_scope(self) -> str:
        """Return the authorized scope."""

    def get_user_id(self) -> str:
        """Return the user ID."""

    def get_code_challenge(self) -> str:
        """Return the PKCE code challenge."""

    def get_code_challenge_method(self) -> str:
        """Return the PKCE challenge method."""
class TokenMixin:
    """Mixin for the model that stores an access token."""

    # The access token itself
    access_token: str
    # Identifier of the client
    client_id: str
    # Type of the token (usually 'Bearer')
    token_type: str
    # Refresh token
    refresh_token: str
    # Scope of the token
    scope: str
    # Identifier of the user
    user_id: str
    # Timestamp at which the token was issued
    issued_at: int
    # Lifetime of the token in seconds
    expires_in: int

    def get_scope(self) -> str:
        """Return the token scope."""

    def get_user_id(self) -> str:
        """Return the user ID."""

    def is_expired(self) -> bool:
        """Tell whether the token is expired."""

    def is_revoked(self) -> bool:
        """Tell whether the token is revoked."""


# Helper functions for scope and parameter handling.
def scope_to_list(scope: str) -> list:
    """Turn a scope string into a list.

    Args:
        scope: Scope string whose values are separated by spaces.

    Returns:
        List holding the scope values.
    """
def list_to_scope(scopes: list) -> str:
    """Turn a list of scopes into a scope string.

    Args:
        scopes: List holding the scope values.

    Returns:
        Scope string whose values are separated by spaces.
    """


from authlib.oauth2 import OAuth2Client
# Initialize client with its credentials and the scope to request
client = OAuth2Client(
    client_id='your-client-id',
    client_secret='your-client-secret',
    scope='read write'
)
# Step 1: Generate authorization URL (the call yields the URL together with the state)
authorization_url, state = client.create_authorization_url(
    'https://provider.com/authorize',
    state='random-state-string'
)
print(f"Visit: {authorization_url}")
# Step 2: Exchange code for token
token = client.fetch_token(
    'https://provider.com/token',
    code='authorization-code-from-callback'
)
print(f"Access token: {token['access_token']}")
# Step 3: Refresh token when needed (skipped when the token response carries no refresh token)
if 'refresh_token' in token:
    new_token = client.refresh_token(
        'https://provider.com/token',
        refresh_token=token['refresh_token']
    )
from authlib.oauth2 import OAuth2Client, create_s256_code_challenge
from authlib.common.security import generate_token
# Generate PKCE parameters: a verifier and the S256 challenge derived from it
code_verifier = generate_token(128)
code_challenge = create_s256_code_challenge(code_verifier)
# Create authorization URL with PKCE (no client secret is passed to this client)
client = OAuth2Client(client_id='public-client-id')
auth_url, state = client.create_authorization_url(
    'https://provider.com/authorize',
    code_challenge=code_challenge,
    code_challenge_method='S256'
)
# Exchange code with verifier (the same verifier the challenge was derived from)
token = client.fetch_token(
    'https://provider.com/token',
    code='authorization-code',
    code_verifier=code_verifier
)
from authlib.oauth2 import AuthorizationServer, ResourceProtector
from authlib.oauth2.rfc6749.grants import AuthorizationCodeGrant
# Define query functions
def query_client(client_id):
    """Return whatever ``get_client_by_id`` yields for ``client_id``."""
    client = get_client_by_id(client_id)
    return client
def save_token(token, request, *args, **kwargs):
    """Hand ``token`` to ``store_access_token``; the other arguments are unused."""
    store_access_token(token)
    return None
# Create authorization server wired to the two callbacks defined above
authorization_server = AuthorizationServer(
    query_client=query_client,
    save_token=save_token
)
# Register authorization code grant (the grant class itself is passed, not an instance)
authorization_server.register_grant(AuthorizationCodeGrant)
# Handle authorization endpoint
@app.route('/authorize', methods=['GET', 'POST'])
def authorize():
    """Show the consent form on GET, otherwise issue the authorization response.

    Returns:
        The rendered ``authorize.html`` template when a GET request passes
        consent validation, an ``({'error': ...}, 400)`` pair when validation
        raises ``OAuth2Error``, or a ``Response`` built from the status code,
        body and headers produced by the authorization server.
    """
    # Keep the wrapper under its own name. Assigning to ``request`` would make
    # it local to this function, so reading ``request.method`` on the same
    # line would raise UnboundLocalError before the wrapper is even built.
    # NOTE(review): ``request`` is presumably the web framework's incoming
    # request object -- confirm against the application's imports.
    oauth_request = OAuth2Request(request.method, request.url, request.form, request.headers)

    if request.method == 'GET':
        # Show authorization form
        try:
            authorization_server.validate_consent_request(oauth_request, current_user)
        except OAuth2Error as error:
            return {'error': error.error}, 400
        return render_template('authorize.html')

    # Handle authorization
    def grant_user():
        return current_user

    status_code, body, headers = authorization_server.create_authorization_response(
        oauth_request, grant_user
    )
    return Response(body, status=status_code, headers=headers)
# Handle token endpoint
@app.route('/token', methods=['POST'])
def issue_token():
    """Issue a token response for the incoming request.

    Returns:
        A ``Response`` built from the status code, body and headers produced
        by ``authorization_server.create_token_response``.
    """
    # Keep the wrapper under its own name. Assigning to ``request`` would make
    # it local to this function, so reading ``request.method`` on the same
    # line would raise UnboundLocalError.
    # NOTE(review): ``request`` is presumably the web framework's incoming
    # request object -- confirm against the application's imports.
    oauth_request = OAuth2Request(request.method, request.url, request.form, request.headers)
    status_code, body, headers = authorization_server.create_token_response(oauth_request)
    return Response(body, status=status_code, headers=headers)


# Install with Tessl CLI
npx tessl i tessl/pypi-authlib