CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-requests-oauthlib

OAuthlib authentication support for Requests.

Pending
Overview
Eval results
Files

oauth2.mddocs/

OAuth 2.0 Implementation

Comprehensive OAuth 2.0 support with automatic token refresh, PKCE extension, compliance hooks, and support for all standard grant types. Provides both low-level token authentication and high-level workflow management.

Capabilities

OAuth2 Authentication Class

Low-level authentication handler that adds OAuth 2.0 tokens to requests. Implements the requests.auth.AuthBase interface for simple token-based authentication.

class OAuth2(requests.auth.AuthBase):
    def __init__(
        self,
        client_id: str = None,
        client = None,
        token: dict = None
    ):
        """
        Create OAuth 2.0 authentication handler.

        Args:
            client_id (str, optional): Client ID from provider registration
            client: oauthlib.oauth2.Client instance (default: WebApplicationClient)
            token (dict, optional): Token dictionary with access_token and token_type
        """

Usage Example:

import requests
from requests_oauthlib import OAuth2

# Create auth handler with token
auth = OAuth2(token={
    'access_token': 'your_access_token',
    'token_type': 'Bearer'
})

# Use with requests
response = requests.get('https://api.example.com/protected', auth=auth)

OAuth2Session Workflow Class

High-level session class that extends requests.Session with comprehensive OAuth 2.0 workflow support, automatic token refresh, and provider compliance hooks.

class OAuth2Session(requests.Session):
    def __init__(
        self,
        client_id: str = None,
        client = None,
        auto_refresh_url: str = None,
        auto_refresh_kwargs: dict = None,
        scope: list = None,
        redirect_uri: str = None,
        token: dict = None,
        state = None,
        token_updater = None,
        pkce: str = None,
        **kwargs
    ):
        """
        Create OAuth 2.0 session for workflow management.

        Args:
            client_id (str, optional): Client ID from provider
            client: oauthlib.oauth2.Client instance (default: WebApplicationClient)
            auto_refresh_url (str, optional): Token refresh endpoint URL
            auto_refresh_kwargs (dict, optional): Extra arguments for token refresh
            scope (list, optional): List of requested scopes
            redirect_uri (str, optional): Registered callback URI
            token (dict, optional): Initial token dictionary
            state: CSRF protection string or callable
            token_updater: Callback function for token updates
            pkce (str, optional): PKCE method ("S256", "plain", or None)
        """

Properties:

@property
def scope(self) -> list:
    """OAuth scopes for the session"""

@scope.setter
def scope(self, scope: list):
    """Set OAuth scopes"""

@property
def client_id(self) -> str:
    """Client identifier"""

@client_id.setter
def client_id(self, value: str):
    """Set client identifier"""

@client_id.deleter
def client_id(self):
    """Delete client identifier"""

@property
def token(self) -> dict:
    """Current token dictionary"""

@token.setter
def token(self, value: dict):
    """Set token and populate client attributes"""

@property
def access_token(self) -> str:
    """Current access token"""

@access_token.setter
def access_token(self, value: str):
    """Set access token"""

@access_token.deleter
def access_token(self):
    """Delete access token"""

@property
def authorized(self) -> bool:
    """True if session has valid access token"""

Authorization Flow Methods

def new_state(self) -> str:
    """
    Generate new state string for CSRF protection.

    Returns:
        str: Generated state string
    """

def authorization_url(
    self,
    url: str,
    state: str = None,
    **kwargs
) -> tuple:
    """
    Create authorization URL for user consent.

    Args:
        url (str): Authorization endpoint URL (must be HTTPS)
        state (str, optional): CSRF protection state
        **kwargs: Additional parameters for authorization URL

    Returns:
        tuple: (authorization_url, state)
    """

def fetch_token(
    self,
    token_url: str,
    code: str = None,
    authorization_response: str = None,
    body: str = "",
    auth = None,
    username: str = None,
    password: str = None,
    method: str = "POST",
    force_querystring: bool = False,
    timeout = None,
    headers: dict = None,
    verify = None,
    proxies = None,
    include_client_id = None,
    client_secret: str = None,
    cert = None,
    **kwargs
) -> dict:
    """
    Fetch access token from token endpoint.

    Args:
        token_url (str): Token endpoint URL (must be HTTPS)
        code (str, optional): Authorization code from callback
        authorization_response (str, optional): Full callback URL
        body (str): Additional request body content
        auth: Authentication tuple or method
        username (str, optional): Username for password grant
        password (str, optional): Password for password grant
        method (str): HTTP method (default: "POST")
        force_querystring (bool): Force parameters in query string
        timeout: Request timeout
        headers (dict, optional): Additional request headers
        verify: SSL certificate verification
        proxies: Request proxies
        include_client_id: Include client_id in request body
        client_secret (str, optional): Client secret
        cert: Client certificate for mTLS
        **kwargs: Additional token request parameters

    Returns:
        dict: Token response from provider

    Raises:
        InsecureTransportError: If token_url is not HTTPS
        ValueError: If required parameters are missing
    """

def token_from_fragment(self, authorization_response: str) -> dict:
    """
    Parse token from URI fragment (for Implicit Grant).

    Args:
        authorization_response (str): Full callback URL with fragment

    Returns:
        dict: Parsed token data
    """

Token Refresh Methods

def refresh_token(
    self,
    token_url: str,
    refresh_token: str = None,
    body: str = "",
    auth = None,
    timeout = None,
    headers: dict = None,
    verify = None,
    proxies = None,
    **kwargs
) -> dict:
    """
    Refresh access token using refresh token.

    Args:
        token_url (str): Refresh endpoint URL (must be HTTPS)
        refresh_token (str, optional): Refresh token to use
        body (str): Additional request body
        auth: Authentication method
        timeout: Request timeout
        headers (dict, optional): Request headers
        verify: SSL verification
        proxies: Request proxies
        **kwargs: Additional refresh parameters

    Returns:
        dict: New token response

    Raises:
        ValueError: If no token endpoint configured
        InsecureTransportError: If token_url is not HTTPS
    """

Request Methods

def request(
    self,
    method: str,
    url: str,
    data = None,
    headers: dict = None,
    withhold_token: bool = False,
    client_id: str = None,
    client_secret: str = None,
    files = None,
    **kwargs
):
    """
    Make authenticated HTTP request with automatic token handling.

    Args:
        method (str): HTTP method
        url (str): Request URL (must be HTTPS)
        data: Request body data
        headers (dict, optional): Request headers
        withhold_token (bool): Skip adding OAuth token
        client_id (str, optional): Client ID for auto-refresh
        client_secret (str, optional): Client secret for auto-refresh
        files: File uploads
        **kwargs: Additional request arguments

    Returns:
        Response: HTTP response object

    Raises:
        InsecureTransportError: If URL is not HTTPS
        TokenExpiredError: If token expired and auto-refresh fails
        TokenUpdated: If token was automatically refreshed (warning)
    """

Compliance Hook System

def register_compliance_hook(self, hook_type: str, hook):
    """
    Register hook for request/response customization.

    Args:
        hook_type (str): Hook type identifier
        hook: Callable to modify requests/responses

    Hook Types:
        - "access_token_response": Before token parsing
        - "refresh_token_response": Before refresh token parsing
        - "protected_request": Before making authenticated request
        - "access_token_request": Before token fetch request
        - "refresh_token_request": Before refresh request

    Raises:
        ValueError: If hook_type is not supported
    """

Grant Type Examples

Authorization Code Grant (Web Applications)

from requests_oauthlib import OAuth2Session

# Step 1: Create session
oauth = OAuth2Session(
    'client_id',
    redirect_uri='https://example.com/callback',
    scope=['read', 'write']
)

# Step 2: Get authorization URL
authorization_url = 'https://provider.com/oauth/authorize'
auth_url, state = oauth.authorization_url(authorization_url)
print(f"Go to: {auth_url}")

# Step 3: Exchange authorization code for token
token_url = 'https://provider.com/oauth/token'
token = oauth.fetch_token(
    token_url,
    authorization_response='https://example.com/callback?code=AUTH_CODE&state=STATE',
    client_secret='client_secret'
)

# Step 4: Make authenticated requests
response = oauth.get('https://api.provider.com/user')

Resource Owner Password Credentials Grant

from requests_oauthlib import OAuth2Session
from oauthlib.oauth2 import LegacyApplicationClient

# Create session with password client
oauth = OAuth2Session(client=LegacyApplicationClient(client_id='client_id'))

# Fetch token using username/password
token = oauth.fetch_token(
    token_url='https://provider.com/oauth/token',
    username='user@example.com',
    password='password',
    client_id='client_id',
    client_secret='client_secret'
)

# Make authenticated requests
response = oauth.get('https://api.provider.com/user')

Client Credentials Grant

from requests_oauthlib import OAuth2Session
from oauthlib.oauth2 import BackendApplicationClient

# Create session with backend client
oauth = OAuth2Session(client=BackendApplicationClient(client_id='client_id'))

# Fetch token using client credentials
token = oauth.fetch_token(
    token_url='https://provider.com/oauth/token',
    client_id='client_id',
    client_secret='client_secret'
)

# Make authenticated requests
response = oauth.get('https://api.provider.com/data')

PKCE Support

Proof Key for Code Exchange (PKCE) enhances security for public clients:

from requests_oauthlib import OAuth2Session

# Enable PKCE with S256 method
oauth = OAuth2Session(
    'client_id',
    redirect_uri='https://example.com/callback',
    scope=['read'],
    pkce='S256'  # or 'plain'
)

# Authorization URL automatically includes PKCE parameters
auth_url, state = oauth.authorization_url('https://provider.com/oauth/authorize')

# Token exchange automatically includes code_verifier
token = oauth.fetch_token(
    'https://provider.com/oauth/token',
    authorization_response=callback_url
)

Automatic Token Refresh

Configure automatic token refresh for long-running applications:

from requests_oauthlib import OAuth2Session

def save_token(token):
    """Save updated token to storage"""
    print(f"Token updated: {token}")

oauth = OAuth2Session(
    'client_id',
    token=existing_token,
    auto_refresh_url='https://provider.com/oauth/token',
    auto_refresh_kwargs={'client_id': 'client_id', 'client_secret': 'client_secret'},
    token_updater=save_token
)

# Automatically refreshes token if expired
response = oauth.get('https://api.provider.com/user')

Exception Classes

class TokenUpdated(Warning):
    """Warning raised when token is automatically refreshed"""
    def __init__(self, token: dict):
        """
        Args:
            token (dict): New token dictionary
        """

Security Considerations

  • HTTPS Enforcement: All OAuth 2.0 endpoints must use HTTPS (enforced by library)
  • State Parameter: Always use state parameter to prevent CSRF attacks
  • PKCE: Use PKCE for public clients (mobile apps, SPAs)
  • Token Storage: Store tokens securely and implement proper token lifecycle management
  • Scope Principle: Request minimal necessary scopes
  • Token Expiration: Implement proper token refresh workflows for long-running applications

Install with Tessl CLI

npx tessl i tessl/pypi-requests-oauthlib

docs

compliance-fixes.md

index.md

oauth1.md

oauth2.md

tile.json