A comprehensive Python library for implementing OAuth 1.0 and OAuth 2.0 authentication protocols
Complete server-side OAuth 2.0 implementation providing authorization servers, resource servers, and token endpoints. Supports all standard grant types, token introspection, revocation, and comprehensive request validation.
OAuth 2.0 authorization endpoint handling authorization requests and user consent. Manages the authorization code flow by presenting authorization pages to users and issuing authorization codes.
class AuthorizationEndpoint:
def __init__(self, default_response_type: str, default_token_type: str, response_types: dict[str, callable]): ...
def create_authorization_response(
self,
uri: str,
http_method: str = "GET",
body: str | None = None,
headers: dict[str, str] | None = None,
scopes: list[str] | None = None,
credentials: dict[str, str] | None = None,
) -> tuple[dict[str, str], str, int]:
"""
Create authorization response.
Parameters:
- uri: Authorization request URI
- http_method: HTTP method
- body: Request body
- headers: Request headers
- scopes: Requested scopes
- credentials: User credentials and consent info
Returns:
Tuple of (headers, body, status_code) for authorization response
"""
def validate_authorization_request(
self,
uri: str,
http_method: str = "GET",
body: str | None = None,
headers: dict[str, str] | None = None,
) -> None:
"""Validate authorization request parameters."""OAuth 2.0 token endpoint handling token requests for all grant types. Issues access tokens, refresh tokens, and manages token exchange operations.
class TokenEndpoint:
def __init__(
self,
default_grant_type=None,
default_token_type=None,
token_generator=None,
refresh_token_generator=None,
expires_in=None,
): ...
def create_token_response(
self,
uri: str,
http_method: str = "POST",
body: str | None = None,
headers: dict[str, str] | None = None,
credentials: dict[str, str] | None = None,
grant_type_handler=None,
**kwargs,
) -> tuple[dict[str, str], str, int]:
"""
Create token response for any grant type.
Parameters:
- uri: Token request URI
- http_method: HTTP method
- body: Request body
- headers: Request headers
- credentials: Client credentials
- grant_type_handler: Custom grant type handler
Returns:
Tuple of (headers, body, status_code) for token response
"""
def validate_token_request(self, request) -> None:
"""Validate token request parameters."""OAuth 2.0 resource endpoint for validating access tokens when accessing protected resources. Verifies token validity, scope, and expiration.
class ResourceEndpoint:
def __init__(self, default_token=None, token_generator=None, expires_in=None): ...
def validate_protected_resource_request(
self,
uri: str,
http_method: str = "GET",
body: str | None = None,
headers: dict[str, str] | None = None,
scopes: list[str] | None = None,
) -> tuple[bool, dict]:
"""
Validate protected resource request.
Parameters:
- uri: Resource request URI
- http_method: HTTP method
- body: Request body
- headers: Request headers
- scopes: Required scopes
Returns:
Tuple of (valid, request) where valid indicates if token is valid
"""OAuth 2.0 token revocation endpoint (RFC 7009) for invalidating access and refresh tokens. Allows clients to revoke tokens when they're no longer needed.
class RevocationEndpoint:
def __init__(self, request_validator, supported_token_types=None, enable_jsonp=False): ...
def create_revocation_response(
self,
uri: str,
http_method: str = "POST",
body: str | None = None,
headers: dict[str, str] | None = None,
) -> tuple[dict[str, str], str, int]:
"""
Create token revocation response.
Parameters:
- uri: Revocation request URI
- http_method: HTTP method
- body: Request body containing token to revoke
- headers: Request headers
Returns:
Tuple of (headers, body, status_code) for revocation response
"""
def validate_revocation_request(self, request) -> None:
"""Validate token revocation request."""OAuth 2.0 token introspection endpoint (RFC 7662) for checking token status and metadata. Allows resource servers to query token information.
class IntrospectEndpoint:
def __init__(self, request_validator, supported_token_types=None): ...
def create_introspect_response(
self,
uri: str,
http_method: str = "POST",
body: str | None = None,
headers: dict[str, str] | None = None,
) -> tuple[dict[str, str], str, int]:
"""
Create token introspection response.
Parameters:
- uri: Introspection request URI
- http_method: HTTP method
- body: Request body containing token to introspect
- headers: Request headers
Returns:
Tuple of (headers, body, status_code) with token metadata
"""
def validate_introspect_request(self, request) -> None:
"""Validate token introspection request."""OAuth 2.0 authorization server metadata endpoint (RFC 8414) for publishing server capabilities and configuration. Enables automatic client configuration.
class MetadataEndpoint:
def __init__(self, endpoints, claims=None, **kwargs): ...
def create_metadata_response(
self,
uri: str,
http_method: str = "GET",
body: str | None = None,
headers: dict[str, str] | None = None,
) -> tuple[dict[str, str], str, int]:
"""
Create authorization server metadata response.
Returns:
Tuple of (headers, body, status_code) with server metadata JSON
"""Complete OAuth 2.0 server implementations combining multiple endpoints for common deployment scenarios.
class WebApplicationServer:
    def __init__(
        self,
        request_validator,
        token_expires_in=None,
        token_generator=None,
        refresh_token_generator=None,
        **kwargs,
    ):
        """
        Complete OAuth 2.0 server for web applications.

        Combines AuthorizationEndpoint, TokenEndpoint, and ResourceEndpoint
        to support the authorization code grant flow.
        """

    # Inherits methods from AuthorizationEndpoint, TokenEndpoint, ResourceEndpoint
    def create_authorization_response(self, uri, http_method="GET", body=None, headers=None, scopes=None, credentials=None): ...

    def create_token_response(self, uri, http_method="POST", body=None, headers=None, credentials=None, **kwargs): ...

    def validate_protected_resource_request(self, uri, http_method="GET", body=None, headers=None, scopes=None): ...


class MobileApplicationServer:
    def __init__(
        self,
        request_validator,
        token_expires_in=None,
        token_generator=None,
        **kwargs,
    ):
        """
        OAuth 2.0 server for mobile applications.

        Supports the implicit grant flow for public clients
        that cannot securely store credentials.
        """


class LegacyApplicationServer:
    def __init__(
        self,
        request_validator,
        token_expires_in=None,
        token_generator=None,
        refresh_token_generator=None,
        **kwargs,
    ):
        """
        OAuth 2.0 server for legacy applications.

        Supports the password credentials grant flow for
        trusted first-party applications.
        """


class BackendApplicationServer:
    def __init__(
        self,
        request_validator,
        token_expires_in=None,
        token_generator=None,
        **kwargs,
    ):
        """
        OAuth 2.0 server for backend applications.

        Supports the client credentials grant flow for
        machine-to-machine authentication.
        """


class Server:
    def __init__(
        self,
        request_validator,
        token_expires_in=None,
        token_generator=None,
        refresh_token_generator=None,
        **kwargs,
    ):
        """
        Base OAuth 2.0 server supporting all grant types.

        Provides a flexible foundation for custom server implementations
        with support for authorization code, implicit, password credentials,
        and client credentials grants.
        """

from oauthlib.oauth2 import WebApplicationServer, RequestValidator
from datetime import datetime, timedelta, timezone

from oauthlib.common import generate_token
from oauthlib.oauth2 import OAuth2Error
class MyRequestValidator(RequestValidator):
    """Example validator wiring oauthlib callbacks to application storage.

    The lookup/storage helpers called below (verify_client_credentials,
    get_registered_redirect_uris, store_access_token, ...) are not defined
    in this file; they stand in for the application's own persistence layer.
    """

    def validate_client_id(self, client_id, request, *args, **kwargs):
        # Check if client_id exists in your database
        return client_id in ['your-client-id', 'another-client']

    def authenticate_client(self, request, *args, **kwargs):
        # Authenticate client credentials
        client_id = getattr(request, 'client_id', None)
        client_secret = getattr(request, 'client_secret', None)
        return verify_client_credentials(client_id, client_secret)

    def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwargs):
        # Validate redirect URI is registered for this client
        return redirect_uri in get_registered_redirect_uris(client_id)

    def get_default_scopes(self, client_id, request, *args, **kwargs):
        # Return default scopes for client
        return ['read']

    def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
        # Validate requested scopes: every requested scope must be allowed
        allowed_scopes = get_client_scopes(client_id)
        return all(scope in allowed_scopes for scope in scopes)

    def save_authorization_code(self, client_id, code, request, *args, **kwargs):
        # Store authorization code in database
        store_authorization_code(client_id, code, request)

    def validate_code(self, client_id, code, client, request, *args, **kwargs):
        # Validate authorization code
        return is_valid_authorization_code(client_id, code)

    def confirm_redirect_uri(self, client_id, code, redirect_uri, client, request, *args, **kwargs):
        # Confirm redirect URI matches original request
        return get_code_redirect_uri(code) == redirect_uri

    def save_token(self, token, request, *args, **kwargs):
        # Store access token in database
        store_access_token(token, request)

    def validate_bearer_token(self, token, scopes, request):
        # Validate bearer token and scopes
        return is_valid_bearer_token(token, scopes)

    def get_default_redirect_uri(self, client_id, request, *args, **kwargs):
        # Return default redirect URI for client
        return get_client_default_redirect_uri(client_id)
# Create server
validator = MyRequestValidator()
server = WebApplicationServer(validator)


# Handle authorization request
def handle_authorization(request):
    """Run the authorization step for an incoming web request.

    Sends the user to login or the consent form when needed; otherwise
    returns a Response built from the server's (headers, body, status).
    An OAuth2Error is converted into a JSON error Response.
    Response and the user_*/redirect helpers are framework/application
    names that are not defined in this file.
    """
    try:
        # Extract request details
        uri = request.url
        http_method = request.method
        body = request.body
        headers = dict(request.headers)
        # Check if user is authenticated and consented
        if not user_authenticated(request):
            return redirect_to_login()
        if not user_consented(request):
            return show_consent_form()
        # Create authorization response; use separate names so the request
        # headers/body above are not overwritten by the response values
        resp_headers, resp_body, status = server.create_authorization_response(
            uri, http_method, body, headers,
            scopes=request.args.get('scope', '').split(),
            credentials={'user_id': get_user_id(request)},
        )
        return Response(resp_body, status=status, headers=resp_headers)
    except OAuth2Error as e:
        return Response(e.json, status=e.status_code,
                        headers={'Content-Type': 'application/json'})
# Handle token request
def handle_token(request):
    """Exchange a token request for a token Response.

    Passes the request's url, method, body and headers to
    server.create_token_response and wraps the result in a Response.
    An OAuth2Error is converted into a JSON error Response.
    """
    try:
        uri = request.url
        http_method = request.method
        body = request.body
        headers = dict(request.headers)
        # Separate names keep the request headers/body distinct from the
        # response values returned by the server
        resp_headers, resp_body, status = server.create_token_response(
            uri, http_method, body, headers
        )
        return Response(resp_body, status=status, headers=resp_headers)
    except OAuth2Error as e:
        return Response(e.json, status=e.status_code,
                        headers={'Content-Type': 'application/json'})

from oauthlib.oauth2 import ResourceEndpoint
from functools import wraps
# Create resource endpoint
resource_endpoint = ResourceEndpoint()


def require_oauth(*required_scopes):
    """Decorator to protect API endpoints with OAuth.

    The wrapped view runs only when the resource endpoint reports the
    request as valid for ``required_scopes``; otherwise a 401 Response is
    returned. An OAuth2Error is converted into a JSON error Response.
    ``request`` and ``Response`` are framework names not defined in this file.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Extract request details
            uri = request.url
            http_method = request.method
            body = request.body
            headers = dict(request.headers)
            try:
                # Validate token and scopes; varargs arrive as a tuple, so
                # convert to the list the endpoint signature documents
                valid, oauth_request = resource_endpoint.validate_protected_resource_request(
                    uri, http_method, body, headers, list(required_scopes)
                )
                if not valid:
                    return Response('Unauthorized', status=401)
                # Add OAuth info to request context
                request.oauth = oauth_request
                return f(*args, **kwargs)
            except OAuth2Error as e:
                return Response(e.json, status=e.status_code,
                                headers={'Content-Type': 'application/json'})
        return decorated_function
    return decorator
# Usage in API endpoints
@app.route('/api/user/profile')
@require_oauth('profile:read')
def get_user_profile():
    """Return the profile data of the user attached to the validated request."""
    user_id = request.oauth.user_id
    return jsonify(get_user_data(user_id))


@app.route('/api/user/settings', methods=['POST'])
@require_oauth('profile:write')
def update_user_settings():
    """Apply the posted JSON to the user attached to the validated request."""
    user_id = request.oauth.user_id
    update_user_data(user_id, request.json)
    return jsonify({'status': 'updated'})

from oauthlib.oauth2 import IntrospectEndpoint
# Create introspection endpoint
introspect_endpoint = IntrospectEndpoint(validator)


def handle_introspect(request):
    """Handle token introspection requests from resource servers."""
    try:
        uri = request.url
        http_method = request.method
        body = request.body
        headers = dict(request.headers)
        # Separate names keep the request headers/body distinct from the
        # response values returned by the endpoint
        resp_headers, resp_body, status = introspect_endpoint.create_introspect_response(
            uri, http_method, body, headers
        )
        return Response(resp_body, status=status, headers=resp_headers)
    except OAuth2Error as e:
        return Response(e.json, status=e.status_code,
                        headers={'Content-Type': 'application/json'})

from oauthlib.oauth2 import MetadataEndpoint
# Create metadata endpoint
# NOTE(review): oauthlib's MetadataEndpoint is understood to take a list of
# endpoint *instances* and a *dict* of claims (endpoint URLs are claims too);
# confirm against the installed oauthlib version.
metadata = MetadataEndpoint([server], claims={
    'issuer': 'https://auth.example.com',
    'authorization_endpoint': 'https://auth.example.com/authorize',
    'token_endpoint': 'https://auth.example.com/token',
    'revocation_endpoint': 'https://auth.example.com/revoke',
    'introspection_endpoint': 'https://auth.example.com/introspect',
    'response_types_supported': ['code', 'token'],
    'grant_types_supported': ['authorization_code', 'implicit', 'client_credentials'],
    'token_endpoint_auth_methods_supported': ['client_secret_basic', 'client_secret_post'],
    'scopes_supported': ['read', 'write', 'admin'],
})


def handle_metadata(request):
    """Serve authorization server metadata."""
    headers, body, status = metadata.create_metadata_response(
        request.url, request.method
    )
    return Response(body, status=status, headers=headers)

# Configure token expiration times
# Fixed lifetime: access tokens expire in 1 hour (3600 seconds)
server = WebApplicationServer(validator, token_expires_in=3600)
# Or use a function for dynamic expiration
def token_expires_in(request):
    """Return the access-token lifetime in seconds for ``request``.

    Returns 86400 when 'long_lived' is among ``request.scopes`` and 3600
    otherwise, including when ``request.scopes`` is None or empty.
    """
    # Guard against scopes being None so the membership test cannot raise
    if 'long_lived' in (request.scopes or ()):
        return 86400  # 24 hours for long-lived tokens
    return 3600  # 1 hour for regular tokens
server = WebApplicationServer(validator, token_expires_in=token_expires_in)
from oauthlib.common import generate_token
import jwt
def custom_token_generator(request):
"""Generate JWT access tokens."""
payload = {
'user_id': request.user_id,
'client_id': request.client_id,
'scopes': request.scopes,
'exp': datetime.utcnow() + timedelta(hours=1)
}
return jwt.encode(payload, 'secret-key', algorithm='HS256')
server = WebApplicationServer(
validator,
token_generator=custom_token_generator,
refresh_token_generator=generate_token
)Install with Tessl CLI
npx tessl i tessl/pypi-oauthlib