CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-oauth2

A Python OAuth 1.0 library providing comprehensive authentication, signing, and client capabilities.

Pending
Overview
Eval results
Files

server-verification.mddocs/

Server Verification

Server-side OAuth request verification for service providers implementing OAuth 1.0 protected resources. The Server class validates incoming requests, checks signatures, and extracts verified parameters for application use.

Capabilities

OAuth Request Verification

The Server class provides comprehensive request verification including signature validation, timestamp checking, version verification, and parameter extraction.

class Server:
    """
    Verifies incoming OAuth 1.0 requests for a service provider.

    Validates the request signature, timestamp, and OAuth version, and
    returns the request's non-OAuth parameters on success.
    """

    def __init__(self, signature_methods: dict = None):
        """
        Initialize OAuth server with supported signature methods.
        
        Args:
            signature_methods (dict): Dictionary of signature method name to instance mappings.
                                    If None, defaults to empty dict; register methods
                                    afterwards with add_signature_method().
        """
    
    def add_signature_method(self, signature_method) -> dict:
        """
        Add signature method to server.
        
        Args:
            signature_method: SignatureMethod instance
                              (e.g. SignatureMethod_HMAC_SHA1())
            
        Returns:
            dict: Updated signature methods dictionary
        """
    
    def verify_request(self, request, consumer, token) -> dict:
        """
        Verify OAuth request and return non-OAuth parameters.
        
        Args:
            request: Request object to verify
            consumer: Consumer credentials to verify against
            token: Token credentials to verify against; may be None when the
                   request carries no oauth_token
            
        Returns:
            dict: Non-OAuth parameters from verified request
            
        Raises:
            Error: If verification fails (invalid signature, expired timestamp, etc.)
            MissingSignature: If oauth_signature parameter is missing
        """
    
    def build_authenticate_header(self, realm: str = '') -> dict:
        """
        Build WWW-Authenticate header for 401 responses.
        
        Args:
            realm (str): OAuth realm parameter
            
        Returns:
            dict: Header dictionary containing a 'WWW-Authenticate' entry,
                  ready to be copied onto the response headers
        """

Server Configuration

# Server properties
# These are class-level defaults; assign on an instance to override them for
# that server only, or on the class to change the default for all servers.
Server.timestamp_threshold = 300  # Time threshold in seconds (default: 5 minutes)
Server.version = '1.0'           # OAuth version

Usage Examples

Basic Request Verification

import oauth2

# Set up server with supported signature methods
server = oauth2.Server()
server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())
server.add_signature_method(oauth2.SignatureMethod_PLAINTEXT())

# Consumer from your database/store
consumer = oauth2.Consumer('known_consumer_key', 'known_consumer_secret')

# Token from your database/store  
token = oauth2.Token('known_token_key', 'known_token_secret')

# Create request from incoming HTTP request
# NOTE: the Authorization value below is truncated ("...") for illustration;
# a real header carries the complete set of oauth_* parameters.
request = oauth2.Request.from_request(
    http_method='GET',
    http_url='https://api.myservice.com/protected_resource',
    headers={'Authorization': 'OAuth oauth_consumer_key="known_consumer_key"...'},
    query_string='param1=value1&param2=value2'
)

try:
    # Verify the request
    parameters = server.verify_request(request, consumer, token)
    print(f"Verified parameters: {parameters}")
    
    # Process the authenticated request
    # parameters contains non-OAuth query parameters
    
except oauth2.Error as e:
    print(f"OAuth verification failed: {e}")
    # Return 401 Unauthorized with authenticate header
    # auth_header is a dict; copy its 'WWW-Authenticate' entry onto the
    # 401 response you send back to the client.
    auth_header = server.build_authenticate_header(realm='api.myservice.com')

Web Framework Integration (Django Example)

import oauth2
from django.http import HttpResponse, HttpResponseBadRequest
from django.views.decorators.csrf import csrf_exempt

# Initialize server
oauth_server = oauth2.Server()
oauth_server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())

def get_consumer(consumer_key):
    """Return the Consumer registered under ``consumer_key``, or None.

    Placeholder implementation: only the hard-coded key 'valid_key' is
    recognised. Replace the check with a lookup against your own store.
    """
    if consumer_key != 'valid_key':
        return None
    return oauth2.Consumer('valid_key', 'valid_secret')

def get_token(token_key):
    """Return the Token registered under ``token_key``, or None.

    Placeholder implementation: only the hard-coded key 'valid_token' is
    recognised. Replace the check with a lookup against your own store.
    """
    if token_key != 'valid_token':
        return None
    return oauth2.Token('valid_token', 'valid_token_secret')

@csrf_exempt
def protected_resource(request):
    """Protected API endpoint that requires a valid OAuth 1.0 signature.

    Args:
        request: Incoming Django ``HttpRequest``.

    Returns:
        HttpResponse: 200 with the verified non-OAuth parameters on success,
        400 when the request carries no OAuth parameters at all, or 401 with
        a ``WWW-Authenticate`` header when the consumer is unknown or
        verification fails.
    """
    # Django exposes the Authorization header as META['HTTP_AUTHORIZATION'],
    # whereas Request.from_request() looks the header up under the key
    # 'Authorization'. Passing request.META straight through would silently
    # drop every OAuth parameter sent in the header, so translate the key.
    headers = {}
    authorization = request.META.get('HTTP_AUTHORIZATION')
    if authorization:
        headers['Authorization'] = authorization

    # Create OAuth request from Django request
    oauth_request = oauth2.Request.from_request(
        http_method=request.method,
        http_url=request.build_absolute_uri(),
        headers=headers,
        query_string=request.META.get('QUERY_STRING', '')
    )

    if not oauth_request:
        return HttpResponseBadRequest("Invalid OAuth request")

    # Look up consumer
    consumer_key = oauth_request.get('oauth_consumer_key')
    consumer = get_consumer(consumer_key)
    if not consumer:
        response = HttpResponse("Unauthorized", status=401)
        auth_header = oauth_server.build_authenticate_header()
        response['WWW-Authenticate'] = auth_header['WWW-Authenticate']
        return response

    # Look up token; it stays None when the request carries no oauth_token
    token_key = oauth_request.get('oauth_token')
    token = get_token(token_key) if token_key else None

    try:
        # Verify request
        parameters = oauth_server.verify_request(oauth_request, consumer, token)

        # Process authenticated request
        return HttpResponse(f"Success! Parameters: {parameters}")

    except oauth2.Error as e:
        response = HttpResponse(f"OAuth error: {e}", status=401)
        auth_header = oauth_server.build_authenticate_header()
        response['WWW-Authenticate'] = auth_header['WWW-Authenticate']
        return response

Flask Integration Example

import oauth2
from flask import Flask, request, jsonify, make_response

app = Flask(__name__)

# OAuth server setup
oauth_server = oauth2.Server()
oauth_server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())

def oauth_required(f):
    """Decorator for OAuth-protected endpoints.

    Builds an OAuth request from the current Flask request, verifies it, and
    only then calls the wrapped view. On success the verified parameters,
    consumer, and token are attached to ``request`` as ``oauth_parameters``,
    ``oauth_consumer``, and ``oauth_token``.

    Args:
        f: View function to protect.

    Returns:
        The wrapping view. It responds with 400 when the request carries no
        OAuth parameters, and with 401 plus a ``WWW-Authenticate`` header
        when the consumer is unknown or verification fails.
    """
    # Local import keeps this snippet self-contained.
    import functools

    # functools.wraps copies __name__, __doc__, __module__, etc. and sets
    # __wrapped__; copying __name__ by hand loses the rest of the metadata.
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        # Create OAuth request
        oauth_request = oauth2.Request.from_request(
            http_method=request.method,
            http_url=request.url,
            headers=dict(request.headers),
            query_string=request.query_string.decode('utf-8')
        )

        if not oauth_request:
            return make_response("Invalid OAuth request", 400)

        # Look up consumer (implement your lookup logic)
        consumer = lookup_consumer(oauth_request.get('oauth_consumer_key'))
        if not consumer:
            response = make_response("Unauthorized", 401)
            auth_header = oauth_server.build_authenticate_header()
            response.headers['WWW-Authenticate'] = auth_header['WWW-Authenticate']
            return response

        # Look up token (implement your lookup logic). Skip the lookup when
        # the request has no oauth_token so lookup_token never receives None.
        token_key = oauth_request.get('oauth_token')
        token = lookup_token(token_key) if token_key else None

        try:
            # Verify request
            parameters = oauth_server.verify_request(oauth_request, consumer, token)

            # Add verified parameters to request context
            request.oauth_parameters = parameters
            request.oauth_consumer = consumer
            request.oauth_token = token

            return f(*args, **kwargs)

        except oauth2.Error as e:
            response = make_response(f"OAuth error: {e}", 401)
            auth_header = oauth_server.build_authenticate_header()
            response.headers['WWW-Authenticate'] = auth_header['WWW-Authenticate']
            return response

    return decorated_function

@app.route('/api/protected')
@oauth_required
def protected_endpoint():
    """Respond with the details of the OAuth-verified request as JSON."""
    # These attributes are attached to the request by the oauth_required
    # decorator once verification has succeeded.
    payload = {
        'message': 'Success!',
        'parameters': request.oauth_parameters,
        'consumer': request.oauth_consumer.key,
    }
    return jsonify(payload)

Custom Signature Method

import oauth2
import hashlib

class SignatureMethod_SHA256(oauth2.SignatureMethod):
    """Custom HMAC-SHA256 signature method."""
    name = 'HMAC-SHA256'

    def signing_base(self, request, consumer, token):
        """Calculate the HMAC key and the signature base string.

        Args:
            request: Request being signed or verified.
            consumer: Consumer whose secret forms the first half of the key.
            token: Token whose secret forms the second half of the key;
                   may be falsy, in which case the key ends with '&'.

        Returns:
            tuple: ``(key, raw)`` as ASCII-encoded bytes, where ``raw`` is the
            escaped method, URL, and normalized parameters joined by '&'.
        """
        sig = (
            oauth2.escape(request.method),
            oauth2.escape(request.normalized_url),
            oauth2.escape(request.get_normalized_parameters()),
        )

        key = f"{oauth2.escape(consumer.secret)}&"
        if token:
            key += oauth2.escape(token.secret)

        raw = '&'.join(sig)
        return key.encode('ascii'), raw.encode('ascii')

    def sign(self, request, consumer, token):
        """Generate the base64-encoded HMAC-SHA256 signature.

        Returns:
            bytes: Complete base64 encoding of the HMAC digest, including
            its '=' padding.
        """
        import base64
        import hmac

        key, raw = self.signing_base(request, consumer, token)
        hashed = hmac.new(key, raw, hashlib.sha256)
        # base64.b64encode() does not append a newline (unlike
        # binascii.b2a_base64), so the result must not be sliced with [:-1];
        # doing so would drop the final '=' padding character and the
        # signature would never match one computed by a client.
        return base64.b64encode(hashed.digest())

# Use custom signature method
server = oauth2.Server()
server.add_signature_method(SignatureMethod_SHA256())
server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())  # Fallback

Error Handling and Debugging

import oauth2

server = oauth2.Server()
server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())

def verify_or_error(request, consumer, token):
    """Verify an OAuth request and translate failures into 401 responses.

    The logic lives in a function because it ``return``s error responses,
    which is only valid inside a function body.

    Args:
        request: OAuth request object to verify.
        consumer: Consumer credentials to verify against.
        token: Token credentials to verify against.

    Returns:
        tuple: ``(parameters, None)`` when verification succeeds, otherwise
        ``(None, response)`` where ``response`` is the 401 error response.
    """
    try:
        parameters = server.verify_request(request, consumer, token)

    # MissingSignature is handled before the general oauth2.Error handler so
    # that it gets its own, more specific message.
    except oauth2.MissingSignature:
        # oauth_signature parameter is missing
        return None, error_response("Missing signature", 401)

    except oauth2.Error as e:
        error_msg = str(e)

        if "Invalid signature" in error_msg:
            # Signature calculation mismatch
            # Log expected signature base string for debugging
            print(f"Signature verification failed: {error_msg}")

        elif "Expired timestamp" in error_msg:
            # Request timestamp is too old
            print(f"Timestamp expired: {error_msg}")

        elif "OAuth version" in error_msg:
            # Unsupported OAuth version
            print(f"Version error: {error_msg}")

        return None, error_response(f"OAuth error: {error_msg}", 401)

    return parameters, None

def error_response(message, status_code):
    """Build an error response, adding the OAuth challenge header on 401.

    Args:
        message: Body text for the response.
        status_code: HTTP status code to send.

    Returns:
        The response object; for a 401 its headers also carry the entries
        produced by ``server.build_authenticate_header``.
    """
    response = make_response(message, status_code)
    if status_code != 401:
        return response

    # A 401 must tell the client how to authenticate.
    challenge = server.build_authenticate_header(realm='api.example.com')
    response.headers.update(challenge)
    return response

Timestamp Configuration

import oauth2

# Create server with custom timestamp threshold
server = oauth2.Server()
server.timestamp_threshold = 600  # Allow 10 minutes clock skew

# Or modify global default
# NOTE: this changes the default for servers that have not set their own
# value; `server` above keeps 600 because its instance attribute takes
# precedence over the class attribute.
oauth2.Server.timestamp_threshold = 120  # 2 minutes

server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())

Security Considerations

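  1. Timestamp Validation: The default 5-minute threshold limits the window in which a captured request can be replayed; combine it with nonce tracking to actually prevent replays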
  2. Nonce Tracking: Implement nonce storage to prevent duplicate requests
  3. HTTPS Only: Always use HTTPS in production for credential protection
  4. Consumer/Token Storage: Securely store consumer secrets and token secrets
  5. Signature Method: Prefer HMAC-SHA1 over PLAINTEXT; PLAINTEXT transmits the secrets themselves as the signature, so only use it over HTTPS
  6. Error Messages: Avoid exposing sensitive information in error responses

Install with Tessl CLI

npx tessl i tessl/pypi-oauth2

docs

client-extensions.md

http-client.md

index.md

oauth-core.md

server-verification.md

tile.json