0
# Factory and Connection Management
1
2
Core infrastructure for managing ZeroMQ context, creating connections, and handling connection lifecycle within Twisted's reactor pattern. The factory manages the global ZeroMQ context while connections handle individual socket operations.
3
4
## Capabilities
5
6
### ZeroMQ Factory
7
8
Manages ZeroMQ context and connection lifecycle, providing centralized context management and reactor integration.
9
10
```python { .api }
11
class ZmqFactory(object):
12
"""
13
Factory for creating and managing ZeroMQ connections.
14
15
Attributes:
16
reactor: Twisted reactor reference (default: twisted.internet.reactor)
17
ioThreads (int): Number of I/O threads for ZeroMQ context (default: 1)
18
lingerPeriod (int): Linger period in milliseconds for socket closure (default: 100)
19
connections (set): Set of active ZmqConnection instances
20
context: ZeroMQ context instance
21
"""
22
23
reactor = reactor
24
ioThreads = 1
25
lingerPeriod = 100
26
27
def __init__(self):
28
"""Create ZeroMQ context with specified I/O threads."""
29
30
def shutdown(self):
31
"""
32
Shutdown all connections and terminate ZeroMQ context.
33
Cleans up reactor triggers and closes all managed connections.
34
"""
35
36
def registerForShutdown(self):
37
"""
38
Register factory for automatic shutdown when reactor shuts down.
39
Recommended to call on any created factory.
40
"""
41
```
42
43
#### Usage Example
44
45
```python
46
from twisted.internet import reactor
47
from txzmq import ZmqFactory
48
49
# Create factory
50
factory = ZmqFactory()
51
factory.registerForShutdown() # Auto-cleanup on reactor shutdown
52
53
# Configure factory settings
54
factory.ioThreads = 2 # Use 2 I/O threads
55
factory.lingerPeriod = 500 # Wait 500ms for pending messages
56
57
# Factory will automatically manage context and connections
58
# When reactor shuts down, all connections are cleaned up
59
```
60
61
### Connection Endpoints
62
63
Endpoint specification for ZeroMQ connection addressing and binding/connecting semantics.
64
65
```python { .api }
66
class ZmqEndpointType(object):
67
"""Constants for endpoint connection types."""
68
69
bind = "bind" # Bind and listen for incoming connections
70
connect = "connect" # Connect to existing endpoint
71
72
ZmqEndpoint = namedtuple('ZmqEndpoint', ['type', 'address'])
73
"""
74
Named tuple representing a ZeroMQ endpoint.
75
76
Fields:
77
type (str): Either ZmqEndpointType.bind or ZmqEndpointType.connect
78
address (str): ZeroMQ address (e.g., "tcp://127.0.0.1:5555", "ipc:///tmp/socket")
79
"""
80
```
81
82
#### Usage Example
83
84
```python
85
from txzmq import ZmqEndpoint, ZmqEndpointType
86
87
# Create endpoints for different protocols
88
tcp_bind = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")
89
tcp_connect = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
90
91
ipc_bind = ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/my_socket")
92
inproc_connect = ZmqEndpoint(ZmqEndpointType.connect, "inproc://internal")
93
94
# Multiple endpoints can be used with single connection
95
endpoints = [
96
ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555"),
97
ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/backup")
98
]
99
```
100
101
### Base Connection Class
102
103
Abstract base class for all ZeroMQ connections, implementing Twisted descriptor interfaces and providing common connection functionality.
104
105
```python { .api }
106
class ZmqConnection(object):
107
"""
108
Base class for ZeroMQ connections with Twisted integration.
109
110
Implements IReadDescriptor and IFileDescriptor interfaces.
111
Should not be used directly - use pattern-specific subclasses.
112
113
Class Attributes:
114
socketType: ZeroMQ socket type constant (must be set by subclasses)
115
allowLoopbackMulticast (bool): Allow loopback multicast (default: False)
116
multicastRate (int): Multicast rate in kbps (default: 100)
117
highWaterMark (int): High water mark for message queuing (default: 0)
118
tcpKeepalive (int): TCP keepalive setting (default: 0)
119
tcpKeepaliveCount (int): TCP keepalive count (default: 0)
120
tcpKeepaliveIdle (int): TCP keepalive idle time (default: 0)
121
tcpKeepaliveInterval (int): TCP keepalive interval (default: 0)
122
reconnectInterval (int): Reconnection interval in ms (default: 100)
123
reconnectIntervalMax (int): Maximum reconnection interval (default: 0)
124
125
Instance Attributes:
126
factory (ZmqFactory): Associated factory instance
127
endpoints (list): List of ZmqEndpoint objects
128
identity (bytes): Socket identity for routing
129
socket: Underlying ZeroMQ socket
130
fd (int): File descriptor for reactor integration
131
"""
132
133
socketType = None # Must be overridden by subclasses
134
allowLoopbackMulticast = False
135
multicastRate = 100
136
highWaterMark = 0
137
tcpKeepalive = 0
138
tcpKeepaliveCount = 0
139
tcpKeepaliveIdle = 0
140
tcpKeepaliveInterval = 0
141
reconnectInterval = 100
142
reconnectIntervalMax = 0
143
144
def __init__(self, factory, endpoint=None, identity=None):
145
"""
146
Initialize connection.
147
148
Args:
149
factory (ZmqFactory): Factory managing this connection
150
endpoint (ZmqEndpoint, optional): Initial endpoint to connect/bind
151
identity (bytes, optional): Socket identity for routing
152
"""
153
154
def addEndpoints(self, endpoints):
155
"""
156
Add connection endpoints after initialization.
157
158
Args:
159
endpoints (list): List of ZmqEndpoint objects to add
160
"""
161
162
def shutdown(self):
163
"""
164
Shutdown connection and close socket.
165
Removes from reactor and cleans up resources.
166
"""
167
168
def send(self, message):
169
"""
170
Send message via ZeroMQ socket.
171
172
Args:
173
message (bytes or list): Message data - single part (bytes) or
174
multipart (list of bytes)
175
176
Raises:
177
ZMQError: If sending fails (e.g., EAGAIN when HWM reached)
178
"""
179
180
def messageReceived(self, message):
181
"""
182
Abstract method called when message is received.
183
184
Must be implemented by subclasses to handle incoming messages.
185
186
Args:
187
message (list): List of message parts (bytes)
188
"""
189
190
def fileno(self):
191
"""
192
Get file descriptor for Twisted reactor integration.
193
194
Returns:
195
int: Platform file descriptor number
196
"""
197
198
def connectionLost(self, reason):
199
"""
200
Handle connection loss (Twisted interface).
201
202
Args:
203
reason: Reason for connection loss
204
"""
205
206
def doRead(self):
207
"""
208
Handle read events from reactor (Twisted interface).
209
Processes incoming ZeroMQ messages.
210
"""
211
212
def logPrefix(self):
213
"""
214
Get log prefix for Twisted logging.
215
216
Returns:
217
str: Log prefix ("ZMQ")
218
"""
219
```
220
221
#### Usage Example
222
223
```python
224
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPubConnection
225
226
factory = ZmqFactory()
227
228
# Create connection with single endpoint
229
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")
230
pub = ZmqPubConnection(factory, endpoint)
231
232
# Add additional endpoints
233
additional_endpoints = [
234
ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/pub_socket"),
235
ZmqEndpoint(ZmqEndpointType.connect, "tcp://remote-server:5556")
236
]
237
pub.addEndpoints(additional_endpoints)
238
239
# Configure connection-specific settings
240
pub.highWaterMark = 1000 # Limit queued messages
241
pub.multicastRate = 200 # Increase multicast rate
242
243
# Connection automatically integrates with Twisted reactor
244
# Messages are processed asynchronously through doRead()
245
```
246
247
### Connection Configuration
248
249
Advanced configuration options for fine-tuning connection behavior, network settings, and performance characteristics.
250
251
```python { .api }
252
# Network and Performance Settings
253
class ZmqConnection:
254
allowLoopbackMulticast = False # Enable loopback multicast
255
multicastRate = 100 # Multicast rate in kbps
256
highWaterMark = 0 # Message queue limit (0 = unlimited)
257
258
# TCP Keepalive (ZeroMQ 3.x only)
259
tcpKeepalive = 0 # Enable TCP keepalive
260
tcpKeepaliveCount = 0 # Keepalive probe count
261
tcpKeepaliveIdle = 0 # Keepalive idle time
262
tcpKeepaliveInterval = 0 # Keepalive probe interval
263
264
# Reconnection Settings (ZeroMQ 3.x only)
265
reconnectInterval = 100 # Initial reconnect interval (ms)
266
reconnectIntervalMax = 0 # Maximum reconnect interval (ms)
267
```
268
269
#### Configuration Example
270
271
```python
272
from txzmq import ZmqFactory, ZmqPubConnection, ZmqEndpoint, ZmqEndpointType
273
274
class HighPerformancePub(ZmqPubConnection):
275
# Configure for high-throughput publishing
276
highWaterMark = 10000 # Queue up to 10k messages
277
multicastRate = 1000 # High multicast rate
278
279
# Enable TCP keepalive for reliable connections
280
tcpKeepalive = 1
281
tcpKeepaliveIdle = 600 # 10 minutes idle
282
tcpKeepaliveInterval = 60 # 1 minute between probes
283
tcpKeepaliveCount = 3 # 3 failed probes = disconnect
284
285
# Aggressive reconnection
286
reconnectInterval = 50 # Start at 50ms
287
reconnectIntervalMax = 5000 # Cap at 5 seconds
288
289
factory = ZmqFactory()
290
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
291
pub = HighPerformancePub(factory, endpoint)
292
```