OpenID support for modern servers and consumers with comprehensive authentication functionality.
—
Pluggable storage system for associations and nonces with multiple backend implementations. The storage layer provides persistent storage for OpenID associations (shared secrets) and nonces (one-time tokens) with automatic cleanup of expired data.
Base interface that all storage backends must implement for OpenID data persistence.
class OpenIDStore:
"""Abstract base class for OpenID storage backends."""
def storeAssociation(self, server_url, association):
"""
Store an association between consumer and server.
Parameters:
- server_url: str, server URL for the association
- association: Association object containing shared secret
"""
def getAssociation(self, server_url, handle=None):
"""
Retrieve association for server URL and optional handle.
Parameters:
- server_url: str, server URL
- handle: str, association handle (None for most recent)
Returns:
Association object or None if not found/expired
"""
def removeAssociation(self, server_url, handle):
"""
Remove specific association from storage.
Parameters:
- server_url: str, server URL
- handle: str, association handle
Returns:
bool, True if association was removed
"""
def useNonce(self, server_url, timestamp, salt):
"""
Use a nonce, ensuring it can only be used once.
Parameters:
- server_url: str, server URL
- timestamp: int, nonce timestamp
- salt: str, nonce salt value
Returns:
bool, True if nonce was valid and unused, False if already used
"""
def cleanupNonces(self):
"""
Remove expired nonces from storage.
Returns:
int, number of nonces cleaned up
"""
def cleanupAssociations(self):
"""
Remove expired associations from storage.
Returns:
int, number of associations cleaned up
"""
def cleanup(self):
"""
Clean up both expired nonces and associations.
Returns:
int, total number of items cleaned up
"""File system storage implementation using directory structure for organization.
class FileOpenIDStore(OpenIDStore):
"""File-based OpenID storage implementation."""
def __init__(self, directory):
"""
Initialize file store with directory path.
Parameters:
- directory: str, path to storage directory
"""
def storeAssociation(self, server_url, association):
"""Store association in file system."""
def getAssociation(self, server_url, handle=None):
"""Retrieve association from file system."""
def removeAssociation(self, server_url, handle):
"""Remove association file."""
def useNonce(self, server_url, timestamp, salt):
"""Check and mark nonce as used in file system."""
def cleanupNonces(self):
"""Remove expired nonce files."""
def cleanupAssociations(self):
"""Remove expired association files."""Memory-based storage for testing and development environments.
class MemoryStore(OpenIDStore):
"""In-memory OpenID storage implementation."""
def __init__(self):
"""Initialize memory store with empty collections."""
def storeAssociation(self, server_url, association):
"""Store association in memory."""
def getAssociation(self, server_url, handle=None):
"""Retrieve association from memory."""
def removeAssociation(self, server_url, handle):
"""Remove association from memory."""
def useNonce(self, server_url, timestamp, salt):
"""Check and mark nonce as used in memory."""
def cleanupNonces(self):
"""Remove expired nonces from memory."""
def cleanupAssociations(self):
"""Remove expired associations from memory."""Base class for SQL database storage backends with common database operations.
class SQLStore(OpenIDStore):
"""Base class for SQL database storage implementations."""
def __init__(self, connection, associations_table=None, nonces_table=None):
"""
Initialize SQL store with database connection.
Parameters:
- connection: database connection object
- associations_table: str, associations table name (default: 'oid_associations')
- nonces_table: str, nonces table name (default: 'oid_nonces')
"""
def createTables(self):
"""
Create required database tables if they don't exist.
"""
def storeAssociation(self, server_url, association):
"""Store association in database."""
def getAssociation(self, server_url, handle=None):
"""Retrieve association from database."""
def removeAssociation(self, server_url, handle):
"""Remove association from database."""
def useNonce(self, server_url, timestamp, salt):
"""Check and mark nonce as used in database."""
def cleanupNonces(self):
"""Remove expired nonces from database."""
def cleanupAssociations(self):
"""Remove expired associations from database."""
class MySQLStore(SQLStore):
"""MySQL-specific storage implementation."""
def __init__(self, connection, associations_table=None, nonces_table=None):
"""Initialize MySQL store with MySQL connection."""
class PostgreSQLStore(SQLStore):
"""PostgreSQL-specific storage implementation."""
def __init__(self, connection, associations_table=None, nonces_table=None):
"""Initialize PostgreSQL store with psycopg2 connection."""
class SQLiteStore(SQLStore):
"""SQLite-specific storage implementation."""
def __init__(self, connection, associations_table=None, nonces_table=None):
"""Initialize SQLite store with sqlite3 connection."""Utilities for handling nonces (one-time tokens) with timestamp validation.
def mkNonce():
"""
Generate a new nonce value.
Returns:
str, randomly generated nonce
"""
def split_nonce(nonce_string):
"""
Split nonce string into timestamp and salt components.
Parameters:
- nonce_string: str, nonce in format 'timestamp-salt'
Returns:
tuple, (timestamp, salt) or None if invalid format
"""
def checkTimestamp(nonce_string, allowed_skew=None, now=None):
"""
Check if nonce timestamp is within allowed time window.
Parameters:
- nonce_string: str, nonce string
- allowed_skew: int, allowed time skew in seconds (default: 5 hours)
- now: int, current timestamp (default: current time)
Returns:
bool, True if timestamp is valid
"""from openid.store.filestore import FileOpenIDStore
import os
# Create file store
store_dir = '/tmp/openid_store'
os.makedirs(store_dir, exist_ok=True)
store = FileOpenIDStore(store_dir)
# Use with consumer
from openid.consumer import consumer
openid_consumer = consumer.Consumer({}, store)
# Use with server
from openid.server import server
openid_server = server.Server(store, op_endpoint="https://myop.com/openid")from openid.store.sqlstore import SQLiteStore
import sqlite3
# Create SQLite store
connection = sqlite3.connect('/tmp/openid.db')
store = SQLiteStore(connection)
store.createTables()
# Use with consumer or server
openid_consumer = consumer.Consumer({}, store)from openid.store.sqlstore import MySQLStore
import mysql.connector
# Create MySQL store (requires mysql-connector-python)
connection = mysql.connector.connect(
host='localhost',
user='openid_user',
password='password',
database='openid_db'
)
store = MySQLStore(connection)
store.createTables()from openid.store.sqlstore import PostgreSQLStore
import psycopg2
# Create PostgreSQL store (requires psycopg2)
connection = psycopg2.connect(
host='localhost',
user='openid_user',
password='password',
database='openid_db'
)
store = PostgreSQLStore(connection)
store.createTables()from openid.store.memstore import MemoryStore
# Create memory store (for testing only)
store = MemoryStore()
# Use for unit tests
def test_consumer():
test_consumer = consumer.Consumer({}, store)
# ... test codefrom openid.store.interface import OpenIDStore
from openid.association import Association
class CustomStore(OpenIDStore):
"""Custom storage backend implementation."""
def __init__(self, custom_backend):
self.backend = custom_backend
def storeAssociation(self, server_url, association):
# Store in custom backend
self.backend.set(
f"assoc:{server_url}:{association.handle}",
association.serialize(),
ttl=association.expiresIn
)
def getAssociation(self, server_url, handle=None):
if handle:
key = f"assoc:{server_url}:{handle}"
else:
# Get most recent association
key = self.backend.get_latest_key(f"assoc:{server_url}:*")
data = self.backend.get(key)
if data:
return Association.deserialize(data)
return None
# Implement other required methods...# Regular cleanup of expired data
def cleanup_openid_store(store):
"""Perform regular cleanup of OpenID store."""
nonces_cleaned = store.cleanupNonces()
associations_cleaned = store.cleanupAssociations()
print(f"Cleaned up {nonces_cleaned} nonces and {associations_cleaned} associations")
# Schedule cleanup (example with APScheduler)
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.add_job(
cleanup_openid_store,
'interval',
hours=1, # Run hourly
args=[store]
)
scheduler.start()# Default table names
DEFAULT_ASSOCIATIONS_TABLE = 'oid_associations'
DEFAULT_NONCES_TABLE = 'oid_nonces'
# Nonce configuration
NONCE_LENGTH = 8 # Length of random nonce component
SKEW = 5 * 60 * 60 # Default allowed time skew (5 hours)
# SQL table schemas (for reference)
ASSOCIATIONS_TABLE_SCHEMA = """
CREATE TABLE oid_associations (
server_url VARCHAR(2047) NOT NULL,
handle VARCHAR(255) NOT NULL,
secret BLOB NOT NULL,
issued INTEGER NOT NULL,
lifetime INTEGER NOT NULL,
assoc_type VARCHAR(64) NOT NULL,
PRIMARY KEY (server_url, handle)
);
"""
NONCES_TABLE_SCHEMA = """
CREATE TABLE oid_nonces (
server_url VARCHAR(2047) NOT NULL,
timestamp INTEGER NOT NULL,
salt CHAR(40) NOT NULL,
PRIMARY KEY (server_url, timestamp, salt)
);
"""Install with Tessl CLI
npx tessl i tessl/pypi-python3-openid