Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.
Push-pull pattern for load-balanced work distribution and result collection. This pattern creates a pipeline where work is distributed among multiple workers (pull) and results are collected (push). It provides automatic load balancing and is ideal for parallel processing, task queues, and distributed computing scenarios.
Sends work items to available workers in a load-balanced manner. Messages are automatically distributed among connected pull sockets.
class ZmqPushConnection(ZmqConnection):
    """
    Push connection for distributing work to workers.

    Uses ZeroMQ PUSH socket type. Messages are load-balanced automatically
    among connected PULL sockets using round-robin distribution.
    """

    socketType = constants.PUSH

    def push(self, message):
        """
        Push work item to next available worker.

        Args:
            message (bytes): Work item data to be processed.
                Single part message containing task data.

        Note:
            ZeroMQ automatically load-balances messages among connected
            PULL sockets. Each message goes to exactly one worker.
        """
        # A push is just a plain send on the underlying PUSH socket
        # (restored from the txzmq API; the scraped body was truncated).
        self.send(message)


from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPushConnection
import json
import uuid
# Work distributor
class WorkDistributor:
    """Binds a PUSH socket and distributes JSON-encoded work packages to workers."""

    def __init__(self, factory, bind_address):
        # Bind so that any number of pull workers can connect to us.
        endpoint = ZmqEndpoint(ZmqEndpointType.bind, bind_address)
        self.pusher = ZmqPushConnection(factory, endpoint)

    def distribute_work(self, work_items):
        """Distribute work items to workers.

        Each item (a dict with 'task', 'data' and optional 'priority') is
        wrapped in a package with a unique id for tracking, serialized as
        UTF-8 JSON, and pushed to the next available worker.
        """
        for item in work_items:
            # Add unique ID for tracking
            work_package = {
                'id': str(uuid.uuid4()),
                'task': item['task'],
                'data': item['data'],
                'priority': item.get('priority', 'normal')
            }
            message = json.dumps(work_package).encode('utf-8')
            self.pusher.push(message)
            print(f"Sent work item {work_package['id']}: {work_package['task']}")
# Usage
factory = ZmqFactory()
distributor = WorkDistributor(factory, "tcp://*:5555")

# Distribute various types of work
work_queue = [
    {'task': 'process_image', 'data': 'image1.jpg'},
    {'task': 'calculate_stats', 'data': [1, 2, 3, 4, 5]},
    {'task': 'send_email', 'data': 'user@example.com', 'priority': 'high'},
    {'task': 'backup_database', 'data': 'table_users'},
    {'task': 'generate_report', 'data': 'monthly_sales'}
]
distributor.distribute_work(work_queue)
# Continuous work distribution
def generate_periodic_work():
    """Emit a health-check work item, then reschedule itself every 10 seconds."""
    work_item = {
        'task': 'health_check',
        'data': f'timestamp_{reactor.seconds()}'
    }
    distributor.distribute_work([work_item])
    reactor.callLater(10.0, generate_periodic_work)  # Every 10 seconds

generate_periodic_work()
reactor.run()

Receives work items from pushers and processes them. Multiple pull connections can connect to the same push source for parallel processing.
class ZmqPullConnection(ZmqConnection):
    """
    Pull connection for receiving work from pushers.

    Uses ZeroMQ PULL socket type. Receives work items in load-balanced manner
    from connected PUSH sockets. Each worker gets different messages.
    """

    socketType = constants.PULL

    def onPull(self, message):
        """
        Abstract method called when work item is received.

        Must be implemented by subclasses to process work items.

        Args:
            message (list): List containing single message part with work data.
                message[0] contains the actual work item (bytes).
        """
        # Abstract by contract: subclasses must override (restored per the
        # txzmq API; the scraped body was truncated).
        raise NotImplementedError(self)


from twisted.internet import reactor, defer
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPullConnection
import json
import time
class Worker(ZmqPullConnection):
    """Pull-socket worker: decodes JSON work packages and dispatches by task type."""

    def __init__(self, factory, endpoint, worker_id):
        super().__init__(factory, endpoint)
        self.worker_id = worker_id
        # Number of successfully handled (or unknown-type) work items.
        self.processed_count = 0

    def onPull(self, message):
        """Process received work item."""
        try:
            # Parse work item
            work_data = json.loads(message[0].decode('utf-8'))
            work_id = work_data['id']
            task_type = work_data['task']
            task_data = work_data['data']
            print(f"Worker {self.worker_id} processing {work_id}: {task_type}")
            # Dispatch by task type. NOTE(review): the handlers below use
            # time.sleep() to simulate work, which blocks the Twisted reactor;
            # real code should defer blocking work to a thread.
            handlers = {
                'process_image': self.process_image,
                'calculate_stats': self.calculate_stats,
                'send_email': self.send_email,
                'backup_database': self.backup_database,
                'generate_report': self.generate_report,
                'health_check': self.health_check,
            }
            handler = handlers.get(task_type)
            if handler is None:
                print(f"Worker {self.worker_id}: Unknown task type {task_type}")
            else:
                handler(task_data, work_id)
            self.processed_count += 1
        except Exception as e:
            # Malformed JSON or missing keys land here; keep the worker alive.
            print(f"Worker {self.worker_id} error processing message: {e}")

    def process_image(self, image_path, work_id):
        # Simulate image processing
        time.sleep(2)  # Simulate work
        print(f"Worker {self.worker_id}: Processed image {image_path} ({work_id})")

    def calculate_stats(self, data, work_id):
        # Simulate statistical calculation
        result = sum(data) / len(data)
        time.sleep(1)
        print(f"Worker {self.worker_id}: Calculated average {result} ({work_id})")

    def send_email(self, email, work_id):
        # Simulate email sending
        time.sleep(0.5)
        print(f"Worker {self.worker_id}: Sent email to {email} ({work_id})")

    def backup_database(self, table, work_id):
        # Simulate database backup
        time.sleep(3)
        print(f"Worker {self.worker_id}: Backed up {table} ({work_id})")

    def generate_report(self, report_type, work_id):
        # Simulate report generation
        time.sleep(2.5)
        print(f"Worker {self.worker_id}: Generated {report_type} report ({work_id})")

    def health_check(self, data, work_id):
        # Quick health check
        print(f"Worker {self.worker_id}: Health check OK - {data} ({work_id})")
# Create multiple workers
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

# Start multiple worker processes
workers = []
for i in range(3):
    worker = Worker(factory, endpoint, f"W{i+1}")
    workers.append(worker)

# Monitor worker performance
def print_stats():
    """Print per-worker and total processed counts; reschedules itself every 30s."""
    total_processed = sum(w.processed_count for w in workers)
    print(f"\n=== Stats ===")
    for worker in workers:
        print(f"{worker.worker_id}: {worker.processed_count} items processed")
    print(f"Total: {total_processed} items")
    print("=============\n")
    reactor.callLater(30.0, print_stats)  # Every 30 seconds

print_stats()
reactor.run()

Building multi-stage processing pipelines using push-pull patterns for complex data processing workflows.
from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqPushConnection, ZmqPullConnection
import json
# Stage 1: Data preprocessor
class Preprocessor(ZmqPullConnection):
    """Pipeline stage 1: pulls raw records, normalizes them, pushes downstream."""

    def __init__(self, factory, input_endpoint, output_endpoint):
        super().__init__(factory, input_endpoint)
        # Connect to next stage
        self.output = ZmqPushConnection(factory, output_endpoint)

    def onPull(self, message):
        # Receive raw data from stage 0 (data source)
        raw_data = json.loads(message[0].decode('utf-8'))
        # Preprocess the data
        processed_data = {
            'id': raw_data['id'],
            'processed_at': reactor.seconds(),
            'normalized_data': self.normalize(raw_data['raw_values']),
            'metadata': raw_data.get('metadata', {})
        }
        # Send to next stage
        self.output.push(json.dumps(processed_data).encode('utf-8'))
        print(f"Preprocessed item {processed_data['id']}")

    def normalize(self, values):
        """Simple data normalization: scale by the max value.

        Empty input returns []; a non-positive max returns the values unchanged
        (avoids division by zero).
        """
        if not values:
            return []
        max_val = max(values)
        return [v / max_val for v in values] if max_val > 0 else values
# Stage 2: Data analyzer
class Analyzer(ZmqPullConnection):
    """Pipeline stage 2: pulls normalized data, computes statistics, pushes results."""

    def __init__(self, factory, input_endpoint, output_endpoint):
        super().__init__(factory, input_endpoint)
        self.output = ZmqPushConnection(factory, output_endpoint)

    def onPull(self, message):
        # Receive preprocessed data from stage 1
        processed_data = json.loads(message[0].decode('utf-8'))
        # Analyze the data
        analysis_result = {
            'id': processed_data['id'],
            'analyzed_at': reactor.seconds(),
            'mean': self.calculate_mean(processed_data['normalized_data']),
            'variance': self.calculate_variance(processed_data['normalized_data']),
            'trend': self.detect_trend(processed_data['normalized_data']),
            'original_metadata': processed_data['metadata']
        }
        # Send to final stage (results collector)
        self.output.push(json.dumps(analysis_result).encode('utf-8'))
        print(f"Analyzed item {analysis_result['id']}: trend={analysis_result['trend']}")

    def calculate_mean(self, values):
        """Arithmetic mean; 0 for empty input."""
        return sum(values) / len(values) if values else 0

    def calculate_variance(self, values):
        """Population variance; 0 for empty input."""
        if not values:
            return 0
        mean = self.calculate_mean(values)
        return sum((x - mean) ** 2 for x in values) / len(values)

    def detect_trend(self, values):
        """Crude trend: compare last value to first; 'unknown' for < 2 points."""
        if len(values) < 2:
            return "unknown"
        return "increasing" if values[-1] > values[0] else "decreasing"
# Results collector
class ResultsCollector(ZmqPullConnection):
    """Final pipeline stage: pulls analysis results and accumulates them in memory."""

    def __init__(self, factory, input_endpoint):
        super().__init__(factory, input_endpoint)
        # All results ever received; grows without bound in this demo.
        self.results = []

    def onPull(self, message):
        # Receive final analysis results
        result = json.loads(message[0].decode('utf-8'))
        self.results.append(result)
        print(f"Collected result {result['id']}: "
              f"mean={result['mean']:.3f}, "
              f"variance={result['variance']:.3f}, "
              f"trend={result['trend']}")
        # Could save to database, file, or forward to another system
        if len(self.results) % 10 == 0:
            print(f"Collected {len(self.results)} total results")
# Set up pipeline
factory = ZmqFactory()

# Create pipeline stages
# Stage 0 -> Stage 1
preprocessor = Preprocessor(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),  # Input
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5556")              # Output
)

# Stage 1 -> Stage 2
analyzer = Analyzer(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5556"),  # Input
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5557")              # Output
)

# Stage 2 -> Final
collector = ResultsCollector(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5557")   # Input
)

print("Pipeline ready: Stage0 -> Preprocessor -> Analyzer -> Collector")
reactor.run()

Horizontal scaling patterns for high-throughput processing using multiple workers and dynamic load distribution.
class ScalableWorkerPool:
    """Manages a pool of workers that can be dynamically scaled."""

    def __init__(self, factory, work_source_address, result_sink_address=None):
        self.factory = factory
        self.work_source = work_source_address
        self.result_sink = result_sink_address
        self.workers = []
        # worker_id -> {'started': <reactor time>, 'processed': <count>}
        self.worker_stats = {}

    def add_worker(self, worker_class, worker_id):
        """Add a new worker to the pool.

        The worker connects to the shared work source; when a result sink was
        configured, it also receives a result endpoint (different constructor
        signatures for the two cases).
        """
        work_endpoint = ZmqEndpoint(ZmqEndpointType.connect, self.work_source)
        if self.result_sink:
            result_endpoint = ZmqEndpoint(ZmqEndpointType.connect, self.result_sink)
            worker = worker_class(self.factory, work_endpoint, result_endpoint, worker_id)
        else:
            worker = worker_class(self.factory, work_endpoint, worker_id)
        self.workers.append(worker)
        self.worker_stats[worker_id] = {'started': reactor.seconds(), 'processed': 0}
        print(f"Added worker {worker_id} to pool (total: {len(self.workers)})")

    def remove_worker(self, worker_id):
        """Remove a worker from the pool by id; no-op if no worker matches."""
        for worker in self.workers:
            if hasattr(worker, 'worker_id') and worker.worker_id == worker_id:
                worker.shutdown()
                self.workers.remove(worker)
                del self.worker_stats[worker_id]
                print(f"Removed worker {worker_id} from pool (total: {len(self.workers)})")
                break  # ids are unique; safe to stop after the first match

    def scale_to(self, target_workers, worker_class):
        """Scale worker pool to target size by adding or removing workers."""
        current_count = len(self.workers)
        if target_workers > current_count:
            # Scale up
            for i in range(target_workers - current_count):
                worker_id = f"worker_{current_count + i + 1}"
                self.add_worker(worker_class, worker_id)
        elif target_workers < current_count:
            # Scale down: remove the most recently added workers first
            for _ in range(current_count - target_workers):
                if self.workers:
                    last_worker = self.workers[-1]
                    if hasattr(last_worker, 'worker_id'):
                        self.remove_worker(last_worker.worker_id)
# Usage example
class ProcessingWorker(ZmqPullConnection):
    """Minimal pull worker: decodes a JSON work item and counts it as processed."""

    def __init__(self, factory, work_endpoint, worker_id):
        super().__init__(factory, work_endpoint)
        self.worker_id = worker_id
        self.processed_count = 0

    def onPull(self, message):
        # Process work item
        work_data = json.loads(message[0].decode('utf-8'))
        # Simulate processing time (blocks the reactor; demo only)
        import time
        time.sleep(0.1)
        self.processed_count += 1
        print(f"{self.worker_id} processed item {work_data.get('id', 'unknown')}")
# Create scalable worker pool
factory = ZmqFactory()
pool = ScalableWorkerPool(factory, "tcp://127.0.0.1:5555")

# Start with 2 workers
pool.scale_to(2, ProcessingWorker)

# Simulate dynamic scaling based on load
def monitor_and_scale():
    """Scale the pool up during business hours, down off-hours; reschedules hourly."""
    import time
    # BUG FIX: the original used int(reactor.seconds()) % 24, which is
    # seconds-mod-24, not the hour of day. Use the local wall-clock hour.
    current_hour = time.localtime().tm_hour
    if 9 <= current_hour <= 17:  # Business hours
        pool.scale_to(5, ProcessingWorker)
    else:  # Off hours
        pool.scale_to(2, ProcessingWorker)
    reactor.callLater(3600, monitor_and_scale)  # Check every hour
monitor_and_scale()

Install with Tessl CLI
npx tessl i tessl/pypi-txzmq