Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.
npx @tessl/cli install tessl/pypi-txzmq@1.0.00
# txZMQ

txZMQ provides Twisted bindings for ZeroMQ, integrating the ZeroMQ messaging library with Twisted's asynchronous networking framework. It lets developers use ZeroMQ sockets inside Twisted's event loop (the reactor) to build high-performance, message-oriented applications that combine Twisted's networking capabilities with ZeroMQ's messaging patterns.

## Package Information

- **Package Name**: txZMQ
- **Language**: Python
- **Installation**: `pip install txZMQ`
- **Dependencies**: `Twisted>=10.0`, `pyzmq>=13`
- **Compatibility**: Python 2/3, CPython/PyPy, ZeroMQ 2.2.x/3.2.x

## Core Imports

```python
import txzmq
```

Common imports for typical usage:

```python
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqPubConnection, ZmqSubConnection
from txzmq import ZmqPushConnection, ZmqPullConnection
from txzmq import ZmqREQConnection, ZmqREPConnection
from txzmq import ZmqRouterConnection, ZmqDealerConnection
```

## Basic Usage

```python
from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqPubConnection, ZmqSubConnection

# Create a factory that manages the ZeroMQ context
factory = ZmqFactory()
# Shut the factory down when the reactor stops
factory.registerForShutdown()

# Publisher: bind to a local TCP endpoint
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")
publisher = ZmqPubConnection(factory, endpoint)

# Subscriber: override gotMessage to handle incoming messages
class MySubscriber(ZmqSubConnection):
    def gotMessage(self, message, tag):
        print(f"Received: {message} with tag: {tag}")

endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
subscriber = MySubscriber(factory, endpoint)
subscriber.subscribe(b"topic1")

# A PUB socket drops messages while no subscriber is connected, so publish
# only after the reactor is running and the subscription is in place
reactor.callLater(1, publisher.publish, b"Hello World", b"topic1")

reactor.run()
```

## Architecture

txZMQ combines a factory pattern with Twisted's descriptor interfaces:

- **ZmqFactory**: Manages the ZeroMQ context and connection lifecycle, and integrates with the Twisted reactor
- **ZmqConnection**: Base class implementing Twisted's `IReadDescriptor` and `IFileDescriptor` interfaces
- **Pattern-Specific Connections**: Specialized classes for each ZeroMQ socket type (PUB/SUB, PUSH/PULL, REQ/REP, ROUTER/DEALER)
- **ZmqEndpoint**: Represents a connection endpoint with bind or connect semantics
- **Integration**: Uses Twisted `Deferred` objects for asynchronous operations (REQ/REP pattern)

This design provides non-blocking message sending and receiving and proper integration with Twisted's event loop, and it supports all major ZeroMQ messaging patterns within Twisted applications.

## Capabilities

### Factory and Connection Management

Core factory and connection infrastructure for managing the ZeroMQ context, creating connections, and handling the connection lifecycle within Twisted's reactor pattern.

```python { .api }
class ZmqFactory(object):
    reactor = reactor
    ioThreads = 1
    lingerPeriod = 100

    def __init__(self): ...
    def shutdown(self): ...
    def registerForShutdown(self): ...

class ZmqEndpoint(namedtuple('ZmqEndpoint', ['type', 'address'])): ...

class ZmqEndpointType(object):
    bind = "bind"
    connect = "connect"

class ZmqConnection(object):
    def __init__(self, factory, endpoint=None, identity=None): ...
    def addEndpoints(self, endpoints): ...
    def shutdown(self): ...
    def send(self, message): ...
    def messageReceived(self, message): ...
```
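The classes above are typically wired together as in the following minimal sketch. It is an illustration under stated assumptions, not the library's reference usage: `parse_endpoint_spec`, `open_connection`, and the `"bind:<address>"` spec format are hypothetical names invented here. The txZMQ import sits inside the function only so that the pure parsing helper can be exercised without Twisted installed. A call might look like `open_connection(ZmqPullConnection, ["bind:tcp://127.0.0.1:5555"])`.

```python
def parse_endpoint_spec(spec):
    """Split a hypothetical 'bind:tcp://host:port' spec into (kind, address)."""
    kind, sep, address = spec.partition(":")
    if not sep or kind not in ("bind", "connect") or not address:
        raise ValueError(
            "expected 'bind:<address>' or 'connect:<address>', got %r" % (spec,))
    return kind, address


def open_connection(connection_class, specs):
    """Create a factory and one connection attached to every endpoint in specs.

    connection_class is any ZmqConnection subclass, e.g. ZmqPullConnection.
    """
    # Imported lazily so parse_endpoint_spec stays usable without txZMQ.
    from txzmq import ZmqEndpoint, ZmqEndpointType, ZmqFactory

    factory = ZmqFactory()
    # Ask the factory to shut itself down when the reactor stops.
    factory.registerForShutdown()

    endpoints = []
    for spec in specs:
        kind, address = parse_endpoint_spec(spec)
        # Look up ZmqEndpointType.bind or ZmqEndpointType.connect by name.
        endpoints.append(ZmqEndpoint(getattr(ZmqEndpointType, kind), address))

    connection = connection_class(factory)  # endpoint=None: nothing attached yet
    connection.addEndpoints(endpoints)      # attach every parsed endpoint
    return factory, connection
```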
[Factory and Connection Management](./factory-connection.md)

### Publish-Subscribe Messaging

Publisher-subscriber pattern for one-to-many broadcast messaging with topic-based filtering and subscription management.

```python { .api }
class ZmqPubConnection(ZmqConnection):
    socketType = constants.PUB

    def publish(self, message, tag=b''): ...

class ZmqSubConnection(ZmqConnection):
    socketType = constants.SUB

    def subscribe(self, tag): ...
    def unsubscribe(self, tag): ...
    def gotMessage(self, message, tag): ...
```
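Topic filtering on ZeroMQ SUB sockets is prefix-based: a subscription matches every tag that starts with the subscribed bytes, and an empty subscription matches everything. The pure-Python sketch below only models that matching rule, to show which tags a subscriber would receive. `tag_matches` is a hypothetical helper and not part of txZMQ; the real filtering is performed by ZeroMQ itself.

```python
def tag_matches(subscriptions, tag):
    """Return True if tag starts with any subscribed prefix."""
    return any(tag.startswith(prefix) for prefix in subscriptions)


subscriptions = {b"weather."}

print(tag_matches(subscriptions, b"weather.london"))  # prefix match
print(tag_matches(subscriptions, b"sports.tennis"))   # no subscribed prefix
print(tag_matches({b""}, b"sports.tennis"))           # empty prefix matches all
```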
[Publish-Subscribe Messaging](./pubsub.md)

### Push-Pull Messaging

Push-pull pattern for load-balanced work distribution and result collection in distributed processing scenarios.

```python { .api }
class ZmqPushConnection(ZmqConnection):
    socketType = constants.PUSH

    def push(self, message): ...

class ZmqPullConnection(ZmqConnection):
    socketType = constants.PULL

    def onPull(self, message): ...
```
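A worker in a pipeline typically pulls a job, processes it, and pushes the result onward. The sketch below assumes that `onPull` receives the message as a list of byte frames and that a handler can be attached by assigning to `onPull` on the instance. `process_job`, `start_worker`, and both address parameters are hypothetical. txZMQ is imported inside the function so the processing step can be tested on its own.

```python
def process_job(frames):
    """Hypothetical work step: upper-case every frame of a job."""
    return [frame.upper() for frame in frames]


def start_worker(factory, jobs_address, results_address):
    """Pull jobs from jobs_address and push results to results_address."""
    # Imported lazily so process_job can be used without txZMQ installed.
    from txzmq import (ZmqEndpoint, ZmqEndpointType,
                       ZmqPullConnection, ZmqPushConnection)

    results = ZmqPushConnection(
        factory, ZmqEndpoint(ZmqEndpointType.connect, results_address))
    jobs = ZmqPullConnection(
        factory, ZmqEndpoint(ZmqEndpointType.connect, jobs_address))

    def on_pull(message):
        # Assumption: message is a list of byte frames.
        results.push(process_job(message))

    jobs.onPull = on_pull  # replaces the default handler on this instance
    return jobs, results
```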
[Push-Pull Messaging](./pushpull.md)

### Request-Reply Messaging

Request-reply pattern with timeout support and Twisted Deferred integration for building client-server applications with asynchronous response handling.

```python { .api }
class ZmqRequestTimeoutError(Exception): ...

class ZmqREQConnection(ZmqConnection):
    socketType = constants.DEALER
    defaultRequestTimeout = None

    def sendMsg(self, *messageParts, **kwargs): ...

class ZmqREPConnection(ZmqConnection):
    socketType = constants.ROUTER

    def reply(self, messageId, *messageParts): ...
    def gotMessage(self, messageId, *messageParts): ...
```
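The request-reply classes pair a Deferred-returning client with a server callback. The following sketch rests on several assumptions that should be checked against the detailed documentation: that `sendMsg` accepts a `timeout` keyword in seconds, that its Deferred fires with the list of reply frames, and that a timed-out request fails with `ZmqRequestTimeoutError`. `handle_request`, `start_server`, and `ask` are hypothetical names.

```python
def handle_request(parts):
    """Hypothetical request handler: answer b"ping" with b"pong"."""
    if list(parts) == [b"ping"]:
        return [b"pong"]
    return [b"error", b"unknown request"]


def start_server(factory, address):
    """Bind a REP connection that answers each request via handle_request."""
    from txzmq import ZmqEndpoint, ZmqEndpointType, ZmqREPConnection

    class Server(ZmqREPConnection):
        def gotMessage(self, messageId, *messageParts):
            # messageId ties the reply to the request it answers.
            self.reply(messageId, *handle_request(messageParts))

    return Server(factory, ZmqEndpoint(ZmqEndpointType.bind, address))


def ask(factory, address, *parts):
    """Send one request and return the Deferred for its reply."""
    from txzmq import (ZmqEndpoint, ZmqEndpointType,
                       ZmqREQConnection, ZmqRequestTimeoutError)

    client = ZmqREQConnection(
        factory, ZmqEndpoint(ZmqEndpointType.connect, address))

    def on_timeout(failure):
        failure.trap(ZmqRequestTimeoutError)  # re-raise any other failure
        print("request timed out")

    d = client.sendMsg(*parts, timeout=2.0)  # assumed keyword, in seconds
    d.addCallbacks(lambda reply: print("reply:", reply), on_timeout)
    return d
```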
[Request-Reply Messaging](./reqrep.md)

### Router-Dealer Messaging

Advanced routing patterns providing raw ROUTER and DEALER socket access for building custom messaging topologies and complex routing scenarios.

```python { .api }
class ZmqRouterConnection(ZmqConnection):
    socketType = constants.ROUTER

    def sendMsg(self, recipientId, message): ...
    def sendMultipart(self, recipientId, parts): ...
    def gotMessage(self, sender_id, *message): ...

class ZmqDealerConnection(ZmqConnection):
    socketType = constants.DEALER

    def sendMsg(self, message): ...
    def sendMultipart(self, parts): ...
    def gotMessage(self, *args): ...
```
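With a raw ROUTER socket, ZeroMQ prepends the sender's identity frame to every incoming message and uses the first frame of every outgoing message to choose the recipient. That framing corresponds to the `sender_id` argument of `gotMessage` and the `recipientId` argument of `sendMsg` and `sendMultipart`. The sketch below models the framing in plain Python; `split_router_frames` and `build_router_frames` are hypothetical helpers for illustration, not txZMQ functions.

```python
def split_router_frames(frames):
    """Separate the identity frame a ROUTER socket adds from the payload."""
    if not frames:
        raise ValueError("a ROUTER message has at least an identity frame")
    sender_id, *payload = frames
    return sender_id, payload


def build_router_frames(recipient_id, parts):
    """Prefix the payload with the identity of the peer it is meant for."""
    return [recipient_id, *parts]


# An echo handler expressed with the two helpers above.
incoming = [b"worker-1", b"job", b"42"]
sender_id, payload = split_router_frames(incoming)
outgoing = build_router_frames(sender_id, payload)
print(outgoing)  # same frames, addressed back to the sender
```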
[Router-Dealer Messaging](./router-dealer.md)

## Common Types

```python { .api }
from collections import namedtuple
from typing import List, Union

# Endpoint types
class ZmqEndpointType:
    bind = "bind"        # Bind and listen for connections
    connect = "connect"  # Connect to existing endpoint

# Endpoint specification
ZmqEndpoint = namedtuple('ZmqEndpoint', ['type', 'address'])
# Usage: ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")

# Message formats (type hints)
Message = Union[bytes, List[bytes]]  # Single part or multipart message
Tag = bytes                          # Topic tag for pub/sub
MessageId = bytes                    # Unique identifier for req/rep
RecipientId = bytes                  # Recipient identifier for router
```
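Because `Message` may be either a single `bytes` object or a list of frames, code that handles both often normalizes to a list first. The sketch below repeats two of the definitions above so that it runs on its own; `as_frames` is a hypothetical helper and not part of txZMQ.

```python
from collections import namedtuple
from typing import List, Union

# Local copies of the definitions above, so the sketch is self-contained.
ZmqEndpoint = namedtuple('ZmqEndpoint', ['type', 'address'])
Message = Union[bytes, List[bytes]]


def as_frames(message: Message) -> List[bytes]:
    """Return message as a list of frames, wrapping a single bytes object."""
    if isinstance(message, bytes):
        return [message]
    return list(message)


endpoint = ZmqEndpoint("bind", "tcp://127.0.0.1:5555")
kind, address = endpoint  # namedtuples unpack like plain tuples
print(endpoint.type, endpoint.address)
print(as_frames(b"single"), as_frames([b"part1", b"part2"]))
```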