CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyzmq

Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library

Pending
Overview
Eval results
Files

authentication.mddocs/

Authentication

ZAP (ZMQ Authentication Protocol) implementation with support for NULL, PLAIN, and CURVE security mechanisms, including certificate generation and management.

Capabilities

Authenticator Classes

The base Authenticator class and its threading/asyncio variants provide ZAP authentication services.

class Authenticator:
    def __init__(self, context: Context = None, encoding: str = 'utf-8') -> None:
        """
        Create a new Authenticator.
        
        Parameters:
        - context: ZMQ context (creates new if None)
        - encoding: String encoding for credentials
        """

    def start(self) -> None:
        """Start the authenticator in a background thread."""

    def stop(self) -> None:
        """Stop the authenticator and clean up resources."""

    def allow(self, *addresses: str) -> None:
        """
        Allow connections from specified IP addresses.
        
        Parameters:
        - addresses: IP addresses or subnets to allow
        """

    def deny(self, *addresses: str) -> None:
        """
        Deny connections from specified IP addresses.
        
        Parameters:
        - addresses: IP addresses or subnets to deny
        """

    def configure_plain(self, domain: str = '*', passwords: dict = None) -> None:
        """
        Configure PLAIN authentication.
        
        Parameters:
        - domain: Authentication domain ('*' for all)
        - passwords: Dict of username -> password mappings
        """

    def configure_curve(self, domain: str = '*', location: str = '') -> None:
        """
        Configure CURVE authentication.
        
        Parameters:
        - domain: Authentication domain ('*' for all)  
        - location: Directory containing authorized public keys
        """

    def configure_curve_callback(self, domain: str = '*', callback: callable = None) -> None:
        """
        Configure CURVE authentication with callback.
        
        Parameters:
        - domain: Authentication domain
        - callback: Function to validate public keys
        """

Threading Authenticator

Thread-based authenticator for synchronous applications.

class ThreadAuthenticator(Authenticator):
    def __init__(self, context: Context = None, encoding: str = 'utf-8', log: Any = None) -> None:
        """
        Create a thread-based authenticator.
        
        Parameters:
        - context: ZMQ context
        - encoding: String encoding
        - log: Logger instance
        """

    def __enter__(self) -> ThreadAuthenticator:
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit with cleanup."""

AsyncIO Authenticator

Async-compatible authenticator for asyncio applications.

class AsyncioAuthenticator(Authenticator):
    def __init__(self, context: Context = None, encoding: str = 'utf-8', log: Any = None) -> None:
        """
        Create an asyncio-compatible authenticator.
        
        Parameters:
        - context: ZMQ context
        - encoding: String encoding  
        - log: Logger instance
        """

    async def start(self) -> None:
        """Start the authenticator asynchronously."""

    async def stop(self) -> None:
        """Stop the authenticator asynchronously."""

    async def __aenter__(self) -> AsyncioAuthenticator:
        """Async context manager entry."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit with cleanup."""

Certificate Management

Functions for generating and managing CURVE certificates.

def create_certificates(keys_dir: str, name: str) -> tuple[str, str]:
    """
    Create a new certificate (keypair).
    
    Parameters:
    - keys_dir: Directory to store certificate files
    - name: Certificate name (used for filenames)
    
    Returns:
    - tuple: (public_key_file, secret_key_file) paths
    """

def load_certificate(filename: str) -> tuple[bytes, bytes]:  
    """
    Load a certificate from file.
    
    Parameters:
    - filename: Certificate file path
    
    Returns:  
    - tuple: (public_key, secret_key) as bytes
    """

def save_certificate(filename: str, public_key: bytes, secret_key: bytes) -> None:
    """
    Save a certificate to file.
    
    Parameters:
    - filename: Target file path
    - public_key: Public key bytes
    - secret_key: Secret key bytes
    """

def curve_keypair() -> tuple[bytes, bytes]:
    """
    Generate a new CURVE keypair.
    
    Returns:
    - tuple: (public_key, secret_key) as bytes
    """

def curve_public(secret_key: bytes) -> bytes:
    """
    Derive public key from secret key.
    
    Parameters:
    - secret_key: Secret key bytes
    
    Returns:
    - bytes: Corresponding public key
    """

Usage Examples

Basic PLAIN Authentication

import zmq
from zmq.auth import Authenticator

# Server with PLAIN authentication
context = zmq.Context()

# Create and start authenticator
auth = Authenticator(context)
auth.start()

# Configure PLAIN authentication
passwords = {'admin': 'secret', 'user': 'password'}
auth.configure_plain(domain='*', passwords=passwords)

# Create server socket
server = context.socket(zmq.REP)
server.set(zmq.PLAIN_SERVER, 1)  # Enable PLAIN server
server.bind("tcp://*:5555")

try:
    while True:
        message = server.recv_string()
        print(f"Authenticated message: {message}")
        server.send_string(f"Echo: {message}")
        
except KeyboardInterrupt:
    pass
finally:
    server.close()
    auth.stop()
    context.term()
import zmq

# Client with PLAIN authentication
context = zmq.Context()

client = context.socket(zmq.REQ)
client.set(zmq.PLAIN_USERNAME, b'admin')
client.set(zmq.PLAIN_PASSWORD, b'secret')
client.connect("tcp://localhost:5555")

try:
    client.send_string("Hello authenticated server")
    reply = client.recv_string()
    print(f"Server reply: {reply}")
finally:
    client.close()
    context.term()

CURVE Authentication

import zmq
from zmq.auth import Authenticator, create_certificates
import os

# Create certificates directory
keys_dir = "certificates"
os.makedirs(keys_dir, exist_ok=True)

# Generate server certificate
server_public_file, server_secret_file = create_certificates(keys_dir, "server")
client_public_file, client_secret_file = create_certificates(keys_dir, "client")

# Server with CURVE authentication
context = zmq.Context()

# Load server keys
with open(server_secret_file, 'rb') as f:
    server_secret = f.read()
with open(server_public_file, 'rb') as f:
    server_public = f.read()

# Create and configure authenticator
auth = Authenticator(context)
auth.start()
auth.configure_curve(domain='*', location=keys_dir)

# Create server socket
server = context.socket(zmq.REP)
server.set(zmq.CURVE_SERVER, 1)
server.set(zmq.CURVE_SECRETKEY, server_secret)

server.bind("tcp://*:5555")

try:
    while True:
        message = server.recv_string()
        print(f"Encrypted message: {message}")
        server.send_string(f"Encrypted echo: {message}")
        
except KeyboardInterrupt:
    pass
finally:
    server.close()
    auth.stop()
    context.term()
import zmq

# Client with CURVE authentication
context = zmq.Context()

# Load client keys
with open(client_secret_file, 'rb') as f:
    client_secret = f.read()
with open(client_public_file, 'rb') as f:
    client_public = f.read()
with open(server_public_file, 'rb') as f:
    server_public = f.read()

client = context.socket(zmq.REQ) 
client.set(zmq.CURVE_SECRETKEY, client_secret)
client.set(zmq.CURVE_PUBLICKEY, client_public)
client.set(zmq.CURVE_SERVERKEY, server_public)

client.connect("tcp://localhost:5555")

try:
    client.send_string("Hello encrypted server")
    reply = client.recv_string()
    print(f"Encrypted reply: {reply}")
finally:
    client.close()
    context.term()

Context Manager Usage

import zmq
from zmq.auth import ThreadAuthenticator

context = zmq.Context()

# Use authenticator as context manager
with ThreadAuthenticator(context) as auth:
    # Configure authentication
    passwords = {'client': 'secret'}
    auth.configure_plain(domain='*', passwords=passwords)
    
    # Create server
    server = context.socket(zmq.REP)
    server.set(zmq.PLAIN_SERVER, 1)
    server.bind("tcp://*:5555")
    
    try:
        message = server.recv_string()
        server.send_string(f"Authenticated: {message}")
    finally:
        server.close()
# Authenticator automatically stopped when leaving context

context.term()

AsyncIO Authentication

import asyncio
import zmq.asyncio
from zmq.auth.asyncio import AsyncioAuthenticator

async def authenticated_server():
    context = zmq.asyncio.Context()
    
    async with AsyncioAuthenticator(context) as auth:
        # Configure authentication
        passwords = {'async_client': 'async_secret'}
        auth.configure_plain(domain='*', passwords=passwords)
        
        # Create server socket
        server = context.socket(zmq.REP)
        server.set(zmq.PLAIN_SERVER, 1)
        server.bind("tcp://*:5555")
        
        try:
            while True:
                message = await server.recv_string()
                print(f"Async authenticated: {message}")
                await server.send_string(f"Async echo: {message}")
        except KeyboardInterrupt:
            pass
        finally:
            server.close()
    
    context.term()

# Run async server
asyncio.run(authenticated_server())

IP Address Filtering

import zmq
from zmq.auth import ThreadAuthenticator

context = zmq.Context()

with ThreadAuthenticator(context) as auth:
    # Allow specific IP addresses
    auth.allow('127.0.0.1', '192.168.1.0/24')
    
    # Deny specific IP addresses
    auth.deny('192.168.1.100')
    
    # Configure PLAIN authentication
    passwords = {'user': 'pass'}
    auth.configure_plain(domain='*', passwords=passwords)
    
    server = context.socket(zmq.REP)
    server.set(zmq.PLAIN_SERVER, 1)
    server.bind("tcp://*:5555")
    
    try:
        while True:
            message = server.recv_string()
            server.send_string(f"Filtered and authenticated: {message}")
    except KeyboardInterrupt:
        pass
    finally:
        server.close()

context.term()

CURVE with Callback Authentication

import zmq
from zmq.auth import ThreadAuthenticator

def validate_key(public_key):
    """Custom public key validation callback"""
    # Implement custom validation logic
    allowed_keys = load_allowed_keys()  # Your implementation
    return public_key in allowed_keys

context = zmq.Context()

with ThreadAuthenticator(context) as auth:
    # Configure CURVE with callback
    auth.configure_curve_callback(domain='*', callback=validate_key)
    
    server = context.socket(zmq.REP)
    server.set(zmq.CURVE_SERVER, 1)
    server.set(zmq.CURVE_SECRETKEY, server_secret_key)
    server.bind("tcp://*:5555")
    
    try:
        while True:
            message = server.recv_string()
            server.send_string(f"Callback authenticated: {message}")
    except KeyboardInterrupt:
        pass
    finally:
        server.close()

context.term()

Certificate Management

import zmq
from zmq.auth.certs import create_certificates, load_certificate
import os

# Create certificates directory
certs_dir = "auth_certificates"
os.makedirs(certs_dir, exist_ok=True)

# Generate multiple certificates
server_pub, server_sec = create_certificates(certs_dir, "server")
client1_pub, client1_sec = create_certificates(certs_dir, "client1") 
client2_pub, client2_sec = create_certificates(certs_dir, "client2")

print(f"Server certificate: {server_pub}")
print(f"Client certificates: {client1_pub}, {client2_pub}")

# Load certificate for use
public_key, secret_key = load_certificate(server_sec)
print(f"Server keys loaded: {len(public_key)} + {len(secret_key)} bytes")

# Generate keypair programmatically
public, secret = zmq.curve_keypair()
print(f"Generated keypair: {len(public)} + {len(secret)} bytes")

# Derive public key from secret
derived_public = zmq.curve_public(secret)
assert public == derived_public
print("Key derivation verified")

Domain-Based Authentication

import zmq
from zmq.auth import ThreadAuthenticator

context = zmq.Context()

with ThreadAuthenticator(context) as auth:
    # Configure different authentication for different domains
    admin_passwords = {'admin': 'admin_secret'}
    user_passwords = {'user1': 'user_secret', 'user2': 'user_secret'}
    
    auth.configure_plain(domain='admin', passwords=admin_passwords)
    auth.configure_plain(domain='users', passwords=user_passwords)
    
    # Create servers for different domains
    admin_server = context.socket(zmq.REP)
    admin_server.set(zmq.PLAIN_SERVER, 1)
    admin_server.set_string(zmq.ZAP_DOMAIN, 'admin')
    admin_server.bind("tcp://*:5555")
    
    user_server = context.socket(zmq.REP)
    user_server.set(zmq.PLAIN_SERVER, 1)
    user_server.set_string(zmq.ZAP_DOMAIN, 'users')
    user_server.bind("tcp://*:5556")
    
    # Handle requests (simplified example)
    poller = zmq.Poller()
    poller.register(admin_server, zmq.POLLIN)
    poller.register(user_server, zmq.POLLIN)
    
    try:
        while True:
            events = poller.poll(1000)
            for socket, event in events:
                if socket is admin_server:
                    message = admin_server.recv_string()
                    admin_server.send_string(f"Admin: {message}")
                elif socket is user_server:
                    message = user_server.recv_string()
                    user_server.send_string(f"User: {message}")
    except KeyboardInterrupt:
        pass
    finally:
        admin_server.close()
        user_server.close()

context.term()

Security Considerations

Key Management

import zmq
import os

# Generate secure random keys
public_key, secret_key = zmq.curve_keypair()

# Store keys securely (restrict file permissions)
os.umask(0o077)  # Only owner can read/write

with open('server.key', 'wb') as f:
    f.write(secret_key)

with open('server.pub', 'wb') as f:
    f.write(public_key)

# Verify permissions
stat = os.stat('server.key')
assert stat.st_mode & 0o077 == 0, "Secret key file has insecure permissions"

Authentication Logging

import zmq
from zmq.auth import ThreadAuthenticator
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('auth')

context = zmq.Context()

with ThreadAuthenticator(context, log=logger) as auth:
    # Authentication events will be logged
    passwords = {'user': 'secret'}
    auth.configure_plain(domain='*', passwords=passwords)
    
    server = context.socket(zmq.REP)
    server.set(zmq.PLAIN_SERVER, 1)
    server.bind("tcp://*:5555")
    
    # Authentication successes/failures logged automatically
    try:
        message = server.recv_string()
        server.send_string("Authenticated")
    finally:
        server.close()

context.term()

Types

from typing import Dict, Optional, Callable, Tuple, Any, Union
import logging

# Authentication types
Passwords = Dict[str, str]  # username -> password mapping
PublicKey = bytes
SecretKey = bytes  
Keypair = Tuple[PublicKey, SecretKey]

# Address types
IPAddress = str
AddressList = List[IPAddress]

# Callback types
KeyValidationCallback = Callable[[PublicKey], bool]
AuthContext = Union[Context, None]
AuthLogger = Union[logging.Logger, Any, None]

# Certificate types
CertificatePath = str
CertificateFiles = Tuple[CertificatePath, CertificatePath]  # (public_file, secret_file)

# Domain types
AuthDomain = str

Install with Tessl CLI

npx tessl i tessl/pypi-pyzmq

docs

async-support.md

authentication.md

constants.md

core-messaging.md

devices.md

error-handling.md

index.md

message-handling.md

polling.md

tile.json