or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

factory-connection.md, index.md, pubsub.md, pushpull.md, reqrep.md, router-dealer.md

docs/pushpull.md

# Push-Pull Messaging

Push-pull pattern for load-balanced work distribution and result collection. This pattern creates a pipeline where work is distributed among multiple workers (pull) and results are collected (push). It provides automatic load balancing and is ideal for parallel processing, task queues, and distributed computing scenarios.

## Capabilities

### Push Connection

Sends work items to available workers in a load-balanced manner. Messages are automatically distributed among connected pull sockets.

```python { .api }
class ZmqPushConnection(ZmqConnection):
    """
    Push connection for distributing work to workers.

    Uses ZeroMQ PUSH socket type. Messages are load-balanced automatically
    among connected PULL sockets using round-robin distribution.
    """

    socketType = constants.PUSH

    def push(self, message):
        """
        Push work item to next available worker.

        Args:
            message (bytes): Work item data to be processed.
                Single part message containing task data.

        Note:
            ZeroMQ automatically load-balances messages among connected
            PULL sockets. Each message goes to exactly one worker.
        """
```

#### Push Usage Example

```python
from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPushConnection
import json
import uuid

# Work distributor
class WorkDistributor:
    def __init__(self, factory, bind_address):
        endpoint = ZmqEndpoint(ZmqEndpointType.bind, bind_address)
        self.pusher = ZmqPushConnection(factory, endpoint)

    def distribute_work(self, work_items):
        """Distribute work items to workers."""
        for item in work_items:
            # Add unique ID for tracking
            work_package = {
                'id': str(uuid.uuid4()),
                'task': item['task'],
                'data': item['data'],
                'priority': item.get('priority', 'normal')
            }
            message = json.dumps(work_package).encode('utf-8')
            self.pusher.push(message)
            print(f"Sent work item {work_package['id']}: {work_package['task']}")

# Usage
factory = ZmqFactory()
distributor = WorkDistributor(factory, "tcp://*:5555")

# Distribute various types of work
work_queue = [
    {'task': 'process_image', 'data': 'image1.jpg'},
    {'task': 'calculate_stats', 'data': [1, 2, 3, 4, 5]},
    {'task': 'send_email', 'data': 'user@example.com', 'priority': 'high'},
    {'task': 'backup_database', 'data': 'table_users'},
    {'task': 'generate_report', 'data': 'monthly_sales'}
]

distributor.distribute_work(work_queue)

# Continuous work distribution
def generate_periodic_work():
    work_item = {
        'task': 'health_check',
        'data': f'timestamp_{reactor.seconds()}'
    }
    distributor.distribute_work([work_item])
    reactor.callLater(10.0, generate_periodic_work)  # Every 10 seconds

generate_periodic_work()
reactor.run()
```

### Pull Connection

Receives work items from pushers and processes them. Multiple pull connections can connect to the same push source for parallel processing.

```python { .api }
class ZmqPullConnection(ZmqConnection):
    """
    Pull connection for receiving work from pushers.

    Uses ZeroMQ PULL socket type. Receives work items in load-balanced manner
    from connected PUSH sockets. Each worker gets different messages.
    """

    socketType = constants.PULL

    def onPull(self, message):
        """
        Abstract method called when work item is received.

        Must be implemented by subclasses to process work items.

        Args:
            message (list): List containing single message part with work data.
                message[0] contains the actual work item (bytes).
        """
```

#### Pull Usage Example

```python
from twisted.internet import reactor, defer
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPullConnection
import json
import time

class Worker(ZmqPullConnection):
    def __init__(self, factory, endpoint, worker_id):
        super().__init__(factory, endpoint)
        self.worker_id = worker_id
        self.processed_count = 0

    def onPull(self, message):
        """Process received work item."""
        try:
            # Parse work item
            work_data = json.loads(message[0].decode('utf-8'))
            work_id = work_data['id']
            task_type = work_data['task']
            task_data = work_data['data']

            print(f"Worker {self.worker_id} processing {work_id}: {task_type}")

            # Simulate different types of work
            if task_type == 'process_image':
                self.process_image(task_data, work_id)
            elif task_type == 'calculate_stats':
                self.calculate_stats(task_data, work_id)
            elif task_type == 'send_email':
                self.send_email(task_data, work_id)
            elif task_type == 'backup_database':
                self.backup_database(task_data, work_id)
            elif task_type == 'generate_report':
                self.generate_report(task_data, work_id)
            elif task_type == 'health_check':
                self.health_check(task_data, work_id)
            else:
                print(f"Worker {self.worker_id}: Unknown task type {task_type}")

            self.processed_count += 1

        except Exception as e:
            print(f"Worker {self.worker_id} error processing message: {e}")

    def process_image(self, image_path, work_id):
        # Simulate image processing
        time.sleep(2)  # Simulate work
        print(f"Worker {self.worker_id}: Processed image {image_path} ({work_id})")

    def calculate_stats(self, data, work_id):
        # Simulate statistical calculation
        result = sum(data) / len(data)
        time.sleep(1)
        print(f"Worker {self.worker_id}: Calculated average {result} ({work_id})")

    def send_email(self, email, work_id):
        # Simulate email sending
        time.sleep(0.5)
        print(f"Worker {self.worker_id}: Sent email to {email} ({work_id})")

    def backup_database(self, table, work_id):
        # Simulate database backup
        time.sleep(3)
        print(f"Worker {self.worker_id}: Backed up {table} ({work_id})")

    def generate_report(self, report_type, work_id):
        # Simulate report generation
        time.sleep(2.5)
        print(f"Worker {self.worker_id}: Generated {report_type} report ({work_id})")

    def health_check(self, data, work_id):
        # Quick health check
        print(f"Worker {self.worker_id}: Health check OK - {data} ({work_id})")

# Create multiple workers
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

# Start multiple worker processes
workers = []
for i in range(3):
    worker = Worker(factory, endpoint, f"W{i+1}")
    workers.append(worker)

# Monitor worker performance
def print_stats():
    total_processed = sum(w.processed_count for w in workers)
    print(f"\n=== Stats ===")
    for worker in workers:
        print(f"{worker.worker_id}: {worker.processed_count} items processed")
    print(f"Total: {total_processed} items")
    print("=============\n")
    reactor.callLater(30.0, print_stats)  # Every 30 seconds

print_stats()
reactor.run()
```

### Pipeline Architecture

Building multi-stage processing pipelines using push-pull patterns for complex data processing workflows.

#### Two-Stage Pipeline Example

```python
from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqPushConnection, ZmqPullConnection
import json

# Stage 1: Data preprocessor
class Preprocessor(ZmqPullConnection):
    def __init__(self, factory, input_endpoint, output_endpoint):
        super().__init__(factory, input_endpoint)
        # Connect to next stage
        self.output = ZmqPushConnection(factory, output_endpoint)

    def onPull(self, message):
        # Receive raw data from stage 0 (data source)
        raw_data = json.loads(message[0].decode('utf-8'))

        # Preprocess the data
        processed_data = {
            'id': raw_data['id'],
            'processed_at': reactor.seconds(),
            'normalized_data': self.normalize(raw_data['raw_values']),
            'metadata': raw_data.get('metadata', {})
        }

        # Send to next stage
        self.output.push(json.dumps(processed_data).encode('utf-8'))
        print(f"Preprocessed item {processed_data['id']}")

    def normalize(self, values):
        """Simple data normalization."""
        if not values:
            return []
        max_val = max(values)
        return [v / max_val for v in values] if max_val > 0 else values

# Stage 2: Data analyzer
class Analyzer(ZmqPullConnection):
    def __init__(self, factory, input_endpoint, output_endpoint):
        super().__init__(factory, input_endpoint)
        self.output = ZmqPushConnection(factory, output_endpoint)

    def onPull(self, message):
        # Receive preprocessed data from stage 1
        processed_data = json.loads(message[0].decode('utf-8'))

        # Analyze the data
        analysis_result = {
            'id': processed_data['id'],
            'analyzed_at': reactor.seconds(),
            'mean': self.calculate_mean(processed_data['normalized_data']),
            'variance': self.calculate_variance(processed_data['normalized_data']),
            'trend': self.detect_trend(processed_data['normalized_data']),
            'original_metadata': processed_data['metadata']
        }

        # Send to final stage (results collector)
        self.output.push(json.dumps(analysis_result).encode('utf-8'))
        print(f"Analyzed item {analysis_result['id']}: trend={analysis_result['trend']}")

    def calculate_mean(self, values):
        return sum(values) / len(values) if values else 0

    def calculate_variance(self, values):
        if not values:
            return 0
        mean = self.calculate_mean(values)
        return sum((x - mean) ** 2 for x in values) / len(values)

    def detect_trend(self, values):
        if len(values) < 2:
            return "unknown"
        return "increasing" if values[-1] > values[0] else "decreasing"

# Results collector
class ResultsCollector(ZmqPullConnection):
    def __init__(self, factory, input_endpoint):
        super().__init__(factory, input_endpoint)
        self.results = []

    def onPull(self, message):
        # Receive final analysis results
        result = json.loads(message[0].decode('utf-8'))
        self.results.append(result)

        print(f"Collected result {result['id']}: "
              f"mean={result['mean']:.3f}, "
              f"variance={result['variance']:.3f}, "
              f"trend={result['trend']}")

        # Could save to database, file, or forward to another system
        if len(self.results) % 10 == 0:
            print(f"Collected {len(self.results)} total results")

# Set up pipeline
factory = ZmqFactory()

# Create pipeline stages
# Stage 0 -> Stage 1
preprocessor = Preprocessor(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),  # Input
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5556")  # Output
)

# Stage 1 -> Stage 2
analyzer = Analyzer(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5556"),  # Input
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5557")  # Output
)

# Stage 2 -> Final
collector = ResultsCollector(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5557")  # Input
)

print("Pipeline ready: Stage0 -> Preprocessor -> Analyzer -> Collector")
reactor.run()
```

### Load Balancing and Scalability

Horizontal scaling patterns for high-throughput processing using multiple workers and dynamic load distribution.

```python
class ScalableWorkerPool:
    """Manages a pool of workers that can be dynamically scaled."""

    def __init__(self, factory, work_source_address, result_sink_address=None):
        self.factory = factory
        self.work_source = work_source_address
        self.result_sink = result_sink_address
        self.workers = []
        self.worker_stats = {}

    def add_worker(self, worker_class, worker_id):
        """Add a new worker to the pool."""
        work_endpoint = ZmqEndpoint(ZmqEndpointType.connect, self.work_source)

        if self.result_sink:
            result_endpoint = ZmqEndpoint(ZmqEndpointType.connect, self.result_sink)
            worker = worker_class(self.factory, work_endpoint, result_endpoint, worker_id)
        else:
            worker = worker_class(self.factory, work_endpoint, worker_id)

        self.workers.append(worker)
        self.worker_stats[worker_id] = {'started': reactor.seconds(), 'processed': 0}
        print(f"Added worker {worker_id} to pool (total: {len(self.workers)})")

    def remove_worker(self, worker_id):
        """Remove a worker from the pool."""
        for worker in self.workers:
            if hasattr(worker, 'worker_id') and worker.worker_id == worker_id:
                worker.shutdown()
                self.workers.remove(worker)
                del self.worker_stats[worker_id]
                print(f"Removed worker {worker_id} from pool (total: {len(self.workers)})")
                break

    def scale_to(self, target_workers, worker_class):
        """Scale worker pool to target size."""
        current_count = len(self.workers)

        if target_workers > current_count:
            # Scale up
            for i in range(target_workers - current_count):
                worker_id = f"worker_{current_count + i + 1}"
                self.add_worker(worker_class, worker_id)
        elif target_workers < current_count:
            # Scale down
            for i in range(current_count - target_workers):
                if self.workers:
                    last_worker = self.workers[-1]
                    if hasattr(last_worker, 'worker_id'):
                        self.remove_worker(last_worker.worker_id)

# Usage example
class ProcessingWorker(ZmqPullConnection):
    def __init__(self, factory, work_endpoint, worker_id):
        super().__init__(factory, work_endpoint)
        self.worker_id = worker_id
        self.processed_count = 0

    def onPull(self, message):
        # Process work item
        work_data = json.loads(message[0].decode('utf-8'))
        # Simulate processing time
        import time
        time.sleep(0.1)
        self.processed_count += 1
        print(f"{self.worker_id} processed item {work_data.get('id', 'unknown')}")

# Create scalable worker pool
factory = ZmqFactory()
pool = ScalableWorkerPool(factory, "tcp://127.0.0.1:5555")

# Start with 2 workers
pool.scale_to(2, ProcessingWorker)

# Simulate dynamic scaling based on load
def monitor_and_scale():
    # Scale up during high load periods
    current_hour = int(reactor.seconds()) % 24
    if 9 <= current_hour <= 17:  # Business hours
        pool.scale_to(5, ProcessingWorker)
    else:  # Off hours
        pool.scale_to(2, ProcessingWorker)

    reactor.callLater(3600, monitor_and_scale)  # Check every hour

monitor_and_scale()
```