or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.md · authentication.md · constants.md · core-messaging.md · devices.md · error-handling.md · index.md · message-handling.md · polling.md

docs/devices.md

0

# Devices

1

2

Background devices for message routing including proxy, queue, forwarder, and steerable proxy implementations with monitoring capabilities.

3

4

## Capabilities

5

6

### Proxy Functions

7

8

Built-in proxy functions for message routing between sockets.

9

10

```python { .api }

11

def proxy(frontend: Socket, backend: Socket, capture: Optional[Socket] = None) -> None:
    """
    Simple proxy connecting frontend and backend sockets.

    Parameters:
    - frontend: Frontend socket (e.g., ROUTER for clients)
    - backend: Backend socket (e.g., DEALER for workers)
    - capture: Optional socket to capture all messages; may be omitted
      (defaults to None)
    """

20

21

def proxy_steerable(
    frontend: Socket,
    backend: Socket,
    capture: Optional[Socket] = None,
    control: Optional[Socket] = None,
) -> None:
    """
    Steerable proxy with control socket for runtime management.

    Parameters:
    - frontend: Frontend socket
    - backend: Backend socket
    - capture: Optional capture socket; may be omitted (defaults to None)
    - control: Control socket for PAUSE/RESUME/TERMINATE commands; may be
      omitted (defaults to None)
    """

31

```

32

33

### Device Base Classes

34

35

Base classes for creating custom background devices.

36

37

```python { .api }

38

class Device:
    def __init__(self, device_type: int, frontend: Socket, backend: Socket) -> None:
        """
        Construct the base device from a type constant and its two sockets.

        Parameters:
        - device_type: One of the device type constants (QUEUE, FORWARDER, STREAMER)
        - frontend: Socket used as the frontend
        - backend: Socket used as the backend
        """

    def run(self) -> None:
        """Run the device; this call blocks."""

51

52

class ThreadDevice(Device):
    def __init__(self, device_type: int, frontend: Socket, backend: Socket) -> None:
        """Thread-based device that runs in background."""

    def start(self) -> None:
        """Start the device in a background thread."""

    def join(self, timeout: Optional[float] = None) -> None:
        """
        Wait for device thread to terminate.

        Parameters:
        - timeout: Timeout in seconds (None for infinite)
        """

    @property
    def done(self) -> bool:
        """True if device has finished running."""

70

```

71

72

### Monitored Queue Device

73

74

Queue device with monitoring capabilities.

75

76

```python { .api }

77

class MonitoredQueue:
    def __init__(self, in_socket: Socket, out_socket: Socket, mon_socket: Socket) -> None:
        """
        Build a queue device that has an additional monitoring socket.

        Parameters:
        - in_socket: Socket on the input side
        - out_socket: Socket on the output side
        - mon_socket: Socket used for monitoring
        """

    def run(self) -> None:
        """Run the monitored queue; this call blocks."""

90

91

class MonitoredQueueDevice(ThreadDevice):
    def __init__(self, in_socket: Socket, out_socket: Socket, mon_socket: Socket) -> None:
        """Monitored queue device that is thread-based (see ThreadDevice)."""

94

```

95

96

### Proxy Device Classes

97

98

High-level proxy device implementations.

99

100

```python { .api }

101

class ProxyDevice(ThreadDevice):
    def __init__(self, frontend: Socket, backend: Socket, capture: Optional[Socket] = None) -> None:
        """
        Proxy device running in background thread.

        Parameters:
        - frontend: Frontend socket
        - backend: Backend socket
        - capture: Optional capture socket; may be omitted (defaults to None)
        """

111

112

class ProxySteerableDevice(ThreadDevice):
    def __init__(
        self,
        frontend: Socket,
        backend: Socket,
        capture: Optional[Socket] = None,
        control: Optional[Socket] = None,
    ) -> None:
        """
        Steerable proxy device with control interface.

        Parameters:
        - frontend: Frontend socket
        - backend: Backend socket
        - capture: Optional capture socket; may be omitted (defaults to None)
        - control: Control socket for management; may be omitted
          (defaults to None)
        """

    def pause(self) -> None:
        """Pause message forwarding."""

    def resume(self) -> None:
        """Resume message forwarding."""

    def terminate(self) -> None:
        """Terminate the proxy device."""

132

```

133

134

## Usage Examples

135

136

### Basic Proxy

137

138

```python

139

import zmq

140

141

context = zmq.Context()

# Client-facing endpoint
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")

# Worker-facing endpoint
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")

try:
    # Blocking call: control only returns here when the proxy stops
    zmq.proxy(frontend, backend)
except KeyboardInterrupt:
    print("Proxy interrupted")
finally:
    # Release both endpoints before shutting the context down
    for sock in (frontend, backend):
        sock.close()
    context.term()

160

```

161

162

### Proxy with Message Capture

163

164

```python

165

import zmq

166

167

context = zmq.Context()

# Client-facing and worker-facing endpoints
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")

# Publisher handed to the proxy as its capture socket, for monitoring
capture = context.socket(zmq.PUB)
capture.bind("tcp://*:5561")

try:
    # Messages passing through the proxy are also published on `capture`
    zmq.proxy(frontend, backend, capture)
except KeyboardInterrupt:
    print("Proxy with capture interrupted")
finally:
    # Release every socket before shutting the context down
    for sock in (frontend, backend, capture):
        sock.close()
    context.term()

190

```

191

192

### Steerable Proxy

193

194

```python

195

import zmq

196

import threading

197

import time

198

199

def control_proxy(context=None):
    """Control function for steerable proxy: pause, resume, then terminate it.

    Parameters:
    - context: The context in which the proxy's control socket was bound.
      ``inproc://`` endpoints are only reachable from sockets created in the
      same context, so the controller must not create a private one.
      Defaults to the process-wide ``zmq.Context.instance()``.
    """
    if context is None:
        context = zmq.Context.instance()

    control = context.socket(zmq.REQ)
    control.connect("inproc://control")

    try:
        steps = (
            ("PAUSE", "Proxy paused"),
            ("RESUME", "Proxy resumed"),
            ("TERMINATE", "Proxy terminated"),
        )
        for command, report in steps:
            time.sleep(2)
            control.send_string(command)
            # A REQ socket has to receive a reply before it may send again
            control.recv_string()
            print(report)
    finally:
        # Only the socket is closed here; the shared context belongs to the
        # caller, which terminates it after the proxy has stopped.
        control.close()


def main():
    """Run a steerable proxy and drive it from a background control thread."""
    context = zmq.Context()

    # Create proxy sockets
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5559")

    backend = context.socket(zmq.DEALER)
    backend.bind("tcp://*:5560")

    # Control socket (bound before the controller thread connects to it)
    control = context.socket(zmq.REP)
    control.bind("inproc://control")

    # Start control thread, sharing this context so inproc:// can connect
    control_thread = threading.Thread(target=control_proxy, args=(context,))
    control_thread.start()

    try:
        # Run steerable proxy (blocks until it is told to terminate)
        zmq.proxy_steerable(frontend, backend, None, control)
    finally:
        control_thread.join()
        frontend.close()
        backend.close()
        control.close()
        context.term()


main()

258

```

259

260

### Thread-based Proxy Device

261

262

```python

263

import zmq

264

from zmq.devices import ProxyDevice

265

import time

266

267

context = zmq.Context()

# Client-facing and worker-facing endpoints
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")

# Wrap both endpoints in a proxy device
device = ProxyDevice(frontend, backend)

try:
    # The device forwards messages in a background thread from here on
    device.start()
    print("Proxy started in background")

    # The main thread stays free for other work while the proxy runs
    for step in range(10):
        print(f"Main thread working... {step}")
        time.sleep(1)

    # Bounded wait; the device is not expected to finish in this example
    device.join(timeout=1.0)
except KeyboardInterrupt:
    print("Main interrupted")
finally:
    # Only the sockets and the context need explicit cleanup here
    for sock in (frontend, backend):
        sock.close()
    context.term()

299

```

300

301

### Monitored Queue

302

303

```python

304

import time

import zmq
from zmq.devices import MonitoredQueueDevice

context = zmq.Context()

# Create queue sockets
input_socket = context.socket(zmq.PULL)
input_socket.bind("tcp://*:5557")

output_socket = context.socket(zmq.PUSH)
output_socket.bind("tcp://*:5558")

# Monitoring socket
monitor_socket = context.socket(zmq.PUB)
monitor_socket.bind("tcp://*:5559")

# Subscriber that reads the monitoring feed. It is created before the `try`
# so the cleanup in `finally` can always close it.
monitor_client = context.socket(zmq.SUB)
monitor_client.connect("tcp://localhost:5559")
monitor_client.setsockopt(zmq.SUBSCRIBE, b"")

# Create monitored queue device
device = MonitoredQueueDevice(input_socket, output_socket, monitor_socket)

try:
    device.start()
    print("Monitored queue started")

    # Process monitoring messages
    while True:
        try:
            message = monitor_client.recv_string(zmq.NOBLOCK)
            print(f"Monitor: {message}")
        except zmq.Again:
            # Nothing pending; back off briefly instead of spinning
            time.sleep(0.1)

except KeyboardInterrupt:
    print("Monitored queue interrupted")
finally:
    # Bounded wait: an unbounded join() would block shutdown while the
    # device thread is still running.
    device.join(timeout=1.0)
    input_socket.close()
    output_socket.close()
    monitor_socket.close()
    monitor_client.close()
    context.term()

349

```

350

351

### Custom Device

352

353

```python

354

import zmq

355

from zmq.devices import Device

356

import json

357

import time

358

359

class LoggingDevice(Device):
    """Custom device that logs all messages"""

    def __init__(self, frontend, backend, log_file="messages.log"):
        """
        Parameters:
        - frontend: Frontend socket
        - backend: Backend socket
        - log_file: Path of the file the JSON-lines log is written to
        """
        super().__init__(zmq.QUEUE, frontend, backend)
        self.log_file = log_file

    def _relay(self, source, destination, direction, log):
        """Forward one multipart message and append a JSON record of it to `log`."""
        message = source.recv_multipart()
        destination.send_multipart(message)

        log_entry = {
            'timestamp': time.time(),
            'direction': direction,
            # Undecodable bytes are dropped so the entry is always serializable
            'message': [part.decode('utf-8', errors='ignore') for part in message]
        }
        log.write(json.dumps(log_entry) + '\n')
        # Flush per message so the log is current even if the loop is interrupted
        log.flush()

    def run(self):
        """Custom run method with logging (blocking; loops until interrupted)."""
        # NOTE(review): relies on the base class exposing `frontend_socket` and
        # `backend_socket` attributes - confirm against the Device implementation.
        poller = zmq.Poller()
        poller.register(self.frontend_socket, zmq.POLLIN)
        poller.register(self.backend_socket, zmq.POLLIN)

        # (source, destination, label) for each forwarding direction
        routes = (
            (self.frontend_socket, self.backend_socket, 'frontend->backend'),
            (self.backend_socket, self.frontend_socket, 'backend->frontend'),
        )

        with open(self.log_file, 'w', encoding='utf-8') as log:
            while True:
                for socket, event in poller.poll():
                    if not event & zmq.POLLIN:
                        continue
                    for source, destination, direction in routes:
                        if socket is source:
                            self._relay(source, destination, direction, log)
                            break

404

405

# Usage

406

context = zmq.Context()

# Client-facing and worker-facing endpoints
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")

# Messages passing through are recorded in proxy.log
device = LoggingDevice(frontend, backend, "proxy.log")

try:
    # Runs in the calling thread until interrupted
    device.run()
except KeyboardInterrupt:
    print("Logging device interrupted")
finally:
    for sock in (frontend, backend):
        sock.close()
    context.term()

424

```

425

426

### Load Balancer Device

427

428

```python

429

import zmq

430

from zmq.devices import ThreadDevice

431

import random

432

433

class LoadBalancerDevice(ThreadDevice):
    """Load balancer that distributes work across multiple backends"""

    def __init__(self, frontend, backends):
        """
        Parameters:
        - frontend: Frontend socket that requests arrive on
        - backends: Non-empty list of backend sockets to distribute work to
        """
        # The base class takes a single backend; use the first as representative
        super().__init__(zmq.QUEUE, frontend, backends[0])
        self.backends = backends

    def run(self):
        """Custom load balancing logic (loops forever).

        Each frontend request goes to a randomly chosen backend; every
        backend response is sent back through the frontend.
        """
        # NOTE(review): relies on the base class exposing `frontend_socket` -
        # confirm against the ThreadDevice implementation.
        # One poller watches the frontend and every backend, so the loop
        # blocks until there is work instead of waking on a timer.
        poller = zmq.Poller()
        poller.register(self.frontend_socket, zmq.POLLIN)
        for backend in self.backends:
            poller.register(backend, zmq.POLLIN)

        # Snapshot of the backends eligible for new requests
        available_backends = list(self.backends)

        while True:
            for socket, event in poller.poll():
                if not event & zmq.POLLIN:
                    continue

                if socket is self.frontend_socket:
                    message = socket.recv_multipart()

                    # Select available backend
                    if available_backends:
                        backend = random.choice(available_backends)
                        backend.send_multipart(message)
                        print(f"Sent to backend {self.backends.index(backend)}")
                else:
                    # Anything that is not the frontend is a backend response
                    response = socket.recv_multipart()
                    self.frontend_socket.send_multipart(response)
                    print(f"Response from backend {self.backends.index(socket)}")

471

472

# Usage

473

context = zmq.Context()

frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")

# Three backend sockets on consecutive ports, starting at 5560
backends = []
for port in range(5560, 5563):
    backend = context.socket(zmq.DEALER)
    backend.bind(f"tcp://*:{port}")
    backends.append(backend)

device = LoadBalancerDevice(frontend, backends)

try:
    device.start()
    print("Load balancer started with 3 backends")

    # Block the main thread on the device thread
    device.join()
except KeyboardInterrupt:
    print("Load balancer interrupted")
finally:
    # Frontend first, then every backend, then the context
    for sock in (frontend, *backends):
        sock.close()
    context.term()

501

```

502

503

### Device with Statistics

504

505

```python

506

import zmq

507

from zmq.devices import Device

508

import time

509

import threading

510

511

class StatisticsDevice(Device):
    """Device that tracks message statistics"""

    def __init__(self, frontend, backend):
        super().__init__(zmq.QUEUE, frontend, backend)
        # Counters are written by run() and read by get_stats(); the lock
        # guards every access to them.
        self.stats_lock = threading.Lock()
        self.stats = {
            'messages_forwarded': 0,
            'bytes_forwarded': 0,
            'start_time': time.time(),
            'last_message_time': 0
        }

    def get_stats(self):
        """Return a copy of the counters plus the runtime and per-second rates."""
        with self.stats_lock:
            report = dict(self.stats)
            runtime = time.time() - report['start_time']

        # Rates are computed over at least one second of runtime
        window = max(runtime, 1)
        report['runtime_seconds'] = runtime
        report['messages_per_second'] = report['messages_forwarded'] / window
        report['bytes_per_second'] = report['bytes_forwarded'] / window
        return report

    def run(self):
        """Forward messages in both directions, counting each one (loops forever)."""
        poller = zmq.Poller()
        for endpoint in (self.frontend_socket, self.backend_socket):
            poller.register(endpoint, zmq.POLLIN)

        while True:
            for socket, event in poller.poll():
                if not event & zmq.POLLIN:
                    continue

                message = socket.recv_multipart()
                size = sum(map(len, message))

                with self.stats_lock:
                    self.stats['messages_forwarded'] += 1
                    self.stats['bytes_forwarded'] += size
                    self.stats['last_message_time'] = time.time()

                # Send the message out of the opposite side it arrived on
                if socket is self.frontend_socket:
                    target = self.backend_socket
                else:
                    target = self.frontend_socket
                target.send_multipart(message)

559

560

# Usage with statistics reporting

561

def print_stats(device):
    """Report the device's message count, rate and byte count every 5 seconds."""
    while True:
        time.sleep(5)
        snapshot = device.get_stats()
        fields = (
            f"Messages: {snapshot['messages_forwarded']}",
            f"Rate: {snapshot['messages_per_second']:.1f} msg/s",
            f"Bytes: {snapshot['bytes_forwarded']}",
        )
        print(", ".join(fields))

569

570

context = zmq.Context()

# Client-facing and worker-facing endpoints
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")

device = StatisticsDevice(frontend, backend)

# Reporter runs as a daemon thread, so it does not keep the process alive
stats_thread = threading.Thread(target=print_stats, args=(device,), daemon=True)
stats_thread.start()

try:
    device.run()
except KeyboardInterrupt:
    # Dump every counter once on the way out
    print("\nFinal statistics:")
    for key, value in device.get_stats().items():
        print(f" {key}: {value}")
finally:
    for sock in (frontend, backend):
        sock.close()
    context.term()

596

```

597

598

## Device Types

599

600

PyZMQ supports several predefined device types:

601

602

```python

603

# Device type constants

604

zmq.QUEUE # Load-balancing queue device

605

zmq.FORWARDER # Message forwarder device

606

zmq.STREAMER # Message streamer device

607

```

608

609

## Control Commands

610

611

Steerable proxy devices accept these control commands:

612

613

```python

614

# Control commands for steerable proxy

615

"PAUSE" # Pause message forwarding

616

"RESUME" # Resume message forwarding

617

"TERMINATE" # Terminate the proxy

618

"STATISTICS" # Get proxy statistics (if supported)

619

```

620

621

## Types

622

623

```python { .api }

624

from typing import Optional, Dict, Any, List

625

import threading

626

627

# Device types

628

DeviceType = int # QUEUE, FORWARDER, STREAMER constants

629

630

# Socket types for devices

631

DeviceSocket = Socket

632

FrontendSocket = Socket

633

BackendSocket = Socket

634

CaptureSocket = Optional[Socket]

635

ControlSocket = Optional[Socket]

636

MonitorSocket = Socket

637

638

# Statistics types

639

DeviceStats = Dict[str, Any]

640

MessageCount = int

641

ByteCount = int

642

Timestamp = float

643

644

# Thread types

645

DeviceThread = threading.Thread

646

JoinTimeout = Optional[float]

647

648

# Control types

649

ControlCommand = str # "PAUSE", "RESUME", "TERMINATE", etc.

650

```