A comprehensive Python library for implementing OAuth 1.0 and OAuth 2.0 authentication protocols
Complete OAuth 1.0 (RFC 5849) client and server implementation supporting the full three-legged authentication flow. Includes request token acquisition, user authorization, access token exchange, and protected resource access with multiple signature methods.
OAuth 1.0 client for signing requests and managing the authentication flow. Supports multiple signature methods including HMAC-SHA1, HMAC-SHA256, HMAC-SHA512, RSA-SHA1, RSA-SHA256, RSA-SHA512, and PLAINTEXT.
class Client:
    """OAuth 1.0 client for signing requests and managing the authentication flow."""

    def __init__(
        self,
        client_key: str,
        client_secret: str | None = None,
        resource_owner_key=None,
        resource_owner_secret=None,
        callback_uri=None,
        signature_method: str = "HMAC-SHA1",
        signature_type: str = "AUTH_HEADER",
        rsa_key=None,
        verifier=None,
        realm=None,
        encoding: str = "utf-8",
        decoding=None,
        nonce=None,
        timestamp=None,
    ):
        """
        OAuth 1.0 client for request signing and token management.

        Parameters:
        - client_key: Client identifier
        - client_secret: Client secret for HMAC signatures (defaults to None)
        - resource_owner_key: Resource owner token (request or access token)
        - resource_owner_secret: Resource owner token secret
        - callback_uri: Callback URI for authorization
        - signature_method: Signature method (HMAC-SHA1, RSA-SHA1, PLAINTEXT, etc.);
          defaults to "HMAC-SHA1"
        - signature_type: Where to place signature (AUTH_HEADER, QUERY, BODY);
          defaults to "AUTH_HEADER"
        - rsa_key: RSA private key for RSA signatures
        - verifier: Authorization verifier code
        - realm: Authorization realm
        - encoding: String encoding; defaults to "utf-8"
        - decoding: Not described in this listing; presumably the encoding
          applied to the signed output -- TODO confirm against the oauthlib
          reference
        - nonce: Custom nonce generator
        - timestamp: Custom timestamp generator
        """

    def sign(
        self,
        uri: str,
        http_method: str = "GET",
        body: str | dict[str, str] | list[tuple[str, str]] | None = None,
        headers: dict[str, str] | None = None,
        realm=None,
    ):
        """
        Sign an OAuth 1.0 request.

        Parameters:
        - uri: Request URI
        - http_method: HTTP method (GET, POST, etc.); defaults to "GET"
        - body: Request body, as a string, a dict, or a list of
          (name, value) pairs; defaults to None
        - headers: Request headers; defaults to None
        - realm: Authorization realm

        Returns:
        Tuple of (uri, headers, body) with OAuth signature applied
        """

    def get_oauth_signature(self, request):
        """Get OAuth signature for a request."""
        # NOTE(review): the type of `request` is not shown in this listing;
        # presumably the library's internal request object -- confirm.

    def get_oauth_params(self, request):
        """Get OAuth parameters for a request."""

    @classmethod
    def register_signature_method(cls, method_name: str, method_callback):
        """Register a custom signature method."""
        # NOTE(review): the call signature expected of `method_callback` is
        # not shown in this listing -- confirm before relying on it.
Usage example:
from oauthlib.oauth1 import Client

# Create client with consumer credentials. signature_method and
# signature_type are left at their defaults ("HMAC-SHA1" and "AUTH_HEADER").
client = Client(
    'your-client-key',
    client_secret='your-client-secret'
)

# Sign a request; sign() returns the (uri, headers, body) tuple with the
# OAuth signature applied.
uri, headers, body = client.sign(
    'https://api.example.com/data',
    http_method='GET'
)

# Make signed request, forwarding the signed URI and headers
import requests
response = requests.get(uri, headers=headers)
Server endpoint for handling OAuth 1.0 request token requests. Issues temporary credentials that clients use to obtain authorization from resource owners.
class RequestTokenEndpoint:
    """Server endpoint for handling OAuth 1.0 request token requests."""

    def __init__(self, request_validator): ...

    def create_request_token(self, request, credentials):
        """
        Create a request token.

        Parameters:
        - request: OAuth request object
        - credentials: Server credentials

        Returns:
        OAuth token dictionary
        """

    def create_request_token_response(
        self,
        uri: str,
        http_method: str = "GET",
        body: str | None = None,
        headers: dict[str, str] | None = None,
        credentials: dict[str, str] | None = None,
    ):
        """Create request token endpoint response."""
        # NOTE(review): the return value is not described in this listing.

    def validate_request_token_request(self, request):
        """Validate request token request."""
Server endpoint for handling OAuth 1.0 authorization requests. Manages user authorization and verifier code generation.
class AuthorizationEndpoint:
    """Server endpoint for handling OAuth 1.0 authorization requests."""

    def __init__(self, request_validator): ...

    def create_verifier(self, request, credentials):
        """Create authorization verifier."""

    def create_authorization_response(
        self,
        uri: str,
        http_method: str = "GET",
        body: str | None = None,
        headers: dict[str, str] | None = None,
        realms: list[str] | None = None,
        credentials: dict[str, str] | None = None,
    ):
        """Create authorization endpoint response."""
        # NOTE(review): the return value is not described in this listing.

    def get_realms_and_credentials(
        self,
        uri: str,
        http_method: str = "GET",
        body: str | None = None,
        headers: dict[str, str] | None = None,
    ):
        """Get authorization realms and credentials."""
Server endpoint for handling OAuth 1.0 access token requests. Exchanges authorized request tokens for access tokens that can access protected resources.
class AccessTokenEndpoint:
    """Server endpoint for handling OAuth 1.0 access token requests."""

    def __init__(self, request_validator): ...

    def create_access_token(self, request, credentials):
        """Create an access token."""

    def create_access_token_response(
        self,
        uri: str,
        http_method: str = "GET",
        body: str | None = None,
        headers: dict[str, str] | None = None,
        credentials: dict[str, str] | None = None,
    ):
        """Create access token endpoint response."""
        # NOTE(review): the return value is not described in this listing.

    def validate_access_token_request(self, request):
        """Validate access token request."""
Server endpoint for validating OAuth 1.0 signed requests to protected resources. Verifies access tokens and signatures before granting access to protected resources.
class ResourceEndpoint:
    """Server endpoint for validating OAuth 1.0 signed requests to protected resources."""

    def __init__(self, request_validator): ...

    def validate_protected_resource_request(
        self,
        uri: str,
        http_method: str = "GET",
        body: str | None = None,
        headers: dict[str, str] | None = None,
        realms: list[str] | None = None,
    ) -> tuple[bool, dict]:
        """
        Validate request to protected resource.

        Parameters:
        - uri: Request URI
        - http_method: HTTP method; defaults to "GET"
        - body: Request body; defaults to None
        - headers: Request headers; defaults to None
        - realms: Realms to check; defaults to None

        Returns:
        Tuple of (valid, request) where valid is boolean and request contains OAuth params
        """
Pre-configured OAuth 1.0 server combining all endpoints for web application flows. Provides a complete server implementation with request token, authorization, access token, and resource protection capabilities.
class WebApplicationServer:
    """Pre-configured OAuth 1.0 server combining all endpoints for web application flows."""

    def __init__(self, request_validator): ...
    # Inherits all methods from RequestTokenEndpoint, AuthorizationEndpoint,
    # AccessTokenEndpoint, and ResourceEndpoint
    # NOTE(review): the class header in this listing names no base classes,
    # so the inheritance described above is not visible here -- confirm.
OAuth 1.0 request validation interface that must be implemented to provide custom authentication and authorization logic. Handles client verification, token management, and security policies.
class RequestValidator:
    """OAuth 1.0 request validation interface to implement for custom authentication and authorization logic."""

    # Configuration properties
    allowed_signature_methods: list[str]
    safe_characters: set[str]
    # NOTE(review): the tuple[int, int] length properties are presumably
    # (min, max) bounds -- TODO confirm.
    client_key_length: tuple[int, int]
    request_token_length: tuple[int, int]
    access_token_length: tuple[int, int]
    timestamp_lifetime: int  # unit not shown in this listing
    nonce_length: tuple[int, int]
    verifier_length: tuple[int, int]
    realms: list[str]
    enforce_ssl: bool

    # Dummy values for timing attack protection
    dummy_client: str
    dummy_request_token: str
    dummy_access_token: str

    # Client validation
    def validate_client_key(self, client_key: str, request) -> bool:
        """Validate client key exists and is valid."""

    def check_client_key(self, client_key: str) -> bool:
        """Check client key format."""

    def get_client_secret(self, client_key: str, request) -> str:
        """Get client secret for HMAC signatures."""

    def get_rsa_key(self, client_key: str, request) -> str:
        """Get RSA public key for RSA signature verification."""

    # Request token validation
    def validate_request_token(self, client_key: str, token: str, request) -> bool:
        """Validate request token."""

    def check_request_token(self, request_token: str) -> bool:
        """Check request token format."""

    def get_request_token_secret(self, client_key: str, token: str, request) -> str:
        """Get request token secret."""

    def save_request_token(self, token: dict, request) -> None:
        """Store request token."""

    def invalidate_request_token(self, client_key: str, request_token: str, request) -> None:
        """Invalidate used request token."""

    # Access token validation
    def validate_access_token(self, client_key: str, token: str, request) -> bool:
        """Validate access token."""

    def check_access_token(self, access_token: str) -> bool:
        """Check access token format."""

    def get_access_token_secret(self, client_key: str, token: str, request) -> str:
        """Get access token secret."""

    def save_access_token(self, token: dict, request) -> None:
        """Store access token."""

    # Authorization validation
    # NOTE(review): this is the only validate_* method in the listing without
    # a `request` parameter; confirm against the library whether it was
    # dropped from this signature.
    def validate_timestamp_and_nonce(
        self,
        client_key: str,
        timestamp: str,
        nonce: str,
        request_token: str | None = None,
        access_token: str | None = None,
    ) -> bool:
        """Validate timestamp and nonce to prevent replay attacks."""

    def validate_redirect_uri(self, client_key: str, redirect_uri: str, request) -> bool:
        """Validate redirect URI."""

    def validate_verifier(self, client_key: str, token: str, verifier: str, request) -> bool:
        """Validate authorization verifier."""

    def save_verifier(self, token: str, verifier: str, request) -> None:
        """Store authorization verifier."""

    def get_redirect_uri(self, token: str, request) -> str:
        """Get redirect URI for token."""

    # Realm validation
    def validate_requested_realms(self, client_key: str, realms: list[str], request) -> bool:
        """Validate requested authorization realms."""

    def validate_realms(
        self,
        client_key: str,
        token: str,
        request,
        uri: str | None = None,
        realms: list[str] | None = None,
    ) -> bool:
        """Validate access to realms."""

    def get_default_realms(self, client_key: str, request) -> list[str]:
        """Get default realms for client."""

    def get_realms(self, token: str, request) -> list[str]:
        """Get realms associated with token."""

    def check_realms(self, realms: list[str]) -> bool:
        """Check realm format."""

    # Format checking
    def check_nonce(self, nonce: str) -> bool:
        """Check nonce format."""

    def check_verifier(self, verifier: str) -> bool:
        """Check verifier format."""
OAuth 1.0 signature method implementations for request authentication. Supports HMAC-based and RSA-based signatures with multiple hash algorithms.
# Signature base string creation
def signature_base_string(
    http_method: str,
    base_str_uri: str,
    normalized_encoded_request_parameters: str,
) -> str:
    """Create OAuth 1.0 signature base string."""


def base_string_uri(uri: str, host: str | None = None) -> str:
    """Normalize URI for signature base string."""


def collect_parameters(
    uri_query: str = "",
    body: str | None = None,
    headers: dict[str, str] | None = None,
    exclude_oauth_signature: bool = True,
    with_realm: bool = False,
) -> list[tuple[str, str]]:
    """Collect OAuth parameters for signature."""


def normalize_parameters(params: list[tuple[str, str]]) -> str:
    """Normalize parameters for signature base string."""


# HMAC signature methods
# Each sign_* function returns the signature as a string; each verify_*
# function returns a bool. The secrets default to None on the verify side.
def sign_hmac_sha1(base_string: str, client_secret: str, resource_owner_secret: str) -> str:
    """Sign with HMAC-SHA1."""


def verify_hmac_sha1(
    request,
    client_secret: str | None = None,
    resource_owner_secret: str | None = None,
) -> bool:
    """Verify HMAC-SHA1 signature."""


def sign_hmac_sha256(base_string: str, client_secret: str, resource_owner_secret: str) -> str:
    """Sign with HMAC-SHA256."""


def verify_hmac_sha256(
    request,
    client_secret: str | None = None,
    resource_owner_secret: str | None = None,
) -> bool:
    """Verify HMAC-SHA256 signature."""


def sign_hmac_sha512(base_string: str, client_secret: str, resource_owner_secret: str) -> str:
    """Sign with HMAC-SHA512."""


def verify_hmac_sha512(
    request,
    client_secret: str | None = None,
    resource_owner_secret: str | None = None,
) -> bool:
    """Verify HMAC-SHA512 signature."""


# RSA signature methods
# Signing takes the private key; verification takes the public key.
def sign_rsa_sha1(base_string: str, rsa_private_key: str) -> str:
    """Sign with RSA-SHA1."""


def verify_rsa_sha1(request, rsa_public_key: str) -> bool:
    """Verify RSA-SHA1 signature."""


def sign_rsa_sha256(base_string: str, rsa_private_key: str) -> str:
    """Sign with RSA-SHA256."""


def verify_rsa_sha256(request, rsa_public_key: str) -> bool:
    """Verify RSA-SHA256 signature."""


def sign_rsa_sha512(base_string: str, rsa_private_key: str) -> str:
    """Sign with RSA-SHA512."""


def verify_rsa_sha512(request, rsa_public_key: str) -> bool:
    """Verify RSA-SHA512 signature."""


# Plaintext signature method
# Unlike the HMAC and RSA signers, sign_plaintext takes no base string.
def sign_plaintext(client_secret: str, resource_owner_secret: str) -> str:
    """Sign with PLAINTEXT method."""


def verify_plaintext(
    request,
    client_secret: str | None = None,
    resource_owner_secret: str | None = None,
) -> bool:
    """Verify PLAINTEXT signature."""
OAuth 1.0 parameter formatting and preparation utilities for different signature placement methods.
def prepare_headers(
    params: list[tuple[str, str]],
    headers: dict[str, str] | None = None,
    realm: str | None = None,
) -> dict[str, str]:
    """Prepare OAuth parameters for Authorization header."""
    # NOTE(review): whether the passed-in `headers` dict is mutated or copied
    # is not shown in this listing -- confirm before reusing it.


def prepare_form_encoded_body(
    oauth_params: list[tuple[str, str]],
    body: str | list[tuple[str, str]],
) -> str:
    """Prepare OAuth parameters for form-encoded body."""


def prepare_request_uri_query(
    oauth_params: list[tuple[str, str]],
    uri: str,
) -> str:
    """Prepare OAuth parameters for URI query string."""
OAuth 1.0 utility functions for parameter handling, URL encoding, and header parsing.
# NOTE(review): the input is a list of (name, value) pairs but the declared
# return type is list[str]; confirm whether the result holds pairs or names.
def filter_oauth_params(params: list[tuple[str, str]]) -> list[str]:
    """Filter OAuth-specific parameters."""


def escape(u: str) -> str:
    """URL escape string for OAuth."""


def unescape(u: str) -> str:
    """URL unescape string."""


def parse_authorization_header(authorization_header: str) -> list[tuple[str, str]]:
    """Parse OAuth Authorization header."""


def parse_http_list(u: str) -> list[str]:
    """Parse HTTP list format."""


def parse_keqv_list(l: list[str]) -> dict[str, str]:
    """Parse key=value list format."""


# NOTE(review): `callable` is the builtin predicate, not a type;
# `collections.abc.Callable` is probably what is meant -- confirm.
def filter_params(target: list[str]) -> callable:
    """Create parameter filter function."""
# Signature methods
# Names of the supported signature methods. Only the types are declared in
# this listing; the string values are not shown.
SIGNATURE_HMAC_SHA1: str
SIGNATURE_HMAC_SHA256: str
SIGNATURE_HMAC_SHA512: str
SIGNATURE_HMAC: str  # presumably an alias for one HMAC variant -- TODO confirm
SIGNATURE_RSA_SHA1: str
SIGNATURE_RSA_SHA256: str
SIGNATURE_RSA_SHA512: str
SIGNATURE_RSA: str  # presumably an alias for one RSA variant -- TODO confirm
SIGNATURE_PLAINTEXT: str
# Signature placement
SIGNATURE_TYPE_AUTH_HEADER: str
SIGNATURE_TYPE_QUERY: str
SIGNATURE_TYPE_BODY: str
# Content type
CONTENT_TYPE_FORM_URLENCODED: str


class OAuth1Error(Exception):
    """Base OAuth 1.0 exception."""

    # Attributes carried by every OAuth 1.0 error; `uri` may be None.
    error: str
    description: str
    uri: str | None
    status_code: int

    # All arguments are optional; status_code defaults to 400.
    def __init__(
        self,
        description: str | None = None,
        uri: str | None = None,
        status_code: int = 400,
        request=None,
    ) -> None: ...

    # NOTE(review): the behavior of in_uri is not described in this listing.
    def in_uri(self, uri: str) -> str: ...

    # Error details as a list of (name, value) pairs.
    @property
    def twotuples(self) -> list[tuple[str, str]]: ...

    # Error details as a single string; presumably the URL-encoded form of
    # `twotuples` -- TODO confirm.
    @property
    def urlencoded(self) -> str: ...
class InsecureTransportError(OAuth1Error):
    """HTTPS transport required."""

    # Redeclares the base-class fields; the concrete values are not shown in
    # this listing.
    error: str
    description: str
class InvalidSignatureMethodError(OAuth1Error):
    """Invalid signature method."""

    # Redeclares the base-class field; the concrete value is not shown in
    # this listing.
    error: str
class InvalidRequestError(OAuth1Error):
    """Invalid OAuth request."""

    # Redeclares the base-class field; the concrete value is not shown in
    # this listing.
    error: str
class InvalidClientError(OAuth1Error):
    """Invalid client credentials."""

    # Redeclares the base-class field; the concrete value is not shown in
    # this listing.
    error: str
Install with Tessl CLI
npx tessl i tessl/pypi-oauthlib