tessl/pypi-txzmq

Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.

  • Workspace: tessl
  • Visibility: Public
  • Describes: pypipkg:pypi/txzmq@1.0.x

To install, run

npx @tessl/cli install tessl/pypi-txzmq@1.0.0

txZMQ

txZMQ provides Twisted bindings for ZeroMQ, integrating the ZeroMQ messaging library with Twisted's asynchronous networking framework. It lets developers use ZeroMQ sockets inside Twisted's event loop (the reactor), so message-oriented applications can combine Twisted's networking capabilities with ZeroMQ's messaging patterns.

Package Information

  • Package Name: txZMQ
  • Language: Python
  • Installation: pip install txZMQ
  • Dependencies: Twisted>=10.0, pyzmq>=13
  • Compatibility: Python 2/3, CPython/PyPy, ZeroMQ 2.2.x/3.2.x

Core Imports

import txzmq

Common imports for typical usage:

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqPubConnection, ZmqSubConnection
from txzmq import ZmqPushConnection, ZmqPullConnection
from txzmq import ZmqREQConnection, ZmqREPConnection
from txzmq import ZmqRouterConnection, ZmqDealerConnection

Basic Usage

from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqPubConnection, ZmqSubConnection

# Create factory for managing ZeroMQ context
factory = ZmqFactory()
factory.registerForShutdown()

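# Publisher: bind first so subscribers have an endpoint to connect to
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")
publisher = ZmqPubConnection(factory, endpoint)

# Subscriber: override gotMessage to handle incoming messages
class MySubscriber(ZmqSubConnection):
    def gotMessage(self, message, tag):
        print(f"Received: {message} with tag: {tag}")

endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
subscriber = MySubscriber(factory, endpoint)
subscriber.subscribe(b"topic1")

# A PUB socket drops messages that have no matching subscriber yet, so
# publish only once the reactor is running and the subscription has had
# time to propagate (the one-second delay is an arbitrary, generous value).
reactor.callLater(1, publisher.publish, b"Hello World", b"topic1")

reactor.run()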

Architecture

txZMQ follows a factory pattern combined with Twisted's descriptor interfaces:

  • ZmqFactory: Manages ZeroMQ context and connection lifecycle, integrates with Twisted reactor
  • ZmqConnection: Base class implementing Twisted's IReadDescriptor and IFileDescriptor interfaces
  • Pattern-Specific Connections: Specialized classes for each ZeroMQ socket type (PUB/SUB, PUSH/PULL, REQ/REP, ROUTER/DEALER)
  • ZmqEndpoint: Represents connection endpoints with bind/connect semantics
  • Integration: Uses Twisted Deferred objects for asynchronous operations (REQ/REP pattern)

This design provides non-blocking sending and receiving, integrates with Twisted's event loop, and supports the major ZeroMQ messaging patterns (PUB/SUB, PUSH/PULL, REQ/REP, ROUTER/DEALER) within Twisted applications.

Capabilities

Factory and Connection Management

Core factory and connection infrastructure for managing ZeroMQ context, creating connections, and handling connection lifecycle within Twisted's reactor pattern.

class ZmqFactory(object):
    reactor = reactor
    ioThreads = 1
    lingerPeriod = 100
    
    def __init__(self): ...
    def shutdown(self): ...
    def registerForShutdown(self): ...

class ZmqEndpoint(namedtuple('ZmqEndpoint', ['type', 'address'])): ...

class ZmqEndpointType(object):
    bind = "bind"
    connect = "connect"

class ZmqConnection(object):
    def __init__(self, factory, endpoint=None, identity=None): ...
    def addEndpoints(self, endpoints): ...
    def shutdown(self): ...
    def send(self, message): ...
    def messageReceived(self, message): ...

Publish-Subscribe Messaging

Publisher-subscriber pattern for one-to-many broadcast messaging with topic-based filtering and subscription management.

class ZmqPubConnection(ZmqConnection):
    socketType = constants.PUB
    
    def publish(self, message, tag=b''): ...

class ZmqSubConnection(ZmqConnection):
    socketType = constants.SUB
    
    def subscribe(self, tag): ...
    def unsubscribe(self, tag): ...
    def gotMessage(self, message, tag): ...
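
The topic-based filtering mentioned above follows ZeroMQ's SUB-socket rule: a subscription is a byte prefix, and a message is delivered when it starts with any subscribed prefix (subscribing to b"" matches everything). The sketch below models only that rule in plain Python, with no sockets and no txZMQ involved; `matches` is a hypothetical helper written for illustration, not part of the library.

```python
def matches(subscriptions, tag):
    """Return True if any subscribed prefix is a prefix of `tag`.

    Models ZeroMQ's SUB-side prefix filter; hypothetical helper, not txZMQ API.
    """
    return any(tag.startswith(prefix) for prefix in subscriptions)


subscriptions = {b"topic1"}

# b"topic10" also matches, because filtering is by prefix, not equality.
for tag in (b"topic1", b"topic10", b"topic2"):
    print(tag, matches(subscriptions, tag))
```

One practical consequence: if two tags share a prefix (such as b"topic1" and b"topic10"), a subscriber to the shorter one receives both, so gotMessage may need to check the tag it is handed.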

Push-Pull Messaging

Push-pull pattern for load-balanced work distribution and result collection in distributed processing scenarios.

class ZmqPushConnection(ZmqConnection):
    socketType = constants.PUSH
    
    def push(self, message): ...

class ZmqPullConnection(ZmqConnection):
    socketType = constants.PULL
    
    def onPull(self, message): ...
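
The load balancing described above comes from ZeroMQ itself: a PUSH socket hands each outgoing message to the next connected PULL peer in turn. The following is a minimal model of that round-robin distribution in plain Python. The worker names and the `distribute` helper are hypothetical, and the model ignores back-pressure and peers that connect or disconnect mid-stream.

```python
from collections import defaultdict
from itertools import cycle


def distribute(messages, workers):
    """Assign each message to the next worker in turn (round-robin)."""
    assignments = defaultdict(list)
    # zip stops when `messages` is exhausted; cycle repeats the workers.
    for worker, message in zip(cycle(workers), messages):
        assignments[worker].append(message)
    return dict(assignments)


jobs = [b"job-0", b"job-1", b"job-2", b"job-3", b"job-4"]
result = distribute(jobs, ["worker-a", "worker-b"])
print(result)
```

In a txZMQ application, each push() call on a ZmqPushConnection would correspond to one entry in `jobs`, and each worker would be a separate ZmqPullConnection whose onPull receives its share.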

Request-Reply Messaging

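Request-reply pattern with timeout support and Twisted Deferred integration for building client-server applications with asynchronous response handling. Note that ZmqREQConnection and ZmqREPConnection are built on DEALER and ROUTER sockets rather than raw REQ and REP sockets, so the socketType values below are intentional.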

class ZmqRequestTimeoutError(Exception): ...

class ZmqREQConnection(ZmqConnection):
    socketType = constants.DEALER
    defaultRequestTimeout = None
    
    def sendMsg(self, *messageParts, **kwargs): ...

class ZmqREPConnection(ZmqConnection):
    socketType = constants.ROUTER
    
    def reply(self, messageId, *messageParts): ...
    def gotMessage(self, messageId, *messageParts): ...
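
The messageId that appears in reply and gotMessage is what ties a reply back to its request when several requests are in flight. The sketch below models that bookkeeping in plain Python so the idea can be seen without a socket. It is not txZMQ's implementation: the `PendingRequests` class, the use of uuid for ids, and the frame layout (id, empty delimiter, payload) are all assumptions made for illustration.

```python
import uuid


class PendingRequests:
    """Match replies to requests by message id (illustrative model only)."""

    def __init__(self):
        self._callbacks = {}  # message id -> callback awaiting the reply

    def send(self, callback, *parts):
        """Record the callback and return the frames to put on the wire."""
        message_id = uuid.uuid4().hex.encode("ascii")
        self._callbacks[message_id] = callback
        # Assumed layout: id frame, empty delimiter frame, then the payload.
        return [message_id, b""] + list(parts)

    def receive(self, frames):
        """Deliver a reply's payload to the callback registered for its id."""
        message_id, _delimiter, *payload = frames
        callback = self._callbacks.pop(message_id, None)
        if callback is not None:  # ignore unknown or already-handled ids
            callback(payload)


pending = PendingRequests()
replies = []
request = pending.send(replies.append, b"ping")

# A responder echoes the id back in front of its answer.
pending.receive([request[0], b"", b"pong"])
print(replies)
```

In txZMQ the role of the callback is played by the Deferred associated with sendMsg, and a request that outlives its timeout would presumably be reported through ZmqRequestTimeoutError rather than silently dropped as in this model.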

Router-Dealer Messaging

Advanced routing patterns providing raw ROUTER and DEALER socket access for building custom messaging topologies and complex routing scenarios.

class ZmqRouterConnection(ZmqConnection):
    socketType = constants.ROUTER
    
    def sendMsg(self, recipientId, message): ...
    def sendMultipart(self, recipientId, parts): ...
    def gotMessage(self, sender_id, *message): ...

class ZmqDealerConnection(ZmqConnection):
    socketType = constants.DEALER
    
    def sendMsg(self, message): ...
    def sendMultipart(self, parts): ...
    def gotMessage(self, *args): ...
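
The signatures above put an identity first on the ROUTER side (sender_id in gotMessage, recipientId in sendMsg) because a ROUTER socket addresses peers by an identity frame: ZeroMQ prepends the sender's identity to each incoming message and uses the first frame of each outgoing message to choose the destination. The sketch below models that envelope handling in plain Python; `handle_request` and `echo_upper` are hypothetical names, and no sockets are involved.

```python
def handle_request(incoming_frames, handler):
    """Build the outgoing frames for a reply to one ROUTER-side message.

    `incoming_frames` is [sender_id, *message]; the reply is addressed by
    putting that same identity back in front of the handler's result.
    """
    sender_id, *message = incoming_frames
    reply_parts = handler(message)
    return [sender_id] + list(reply_parts)


def echo_upper(message):
    """Hypothetical handler: upper-case every frame of the request."""
    return [part.upper() for part in message]


outgoing = handle_request([b"client-7", b"hello", b"world"], echo_upper)
print(outgoing)
```

Inside a ZmqRouterConnection subclass, the same flow would be a gotMessage(self, sender_id, *message) override that ends by calling self.sendMultipart(sender_id, reply_parts).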

Common Types

from collections import namedtuple
from typing import List, Union

# Endpoint types
class ZmqEndpointType:
    bind = "bind"      # Bind and listen for connections
    connect = "connect"  # Connect to existing endpoint

# Endpoint specification
ZmqEndpoint = namedtuple('ZmqEndpoint', ['type', 'address'])
# Usage: ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")

# Message formats (type hints)
Message = Union[bytes, List[bytes]]  # Single part or multipart message
Tag = bytes                          # Topic tag for pub/sub
MessageId = bytes                    # Unique identifier for req/rep
RecipientId = bytes                  # Recipient identifier for router
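
Because Message may be either a single bytes value or a list of parts, code that inspects messages often normalizes them to one shape first. A minimal sketch of such a normalization follows; `as_frames` is a hypothetical helper for illustration and is not provided by txZMQ.

```python
from typing import List, Union

Message = Union[bytes, List[bytes]]


def as_frames(message: Message) -> List[bytes]:
    """Return the message as a list of frames, wrapping a lone bytes value."""
    if isinstance(message, bytes):
        return [message]
    return list(message)


print(as_frames(b"single"))
print(as_frames([b"part-1", b"part-2"]))
```

The isinstance check matters because bytes is itself iterable: calling list() on a bytes value would yield integers rather than a one-element list of frames.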