Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.
Request-reply pattern with timeout support and Twisted Deferred integration for building client-server applications. This pattern provides reliable one-to-one communication where clients send requests and wait for responses from servers. It includes automatic correlation of requests and responses, timeout handling, and integration with Twisted's asynchronous programming model.
Sends requests to servers and receives responses asynchronously using Twisted Deferred objects. Supports request timeouts and automatic correlation.
class ZmqRequestTimeoutError(Exception):
"""
Exception raised when a request times out before receiving a response.
Attributes:
msgId: The message ID that timed out
"""
class ZmqREQConnection(ZmqConnection):
"""
Request connection for client-side request-reply messaging.
Uses ZeroMQ DEALER socket internally for async operation while providing
REQ-like semantics. Each request gets a unique ID and returns a Deferred.
"""
socketType = constants.DEALER
defaultRequestTimeout = None # No timeout by default
UUID_POOL_GEN_SIZE = 5 # Number of UUIDs to generate at once
def sendMsg(self, *messageParts, **kwargs):
"""
Send request message and return Deferred for response.
Args:
*messageParts: Variable number of message parts (bytes)
**kwargs: Keyword arguments
timeout (float, optional): Request timeout in seconds
Overrides defaultRequestTimeout
Returns:
twisted.internet.defer.Deferred: Deferred that fires with response
or errback with ZmqRequestTimeoutError
Example:
d = connection.sendMsg(b"get_user", b"12345", timeout=5.0)
d.addCallback(handle_response)
d.addErrback(handle_error)
"""from twisted.internet import reactor, defer
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqREQConnection
from txzmq import ZmqRequestTimeoutError
import json
class APIClient(ZmqREQConnection):
"""Client for making API requests to server."""
defaultRequestTimeout = 10.0 # 10 second default timeout
def get_user(self, user_id):
"""Get user information by ID."""
request = {
'action': 'get_user',
'user_id': user_id
}
message = json.dumps(request).encode('utf-8')
return self.sendMsg(message)
def create_user(self, user_data, timeout=None):
"""Create new user with optional custom timeout."""
request = {
'action': 'create_user',
'data': user_data
}
message = json.dumps(request).encode('utf-8')
kwargs = {'timeout': timeout} if timeout else {}
return self.sendMsg(message, **kwargs)
def delete_user(self, user_id):
"""Delete user by ID."""
request = {
'action': 'delete_user',
'user_id': user_id
}
message = json.dumps(request).encode('utf-8')
return self.sendMsg(message, timeout=5.0) # Quick timeout for deletes
# Usage example
def main():
factory = ZmqFactory()
factory.registerForShutdown()
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
client = APIClient(factory, endpoint)
@defer.inlineCallbacks
def run_requests():
try:
# Get user information
print("Getting user 123...")
response = yield client.get_user("123")
user_data = json.loads(response[0].decode('utf-8'))
print(f"User: {user_data}")
# Create new user
print("Creating new user...")
new_user = {
'name': 'John Doe',
'email': 'john@example.com',
'age': 30
}
response = yield client.create_user(new_user, timeout=15.0)
result = json.loads(response[0].decode('utf-8'))
print(f"Created user: {result}")
# Delete user
print("Deleting user 456...")
response = yield client.delete_user("456")
result = json.loads(response[0].decode('utf-8'))
print(f"Delete result: {result}")
except ZmqRequestTimeoutError as e:
print(f"Request timed out: {e}")
except Exception as e:
print(f"Request failed: {e}")
finally:
reactor.stop()
# Start making requests
reactor.callWhenRunning(run_requests)
reactor.run()
if __name__ == "__main__":
main()Receives requests from clients and sends back responses. Uses message correlation to ensure responses reach the correct client.
class ZmqREPConnection(ZmqConnection):
"""
Reply connection for server-side request-reply messaging.
Uses ZeroMQ ROUTER socket internally to handle multiple clients
while providing REP-like semantics with proper message routing.
"""
socketType = constants.ROUTER
def reply(self, messageId, *messageParts):
"""
Send reply to specific request.
Args:
messageId (bytes): Message ID from gotMessage callback
*messageParts: Variable number of response message parts (bytes)
Note:
Must be called exactly once for each request received via gotMessage.
The messageId must match the one provided in gotMessage callback.
"""
def gotMessage(self, messageId, *messageParts):
"""
Abstract method called when request is received.
Must be implemented by subclasses to handle incoming requests.
Must call reply() with the same messageId to send response.
Args:
messageId (bytes): Unique message identifier for correlation
*messageParts: Request message parts (bytes)
"""from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqREPConnection
import json
import time
class APIServer(ZmqREPConnection):
"""Server handling API requests."""
def __init__(self, factory, endpoint):
super().__init__(factory, endpoint)
# Simulate user database
self.users = {
"123": {"id": "123", "name": "Alice", "email": "alice@example.com", "age": 25},
"456": {"id": "456", "name": "Bob", "email": "bob@example.com", "age": 30},
}
self.next_id = 1000
print("API Server started and ready for requests")
def gotMessage(self, messageId, *messageParts):
"""Handle incoming API request."""
try:
# Parse request
request_data = json.loads(messageParts[0].decode('utf-8'))
action = request_data.get('action')
print(f"Processing request: {action}")
# Route to appropriate handler
if action == 'get_user':
response = self.handle_get_user(request_data)
elif action == 'create_user':
response = self.handle_create_user(request_data)
elif action == 'delete_user':
response = self.handle_delete_user(request_data)
elif action == 'list_users':
response = self.handle_list_users(request_data)
else:
response = {
'success': False,
'error': f'Unknown action: {action}'
}
# Send response
response_message = json.dumps(response).encode('utf-8')
self.reply(messageId, response_message)
except Exception as e:
# Send error response
error_response = {
'success': False,
'error': str(e)
}
response_message = json.dumps(error_response).encode('utf-8')
self.reply(messageId, response_message)
def handle_get_user(self, request):
"""Get user by ID."""
user_id = request.get('user_id')
if user_id in self.users:
return {
'success': True,
'user': self.users[user_id]
}
else:
return {
'success': False,
'error': f'User {user_id} not found'
}
def handle_create_user(self, request):
"""Create new user."""
user_data = request.get('data', {})
# Validate required fields
if not user_data.get('name') or not user_data.get('email'):
return {
'success': False,
'error': 'Name and email are required'
}
# Create user with new ID
user_id = str(self.next_id)
self.next_id += 1
new_user = {
'id': user_id,
'name': user_data['name'],
'email': user_data['email'],
'age': user_data.get('age', 0),
'created_at': time.time()
}
self.users[user_id] = new_user
return {
'success': True,
'user': new_user
}
def handle_delete_user(self, request):
"""Delete user by ID."""
user_id = request.get('user_id')
if user_id in self.users:
deleted_user = self.users.pop(user_id)
return {
'success': True,
'deleted_user': deleted_user
}
else:
return {
'success': False,
'error': f'User {user_id} not found'
}
def handle_list_users(self, request):
"""List all users."""
return {
'success': True,
'users': list(self.users.values()),
'count': len(self.users)
}
# Start server
def main():
factory = ZmqFactory()
factory.registerForShutdown()
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
server = APIServer(factory, endpoint)
print("Starting API server on tcp://*:5555")
reactor.run()
if __name__ == "__main__":
main()Complex request-reply scenarios including load balancing, service discovery, and multi-stage request processing.
class LoadBalancedService:
"""Multiple server instances for load balancing."""
def __init__(self, factory, service_name, bind_addresses):
self.service_name = service_name
self.servers = []
for i, address in enumerate(bind_addresses):
endpoint = ZmqEndpoint(ZmqEndpointType.bind, address)
server = ServiceServer(factory, endpoint, f"{service_name}-{i+1}")
self.servers.append(server)
print(f"Started {service_name} server {i+1} on {address}")
class ServiceServer(ZmqREPConnection):
def __init__(self, factory, endpoint, server_id):
super().__init__(factory, endpoint)
self.server_id = server_id
self.request_count = 0
def gotMessage(self, messageId, *messageParts):
self.request_count += 1
request = json.loads(messageParts[0].decode('utf-8'))
# Add server info to response
response = self.process_request(request)
response['server_id'] = self.server_id
response['request_number'] = self.request_count
response_data = json.dumps(response).encode('utf-8')
self.reply(messageId, response_data)
def process_request(self, request):
# Simulate processing
import time
time.sleep(0.1) # Simulate work
return {
'success': True,
'result': f"Processed {request.get('task', 'unknown')}",
'timestamp': time.time()
}
# Client with retry logic
class RobustClient(ZmqREQConnection):
def __init__(self, factory, endpoints):
# Connect to multiple server addresses
super().__init__(factory)
self.addEndpoints(endpoints)
self.defaultRequestTimeout = 5.0
@defer.inlineCallbacks
def robust_request(self, request_data, max_retries=3):
"""Make request with retry logic."""
for attempt in range(max_retries):
try:
print(f"Attempt {attempt + 1}: Making request")
response = yield self.sendMsg(
json.dumps(request_data).encode('utf-8'),
timeout=5.0
)
result = json.loads(response[0].decode('utf-8'))
print(f"Success on attempt {attempt + 1}: {result.get('server_id')}")
defer.returnValue(result)
except ZmqRequestTimeoutError:
print(f"Attempt {attempt + 1} timed out")
if attempt == max_retries - 1:
raise
# Wait before retry
yield defer.succeed(None)
reactor.callLater(1.0, lambda: None)
raise Exception("All retry attempts failed")
# Usage
factory = ZmqFactory()
# Start multiple servers
service = LoadBalancedService(factory, "calculator", [
"tcp://*:5555",
"tcp://*:5556",
"tcp://*:5557"
])
# Create client connecting to all servers
client_endpoints = [
ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),
ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5556"),
ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5557")
]
client = RobustClient(factory, client_endpoints)Comprehensive error handling patterns for robust request-reply applications.
class TimeoutAwareClient(ZmqREQConnection):
"""Client with sophisticated timeout and error handling."""
def __init__(self, factory, endpoint):
super().__init__(factory, endpoint)
self.defaultRequestTimeout = 10.0
self.request_stats = {
'total': 0,
'successful': 0,
'timeouts': 0,
'errors': 0
}
@defer.inlineCallbacks
def adaptive_request(self, request_data, min_timeout=1.0, max_timeout=30.0):
"""Make request with adaptive timeout based on historical performance."""
# Calculate adaptive timeout based on recent performance
success_rate = (self.request_stats['successful'] /
max(self.request_stats['total'], 1))
if success_rate > 0.9:
timeout = min_timeout
elif success_rate > 0.7:
timeout = min_timeout * 2
else:
timeout = max_timeout
self.request_stats['total'] += 1
try:
print(f"Making request with {timeout}s timeout (success rate: {success_rate:.2%})")
response = yield self.sendMsg(
json.dumps(request_data).encode('utf-8'),
timeout=timeout
)
self.request_stats['successful'] += 1
result = json.loads(response[0].decode('utf-8'))
defer.returnValue(result)
except ZmqRequestTimeoutError as e:
self.request_stats['timeouts'] += 1
print(f"Request timed out after {timeout}s")
# Could implement exponential backoff here
raise
except Exception as e:
self.request_stats['errors'] += 1
print(f"Request failed: {e}")
raise
def get_stats(self):
"""Get client performance statistics."""
return self.request_stats.copy()
# Usage with automatic timeout adjustment
@defer.inlineCallbacks
def test_adaptive_timeout():
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
client = TimeoutAwareClient(factory, endpoint)
# Make multiple requests to build statistics
for i in range(20):
try:
request = {'task': f'process_item_{i}', 'complexity': i % 5}
result = yield client.adaptive_request(request)
print(f"Request {i}: {result.get('result', 'no result')}")
except Exception as e:
print(f"Request {i} failed: {e}")
# Brief delay between requests
yield defer.succeed(None)
reactor.callLater(0.5, lambda: None)
# Print final statistics
stats = client.get_stats()
print(f"\nFinal stats: {stats}")
reactor.stop()
# Run test
reactor.callWhenRunning(test_adaptive_timeout)Install with Tessl CLI
npx tessl i tessl/pypi-txzmq