OAuthlib authentication support for Requests.
—
Comprehensive OAuth 2.0 support with automatic token refresh, PKCE extension, compliance hooks, and support for all standard grant types. Provides both low-level token authentication and high-level workflow management.
Low-level authentication handler that adds OAuth 2.0 tokens to requests. Implements the requests.auth.AuthBase interface for simple token-based authentication.
class OAuth2(requests.auth.AuthBase):
def __init__(
self,
client_id: str = None,
client = None,
token: dict = None
):
"""
Create OAuth 2.0 authentication handler.
Args:
client_id (str, optional): Client ID from provider registration
client: oauthlib.oauth2.Client instance (default: WebApplicationClient)
token (dict, optional): Token dictionary with access_token and token_type
"""Usage Example:
import requests
from requests_oauthlib import OAuth2
# Create auth handler with token
auth = OAuth2(token={
'access_token': 'your_access_token',
'token_type': 'Bearer'
})
# Use with requests
response = requests.get('https://api.example.com/protected', auth=auth)High-level session class that extends requests.Session with comprehensive OAuth 2.0 workflow support, automatic token refresh, and provider compliance hooks.
class OAuth2Session(requests.Session):
def __init__(
self,
client_id: str = None,
client = None,
auto_refresh_url: str = None,
auto_refresh_kwargs: dict = None,
scope: list = None,
redirect_uri: str = None,
token: dict = None,
state = None,
token_updater = None,
pkce: str = None,
**kwargs
):
"""
Create OAuth 2.0 session for workflow management.
Args:
client_id (str, optional): Client ID from provider
client: oauthlib.oauth2.Client instance (default: WebApplicationClient)
auto_refresh_url (str, optional): Token refresh endpoint URL
auto_refresh_kwargs (dict, optional): Extra arguments for token refresh
scope (list, optional): List of requested scopes
redirect_uri (str, optional): Registered callback URI
token (dict, optional): Initial token dictionary
state: CSRF protection string or callable
token_updater: Callback function for token updates
pkce (str, optional): PKCE method ("S256", "plain", or None)
"""Properties:
@property
def scope(self) -> list:
"""OAuth scopes for the session"""
@scope.setter
def scope(self, scope: list):
"""Set OAuth scopes"""
@property
def client_id(self) -> str:
"""Client identifier"""
@client_id.setter
def client_id(self, value: str):
"""Set client identifier"""
@client_id.deleter
def client_id(self):
"""Delete client identifier"""
@property
def token(self) -> dict:
"""Current token dictionary"""
@token.setter
def token(self, value: dict):
"""Set token and populate client attributes"""
@property
def access_token(self) -> str:
"""Current access token"""
@access_token.setter
def access_token(self, value: str):
"""Set access token"""
@access_token.deleter
def access_token(self):
"""Delete access token"""
@property
def authorized(self) -> bool:
"""True if session has valid access token"""def new_state(self) -> str:
"""
Generate new state string for CSRF protection.
Returns:
str: Generated state string
"""
def authorization_url(
self,
url: str,
state: str = None,
**kwargs
) -> tuple:
"""
Create authorization URL for user consent.
Args:
url (str): Authorization endpoint URL (must be HTTPS)
state (str, optional): CSRF protection state
**kwargs: Additional parameters for authorization URL
Returns:
tuple: (authorization_url, state)
"""
def fetch_token(
self,
token_url: str,
code: str = None,
authorization_response: str = None,
body: str = "",
auth = None,
username: str = None,
password: str = None,
method: str = "POST",
force_querystring: bool = False,
timeout = None,
headers: dict = None,
verify = None,
proxies = None,
include_client_id = None,
client_secret: str = None,
cert = None,
**kwargs
) -> dict:
"""
Fetch access token from token endpoint.
Args:
token_url (str): Token endpoint URL (must be HTTPS)
code (str, optional): Authorization code from callback
authorization_response (str, optional): Full callback URL
body (str): Additional request body content
auth: Authentication tuple or method
username (str, optional): Username for password grant
password (str, optional): Password for password grant
method (str): HTTP method (default: "POST")
force_querystring (bool): Force parameters in query string
timeout: Request timeout
headers (dict, optional): Additional request headers
verify: SSL certificate verification
proxies: Request proxies
include_client_id: Include client_id in request body
client_secret (str, optional): Client secret
cert: Client certificate for mTLS
**kwargs: Additional token request parameters
Returns:
dict: Token response from provider
Raises:
InsecureTransportError: If token_url is not HTTPS
ValueError: If required parameters are missing
"""
def token_from_fragment(self, authorization_response: str) -> dict:
"""
Parse token from URI fragment (for Implicit Grant).
Args:
authorization_response (str): Full callback URL with fragment
Returns:
dict: Parsed token data
"""def refresh_token(
self,
token_url: str,
refresh_token: str = None,
body: str = "",
auth = None,
timeout = None,
headers: dict = None,
verify = None,
proxies = None,
**kwargs
) -> dict:
"""
Refresh access token using refresh token.
Args:
token_url (str): Refresh endpoint URL (must be HTTPS)
refresh_token (str, optional): Refresh token to use
body (str): Additional request body
auth: Authentication method
timeout: Request timeout
headers (dict, optional): Request headers
verify: SSL verification
proxies: Request proxies
**kwargs: Additional refresh parameters
Returns:
dict: New token response
Raises:
ValueError: If no token endpoint configured
InsecureTransportError: If token_url is not HTTPS
"""def request(
self,
method: str,
url: str,
data = None,
headers: dict = None,
withhold_token: bool = False,
client_id: str = None,
client_secret: str = None,
files = None,
**kwargs
):
"""
Make authenticated HTTP request with automatic token handling.
Args:
method (str): HTTP method
url (str): Request URL (must be HTTPS)
data: Request body data
headers (dict, optional): Request headers
withhold_token (bool): Skip adding OAuth token
client_id (str, optional): Client ID for auto-refresh
client_secret (str, optional): Client secret for auto-refresh
files: File uploads
**kwargs: Additional request arguments
Returns:
Response: HTTP response object
Raises:
InsecureTransportError: If URL is not HTTPS
TokenExpiredError: If token expired and auto-refresh fails
TokenUpdated: If token was automatically refreshed (warning)
"""def register_compliance_hook(self, hook_type: str, hook):
"""
Register hook for request/response customization.
Args:
hook_type (str): Hook type identifier
hook: Callable to modify requests/responses
Hook Types:
- "access_token_response": Before token parsing
- "refresh_token_response": Before refresh token parsing
- "protected_request": Before making authenticated request
- "access_token_request": Before token fetch request
- "refresh_token_request": Before refresh request
Raises:
ValueError: If hook_type is not supported
"""from requests_oauthlib import OAuth2Session
# Step 1: Create session
oauth = OAuth2Session(
'client_id',
redirect_uri='https://example.com/callback',
scope=['read', 'write']
)
# Step 2: Get authorization URL
authorization_url = 'https://provider.com/oauth/authorize'
auth_url, state = oauth.authorization_url(authorization_url)
print(f"Go to: {auth_url}")
# Step 3: Exchange authorization code for token
token_url = 'https://provider.com/oauth/token'
token = oauth.fetch_token(
token_url,
authorization_response='https://example.com/callback?code=AUTH_CODE&state=STATE',
client_secret='client_secret'
)
# Step 4: Make authenticated requests
response = oauth.get('https://api.provider.com/user')from requests_oauthlib import OAuth2Session
from oauthlib.oauth2 import LegacyApplicationClient
# Create session with password client
oauth = OAuth2Session(client=LegacyApplicationClient(client_id='client_id'))
# Fetch token using username/password
token = oauth.fetch_token(
token_url='https://provider.com/oauth/token',
username='user@example.com',
password='password',
client_id='client_id',
client_secret='client_secret'
)
# Make authenticated requests
response = oauth.get('https://api.provider.com/user')from requests_oauthlib import OAuth2Session
from oauthlib.oauth2 import BackendApplicationClient
# Create session with backend client
oauth = OAuth2Session(client=BackendApplicationClient(client_id='client_id'))
# Fetch token using client credentials
token = oauth.fetch_token(
token_url='https://provider.com/oauth/token',
client_id='client_id',
client_secret='client_secret'
)
# Make authenticated requests
response = oauth.get('https://api.provider.com/data')Proof Key for Code Exchange (PKCE) enhances security for public clients:
from requests_oauthlib import OAuth2Session
# Enable PKCE with S256 method
oauth = OAuth2Session(
'client_id',
redirect_uri='https://example.com/callback',
scope=['read'],
pkce='S256' # or 'plain'
)
# Authorization URL automatically includes PKCE parameters
auth_url, state = oauth.authorization_url('https://provider.com/oauth/authorize')
# Token exchange automatically includes code_verifier
token = oauth.fetch_token(
'https://provider.com/oauth/token',
authorization_response=callback_url
)Configure automatic token refresh for long-running applications:
from requests_oauthlib import OAuth2Session
def save_token(token):
"""Save updated token to storage"""
print(f"Token updated: {token}")
oauth = OAuth2Session(
'client_id',
token=existing_token,
auto_refresh_url='https://provider.com/oauth/token',
auto_refresh_kwargs={'client_id': 'client_id', 'client_secret': 'client_secret'},
token_updater=save_token
)
# Automatically refreshes token if expired
response = oauth.get('https://api.provider.com/user')class TokenUpdated(Warning):
"""Warning raised when token is automatically refreshed"""
def __init__(self, token: dict):
"""
Args:
token (dict): New token dictionary
"""Install with Tessl CLI
npx tessl i tessl/pypi-requests-oauthlib