A comprehensive Python library for implementing OAuth 1.0 and OAuth 2.0 authentication protocols
Comprehensive request validation interfaces for OAuth 1.0 and OAuth 2.0 flows. They provide an extensible validation framework for implementing custom authentication logic, security policies, and token management.
Base request validation interface that must be implemented to provide custom authentication and authorization logic for OAuth 2.0 servers.
class RequestValidator:
    """
    Base request validation interface for OAuth 2.0 servers.

    Implement these methods to provide custom authentication and
    authorization logic. Every method in this listing is a signature-only
    stub: its body consists solely of its docstring, so the documented
    return values describe what an implementation is expected to produce.
    """

    def __init__(self): ...

    # Client Authentication
    def client_authentication_required(
        self,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Determine if client authentication is required.

        Parameters:
        - request: OAuth request object

        Returns:
        True if client must authenticate, False otherwise
        """

    def authenticate_client(self, request, *args, **kwargs) -> bool:
        """
        Authenticate client credentials.

        Parameters:
        - request: OAuth request object with client credentials

        Returns:
        True if client is authenticated, False otherwise
        """

    def authenticate_client_id(
        self,
        client_id: str,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Authenticate client by ID only (for public clients).

        Parameters:
        - client_id: Client identifier
        - request: OAuth request object

        Returns:
        True if client ID is valid, False otherwise
        """

    # Client Validation
    def validate_client_id(
        self,
        client_id: str,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate client identifier.

        Parameters:
        - client_id: Client identifier to validate
        - request: OAuth request object

        Returns:
        True if client ID is valid, False otherwise
        """

    def validate_redirect_uri(
        self,
        client_id: str,
        redirect_uri: str,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate redirect URI for client.

        Parameters:
        - client_id: Client identifier
        - redirect_uri: Redirect URI to validate
        - request: OAuth request object

        Returns:
        True if redirect URI is valid for client, False otherwise
        """

    def get_default_redirect_uri(
        self,
        client_id: str,
        request,
        *args,
        **kwargs,
    ) -> str:
        """
        Get default redirect URI for client.

        Parameters:
        - client_id: Client identifier
        - request: OAuth request object

        Returns:
        Default redirect URI for client
        """

    # Scope Validation
    def validate_scopes(
        self,
        client_id: str,
        scopes: list[str],
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate requested scopes for client.

        Parameters:
        - client_id: Client identifier
        - scopes: List of requested scopes
        - client: Client object
        - request: OAuth request object

        Returns:
        True if scopes are valid for client, False otherwise
        """

    def get_default_scopes(
        self,
        client_id: str,
        request,
        *args,
        **kwargs,
    ) -> list[str]:
        """
        Get default scopes for client.

        Parameters:
        - client_id: Client identifier
        - request: OAuth request object

        Returns:
        List of default scopes for client
        """

    # Response Type Validation
    def validate_response_type(
        self,
        client_id: str,
        response_type: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate response type for client.

        Parameters:
        - client_id: Client identifier
        - response_type: Response type (code, token)
        - client: Client object
        - request: OAuth request object

        Returns:
        True if response type is valid for client, False otherwise
        """

    # Authorization Code Validation
    def validate_code(
        self,
        client_id: str,
        code: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate authorization code.

        Parameters:
        - client_id: Client identifier
        - code: Authorization code to validate
        - client: Client object
        - request: OAuth request object

        Returns:
        True if code is valid, False otherwise
        """

    def confirm_redirect_uri(
        self,
        client_id: str,
        code: str,
        redirect_uri: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Confirm redirect URI matches original authorization request.

        Parameters:
        - client_id: Client identifier
        - code: Authorization code
        - redirect_uri: Redirect URI to confirm
        - client: Client object
        - request: OAuth request object

        Returns:
        True if redirect URI matches, False otherwise
        """

    def save_authorization_code(
        self,
        client_id: str,
        code: dict,
        request,
        *args,
        **kwargs,
    ) -> None:
        """
        Store authorization code for later validation.

        Note that ``code`` is a dict here, whereas ``validate_code`` and
        ``invalidate_authorization_code`` take the code as a string.

        Parameters:
        - client_id: Client identifier
        - code: Authorization code data
        - request: OAuth request object
        """

    def invalidate_authorization_code(
        self,
        client_id: str,
        code: str,
        request,
        *args,
        **kwargs,
    ) -> None:
        """
        Invalidate used authorization code.

        Parameters:
        - client_id: Client identifier
        - code: Authorization code to invalidate
        - request: OAuth request object
        """

    # Grant Type Validation
    def validate_grant_type(
        self,
        client_id: str,
        grant_type: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate grant type for client.

        Parameters:
        - client_id: Client identifier
        - grant_type: Grant type to validate
        - client: Client object
        - request: OAuth request object

        Returns:
        True if grant type is valid for client, False otherwise
        """

    # User Credentials Validation (Password Grant)
    def validate_user(
        self,
        username: str,
        password: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate user credentials for password grant.

        Parameters:
        - username: User's username
        - password: User's password
        - client: Client object
        - request: OAuth request object

        Returns:
        True if credentials are valid, False otherwise
        """

    # Token Validation
    def validate_bearer_token(
        self,
        token: str,
        scopes: list[str],
        request,
    ) -> bool:
        """
        Validate bearer token and required scopes.

        Unlike most methods of this interface, the signature accepts no
        extra ``*args`` / ``**kwargs``.

        Parameters:
        - token: Bearer token to validate
        - scopes: Required scopes for resource access
        - request: OAuth request object

        Returns:
        True if token is valid and has required scopes, False otherwise
        """

    def validate_refresh_token(
        self,
        refresh_token: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate refresh token.

        Parameters:
        - refresh_token: Refresh token to validate
        - client: Client object
        - request: OAuth request object

        Returns:
        True if refresh token is valid, False otherwise
        """

    def get_original_scopes(
        self,
        refresh_token: str,
        request,
        *args,
        **kwargs,
    ) -> list[str]:
        """
        Get original scopes associated with refresh token.

        Parameters:
        - refresh_token: Refresh token
        - request: OAuth request object

        Returns:
        List of original scopes
        """

    def is_within_original_scope(
        self,
        request_scopes: list[str],
        refresh_token: str,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Check if requested scopes are within original scope.

        Parameters:
        - request_scopes: Requested scopes
        - refresh_token: Refresh token
        - request: OAuth request object

        Returns:
        True if requested scopes are subset of original, False otherwise
        """

    # Token Storage
    def save_token(
        self,
        token: dict,
        request,
        *args,
        **kwargs,
    ) -> None:
        """
        Store access token.

        Parameters:
        - token: Token data to store
        - request: OAuth request object
        """

    def save_bearer_token(
        self,
        token: dict,
        request,
        *args,
        **kwargs,
    ) -> None:
        """
        Store bearer token.

        Parameters:
        - token: Bearer token data to store
        - request: OAuth request object
        """

    # Token Management
    def revoke_token(
        self,
        token: str,
        token_type_hint: str,
        request,
        *args,
        **kwargs,
    ) -> None:
        """
        Revoke access or refresh token.

        Parameters:
        - token: Token to revoke
        - token_type_hint: Type of token (access_token, refresh_token)
        - request: OAuth request object
        """

    def rotate_refresh_token(self, request) -> bool:
        """
        Determine if refresh token should be rotated.

        Parameters:
        - request: OAuth request object

        Returns:
        True if refresh token should be rotated, False otherwise
        """

    def introspect_token(
        self,
        token: str,
        token_type_hint: str,
        request,
        *args,
        **kwargs,
    ) -> dict | None:
        """
        Get token metadata for introspection.

        Parameters:
        - token: Token to introspect
        - token_type_hint: Type of token
        - request: OAuth request object

        Returns:
        Dictionary with token metadata or None if invalid
        """

    # PKCE Support
    def is_pkce_required(self, client_id: str, request) -> bool:
        """
        Determine if PKCE is required for client.

        Parameters:
        - client_id: Client identifier
        - request: OAuth request object

        Returns:
        True if PKCE is required, False otherwise
        """

    def get_code_challenge(self, code: str, request) -> str:
        """
        Get PKCE code challenge for authorization code.

        Parameters:
        - code: Authorization code
        - request: OAuth request object

        Returns:
        PKCE code challenge
        """

    def get_code_challenge_method(self, code: str, request) -> str:
        """
        Get PKCE code challenge method for authorization code.

        Parameters:
        - code: Authorization code
        - request: OAuth request object

        Returns:
        PKCE code challenge method (plain, S256)
        """

    # CORS Support
    def is_origin_allowed(
        self,
        client_id: str,
        origin: str,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Check if origin is allowed for CORS requests.

        Parameters:
        - client_id: Client identifier
        - origin: Request origin
        - request: OAuth request object

        Returns:
        True if origin is allowed, False otherwise
        """


# The example below imports the validator base class from oauthlib itself.
from oauthlib.oauth2 import RequestValidator
import base64
import hashlib
import secrets
from datetime import datetime, timedelta
class DatabaseRequestValidator(RequestValidator):
    """Request validator implementation using database storage.

    ``Client``, ``AuthorizationCode``, ``AccessToken``, ``RefreshToken``,
    ``User`` and ``verify_password`` are referenced here but not defined in
    this snippet; the application must provide them (presumably ORM models
    with a SQLAlchemy-style ``query``/``filter_by`` API -- confirm against
    the real project).
    """

    def __init__(self, db_session):
        """Keep the database session used for every lookup and write."""
        self.db = db_session

    # Internal helpers
    def _get_client(self, client_id):
        """Return the client record for ``client_id``, or None if unknown."""
        return self.db.query(Client).filter_by(client_id=client_id).first()

    def _get_authorization_code(self, client_id, code):
        """Return the stored authorization code for this client, or None."""
        return self.db.query(AuthorizationCode).filter_by(
            client_id=client_id,
            code=code,
        ).first()

    @staticmethod
    def _utcnow():
        """Return the current time as a naive UTC datetime.

        NOTE(review): kept naive because ``expires_at`` values are written
        with the same naive clock below; switch both together if the
        database columns become timezone-aware.
        """
        return datetime.utcnow()

    @staticmethod
    def _epoch_seconds(moment):
        """Convert a datetime to integer seconds since the Unix epoch.

        Naive datetimes are treated as UTC (matching ``_utcnow``); calling
        ``.timestamp()`` on them would interpret them as local time.
        """
        if moment.tzinfo is not None:
            return int(moment.timestamp())
        return int((moment - datetime(1970, 1, 1)).total_seconds())

    @staticmethod
    def _constant_time_equals(expected, provided):
        """Compare two secrets in constant time; False if either is missing.

        Strings are encoded to bytes first because ``compare_digest`` raises
        ``TypeError`` for non-ASCII ``str`` arguments.
        """
        if not expected or not provided:
            return False
        if isinstance(expected, str):
            expected = expected.encode('utf-8')
        if isinstance(provided, str):
            provided = provided.encode('utf-8')
        return secrets.compare_digest(expected, provided)

    @staticmethod
    def _s256_challenge(code_verifier):
        """Return the PKCE S256 challenge for ``code_verifier``.

        RFC 7636 defines it as the unpadded base64url encoding of the raw
        SHA-256 digest (not the hex digest).
        """
        digest = hashlib.sha256(code_verifier.encode('utf-8')).digest()
        return base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')

    # Client Management
    def validate_client_id(self, client_id, request, *args, **kwargs):
        """Return True if the client exists and is active."""
        client = self._get_client(client_id)
        return bool(client is not None and client.is_active)

    def authenticate_client(self, request, *args, **kwargs):
        """Authenticate a client by ``client_id`` and ``client_secret``.

        On success the client record is attached as ``request.client``,
        which oauthlib expects after client authentication.

        Returns:
            True if the credentials match an active client, False otherwise.
        """
        client_id = getattr(request, 'client_id', None)
        client_secret = getattr(request, 'client_secret', None)
        if not client_id or not client_secret:
            return False
        client = self._get_client(client_id)
        if not client or not client.is_active:
            return False
        # Use constant-time comparison for security
        if not self._constant_time_equals(client.client_secret, client_secret):
            return False
        request.client = client
        return True

    def authenticate_client_id(self, client_id, request, *args, **kwargs):
        """Authenticate a public client by its identifier only.

        Confidential clients are refused here so they cannot bypass the
        secret check in ``authenticate_client``.

        Returns:
            True if ``client_id`` names an active, non-confidential client.
        """
        client = self._get_client(client_id)
        if not client or not client.is_active:
            return False
        if client.client_type == 'confidential':
            return False
        request.client = client
        return True

    def client_authentication_required(self, request, *args, **kwargs):
        """Return True when the requesting client is confidential.

        Unknown clients yield False, in which case ``authenticate_client_id``
        is responsible for rejecting them.
        """
        # Require authentication for confidential clients
        client = self._get_client(request.client_id)
        return bool(client) and client.client_type == 'confidential'

    # Redirect URI Validation
    def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwargs):
        """Return True if ``redirect_uri`` is registered for the client."""
        client = self._get_client(client_id)
        if not client:
            return False
        return any(entry.uri == redirect_uri for entry in client.redirect_uris)

    def get_default_redirect_uri(self, client_id, request, *args, **kwargs):
        """Return the client's first registered redirect URI, or None."""
        client = self._get_client(client_id)
        if client and client.redirect_uris:
            return client.redirect_uris[0].uri
        return None

    # Scope Management
    def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
        """Return True if every requested scope is allowed for the client."""
        client_obj = self._get_client(client_id)
        if not client_obj:
            return False
        allowed_scopes = {scope.name for scope in client_obj.allowed_scopes}
        return all(scope in allowed_scopes for scope in (scopes or ()))

    def get_default_scopes(self, client_id, request, *args, **kwargs):
        """Return the client's default scope names (empty list if unknown)."""
        client = self._get_client(client_id)
        if client:
            return [scope.name for scope in client.default_scopes]
        return []

    # Authorization Code Management
    def save_authorization_code(self, client_id, code, request, *args, **kwargs):
        """Persist a new authorization code valid for ten minutes.

        Parameters:
            client_id: Client the code was issued to.
            code: Dict holding the code under the ``'code'`` key.
            request: Request supplying user, redirect URI, scopes and the
                optional PKCE challenge.
        """
        auth_code = AuthorizationCode(
            client_id=client_id,
            user_id=request.user.id,
            code=code['code'],
            redirect_uri=request.redirect_uri,
            scopes=' '.join(request.scopes or []),
            code_challenge=getattr(request, 'code_challenge', None),
            code_challenge_method=getattr(request, 'code_challenge_method', None),
            expires_at=self._utcnow() + timedelta(minutes=10),
        )
        self.db.add(auth_code)
        self.db.commit()

    def validate_code(self, client_id, code, client, request, *args, **kwargs):
        """Validate an authorization code, including its PKCE challenge.

        On success the code's user and scopes are restored onto the request
        (``request.user_id``, ``request.scopes``) so the token issued next is
        bound to them.

        Returns:
            True if the code exists, has not expired and any stored PKCE
            challenge matches the request's ``code_verifier``.
        """
        auth_code = self._get_authorization_code(client_id, code)
        if not auth_code:
            return False
        if auth_code.expires_at < self._utcnow():
            return False
        # Validate PKCE if present
        if auth_code.code_challenge:
            code_verifier = getattr(request, 'code_verifier', None)
            if not code_verifier:
                return False
            # RFC 7636: a missing method means "plain".
            method = auth_code.code_challenge_method or 'plain'
            if method == 'S256':
                challenge = self._s256_challenge(code_verifier)
            elif method == 'plain':
                challenge = code_verifier
            else:
                # Unknown transformation: never fall back to plain comparison.
                return False
            if not self._constant_time_equals(auth_code.code_challenge, challenge):
                return False
        request.user_id = auth_code.user_id
        request.scopes = (auth_code.scopes or '').split()
        return True

    def confirm_redirect_uri(self, client_id, code, redirect_uri, client, request, *args, **kwargs):
        """Return True if ``redirect_uri`` equals the one stored with the code."""
        auth_code = self._get_authorization_code(client_id, code)
        return bool(auth_code) and auth_code.redirect_uri == redirect_uri

    def invalidate_authorization_code(self, client_id, code, request, *args, **kwargs):
        """Delete the authorization code so it cannot be redeemed again."""
        auth_code = self._get_authorization_code(client_id, code)
        if auth_code:
            self.db.delete(auth_code)
            self.db.commit()

    # Token Management
    def save_bearer_token(self, token, request, *args, **kwargs):
        """Persist the access token and, if present, its refresh token.

        Parameters:
            token: Dict with ``'access_token'`` and ``'expires_in'``, and
                optionally ``'scope'`` and ``'refresh_token'``.
            request: Request supplying ``client_id`` and ``user_id``.
        """
        access_token = AccessToken(
            client_id=request.client_id,
            user_id=getattr(request, 'user_id', None),
            token=token['access_token'],
            scopes=token.get('scope', ''),
            expires_at=self._utcnow() + timedelta(seconds=token['expires_in']),
        )
        self.db.add(access_token)
        if 'refresh_token' in token:
            refresh_token = RefreshToken(
                client_id=request.client_id,
                user_id=getattr(request, 'user_id', None),
                token=token['refresh_token'],
                scopes=token.get('scope', ''),
                access_token=access_token,
            )
            self.db.add(refresh_token)
        self.db.commit()

    def validate_bearer_token(self, token, scopes, request):
        """Validate an access token and the scopes a resource requires.

        On success ``request.user_id``, ``request.client_id`` and
        ``request.scopes`` are populated from the stored token.
        """
        access_token = self.db.query(AccessToken).filter_by(
            token=token
        ).first()
        if not access_token:
            return False
        if access_token.expires_at < self._utcnow():
            return False
        token_scopes = access_token.scopes.split()
        if not all(scope in token_scopes for scope in scopes):
            return False
        # Add user info to request
        request.user_id = access_token.user_id
        request.client_id = access_token.client_id
        request.scopes = token_scopes
        return True

    def validate_refresh_token(self, refresh_token, client, request, *args, **kwargs):
        """Return True if the refresh token belongs to the requesting client.

        On success the token's user is restored as ``request.user_id`` so the
        replacement tokens stay bound to the same user.
        """
        token = self.db.query(RefreshToken).filter_by(
            token=refresh_token,
            client_id=request.client_id
        ).first()
        if token is None:
            return False
        request.user_id = token.user_id
        return True

    def get_original_scopes(self, refresh_token, request, *args, **kwargs):
        """Return the scopes stored with the refresh token (empty if unknown)."""
        token = self.db.query(RefreshToken).filter_by(token=refresh_token).first()
        if token is None:
            return []
        return (token.scopes or '').split()

    # User Authentication (Password Grant)
    def validate_user(self, username, password, client, request, *args, **kwargs):
        """Check resource-owner credentials and set ``request.user_id``."""
        user = self.db.query(User).filter_by(username=username).first()
        if not user or not user.is_active:
            return False
        # Use secure password verification
        if verify_password(password, user.password_hash):
            request.user_id = user.id
            return True
        return False

    # Grant Type Validation
    def validate_grant_type(self, client_id, grant_type, client, request, *args, **kwargs):
        """Return True if the client is allowed to use ``grant_type``."""
        client_obj = self._get_client(client_id)
        if not client_obj:
            return False
        allowed_grants = {grant.grant_type for grant in client_obj.allowed_grants}
        return grant_type in allowed_grants

    # PKCE Support
    def is_pkce_required(self, client_id, request):
        """Return True if the client is configured to require PKCE."""
        client = self._get_client(client_id)
        return bool(client) and bool(client.require_pkce)

    def get_code_challenge(self, code, request):
        """Return the PKCE challenge stored with the code, or None."""
        auth_code = self._get_authorization_code(request.client_id, code)
        return auth_code.code_challenge if auth_code else None

    def get_code_challenge_method(self, code, request):
        """Return the stored PKCE method, defaulting to ``'plain'``."""
        auth_code = self._get_authorization_code(request.client_id, code)
        if not auth_code:
            return None
        return auth_code.code_challenge_method or 'plain'

    # Token Introspection
    def introspect_token(self, token, token_type_hint, request, *args, **kwargs):
        """Return metadata for an active token, or None if it is invalid.

        ``token_type_hint`` only decides which token type is searched first;
        the other type is tried as well when the hint misses (RFC 7662).
        None, not ``{'active': False}``, signals an unknown or expired token,
        matching the interface contract.
        """
        lookup_order = [AccessToken, RefreshToken]
        if token_type_hint == 'refresh_token':
            lookup_order.reverse()
        token_obj = None
        for model in lookup_order:
            token_obj = self.db.query(model).filter_by(token=token).first()
            if token_obj:
                break
        if not token_obj:
            return None
        # Refresh tokens are stored without an expiry in this example.
        expires_at = getattr(token_obj, 'expires_at', None)
        if expires_at is not None and expires_at < self._utcnow():
            return None
        return {
            'active': True,
            'client_id': token_obj.client_id,
            'username': token_obj.user.username if token_obj.user else None,
            'scope': token_obj.scopes,
            'exp': self._epoch_seconds(expires_at) if expires_at is not None else None,
            'token_type': 'Bearer' if isinstance(token_obj, AccessToken) else 'refresh_token',
        }


# Install with Tessl CLI
npx tessl i tessl/pypi-oauthlib