An AMQP 1.0 client library for Python
Comprehensive authentication support including SASL mechanisms, token-based authentication, and Claims-Based Security (CBS) for Azure services and other AMQP brokers.
Base authentication class that provides common functionality for all authentication mechanisms.
class AMQPAuth:
def __init__(self, hostname, port=None, verify=None, http_proxy=None,
transport_type=TransportType.Amqp, encoding='UTF-8'):
"""
Base AMQP authentication mixin class.
Parameters:
- hostname (str): AMQP broker hostname
- port (int): AMQP broker port (default: None, auto-selected based on transport)
- verify (str): Path to CA certificate file for TLS verification
- http_proxy (dict): HTTP proxy configuration
- transport_type (TransportType): Transport protocol (Amqp, AmqpOverWebsocket)
- encoding (str): Character encoding
"""Key Methods:
def set_io(self, hostname, port, http_proxy, transport_type):
"""Set the IO handler for the connection."""
def set_wsio(self, hostname, port, http_proxy):
"""Set WebSocket IO handler."""
def set_tlsio(self, hostname, port):
"""Set TLS IO handler."""
def close(self):
"""Close authentication session."""Anonymous authentication for brokers that allow unauthenticated access.
class SASLAnonymous(AMQPAuth):
def __init__(self, hostname, port=5671, verify=None, http_proxy=None,
transport_type=None, encoding='UTF-8'):
"""
Anonymous SASL authentication.
Parameters:
- hostname (str): AMQP broker hostname
- port (int): AMQP broker port
- verify (str): Path to CA certificate file
- http_proxy (dict): HTTP proxy configuration
- transport_type (TransportType): Transport protocol
- encoding (str): Character encoding
"""Usage Example:
from uamqp.authentication import SASLAnonymous
# Anonymous authentication (for development/testing)
auth = SASLAnonymous("localhost", port=5672) # Non-TLS port for local testingUsername and password authentication using SASL PLAIN mechanism.
class SASLPlain(AMQPAuth):
def __init__(self, hostname, username, password, port=5671, verify=None,
http_proxy=None, transport_type=None, encoding='UTF-8'):
"""
Username/password SASL authentication.
Parameters:
- hostname (str): AMQP broker hostname
- username (str): Authentication username
- password (str): Authentication password
- port (int): AMQP broker port
- verify (str): Path to CA certificate file
- http_proxy (dict): HTTP proxy configuration
- transport_type (TransportType): Transport protocol
- encoding (str): Character encoding
"""Usage Example:
from uamqp.authentication import SASLPlain
# Username/password authentication
auth = SASLPlain(
hostname="amqp.example.com",
username="myuser",
password="mypassword",
port=5671 # AMQPS port
)
# With TLS certificate verification
auth = SASLPlain(
hostname="amqp.example.com",
username="myuser",
password="mypassword",
verify="/path/to/ca-cert.pem"
)Configuration for token refresh retry behavior.
class TokenRetryPolicy:
def __init__(self, retries=3, backoff=0):
"""
Token authentication retry policy.
Parameters:
- retries (int): Number of retry attempts (default: 3)
- backoff (float): Delay between retries in milliseconds (default: 0)
"""Base class for Claims-Based Security authentication used by Azure services.
class CBSAuthMixin(AMQPAuth):
def __init__(self, hostname, port=5671, verify=None, http_proxy=None,
transport_type=None, encoding='UTF-8', token_retry_policy=None):
"""
Claims-Based Security authentication mixin.
Parameters:
- hostname (str): AMQP broker hostname
- port (int): AMQP broker port
- verify (str): Path to CA certificate file
- http_proxy (dict): HTTP proxy configuration
- transport_type (TransportType): Transport protocol
- encoding (str): Character encoding
- token_retry_policy (TokenRetryPolicy): Token retry configuration
"""
def update_token(self):
"""Update the authentication token."""
def create_authenticator(self, connection, debug=False, **kwargs):
"""Create CBS authenticator for connection."""
def close_authenticator(self):
"""Close the CBS authenticator."""
def handle_token(self):
"""
Handle token authentication.
Returns:
tuple[bool, bool]: (success, should_retry)
"""Shared Access Signature token authentication for Azure services.
class SASTokenAuth(AMQPAuth, CBSAuthMixin):
def __init__(self, audience, uri, token, expires_in=None, expires_at=None,
username=None, password=None, port=None, timeout=10,
retry_policy=TokenRetryPolicy(), verify=None,
token_type=b"servicebus.windows.net:sastoken", http_proxy=None,
transport_type=TransportType.Amqp, encoding='UTF-8', **kwargs):
"""
Shared Access Signature (SAS) token authentication.
Parameters:
- audience (str): Token audience
- uri (str): Resource URI
- token (str): SAS token string
- expires_in (int): Token lifetime in seconds
- expires_at (datetime): Token expiration time
- username (str): Optional username
- password (str): Optional password
- port (int): Service port
- timeout (int): Authentication timeout in seconds
- retry_policy (TokenRetryPolicy): Token retry configuration
- verify (str): Path to CA certificate file
- token_type (bytes): Token type identifier
- http_proxy (dict): HTTP proxy configuration
- transport_type (TransportType): Transport protocol
- encoding (str): Character encoding
"""
def update_token(self):
"""Update the SAS token."""
@classmethod
def from_shared_access_key(cls, uri, key_name, shared_access_key, expiry=None, **kwargs):
"""
Create SASTokenAuth from shared access key components.
Parameters:
- uri (str): Resource URI
- key_name (str): Shared access key name
- shared_access_key (str): Shared access key value
- expiry (datetime): Token expiration time
Returns:
SASTokenAuth: Configured authentication instance
"""Usage Examples:
from uamqp.authentication import SASTokenAuth
import datetime
# SAS token for Azure Service Bus
sas_token = "SharedAccessSignature sr=https%3A%2F%2Fmynamespace.servicebus.windows.net%2F&sig=..."
auth = SASTokenAuth(
hostname="mynamespace.servicebus.windows.net",
token=sas_token
)
# SAS token with explicit expiration
auth = SASTokenAuth(
hostname="mynamespace.servicebus.windows.net",
token=sas_token,
expires_at=datetime.datetime.utcnow() + datetime.timedelta(hours=1)
)JSON Web Token authentication for services that support JWT tokens.
class JWTTokenAuth(AMQPAuth, CBSAuthMixin):
def __init__(self, audience, uri, get_token,
expires_in=datetime.timedelta(seconds=3600), expires_at=None,
port=None, timeout=10, retry_policy=TokenRetryPolicy(),
verify=None, token_type=b"jwt", http_proxy=None,
transport_type=TransportType.Amqp, encoding='UTF-8', **kwargs):
"""
JSON Web Token (JWT) authentication.
Parameters:
- audience (str): Token audience claim
- uri (str): Resource URI
- get_token (callable): Function/coroutine to get JWT token
- expires_in (timedelta): Token lifetime duration
- expires_at (datetime): Token expiration time
- port (int): Service port
- timeout (int): Authentication timeout in seconds
- retry_policy (TokenRetryPolicy): Token retry configuration
- verify (str): Path to CA certificate file
- token_type (bytes): Token type identifier
- http_proxy (dict): HTTP proxy configuration
- transport_type (TransportType): Transport protocol
- encoding (str): Character encoding
"""
def create_authenticator(self, connection, debug=False, **kwargs):
"""Create authenticator for JWT token authentication."""
def update_token(self):
"""Update the JWT token."""Usage Example:
from uamqp.authentication import JWTTokenAuth
import datetime
# JWT token authentication
jwt_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9..."
auth = JWTTokenAuth(
hostname="service.example.com",
token=jwt_token,
audience="https://service.example.com",
expires_at=datetime.datetime.utcnow() + datetime.timedelta(hours=1)
)Asynchronous versions of authentication classes for use with async AMQP operations.
class CBSAsyncAuthMixin(CBSAuthMixin):
"""Async Claims-Based Security authentication mixin."""
@property
def loop(self):
"""Event loop for async operations."""
async def create_authenticator_async(self, connection, debug=False, loop=None, **kwargs):
"""Create async CBS authenticator for connection."""
async def close_authenticator_async(self):
"""Close the async CBS authenticator."""
async def handle_token_async(self):
"""
Handle async token authentication.
Returns:
tuple[bool, bool]: (success, should_retry)
"""class SASTokenAsync(CBSAsyncAuthMixin):
def __init__(self, audience, uri, token, expires_in=None, expires_at=None,
username=None, password=None, port=None, timeout=10,
retry_policy=TokenRetryPolicy(), verify=None,
token_type=b"servicebus.windows.net:sastoken", http_proxy=None,
transport_type=TransportType.Amqp, encoding='UTF-8', **kwargs):
"""Async SAS token authentication."""
async def update_token(self):
"""Update the SAS token asynchronously."""class JWTTokenAsync(CBSAsyncAuthMixin):
def __init__(self, audience, uri, get_token,
expires_in=datetime.timedelta(seconds=3600), expires_at=None,
port=None, timeout=10, retry_policy=TokenRetryPolicy(),
verify=None, token_type=b"jwt", http_proxy=None,
transport_type=TransportType.Amqp, encoding='UTF-8', **kwargs):
"""Async JWT token authentication."""
async def create_authenticator_async(self, connection, debug=False, loop=None, **kwargs):
"""Create async authenticator for JWT token authentication."""
async def update_token(self):
"""Update the JWT token asynchronously."""Usage Example:
from uamqp.authentication import SASTokenAsync
from uamqp.async_ops import SendClientAsync
async def send_message_async():
auth = SASTokenAsync(
hostname="mynamespace.servicebus.windows.net",
token=sas_token
)
async with SendClientAsync(target, auth=auth) as client:
await client.send_all_messages_async()All authentication classes support HTTP proxy configuration for environments that require proxy access:
proxy_config = {
'proxy_hostname': 'proxy.company.com',
'proxy_port': 8080,
'username': 'proxy_user', # Optional
'password': 'proxy_pass' # Optional
}
auth = SASLPlain(
hostname="amqp.example.com",
username="user",
password="pass",
http_proxy=proxy_config
)Authentication supports different transport protocols:
from uamqp.constants import TransportType
# Standard AMQP over TCP
auth = SASLPlain(hostname="host", username="user", password="pass",
transport_type=TransportType.Amqp)
# AMQP over WebSocket (for firewalls that block non-HTTP traffic)
auth = SASLPlain(hostname="host", username="user", password="pass",
transport_type=TransportType.AmqpOverWebsocket,
port=443)Authentication operations may raise these exceptions:
Install with Tessl CLI
npx tessl i tessl/pypi-uamqp