A Python OAuth 1.0 library providing comprehensive authentication, signing, and client capabilities.
—
Server-side OAuth request verification for service providers implementing OAuth 1.0 protected resources. The Server class validates incoming requests, checks signatures, and extracts verified parameters for application use.
The `Server` class performs the full verification sequence for an incoming request: it checks the OAuth version, validates the signature, checks that the timestamp has not expired, and returns the non-OAuth parameters of the verified request.
class Server:
    """Verify OAuth 1.0 requests on behalf of a service provider.

    This is an API summary: every method below consists of its docstring
    only and documents the interface, not the implementation.
    """

    def __init__(self, signature_methods: dict = None):
        """
        Initialize OAuth server with supported signature methods.

        Args:
            signature_methods (dict): Dictionary of signature method name to
                instance mappings. If None, defaults to empty dict.
        """

    def add_signature_method(self, signature_method) -> dict:
        """
        Add signature method to server.

        Args:
            signature_method: SignatureMethod instance

        Returns:
            dict: Updated signature methods dictionary
        """

    def verify_request(self, request, consumer, token) -> dict:
        """
        Verify OAuth request and return non-OAuth parameters.

        Args:
            request: Request object to verify
            consumer: Consumer credentials to verify against
            token: Token credentials to verify against

        Returns:
            dict: Non-OAuth parameters from verified request

        Raises:
            Error: If verification fails (invalid signature, expired
                timestamp, etc.)
            MissingSignature: If oauth_signature parameter is missing
        """

    def build_authenticate_header(self, realm: str = '') -> dict:
        """
        Build WWW-Authenticate header for 401 responses.

        Args:
            realm (str): OAuth realm parameter

        Returns:
            dict: Header dictionary with WWW-Authenticate
        """


# Server properties
# Class-level defaults; an instance may override either one by assigning
# its own attribute of the same name.
Server.timestamp_threshold = 300  # Time threshold in seconds (default: 5 minutes)
Server.version = '1.0'  # OAuth version


import oauth2
# Set up server with supported signature methods
server = oauth2.Server()
server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())
server.add_signature_method(oauth2.SignatureMethod_PLAINTEXT())

# Consumer from your database/store
consumer = oauth2.Consumer('known_consumer_key', 'known_consumer_secret')

# Token from your database/store
token = oauth2.Token('known_token_key', 'known_token_secret')

# Create request from incoming HTTP request
request = oauth2.Request.from_request(
    http_method='GET',
    http_url='https://api.myservice.com/protected_resource',
    headers={'Authorization': 'OAuth oauth_consumer_key="known_consumer_key"...'},
    # Two ordinary query parameters joined by '&'.
    query_string='param1=value1&param2=value2',
)

try:
    # Verify the request
    parameters = server.verify_request(request, consumer, token)
except oauth2.Error as e:
    print(f"OAuth verification failed: {e}")
    # Return 401 Unauthorized with authenticate header
    auth_header = server.build_authenticate_header(realm='api.myservice.com')
else:
    # Only reached when verification raised nothing.
    print(f"Verified parameters: {parameters}")
    # Process the authenticated request
    # parameters contains non-OAuth query parameters


import oauth2
from django.http import HttpResponse, HttpResponseBadRequest
from django.views.decorators.csrf import csrf_exempt
# Initialize server
# One module-level Server instance is created here; HMAC-SHA1 is the only
# signature method registered on it.
oauth_server = oauth2.Server()
oauth_server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())
def get_consumer(consumer_key):
    """Return the Consumer for ``consumer_key``, or None when it is unknown.

    Placeholder lookup: only the hard-coded key 'valid_key' is recognised.
    Replace with your consumer lookup logic.
    """
    if consumer_key != 'valid_key':
        return None
    return oauth2.Consumer('valid_key', 'valid_secret')
def get_token(token_key):
    """Return the Token for ``token_key``, or None when it is unknown.

    Placeholder lookup: only the hard-coded key 'valid_token' is recognised.
    Replace with your token lookup logic.
    """
    if token_key != 'valid_token':
        return None
    return oauth2.Token('valid_token', 'valid_token_secret')
def _unauthorized(message):
    """Return a 401 response carrying the server's WWW-Authenticate header."""
    response = HttpResponse(message, status=401)
    auth_header = oauth_server.build_authenticate_header()
    response['WWW-Authenticate'] = auth_header['WWW-Authenticate']
    return response


@csrf_exempt
def protected_resource(request):
    """Protected API endpoint.

    Args:
        request: Incoming Django request.

    Returns:
        A 400 response when no OAuth request can be built, a 401 response
        with a WWW-Authenticate header when the consumer is unknown or
        verification raises ``oauth2.Error``, otherwise a success response
        that echoes the verified parameters.
    """
    # Create OAuth request from Django request
    oauth_request = oauth2.Request.from_request(
        http_method=request.method,
        http_url=request.build_absolute_uri(),
        headers=request.META,
        query_string=request.META.get('QUERY_STRING', ''),
    )
    if not oauth_request:
        return HttpResponseBadRequest("Invalid OAuth request")

    # Look up consumer
    consumer_key = oauth_request.get('oauth_consumer_key')
    consumer = get_consumer(consumer_key)
    if not consumer:
        return _unauthorized("Unauthorized")

    # Look up token; a request without oauth_token is verified with token=None
    token_key = oauth_request.get('oauth_token')
    token = get_token(token_key) if token_key else None

    # Keep the try body to the one call that signals verification failure.
    try:
        parameters = oauth_server.verify_request(oauth_request, consumer, token)
    except oauth2.Error as e:
        return _unauthorized(f"OAuth error: {e}")

    # Process authenticated request
    return HttpResponse(f"Success! Parameters: {parameters}")


import oauth2
from flask import Flask, request, jsonify, make_response
# Flask application object; routes below are registered on it.
app = Flask(__name__)
# OAuth server setup
# One module-level Server instance is created here; HMAC-SHA1 is the only
# signature method registered on it.
oauth_server = oauth2.Server()
oauth_server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())
def oauth_required(f):
    """Decorator for OAuth-protected endpoints.

    The wrapper builds an OAuth request from the current Flask ``request``,
    looks up the consumer and token, and verifies the request with
    ``oauth_server``. On success it stores ``oauth_parameters``,
    ``oauth_consumer`` and ``oauth_token`` on ``request`` and calls ``f``.

    ``lookup_consumer`` and ``lookup_token`` are not defined in this
    snippet; supply your own lookup logic under those names.

    Args:
        f: View function to protect.

    Returns:
        The wrapped view. It returns a 400 response when no OAuth request
        can be built, a 401 response with a WWW-Authenticate header when the
        consumer is unknown or verification raises ``oauth2.Error``, and
        otherwise whatever ``f`` returns.
    """
    import functools  # local import keeps the snippet self-contained

    def _unauthorized(message):
        # Single place that builds the 401 + WWW-Authenticate challenge.
        response = make_response(message, 401)
        auth_header = oauth_server.build_authenticate_header()
        response.headers['WWW-Authenticate'] = auth_header['WWW-Authenticate']
        return response

    # wraps() copies __name__ (as the original did by hand) plus __doc__,
    # __module__, __qualname__ and sets __wrapped__.
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        # Create OAuth request
        oauth_request = oauth2.Request.from_request(
            http_method=request.method,
            http_url=request.url,
            headers=dict(request.headers),
            query_string=request.query_string.decode('utf-8'),
        )
        if not oauth_request:
            return make_response("Invalid OAuth request", 400)

        # Look up consumer and token (implement your lookup logic)
        consumer = lookup_consumer(oauth_request.get('oauth_consumer_key'))
        token = lookup_token(oauth_request.get('oauth_token'))
        if not consumer:
            return _unauthorized("Unauthorized")

        # Only the verification call is guarded, so an oauth2.Error raised
        # inside the view itself is not misreported as an auth failure.
        try:
            parameters = oauth_server.verify_request(oauth_request, consumer, token)
        except oauth2.Error as e:
            return _unauthorized(f"OAuth error: {e}")

        # Add verified parameters to request context
        request.oauth_parameters = parameters
        request.oauth_consumer = consumer
        request.oauth_token = token
        return f(*args, **kwargs)

    return decorated_function
@app.route('/api/protected')
@oauth_required
def protected_endpoint():
    """OAuth-protected API endpoint.

    Reads ``request.oauth_parameters`` and ``request.oauth_consumer`` and
    returns them (the consumer as its ``key``) in a JSON response.
    """
    return jsonify({
        'message': 'Success!',
        'parameters': request.oauth_parameters,
        'consumer': request.oauth_consumer.key,
    })


import oauth2
import hashlib
class SignatureMethod_SHA256(oauth2.SignatureMethod):
    """Custom SHA256-based signature method."""

    name = 'HMAC-SHA256'

    def signing_base(self, request, consumer, token):
        """Calculate signing base and key.

        Args:
            request: Request whose ``method``, ``normalized_url`` and
                normalized parameters form the base string.
            consumer: Consumer whose ``secret`` starts the key.
            token: Token whose ``secret`` is appended to the key; a falsy
                value leaves the key as ``"<consumer secret>&"``.

        Returns:
            tuple: ``(key, raw)``, both ASCII-encoded ``bytes``.
        """
        sig = (
            oauth2.escape(request.method),
            oauth2.escape(request.normalized_url),
            oauth2.escape(request.get_normalized_parameters()),
        )
        key = f"{oauth2.escape(consumer.secret)}&"
        if token:
            key += oauth2.escape(token.secret)
        raw = '&'.join(sig)
        return key.encode('ascii'), raw.encode('ascii')

    def sign(self, request, consumer, token):
        """Generate SHA256 signature.

        Returns:
            bytes: Base64 encoding of the HMAC-SHA256 digest of the signing
            base, including its ``=`` padding.
        """
        import base64
        import hmac

        key, raw = self.signing_base(request, consumer, token)
        hashed = hmac.new(key, raw, hashlib.sha256)
        # base64.b64encode() appends no newline (unlike binascii.b2a_base64),
        # so the result must not be sliced: dropping the last byte would
        # remove the '=' padding and corrupt the signature.
        return base64.b64encode(hashed.digest())
# Use custom signature method
server = oauth2.Server()
server.add_signature_method(SignatureMethod_SHA256())
server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())  # Fallback


import oauth2
server = oauth2.Server()
server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())


def verify_or_reject(request, consumer, token):
    """Verify a request, mapping each failure to a 401 error response.

    The ``return`` statements of this example are only valid inside a
    function, so the logic lives in one; call it from your request handler.

    Args:
        request: OAuth request to verify.
        consumer: Consumer credentials to verify against.
        token: Token credentials to verify against.

    Returns:
        The verified non-OAuth parameters on success, otherwise the value of
        ``error_response(...)`` for the failure.
    """
    try:
        parameters = server.verify_request(request, consumer, token)
    except oauth2.MissingSignature:
        # oauth_signature parameter is missing
        return error_response("Missing signature", 401)
    except oauth2.Error as e:
        error_msg = str(e)
        if "Invalid signature" in error_msg:
            # Signature calculation mismatch
            # Log expected signature base string for debugging
            print(f"Signature verification failed: {error_msg}")
        elif "Expired timestamp" in error_msg:
            # Request timestamp is too old
            print(f"Timestamp expired: {error_msg}")
        elif "OAuth version" in error_msg:
            # Unsupported OAuth version
            print(f"Version error: {error_msg}")
        return error_response(f"OAuth error: {error_msg}", 401)
    return parameters
def error_response(message, status_code):
    """Return error response with proper headers.

    Args:
        message: Response body.
        status_code: HTTP status code of the response.

    Returns:
        The response built by ``make_response``. For status 401 its headers
        are updated with the header dictionary returned by
        ``server.build_authenticate_header(realm='api.example.com')``.
    """
    response = make_response(message, status_code)
    if status_code == 401:
        auth_header = server.build_authenticate_header(realm='api.example.com')
        response.headers.update(auth_header)
    return response


import oauth2
# Create server with custom timestamp threshold
server = oauth2.Server()
# Instance-level assignment: affects only this `server` object.
server.timestamp_threshold = 600 # Allow 10 minutes clock skew
# Or modify global default
# Class-level assignment: changes the value seen by instances that have not
# set their own; `server` above keeps its instance value of 600.
oauth2.Server.timestamp_threshold = 120 # 2 minutes
server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())

Install with Tessl CLI:

npx tessl i tessl/pypi-oauth2