CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-txzmq

Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.

Overview
Eval results
Files

factory-connection.mddocs/

Factory and Connection Management

Core infrastructure for managing ZeroMQ context, creating connections, and handling connection lifecycle within Twisted's reactor pattern. The factory manages the global ZeroMQ context while connections handle individual socket operations.

Capabilities

ZeroMQ Factory

Manages ZeroMQ context and connection lifecycle, providing centralized context management and reactor integration.

class ZmqFactory(object):
    """
    Factory for creating and managing ZeroMQ connections.
    
    Attributes:
        reactor: Twisted reactor reference (default: twisted.internet.reactor)
        ioThreads (int): Number of I/O threads for ZeroMQ context (default: 1)
        lingerPeriod (int): Linger period in milliseconds for socket closure (default: 100)
        connections (set): Set of active ZmqConnection instances
        context: ZeroMQ context instance
    """
    
    reactor = reactor
    ioThreads = 1
    lingerPeriod = 100
    
    def __init__(self):
        """Create ZeroMQ context with specified I/O threads."""
    
    def shutdown(self):
        """
        Shutdown all connections and terminate ZeroMQ context.
        Cleans up reactor triggers and closes all managed connections.
        """
    
    def registerForShutdown(self):
        """
        Register factory for automatic shutdown when reactor shuts down.
        Recommended to call on any created factory.
        """

Usage Example

from twisted.internet import reactor
from txzmq import ZmqFactory

# Create factory
factory = ZmqFactory()
factory.registerForShutdown()  # Auto-cleanup on reactor shutdown

# Configure factory settings
factory.ioThreads = 2  # Use 2 I/O threads
factory.lingerPeriod = 500  # Wait 500ms for pending messages

# Factory will automatically manage context and connections
# When reactor shuts down, all connections are cleaned up

Connection Endpoints

Endpoint specification for ZeroMQ connection addressing and binding/connecting semantics.

class ZmqEndpointType(object):
    """Constants for endpoint connection types."""
    
    bind = "bind"        # Bind and listen for incoming connections
    connect = "connect"  # Connect to existing endpoint

ZmqEndpoint = namedtuple('ZmqEndpoint', ['type', 'address'])
"""
Named tuple representing a ZeroMQ endpoint.

Fields:
    type (str): Either ZmqEndpointType.bind or ZmqEndpointType.connect
    address (str): ZeroMQ address (e.g., "tcp://127.0.0.1:5555", "ipc:///tmp/socket")
"""

Usage Example

from txzmq import ZmqEndpoint, ZmqEndpointType

# Create endpoints for different protocols
tcp_bind = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")
tcp_connect = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

ipc_bind = ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/my_socket")
inproc_connect = ZmqEndpoint(ZmqEndpointType.connect, "inproc://internal")

# Multiple endpoints can be used with single connection
endpoints = [
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555"),
    ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/backup")
]

Base Connection Class

Abstract base class for all ZeroMQ connections, implementing Twisted descriptor interfaces and providing common connection functionality.

class ZmqConnection(object):
    """
    Base class for ZeroMQ connections with Twisted integration.
    
    Implements IReadDescriptor and IFileDescriptor interfaces.
    Should not be used directly - use pattern-specific subclasses.
    
    Class Attributes:
        socketType: ZeroMQ socket type constant (must be set by subclasses)
        allowLoopbackMulticast (bool): Allow loopback multicast (default: False)
        multicastRate (int): Multicast rate in kbps (default: 100)
        highWaterMark (int): High water mark for message queuing (default: 0)
        tcpKeepalive (int): TCP keepalive setting (default: 0)
        tcpKeepaliveCount (int): TCP keepalive count (default: 0)
        tcpKeepaliveIdle (int): TCP keepalive idle time (default: 0)
        tcpKeepaliveInterval (int): TCP keepalive interval (default: 0)
        reconnectInterval (int): Reconnection interval in ms (default: 100)
        reconnectIntervalMax (int): Maximum reconnection interval (default: 0)
    
    Instance Attributes:
        factory (ZmqFactory): Associated factory instance
        endpoints (list): List of ZmqEndpoint objects
        identity (bytes): Socket identity for routing
        socket: Underlying ZeroMQ socket
        fd (int): File descriptor for reactor integration
    """
    
    socketType = None  # Must be overridden by subclasses
    allowLoopbackMulticast = False
    multicastRate = 100
    highWaterMark = 0
    tcpKeepalive = 0
    tcpKeepaliveCount = 0
    tcpKeepaliveIdle = 0
    tcpKeepaliveInterval = 0
    reconnectInterval = 100
    reconnectIntervalMax = 0
    
    def __init__(self, factory, endpoint=None, identity=None):
        """
        Initialize connection.
        
        Args:
            factory (ZmqFactory): Factory managing this connection
            endpoint (ZmqEndpoint, optional): Initial endpoint to connect/bind
            identity (bytes, optional): Socket identity for routing
        """
    
    def addEndpoints(self, endpoints):
        """
        Add connection endpoints after initialization.
        
        Args:
            endpoints (list): List of ZmqEndpoint objects to add
        """
    
    def shutdown(self):
        """
        Shutdown connection and close socket.
        Removes from reactor and cleans up resources.
        """
    
    def send(self, message):
        """
        Send message via ZeroMQ socket.
        
        Args:
            message (bytes or list): Message data - single part (bytes) or 
                                   multipart (list of bytes)
        
        Raises:
            ZMQError: If sending fails (e.g., EAGAIN when HWM reached)
        """
    
    def messageReceived(self, message):
        """
        Abstract method called when message is received.
        
        Must be implemented by subclasses to handle incoming messages.
        
        Args:
            message (list): List of message parts (bytes)
        """
    
    def fileno(self):
        """
        Get file descriptor for Twisted reactor integration.
        
        Returns:
            int: Platform file descriptor number
        """
    
    def connectionLost(self, reason):
        """
        Handle connection loss (Twisted interface).
        
        Args:
            reason: Reason for connection loss
        """
    
    def doRead(self):
        """
        Handle read events from reactor (Twisted interface).
        Processes incoming ZeroMQ messages.
        """
    
    def logPrefix(self):
        """
        Get log prefix for Twisted logging.
        
        Returns:
            str: Log prefix ("ZMQ")
        """

Usage Example

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPubConnection

factory = ZmqFactory()

# Create connection with single endpoint
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")
pub = ZmqPubConnection(factory, endpoint)

# Add additional endpoints
additional_endpoints = [
    ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/pub_socket"),
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://remote-server:5556")
]
pub.addEndpoints(additional_endpoints)

# Configure connection-specific settings
pub.highWaterMark = 1000  # Limit queued messages
pub.multicastRate = 200   # Increase multicast rate

# Connection automatically integrates with Twisted reactor
# Messages are processed asynchronously through doRead()

Connection Configuration

Advanced configuration options for fine-tuning connection behavior, network settings, and performance characteristics.

# Network and Performance Settings
class ZmqConnection:
    allowLoopbackMulticast = False  # Enable loopback multicast
    multicastRate = 100             # Multicast rate in kbps
    highWaterMark = 0               # Message queue limit (0 = unlimited)
    
    # TCP Keepalive (ZeroMQ 3.x only)
    tcpKeepalive = 0                # Enable TCP keepalive
    tcpKeepaliveCount = 0           # Keepalive probe count
    tcpKeepaliveIdle = 0            # Keepalive idle time
    tcpKeepaliveInterval = 0        # Keepalive probe interval
    
    # Reconnection Settings (ZeroMQ 3.x only)
    reconnectInterval = 100         # Initial reconnect interval (ms)
    reconnectIntervalMax = 0        # Maximum reconnect interval (ms)

Configuration Example

from txzmq import ZmqFactory, ZmqPubConnection, ZmqEndpoint, ZmqEndpointType

class HighPerformancePub(ZmqPubConnection):
    # Configure for high-throughput publishing
    highWaterMark = 10000          # Queue up to 10k messages
    multicastRate = 1000           # High multicast rate
    
    # Enable TCP keepalive for reliable connections
    tcpKeepalive = 1
    tcpKeepaliveIdle = 600         # 10 minutes idle
    tcpKeepaliveInterval = 60      # 1 minute between probes
    tcpKeepaliveCount = 3          # 3 failed probes = disconnect
    
    # Aggressive reconnection
    reconnectInterval = 50         # Start at 50ms
    reconnectIntervalMax = 5000    # Cap at 5 seconds

factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
pub = HighPerformancePub(factory, endpoint)

Install with Tessl CLI

npx tessl i tessl/pypi-txzmq

docs

factory-connection.md

index.md

pubsub.md

pushpull.md

reqrep.md

router-dealer.md

tile.json