or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

factory-connection.mdindex.mdpubsub.mdpushpull.mdreqrep.mdrouter-dealer.md

reqrep.mddocs/

0

# Request-Reply Messaging

1

2

Request-reply pattern with timeout support and Twisted Deferred integration for building client-server applications. This pattern provides reliable one-to-one communication where clients send requests and wait for responses from servers. It includes automatic correlation of requests and responses, timeout handling, and integration with Twisted's asynchronous programming model.

3

4

## Capabilities

5

6

### Request Connection

7

8

Sends requests to servers and receives responses asynchronously using Twisted Deferred objects. Supports request timeouts and automatic correlation.

9

10

```python { .api }

11

class ZmqRequestTimeoutError(Exception):

12

"""

13

Exception raised when a request times out before receiving a response.

14

15

Attributes:

16

msgId: The message ID that timed out

17

"""

18

19

class ZmqREQConnection(ZmqConnection):

20

"""

21

Request connection for client-side request-reply messaging.

22

23

Uses ZeroMQ DEALER socket internally for async operation while providing

24

REQ-like semantics. Each request gets a unique ID and returns a Deferred.

25

"""

26

27

socketType = constants.DEALER

28

defaultRequestTimeout = None # No timeout by default

29

UUID_POOL_GEN_SIZE = 5 # Number of UUIDs to generate at once

30

31

def sendMsg(self, *messageParts, **kwargs):

32

"""

33

Send request message and return Deferred for response.

34

35

Args:

36

*messageParts: Variable number of message parts (bytes)

37

**kwargs: Keyword arguments

38

timeout (float, optional): Request timeout in seconds

39

Overrides defaultRequestTimeout

40

41

Returns:

42

twisted.internet.defer.Deferred: Deferred that fires with response

43

or errback with ZmqRequestTimeoutError

44

45

Example:

46

d = connection.sendMsg(b"get_user", b"12345", timeout=5.0)

47

d.addCallback(handle_response)

48

d.addErrback(handle_error)

49

"""

50

```

51

52

#### Request Client Usage Example

53

54

```python

55

from twisted.internet import reactor, defer

56

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqREQConnection

57

from txzmq import ZmqRequestTimeoutError

58

import json

59

60

class APIClient(ZmqREQConnection):

61

"""Client for making API requests to server."""

62

63

defaultRequestTimeout = 10.0 # 10 second default timeout

64

65

def get_user(self, user_id):

66

"""Get user information by ID."""

67

request = {

68

'action': 'get_user',

69

'user_id': user_id

70

}

71

message = json.dumps(request).encode('utf-8')

72

return self.sendMsg(message)

73

74

def create_user(self, user_data, timeout=None):

75

"""Create new user with optional custom timeout."""

76

request = {

77

'action': 'create_user',

78

'data': user_data

79

}

80

message = json.dumps(request).encode('utf-8')

81

kwargs = {'timeout': timeout} if timeout else {}

82

return self.sendMsg(message, **kwargs)

83

84

def delete_user(self, user_id):

85

"""Delete user by ID."""

86

request = {

87

'action': 'delete_user',

88

'user_id': user_id

89

}

90

message = json.dumps(request).encode('utf-8')

91

return self.sendMsg(message, timeout=5.0) # Quick timeout for deletes

92

93

# Usage example

94

def main():

95

factory = ZmqFactory()

96

factory.registerForShutdown()

97

98

endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

99

client = APIClient(factory, endpoint)

100

101

@defer.inlineCallbacks

102

def run_requests():

103

try:

104

# Get user information

105

print("Getting user 123...")

106

response = yield client.get_user("123")

107

user_data = json.loads(response[0].decode('utf-8'))

108

print(f"User: {user_data}")

109

110

# Create new user

111

print("Creating new user...")

112

new_user = {

113

'name': 'John Doe',

114

'email': 'john@example.com',

115

'age': 30

116

}

117

response = yield client.create_user(new_user, timeout=15.0)

118

result = json.loads(response[0].decode('utf-8'))

119

print(f"Created user: {result}")

120

121

# Delete user

122

print("Deleting user 456...")

123

response = yield client.delete_user("456")

124

result = json.loads(response[0].decode('utf-8'))

125

print(f"Delete result: {result}")

126

127

except ZmqRequestTimeoutError as e:

128

print(f"Request timed out: {e}")

129

except Exception as e:

130

print(f"Request failed: {e}")

131

finally:

132

reactor.stop()

133

134

# Start making requests

135

reactor.callWhenRunning(run_requests)

136

reactor.run()

137

138

if __name__ == "__main__":

139

main()

140

```

141

142

### Reply Connection

143

144

Receives requests from clients and sends back responses. Uses message correlation to ensure responses reach the correct client.

145

146

```python { .api }

147

class ZmqREPConnection(ZmqConnection):

148

"""

149

Reply connection for server-side request-reply messaging.

150

151

Uses ZeroMQ ROUTER socket internally to handle multiple clients

152

while providing REP-like semantics with proper message routing.

153

"""

154

155

socketType = constants.ROUTER

156

157

def reply(self, messageId, *messageParts):

158

"""

159

Send reply to specific request.

160

161

Args:

162

messageId (bytes): Message ID from gotMessage callback

163

*messageParts: Variable number of response message parts (bytes)

164

165

Note:

166

Must be called exactly once for each request received via gotMessage.

167

The messageId must match the one provided in gotMessage callback.

168

"""

169

170

def gotMessage(self, messageId, *messageParts):

171

"""

172

Abstract method called when request is received.

173

174

Must be implemented by subclasses to handle incoming requests.

175

Must call reply() with the same messageId to send response.

176

177

Args:

178

messageId (bytes): Unique message identifier for correlation

179

*messageParts: Request message parts (bytes)

180

"""

181

```

182

183

#### Reply Server Usage Example

184

185

```python

186

from twisted.internet import reactor

187

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqREPConnection

188

import json

189

import time

190

191

class APIServer(ZmqREPConnection):

192

"""Server handling API requests."""

193

194

def __init__(self, factory, endpoint):

195

super().__init__(factory, endpoint)

196

# Simulate user database

197

self.users = {

198

"123": {"id": "123", "name": "Alice", "email": "alice@example.com", "age": 25},

199

"456": {"id": "456", "name": "Bob", "email": "bob@example.com", "age": 30},

200

}

201

self.next_id = 1000

202

203

print("API Server started and ready for requests")

204

205

def gotMessage(self, messageId, *messageParts):

206

"""Handle incoming API request."""

207

try:

208

# Parse request

209

request_data = json.loads(messageParts[0].decode('utf-8'))

210

action = request_data.get('action')

211

212

print(f"Processing request: {action}")

213

214

# Route to appropriate handler

215

if action == 'get_user':

216

response = self.handle_get_user(request_data)

217

elif action == 'create_user':

218

response = self.handle_create_user(request_data)

219

elif action == 'delete_user':

220

response = self.handle_delete_user(request_data)

221

elif action == 'list_users':

222

response = self.handle_list_users(request_data)

223

else:

224

response = {

225

'success': False,

226

'error': f'Unknown action: {action}'

227

}

228

229

# Send response

230

response_message = json.dumps(response).encode('utf-8')

231

self.reply(messageId, response_message)

232

233

except Exception as e:

234

# Send error response

235

error_response = {

236

'success': False,

237

'error': str(e)

238

}

239

response_message = json.dumps(error_response).encode('utf-8')

240

self.reply(messageId, response_message)

241

242

def handle_get_user(self, request):

243

"""Get user by ID."""

244

user_id = request.get('user_id')

245

if user_id in self.users:

246

return {

247

'success': True,

248

'user': self.users[user_id]

249

}

250

else:

251

return {

252

'success': False,

253

'error': f'User {user_id} not found'

254

}

255

256

def handle_create_user(self, request):

257

"""Create new user."""

258

user_data = request.get('data', {})

259

260

# Validate required fields

261

if not user_data.get('name') or not user_data.get('email'):

262

return {

263

'success': False,

264

'error': 'Name and email are required'

265

}

266

267

# Create user with new ID

268

user_id = str(self.next_id)

269

self.next_id += 1

270

271

new_user = {

272

'id': user_id,

273

'name': user_data['name'],

274

'email': user_data['email'],

275

'age': user_data.get('age', 0),

276

'created_at': time.time()

277

}

278

279

self.users[user_id] = new_user

280

281

return {

282

'success': True,

283

'user': new_user

284

}

285

286

def handle_delete_user(self, request):

287

"""Delete user by ID."""

288

user_id = request.get('user_id')

289

if user_id in self.users:

290

deleted_user = self.users.pop(user_id)

291

return {

292

'success': True,

293

'deleted_user': deleted_user

294

}

295

else:

296

return {

297

'success': False,

298

'error': f'User {user_id} not found'

299

}

300

301

def handle_list_users(self, request):

302

"""List all users."""

303

return {

304

'success': True,

305

'users': list(self.users.values()),

306

'count': len(self.users)

307

}

308

309

# Start server

310

def main():

311

factory = ZmqFactory()

312

factory.registerForShutdown()

313

314

endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")

315

server = APIServer(factory, endpoint)

316

317

print("Starting API server on tcp://*:5555")

318

reactor.run()

319

320

if __name__ == "__main__":

321

main()

322

```

323

324

### Advanced Request-Reply Patterns

325

326

Complex request-reply scenarios including load balancing, service discovery, and multi-stage request processing.

327

328

#### Load Balanced Server Pool

329

330

```python

331

class LoadBalancedService:

332

"""Multiple server instances for load balancing."""

333

334

def __init__(self, factory, service_name, bind_addresses):

335

self.service_name = service_name

336

self.servers = []

337

338

for i, address in enumerate(bind_addresses):

339

endpoint = ZmqEndpoint(ZmqEndpointType.bind, address)

340

server = ServiceServer(factory, endpoint, f"{service_name}-{i+1}")

341

self.servers.append(server)

342

print(f"Started {service_name} server {i+1} on {address}")

343

344

class ServiceServer(ZmqREPConnection):

345

def __init__(self, factory, endpoint, server_id):

346

super().__init__(factory, endpoint)

347

self.server_id = server_id

348

self.request_count = 0

349

350

def gotMessage(self, messageId, *messageParts):

351

self.request_count += 1

352

request = json.loads(messageParts[0].decode('utf-8'))

353

354

# Add server info to response

355

response = self.process_request(request)

356

response['server_id'] = self.server_id

357

response['request_number'] = self.request_count

358

359

response_data = json.dumps(response).encode('utf-8')

360

self.reply(messageId, response_data)

361

362

def process_request(self, request):

363

# Simulate processing

364

import time

365

time.sleep(0.1) # Simulate work

366

367

return {

368

'success': True,

369

'result': f"Processed {request.get('task', 'unknown')}",

370

'timestamp': time.time()

371

}

372

373

# Client with retry logic

374

class RobustClient(ZmqREQConnection):

375

def __init__(self, factory, endpoints):

376

# Connect to multiple server addresses

377

super().__init__(factory)

378

self.addEndpoints(endpoints)

379

self.defaultRequestTimeout = 5.0

380

381

@defer.inlineCallbacks

382

def robust_request(self, request_data, max_retries=3):

383

"""Make request with retry logic."""

384

for attempt in range(max_retries):

385

try:

386

print(f"Attempt {attempt + 1}: Making request")

387

response = yield self.sendMsg(

388

json.dumps(request_data).encode('utf-8'),

389

timeout=5.0

390

)

391

result = json.loads(response[0].decode('utf-8'))

392

print(f"Success on attempt {attempt + 1}: {result.get('server_id')}")

393

defer.returnValue(result)

394

395

except ZmqRequestTimeoutError:

396

print(f"Attempt {attempt + 1} timed out")

397

if attempt == max_retries - 1:

398

raise

399

# Wait before retry

400

yield defer.succeed(None)

401

reactor.callLater(1.0, lambda: None)

402

403

raise Exception("All retry attempts failed")

404

405

# Usage

406

factory = ZmqFactory()

407

408

# Start multiple servers

409

service = LoadBalancedService(factory, "calculator", [

410

"tcp://*:5555",

411

"tcp://*:5556",

412

"tcp://*:5557"

413

])

414

415

# Create client connecting to all servers

416

client_endpoints = [

417

ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),

418

ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5556"),

419

ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5557")

420

]

421

client = RobustClient(factory, client_endpoints)

422

```

423

424

### Timeout Handling and Error Recovery

425

426

Comprehensive error handling patterns for robust request-reply applications.

427

428

```python

429

class TimeoutAwareClient(ZmqREQConnection):

430

"""Client with sophisticated timeout and error handling."""

431

432

def __init__(self, factory, endpoint):

433

super().__init__(factory, endpoint)

434

self.defaultRequestTimeout = 10.0

435

self.request_stats = {

436

'total': 0,

437

'successful': 0,

438

'timeouts': 0,

439

'errors': 0

440

}

441

442

@defer.inlineCallbacks

443

def adaptive_request(self, request_data, min_timeout=1.0, max_timeout=30.0):

444

"""Make request with adaptive timeout based on historical performance."""

445

# Calculate adaptive timeout based on recent performance

446

success_rate = (self.request_stats['successful'] /

447

max(self.request_stats['total'], 1))

448

449

if success_rate > 0.9:

450

timeout = min_timeout

451

elif success_rate > 0.7:

452

timeout = min_timeout * 2

453

else:

454

timeout = max_timeout

455

456

self.request_stats['total'] += 1

457

458

try:

459

print(f"Making request with {timeout}s timeout (success rate: {success_rate:.2%})")

460

response = yield self.sendMsg(

461

json.dumps(request_data).encode('utf-8'),

462

timeout=timeout

463

)

464

self.request_stats['successful'] += 1

465

result = json.loads(response[0].decode('utf-8'))

466

defer.returnValue(result)

467

468

except ZmqRequestTimeoutError as e:

469

self.request_stats['timeouts'] += 1

470

print(f"Request timed out after {timeout}s")

471

# Could implement exponential backoff here

472

raise

473

474

except Exception as e:

475

self.request_stats['errors'] += 1

476

print(f"Request failed: {e}")

477

raise

478

479

def get_stats(self):

480

"""Get client performance statistics."""

481

return self.request_stats.copy()

482

483

# Usage with automatic timeout adjustment

484

@defer.inlineCallbacks

485

def test_adaptive_timeout():

486

factory = ZmqFactory()

487

endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

488

client = TimeoutAwareClient(factory, endpoint)

489

490

# Make multiple requests to build statistics

491

for i in range(20):

492

try:

493

request = {'task': f'process_item_{i}', 'complexity': i % 5}

494

result = yield client.adaptive_request(request)

495

print(f"Request {i}: {result.get('result', 'no result')}")

496

497

except Exception as e:

498

print(f"Request {i} failed: {e}")

499

500

# Brief delay between requests

501

yield defer.succeed(None)

502

reactor.callLater(0.5, lambda: None)

503

504

# Print final statistics

505

stats = client.get_stats()

506

print(f"\nFinal stats: {stats}")

507

508

reactor.stop()

509

510

# Run test

511

reactor.callWhenRunning(test_adaptive_timeout)

512

```