# Push-Pull Messaging

Push-pull pattern for load-balanced work distribution and result collection. This pattern creates a pipeline where work is pushed out to multiple workers (which pull it) and results can be collected downstream the same way. It provides automatic load balancing and is ideal for parallel processing, task queues, and distributed computing scenarios.

## Capabilities

### Push Connection

Sends work items to available workers in a load-balanced manner. Messages are automatically distributed among connected pull sockets.
```python { .api }
class ZmqPushConnection(ZmqConnection):
    """
    Push connection for distributing work to workers.

    Uses ZeroMQ PUSH socket type. Messages are load-balanced automatically
    among connected PULL sockets using round-robin distribution.
    """

    # ZeroMQ socket type used by this connection.
    socketType = constants.PUSH

    def push(self, message):
        """
        Push work item to next available worker.

        Args:
            message (bytes): Work item data to be processed.
                Single part message containing task data.

        Note:
            ZeroMQ automatically load-balances messages among connected
            PULL sockets. Each message goes to exactly one worker.
        """
```
#### Push Usage Example
```python
from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPushConnection
import json
import uuid


# Work distributor: binds a PUSH socket and fans work out to connected workers.
class WorkDistributor:
    def __init__(self, factory, bind_address):
        endpoint = ZmqEndpoint(ZmqEndpointType.bind, bind_address)
        self.pusher = ZmqPushConnection(factory, endpoint)

    def distribute_work(self, work_items):
        """Distribute work items to workers.

        Args:
            work_items (list[dict]): Each dict needs 'task' and 'data' keys;
                'priority' is optional and defaults to 'normal'.
        """
        for item in work_items:
            # Add unique ID for tracking
            work_package = {
                'id': str(uuid.uuid4()),
                'task': item['task'],
                'data': item['data'],
                'priority': item.get('priority', 'normal')
            }
            message = json.dumps(work_package).encode('utf-8')
            self.pusher.push(message)
            print(f"Sent work item {work_package['id']}: {work_package['task']}")


# Usage
factory = ZmqFactory()
distributor = WorkDistributor(factory, "tcp://*:5555")

# Distribute various types of work
work_queue = [
    {'task': 'process_image', 'data': 'image1.jpg'},
    {'task': 'calculate_stats', 'data': [1, 2, 3, 4, 5]},
    {'task': 'send_email', 'data': 'user@example.com', 'priority': 'high'},
    {'task': 'backup_database', 'data': 'table_users'},
    {'task': 'generate_report', 'data': 'monthly_sales'}
]

distributor.distribute_work(work_queue)


# Continuous work distribution: reschedules itself via the reactor.
def generate_periodic_work():
    work_item = {
        'task': 'health_check',
        'data': f'timestamp_{reactor.seconds()}'
    }
    distributor.distribute_work([work_item])
    reactor.callLater(10.0, generate_periodic_work)  # Every 10 seconds


generate_periodic_work()
reactor.run()
```
### Pull Connection

Receives work items from pushers and processes them. Multiple pull connections can connect to the same push source for parallel processing.
```python { .api }
class ZmqPullConnection(ZmqConnection):
    """
    Pull connection for receiving work from pushers.

    Uses ZeroMQ PULL socket type. Receives work items in load-balanced manner
    from connected PUSH sockets. Each worker gets different messages.
    """

    # ZeroMQ socket type used by this connection.
    socketType = constants.PULL

    def onPull(self, message):
        """
        Abstract method called when work item is received.

        Must be implemented by subclasses to process work items.

        Args:
            message (list): List containing single message part with work data.
                message[0] contains the actual work item (bytes).
        """
```
#### Pull Usage Example
```python
from twisted.internet import reactor, defer
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPullConnection
import json
import time


class Worker(ZmqPullConnection):
    """Pull-side worker that dispatches incoming JSON work items by task type."""

    def __init__(self, factory, endpoint, worker_id):
        super().__init__(factory, endpoint)
        self.worker_id = worker_id
        self.processed_count = 0

    def onPull(self, message):
        """Process received work item."""
        try:
            # Parse work item
            work_data = json.loads(message[0].decode('utf-8'))
            work_id = work_data['id']
            task_type = work_data['task']
            task_data = work_data['data']

            print(f"Worker {self.worker_id} processing {work_id}: {task_type}")

            # Simulate different types of work
            if task_type == 'process_image':
                self.process_image(task_data, work_id)
            elif task_type == 'calculate_stats':
                self.calculate_stats(task_data, work_id)
            elif task_type == 'send_email':
                self.send_email(task_data, work_id)
            elif task_type == 'backup_database':
                self.backup_database(task_data, work_id)
            elif task_type == 'generate_report':
                self.generate_report(task_data, work_id)
            elif task_type == 'health_check':
                self.health_check(task_data, work_id)
            else:
                print(f"Worker {self.worker_id}: Unknown task type {task_type}")

            self.processed_count += 1

        except Exception as e:
            print(f"Worker {self.worker_id} error processing message: {e}")

    # NOTE: time.sleep() blocks the Twisted reactor; a real worker should use
    # deferToThread or asynchronous processing. Kept here for example brevity.
    def process_image(self, image_path, work_id):
        # Simulate image processing
        time.sleep(2)  # Simulate work
        print(f"Worker {self.worker_id}: Processed image {image_path} ({work_id})")

    def calculate_stats(self, data, work_id):
        # Simulate statistical calculation
        result = sum(data) / len(data)
        time.sleep(1)
        print(f"Worker {self.worker_id}: Calculated average {result} ({work_id})")

    def send_email(self, email, work_id):
        # Simulate email sending
        time.sleep(0.5)
        print(f"Worker {self.worker_id}: Sent email to {email} ({work_id})")

    def backup_database(self, table, work_id):
        # Simulate database backup
        time.sleep(3)
        print(f"Worker {self.worker_id}: Backed up {table} ({work_id})")

    def generate_report(self, report_type, work_id):
        # Simulate report generation
        time.sleep(2.5)
        print(f"Worker {self.worker_id}: Generated {report_type} report ({work_id})")

    def health_check(self, data, work_id):
        # Quick health check
        print(f"Worker {self.worker_id}: Health check OK - {data} ({work_id})")


# Create multiple workers
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

# Start multiple worker connections (same process; each gets different items)
workers = []
for i in range(3):
    worker = Worker(factory, endpoint, f"W{i+1}")
    workers.append(worker)


# Monitor worker performance; reschedules itself via the reactor.
def print_stats():
    total_processed = sum(w.processed_count for w in workers)
    print("\n=== Stats ===")
    for worker in workers:
        print(f"{worker.worker_id}: {worker.processed_count} items processed")
    print(f"Total: {total_processed} items")
    print("=============\n")
    reactor.callLater(30.0, print_stats)  # Every 30 seconds


print_stats()
reactor.run()
```
### Pipeline Architecture

Building multi-stage processing pipelines using push-pull patterns for complex data processing workflows.

#### Two-Stage Pipeline Example
```python
from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqPushConnection, ZmqPullConnection
import json


# Stage 1: Data preprocessor — pulls raw records, normalizes, pushes onward.
class Preprocessor(ZmqPullConnection):
    def __init__(self, factory, input_endpoint, output_endpoint):
        super().__init__(factory, input_endpoint)
        # Connect to next stage
        self.output = ZmqPushConnection(factory, output_endpoint)

    def onPull(self, message):
        # Receive raw data from stage 0 (data source)
        raw_data = json.loads(message[0].decode('utf-8'))

        # Preprocess the data
        processed_data = {
            'id': raw_data['id'],
            'processed_at': reactor.seconds(),
            'normalized_data': self.normalize(raw_data['raw_values']),
            'metadata': raw_data.get('metadata', {})
        }

        # Send to next stage
        self.output.push(json.dumps(processed_data).encode('utf-8'))
        print(f"Preprocessed item {processed_data['id']}")

    def normalize(self, values):
        """Simple data normalization: scale by the maximum value."""
        if not values:
            return []
        max_val = max(values)
        # Values are returned unchanged when max_val <= 0 to avoid division issues.
        return [v / max_val for v in values] if max_val > 0 else values


# Stage 2: Data analyzer — pulls normalized records, computes statistics.
class Analyzer(ZmqPullConnection):
    def __init__(self, factory, input_endpoint, output_endpoint):
        super().__init__(factory, input_endpoint)
        self.output = ZmqPushConnection(factory, output_endpoint)

    def onPull(self, message):
        # Receive preprocessed data from stage 1
        processed_data = json.loads(message[0].decode('utf-8'))

        # Analyze the data
        analysis_result = {
            'id': processed_data['id'],
            'analyzed_at': reactor.seconds(),
            'mean': self.calculate_mean(processed_data['normalized_data']),
            'variance': self.calculate_variance(processed_data['normalized_data']),
            'trend': self.detect_trend(processed_data['normalized_data']),
            'original_metadata': processed_data['metadata']
        }

        # Send to final stage (results collector)
        self.output.push(json.dumps(analysis_result).encode('utf-8'))
        print(f"Analyzed item {analysis_result['id']}: trend={analysis_result['trend']}")

    def calculate_mean(self, values):
        return sum(values) / len(values) if values else 0

    def calculate_variance(self, values):
        if not values:
            return 0
        mean = self.calculate_mean(values)
        return sum((x - mean) ** 2 for x in values) / len(values)

    def detect_trend(self, values):
        # Compares only first and last values; flat series reports "decreasing".
        if len(values) < 2:
            return "unknown"
        return "increasing" if values[-1] > values[0] else "decreasing"


# Results collector — terminal pull stage; accumulates analysis results.
class ResultsCollector(ZmqPullConnection):
    def __init__(self, factory, input_endpoint):
        super().__init__(factory, input_endpoint)
        self.results = []

    def onPull(self, message):
        # Receive final analysis results
        result = json.loads(message[0].decode('utf-8'))
        self.results.append(result)

        print(f"Collected result {result['id']}: "
              f"mean={result['mean']:.3f}, "
              f"variance={result['variance']:.3f}, "
              f"trend={result['trend']}")

        # Could save to database, file, or forward to another system
        if len(self.results) % 10 == 0:
            print(f"Collected {len(self.results)} total results")


# Set up pipeline
factory = ZmqFactory()

# Create pipeline stages
# Stage 0 -> Stage 1
preprocessor = Preprocessor(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),  # Input
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5556")              # Output
)

# Stage 1 -> Stage 2
analyzer = Analyzer(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5556"),  # Input
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5557")              # Output
)

# Stage 2 -> Final
collector = ResultsCollector(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5557")   # Input
)

print("Pipeline ready: Stage0 -> Preprocessor -> Analyzer -> Collector")
reactor.run()
```
### Load Balancing and Scalability

Horizontal scaling patterns for high-throughput processing using multiple workers and dynamic load distribution.
```python
from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPullConnection
import json


class ScalableWorkerPool:
    """Manages a pool of workers that can be dynamically scaled."""

    def __init__(self, factory, work_source_address, result_sink_address=None):
        self.factory = factory
        self.work_source = work_source_address
        self.result_sink = result_sink_address
        self.workers = []
        self.worker_stats = {}

    def add_worker(self, worker_class, worker_id):
        """Add a new worker to the pool."""
        work_endpoint = ZmqEndpoint(ZmqEndpointType.connect, self.work_source)

        # Workers with a result sink get an extra push endpoint for output.
        if self.result_sink:
            result_endpoint = ZmqEndpoint(ZmqEndpointType.connect, self.result_sink)
            worker = worker_class(self.factory, work_endpoint, result_endpoint, worker_id)
        else:
            worker = worker_class(self.factory, work_endpoint, worker_id)

        self.workers.append(worker)
        self.worker_stats[worker_id] = {'started': reactor.seconds(), 'processed': 0}
        print(f"Added worker {worker_id} to pool (total: {len(self.workers)})")

    def remove_worker(self, worker_id):
        """Remove a worker from the pool."""
        for worker in self.workers:
            if hasattr(worker, 'worker_id') and worker.worker_id == worker_id:
                worker.shutdown()
                self.workers.remove(worker)
                del self.worker_stats[worker_id]
                print(f"Removed worker {worker_id} from pool (total: {len(self.workers)})")
                break

    def scale_to(self, target_workers, worker_class):
        """Scale worker pool to target size."""
        current_count = len(self.workers)

        if target_workers > current_count:
            # Scale up
            for i in range(target_workers - current_count):
                worker_id = f"worker_{current_count + i + 1}"
                self.add_worker(worker_class, worker_id)
        elif target_workers < current_count:
            # Scale down: always remove the most recently added worker.
            for i in range(current_count - target_workers):
                if self.workers:
                    last_worker = self.workers[-1]
                    if hasattr(last_worker, 'worker_id'):
                        self.remove_worker(last_worker.worker_id)


# Usage example
class ProcessingWorker(ZmqPullConnection):
    def __init__(self, factory, work_endpoint, worker_id):
        super().__init__(factory, work_endpoint)
        self.worker_id = worker_id
        self.processed_count = 0

    def onPull(self, message):
        # Process work item
        work_data = json.loads(message[0].decode('utf-8'))
        # Simulate processing time (blocks the reactor; demo only)
        import time
        time.sleep(0.1)
        self.processed_count += 1
        print(f"{self.worker_id} processed item {work_data.get('id', 'unknown')}")


# Create scalable worker pool
factory = ZmqFactory()
pool = ScalableWorkerPool(factory, "tcp://127.0.0.1:5555")

# Start with 2 workers
pool.scale_to(2, ProcessingWorker)


# Simulate dynamic scaling based on load
def monitor_and_scale():
    # reactor.seconds() is epoch seconds, so divide by 3600 before taking
    # % 24 to obtain the current (UTC) hour. The previous `% 24` on raw
    # seconds yielded seconds-within-the-day modulo 24, never the hour.
    current_hour = int(reactor.seconds() // 3600) % 24
    if 9 <= current_hour <= 17:  # Business hours
        pool.scale_to(5, ProcessingWorker)
    else:  # Off hours
        pool.scale_to(2, ProcessingWorker)

    reactor.callLater(3600, monitor_and_scale)  # Check every hour


monitor_and_scale()
```