Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.
Core infrastructure for managing ZeroMQ context, creating connections, and handling connection lifecycle within Twisted's reactor pattern. The factory manages the global ZeroMQ context while connections handle individual socket operations.
`ZmqFactory` manages the ZeroMQ context and the lifecycle of its connections, providing centralized context management and reactor integration.
class ZmqFactory(object):
"""
Factory for creating and managing ZeroMQ connections.
Attributes:
reactor: Twisted reactor reference (default: twisted.internet.reactor)
ioThreads (int): Number of I/O threads for ZeroMQ context (default: 1)
lingerPeriod (int): Linger period in milliseconds for socket closure (default: 100)
connections (set): Set of active ZmqConnection instances
context: ZeroMQ context instance
"""
reactor = reactor
ioThreads = 1
lingerPeriod = 100
def __init__(self):
"""Create ZeroMQ context with specified I/O threads."""
def shutdown(self):
"""
Shutdown all connections and terminate ZeroMQ context.
Cleans up reactor triggers and closes all managed connections.
"""
def registerForShutdown(self):
"""
Register factory for automatic shutdown when reactor shuts down.
Recommended to call on any created factory.
"""

from twisted.internet import reactor
from txzmq import ZmqFactory
# Create factory
factory = ZmqFactory()
factory.registerForShutdown() # Auto-cleanup on reactor shutdown
# Configure factory settings
factory.ioThreads = 2 # Use 2 I/O threads
factory.lingerPeriod = 500 # Wait 500ms for pending messages
# Factory will automatically manage context and connections
# When reactor shuts down, all connections are cleaned up

Endpoint specification for ZeroMQ connection addressing and binding/connecting semantics.
class ZmqEndpointType(object):
"""Constants for endpoint connection types."""
bind = "bind" # Bind and listen for incoming connections
connect = "connect" # Connect to existing endpoint
ZmqEndpoint = namedtuple('ZmqEndpoint', ['type', 'address'])
"""
Named tuple representing a ZeroMQ endpoint.
Fields:
type (str): Either ZmqEndpointType.bind or ZmqEndpointType.connect
address (str): ZeroMQ address (e.g., "tcp://127.0.0.1:5555", "ipc:///tmp/socket")
"""

from txzmq import ZmqEndpoint, ZmqEndpointType
# Create endpoints for different protocols
tcp_bind = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")
tcp_connect = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
ipc_bind = ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/my_socket")
inproc_connect = ZmqEndpoint(ZmqEndpointType.connect, "inproc://internal")
# Multiple endpoints can be used with single connection
endpoints = [
ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555"),
ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/backup")
]

Abstract base class for all ZeroMQ connections, implementing Twisted descriptor interfaces and providing common connection functionality.
class ZmqConnection(object):
"""
Base class for ZeroMQ connections with Twisted integration.
Implements IReadDescriptor and IFileDescriptor interfaces.
Should not be used directly - use pattern-specific subclasses.
Class Attributes:
socketType: ZeroMQ socket type constant (must be set by subclasses)
allowLoopbackMulticast (bool): Allow loopback multicast (default: False)
multicastRate (int): Multicast rate in kbps (default: 100)
highWaterMark (int): High water mark for message queuing (default: 0)
tcpKeepalive (int): TCP keepalive setting (default: 0)
tcpKeepaliveCount (int): TCP keepalive count (default: 0)
tcpKeepaliveIdle (int): TCP keepalive idle time (default: 0)
tcpKeepaliveInterval (int): TCP keepalive interval (default: 0)
reconnectInterval (int): Reconnection interval in ms (default: 100)
reconnectIntervalMax (int): Maximum reconnection interval (default: 0)
Instance Attributes:
factory (ZmqFactory): Associated factory instance
endpoints (list): List of ZmqEndpoint objects
identity (bytes): Socket identity for routing
socket: Underlying ZeroMQ socket
fd (int): File descriptor for reactor integration
"""
socketType = None # Must be overridden by subclasses
allowLoopbackMulticast = False
multicastRate = 100
highWaterMark = 0
tcpKeepalive = 0
tcpKeepaliveCount = 0
tcpKeepaliveIdle = 0
tcpKeepaliveInterval = 0
reconnectInterval = 100
reconnectIntervalMax = 0
def __init__(self, factory, endpoint=None, identity=None):
"""
Initialize connection.
Args:
factory (ZmqFactory): Factory managing this connection
endpoint (ZmqEndpoint, optional): Initial endpoint to connect/bind
identity (bytes, optional): Socket identity for routing
"""
def addEndpoints(self, endpoints):
"""
Add connection endpoints after initialization.
Args:
endpoints (list): List of ZmqEndpoint objects to add
"""
def shutdown(self):
"""
Shutdown connection and close socket.
Removes from reactor and cleans up resources.
"""
def send(self, message):
"""
Send message via ZeroMQ socket.
Args:
message (bytes or list): Message data - single part (bytes) or
multipart (list of bytes)
Raises:
ZMQError: If sending fails (e.g., EAGAIN when HWM reached)
"""
def messageReceived(self, message):
"""
Abstract method called when message is received.
Must be implemented by subclasses to handle incoming messages.
Args:
message (list): List of message parts (bytes)
"""
def fileno(self):
"""
Get file descriptor for Twisted reactor integration.
Returns:
int: Platform file descriptor number
"""
def connectionLost(self, reason):
"""
Handle connection loss (Twisted interface).
Args:
reason: Reason for connection loss
"""
def doRead(self):
"""
Handle read events from reactor (Twisted interface).
Processes incoming ZeroMQ messages.
"""
def logPrefix(self):
"""
Get log prefix for Twisted logging.
Returns:
str: Log prefix ("ZMQ")
"""

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPubConnection
factory = ZmqFactory()
# Create connection with single endpoint
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")
pub = ZmqPubConnection(factory, endpoint)
# Add additional endpoints
additional_endpoints = [
ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/pub_socket"),
ZmqEndpoint(ZmqEndpointType.connect, "tcp://remote-server:5556")
]
pub.addEndpoints(additional_endpoints)
# Configure connection-specific settings
pub.highWaterMark = 1000 # Limit queued messages
pub.multicastRate = 200 # Increase multicast rate
# Connection automatically integrates with Twisted reactor
# Messages are processed asynchronously through doRead()

Advanced configuration options for fine-tuning connection behavior, network settings, and performance characteristics.
# Network and Performance Settings
class ZmqConnection:
allowLoopbackMulticast = False # Enable loopback multicast
multicastRate = 100 # Multicast rate in kbps
highWaterMark = 0 # Message queue limit (0 = unlimited)
# TCP Keepalive (ZeroMQ 3.x only)
tcpKeepalive = 0 # Enable TCP keepalive
tcpKeepaliveCount = 0 # Keepalive probe count
tcpKeepaliveIdle = 0 # Keepalive idle time
tcpKeepaliveInterval = 0 # Keepalive probe interval
# Reconnection Settings (ZeroMQ 3.x only)
reconnectInterval = 100 # Initial reconnect interval (ms)
reconnectIntervalMax = 0 # Maximum reconnect interval (ms)

from txzmq import ZmqFactory, ZmqPubConnection, ZmqEndpoint, ZmqEndpointType
class HighPerformancePub(ZmqPubConnection):
# Configure for high-throughput publishing
highWaterMark = 10000 # Queue up to 10k messages
multicastRate = 1000 # High multicast rate
# Enable TCP keepalive for reliable connections
tcpKeepalive = 1
tcpKeepaliveIdle = 600 # 10 minutes idle
tcpKeepaliveInterval = 60 # 1 minute between probes
tcpKeepaliveCount = 3 # 3 failed probes = disconnect
# Aggressive reconnection
reconnectInterval = 50 # Start at 50ms
reconnectIntervalMax = 5000 # Cap at 5 seconds
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
pub = HighPerformancePub(factory, endpoint)

Install with Tessl CLI
npx tessl i tessl/pypi-txzmq