or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.md authentication.md constants.md core-messaging.md devices.md error-handling.md index.md message-handling.md polling.md

polling.mddocs/

0

# Polling

1

2

High-performance event polling for monitoring multiple sockets simultaneously, with support for timeouts and different polling backends.

3

4

## Capabilities

5

6

### Poller Class

7

8

The Poller class provides efficient event monitoring for multiple sockets using platform-specific polling mechanisms.

9

10

```python { .api }

11

class Poller:

12

def __init__(self) -> None:

13

"""Create a new Poller instance."""

14

15

def register(self, socket: Union[Socket, int], flags: int = POLLIN | POLLOUT) -> None:

16

"""

17

Register a socket for polling.

18

19

Parameters:

20

- socket: Socket or file descriptor to monitor

21

- flags: Event flags to monitor (POLLIN, POLLOUT, POLLERR)

22

"""

23

24

def modify(self, socket: Union[Socket, int], flags: int) -> None:

25

"""

26

Modify polling flags for a registered socket.

27

28

Parameters:

29

- socket: Socket or file descriptor

30

- flags: New event flags

31

"""

32

33

def unregister(self, socket: Union[Socket, int]) -> None:

34

"""

35

Unregister a socket from polling.

36

37

Parameters:

38

- socket: Socket or file descriptor to unregister

39

"""

40

41

def poll(self, timeout: int = -1) -> list[tuple[Socket, int]]:

42

"""

43

Poll for events on registered sockets.

44

45

Parameters:

46

- timeout: Timeout in milliseconds (-1 for infinite, 0 for non-blocking)

47

48

Returns:

49

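- list: List of (socket, events) tuples for sockets with events; a ZMQ socket is returned as the Socket object, while a native socket or file descriptor is returned as its integer file descriptor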

50

"""

51

```

52

53

### Low-Level Polling

54

55

Direct access to ZMQ's polling functionality for maximum control.

56

57

```python { .api }

58

def zmq_poll(sockets: list[tuple[Socket, int]], timeout: int = -1) -> list[tuple[Socket, int]]:

59

"""

60

Poll sockets for events.

61

62

Parameters:

63

- sockets: List of (socket, flags) tuples to poll

64

- timeout: Timeout in milliseconds (-1 for infinite)

65

66

Returns:

67

- list: List of (socket, events) tuples for sockets with events

68

"""

69

```

70

71

### Select-Style Interface

72

73

An interface modeled on Python's `select.select()` for polling sockets.

74

75

```python { .api }

76

def select(rlist: list[Socket], wlist: list[Socket], xlist: list[Socket], timeout: Optional[float] = None) -> tuple[list[Socket], list[Socket], list[Socket]]:

77

"""

78

Select-style polling interface.

79

80

Parameters:

81

- rlist: Sockets to check for readability

82

- wlist: Sockets to check for writability

83

- xlist: Sockets to check for errors

84

- timeout: Timeout in seconds (None for infinite)

85

86

Returns:

87

- tuple: (readable, writable, error) socket lists

88

"""

89

```

90

91

## Usage Examples

92

93

### Basic Polling

94

95

```python

96

import zmq

97

98

context = zmq.Context()

99

100

# Create multiple sockets

101

frontend = context.socket(zmq.ROUTER)

102

frontend.bind("tcp://*:5555")

103

104

backend = context.socket(zmq.DEALER)

105

backend.bind("tcp://*:5556")

106

107

# Create poller and register sockets

108

poller = zmq.Poller()

109

poller.register(frontend, zmq.POLLIN)

110

poller.register(backend, zmq.POLLIN)

111

112

try:

113

while True:

114

# Poll for events with 1 second timeout

115

events = poller.poll(1000)

116

117

if not events:

118

print("No events in 1 second")

119

continue

120

121

for socket, event in events:

122

if socket is frontend and event & zmq.POLLIN:

123

# Handle frontend message

124

message = frontend.recv_multipart()

125

print(f"Frontend received: {message}")

126

backend.send_multipart(message)

127

128

elif socket is backend and event & zmq.POLLIN:

129

# Handle backend message

130

message = backend.recv_multipart()

131

print(f"Backend received: {message}")

132

frontend.send_multipart(message)

133

134

except KeyboardInterrupt:

135

print("Interrupted")

136

finally:

137

frontend.close()

138

backend.close()

139

context.term()

140

```

141

142

### Proxy with Polling

143

144

```python

145

import zmq

146

147

def create_proxy():

148

context = zmq.Context()

149

150

# Frontend for clients

151

frontend = context.socket(zmq.ROUTER)

152

frontend.bind("tcp://*:5559")

153

154

# Backend for workers

155

backend = context.socket(zmq.DEALER)

156

backend.bind("tcp://*:5560")

157

158

# Control socket for shutdown

159

control = context.socket(zmq.SUB)

160

control.connect("inproc://control")

161

control.setsockopt(zmq.SUBSCRIBE, b"")

162

163

# Poll all sockets

164

poller = zmq.Poller()

165

poller.register(frontend, zmq.POLLIN)

166

poller.register(backend, zmq.POLLIN)

167

poller.register(control, zmq.POLLIN)

168

169

try:

170

while True:

171

events = poller.poll()

172

173

for socket, event in events:

174

if socket is control and event & zmq.POLLIN:

175

# Shutdown signal

176

message = control.recv()

177

if message == b"TERMINATE":

178

return

179

180

elif socket is frontend and event & zmq.POLLIN:

181

# Route frontend to backend

182

message = frontend.recv_multipart()

183

backend.send_multipart(message)

184

185

elif socket is backend and event & zmq.POLLIN:

186

# Route backend to frontend

187

message = backend.recv_multipart()

188

frontend.send_multipart(message)

189

finally:

190

frontend.close()

191

backend.close()

192

control.close()

193

context.term()

194

195

create_proxy()

196

```

197

198

### Non-Blocking Polling

199

200

```python

201

import zmq

202

import time

203

204

context = zmq.Context()

205

socket = context.socket(zmq.SUB)

206

socket.connect("tcp://localhost:5556")

207

socket.setsockopt(zmq.SUBSCRIBE, b"")

208

209

poller = zmq.Poller()

210

poller.register(socket, zmq.POLLIN)

211

212

try:

213

while True:

214

# Non-blocking poll (timeout = 0)

215

events = poller.poll(0)

216

217

if events:

218

# Process available messages

219

for sock, event in events:

220

if event & zmq.POLLIN:

221

message = sock.recv_string()

222

print(f"Received: {message}")

223

else:

224

# No messages available, do other work

225

print("No messages, doing other work...")

226

time.sleep(0.1)

227

228

except KeyboardInterrupt:

229

print("Interrupted")

230

finally:

231

socket.close()

232

context.term()

233

```

234

235

### Mixed Socket Types

236

237

```python

238

import zmq

239

import socket as py_socket

240

241

context = zmq.Context()

242

243

# ZMQ socket

244

zmq_socket = context.socket(zmq.SUB)

245

zmq_socket.connect("tcp://localhost:5556")

246

zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")

247

248

# Regular Python socket

249

tcp_socket = py_socket.socket(py_socket.AF_INET, py_socket.SOCK_STREAM)

250

tcp_socket.bind(("localhost", 8080))

251

tcp_socket.listen(5)

252

tcp_socket.setblocking(False)

253

254

# Poll both socket types

255

poller = zmq.Poller()

256

poller.register(zmq_socket, zmq.POLLIN)

257

poller.register(tcp_socket, zmq.POLLIN) # Python sockets can be registered too; poll() reports them by file descriptor

258

259

try:

260

while True:

261

events = poller.poll(1000)

262

263

for sock, event in events:

264

if sock is zmq_socket and event & zmq.POLLIN:

265

# Handle ZMQ message

266

message = zmq_socket.recv_string()

267

print(f"ZMQ message: {message}")

268

269

elif sock == tcp_socket.fileno() and event & zmq.POLLIN:

270

# Handle TCP connection

271

try:

272

client_sock, addr = tcp_socket.accept()

273

print(f"TCP connection from {addr}")

274

client_sock.close()

275

except BlockingIOError:

276

pass

277

278

except KeyboardInterrupt:

279

print("Interrupted")

280

finally:

281

zmq_socket.close()

282

tcp_socket.close()

283

context.term()

284

```

285

286

### Select-Style Polling

287

288

```python

289

import zmq

290

291

context = zmq.Context()

292

293

# Create sockets

294

pub = context.socket(zmq.PUB)

295

pub.bind("tcp://*:5557")

296

297

sub1 = context.socket(zmq.SUB)

298

sub1.connect("tcp://localhost:5557")

299

sub1.setsockopt(zmq.SUBSCRIBE, b"")

300

301

sub2 = context.socket(zmq.SUB)

302

sub2.connect("tcp://localhost:5557")

303

sub2.setsockopt(zmq.SUBSCRIBE, b"")

304

305

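# Send some messages (sent this soon after connect() they may be dropped before the subscriptions take effect: the "slow joiner" effect)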

306

pub.send_string("Hello 1")

307

pub.send_string("Hello 2")

308

309

# Use select-style polling

310

readable, writable, error = zmq.select([sub1, sub2], [pub], [], timeout=1.0)

311

312

print(f"Readable sockets: {len(readable)}")

313

print(f"Writable sockets: {len(writable)}")

314

print(f"Error sockets: {len(error)}")

315

316

# Process readable sockets

317

for sock in readable:

318

message = sock.recv_string()

319

print(f"Received: {message}")

320

321

# Clean up

322

pub.close()

323

sub1.close()

324

sub2.close()

325

context.term()

326

```

327

328

### Timeout Handling

329

330

```python

331

import zmq

332

import time

333

334

context = zmq.Context()

335

socket = context.socket(zmq.REQ)

336

socket.connect("tcp://localhost:5555")

337

338

poller = zmq.Poller()

339

poller.register(socket, zmq.POLLIN)

340

341

# Send request

342

socket.send_string("Hello Server")

343

start_time = time.time()

344

345

try:

346

# Poll with timeout

347

events = poller.poll(5000) # 5 second timeout

348

349

if events:

350

# Response received

351

response = socket.recv_string()

352

elapsed = time.time() - start_time

353

print(f"Response: {response} (took {elapsed:.2f}s)")

354

else:

355

# Timeout occurred

356

print("Request timed out after 5 seconds")

357

358

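# A REQ socket cannot send again until it receives a reply, so discard it and create a new one

359

socket.close(linger=0) # Drop the unanswered request so context.term() cannot block on it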

360

socket = context.socket(zmq.REQ)

361

socket.connect("tcp://localhost:5555")

362

363

except KeyboardInterrupt:

364

print("Interrupted")

365

finally:

366

socket.close()

367

context.term()

368

```

369

370

### Event Flag Combinations

371

372

```python

373

import zmq

374

375

context = zmq.Context()

376

socket = context.socket(zmq.DEALER)

377

socket.connect("tcp://localhost:5555")

378

379

poller = zmq.Poller()

380

381

# Register for different event combinations

382

poller.register(socket, zmq.POLLIN) # Read only

383

# poller.register(socket, zmq.POLLOUT) # Write only

384

# poller.register(socket, zmq.POLLIN | zmq.POLLOUT) # Read and write

385

# poller.register(socket, zmq.POLLERR) # Error only

386

387

try:

388

while True:

389

events = poller.poll(1000)

390

391

for sock, event in events:

392

if event & zmq.POLLIN:

393

print("Socket is readable")

394

message = sock.recv_string()

395

print(f"Received: {message}")

396

397

if event & zmq.POLLOUT:

398

print("Socket is writable")

399

sock.send_string("Response")

400

401

if event & zmq.POLLERR:

402

print("Socket has error")

403

break

404

405

except KeyboardInterrupt:

406

print("Interrupted")

407

finally:

408

socket.close()

409

context.term()

410

```

411

412

### Dynamic Socket Registration

413

414

```python

415

import zmq

416

import random

417

418

context = zmq.Context()

419

poller = zmq.Poller()

420

421

# Track active sockets

422

sockets = {}

423

424

def add_socket(name):

425

"""Add a new socket to polling"""

426

socket = context.socket(zmq.SUB)

427

socket.connect("tcp://localhost:5556")

428

socket.setsockopt(zmq.SUBSCRIBE, name.encode())

429

430

sockets[name] = socket

431

poller.register(socket, zmq.POLLIN)

432

print(f"Added socket: {name}")

433

434

def remove_socket(name):

435

"""Remove socket from polling"""

436

if name in sockets:

437

socket = sockets[name]

438

poller.unregister(socket)

439

socket.close()

440

del sockets[name]

441

print(f"Removed socket: {name}")

442

443

try:

444

# Start with some sockets

445

add_socket("weather")

446

add_socket("news")

447

448

while True:

449

events = poller.poll(1000)

450

451

if events:

452

for socket, event in events:

453

if event & zmq.POLLIN:

454

message = socket.recv_string()

455

print(f"Received: {message}")

456

else:

457

# Randomly add/remove sockets for demonstration

458

action = random.choice(["add", "remove", "nothing"])

459

460

if action == "add" and len(sockets) < 5:

461

name = f"topic_{random.randint(1, 100)}"

462

if name not in sockets:

463

add_socket(name)

464

465

elif action == "remove" and len(sockets) > 1:

466

name = random.choice(list(sockets.keys()))

467

remove_socket(name)

468

469

except KeyboardInterrupt:

470

print("Interrupted")

471

finally:

472

# Clean up all sockets

473

for socket in sockets.values():

474

socket.close()

475

context.term()

476

```

477

478

## Performance Tips

479

480

### Efficient Polling Patterns

481

482

```python

483

import zmq

484

485

# Reuse poller instance

486

poller = zmq.Poller()

487

488

# Register sockets once

489

for socket in sockets:

490

poller.register(socket, zmq.POLLIN)

491

492

# Poll in tight loop

493

while True:

494

# Use appropriate timeout

495

events = poller.poll(100) # Short timeout for responsiveness

496

497

if not events:

498

continue

499

500

# Process events efficiently

501

for socket, event in events:

502

# Handle events without blocking

503

if event & zmq.POLLIN:

504

message = socket.recv(zmq.NOBLOCK)

505

```

506

507

### Memory Management

508

509

```python

510

import zmq

511

512

# Unregister sockets before closing

513

poller.unregister(socket)

514

socket.close()

515

516

# Reuse poller instances when possible

517

# Creating new pollers frequently can be expensive

518

```

519

520

## Types

521

522

```python { .api }

523

from typing import Union, List, Tuple, Optional

524

525

# Socket types for polling

526

PollSocket = Union[Socket, int] # Socket or file descriptor

527

528

# Event flags

529

EventFlags = int # Combination of POLLIN, POLLOUT, POLLERR

530

531

# Poll results

532

PollEvent = Tuple[PollSocket, int] # (socket or file descriptor, events)

533

PollResult = List[PollEvent]

534

535

# Select-style results

536

SelectResult = Tuple[List[Socket], List[Socket], List[Socket]] # (readable, writable, error)

537

538

# Timeout types

539

PollTimeout = int # Milliseconds (-1 for infinite)

540

SelectTimeout = Optional[float] # Seconds (None for infinite)

541

```