OpenID support for modern servers and consumers with comprehensive authentication functionality.
—
Complete OpenID server functionality for identity providers. The server handles association requests, authentication requests, and verification with support for multiple session types and response encoding formats.
Initialize an OpenID server with storage backend and endpoint configuration.
class Server:
def __init__(self, store, op_endpoint=None, signatoryClass=Signatory, encoderClass=SigningEncoder, decoderClass=Decoder):
"""
Initialize OpenID server.
Parameters:
- store: OpenIDStore instance for persistent storage
- op_endpoint: str, server endpoint URL
- signatoryClass: class, signatory implementation (default: Signatory)
- encoderClass: class, encoder implementation (default: SigningEncoder)
- decoderClass: class, decoder implementation (default: Decoder)
"""
def decodeRequest(self, query):
"""
Decode incoming OpenID request from query parameters.
Parameters:
- query: dict, query parameters from HTTP request
Returns:
OpenIDRequest subclass or None if invalid
"""
def encodeResponse(self, response):
"""
Encode OpenID response for HTTP transmission.
Parameters:
- response: OpenIDResponse object
Returns:
WebResponse object with HTTP status, headers, and body
"""
def handleRequest(self, request):
"""
Handle decoded OpenID request and generate appropriate response.
Parameters:
- request: OpenIDRequest object
Returns:
OpenIDResponse object
"""
def openid_check_authentication(self, request):
"""
Handle check_authentication request for verifying signatures.
Parameters:
- request: CheckAuthRequest object
Returns:
OpenIDResponse with verification result
"""
def openid_associate(self, request):
"""
Handle associate request for establishing shared secrets.
Parameters:
- request: AssociateRequest object
Returns:
OpenIDResponse with association data or error
"""Handle the base OpenID request types and common functionality.
class OpenIDRequest:
"""Base class for all OpenID requests.
Represents an incoming OpenID request with common attributes and methods.
"""
mode = None # The openid.mode parameter value
def __init__(self):
"""
Initialize base OpenID request.
The mode attribute should be set by subclasses to indicate
the specific OpenID operation being requested.
"""Handle user authentication requests with trust root validation and response generation.
class CheckIDRequest:
"""OpenID authentication request (checkid_setup or checkid_immediate)."""
@classmethod
def fromMessage(cls, message, op_endpoint):
"""
Create CheckIDRequest from OpenID message.
Parameters:
- message: Message object
- op_endpoint: str, server endpoint URL
Returns:
CheckIDRequest object
"""
def answer(self, allow, server_url=None, identity=None, claimed_id=None):
"""
Generate response to authentication request.
Parameters:
- allow: bool, whether to allow authentication
- server_url: str, server URL for response
- identity: str, user's identity URL
- claimed_id: str, user's claimed identifier
Returns:
OpenIDResponse object
"""
def encodeToURL(self, server_url):
"""
Encode request as URL for redirecting user.
Parameters:
- server_url: str, server base URL
Returns:
str, encoded URL
"""
def getCancelURL(self):
"""
Get URL for user cancellation.
Returns:
str, cancel URL
"""
def idSelect(self):
"""
Check if request uses identifier select mode.
Returns:
bool, True if identifier select
"""
def trustRootValid(self):
"""
Validate the trust root against return_to URL.
Returns:
bool, True if trust root is valid
"""
def returnToVerified(self):
"""
Verify the return_to URL is valid and matches trust root.
Returns:
bool, True if return_to is verified
"""
def getClaimedID(self):
"""
Get the claimed identifier from this request.
Returns:
str, the claimed identifier (OpenID 2.0) or identity (OpenID 1.x)
"""
def getTrustRoot(self):
"""
Get the trust root (realm) from this request.
Returns:
str, the trust root URL that identifies the requesting party
"""Handle association establishment for shared secret creation between consumer and server.
class AssociateRequest(OpenIDRequest):
"""OpenID association request.
A request to establish an association between consumer and server.
Attributes:
- mode: "associate"
- assoc_type: str, type of association ("HMAC-SHA1" or "HMAC-SHA256")
- session: ServerSession, session handler for this association type
- session_classes: dict, mapping of session types to handler classes
"""
mode = "associate"
session_classes = {
'no-encryption': 'PlainTextServerSession',
'DH-SHA1': 'DiffieHellmanSHA1ServerSession',
'DH-SHA256': 'DiffieHellmanSHA256ServerSession',
}
def __init__(self, session, assoc_type):
"""
Construct AssociateRequest.
Parameters:
- session: ServerSession object for handling this association type
- assoc_type: str, association type ("HMAC-SHA1" or "HMAC-SHA256")
"""
@classmethod
def fromMessage(cls, message, op_endpoint=None):
"""
Create AssociateRequest from OpenID message.
Parameters:
- message: Message object
- op_endpoint: str, server endpoint URL
Returns:
AssociateRequest object
"""
def answer(self, assoc):
"""
Generate successful association response.
Parameters:
- assoc: Association object with shared secret
Returns:
OpenIDResponse with association data
"""
def answerUnsupported(self, message, preferred_association_type=None, preferred_session_type=None):
"""
Generate unsupported association type response.
Parameters:
- message: str, error message
- preferred_association_type: str, preferred association type
- preferred_session_type: str, preferred session type
Returns:
OpenIDResponse with error and preferences
"""Handle signature verification requests from consumers.
class CheckAuthRequest(OpenIDRequest):
"""OpenID check_authentication request.
A request to verify the validity of a previous response signature.
Attributes:
- mode: "check_authentication"
- assoc_handle: str, association handle the response was signed with
- signed: Message, the message with signature to verify
- invalidate_handle: str, optional association handle to check validity
"""
mode = "check_authentication"
required_fields = ["identity", "return_to", "response_nonce"]
def __init__(self, assoc_handle, signed, invalidate_handle=None):
"""
Construct CheckAuthRequest.
Parameters:
- assoc_handle: str, association handle for signature verification
- signed: Message, signed message to verify
- invalidate_handle: str, optional handle to invalidate
"""
@classmethod
def fromMessage(cls, message, op_endpoint=None):
"""
Create CheckAuthRequest from OpenID message.
Parameters:
- message: Message object
- op_endpoint: str, server endpoint URL
Returns:
CheckAuthRequest object
"""
def answer(self, signatory):
"""
Generate signature verification response.
Parameters:
- signatory: Signatory object for verification
Returns:
OpenIDResponse with verification result
"""Generate and encode OpenID responses with proper formatting and signing.
class OpenIDResponse:
    """Base class for OpenID responses.

    Represents a response to an OpenID request with fields and encoding methods.

    Attributes:
    - request: OpenIDRequest, the original request being responded to
    - fields: Message, response fields and parameters
    """

    def __init__(self, request):
        """
        Initialize OpenID response.

        Parameters:
        - request: OpenIDRequest, the request this response answers
        """

    def toFormMarkup(self, form_tag_attrs=None):
        """
        Generate HTML form markup for response.

        Parameters:
        - form_tag_attrs: dict, additional form tag attributes

        Returns:
        str, HTML form markup
        """

    def toHTML(self, form_tag_attrs=None):
        """
        Generate complete HTML page with auto-submitting form.

        Parameters:
        - form_tag_attrs: dict, additional form tag attributes

        Returns:
        str, complete HTML page
        """

    def renderAsForm(self):
        """
        Check if response should be rendered as HTML form.

        Returns:
        bool, True if form rendering required
        """

    def needsSigning(self):
        """
        Check if response needs to be signed.

        Returns:
        bool, True if signing required
        """

    def whichEncoding(self):
        """
        Determine appropriate encoding method for response.

        Returns:
        tuple, encoding method identifier
        """

    def encodeToURL(self):
        """
        Encode response as URL query parameters.

        Returns:
        str, URL with encoded response
        """

    def addExtension(self, extension_response):
        """
        Add extension data to response.

        Parameters:
        - extension_response: Extension response object
        """

    def encodeToKVForm(self):
        """
        Encode response as key-value form.

        Returns:
        str, key-value form data
        """
class WebResponse:
"""HTTP response wrapper for OpenID server responses.
Encapsulates HTTP response data including status code, headers, and body
for transmission back to the client.
Attributes:
- code: int, HTTP status code (default: 200)
- headers: dict, HTTP response headers (default: {})
- body: str/bytes, response body content
"""
def __init__(self, code=200, headers=None, body=""):
"""
Initialize HTTP response.
Parameters:
- code: int, HTTP status code
- headers: dict, HTTP headers
- body: str, response body
"""Handle message signing and verification with association management.
class Signatory:
"""Message signing and verification component.
Handles message signing with associations and signature verification.
Manages association lifecycle including creation and invalidation.
Attributes:
- store: OpenIDStore, storage backend for associations and nonces
"""
def __init__(self, store):
"""
Initialize signatory with storage backend.
Parameters:
- store: OpenIDStore instance
"""
def verify(self, assoc_handle, message):
"""
Verify message signature using association.
Parameters:
- assoc_handle: str, association handle
- message: Message object to verify
Returns:
bool, True if signature valid
"""
def sign(self, response):
"""
Sign response message.
Parameters:
- response: OpenIDResponse object to sign
Returns:
Association object used for signing
"""
def createAssociation(self, dumb=True, assoc_type='HMAC-SHA1'):
"""
Create new association.
Parameters:
- dumb: bool, whether association is for dumb mode
- assoc_type: str, association type ('HMAC-SHA1' or 'HMAC-SHA256')
Returns:
Association object
"""
def getAssociation(self, assoc_handle, dumb, checkExpiration=True):
"""
Retrieve association by handle.
Parameters:
- assoc_handle: str, association handle
- dumb: bool, whether association is for dumb mode
- checkExpiration: bool, whether to check expiration
Returns:
Association object or None
"""
def invalidate(self, assoc_handle, dumb):
"""
Invalidate association.
Parameters:
- assoc_handle: str, association handle to invalidate
- dumb: bool, whether association is for dumb mode
"""Handle message encoding for responses and decoding of incoming requests.
class Encoder:
    """Base encoder for OpenID responses to WebResponse format.

    Encodes OpenIDResponse objects into WebResponse objects for HTTP transmission.

    Attributes:
    - responseFactory: WebResponse factory for creating responses
      (default: WebResponse; given here by class name, as a string)
    """

    responseFactory = 'WebResponse'

    def encode(self, response):
        """
        Encode OpenIDResponse to WebResponse.

        Parameters:
        - response: OpenIDResponse, response object to encode

        Returns:
        WebResponse object with appropriate encoding (URL redirect, HTML form, or key-value form)

        Raises:
        EncodingError when response cannot be encoded as protocol message
        """
class SigningEncoder(Encoder):
    """Encoder that signs responses before encoding.

    Extends Encoder to automatically sign responses that require signing
    before converting to WebResponse format.

    Attributes:
    - signatory: Signatory, component used for signing responses
    """

    def __init__(self, signatory):
        """
        Create SigningEncoder with signatory.

        Parameters:
        - signatory: Signatory, component for message signing
        """

    def encode(self, response):
        """
        Encode OpenIDResponse to WebResponse, signing first if needed.

        Parameters:
        - response: OpenIDResponse, response object to encode and potentially sign

        Returns:
        WebResponse object with signed response data

        Raises:
        EncodingError when response cannot be encoded
        AlreadySigned when response is already signed
        """
class Decoder:
"""Decoder for incoming OpenID requests from query parameters.
Transforms HTTP query parameters into appropriate OpenIDRequest objects
based on the openid.mode parameter.
Attributes:
- server: Server, the server instance this decoder works for
- _handlers: dict, mapping of modes to request handler classes
"""
_handlers = {
'checkid_setup': 'CheckIDRequest.fromMessage',
'checkid_immediate': 'CheckIDRequest.fromMessage',
'check_authentication': 'CheckAuthRequest.fromMessage',
'associate': 'AssociateRequest.fromMessage',
}
def __init__(self, server):
"""
Construct Decoder.
Parameters:
- server: Server, the server instance for request processing
"""
def decode(self, query):
"""
Transform query parameters into OpenIDRequest.
Parameters:
- query: dict, HTTP query parameters with each key mapping to one value
Returns:
OpenIDRequest subclass instance or None if not an OpenID request
Raises:
ProtocolError when query appears to be OpenID request but is invalid
"""
def defaultDecoder(self, message, server):
"""
Called when no handler found for the request mode.
Parameters:
- message: Message, the decoded message object
- server: Server, the server instance
Returns:
None (default behavior for unknown modes)
"""Handle different session types for association establishment.
class PlainTextServerSession:
    """Plain text session type (no encryption).

    Handles association requests where no encryption is used for the shared secret.
    The secret is transmitted in base64 encoded form.

    Attributes:
    - session_type: "no-encryption"
    - allowed_assoc_types: list, supported association types ["HMAC-SHA1", "HMAC-SHA256"]
    """

    session_type = 'no-encryption'
    allowed_assoc_types = ['HMAC-SHA1', 'HMAC-SHA256']

    @classmethod
    def fromMessage(cls, unused_request):
        """
        Create session from association request.

        Parameters:
        - unused_request: AssociateRequest object (not used for plain text)

        Returns:
        PlainTextServerSession object
        """

    def answer(self, secret):
        """
        Generate session response with shared secret.

        Parameters:
        - secret: bytes, shared secret

        Returns:
        dict, session response data
        """
class DiffieHellmanSHA1ServerSession:
    """Diffie-Hellman SHA1 session type.

    Handles association requests using Diffie-Hellman key exchange with SHA1 hashing.
    Encrypts the shared secret using the consumer's public key.

    Attributes:
    - session_type: "DH-SHA1"
    - hash_func: SHA1 hash function for key derivation (given here by
      name, as the string 'sha1')
    - allowed_assoc_types: list, supported association types ["HMAC-SHA1"]
    - dh: DiffieHellman, DH algorithm values for this request
    - consumer_pubkey: long, consumer's public key from the request
    """

    session_type = 'DH-SHA1'
    hash_func = 'sha1'
    allowed_assoc_types = ['HMAC-SHA1']

    def __init__(self, dh, consumer_pubkey):
        """
        Initialize DH SHA1 session.

        Parameters:
        - dh: DiffieHellman, DH algorithm parameters
        - consumer_pubkey: long, consumer's public key
        """

    @classmethod
    def fromMessage(cls, message):
        """
        Create DH session from association request.

        Parameters:
        - message: Message object with DH parameters

        Returns:
        DiffieHellmanSHA1ServerSession object
        """

    def answer(self, secret):
        """
        Generate DH session response with encrypted secret.

        Parameters:
        - secret: bytes, shared secret

        Returns:
        dict, DH session response data
        """
class DiffieHellmanSHA256ServerSession(DiffieHellmanSHA1ServerSession):
    """Diffie-Hellman SHA256 session type.

    Extends DiffieHellmanSHA1ServerSession with SHA256 hashing instead of SHA1.
    Provides stronger cryptographic hashing for the key derivation process.

    Attributes:
    - session_type: "DH-SHA256"
    - hash_func: SHA256 hash function for key derivation (given here by
      name, as the string 'sha256')
    - allowed_assoc_types: list, supported association types ["HMAC-SHA256"]
    """

    session_type = 'DH-SHA256'
    hash_func = 'sha256'
    allowed_assoc_types = ['HMAC-SHA256']

    @classmethod
    def fromMessage(cls, message):
        """
        Create DH SHA256 session from association request.

        Parameters:
        - message: Message object with DH parameters

        Returns:
        DiffieHellmanSHA256ServerSession object
        """

    def answer(self, secret):
        """
        Generate DH SHA256 session response.

        Parameters:
        - secret: bytes, shared secret

        Returns:
        dict, DH session response data
        """


from openid.server import server
from openid.store.filestore import FileOpenIDStore
# Initialize server
store = FileOpenIDStore('/tmp/openid_store')
openid_server = server.Server(store, op_endpoint="https://myop.com/openid")


def _to_http_response(openid_response, content_type):
    """Encode an OpenID response and wrap it in the web framework's response.

    Copies the status code, body and every header of the encoded WebResponse,
    so that redirect responses keep their Location header.

    Parameters:
    - openid_response: OpenIDResponse object to encode
    - content_type: str, content type used for the HTTP response

    Returns:
    HttpResponse carrying the encoded OpenID response, or a 400 response
    when the OpenID response cannot be encoded
    """
    try:
        web_response = openid_server.encodeResponse(openid_response)
    except server.EncodingError as why:
        # Not encodable as a protocol message: show the problem to the user.
        return HttpResponse(
            why.explanation or "Unable to encode OpenID response",
            status=400
        )
    http_response = HttpResponse(
        web_response.body,
        status=web_response.code,
        content_type=content_type
    )
    # headers may be None when the WebResponse was built without any.
    for name, value in (web_response.headers or {}).items():
        http_response[name] = value
    return http_response


# Handle incoming request
def handle_openid_request(request):
    """Decode an incoming HTTP request and dispatch it by OpenID request type.

    Parameters:
    - request: HTTP request object of the web framework

    Returns:
    HttpResponse for the client; status 400 when the request is not a valid
    or supported OpenID request
    """
    # Direct requests (associate, check_authentication) are sent via POST,
    # browser redirects via GET, so read whichever the client used.
    params = request.POST if request.method == 'POST' else request.GET
    query = dict(params.items())
    try:
        openid_request = openid_server.decodeRequest(query)
    except server.ProtocolError as why:
        # Looked like an OpenID request but violated the protocol.
        return HttpResponse(why.text or "Invalid OpenID request", status=400)
    if openid_request is None:
        return HttpResponse("Invalid OpenID request", status=400)
    if isinstance(openid_request, server.CheckIDRequest):
        return handle_checkid_request(openid_request, request)
    if isinstance(openid_request, server.AssociateRequest):
        return handle_associate_request(openid_request)
    if isinstance(openid_request, server.CheckAuthRequest):
        return handle_checkauth_request(openid_request)
    # Never fall through and return None to the web framework.
    return HttpResponse("Unsupported OpenID request", status=400)


def handle_checkid_request(openid_request, request=None):
    """Answer a checkid_setup / checkid_immediate request.

    Parameters:
    - openid_request: CheckIDRequest object
    - request: HTTP request object used to look up the logged-in user

    Returns:
    HttpResponse with the encoded OpenID response, a redirect to the login
    page, or a 400 response when trust root or return_to validation fails
    """
    # Validate trust root and return_to
    if not openid_request.trustRootValid():
        return HttpResponse("Invalid trust root", status=400)
    if not openid_request.returnToVerified():
        return HttpResponse("Invalid return_to URL", status=400)

    # Check if user is authenticated
    if user_is_authenticated(request):
        # User is authenticated, generate positive response
        openid_response = openid_request.answer(
            allow=True,
            server_url="https://myop.com/openid",
            identity=get_user_identity_url(),
            claimed_id=get_user_claimed_id()
        )
        # Add extension data if requested
        sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
        if sreg_request:
            sreg_response = sreg.SRegResponse.extractResponse(
                sreg_request,
                get_user_profile_data()
            )
            openid_response.addExtension(sreg_response)
    elif openid_request.immediate:
        # Immediate mode - cannot show login page
        openid_response = openid_request.answer(allow=False)
    else:
        # Setup mode - redirect to login page
        login_url = build_login_url(openid_request)
        return HttpResponseRedirect(login_url)

    # Encode and return response
    return _to_http_response(openid_response, 'text/html')


def handle_associate_request(openid_request):
    """Answer an associate request.

    Parameters:
    - openid_request: AssociateRequest object

    Returns:
    HttpResponse with the encoded association response
    """
    openid_response = openid_server.openid_associate(openid_request)
    return _to_http_response(openid_response, 'text/plain')


def handle_checkauth_request(openid_request):
    """Answer a check_authentication request.

    Parameters:
    - openid_request: CheckAuthRequest object

    Returns:
    HttpResponse with the encoded verification result
    """
    openid_response = openid_server.openid_check_authentication(openid_request)
    return _to_http_response(openid_response, 'text/plain')


# HTTP Status Codes
HTTP_OK = 200
HTTP_REDIRECT = 302
HTTP_ERROR = 400

# Request Modes
BROWSER_REQUEST_MODES = [
    'checkid_setup',
    'checkid_immediate',
]

# Encoding Types
# Each identifier is a one-element tuple.
ENCODE_KVFORM = ('kvform',)
ENCODE_URL = ('URL/redirect',)
ENCODE_HTML_FORM = ('HTML form',)
# Exception Types
class ProtocolError(Exception):
    """OpenID protocol violation.

    Raised when an OpenID protocol error occurs during request processing.

    Attributes:
    - message: Message, the OpenID message that caused the error
    - text: str, human-readable error description
    - reference: str, optional reference to specification
    - contact: str, optional contact information for debugging
    """

    def __init__(self, message, text=None, reference=None, contact=None):
        """
        Initialize protocol error.

        Parameters:
        - message: Message, OpenID message causing the error
        - text: str, error description
        - reference: str, specification reference
        - contact: str, contact information
        """
class VersionError(Exception):
    """Unsupported OpenID version.

    Raised when an operation is attempted that is not compatible with
    the protocol version being used.
    """
class NoReturnToError(Exception):
    """Missing return_to parameter.

    Raised when a response cannot be generated because the request
    contains no return_to URL for redirecting the user back.
    """
class EncodingError(Exception):
    """Response encoding error.

    Raised when a response cannot be encoded as a protocol message
    and should probably be rendered and shown to the user.

    Attributes:
    - response: OpenIDResponse, the response that failed to encode
    - explanation: str, optional detailed explanation of the error
    """

    def __init__(self, response, explanation=None):
        """
        Initialize encoding error.

        Parameters:
        - response: OpenIDResponse, response that failed to encode
        - explanation: str, detailed error explanation
        """
class AlreadySigned(EncodingError):
    """Response already signed.

    Raised when attempting to sign a response that has already been signed.
    Inherits from EncodingError.
    """
class UntrustedReturnURL(ProtocolError):
    """return_to URL not trusted.

    Raised when a return_to URL is outside the declared trust_root.
    Inherits from ProtocolError.

    Attributes:
    - return_to: str, the untrusted return_to URL
    - trust_root: str, the declared trust root
    """

    def __init__(self, message, return_to, trust_root):
        """
        Initialize untrusted return URL error.

        Parameters:
        - message: Message, the OpenID message
        - return_to: str, the untrusted return_to URL
        - trust_root: str, the declared trust root
        """
class MalformedReturnURL(ProtocolError):
    """Malformed return_to URL.

    Raised when the return_to URL doesn't look like a valid URL.
    Inherits from ProtocolError.

    Attributes:
    - return_to: str, the malformed return_to URL
    """

    def __init__(self, openid_message, return_to):
        """
        Initialize malformed return URL error.

        Parameters:
        - openid_message: Message, the OpenID message
        - return_to: str, the malformed return_to URL
        """
class MalformedTrustRoot(ProtocolError):
"""Malformed trust root.
Raised when the trust root is not well-formed according to
the OpenID specification trust_root format.
Inherits from ProtocolError.
"""Install with Tessl CLI
npx tessl i tessl/pypi-python3-openid