CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-txzmq

Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.

Overview
Eval results
Files

pushpull.mddocs/

Push-Pull Messaging

Push-pull pattern for load-balanced work distribution and result collection. This pattern creates a pipeline in which a PUSH socket distributes work items among multiple workers, each receiving on a PULL socket; results can then be forwarded to a collector through a further PUSH-to-PULL hop. It provides automatic load balancing and is ideal for parallel processing, task queues, and distributed computing scenarios.

Capabilities

Push Connection

Sends work items to available workers in a load-balanced manner. Messages are automatically distributed among connected pull sockets.

class ZmqPushConnection(ZmqConnection):
    """
    Push connection for distributing work to workers.

    Uses ZeroMQ PUSH socket type. Messages are load-balanced automatically
    among connected PULL sockets using round-robin distribution.

    This listing is an API summary: only the signature and docstring of
    ``push`` are shown here, not its implementation.
    """

    # Selects the ZeroMQ PUSH socket type for connections of this class.
    socketType = constants.PUSH

    def push(self, message):
        """
        Push work item to next available worker.

        Args:
            message (bytes): Work item data to be processed.
                Single part message containing task data.

        Note:
            ZeroMQ automatically load-balances messages among connected
            PULL sockets. Each message goes to exactly one worker.
        """

Push Usage Example

from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPushConnection
import json
import uuid

# Work distributor
class WorkDistributor:
    """Wraps a bound PUSH connection and sends work items over it as JSON."""

    def __init__(self, factory, bind_address):
        """Create the PUSH connection bound to ``bind_address``."""
        self.pusher = ZmqPushConnection(
            factory,
            ZmqEndpoint(ZmqEndpointType.bind, bind_address),
        )

    def distribute_work(self, work_items):
        """
        Push every item in ``work_items`` as one UTF-8 encoded JSON message.

        Args:
            work_items: Iterable of dicts with ``'task'`` and ``'data'`` keys
                and an optional ``'priority'`` key (defaults to ``'normal'``).
        """
        for entry in work_items:
            # A fresh UUID per item lets the receiver tell items apart.
            package_id = str(uuid.uuid4())
            task_name = entry['task']
            payload = json.dumps({
                'id': package_id,
                'task': task_name,
                'data': entry['data'],
                'priority': entry.get('priority', 'normal'),
            })
            self.pusher.push(payload.encode('utf-8'))
            print(f"Sent work item {package_id}: {task_name}")

# Usage
# The factory is handed to every connection created from it.
factory = ZmqFactory()
# Bind on all interfaces, port 5555; the worker example connects to this port.
distributor = WorkDistributor(factory, "tcp://*:5555")

# Distribute various types of work
# Each entry needs 'task' and 'data'; 'priority' is optional and defaults
# to 'normal' inside WorkDistributor.distribute_work.
work_queue = [
    {'task': 'process_image', 'data': 'image1.jpg'},
    {'task': 'calculate_stats', 'data': [1, 2, 3, 4, 5]},
    {'task': 'send_email', 'data': 'user@example.com', 'priority': 'high'},
    {'task': 'backup_database', 'data': 'table_users'},
    {'task': 'generate_report', 'data': 'monthly_sales'}
]

# NOTE(review): this push happens at start-up, before reactor.run() and
# possibly before any worker has connected. Whether such items are queued or
# rejected depends on txzmq/ZeroMQ send semantics -- confirm.
distributor.distribute_work(work_queue)

# Continuous work distribution
def generate_periodic_work():
    """Push one health-check item, then schedule the next run in 10 seconds."""
    stamp = reactor.seconds()
    distributor.distribute_work([
        {'task': 'health_check', 'data': f'timestamp_{stamp}'},
    ])
    # Re-arm: the function schedules itself, producing a 10-second cycle.
    reactor.callLater(10.0, generate_periodic_work)


# Send the first health check right away, then hand control to the reactor.
generate_periodic_work()
reactor.run()

Pull Connection

Receives work items from pushers and processes them. Multiple pull connections can connect to the same push source for parallel processing.

class ZmqPullConnection(ZmqConnection):
    """
    Pull connection for receiving work from pushers.

    Uses ZeroMQ PULL socket type. Receives work items in load-balanced manner
    from connected PUSH sockets. Each worker gets different messages.

    This listing is an API summary: only the signature and docstring of
    ``onPull`` are shown here.
    """

    # Selects the ZeroMQ PULL socket type for connections of this class.
    socketType = constants.PULL

    def onPull(self, message):
        """
        Abstract method called when work item is received.

        Must be implemented by subclasses to process work items.

        Args:
            message (list): List containing single message part with work data.
                ``message[0]`` contains the actual work item (bytes).
        """

Pull Usage Example

from twisted.internet import reactor, defer
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPullConnection
import json
import time

class Worker(ZmqPullConnection):
    """
    Pull-side worker that decodes JSON work items and dispatches them by task.

    Simulated work is scheduled with ``reactor.callLater`` rather than
    ``time.sleep``. ``onPull`` runs on the reactor thread, so sleeping there
    would stall every connection served by that reactor, including the other
    workers created in this example.

    Attributes:
        worker_id: Label used in log output.
        processed_count: Number of work items whose handling has completed.
            Items with an unknown task type or a failing handler are not
            counted.
    """

    # Task type -> name of the handling method. An explicit table keeps
    # message content from selecting arbitrary attributes on the worker.
    _TASK_HANDLERS = {
        'process_image': 'process_image',
        'calculate_stats': 'calculate_stats',
        'send_email': 'send_email',
        'backup_database': 'backup_database',
        'generate_report': 'generate_report',
        'health_check': 'health_check',
    }

    def __init__(self, factory, endpoint, worker_id):
        super().__init__(factory, endpoint)
        self.worker_id = worker_id
        self.processed_count = 0

    def onPull(self, message):
        """
        Decode one work item and dispatch it to the matching handler.

        Args:
            message (list): Message parts; ``message[0]`` is expected to hold
                the UTF-8 encoded JSON work item with ``'id'``, ``'task'``
                and ``'data'`` keys.
        """
        try:
            work_data = json.loads(message[0].decode('utf-8'))
            work_id = work_data['id']
            task_type = work_data['task']
            task_data = work_data['data']
        except (AttributeError, IndexError, KeyError, TypeError, ValueError) as e:
            # ValueError also covers JSONDecodeError and UnicodeDecodeError.
            print(f"Worker {self.worker_id} error processing message: {e}")
            return

        print(f"Worker {self.worker_id} processing {work_id}: {task_type}")

        try:
            handler_name = self._TASK_HANDLERS.get(task_type)
            if handler_name is None:
                print(f"Worker {self.worker_id}: Unknown task type {task_type}")
                return
            getattr(self, handler_name)(task_data, work_id)
        except Exception as e:
            # Callback boundary: one bad work item must not propagate into
            # the reactor and disturb the other connections.
            print(f"Worker {self.worker_id} error processing message: {e}")

    def _complete(self, summary):
        """Log the outcome of a work item and count it as processed."""
        print(f"Worker {self.worker_id}: {summary}")
        self.processed_count += 1

    def _complete_later(self, delay, summary):
        """Simulate ``delay`` seconds of work without blocking the reactor."""
        reactor.callLater(delay, self._complete, summary)

    def process_image(self, image_path, work_id):
        # Simulate image processing
        self._complete_later(2, f"Processed image {image_path} ({work_id})")

    def calculate_stats(self, data, work_id):
        # Simulate statistical calculation
        if not data:
            raise ValueError("calculate_stats requires at least one value")
        result = sum(data) / len(data)
        self._complete_later(1, f"Calculated average {result} ({work_id})")

    def send_email(self, email, work_id):
        # Simulate email sending
        self._complete_later(0.5, f"Sent email to {email} ({work_id})")

    def backup_database(self, table, work_id):
        # Simulate database backup
        self._complete_later(3, f"Backed up {table} ({work_id})")

    def generate_report(self, report_type, work_id):
        # Simulate report generation
        self._complete_later(2.5, f"Generated {report_type} report ({work_id})")

    def health_check(self, data, work_id):
        # Quick health check: completes immediately
        self._complete(f"Health check OK - {data} ({work_id})")

# Create multiple workers
factory = ZmqFactory()
# Connect to the port the work distributor example binds (5555).
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

# Create three workers inside this one process; they share the same factory
# and endpoint object and are labelled W1..W3.
workers = []
for i in range(3):
    worker = Worker(factory, endpoint, f"W{i+1}")
    workers.append(worker)

# Monitor worker performance
def print_stats():
    """Print per-worker and total processed counts, then re-run in 30 seconds."""
    tallies = [(w.worker_id, w.processed_count) for w in workers]
    print("\n=== Stats ===")
    for label, done in tallies:
        print(f"{label}: {done} items processed")
    print(f"Total: {sum(done for _, done in tallies)} items")
    print("=============\n")
    # Re-arm: the function schedules itself, producing a 30-second cycle.
    reactor.callLater(30.0, print_stats)  # Every 30 seconds


# Print the first (empty) report right away, then start the reactor.
print_stats()
reactor.run()

Pipeline Architecture

Building multi-stage processing pipelines using push-pull patterns for complex data processing workflows.

Two-Stage Pipeline Example

from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqPushConnection, ZmqPullConnection
import json

# Stage 1: Data preprocessor
class Preprocessor(ZmqPullConnection):
    """Stage 1: pulls raw records, normalizes their values, pushes them on."""

    def __init__(self, factory, input_endpoint, output_endpoint):
        super().__init__(factory, input_endpoint)
        # PUSH side feeding the next stage
        self.output = ZmqPushConnection(factory, output_endpoint)

    def onPull(self, message):
        """
        Turn one raw JSON record into a preprocessed record and forward it.

        Args:
            message (list): Message parts; ``message[0]`` holds the UTF-8
                encoded JSON record with ``'id'`` and ``'raw_values'`` keys
                and an optional ``'metadata'`` key.
        """
        record = json.loads(message[0].decode('utf-8'))
        record_id = record['id']

        outgoing = json.dumps({
            'id': record_id,
            'processed_at': reactor.seconds(),
            'normalized_data': self.normalize(record['raw_values']),
            'metadata': record.get('metadata', {}),
        })

        self.output.push(outgoing.encode('utf-8'))
        print(f"Preprocessed item {record_id}")

    def normalize(self, values):
        """
        Scale ``values`` by their maximum.

        Returns an empty list for empty input, and the input unchanged when
        the maximum is not positive (which avoids dividing by zero).
        """
        if not values:
            return []
        peak = max(values)
        if peak > 0:
            return [v / peak for v in values]
        return values

# Stage 2: Data analyzer
class Analyzer(ZmqPullConnection):
    """Stage 2: pulls preprocessed records, computes statistics, pushes results."""

    def __init__(self, factory, input_endpoint, output_endpoint):
        super().__init__(factory, input_endpoint)
        # PUSH side feeding the results collector
        self.output = ZmqPushConnection(factory, output_endpoint)

    def onPull(self, message):
        """
        Analyze one preprocessed record and forward the result.

        Args:
            message (list): Message parts; ``message[0]`` holds the UTF-8
                encoded JSON record with ``'id'``, ``'normalized_data'`` and
                ``'metadata'`` keys.
        """
        # Receive preprocessed data from stage 1
        processed_data = json.loads(message[0].decode('utf-8'))
        values = processed_data['normalized_data']

        # Analyze the data
        analysis_result = {
            'id': processed_data['id'],
            'analyzed_at': reactor.seconds(),
            'mean': self.calculate_mean(values),
            'variance': self.calculate_variance(values),
            'trend': self.detect_trend(values),
            'original_metadata': processed_data['metadata']
        }

        # Send to final stage (results collector)
        self.output.push(json.dumps(analysis_result).encode('utf-8'))
        print(f"Analyzed item {analysis_result['id']}: trend={analysis_result['trend']}")

    def calculate_mean(self, values):
        """Return the arithmetic mean of ``values``, or 0 for empty input."""
        return sum(values) / len(values) if values else 0

    def calculate_variance(self, values):
        """Return the population variance of ``values``, or 0 for empty input."""
        if not values:
            return 0
        mean = self.calculate_mean(values)
        return sum((x - mean) ** 2 for x in values) / len(values)

    def detect_trend(self, values):
        """
        Compare the first and last value to classify the series.

        Returns:
            ``"unknown"`` for fewer than two values, otherwise
            ``"increasing"``, ``"decreasing"`` or ``"stable"``. Equal
            endpoints yield ``"stable"`` rather than ``"decreasing"``.
        """
        if len(values) < 2:
            return "unknown"
        first, last = values[0], values[-1]
        if last > first:
            return "increasing"
        if last < first:
            return "decreasing"
        return "stable"

# Results collector
class ResultsCollector(ZmqPullConnection):
    """Final stage: pulls analysis results and keeps them in memory."""

    def __init__(self, factory, input_endpoint):
        super().__init__(factory, input_endpoint)
        self.results = []

    def onPull(self, message):
        """
        Store one analysis result and print a summary line.

        Args:
            message (list): Message parts; ``message[0]`` holds the UTF-8
                encoded JSON result with ``'id'``, ``'mean'``, ``'variance'``
                and ``'trend'`` keys.
        """
        entry = json.loads(message[0].decode('utf-8'))
        self.results.append(entry)

        summary = (
            f"Collected result {entry['id']}: "
            f"mean={entry['mean']:.3f}, "
            f"variance={entry['variance']:.3f}, "
            f"trend={entry['trend']}"
        )
        print(summary)

        # Could save to database, file, or forward to another system
        collected = len(self.results)
        if collected % 10 == 0:
            # Progress line after every tenth result
            print(f"Collected {collected} total results")

# Set up pipeline
# All three stages are created from the same factory.
factory = ZmqFactory()

# Create pipeline stages
# Each stage connects its input to the previous stage's port and binds its own
# output port, so the ports chain 5555 -> 5556 -> 5557.
# Stage 0 -> Stage 1
preprocessor = Preprocessor(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),  # Input
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5556")              # Output
)

# Stage 1 -> Stage 2
analyzer = Analyzer(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5556"),  # Input
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5557")              # Output
)

# Stage 2 -> Final
collector = ResultsCollector(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5557")   # Input
)

# Stage 0 (the data source on port 5555) is not created in this snippet.
print("Pipeline ready: Stage0 -> Preprocessor -> Analyzer -> Collector")
reactor.run()

Load Balancing and Scalability

Horizontal scaling patterns for high-throughput processing using multiple workers and dynamic load distribution.

class ScalableWorkerPool:
    """
    Manages a pool of workers that can be dynamically scaled.

    Attributes:
        factory: Factory passed to every worker that is created.
        work_source: Address the workers connect to for incoming work.
        result_sink: Optional address workers connect to for results.
        workers: Worker objects currently in the pool, in creation order.
        worker_stats: Per-worker bookkeeping dict keyed by worker id.
    """

    def __init__(self, factory, work_source_address, result_sink_address=None):
        self.factory = factory
        self.work_source = work_source_address
        self.result_sink = result_sink_address
        self.workers = []
        self.worker_stats = {}

    def add_worker(self, worker_class, worker_id):
        """
        Add a new worker to the pool.

        Args:
            worker_class: Callable building the worker. It is called as
                ``worker_class(factory, work_endpoint, result_endpoint,
                worker_id)`` when a result sink is configured, otherwise as
                ``worker_class(factory, work_endpoint, worker_id)``.
            worker_id: Identifier recorded in ``worker_stats``.
        """
        work_endpoint = ZmqEndpoint(ZmqEndpointType.connect, self.work_source)

        if self.result_sink:
            result_endpoint = ZmqEndpoint(ZmqEndpointType.connect, self.result_sink)
            worker = worker_class(self.factory, work_endpoint, result_endpoint, worker_id)
        else:
            worker = worker_class(self.factory, work_endpoint, worker_id)

        self.workers.append(worker)
        self.worker_stats[worker_id] = {'started': reactor.seconds(), 'processed': 0}
        print(f"Added worker {worker_id} to pool (total: {len(self.workers)})")

    def remove_worker(self, worker_id):
        """
        Remove the first worker whose ``worker_id`` matches.

        Does nothing when no worker matches. A missing stats entry is
        tolerated instead of raising ``KeyError``.
        """
        for worker in self.workers:
            if hasattr(worker, 'worker_id') and worker.worker_id == worker_id:
                # Safe to mutate the list here: the loop ends right after.
                self._discard(worker)
                break

    def scale_to(self, target_workers, worker_class):
        """
        Scale worker pool to target size.

        Args:
            target_workers: Desired number of workers; values below zero
                empty the pool.
            worker_class: Worker factory used when scaling up (see
                ``add_worker``); unused when scaling down.
        """
        current_count = len(self.workers)

        if target_workers > current_count:
            # Scale up
            for _ in range(target_workers - current_count):
                self.add_worker(worker_class, self._next_worker_id())
        elif target_workers < current_count:
            # Scale down, newest worker first. Removing by object rather than
            # by id means a worker without ``worker_id`` cannot block shrinking.
            for _ in range(current_count - target_workers):
                if not self.workers:
                    break
                self._discard(self.workers[-1])

    def _next_worker_id(self):
        """Return a ``worker_<n>`` id that no current worker is using."""
        in_use = set(self.worker_stats)
        in_use.update(getattr(w, 'worker_id', None) for w in self.workers)
        # Start from the historical numbering (pool size + 1) and skip ids
        # that are still taken, e.g. after a targeted remove_worker call.
        number = len(self.workers) + 1
        while f"worker_{number}" in in_use:
            number += 1
        return f"worker_{number}"

    def _discard(self, worker):
        """Shut ``worker`` down and drop it and its stats entry from the pool."""
        worker_id = getattr(worker, 'worker_id', None)
        worker.shutdown()
        self.workers.remove(worker)
        self.worker_stats.pop(worker_id, None)
        print(f"Removed worker {worker_id} from pool (total: {len(self.workers)})")

# Usage example
class ProcessingWorker(ZmqPullConnection):
    """
    Minimal pull worker used to demonstrate the scalable pool.

    Attributes:
        worker_id: Label used in log output and by the pool to find the worker.
        processed_count: Number of work items whose simulated processing
            has finished.
    """

    def __init__(self, factory, work_endpoint, worker_id):
        super().__init__(factory, work_endpoint)
        self.worker_id = worker_id
        self.processed_count = 0

    def onPull(self, message):
        """
        Decode one work item and schedule its simulated processing.

        Args:
            message (list): Message parts; ``message[0]`` holds the UTF-8
                encoded JSON work item.
        """
        work_data = json.loads(message[0].decode('utf-8'))
        # Simulate 0.1 s of processing with a timer instead of time.sleep:
        # sleeping here would block the reactor thread and every other
        # worker sharing it.
        reactor.callLater(0.1, self._finish, work_data.get('id', 'unknown'))

    def _finish(self, item_id):
        """Count the item as processed and log it."""
        self.processed_count += 1
        print(f"{self.worker_id} processed item {item_id}")

# Create scalable worker pool
factory = ZmqFactory()
# No result sink is given, so workers are built with the three-argument
# form: worker_class(factory, work_endpoint, worker_id).
pool = ScalableWorkerPool(factory, "tcp://127.0.0.1:5555")

# Start with 2 workers
pool.scale_to(2, ProcessingWorker)

# Simulate dynamic scaling based on load
def monitor_and_scale():
    """
    Resize the pool by time of day, then schedule the next check in an hour.

    The pool is scaled to 5 workers from 09:00 up to and including the 17:00
    hour (local time) and to 2 workers otherwise.
    """
    # Local import: this snippet has no module-level import of ``time``.
    import time

    # reactor.seconds() is a timestamp in seconds, so taking it modulo 24
    # would cycle through 0-23 every 24 seconds. Convert it to a local
    # wall-clock hour instead.
    # NOTE(review): assumes the reactor clock reports seconds since the epoch
    # (the default reactor behaviour) -- confirm if a custom clock is used.
    current_hour = time.localtime(reactor.seconds()).tm_hour
    if 9 <= current_hour <= 17:  # Business hours
        pool.scale_to(5, ProcessingWorker)
    else:  # Off hours
        pool.scale_to(2, ProcessingWorker)

    reactor.callLater(3600, monitor_and_scale)  # Check every hour


monitor_and_scale()

Install with Tessl CLI

npx tessl i tessl/pypi-txzmq

docs

factory-connection.md

index.md

pubsub.md

pushpull.md

reqrep.md

router-dealer.md

tile.json