The ultimate Python library for building OAuth and OpenID Connect servers and clients.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Seamless integration with popular HTTP clients including Requests and HTTPX for both synchronous and asynchronous OAuth operations. Provides automatic token management, request signing, and support for all OAuth flows with minimal configuration.
Complete OAuth 1.0 and OAuth 2.0 integration with the popular Requests library.
class OAuth1Session:
"""Requests OAuth 1.0 session."""
def __init__(self, client_key: str, client_secret: str = None, resource_owner_key: str = None, resource_owner_secret: str = None, callback_uri: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', rsa_key=None, verifier: str = None, client_class=None, force_include_body: bool = False, **request_kwargs) -> None:
"""
Initialize OAuth 1.0 session.
Args:
client_key: Client identifier
client_secret: Client secret
resource_owner_key: Resource owner token
resource_owner_secret: Resource owner token secret
callback_uri: Callback URI for authorization
signature_method: Signature method (HMAC-SHA1, RSA-SHA1, PLAINTEXT)
signature_type: Signature type (AUTH_HEADER, QUERY, BODY)
rsa_key: RSA private key for RSA-SHA1
verifier: OAuth verifier
client_class: Custom OAuth1Client class
force_include_body: Force include body in signature
**request_kwargs: Additional requests session arguments
"""
def fetch_request_token(self, request_token_url: str, realm: str = None) -> dict:
"""
Fetch request token from authorization server.
Args:
request_token_url: Request token endpoint URL
realm: Authorization realm
Returns:
Dictionary with oauth_token and oauth_token_secret
"""
def parse_authorization_response_url(self, authorization_response_url: str) -> dict:
"""
Parse authorization callback response.
Args:
authorization_response_url: Full callback URL with parameters
Returns:
Dictionary with parsed parameters
"""
def fetch_access_token(self, access_token_url: str, verifier: str = None) -> dict:
"""
Exchange authorization verifier for access token.
Args:
access_token_url: Access token endpoint URL
verifier: OAuth verifier from callback
Returns:
Dictionary with oauth_token and oauth_token_secret
"""
class OAuth1Auth:
"""Requests OAuth 1.0 authentication handler."""
def __init__(self, client_key: str, client_secret: str = None, resource_owner_key: str = None, resource_owner_secret: str = None, callback_uri: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', rsa_key=None, verifier: str = None, **kwargs) -> None:
"""
Initialize OAuth 1.0 auth handler.
Args:
client_key: Client identifier
client_secret: Client secret
resource_owner_key: Resource owner token
resource_owner_secret: Resource owner token secret
callback_uri: Callback URI
signature_method: Signature method
signature_type: Signature type
rsa_key: RSA private key
verifier: OAuth verifier
"""
def __call__(self, request: PreparedRequest) -> PreparedRequest:
"""
Apply OAuth 1.0 signature to request.
Args:
request: Prepared request object
Returns:
Request with OAuth signature applied
"""
class OAuth2Session(requests.Session):
"""Requests OAuth 2.0 session."""
def __init__(self, client_id: str = None, client: OAuth2Client = None, auto_refresh_url: str = None, auto_refresh_kwargs: dict = None, token_updater: callable = None, **kwargs) -> None:
"""
Initialize OAuth 2.0 session.
Args:
client_id: Client identifier
client: OAuth2Client instance
auto_refresh_url: URL for automatic token refresh
auto_refresh_kwargs: Arguments for token refresh
token_updater: Callback for token updates
**kwargs: Additional session arguments
"""
def new_state(self) -> str:
"""
Generate new state parameter for CSRF protection.
Returns:
Random state string
"""
def authorization_url(self, authorization_endpoint: str, state: str = None, **kwargs) -> tuple:
"""
Generate authorization URL.
Args:
authorization_endpoint: Authorization server's authorization endpoint
state: CSRF protection state (generated if None)
**kwargs: Additional authorization parameters
Returns:
Tuple of (authorization_url, state)
"""
def fetch_token(self, token_endpoint: str, code: str = None, authorization_response: str = None, body: str = '', auth: tuple = None, username: str = None, password: str = None, method: str = 'POST', timeout: int = None, headers: dict = None, verify: bool = True, proxies: dict = None, include_client_id: bool = None, **kwargs) -> dict:
"""
Fetch access token from authorization server.
Args:
token_endpoint: Token endpoint URL
code: Authorization code
authorization_response: Full authorization response URL
body: Additional request body
auth: HTTP basic auth tuple
username: Username for password grant
password: Password for password grant
method: HTTP method
timeout: Request timeout
headers: HTTP headers
verify: SSL verification
proxies: HTTP proxies
include_client_id: Include client ID in request
**kwargs: Additional token parameters
Returns:
Token dictionary
"""
def token_from_fragment(self, authorization_response_url: str) -> dict:
"""
Extract token from URL fragment (implicit flow).
Args:
authorization_response_url: Authorization response URL with fragment
Returns:
Token dictionary
"""
def refresh_token(self, token_url: str, refresh_token: str = None, body: str = '', auth: tuple = None, timeout: int = None, headers: dict = None, verify: bool = True, **kwargs) -> dict:
"""
Refresh access token.
Args:
token_url: Token endpoint URL
refresh_token: Refresh token (uses stored if None)
body: Additional request body
auth: HTTP basic auth tuple
timeout: Request timeout
headers: HTTP headers
verify: SSL verification
**kwargs: Additional parameters
Returns:
New token dictionary
"""
def revoke_token(self, revocation_endpoint: str, token: str = None, token_type_hint: str = None, body: str = '', auth: tuple = None, timeout: int = None, headers: dict = None, verify: bool = True, **kwargs) -> dict:
"""
Revoke access or refresh token.
Args:
revocation_endpoint: Revocation endpoint URL
token: Token to revoke
token_type_hint: Hint about token type
body: Additional request body
auth: HTTP basic auth tuple
timeout: Request timeout
headers: HTTP headers
verify: SSL verification
**kwargs: Additional parameters
Returns:
Response dictionary
"""
class OAuth2Auth:
"""Requests OAuth 2.0 authentication handler."""
def __init__(self, client_id: str = None, client: OAuth2Client = None, token: dict = None) -> None:
"""
Initialize OAuth 2.0 auth handler.
Args:
client_id: Client identifier
client: OAuth2Client instance
token: Access token dictionary
"""
def __call__(self, request: PreparedRequest) -> PreparedRequest:
"""
Apply OAuth 2.0 token to request.
Args:
request: Prepared request object
Returns:
Request with OAuth token applied
"""
class AssertionSession:
"""JWT assertion session for OAuth 2.0."""
def __init__(self, grant_type: str, assertion: str, issuer: str, audience: str, subject: str = None, **kwargs) -> None:
"""
Initialize JWT assertion session.
Args:
grant_type: Assertion grant type
assertion: JWT assertion string
issuer: Token issuer
audience: Token audience
subject: Token subject
**kwargs: Additional session arguments
"""
def fetch_token(self, token_endpoint: str, **kwargs) -> dict:
"""
Fetch token using JWT assertion.
Args:
token_endpoint: Token endpoint URL
**kwargs: Additional parameters
Returns:
Token dictionary
"""

Complete OAuth 1.0 and OAuth 2.0 integration with HTTPX for both sync and async operations.
class OAuth1Client(httpx.Client):
"""HTTPX OAuth 1.0 client."""
def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, redirect_uri: str = None, rsa_key=None, verifier: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', force_include_body: bool = False, **kwargs) -> None:
"""
Initialize HTTPX OAuth 1.0 client.
Args:
client_key: Client identifier
client_secret: Client secret
token: OAuth token
token_secret: OAuth token secret
redirect_uri: Redirect URI
rsa_key: RSA private key
verifier: OAuth verifier
signature_method: Signature method
signature_type: Signature type
force_include_body: Force include body in signature
**kwargs: Additional HTTPX client arguments
"""
def fetch_request_token(self, url: str, realm: str = None, **kwargs) -> dict:
"""
Fetch request token.
Args:
url: Request token endpoint URL
realm: Authorization realm
**kwargs: Additional request arguments
Returns:
Dictionary with oauth_token and oauth_token_secret
"""
def create_authorization_url(self, url: str, **kwargs) -> str:
"""
Create authorization URL.
Args:
url: Authorization endpoint URL
**kwargs: Additional authorization parameters
Returns:
Authorization URL
"""
def fetch_access_token(self, url: str, verifier: str = None, **kwargs) -> dict:
"""
Fetch access token.
Args:
url: Access token endpoint URL
verifier: OAuth verifier
**kwargs: Additional request arguments
Returns:
Dictionary with oauth_token and oauth_token_secret
"""
class AsyncOAuth1Client(httpx.AsyncClient):
"""HTTPX async OAuth 1.0 client."""
def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, redirect_uri: str = None, rsa_key=None, verifier: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', force_include_body: bool = False, **kwargs) -> None:
"""Initialize async OAuth 1.0 client with same parameters as sync version."""
async def fetch_request_token(self, url: str, realm: str = None, **kwargs) -> dict:
"""Async version of fetch_request_token."""
async def fetch_access_token(self, url: str, verifier: str = None, **kwargs) -> dict:
"""Async version of fetch_access_token."""
class OAuth1Auth:
"""HTTPX OAuth 1.0 authentication handler."""
def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', rsa_key=None, verifier: str = None, **kwargs) -> None:
"""
Initialize OAuth 1.0 auth handler.
Args:
client_key: Client identifier
client_secret: Client secret
token: OAuth token
token_secret: OAuth token secret
signature_method: Signature method
signature_type: Signature type
rsa_key: RSA private key
verifier: OAuth verifier
"""
def auth_flow(self, request: httpx.Request) -> httpx.Request:
"""
Apply OAuth 1.0 signature to HTTPX request.
Args:
request: HTTPX request object
Returns:
Request with OAuth signature applied
"""
class OAuth2Client(httpx.Client):
"""HTTPX OAuth 2.0 client."""
def __init__(self, client_id: str = None, client_secret: str = None, token_endpoint_auth_method: str = 'client_secret_basic', revocation_endpoint_auth_method: str = None, scope: str = None, redirect_uri: str = None, token: dict = None, token_placement: str = 'header', update_token: callable = None, **kwargs) -> None:
"""
Initialize HTTPX OAuth 2.0 client.
Args:
client_id: Client identifier
client_secret: Client secret
token_endpoint_auth_method: Token endpoint auth method
revocation_endpoint_auth_method: Revocation endpoint auth method
scope: Default scope
redirect_uri: Default redirect URI
token: Access token dictionary
token_placement: Token placement (header, body, uri)
update_token: Token update callback
**kwargs: Additional HTTPX client arguments
"""
def create_authorization_url(self, authorization_endpoint: str, state: str = None, code_challenge: str = None, code_challenge_method: str = None, **kwargs) -> tuple:
"""
Create authorization URL.
Args:
authorization_endpoint: Authorization endpoint URL
state: CSRF state parameter
code_challenge: PKCE code challenge
code_challenge_method: PKCE challenge method
**kwargs: Additional authorization parameters
Returns:
Tuple of (authorization_url, state)
"""
def fetch_token(self, token_endpoint: str, code: str = None, authorization_response: str = None, body: str = '', auth: tuple = None, username: str = None, password: str = None, **kwargs) -> dict:
"""
Fetch access token.
Args:
token_endpoint: Token endpoint URL
code: Authorization code
authorization_response: Authorization response URL
body: Additional request body
auth: HTTP basic auth tuple
username: Username for password grant
password: Password for password grant
**kwargs: Additional token parameters
Returns:
Token dictionary
"""
def refresh_token(self, token_endpoint: str, refresh_token: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
"""
Refresh access token.
Args:
token_endpoint: Token endpoint URL
refresh_token: Refresh token
body: Additional request body
auth: HTTP basic auth tuple
**kwargs: Additional parameters
Returns:
New token dictionary
"""
def revoke_token(self, revocation_endpoint: str, token: str = None, token_type_hint: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
"""
Revoke token.
Args:
revocation_endpoint: Revocation endpoint URL
token: Token to revoke
token_type_hint: Token type hint
body: Additional request body
auth: HTTP basic auth tuple
**kwargs: Additional parameters
Returns:
Response dictionary
"""
class AsyncOAuth2Client(httpx.AsyncClient):
"""HTTPX async OAuth 2.0 client."""
def __init__(self, client_id: str = None, client_secret: str = None, token_endpoint_auth_method: str = 'client_secret_basic', revocation_endpoint_auth_method: str = None, scope: str = None, redirect_uri: str = None, token: dict = None, token_placement: str = 'header', update_token: callable = None, **kwargs) -> None:
"""Initialize async OAuth 2.0 client with same parameters as sync version."""
async def fetch_token(self, token_endpoint: str, code: str = None, authorization_response: str = None, body: str = '', auth: tuple = None, username: str = None, password: str = None, **kwargs) -> dict:
"""Async version of fetch_token."""
async def refresh_token(self, token_endpoint: str, refresh_token: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
"""Async version of refresh_token."""
async def revoke_token(self, revocation_endpoint: str, token: str = None, token_type_hint: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
"""Async version of revoke_token."""
class OAuth2Auth:
"""HTTPX OAuth 2.0 authentication handler."""
def __init__(self, client_id: str = None, client_secret: str = None, token: dict = None, token_placement: str = 'header', **kwargs) -> None:
"""
Initialize OAuth 2.0 auth handler.
Args:
client_id: Client identifier
client_secret: Client secret
token: Access token dictionary
token_placement: Token placement (header, body, uri)
"""
def auth_flow(self, request: httpx.Request) -> httpx.Request:
"""
Apply OAuth 2.0 token to HTTPX request.
Args:
request: HTTPX request object
Returns:
Request with OAuth token applied
"""
class OAuth2ClientAuth:
"""HTTPX OAuth 2.0 client authentication."""
def __init__(self, client_id: str, client_secret: str = None, method: str = 'client_secret_basic') -> None:
"""
Initialize client authentication.
Args:
client_id: Client identifier
client_secret: Client secret
method: Authentication method
"""
def auth_flow(self, request: httpx.Request) -> httpx.Request:
"""
Apply client authentication to request.
Args:
request: HTTPX request object
Returns:
Request with client authentication applied
"""
class AssertionClient(httpx.Client):
"""HTTPX JWT assertion client."""
def __init__(self, grant_type: str, assertion: str, **kwargs) -> None:
"""
Initialize JWT assertion client.
Args:
grant_type: Assertion grant type
assertion: JWT assertion string
**kwargs: Additional client arguments
"""
def fetch_token(self, token_endpoint: str, **kwargs) -> dict:
"""
Fetch token using JWT assertion.
Args:
token_endpoint: Token endpoint URL
**kwargs: Additional parameters
Returns:
Token dictionary
"""
class AsyncAssertionClient(httpx.AsyncClient):
"""HTTPX async JWT assertion client."""
def __init__(self, grant_type: str, assertion: str, **kwargs) -> None:
"""Initialize async assertion client with same parameters as sync version."""
async def fetch_token(self, token_endpoint: str, **kwargs) -> dict:
"""Async version of fetch_token."""

from authlib.integrations.requests_client import OAuth2Session
import requests
# Basic OAuth 2.0 flow
client = OAuth2Session(
client_id='your-client-id',
redirect_uri='https://your-app.com/callback'
)
# Step 1: Get authorization URL
authorization_url, state = client.authorization_url(
'https://provider.com/authorize',
scope='read write'
)
print(f'Please visit: {authorization_url}')
# Step 2: Exchange authorization code for token
authorization_response = input('Paste the full redirect URL here:')
token = client.fetch_token(
'https://provider.com/token',
authorization_response=authorization_response,
client_secret='your-client-secret'
)
# Step 3: Make authenticated requests
response = client.get('https://api.provider.com/user')
user_data = response.json()
# Automatic token refresh
def token_saver(token):
# Save token to database or file
save_token_to_storage(token)
client = OAuth2Session(
client_id='your-client-id',
token=stored_token,
auto_refresh_url='https://provider.com/token',
auto_refresh_kwargs={'client_id': 'your-client-id', 'client_secret': 'your-client-secret'},
token_updater=token_saver
)
# Token will be automatically refreshed if expired
response = client.get('https://api.provider.com/user')

from authlib.integrations.requests_client import OAuth1Session
# OAuth 1.0 flow
client = OAuth1Session(
client_key='your-client-key',
client_secret='your-client-secret'
)
# Step 1: Fetch request token
request_token = client.fetch_request_token('https://provider.com/oauth/request_token')
# Step 2: Get authorization URL
authorization_url = f"https://provider.com/oauth/authorize?oauth_token={request_token['oauth_token']}"
print(f'Please visit: {authorization_url}')
# Step 3: Get verifier from user
verifier = input('Please enter the verifier:')
client.verifier = verifier
# Step 4: Fetch access token
access_token = client.fetch_access_token('https://provider.com/oauth/access_token')
# Step 5: Make authenticated requests
response = client.get('https://api.provider.com/user')
user_data = response.json()

from authlib.integrations.httpx_client import OAuth2Client, AsyncOAuth2Client
import httpx
# Synchronous OAuth 2.0 client
with OAuth2Client(
client_id='your-client-id',
client_secret='your-client-secret'
) as client:
# Authorization code flow
authorization_url, state = client.create_authorization_url(
'https://provider.com/authorize',
scope='read write'
)
# After user authorization
token = client.fetch_token(
'https://provider.com/token',
code='authorization-code-from-callback'
)
# Make authenticated requests
response = client.get('https://api.provider.com/user')
user_data = response.json()
# Asynchronous OAuth 2.0 client
import asyncio
async def oauth_flow():
async with AsyncOAuth2Client(
client_id='your-client-id',
client_secret='your-client-secret'
) as client:
# Fetch token using client credentials
token = await client.fetch_token(
'https://provider.com/token',
grant_type='client_credentials',
scope='api:read'
)
# Make authenticated requests
response = await client.get('https://api.provider.com/data')
data = response.json()
return data
# Run async function
data = asyncio.run(oauth_flow())

from authlib.integrations.httpx_client import OAuth1Client, AsyncOAuth1Client
import asyncio
# Synchronous OAuth 1.0
with OAuth1Client(
client_key='your-client-key',
client_secret='your-client-secret'
) as client:
# Complete OAuth 1.0 flow
request_token = client.fetch_request_token('https://provider.com/oauth/request_token')
auth_url = client.create_authorization_url(
'https://provider.com/oauth/authorize'
)
# After user authorization
client.verifier = 'user-provided-verifier'
access_token = client.fetch_access_token('https://provider.com/oauth/access_token')
# Make authenticated requests
response = client.get('https://api.provider.com/user')
# Asynchronous OAuth 1.0
async def async_oauth1_flow():
async with AsyncOAuth1Client(
client_key='your-client-key',
client_secret='your-client-secret'
) as client:
request_token = await client.fetch_request_token(
'https://provider.com/oauth/request_token'
)
# Handle authorization...
access_token = await client.fetch_access_token(
'https://provider.com/oauth/access_token',
verifier='user-verifier'
)
response = await client.get('https://api.provider.com/user')
return response.json()
user_data = asyncio.run(async_oauth1_flow())

import requests
from authlib.integrations.requests_client import OAuth2Auth
# Using OAuth2Auth with regular requests
auth = OAuth2Auth(token={'access_token': 'your-access-token', 'token_type': 'Bearer'})
response = requests.get('https://api.provider.com/user', auth=auth)
# Custom token placement
auth = OAuth2Auth(
token={'access_token': 'your-access-token', 'token_type': 'Bearer'},
token_placement='uri' # Add token as query parameter
)
response = requests.get('https://api.provider.com/user', auth=auth)

from authlib.integrations.requests_client import AssertionSession
from authlib.jose import JsonWebToken
import time
# Create JWT assertion
jwt = JsonWebToken(['RS256'])
header = {'alg': 'RS256'}
payload = {
'iss': 'your-client-id',
'sub': 'your-client-id',
'aud': 'https://provider.com/token',
'iat': int(time.time()),
'exp': int(time.time()) + 3600
}
assertion = jwt.encode(header, payload, private_key)
# Use assertion to get token
session = AssertionSession(
grant_type='urn:ietf:params:oauth:grant-type:jwt-bearer',
assertion=assertion,
issuer='your-client-id',
audience='https://provider.com/token'
)
token = session.fetch_token('https://provider.com/token')
# Make authenticated requests
response = session.get('https://api.provider.com/user')

from authlib.integrations.requests_client import OAuth2Session
from authlib.common.errors import AuthlibHTTPError
from requests.exceptions import HTTPError
client = OAuth2Session(client_id='your-client-id')
try:
token = client.fetch_token(
'https://provider.com/token',
code='invalid-code',
client_secret='your-client-secret'
)
except AuthlibHTTPError as error:
print(f'OAuth error: {error.error}')
print(f'Description: {error.description}')
print(f'Status code: {error.status_code}')
except HTTPError as error:
print(f'HTTP error: {error.response.status_code}')

import json
from authlib.integrations.requests_client import OAuth2Session
def load_token():
try:
with open('token.json', 'r') as f:
return json.load(f)
except FileNotFoundError:
return None
def save_token(token):
with open('token.json', 'w') as f:
json.dump(token, f)
# Load existing token
token = load_token()
client = OAuth2Session(
client_id='your-client-id',
token=token,
auto_refresh_url='https://provider.com/token',
auto_refresh_kwargs={'client_secret': 'your-client-secret'},
token_updater=save_token
)
# Token will be automatically saved when refreshed
response = client.get('https://api.provider.com/user')

Install with Tessl CLI
npx tessl i tessl/pypi-authlib