CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-txzmq

Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.

Overview
Eval results
Files

docs/reqrep.md

Request-Reply Messaging

The request-reply pattern offers timeout support and Twisted Deferred integration for building client-server applications. It provides reliable one-to-one communication in which clients send requests and wait for responses from servers, and it includes automatic correlation of requests and responses, timeout handling, and integration with Twisted's asynchronous programming model.

Capabilities

Request Connection

Sends requests to servers and receives responses asynchronously using Twisted Deferred objects. Supports request timeouts and automatic correlation.

class ZmqRequestTimeoutError(Exception):
    """
    Exception raised when a request times out before receiving a response.

    It is delivered through the errback chain of the Deferred returned by
    ZmqREQConnection.sendMsg, so handle it with ``addErrback`` or with an
    ``except`` clause inside an ``inlineCallbacks`` generator.

    Attributes:
        msgId: The message ID that timed out
    """

class ZmqREQConnection(ZmqConnection):
    """
    Request connection for client-side request-reply messaging.

    Uses ZeroMQ DEALER socket internally for async operation while providing
    REQ-like semantics. Each request gets a unique ID and returns a Deferred.

    Class attributes:
        socketType: ZeroMQ socket type backing the connection (DEALER).
        defaultRequestTimeout: Timeout used when sendMsg() is called without
            an explicit ``timeout``; None means requests never time out.
        UUID_POOL_GEN_SIZE: Number of UUIDs generated at once.
    """

    socketType = constants.DEALER
    defaultRequestTimeout = None  # No timeout by default
    UUID_POOL_GEN_SIZE = 5       # Number of UUIDs to generate at once

    def sendMsg(self, *messageParts, **kwargs):
        """
        Send request message and return Deferred for response.

        Args:
            *messageParts: Variable number of message parts (bytes)
            **kwargs: Keyword arguments
                timeout (float, optional): Request timeout in seconds
                                         Overrides defaultRequestTimeout

        Returns:
            twisted.internet.defer.Deferred: Deferred that fires with response
                                            or errback with ZmqRequestTimeoutError.
                                            The usage examples in this document
                                            index the result as ``response[0]``,
                                            i.e. it is treated as a sequence of
                                            reply parts.

        Example:
            d = connection.sendMsg(b"get_user", b"12345", timeout=5.0)
            d.addCallback(handle_response)
            d.addErrback(handle_error)
        """

Request Client Usage Example

from twisted.internet import reactor, defer
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqREQConnection
from txzmq import ZmqRequestTimeoutError
import json

class APIClient(ZmqREQConnection):
    """Client for making API requests to server.

    Every request is a JSON object carrying an ``action`` key, serialized to
    UTF-8 bytes and sent as a single message part. Each method returns the
    Deferred produced by ``sendMsg``.
    """

    defaultRequestTimeout = 10.0  # 10 second default timeout

    def get_user(self, user_id):
        """Get user information by ID.

        Args:
            user_id: Identifier of the user to look up.

        Returns:
            The Deferred returned by ``sendMsg`` (default timeout applies).
        """
        request = {
            'action': 'get_user',
            'user_id': user_id,
        }
        message = json.dumps(request).encode('utf-8')
        return self.sendMsg(message)

    def create_user(self, user_data, timeout=None):
        """Create new user with optional custom timeout.

        Args:
            user_data: JSON-serializable description of the new user.
            timeout: Request timeout in seconds. ``None`` (the default) falls
                back to ``defaultRequestTimeout``; any other value, including
                ``0``, is forwarded to ``sendMsg`` unchanged.

        Returns:
            The Deferred returned by ``sendMsg``.
        """
        request = {
            'action': 'create_user',
            'data': user_data,
        }
        message = json.dumps(request).encode('utf-8')
        # Test against None rather than truthiness: an explicit timeout of 0
        # must not be silently replaced by the class default.
        kwargs = {} if timeout is None else {'timeout': timeout}
        return self.sendMsg(message, **kwargs)

    def delete_user(self, user_id):
        """Delete user by ID.

        Args:
            user_id: Identifier of the user to remove.

        Returns:
            The Deferred returned by ``sendMsg``; uses a fixed 5 second
            timeout instead of the class default.
        """
        request = {
            'action': 'delete_user',
            'user_id': user_id,
        }
        message = json.dumps(request).encode('utf-8')
        return self.sendMsg(message, timeout=5.0)  # Quick timeout for deletes

# Usage example
def main():
    """Connect an APIClient to the local server and run three sample requests.

    The requests run sequentially inside an ``inlineCallbacks`` generator
    that is scheduled once the reactor starts; the reactor is stopped when
    the sequence finishes, whether it succeeded or failed.
    """
    zmq_factory = ZmqFactory()
    zmq_factory.registerForShutdown()

    api = APIClient(
        zmq_factory,
        ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),
    )

    def parse_reply(parts):
        """Decode the first reply part as UTF-8 encoded JSON."""
        return json.loads(parts[0].decode('utf-8'))

    @defer.inlineCallbacks
    def run_requests():
        try:
            # Look up an existing user.
            print("Getting user 123...")
            reply = yield api.get_user("123")
            fetched = parse_reply(reply)
            print(f"User: {fetched}")

            # Create a user, allowing the server more time than the default.
            print("Creating new user...")
            candidate = {
                'name': 'John Doe',
                'email': 'john@example.com',
                'age': 30,
            }
            reply = yield api.create_user(candidate, timeout=15.0)
            created = parse_reply(reply)
            print(f"Created user: {created}")

            # Remove a user.
            print("Deleting user 456...")
            reply = yield api.delete_user("456")
            removed = parse_reply(reply)
            print(f"Delete result: {removed}")

        except ZmqRequestTimeoutError as timeout_error:
            print(f"Request timed out: {timeout_error}")
        except Exception as error:
            print(f"Request failed: {error}")
        finally:
            reactor.stop()

    # Kick off the request sequence as soon as the reactor is running.
    reactor.callWhenRunning(run_requests)
    reactor.run()


if __name__ == "__main__":
    main()

Reply Connection

Receives requests from clients and sends back responses. Uses message correlation to ensure responses reach the correct client.

class ZmqREPConnection(ZmqConnection):
    """
    Reply connection for server-side request-reply messaging.

    Uses ZeroMQ ROUTER socket internally to handle multiple clients
    while providing REP-like semantics with proper message routing.

    Subclasses implement gotMessage() to process each request and call
    reply() with the received messageId to answer it.
    """

    socketType = constants.ROUTER

    def reply(self, messageId, *messageParts):
        """
        Send reply to specific request.

        Args:
            messageId (bytes): Message ID from gotMessage callback
            *messageParts: Variable number of response message parts (bytes)

        Note:
            Must be called exactly once for each request received via gotMessage.
            The messageId must match the one provided in gotMessage callback.
        """

    def gotMessage(self, messageId, *messageParts):
        """
        Abstract method called when request is received.

        Must be implemented by subclasses to handle incoming requests.
        Must call reply() with the same messageId to send response.

        Args:
            messageId (bytes): Unique message identifier for correlation
            *messageParts: Request message parts (bytes)
        """

Reply Server Usage Example

from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqREPConnection
import json
import time

class APIServer(ZmqREPConnection):
    """JSON API server backed by an in-memory user table.

    Requests are single-part UTF-8 JSON objects with an ``action`` key;
    every request is answered with a JSON object containing ``success``.
    """

    def __init__(self, factory, endpoint):
        """Open the connection and seed the simulated user database."""
        super().__init__(factory, endpoint)
        seed_users = (
            {"id": "123", "name": "Alice", "email": "alice@example.com", "age": 25},
            {"id": "456", "name": "Bob", "email": "bob@example.com", "age": 30},
        )
        # Simulated user database, keyed by the user's string ID.
        self.users = {record["id"]: record for record in seed_users}
        self.next_id = 1000

        print("API Server started and ready for requests")

    def gotMessage(self, messageId, *messageParts):
        """Decode a request, route it by ``action`` and send the reply.

        Any exception raised while decoding or handling the request is
        reported back to the client as ``{'success': False, 'error': ...}``.
        """
        try:
            payload = json.loads(messageParts[0].decode('utf-8'))
            action = payload.get('action')

            print(f"Processing request: {action}")

            # Routing table compared with ==, so any JSON value is accepted
            # as an action without needing to be hashable.
            routes = (
                ('get_user', self.handle_get_user),
                ('create_user', self.handle_create_user),
                ('delete_user', self.handle_delete_user),
                ('list_users', self.handle_list_users),
            )
            handler = next(
                (target for name, target in routes if action == name),
                None,
            )
            if handler is None:
                outcome = {
                    'success': False,
                    'error': f'Unknown action: {action}',
                }
            else:
                outcome = handler(payload)

            self.reply(messageId, json.dumps(outcome).encode('utf-8'))

        except Exception as exc:
            failure = {
                'success': False,
                'error': str(exc),
            }
            self.reply(messageId, json.dumps(failure).encode('utf-8'))

    def handle_get_user(self, request):
        """Return the stored user for ``request['user_id']`` or an error."""
        user_id = request.get('user_id')
        if user_id not in self.users:
            return {
                'success': False,
                'error': f'User {user_id} not found',
            }
        return {
            'success': True,
            'user': self.users[user_id],
        }

    def handle_create_user(self, request):
        """Validate ``request['data']``, store a new user and return it."""
        user_data = request.get('data', {})

        # Both name and email must be present and non-empty.
        if not (user_data.get('name') and user_data.get('email')):
            return {
                'success': False,
                'error': 'Name and email are required',
            }

        # Allocate the next sequential ID.
        user_id = str(self.next_id)
        self.next_id += 1

        new_user = {
            'id': user_id,
            'name': user_data['name'],
            'email': user_data['email'],
            'age': user_data.get('age', 0),
            'created_at': time.time(),
        }
        self.users[user_id] = new_user

        return {
            'success': True,
            'user': new_user,
        }

    def handle_delete_user(self, request):
        """Remove the user named by ``request['user_id']`` and return it."""
        user_id = request.get('user_id')
        if user_id not in self.users:
            return {
                'success': False,
                'error': f'User {user_id} not found',
            }
        return {
            'success': True,
            'deleted_user': self.users.pop(user_id),
        }

    def handle_list_users(self, request):
        """Return every stored user together with the total count."""
        return {
            'success': True,
            'users': list(self.users.values()),
            'count': len(self.users),
        }

# Start server
def main():
    """Bind an APIServer to port 5555 on all interfaces and run the reactor."""
    zmq_factory = ZmqFactory()
    zmq_factory.registerForShutdown()

    # Constructing the server prints its own "ready" message.
    api_server = APIServer(
        zmq_factory,
        ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555"),
    )

    print("Starting API server on tcp://*:5555")
    reactor.run()


if __name__ == "__main__":
    main()

Advanced Request-Reply Patterns

Complex request-reply scenarios including load balancing, service discovery, and multi-stage request processing.

Load Balanced Server Pool

class LoadBalancedService:
    """Group of ServiceServer instances, one bound to each given address."""

    def __init__(self, factory, service_name, bind_addresses):
        """Start one server per address and remember them in ``self.servers``.

        Args:
            factory: Factory passed to every ServiceServer.
            service_name: Prefix for each server ID (``<name>-<n>``, n from 1).
            bind_addresses: Iterable of addresses to bind, one server each.
        """
        self.service_name = service_name
        self.servers = []

        for number, address in enumerate(bind_addresses, start=1):
            instance = ServiceServer(
                factory,
                ZmqEndpoint(ZmqEndpointType.bind, address),
                f"{service_name}-{number}",
            )
            self.servers.append(instance)
            print(f"Started {service_name} server {number} on {address}")

class ServiceServer(ZmqREPConnection):
    """Reply server that tags every response with its own identity.

    Attributes:
        server_id: Label added to each response as ``server_id``.
        request_count: Number of requests received so far; added to each
            response as ``request_number``.
    """

    def __init__(self, factory, endpoint, server_id):
        """Open the connection and initialise the per-server counters."""
        super().__init__(factory, endpoint)
        self.server_id = server_id
        self.request_count = 0

    def gotMessage(self, messageId, *messageParts):
        """Process one request and always send exactly one reply.

        A request that cannot be decoded or processed is answered with
        ``{'success': False, 'error': ...}`` instead of letting the exception
        escape, which would leave the client waiting until its timeout.
        """
        self.request_count += 1
        try:
            request = json.loads(messageParts[0].decode('utf-8'))
            response = self.process_request(request)
        except Exception as exc:
            response = {
                'success': False,
                'error': str(exc),
            }

        # Add server info to response
        response['server_id'] = self.server_id
        response['request_number'] = self.request_count

        response_data = json.dumps(response).encode('utf-8')
        self.reply(messageId, response_data)

    def process_request(self, request):
        """Simulate handling ``request`` and return the result dictionary.

        NOTE: ``time.sleep`` blocks the calling thread; it stands in for real
        work here and would stall the reactor if used in production code.
        """
        # Simulate processing
        import time
        time.sleep(0.1)  # Simulate work

        return {
            'success': True,
            'result': f"Processed {request.get('task', 'unknown')}",
            'timestamp': time.time(),
        }

# Client with retry logic
class RobustClient(ZmqREQConnection):
    """Request client that connects to several servers and retries on timeout."""

    def __init__(self, factory, endpoints):
        """Create the connection and attach every endpoint in ``endpoints``."""
        # Connect to multiple server addresses
        super().__init__(factory)
        self.addEndpoints(endpoints)
        self.defaultRequestTimeout = 5.0

    @defer.inlineCallbacks
    def robust_request(self, request_data, max_retries=3):
        """Make request with retry logic.

        Args:
            request_data: JSON-serializable request payload.
            max_retries: Maximum number of attempts before giving up.

        Returns:
            Deferred firing with the decoded JSON response of the first
            attempt that succeeds.

        Raises:
            ZmqRequestTimeoutError: If the final attempt times out.
            Exception: If no attempt was made (``max_retries`` <= 0).
        """
        for attempt in range(max_retries):
            try:
                print(f"Attempt {attempt + 1}: Making request")
                response = yield self.sendMsg(
                    json.dumps(request_data).encode('utf-8'),
                    timeout=5.0
                )
                result = json.loads(response[0].decode('utf-8'))
                print(f"Success on attempt {attempt + 1}: {result.get('server_id')}")
                # A plain return delivers the value from an inlineCallbacks
                # generator on Python 3.
                return result

            except ZmqRequestTimeoutError:
                print(f"Attempt {attempt + 1} timed out")
                if attempt == max_retries - 1:
                    raise
                # Wait before retry: yield a Deferred that the reactor fires
                # one second from now. Scheduling a no-op callLater without
                # yielding on it would not pause the generator at all.
                pause = defer.Deferred()
                reactor.callLater(1.0, pause.callback, None)
                yield pause

        raise Exception("All retry attempts failed")

# Usage
factory = ZmqFactory()

# Start one server per port, bound on all interfaces.
service = LoadBalancedService(
    factory,
    "calculator",
    [f"tcp://*:{port}" for port in (5555, 5556, 5557)],
)

# Create a client that connects to every server over loopback.
client_endpoints = [
    ZmqEndpoint(ZmqEndpointType.connect, f"tcp://127.0.0.1:{port}")
    for port in (5555, 5556, 5557)
]
client = RobustClient(factory, client_endpoints)

Timeout Handling and Error Recovery

Comprehensive error handling patterns for robust request-reply applications.

class TimeoutAwareClient(ZmqREQConnection):
    """Client that tracks request outcomes and adapts its timeout to them.

    Attributes:
        request_stats: Counters for ``total``, ``successful``, ``timeouts``
            and ``errors`` across all calls to adaptive_request().
    """

    def __init__(self, factory, endpoint):
        """Open the connection and reset the outcome counters."""
        super().__init__(factory, endpoint)
        self.defaultRequestTimeout = 10.0
        self.request_stats = {
            'total': 0,
            'successful': 0,
            'timeouts': 0,
            'errors': 0,
        }

    @defer.inlineCallbacks
    def adaptive_request(self, request_data, min_timeout=1.0, max_timeout=30.0):
        """Make request with adaptive timeout based on historical performance.

        The timeout is chosen from the success rate recorded so far:
        above 90% uses ``min_timeout``, above 70% uses twice ``min_timeout``
        (never more than ``max_timeout``), anything lower uses ``max_timeout``.

        Args:
            request_data: JSON-serializable request payload.
            min_timeout: Timeout in seconds used when requests are succeeding.
            max_timeout: Upper bound for the timeout in seconds.

        Returns:
            Deferred firing with the decoded JSON response.

        Raises:
            ZmqRequestTimeoutError: If the request times out.
            Exception: Any other failure is counted and re-raised.
        """
        # max(..., 1) avoids division by zero before the first request.
        success_rate = (self.request_stats['successful'] /
                        max(self.request_stats['total'], 1))

        if success_rate > 0.9:
            timeout = min_timeout
        elif success_rate > 0.7:
            # Cap the doubled value so it never exceeds the stated maximum.
            timeout = min(min_timeout * 2, max_timeout)
        else:
            timeout = max_timeout

        self.request_stats['total'] += 1

        try:
            print(f"Making request with {timeout}s timeout (success rate: {success_rate:.2%})")
            response = yield self.sendMsg(
                json.dumps(request_data).encode('utf-8'),
                timeout=timeout
            )
            self.request_stats['successful'] += 1
            result = json.loads(response[0].decode('utf-8'))
            # A plain return delivers the value from an inlineCallbacks
            # generator on Python 3.
            return result

        except ZmqRequestTimeoutError:
            self.request_stats['timeouts'] += 1
            print(f"Request timed out after {timeout}s")
            # Could implement exponential backoff here
            raise

        except Exception as e:
            self.request_stats['errors'] += 1
            print(f"Request failed: {e}")
            raise

    def get_stats(self):
        """Get client performance statistics.

        Returns:
            A shallow copy of ``request_stats``, so callers cannot modify
            the live counters.
        """
        return self.request_stats.copy()

# Usage with automatic timeout adjustment
@defer.inlineCallbacks
def test_adaptive_timeout():
    """Send 20 requests through a TimeoutAwareClient and print its statistics.

    Failures of individual requests are printed and do not abort the run.
    The reactor is stopped once all requests have been attempted.
    """
    factory = ZmqFactory()
    endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
    client = TimeoutAwareClient(factory, endpoint)

    # Make multiple requests to build statistics
    for i in range(20):
        try:
            request = {'task': f'process_item_{i}', 'complexity': i % 5}
            result = yield client.adaptive_request(request)
            print(f"Request {i}: {result.get('result', 'no result')}")

        except Exception as e:
            print(f"Request {i} failed: {e}")

        # Brief delay between requests: yield a Deferred that the reactor
        # fires after 0.5s. Scheduling a no-op callLater without yielding on
        # it would not pause the generator at all.
        pause = defer.Deferred()
        reactor.callLater(0.5, pause.callback, None)
        yield pause

    # Print final statistics
    stats = client.get_stats()
    print(f"\nFinal stats: {stats}")

    reactor.stop()

# Run test
reactor.callWhenRunning(test_adaptive_timeout)

Install with Tessl CLI

npx tessl i tessl/pypi-txzmq

docs

factory-connection.md

index.md

pubsub.md

pushpull.md

reqrep.md

router-dealer.md

tile.json