Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.
npx @tessl/cli install tessl/pypi-txzmq@1.0.0

txZMQ provides Twisted bindings for ZeroMQ, enabling seamless integration between the ZeroMQ messaging library and Twisted's asynchronous networking framework. It allows developers to use ZeroMQ sockets within Twisted's event loop (reactor) for building high-performance, message-oriented applications that combine Twisted's networking capabilities with ZeroMQ's efficient messaging patterns.
pip install txZMQ

Dependencies: Twisted>=10.0, pyzmq>=13

import txzmq

Common imports for typical usage:
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqPubConnection, ZmqSubConnection
from txzmq import ZmqPushConnection, ZmqPullConnection
from txzmq import ZmqREQConnection, ZmqREPConnection
from txzmq import ZmqRouterConnection, ZmqDealerConnection

from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqPubConnection, ZmqSubConnection
# Create factory for managing ZeroMQ context
factory = ZmqFactory()
factory.registerForShutdown()
# Publisher example
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")
publisher = ZmqPubConnection(factory, endpoint)
publisher.publish(b"Hello World", b"topic1")
# Subscriber example
class MySubscriber(ZmqSubConnection):
def gotMessage(self, message, tag):
print(f"Received: {message} with tag: {tag}")
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
subscriber = MySubscriber(factory, endpoint)
subscriber.subscribe(b"topic1")
reactor.run()

txZMQ follows a factory pattern combined with Twisted's descriptor interfaces:
This design provides non-blocking message sending/receiving, proper integration with Twisted's event loop, and supports all major ZeroMQ messaging patterns within Twisted applications.
Core factory and connection infrastructure for managing ZeroMQ context, creating connections, and handling connection lifecycle within Twisted's reactor pattern.
class ZmqFactory(object):
reactor = reactor
ioThreads = 1
lingerPeriod = 100
def __init__(self): ...
def shutdown(self): ...
def registerForShutdown(self): ...
class ZmqEndpoint(namedtuple('ZmqEndpoint', ['type', 'address'])): ...
class ZmqEndpointType(object):
bind = "bind"
connect = "connect"
class ZmqConnection(object):
def __init__(self, factory, endpoint=None, identity=None): ...
def addEndpoints(self, endpoints): ...
def shutdown(self): ...
def send(self, message): ...
def messageReceived(self, message): ...

Factory and Connection Management
Publisher-subscriber pattern for one-to-many broadcast messaging with topic-based filtering and subscription management.
class ZmqPubConnection(ZmqConnection):
socketType = constants.PUB
def publish(self, message, tag=b''): ...
class ZmqSubConnection(ZmqConnection):
socketType = constants.SUB
def subscribe(self, tag): ...
def unsubscribe(self, tag): ...
def gotMessage(self, message, tag): ...

Push-pull pattern for load-balanced work distribution and result collection in distributed processing scenarios.
class ZmqPushConnection(ZmqConnection):
socketType = constants.PUSH
def push(self, message): ...
class ZmqPullConnection(ZmqConnection):
socketType = constants.PULL
def onPull(self, message): ...

Request-reply pattern with timeout support and Twisted Deferred integration for building client-server applications with asynchronous response handling.
class ZmqRequestTimeoutError(Exception): ...
class ZmqREQConnection(ZmqConnection):
socketType = constants.DEALER
defaultRequestTimeout = None
def sendMsg(self, *messageParts, **kwargs): ...
class ZmqREPConnection(ZmqConnection):
socketType = constants.ROUTER
def reply(self, messageId, *messageParts): ...
def gotMessage(self, messageId, *messageParts): ...

Advanced routing patterns providing raw ROUTER and DEALER socket access for building custom messaging topologies and complex routing scenarios.
class ZmqRouterConnection(ZmqConnection):
socketType = constants.ROUTER
def sendMsg(self, recipientId, message): ...
def sendMultipart(self, recipientId, parts): ...
def gotMessage(self, sender_id, *message): ...
class ZmqDealerConnection(ZmqConnection):
socketType = constants.DEALER
def sendMsg(self, message): ...
def sendMultipart(self, parts): ...
def gotMessage(self, *args): ...

from collections import namedtuple
from typing import List, Union
# Endpoint types
class ZmqEndpointType:
bind = "bind" # Bind and listen for connections
connect = "connect" # Connect to existing endpoint
# Endpoint specification
ZmqEndpoint = namedtuple('ZmqEndpoint', ['type', 'address'])
# Usage: ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")
# Message formats (type hints)
Message = Union[bytes, List[bytes]] # Single part or multipart message
Tag = bytes # Topic tag for pub/sub
MessageId = bytes # Unique identifier for req/rep
RecipientId = bytes # Recipient identifier for router