0
# Request-Reply Messaging
1
2
Request-reply pattern with timeout support and Twisted Deferred integration for building client-server applications. This pattern provides reliable one-to-one communication where clients send requests and wait for responses from servers. It includes automatic correlation of requests and responses, timeout handling, and integration with Twisted's asynchronous programming model.
3
4
## Capabilities
5
6
### Request Connection
7
8
Sends requests to servers and receives responses asynchronously using Twisted Deferred objects. Supports request timeouts and automatic correlation.
9
10
```python { .api }
11
class ZmqRequestTimeoutError(Exception):
12
"""
13
Exception raised when a request times out before receiving a response.
14
15
Attributes:
16
msgId: The message ID that timed out
17
"""
18
19
class ZmqREQConnection(ZmqConnection):
20
"""
21
Request connection for client-side request-reply messaging.
22
23
Uses ZeroMQ DEALER socket internally for async operation while providing
24
REQ-like semantics. Each request gets a unique ID and returns a Deferred.
25
"""
26
27
socketType = constants.DEALER
28
defaultRequestTimeout = None # No timeout by default
29
UUID_POOL_GEN_SIZE = 5 # Number of UUIDs to generate at once
30
31
def sendMsg(self, *messageParts, **kwargs):
32
"""
33
Send request message and return Deferred for response.
34
35
Args:
36
*messageParts: Variable number of message parts (bytes)
37
**kwargs: Keyword arguments
38
timeout (float, optional): Request timeout in seconds
39
Overrides defaultRequestTimeout
40
41
Returns:
42
twisted.internet.defer.Deferred: Deferred that fires with response
43
or errback with ZmqRequestTimeoutError
44
45
Example:
46
d = connection.sendMsg(b"get_user", b"12345", timeout=5.0)
47
d.addCallback(handle_response)
48
d.addErrback(handle_error)
49
"""
50
```
51
52
#### Request Client Usage Example
53
54
```python
55
from twisted.internet import reactor, defer
56
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqREQConnection
57
from txzmq import ZmqRequestTimeoutError
58
import json
59
60
class APIClient(ZmqREQConnection):
61
"""Client for making API requests to server."""
62
63
defaultRequestTimeout = 10.0 # 10 second default timeout
64
65
def get_user(self, user_id):
66
"""Get user information by ID."""
67
request = {
68
'action': 'get_user',
69
'user_id': user_id
70
}
71
message = json.dumps(request).encode('utf-8')
72
return self.sendMsg(message)
73
74
def create_user(self, user_data, timeout=None):
75
"""Create new user with optional custom timeout."""
76
request = {
77
'action': 'create_user',
78
'data': user_data
79
}
80
message = json.dumps(request).encode('utf-8')
81
kwargs = {'timeout': timeout} if timeout else {}
82
return self.sendMsg(message, **kwargs)
83
84
def delete_user(self, user_id):
85
"""Delete user by ID."""
86
request = {
87
'action': 'delete_user',
88
'user_id': user_id
89
}
90
message = json.dumps(request).encode('utf-8')
91
return self.sendMsg(message, timeout=5.0) # Quick timeout for deletes
92
93
# Usage example
94
def main():
95
factory = ZmqFactory()
96
factory.registerForShutdown()
97
98
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
99
client = APIClient(factory, endpoint)
100
101
@defer.inlineCallbacks
102
def run_requests():
103
try:
104
# Get user information
105
print("Getting user 123...")
106
response = yield client.get_user("123")
107
user_data = json.loads(response[0].decode('utf-8'))
108
print(f"User: {user_data}")
109
110
# Create new user
111
print("Creating new user...")
112
new_user = {
113
'name': 'John Doe',
114
'email': 'john@example.com',
115
'age': 30
116
}
117
response = yield client.create_user(new_user, timeout=15.0)
118
result = json.loads(response[0].decode('utf-8'))
119
print(f"Created user: {result}")
120
121
# Delete user
122
print("Deleting user 456...")
123
response = yield client.delete_user("456")
124
result = json.loads(response[0].decode('utf-8'))
125
print(f"Delete result: {result}")
126
127
except ZmqRequestTimeoutError as e:
128
print(f"Request timed out: {e}")
129
except Exception as e:
130
print(f"Request failed: {e}")
131
finally:
132
reactor.stop()
133
134
# Start making requests
135
reactor.callWhenRunning(run_requests)
136
reactor.run()
137
138
if __name__ == "__main__":
139
main()
140
```
141
142
### Reply Connection
143
144
Receives requests from clients and sends back responses. Uses message correlation to ensure responses reach the correct client.
145
146
```python { .api }
147
class ZmqREPConnection(ZmqConnection):
148
"""
149
Reply connection for server-side request-reply messaging.
150
151
Uses ZeroMQ ROUTER socket internally to handle multiple clients
152
while providing REP-like semantics with proper message routing.
153
"""
154
155
socketType = constants.ROUTER
156
157
def reply(self, messageId, *messageParts):
158
"""
159
Send reply to specific request.
160
161
Args:
162
messageId (bytes): Message ID from gotMessage callback
163
*messageParts: Variable number of response message parts (bytes)
164
165
Note:
166
Must be called exactly once for each request received via gotMessage.
167
The messageId must match the one provided in gotMessage callback.
168
"""
169
170
def gotMessage(self, messageId, *messageParts):
171
"""
172
Abstract method called when request is received.
173
174
Must be implemented by subclasses to handle incoming requests.
175
Must call reply() with the same messageId to send response.
176
177
Args:
178
messageId (bytes): Unique message identifier for correlation
179
*messageParts: Request message parts (bytes)
180
"""
181
```
182
183
#### Reply Server Usage Example
184
185
```python
186
from twisted.internet import reactor
187
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqREPConnection
188
import json
189
import time
190
191
class APIServer(ZmqREPConnection):
192
"""Server handling API requests."""
193
194
def __init__(self, factory, endpoint):
195
super().__init__(factory, endpoint)
196
# Simulate user database
197
self.users = {
198
"123": {"id": "123", "name": "Alice", "email": "alice@example.com", "age": 25},
199
"456": {"id": "456", "name": "Bob", "email": "bob@example.com", "age": 30},
200
}
201
self.next_id = 1000
202
203
print("API Server started and ready for requests")
204
205
def gotMessage(self, messageId, *messageParts):
206
"""Handle incoming API request."""
207
try:
208
# Parse request
209
request_data = json.loads(messageParts[0].decode('utf-8'))
210
action = request_data.get('action')
211
212
print(f"Processing request: {action}")
213
214
# Route to appropriate handler
215
if action == 'get_user':
216
response = self.handle_get_user(request_data)
217
elif action == 'create_user':
218
response = self.handle_create_user(request_data)
219
elif action == 'delete_user':
220
response = self.handle_delete_user(request_data)
221
elif action == 'list_users':
222
response = self.handle_list_users(request_data)
223
else:
224
response = {
225
'success': False,
226
'error': f'Unknown action: {action}'
227
}
228
229
# Send response
230
response_message = json.dumps(response).encode('utf-8')
231
self.reply(messageId, response_message)
232
233
except Exception as e:
234
# Send error response
235
error_response = {
236
'success': False,
237
'error': str(e)
238
}
239
response_message = json.dumps(error_response).encode('utf-8')
240
self.reply(messageId, response_message)
241
242
def handle_get_user(self, request):
243
"""Get user by ID."""
244
user_id = request.get('user_id')
245
if user_id in self.users:
246
return {
247
'success': True,
248
'user': self.users[user_id]
249
}
250
else:
251
return {
252
'success': False,
253
'error': f'User {user_id} not found'
254
}
255
256
def handle_create_user(self, request):
257
"""Create new user."""
258
user_data = request.get('data', {})
259
260
# Validate required fields
261
if not user_data.get('name') or not user_data.get('email'):
262
return {
263
'success': False,
264
'error': 'Name and email are required'
265
}
266
267
# Create user with new ID
268
user_id = str(self.next_id)
269
self.next_id += 1
270
271
new_user = {
272
'id': user_id,
273
'name': user_data['name'],
274
'email': user_data['email'],
275
'age': user_data.get('age', 0),
276
'created_at': time.time()
277
}
278
279
self.users[user_id] = new_user
280
281
return {
282
'success': True,
283
'user': new_user
284
}
285
286
def handle_delete_user(self, request):
287
"""Delete user by ID."""
288
user_id = request.get('user_id')
289
if user_id in self.users:
290
deleted_user = self.users.pop(user_id)
291
return {
292
'success': True,
293
'deleted_user': deleted_user
294
}
295
else:
296
return {
297
'success': False,
298
'error': f'User {user_id} not found'
299
}
300
301
def handle_list_users(self, request):
302
"""List all users."""
303
return {
304
'success': True,
305
'users': list(self.users.values()),
306
'count': len(self.users)
307
}
308
309
# Start server
310
def main():
311
factory = ZmqFactory()
312
factory.registerForShutdown()
313
314
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
315
server = APIServer(factory, endpoint)
316
317
print("Starting API server on tcp://*:5555")
318
reactor.run()
319
320
if __name__ == "__main__":
321
main()
322
```
323
324
### Advanced Request-Reply Patterns
325
326
Complex request-reply scenarios including load balancing, service discovery, and multi-stage request processing.
327
328
#### Load Balanced Server Pool
329
330
```python
331
class LoadBalancedService:
332
"""Multiple server instances for load balancing."""
333
334
def __init__(self, factory, service_name, bind_addresses):
335
self.service_name = service_name
336
self.servers = []
337
338
for i, address in enumerate(bind_addresses):
339
endpoint = ZmqEndpoint(ZmqEndpointType.bind, address)
340
server = ServiceServer(factory, endpoint, f"{service_name}-{i+1}")
341
self.servers.append(server)
342
print(f"Started {service_name} server {i+1} on {address}")
343
344
class ServiceServer(ZmqREPConnection):
345
def __init__(self, factory, endpoint, server_id):
346
super().__init__(factory, endpoint)
347
self.server_id = server_id
348
self.request_count = 0
349
350
def gotMessage(self, messageId, *messageParts):
351
self.request_count += 1
352
request = json.loads(messageParts[0].decode('utf-8'))
353
354
# Add server info to response
355
response = self.process_request(request)
356
response['server_id'] = self.server_id
357
response['request_number'] = self.request_count
358
359
response_data = json.dumps(response).encode('utf-8')
360
self.reply(messageId, response_data)
361
362
def process_request(self, request):
363
# Simulate processing
364
import time
365
time.sleep(0.1) # Simulate work
366
367
return {
368
'success': True,
369
'result': f"Processed {request.get('task', 'unknown')}",
370
'timestamp': time.time()
371
}
372
373
# Client with retry logic
374
class RobustClient(ZmqREQConnection):
375
def __init__(self, factory, endpoints):
376
# Connect to multiple server addresses
377
super().__init__(factory)
378
self.addEndpoints(endpoints)
379
self.defaultRequestTimeout = 5.0
380
381
@defer.inlineCallbacks
382
def robust_request(self, request_data, max_retries=3):
383
"""Make request with retry logic."""
384
for attempt in range(max_retries):
385
try:
386
print(f"Attempt {attempt + 1}: Making request")
387
response = yield self.sendMsg(
388
json.dumps(request_data).encode('utf-8'),
389
timeout=5.0
390
)
391
result = json.loads(response[0].decode('utf-8'))
392
print(f"Success on attempt {attempt + 1}: {result.get('server_id')}")
393
defer.returnValue(result)
394
395
except ZmqRequestTimeoutError:
396
print(f"Attempt {attempt + 1} timed out")
397
if attempt == max_retries - 1:
398
raise
399
# Wait before retry
400
yield defer.succeed(None)
401
reactor.callLater(1.0, lambda: None)
402
403
raise Exception("All retry attempts failed")
404
405
# Usage
406
factory = ZmqFactory()
407
408
# Start multiple servers
409
service = LoadBalancedService(factory, "calculator", [
410
"tcp://*:5555",
411
"tcp://*:5556",
412
"tcp://*:5557"
413
])
414
415
# Create client connecting to all servers
416
client_endpoints = [
417
ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),
418
ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5556"),
419
ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5557")
420
]
421
client = RobustClient(factory, client_endpoints)
422
```
423
424
### Timeout Handling and Error Recovery
425
426
Comprehensive error handling patterns for robust request-reply applications.
427
428
```python
429
class TimeoutAwareClient(ZmqREQConnection):
430
"""Client with sophisticated timeout and error handling."""
431
432
def __init__(self, factory, endpoint):
433
super().__init__(factory, endpoint)
434
self.defaultRequestTimeout = 10.0
435
self.request_stats = {
436
'total': 0,
437
'successful': 0,
438
'timeouts': 0,
439
'errors': 0
440
}
441
442
@defer.inlineCallbacks
443
def adaptive_request(self, request_data, min_timeout=1.0, max_timeout=30.0):
444
"""Make request with adaptive timeout based on historical performance."""
445
# Calculate adaptive timeout based on recent performance
446
success_rate = (self.request_stats['successful'] /
447
max(self.request_stats['total'], 1))
448
449
if success_rate > 0.9:
450
timeout = min_timeout
451
elif success_rate > 0.7:
452
timeout = min_timeout * 2
453
else:
454
timeout = max_timeout
455
456
self.request_stats['total'] += 1
457
458
try:
459
print(f"Making request with {timeout}s timeout (success rate: {success_rate:.2%})")
460
response = yield self.sendMsg(
461
json.dumps(request_data).encode('utf-8'),
462
timeout=timeout
463
)
464
self.request_stats['successful'] += 1
465
result = json.loads(response[0].decode('utf-8'))
466
defer.returnValue(result)
467
468
except ZmqRequestTimeoutError as e:
469
self.request_stats['timeouts'] += 1
470
print(f"Request timed out after {timeout}s")
471
# Could implement exponential backoff here
472
raise
473
474
except Exception as e:
475
self.request_stats['errors'] += 1
476
print(f"Request failed: {e}")
477
raise
478
479
def get_stats(self):
480
"""Get client performance statistics."""
481
return self.request_stats.copy()
482
483
# Usage with automatic timeout adjustment
484
@defer.inlineCallbacks
485
def test_adaptive_timeout():
486
factory = ZmqFactory()
487
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
488
client = TimeoutAwareClient(factory, endpoint)
489
490
# Make multiple requests to build statistics
491
for i in range(20):
492
try:
493
request = {'task': f'process_item_{i}', 'complexity': i % 5}
494
result = yield client.adaptive_request(request)
495
print(f"Request {i}: {result.get('result', 'no result')}")
496
497
except Exception as e:
498
print(f"Request {i} failed: {e}")
499
500
# Brief delay between requests
501
yield defer.succeed(None)
502
reactor.callLater(0.5, lambda: None)
503
504
# Print final statistics
505
stats = client.get_stats()
506
print(f"\nFinal stats: {stats}")
507
508
reactor.stop()
509
510
# Run test
511
reactor.callWhenRunning(test_adaptive_timeout)
512
```