Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library
—
ZAP (ZMQ Authentication Protocol) implementation with support for NULL, PLAIN, and CURVE security mechanisms, including certificate generation and management.
The base Authenticator class and its threading/asyncio variants provide ZAP authentication services.
class Authenticator:
    """ZAP authentication service; base class for the thread and asyncio variants.

    This is an API summary: each method documents its signature and contract
    only and carries no implementation.
    """

    # ``Context`` is quoted because this summary does not import it.
    def __init__(self, context: "Context" = None, encoding: str = 'utf-8') -> None:
        """
        Create a new Authenticator.

        Parameters:
        - context: ZMQ context (creates new if None)
        - encoding: String encoding for credentials
        """

    def start(self) -> None:
        """
        Start the authenticator in a background thread.

        NOTE(review): in upstream pyzmq the base class only creates and binds
        the ZAP socket; background processing is provided by
        ThreadAuthenticator / AsyncioAuthenticator -- confirm.
        """

    def stop(self) -> None:
        """Stop the authenticator and clean up resources."""

    def allow(self, *addresses: str) -> None:
        """
        Allow connections from specified IP addresses.

        NOTE(review): upstream pyzmq treats the allow list and the deny list
        as mutually exclusive -- confirm before combining with deny().

        Parameters:
        - addresses: IP addresses or subnets to allow
        """

    def deny(self, *addresses: str) -> None:
        """
        Deny connections from specified IP addresses.

        NOTE(review): upstream pyzmq treats the allow list and the deny list
        as mutually exclusive -- confirm before combining with allow().

        Parameters:
        - addresses: IP addresses or subnets to deny
        """

    def configure_plain(self, domain: str = '*', passwords: dict = None) -> None:
        """
        Configure PLAIN authentication.

        Parameters:
        - domain: Authentication domain ('*' for all)
        - passwords: Dict of username -> password mappings
        """

    def configure_curve(self, domain: str = '*', location: str = '') -> None:
        """
        Configure CURVE authentication.

        Parameters:
        - domain: Authentication domain ('*' for all)
        - location: Directory containing authorized public keys
        """

    def configure_curve_callback(self, domain: str = '*', callback: callable = None) -> None:
        """
        Configure CURVE authentication with callback.

        NOTE(review): upstream pyzmq names the second parameter
        ``credentials_provider`` and expects an object exposing
        ``callback(domain, key)`` -- confirm against the installed version.

        Parameters:
        - domain: Authentication domain
        - callback: Function to validate public keys
        """


# Thread-based authenticator for synchronous applications.
class ThreadAuthenticator(Authenticator):
    """Authenticator variant for synchronous (thread-based) applications.

    API summary only: the methods document their signatures and carry no
    implementation.
    """

    # ``Context`` and ``Any`` are quoted because this summary does not import them.
    def __init__(self, context: "Context" = None, encoding: str = 'utf-8', log: "Any" = None) -> None:
        """
        Create a thread-based authenticator.

        Parameters:
        - context: ZMQ context
        - encoding: String encoding
        - log: Logger instance
        """

    # The return annotation is quoted: the class name is not bound yet while
    # the class body is being executed.
    def __enter__(self) -> "ThreadAuthenticator":
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit with cleanup."""


# Async-compatible authenticator for asyncio applications.
class AsyncioAuthenticator(Authenticator):
    """Authenticator variant for asyncio applications.

    API summary only: the methods document their signatures and carry no
    implementation.

    NOTE(review): upstream pyzmq documents synchronous start()/stop() on this
    class -- confirm that the coroutine and ``async with`` forms listed here
    match the installed version.
    """

    # ``Context`` and ``Any`` are quoted because this summary does not import them.
    def __init__(self, context: "Context" = None, encoding: str = 'utf-8', log: "Any" = None) -> None:
        """
        Create an asyncio-compatible authenticator.

        Parameters:
        - context: ZMQ context
        - encoding: String encoding
        - log: Logger instance
        """

    async def start(self) -> None:
        """Start the authenticator asynchronously."""

    async def stop(self) -> None:
        """Stop the authenticator asynchronously."""

    # The return annotation is quoted: the class name is not bound yet while
    # the class body is being executed.
    async def __aenter__(self) -> "AsyncioAuthenticator":
        """Async context manager entry."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit with cleanup."""


# Functions for generating and managing CURVE certificates.
def create_certificates(keys_dir: str, name: str) -> tuple[str, str]:
    """
    Create a new certificate (keypair).

    API summary only; no implementation is given here.

    Parameters:
    - keys_dir: Directory to store certificate files
    - name: Certificate name (used for filenames)

    Returns:
    - tuple: (public_key_file, secret_key_file) paths
    """
def load_certificate(filename: str) -> tuple[bytes, bytes]:
    """
    Load a certificate from file.

    API summary only; no implementation is given here.

    Parameters:
    - filename: Certificate file path

    Returns:
    - tuple: (public_key, secret_key) as bytes

    NOTE(review): upstream pyzmq documents that secret_key is None when the
    file holds only a public key -- confirm.
    """
def save_certificate(filename: str, public_key: bytes, secret_key: bytes) -> None:
    """
    Save a certificate to file.

    API summary only; no implementation is given here.

    Parameters:
    - filename: Target file path
    - public_key: Public key bytes
    - secret_key: Secret key bytes
    """
def curve_keypair() -> tuple[bytes, bytes]:
    """
    Generate a new CURVE keypair.

    API summary only; no implementation is given here.  The usage examples
    call this function as ``zmq.curve_keypair()``.

    Returns:
    - tuple: (public_key, secret_key) as bytes
    """
def curve_public(secret_key: bytes) -> bytes:
    """
    Derive public key from secret key.

    API summary only; no implementation is given here.  The usage examples
    call this function as ``zmq.curve_public(secret)``.

    Parameters:
    - secret_key: Secret key bytes

    Returns:
    - bytes: Corresponding public key
    """


import zmq
from zmq.auth.thread import ThreadAuthenticator

# Server with PLAIN authentication
context = zmq.Context()

# Create and start authenticator.  The thread-based variant is used so that
# ZAP requests are serviced while the main thread blocks in recv(); the base
# Authenticator does not process requests on its own.
auth = ThreadAuthenticator(context)
auth.start()

# Configure PLAIN authentication
passwords = {'admin': 'secret', 'user': 'password'}
auth.configure_plain(domain='*', passwords=passwords)

# Create server socket
server = context.socket(zmq.REP)
server.set(zmq.PLAIN_SERVER, 1)  # Enable PLAIN server
server.bind("tcp://*:5555")

try:
    while True:
        message = server.recv_string()
        print(f"Authenticated message: {message}")
        server.send_string(f"Echo: {message}")
except KeyboardInterrupt:
    pass
finally:
    # Close the socket and stop the authenticator before terminating the
    # context, otherwise term() blocks on the still-open sockets.
    server.close()
    auth.stop()
    context.term()

import zmq
# Client with PLAIN authentication
context = zmq.Context()
client = context.socket(zmq.REQ)

# Credentials must be set before connect() so they are used in the handshake.
client.set(zmq.PLAIN_USERNAME, b'admin')
client.set(zmq.PLAIN_PASSWORD, b'secret')
client.connect("tcp://localhost:5555")

try:
    client.send_string("Hello authenticated server")
    reply = client.recv_string()
    print(f"Server reply: {reply}")
finally:
    client.close()
    context.term()

import zmq
from zmq.auth import create_certificates, load_certificate
from zmq.auth.thread import ThreadAuthenticator
import os

# Create certificates directory
keys_dir = "certificates"
os.makedirs(keys_dir, exist_ok=True)

# Generate server and client certificates
server_public_file, server_secret_file = create_certificates(keys_dir, "server")
client_public_file, client_secret_file = create_certificates(keys_dir, "client")

# Server with CURVE authentication
context = zmq.Context()

# Load server keys.  Certificate files are structured text, not raw key
# material, so they are parsed with load_certificate() instead of being read
# byte-for-byte and handed to the socket.
server_public, server_secret = load_certificate(server_secret_file)

# Create and configure authenticator (thread-based, so ZAP requests are
# serviced while the main thread blocks in recv()).
auth = ThreadAuthenticator(context)
auth.start()
auth.configure_curve(domain='*', location=keys_dir)

# Create server socket
server = context.socket(zmq.REP)
server.set(zmq.CURVE_SERVER, 1)
server.set(zmq.CURVE_SECRETKEY, server_secret)
server.set(zmq.CURVE_PUBLICKEY, server_public)
server.bind("tcp://*:5555")

try:
    while True:
        message = server.recv_string()
        print(f"Encrypted message: {message}")
        server.send_string(f"Encrypted echo: {message}")
except KeyboardInterrupt:
    pass
finally:
    server.close()
    auth.stop()
    context.term()

import zmq
from zmq.auth import load_certificate

# Client with CURVE authentication
context = zmq.Context()

# Load client keys.  The *_file paths are the ones returned by
# create_certificates() in the server example.  Certificate files are parsed
# with load_certificate(); their raw contents are not usable as keys.
client_public, client_secret = load_certificate(client_secret_file)
# Only the server's public key is needed on the client side.
server_public, _ = load_certificate(server_public_file)

client = context.socket(zmq.REQ)
client.set(zmq.CURVE_SECRETKEY, client_secret)
client.set(zmq.CURVE_PUBLICKEY, client_public)
client.set(zmq.CURVE_SERVERKEY, server_public)
client.connect("tcp://localhost:5555")

try:
    client.send_string("Hello encrypted server")
    reply = client.recv_string()
    print(f"Encrypted reply: {reply}")
finally:
    client.close()
    context.term()

import zmq
# ThreadAuthenticator lives in zmq.auth.thread; the zmq.auth package itself
# does not export it.
from zmq.auth.thread import ThreadAuthenticator

context = zmq.Context()

# Use authenticator as context manager
with ThreadAuthenticator(context) as auth:
    # Configure authentication
    passwords = {'client': 'secret'}
    auth.configure_plain(domain='*', passwords=passwords)

    # Create server
    server = context.socket(zmq.REP)
    server.set(zmq.PLAIN_SERVER, 1)
    server.bind("tcp://*:5555")

    try:
        message = server.recv_string()
        server.send_string(f"Authenticated: {message}")
    finally:
        server.close()

# Authenticator automatically stopped when leaving context
context.term()

import asyncio
import zmq.asyncio
from zmq.auth.asyncio import AsyncioAuthenticator


async def authenticated_server():
    """Run a PLAIN-authenticated echo server on tcp://*:5555 until cancelled."""
    context = zmq.asyncio.Context()
    try:
        async with AsyncioAuthenticator(context) as auth:
            # Configure authentication
            passwords = {'async_client': 'async_secret'}
            auth.configure_plain(domain='*', passwords=passwords)

            # Create server socket
            server = context.socket(zmq.REP)
            server.set(zmq.PLAIN_SERVER, 1)
            server.bind("tcp://*:5555")

            try:
                while True:
                    message = await server.recv_string()
                    print(f"Async authenticated: {message}")
                    await server.send_string(f"Async echo: {message}")
            finally:
                server.close()
    finally:
        # Runs on cancellation too, after the socket and authenticator are closed.
        context.term()


# Run async server.  On Ctrl-C asyncio.run() cancels the coroutine and raises
# KeyboardInterrupt to its caller, so the interrupt is handled here rather
# than inside the coroutine.
try:
    asyncio.run(authenticated_server())
except KeyboardInterrupt:
    pass

import zmq
# ThreadAuthenticator lives in zmq.auth.thread; the zmq.auth package itself
# does not export it.
from zmq.auth.thread import ThreadAuthenticator

context = zmq.Context()

with ThreadAuthenticator(context) as auth:
    # Allow specific IP addresses.  pyzmq matches addresses literally and
    # treats the allow list and the deny list as mutually exclusive, so list
    # each permitted address explicitly and use only one of allow()/deny().
    auth.allow('127.0.0.1', '192.168.1.50')

    # To block addresses instead, use a deny list in place of the allow list:
    #     auth.deny('192.168.1.100')

    # Configure PLAIN authentication
    passwords = {'user': 'pass'}
    auth.configure_plain(domain='*', passwords=passwords)

    server = context.socket(zmq.REP)
    server.set(zmq.PLAIN_SERVER, 1)
    server.bind("tcp://*:5555")

    try:
        while True:
            message = server.recv_string()
            server.send_string(f"Filtered and authenticated: {message}")
    except KeyboardInterrupt:
        pass
    finally:
        server.close()

context.term()

import zmq
# ThreadAuthenticator lives in zmq.auth.thread; the zmq.auth package itself
# does not export it.
from zmq.auth.thread import ThreadAuthenticator


def validate_key(public_key):
    """Custom public key validation callback"""
    # Implement custom validation logic
    allowed_keys = load_allowed_keys()  # Your implementation
    return public_key in allowed_keys


context = zmq.Context()

with ThreadAuthenticator(context) as auth:
    # Configure CURVE with callback
    # NOTE(review): upstream pyzmq expects a ``credentials_provider`` object
    # exposing ``callback(domain, key)`` rather than a bare function passed
    # as ``callback=`` -- confirm against the installed version.
    auth.configure_curve_callback(domain='*', callback=validate_key)

    server = context.socket(zmq.REP)
    server.set(zmq.CURVE_SERVER, 1)
    # server_secret_key is a placeholder for the server's CURVE secret key;
    # it is not defined in this snippet.
    server.set(zmq.CURVE_SECRETKEY, server_secret_key)
    server.bind("tcp://*:5555")

    try:
        while True:
            message = server.recv_string()
            server.send_string(f"Callback authenticated: {message}")
    except KeyboardInterrupt:
        pass
    finally:
        server.close()

context.term()

import zmq
from zmq.auth.certs import create_certificates, load_certificate
import os

# Create certificates directory
certs_dir = "auth_certificates"
os.makedirs(certs_dir, exist_ok=True)

# Generate multiple certificates; each call returns the
# (public_file, secret_file) paths it wrote.
server_pub, server_sec = create_certificates(certs_dir, "server")
client1_pub, client1_sec = create_certificates(certs_dir, "client1")
client2_pub, client2_sec = create_certificates(certs_dir, "client2")
print(f"Server certificate: {server_pub}")
print(f"Client certificates: {client1_pub}, {client2_pub}")

# Load certificate for use
public_key, secret_key = load_certificate(server_sec)
print(f"Server keys loaded: {len(public_key)} + {len(secret_key)} bytes")

# Generate keypair programmatically
public, secret = zmq.curve_keypair()
print(f"Generated keypair: {len(public)} + {len(secret)} bytes")

# Derive public key from secret
derived_public = zmq.curve_public(secret)
# Explicit check rather than ``assert``, which is stripped under ``python -O``.
if public != derived_public:
    raise RuntimeError("Derived public key does not match generated public key")
print("Key derivation verified")

import zmq
# ThreadAuthenticator lives in zmq.auth.thread; the zmq.auth package itself
# does not export it.
from zmq.auth.thread import ThreadAuthenticator

context = zmq.Context()

with ThreadAuthenticator(context) as auth:
    # Configure different authentication for different domains
    admin_passwords = {'admin': 'admin_secret'}
    user_passwords = {'user1': 'user_secret', 'user2': 'user_secret'}
    auth.configure_plain(domain='admin', passwords=admin_passwords)
    auth.configure_plain(domain='users', passwords=user_passwords)

    # Create servers for different domains.  ZAP_DOMAIN selects which of the
    # password tables configured above applies to each socket.
    admin_server = context.socket(zmq.REP)
    admin_server.set(zmq.PLAIN_SERVER, 1)
    admin_server.set_string(zmq.ZAP_DOMAIN, 'admin')
    admin_server.bind("tcp://*:5555")

    user_server = context.socket(zmq.REP)
    user_server.set(zmq.PLAIN_SERVER, 1)
    user_server.set_string(zmq.ZAP_DOMAIN, 'users')
    user_server.bind("tcp://*:5556")

    # Handle requests (simplified example)
    poller = zmq.Poller()
    poller.register(admin_server, zmq.POLLIN)
    poller.register(user_server, zmq.POLLIN)

    try:
        while True:
            # poll() yields (socket, event) pairs; only POLLIN is registered,
            # so the event value itself is not inspected.
            for ready, _event in poller.poll(1000):
                if ready is admin_server:
                    message = admin_server.recv_string()
                    admin_server.send_string(f"Admin: {message}")
                elif ready is user_server:
                    message = user_server.recv_string()
                    user_server.send_string(f"User: {message}")
    except KeyboardInterrupt:
        pass
    finally:
        admin_server.close()
        user_server.close()

context.term()

import zmq
import os

# Generate secure random keys
public_key, secret_key = zmq.curve_keypair()

# Store keys securely (restrict file permissions).  The umask is process-wide,
# so the previous value is restored once the files have been written.
previous_umask = os.umask(0o077)  # Only owner can read/write
try:
    with open('server.key', 'wb') as f:
        f.write(secret_key)
    with open('server.pub', 'wb') as f:
        f.write(public_key)
finally:
    os.umask(previous_umask)

# The umask only applies to newly created files; tighten the mode explicitly
# in case server.key already existed with looser permissions.
os.chmod('server.key', 0o600)

# Verify permissions.  An explicit raise is used because ``assert`` is
# stripped under ``python -O``.
key_mode = os.stat('server.key').st_mode
if key_mode & 0o077 != 0:
    raise PermissionError("Secret key file has insecure permissions")

import zmq
# ThreadAuthenticator lives in zmq.auth.thread; the zmq.auth package itself
# does not export it.
from zmq.auth.thread import ThreadAuthenticator
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('auth')

context = zmq.Context()

with ThreadAuthenticator(context, log=logger) as auth:
    # Authentication events will be logged
    passwords = {'user': 'secret'}
    auth.configure_plain(domain='*', passwords=passwords)

    server = context.socket(zmq.REP)
    server.set(zmq.PLAIN_SERVER, 1)
    server.bind("tcp://*:5555")

    # Authentication successes/failures logged automatically
    try:
        message = server.recv_string()
        server.send_string("Authenticated")
    finally:
        server.close()

context.term()

# ``List`` is imported because AddressList below uses it.
from typing import Dict, List, Optional, Callable, Tuple, Any, Union
import logging

# ``Context`` is referenced by AuthContext below.
from zmq import Context

# Authentication types
Passwords = Dict[str, str]  # username -> password mapping
PublicKey = bytes
SecretKey = bytes
Keypair = Tuple[PublicKey, SecretKey]

# Address types
IPAddress = str
AddressList = List[IPAddress]

# Callback types
KeyValidationCallback = Callable[[PublicKey], bool]
AuthContext = Optional[Context]  # same type as Union[Context, None]
AuthLogger = Union[logging.Logger, Any, None]

# Certificate types
CertificatePath = str
CertificateFiles = Tuple[CertificatePath, CertificatePath]  # (public_file, secret_file)

# Domain types
AuthDomain = str

Install with Tessl CLI:

    npx tessl i tessl/pypi-pyzmq