The ultimate Python library in building OAuth and OpenID Connect servers and clients.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete Flask integration providing OAuth client registry and OAuth 2.0 server implementations. Supports automatic token management, session integration, Flask-specific request/response handling, and seamless integration with Flask's application context and blueprints.
Flask-specific OAuth client registry with automatic token management and session integration.
class OAuth:
"""Flask OAuth client registry."""
def __init__(self, app: Flask = None) -> None:
"""
Initialize Flask OAuth registry.
Args:
app: Flask application instance
"""
def init_app(self, app: Flask) -> None:
"""
Initialize OAuth registry with Flask app.
Args:
app: Flask application instance
"""
def register(self, name: str, client_id: str = None, client_secret: str = None, client_kwargs: dict = None, **kwargs) -> 'FlaskOAuth2App':
"""
Register an OAuth provider.
Args:
name: Provider name for reference
client_id: OAuth client ID
client_secret: OAuth client secret
client_kwargs: Additional client configuration
**kwargs: Provider configuration (server_metadata_url, etc.)
Returns:
FlaskOAuth2App instance
"""
def create_client(self, name: str = None) -> 'FlaskOAuth2App':
"""
Create OAuth client by name.
Args:
name: Registered provider name
Returns:
FlaskOAuth2App instance
"""
class FlaskOAuth2App:
"""Flask OAuth 2.0 application integration."""
def __init__(self, name: str, client_id: str, client_secret: str = None, client_kwargs: dict = None, **kwargs) -> None:
"""
Initialize Flask OAuth 2.0 app.
Args:
name: Application name
client_id: OAuth client ID
client_secret: OAuth client secret
client_kwargs: Additional client configuration
**kwargs: Provider configuration
"""
def authorize_redirect(self, redirect_uri: str = None, **kwargs) -> Response:
"""
Create authorization redirect response.
Args:
redirect_uri: Callback URI
**kwargs: Additional authorization parameters
Returns:
Flask redirect response
"""
def authorize_access_token(self, **kwargs) -> dict:
"""
Handle authorization callback and fetch access token.
Args:
**kwargs: Additional token parameters
Returns:
Token dictionary
"""
def parse_id_token(self, token: dict, nonce: str = None, claims_options: dict = None) -> dict:
"""
Parse and validate OpenID Connect ID token.
Args:
token: Token dictionary containing id_token
nonce: Expected nonce value
claims_options: Claims validation options
Returns:
ID token claims dictionary
"""
def get(self, url: str, token: dict = None, **kwargs) -> Response:
"""
Make authenticated GET request.
Args:
url: Request URL
token: Access token dictionary
**kwargs: Additional request parameters
Returns:
HTTP response
"""
def post(self, url: str, token: dict = None, **kwargs) -> Response:
"""
Make authenticated POST request.
Args:
url: Request URL
token: Access token dictionary
**kwargs: Additional request parameters
Returns:
HTTP response
"""
def put(self, url: str, token: dict = None, **kwargs) -> Response:
"""
Make authenticated PUT request.
Args:
url: Request URL
token: Access token dictionary
**kwargs: Additional request parameters
Returns:
HTTP response
"""
def delete(self, url: str, token: dict = None, **kwargs) -> Response:
"""
Make authenticated DELETE request.
Args:
url: Request URL
token: Access token dictionary
**kwargs: Additional request parameters
Returns:
HTTP response
"""
class FlaskOAuth1App:
"""Flask OAuth 1.0 application integration."""
def __init__(self, name: str, client_id: str, client_secret: str = None, client_kwargs: dict = None, **kwargs) -> None:
"""
Initialize Flask OAuth 1.0 app.
Args:
name: Application name
client_id: OAuth client ID
client_secret: OAuth client secret
client_kwargs: Additional client configuration
**kwargs: Provider configuration
"""
def authorize_redirect(self, callback: str = None, **kwargs) -> Response:
"""
Create authorization redirect response.
Args:
callback: Callback URI
**kwargs: Additional authorization parameters
Returns:
Flask redirect response
"""
def authorize_access_token(self) -> dict:
"""
Handle authorization callback and fetch access token.
Returns:
Token dictionary
"""
def get(self, url: str, token: dict = None, **kwargs) -> Response:
"""Make authenticated GET request."""
def post(self, url: str, token: dict = None, **kwargs) -> Response:
"""Make authenticated POST request."""Flask-specific OAuth 2.0 authorization server implementation with Flask integration features.
class AuthorizationServer:
"""Flask OAuth 2.0 authorization server."""
def __init__(self, app: Flask = None, query_client: callable = None, save_token: callable = None) -> None:
"""
Initialize Flask authorization server.
Args:
app: Flask application instance
query_client: Function to query client by client_id
save_token: Function to save issued tokens
"""
def init_app(self, app: Flask, query_client: callable = None, save_token: callable = None) -> None:
"""
Initialize authorization server with Flask app.
Args:
app: Flask application instance
query_client: Function to query client by client_id
save_token: Function to save issued tokens
"""
def register_grant(self, grant_cls: type, extensions: list = None) -> None:
"""
Register a grant type.
Args:
grant_cls: Grant class to register
extensions: List of grant extensions
"""
def register_endpoint(self, endpoint: object) -> None:
"""
Register an endpoint.
Args:
endpoint: Endpoint instance
"""
def create_authorization_response(self, request: Request = None, grant_user: callable = None) -> Response:
"""
Create authorization response.
Args:
request: Flask request object (uses current request if None)
grant_user: Function to grant authorization to user
Returns:
Flask response object
"""
def create_token_response(self, request: Request = None) -> Response:
"""
Create token response.
Args:
request: Flask request object (uses current request if None)
Returns:
Flask response object
"""
def create_revocation_response(self, request: Request = None) -> Response:
"""
Create revocation response.
Args:
request: Flask request object (uses current request if None)
Returns:
Flask response object
"""
def create_introspection_response(self, request: Request = None) -> Response:
"""
Create introspection response.
Args:
request: Flask request object (uses current request if None)
Returns:
Flask response object
"""
class ResourceProtector:
"""Flask OAuth 2.0 resource protector."""
def __init__(self) -> None:
"""Initialize Flask resource protector."""
def register_token_validator(self, validator: 'BearerTokenValidator') -> None:
"""
Register bearer token validator.
Args:
validator: Token validator instance
"""
def __call__(self, scopes: list = None, optional: bool = False) -> callable:
"""
Decorator for protecting Flask routes.
Args:
scopes: Required scopes
optional: Whether protection is optional
Returns:
Route decorator function
"""
def acquire_token(self, scopes: list = None, request: Request = None) -> 'OAuth2Token':
"""
Acquire token from Flask request.
Args:
scopes: Required scopes
request: Flask request object
Returns:
OAuth2Token object
"""
# Current token proxy for accessing token in protected routes
current_token: LocalProxy

Base classes for Flask-specific OAuth integrations.
class FlaskIntegration:
"""Flask framework integration base."""
def __init__(self, oauth: OAuth, name: str, fetch_token: callable = None, update_token: callable = None) -> None:
"""
Initialize Flask integration.
Args:
oauth: OAuth registry instance
name: Provider name
fetch_token: Function to fetch stored token
update_token: Function to update stored token
"""
def load_server_metadata(self) -> dict:
"""
Load OAuth server metadata.
Returns:
Server metadata dictionary
"""Flask-specific token management utilities.
def token_update(token: dict, refresh_token: str = None, access_token: str = None) -> None:
"""
Update token in Flask session.
Args:
token: Token dictionary to update
refresh_token: New refresh token
access_token: New access token
"""Flask signals for OAuth events.
# OAuth 2.0 server signals
client_authenticated: NamedSignal # Emitted when client is authenticated
token_authenticated: NamedSignal # Emitted when token is authenticated
token_revoked: NamedSignal # Emitted when token is revoked

from flask import Flask, session, url_for, redirect, jsonify
from authlib.integrations.flask_client import OAuth
app = Flask(__name__)
# Placeholder value only; supply a real secret key from your own configuration.
app.secret_key = 'your-secret-key'

# Initialize OAuth registry
oauth = OAuth(app)

# Register Google OAuth provider.
# The OpenID Connect discovery document is published under
# "/.well-known/openid-configuration" (hyphen, not underscore).
google = oauth.register(
    name='google',
    client_id='your-google-client-id',
    client_secret='your-google-client-secret',
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    client_kwargs={
        'scope': 'openid email profile',
    },
)

# Register GitHub OAuth provider (endpoints are given explicitly).
github = oauth.register(
    name='github',
    client_id='your-github-client-id',
    client_secret='your-github-client-secret',
    access_token_url='https://github.com/login/oauth/access_token',
    access_token_params=None,
    authorize_url='https://github.com/login/oauth/authorize',
    authorize_params=None,
    api_base_url='https://api.github.com/',
    client_kwargs={'scope': 'user:email'},
)
@app.route('/login/google')
def google_login():
    """Start the Google login: build the absolute callback URL and redirect to the provider."""
    callback_url = url_for('google_callback', _external=True)
    return google.authorize_redirect(callback_url)
@app.route('/callback/google')
def google_callback():
    """Finish the Google login and keep the user's ID-token claims in the session."""
    token = google.authorize_access_token()
    # Authlib 1.0+ validates the ID token inside authorize_access_token() and
    # exposes the claims as token['userinfo']; there, parse_id_token() needs the
    # nonce and cannot be called with the token alone. Only fall back to
    # parse_id_token() for older releases that do not populate 'userinfo'.
    user_info = token.get('userinfo')
    if user_info is None:
        user_info = google.parse_id_token(token)
    session['user'] = user_info
    return redirect(url_for('profile'))
@app.route('/login/github')
def github_login():
    """Start the GitHub login: build the absolute callback URL and redirect to the provider."""
    callback_url = url_for('github_callback', _external=True)
    return github.authorize_redirect(callback_url)
@app.route('/callback/github')
def github_callback():
    """Finish the GitHub login and keep the user's API profile in the session."""
    token = github.authorize_access_token()
    # Get user info from API
    response = github.get('user', token=token)
    # Fail loudly on an API error instead of storing the error payload as the user.
    response.raise_for_status()
    session['user'] = response.json()
    return redirect(url_for('profile'))
@app.route('/profile')
def profile():
    """Return the user stored in the session as JSON, or send anonymous visitors to log in."""
    user = session.get('user')
    if not user:
        # This example registers no 'login' endpoint, so url_for('login') would
        # fail; redirect to a login route that actually exists.
        return redirect(url_for('google_login'))
    return jsonify(user)


from flask import Flask, request, session, render_template
import secrets

from authlib.integrations.flask_oauth2 import AuthorizationServer, ResourceProtector
from authlib.oauth2 import OAuth2Error
from authlib.oauth2.rfc6749.grants import AuthorizationCodeGrant, RefreshTokenGrant
from authlib.oauth2.rfc6749.models import ClientMixin, AuthorizationCodeMixin, TokenMixin
app = Flask(__name__)
# Placeholder value only; supply a real secret key from your own configuration.
# NOTE(review): the models below use `db`, which this snippet never defines —
# presumably a Flask-SQLAlchemy instance bound to `app`; confirm.
app.secret_key = 'your-secret-key'
# Database models
class Client(db.Model, ClientMixin):
    """OAuth client record holding the client's identifier and secret."""

    id = db.Column(db.Integer, primary_key=True)
    client_id = db.Column(db.String(40), unique=True)
    client_secret = db.Column(db.String(55))

    def get_client_id(self):
        """Return this client's public identifier."""
        return self.client_id

    def check_client_secret(self, client_secret):
        """Return True when ``client_secret`` matches the stored secret.

        The comparison is constant-time so response timing does not reveal
        how much of a guessed secret was correct.
        """
        stored_secret = self.client_secret
        if stored_secret is None or client_secret is None:
            # compare_digest() rejects None; keep plain equality for that case.
            return stored_secret == client_secret
        # Compare as bytes so non-ASCII secrets are accepted by compare_digest().
        return secrets.compare_digest(
            stored_secret.encode('utf-8'),
            client_secret.encode('utf-8'),
        )
class AuthorizationCode(db.Model, AuthorizationCodeMixin):
    """Authorization code record linking a code to a user and a client."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, nullable=False)
    code = db.Column(db.String(120), unique=True, nullable=False)
    client_id = db.Column(db.String(40), nullable=False)

    def get_user_id(self):
        """Return the id of the user this code was issued for."""
        return self.user_id
class Token(db.Model, TokenMixin):
    """Issued token record.

    save_token() builds rows with ``Token(..., **token)``, so every key of the
    token payload needs a matching column, and the bearer-token validator
    reads ``revoked``.
    """

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, nullable=False)
    client_id = db.Column(db.String(40), nullable=False)
    access_token = db.Column(db.String(255), unique=True)
    refresh_token = db.Column(db.String(255), unique=True)
    # NOTE(review): the three columns below assume the usual token response
    # fields (token_type, scope, expires_in) — confirm against the payload
    # save_token() actually receives.
    token_type = db.Column(db.String(40))
    scope = db.Column(db.Text, default='')
    expires_in = db.Column(db.Integer, nullable=False, default=0)
    # Read by the bearer-token validator to reject revoked tokens.
    revoked = db.Column(db.Boolean, nullable=False, default=False)

    def get_user_id(self):
        """Return the id of the user this token was issued for."""
        return self.user_id
# Query functions
def query_client(client_id):
    """Return the first Client row whose client_id equals the given value."""
    matching_clients = Client.query.filter_by(client_id=client_id)
    return matching_clients.first()
def save_token(token, request, *args, **kwargs):
    """Persist a newly issued token.

    Args:
        token: Token payload; each key is passed to ``Token`` as a keyword.
        request: OAuth2 request; ``request.client`` and ``request.user`` are read.
        *args, **kwargs: Accepted for compatibility and ignored.
    """
    # Take the owner from request.user for every grant type, so refresh-token
    # requests store a user_id too (Token.user_id is non-nullable).
    # NOTE(review): assumes the user object implements get_user_id(), as
    # Authlib's documentation does — confirm against your user model.
    user = request.user
    user_id = user.get_user_id() if user else None
    item = Token(
        client_id=request.client.client_id,
        user_id=user_id,
        **token
    )
    db.session.add(item)
    db.session.commit()
# Initialize authorization server with the client lookup and token persistence callbacks
authorization = AuthorizationServer(app, query_client=query_client, save_token=save_token)
# Register grants
# NOTE(review): the grant classes are registered unmodified; Authlib normally
# expects subclasses that implement the storage hooks — confirm before use.
authorization.register_grant(AuthorizationCodeGrant)
authorization.register_grant(RefreshTokenGrant)
@app.route('/authorize', methods=['GET', 'POST'])
def authorize():
    """Authorization endpoint.

    GET validates the request and renders the consent page; POST records the
    user's decision and returns the authorization response.
    """
    user = get_current_user()  # Your user authentication
    if request.method == 'GET':
        # Keep the try body to the one call whose OAuth2Error is handled here.
        # NOTE(review): Authlib 1.0+ names this method get_consent_grant();
        # confirm which name the installed version provides.
        try:
            grant = authorization.validate_consent_request(end_user=user)
        except OAuth2Error as error:
            return {'error': error.error}, 400
        return render_template('authorize.html', grant=grant, user=user)
    if not user:
        return redirect('/login')
    # Passing None as grant_user tells the server the user denied the request.
    grant_user = user if request.form.get('confirm') else None
    return authorization.create_authorization_response(grant_user=grant_user)
@app.route('/token', methods=['POST'])
def issue_token():
    """Token endpoint: let the authorization server answer the current request."""
    token_response = authorization.create_token_response()
    return token_response
@app.route('/revoke', methods=['POST'])
def revoke_token():
    """Revocation endpoint: let the authorization server answer the current request."""
    # NOTE(review): recent Authlib releases expose revocation through
    # register_endpoint() / create_endpoint_response(); confirm that
    # create_revocation_response() exists in the installed version.
    revocation_response = authorization.create_revocation_response()
    return revocation_response


from authlib.integrations.flask_oauth2 import ResourceProtector, current_token
from authlib.oauth2.rfc6750 import BearerTokenValidator
# Token validator
class MyBearerTokenValidator(BearerTokenValidator):
    """Bearer token validator that looks tokens up through the Token model."""

    def authenticate_token(self, token_string):
        """Return the first stored token whose access_token equals ``token_string``."""
        return Token.query.filter_by(access_token=token_string).first()

    def request_invalid(self, request):
        """Always report the request as valid."""
        return False

    def token_revoked(self, token):
        """Return the token's ``revoked`` attribute."""
        # NOTE(review): assumes the token object exposes `revoked` — confirm
        # against the token model in use.
        return token.revoked
# Initialize resource protector; `require_oauth` is used below as a route decorator
require_oauth = ResourceProtector()
require_oauth.register_token_validator(MyBearerTokenValidator())
@app.route('/api/user')
@require_oauth('profile')
def api_user():
    """Return id, username and email of the user the current token belongs to."""
    user = get_user_by_id(current_token.user_id)
    exposed_fields = ('id', 'username', 'email')
    payload = {field: getattr(user, field) for field in exposed_fields}
    return jsonify(payload)
@app.route('/api/posts')
@require_oauth('read')
def api_posts():
    """Return id, title and content of every post owned by the current token's user."""
    owner_id = current_token.user_id
    posts = Post.query.filter_by(user_id=owner_id).all()
    exposed_fields = ('id', 'title', 'content')
    return jsonify([
        {field: getattr(post, field) for field in exposed_fields}
        for post in posts
    ])
@app.route('/api/posts', methods=['POST'])
@require_oauth('write')
def api_create_post():
    """Create a post for the current token's user from a JSON body.

    Returns:
        ``{'id': ...}`` with status 201 on success, or an error object with
        status 400 when the body is not a JSON object with title and content.
    """
    # silent=True yields None for a missing or malformed body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'title' not in data or 'content' not in data:
        return jsonify({'error': 'title and content are required'}), 400
    post = Post(
        user_id=current_token.user_id,
        title=data['title'],
        content=data['content'],
    )
    db.session.add(post)
    db.session.commit()
    return jsonify({'id': post.id}), 201
# Optional protection
@app.route('/api/public')
@require_oauth(optional=True)
def api_public():
    """Greet the token's user when a token is present, otherwise greet anonymously."""
    if current_token:
        return jsonify({'message': f'Hello {current_token.user_id}'})
    return jsonify({'message': 'Hello anonymous'})


# token_revoked is imported too: a handler below connects to it.
from authlib.integrations.flask_oauth2 import client_authenticated, token_authenticated, token_revoked
@client_authenticated.connect
def on_client_authenticated(sender, client=None, **extra):
    """Signal receiver run when a client is authenticated.

    Args:
        sender: Object that emitted the signal.
        client: The authenticated client.
        **extra: Any further keyword arguments sent with the signal; accepted
            so the receiver does not fail on arguments it does not use
            (Authlib may also pass the grant handling the request).
    """
    print(f'Client {client.client_id} authenticated')
    # Log client authentication, update metrics, etc.
@token_authenticated.connect
def on_token_authenticated(sender, token=None, **extra):
    """Signal receiver run when a token is authenticated.

    Args:
        sender: Object that emitted the signal.
        token: The authenticated token.
        **extra: Any further keyword arguments sent with the signal; accepted
            so the receiver does not fail on arguments it does not use.
    """
    print(f'Token for user {token.user_id} authenticated')
    # Update user last activity, log API usage, etc.
@token_revoked.connect
def on_token_revoked(sender, token=None, **extra):
    """Signal receiver run when a token is revoked.

    Args:
        sender: Object that emitted the signal.
        token: The revoked token.
        **extra: Any further keyword arguments sent with the signal; accepted
            so the receiver does not fail on arguments it does not use.
    """
    # Never write a full bearer token to output or logs; show only its tail.
    print(f'Token ending in {token.access_token[-4:]} revoked')
    # Clean up related resources, notify user, etc.


from flask import session
def fetch_token(name):
    """Return the token saved in the Flask session under ``'<name>_token'``."""
    session_key = f'{name}_token'
    return session.get(session_key)
def update_token(name, token, refresh_token=None, access_token=None):
    """Store a refreshed token in the Flask session under ``'<name>_token'``.

    Args:
        name: Provider name used to build the session key.
        token: The newly issued token; its values take precedence.
        refresh_token: Refresh token that was used to obtain ``token``. Authlib
            passes the previous value here so the stored token can be located;
            it is kept only when the new token carries no refresh token.
        access_token: Previous access token, passed for the same purpose. It is
            never written back, because ``token`` already holds the new one.
    """
    # Copy so the caller's token object is not modified.
    refreshed = dict(token)
    if refresh_token and not refreshed.get('refresh_token'):
        # The provider did not rotate the refresh token; keep using the old one.
        refreshed['refresh_token'] = refresh_token
    session[f'{name}_token'] = refreshed
# Register provider with token callbacks
oauth.register(
name='provider',
client_id='client-id',
client_secret='client-secret',
fetch_token=lambda: fetch_token('provider'),
update_token=lambda token, **kwargs: update_token('provider', token, **kwargs)
)

Install with Tessl CLI
npx tessl i tessl/pypi-authlib