An asynchronous networking framework written in Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pluggable authentication and authorization system with support for various credential types, checkers, and realms for securing network services. Twisted Cred provides a flexible framework for implementing authentication across different protocols.
Core authentication coordination and user domain management.
class portal.Portal:
"""
Authentication coordinator that manages checkers and realms.
Attributes:
- realm: Authentication realm (IRealm)
- checkers: Dict of credential interfaces to checker lists
"""
realm = None
checkers = None
def __init__(self, realm, checkers=None):
"""
Args:
realm: Authentication realm (IRealm)
checkers: Sequence of credential checkers
"""
def registerChecker(self, checker, interface=None):
"""
Register a credentials checker.
Args:
checker: Credentials checker (ICredentialsChecker)
interface: Credential interface this checker handles
"""
def login(self, credentials, mind, *interfaces):
"""
Authenticate user and return avatar.
Args:
credentials: User credentials (ICredentials)
mind: User's perspective object
*interfaces: Requested avatar interfaces
Returns:
Deferred: Fires with (interface, avatar, logout) tuple
"""
class portal.IRealm:
"""
Interface for authentication realms.
Realms define the domain of authenticated users and create
avatars representing users in the system.
"""
def requestAvatar(avatarId, mind, *interfaces):
"""
Create avatar for authenticated user.
Args:
avatarId: Unique user identifier
mind: User's perspective object
*interfaces: Requested avatar interfaces
Returns:
tuple: (interface, avatar, logout_callable)
"""Portal Usage Example:
from twisted.cred import portal, checkers
from twisted.internet import defer
class SimpleRealm:
"""Simple realm that creates user avatars."""
def requestAvatar(self, avatarId, mind, *interfaces):
if IUser in interfaces:
avatar = User(avatarId)
logout = lambda: None # Cleanup function
return (IUser, avatar, logout)
raise NotImplementedError("Interface not supported")
# Create portal with realm and checker
realm = SimpleRealm()
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
checker.addUser(b"alice", b"password123")
portal_obj = portal.Portal(realm, [checker])
# Login user
from twisted.cred.credentials import UsernamePassword
credentials = UsernamePassword(b"alice", b"password123")
d = portal_obj.login(credentials, None, IUser)
d.addCallback(lambda result: print(f"Logged in: {result[1]}"))Different types of user credentials for authentication.
class credentials.ICredentials:
"""
Base interface for user credentials.
"""
class credentials.IUsernamePassword:
"""
Interface for username/password credentials.
Attributes:
- username: Username (bytes)
- password: Password (bytes)
"""
username = None
password = None
class credentials.UsernamePassword:
"""
Username and password credentials.
"""
def __init__(self, username, password):
"""
Args:
username (bytes): Username
password (bytes): Password
"""
def checkPassword(self, password):
"""
Check if password matches.
Args:
password (bytes): Password to check
Returns:
bool: True if password matches
"""
class credentials.IUsernameHashedPassword:
"""
Interface for username with hashed password credentials.
Attributes:
- username: Username (bytes)
- hashed: Hashed password (bytes)
"""
username = None
hashed = None
class credentials.UsernameHashedPassword:
"""
Username and hashed password credentials.
"""
def __init__(self, username, hashed):
"""
Args:
username (bytes): Username
hashed (bytes): Hashed password
"""
def checkPassword(self, password):
"""
Check password against hash.
Args:
password (bytes): Plain password to check
Returns:
bool: True if password matches hash
"""
class credentials.IAnonymous:
"""
Interface for anonymous credentials.
"""
class credentials.Anonymous:
"""
Anonymous credentials for unauthenticated access.
"""
class credentials.ISSHPrivateKey:
"""
Interface for SSH private key credentials.
Attributes:
- username: Username (bytes)
- algName: Key algorithm name (bytes)
- keyData: Public key data (bytes)
- signature: Authentication signature (bytes)
"""
username = None
algName = None
keyData = None
signature = None
class credentials.SSHPrivateKey:
"""
SSH private key credentials.
"""
def __init__(self, username, algName, keyData, signature, sigData):
"""
Args:
username (bytes): Username
algName (bytes): Key algorithm
keyData (bytes): Public key data
signature (bytes): Signature
sigData (bytes): Signed data
"""Implementations for verifying different types of credentials.
class checkers.ICredentialsChecker:
"""
Interface for credential verification.
Attributes:
- credentialInterfaces: Supported credential types
"""
credentialInterfaces = None
def requestAvatarId(credentials):
"""
Verify credentials and return avatar ID.
Args:
credentials: User credentials (ICredentials)
Returns:
Deferred: Fires with avatar ID on success
"""
class checkers.InMemoryUsernamePasswordDatabaseDontUse:
"""
In-memory username/password database.
WARNING: For testing only - stores passwords in plain text.
"""
def __init__(self):
"""Initialize empty user database."""
self.users = {}
def addUser(self, username, password):
"""
Add user to database.
Args:
username (bytes): Username
password (bytes): Password
"""
def requestAvatarId(self, credentials):
"""
Check username/password credentials.
Args:
credentials: Username/password credentials
Returns:
Deferred: Avatar ID on success
"""
class checkers.FilePasswordDB:
"""
File-based password database.
"""
def __init__(self, filename, delim=b':', usernameField=0, passwordField=1, caseSensitive=True, hash=None, cache=False):
"""
Args:
filename (str): Password file path
delim (bytes): Field delimiter
usernameField (int): Username field index
passwordField (int): Password field index
caseSensitive (bool): Case-sensitive usernames
hash: Password hash function
cache (bool): Cache file contents
"""
def requestAvatarId(self, credentials):
"""
Check credentials against file database.
Args:
credentials: Username/password credentials
Returns:
Deferred: Avatar ID on success
"""
class checkers.AllowAnonymousAccess:
"""
Checker that allows anonymous access.
"""
credentialInterfaces = (credentials.IAnonymous,)
def requestAvatarId(self, credentials):
"""
Allow anonymous access.
Args:
credentials: Anonymous credentials
Returns:
Deferred: Anonymous avatar ID
"""
class checkers.SSHPublicKeyDatabase:
"""
SSH public key database checker.
"""
def checkKey(self, credentials):
"""
Verify SSH public key credentials.
Args:
credentials: SSH key credentials
Returns:
Deferred: Avatar ID on success
"""
def requestAvatarId(self, credentials):
"""
Check SSH key credentials.
Args:
credentials: SSH private key credentials
Returns:
Deferred: Avatar ID on success
"""Checker Usage Example:
from twisted.cred import checkers, credentials
from twisted.internet import defer
# In-memory checker
memory_checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
memory_checker.addUser(b"admin", b"secret")
memory_checker.addUser(b"user", b"password")
# File-based checker
file_checker = checkers.FilePasswordDB("/etc/passwd", delim=b":")
# Anonymous access checker
anon_checker = checkers.AllowAnonymousAccess()
# Test authentication
creds = credentials.UsernamePassword(b"admin", b"secret")
d = memory_checker.requestAvatarId(creds)
d.addCallback(lambda avatar_id: print(f"Authenticated: {avatar_id}"))
d.addErrback(lambda failure: print(f"Auth failed: {failure.value}"))Exception types for authentication failures.
class error.Unauthorized:
"""
Base class for authentication failures.
"""
class error.UnauthorizedLogin:
"""
Login attempt with invalid credentials.
"""
class error.UnhandledCredentials:
"""
No suitable checker found for credential type.
"""
class error.LoginFailed:
"""
Generic login failure.
"""
class error.LoginDenied:
"""
Login denied by policy.
"""Utilities for configuring authentication from strings.
def strcred.makeChecker(description):
"""
Create credential checker from string description.
Args:
description (str): Checker specification
Returns:
ICredentialsChecker: Configured checker
"""
# String checker formats:
# "memory:user1:pass1:user2:pass2" - In-memory database
# "file:/path/to/passwd" - File-based database
# "unix" - Unix system accounts
# "anonymous" - Allow anonymous accessString Configuration Example:
from twisted.cred import strcred
# Create checkers from string descriptions
memory_checker = strcred.makeChecker("memory:alice:secret:bob:password")
file_checker = strcred.makeChecker("file:/etc/twisted-passwd")
anon_checker = strcred.makeChecker("anonymous")
# Use in portal
from twisted.cred.portal import Portal
portal_obj = Portal(realm, [memory_checker, file_checker, anon_checker])Integration points for using authentication in network protocols.
# HTTP authentication example
from twisted.web.guard import HTTPAuthSessionWrapper, BasicCredentialFactory, DigestCredentialFactory
# Create credential factories
basic_factory = BasicCredentialFactory("My Realm")
digest_factory = DigestCredentialFactory("md5", "My Realm")
# Wrap resource with authentication
protected_resource = HTTPAuthSessionWrapper(portal_obj, [basic_factory, digest_factory])
# SSH authentication integration
from twisted.conch.ssh import factory
class MySSHFactory(factory.SSHFactory):
def __init__(self, portal):
self.portal = portal
def buildProtocol(self, addr):
protocol = factory.SSHFactory.buildProtocol(self, addr)
protocol.portal = self.portal
return protocolPatterns for implementing custom authentication mechanisms.
# Custom credentials class
class TokenCredentials:
"""Custom token-based credentials."""
def __init__(self, token):
self.token = token
# Custom checker
class TokenChecker:
"""Custom token validation checker."""
credentialInterfaces = (TokenCredentials,)
def __init__(self, token_validator):
self.token_validator = token_validator
def requestAvatarId(self, credentials):
if self.token_validator.validate(credentials.token):
return defer.succeed(credentials.token.user_id)
else:
return defer.fail(error.UnauthorizedLogin("Invalid token"))
# Custom realm
class MyRealm:
"""Custom realm for creating user avatars."""
def requestAvatar(self, avatarId, mind, *interfaces):
if IMyUser in interfaces:
user = MyUser(avatarId)
logout = lambda: user.logout()
return (IMyUser, user, logout)
raise NotImplementedError()Complete Authentication Example:
from twisted.cred import portal, checkers, credentials
from twisted.internet import defer
from twisted.web import guard, resource, server
from twisted.application import internet, service
# Define user interface and avatar
class IUser(Interface):
pass
class User:
def __init__(self, username):
self.username = username
# Create realm
class MyRealm:
def requestAvatar(self, avatarId, mind, *interfaces):
if IUser in interfaces:
user = User(avatarId.decode())
return (IUser, user, lambda: None)
raise NotImplementedError()
# Set up authentication
realm = MyRealm()
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
checker.addUser(b"admin", b"password")
portal_obj = portal.Portal(realm, [checker])
# Create protected web resource
protected_resource = resource.Resource()
# Wrap with HTTP authentication
credential_factory = guard.BasicCredentialFactory("Protected Area")
wrapper = guard.HTTPAuthSessionWrapper(portal_obj, [credential_factory])
# Add to service
site = server.Site(wrapper)
web_service = internet.TCPServer(8080, site)
# Create application
application = service.Application("AuthApp")
web_service.setServiceParent(application)Install with Tessl CLI
npx tessl i tessl/pypi-twisted