or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.md, authentication.md, constants.md, core-messaging.md, devices.md, error-handling.md, index.md, message-handling.md, polling.md

docs/core-messaging.md

0

# Core Messaging

1

2

The fundamental Context and Socket classes that form the foundation of all ZMQ communication. These classes provide synchronous messaging operations and support all ZMQ socket types and messaging patterns.

3

4

## Capabilities

5

6

### Context Management

7

8

The Context class manages ZMQ contexts, which are containers for all sockets in a single process. Contexts handle I/O threads, socket limits, and global settings.

9

10

```python { .api }

11

class Context:

12

def __init__(self, io_threads: int | Context = 1, shadow: Context | int = 0) -> None:

13

"""

14

Create a new ZMQ context.

15

16

Parameters:

17

- io_threads: Number of I/O threads or existing Context to shadow (default: 1)

18

- shadow: Context or address to shadow (default: 0)

19

"""

20

21

def socket(self, socket_type: int) -> Socket:

22

"""

23

Create a socket of the specified type.

24

25

Parameters:

26

- socket_type: ZMQ socket type constant (REQ, REP, PUB, SUB, etc.)

27

28

Returns:

29

- Socket: New socket instance

30

"""

31

32

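def term(self) -> None:

33

"""Terminate the context. Blocks until all sockets opened in this context have been closed; it does not close them itself (use destroy() to close sockets and terminate)."""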

34

35

def destroy(self, linger: int | None = None) -> None:

36

"""

37

Close all sockets and terminate context with optional linger period.

38

39

Parameters:

40

- linger: Time in milliseconds to wait for messages to be sent

41

"""

42

43

def set(self, option: int, value: int) -> None:

44

"""

45

Set a context option.

46

47

Parameters:

48

- option: Context option constant (IO_THREADS, MAX_SOCKETS, etc.)

49

- value: Option value

50

"""

51

52

def get(self, option: int) -> int:

53

"""

54

Get a context option value.

55

56

Parameters:

57

- option: Context option constant

58

59

Returns:

60

- int: Current option value

61

"""

62

63

def __enter__(self) -> Context:

64

"""Context manager entry."""

65

66

def __exit__(self, exc_type, exc_value, traceback) -> None:

67

"""Context manager exit - destroys context."""

68

69

@classmethod

70

def instance(cls, io_threads: int = 1) -> Context:

71

"""

72

Return a global Context instance.

73

74

Parameters:

75

- io_threads: Number of I/O threads for new instance

76

77

Returns:

78

- Context: Global singleton context instance

79

"""

80

81

@classmethod

82

def shadow(cls, address: int | Context) -> Context:

83

"""

84

Shadow an existing libzmq context.

85

86

Parameters:

87

- address: Context or integer address to shadow

88

89

Returns:

90

- Context: New context shadowing the existing one

91

"""

92

93

@property

94

def underlying(self) -> int:

95

"""Integer address of the underlying libzmq context."""

96

97

@property

98

def closed(self) -> bool:

99

"""True if the context has been terminated."""

100

```

101

102

Usage example:

103

104

```python

105

import zmq

106

107

# Create context with 2 I/O threads

108

context = zmq.Context(io_threads=2)

109

110

# Set context options

111

context.set(zmq.MAX_SOCKETS, 1024)

112

113

# Use as context manager for automatic cleanup

114

with zmq.Context() as ctx:

115

socket = ctx.socket(zmq.REQ)

116

# Socket operations...

117

# Context automatically terminated when leaving with block

118

```

119

120

### Socket Operations

121

122

The Socket class provides methods for connecting, binding, sending, and receiving messages across all ZMQ socket types.

123

124

```python { .api }

125

class Socket:

126

def bind(self, address: str) -> SocketContext:

127

"""

128

Bind socket to an address. Returns context manager for automatic unbind.

129

130

Parameters:

131

- address: Address string (tcp://*:5555, ipc:///tmp/socket, inproc://workers)

132

133

Returns:

134

- SocketContext: Context manager for automatic unbind on exit

135

"""

136

137

def bind_to_random_port(self, address: str, min_port: int = 49152, max_port: int = 65536, max_tries: int = 100) -> int:

138

"""

139

Bind socket to a random port in the specified range.

140

141

Parameters:

142

- address: Address template (tcp://*:%s)

143

- min_port: Minimum port number

144

- max_port: Maximum port number

145

- max_tries: Maximum binding attempts

146

147

Returns:

148

- int: The port number that was bound

149

"""

150

151

def connect(self, address: str) -> SocketContext:

152

"""

153

Connect socket to an address. Returns context manager for automatic disconnect.

154

155

Parameters:

156

- address: Address string (tcp://localhost:5555, ipc:///tmp/socket)

157

158

Returns:

159

- SocketContext: Context manager for automatic disconnect on exit

160

"""

161

162

def disconnect(self, address: str) -> None:

163

"""

164

Disconnect socket from an address.

165

166

Parameters:

167

- address: Address string to disconnect from

168

"""

169

170

def unbind(self, address: str) -> None:

171

"""

172

Unbind socket from an address.

173

174

Parameters:

175

- address: Address string to unbind from

176

"""

177

178

def close(self, linger: int | None = None) -> None:

179

"""

180

Close the socket.

181

182

Parameters:

183

- linger: Linger period in milliseconds (None for default)

184

"""

185

186

def __enter__(self) -> Socket:

187

"""Context manager entry."""

188

189

def __exit__(self, exc_type, exc_value, traceback) -> None:

190

"""Context manager exit - closes socket."""

191

192

def poll(self, timeout: int = -1, flags: int = POLLIN) -> int:

193

"""

194

Poll socket for events.

195

196

Parameters:

197

- timeout: Timeout in milliseconds (-1 for infinite)

198

- flags: Poll flags (POLLIN, POLLOUT, POLLERR)

199

200

Returns:

201

- int: Events that occurred (bitmask)

202

"""

203

204

def fileno(self) -> int:

205

"""

206

Get file descriptor for socket integration with select/poll.

207

208

Returns:

209

- int: File descriptor

210

"""

211

212

def subscribe(self, topic: str | bytes) -> None:

213

"""

214

Subscribe to a topic (SUB sockets only).

215

216

Parameters:

217

- topic: Topic to subscribe to

218

"""

219

220

def unsubscribe(self, topic: str | bytes) -> None:

221

"""

222

Unsubscribe from a topic (SUB sockets only).

223

224

Parameters:

225

- topic: Topic to unsubscribe from

226

"""

227

228

@classmethod

229

def shadow(cls, address: int | Socket) -> Socket:

230

"""

231

Shadow an existing libzmq socket.

232

233

Parameters:

234

- address: Socket or integer address to shadow

235

236

Returns:

237

- Socket: New socket shadowing the existing one

238

"""

239

240

@property

241

def underlying(self) -> int:

242

"""Integer address of the underlying libzmq socket."""

243

244

@property

245

def type(self) -> int:

246

"""Socket type (REQ, REP, PUB, SUB, etc.)."""

247

248

@property

249

def last_endpoint(self) -> str:

250

"""Last bound or connected endpoint address."""

251

252

@property

253

def copy_threshold(self) -> int:

254

"""Threshold for copying vs zero-copy operations."""

255

256

@copy_threshold.setter

257

def copy_threshold(self, value: int) -> None:

258

"""Set copy threshold."""

259

260

@property

261

def closed(self) -> bool:

262

"""True if the socket has been closed."""

263

```

264

265

### Message Sending

266

267

Methods for sending various data types with optional flags and routing information.

268

269

```python { .api }

270

def send(self, data: Union[bytes, Frame], flags: int = 0, copy: bool = True, track: bool = False) -> Optional[MessageTracker]:

271

"""

272

Send a message.

273

274

Parameters:

275

- data: Message data as bytes or Frame

276

- flags: Send flags (NOBLOCK, SNDMORE)

277

- copy: Whether to copy the message data

278

- track: Whether to return a MessageTracker

279

280

Returns:

281

- MessageTracker: If track=True, tracker for send completion

282

"""

283

284

def send_string(self, string: str, flags: int = 0, encoding: str = 'utf-8', copy: bool = True, track: bool = False) -> MessageTracker | None:

285

"""

286

Send a string message.

287

288

Parameters:

289

- string: String to send

290

- flags: Send flags (NOBLOCK, SNDMORE)

291

- encoding: String encoding (default: utf-8)

292

"""

293

294

def send_pyobj(self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, copy: bool = True, track: bool = False) -> MessageTracker | None:

295

"""

296

Send a Python object using pickle.

297

298

Parameters:

299

- obj: Python object to send

300

- flags: Send flags

301

- protocol: Pickle protocol version

302

"""

303

304

def send_json(self, obj: Any, flags: int = 0, copy: bool = True, track: bool = False, **kwargs) -> MessageTracker | None:

305

"""

306

Send a JSON-serializable object.

307

308

Parameters:

309

- obj: JSON-serializable object

310

- flags: Send flags

311

- kwargs: Additional arguments for json.dumps()

312

"""

313

314

def send_multipart(self, msg_parts: list, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:

315

"""

316

Send a multipart message.

317

318

Parameters:

319

- msg_parts: List of message parts (bytes, strings, or Frames)

320

- flags: Send flags

321

- copy: Whether to copy message data

322

- track: Whether to return MessageTracker

323

324

Returns:

325

- MessageTracker: If track=True, tracker for send completion

326

"""

327

328

def send_serialized(self, msg: Any, serialize: Callable, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:

329

"""

330

Send a message with custom serialization.

331

332

Parameters:

333

- msg: Message to serialize and send

334

- serialize: Serialization function

335

- flags: Send flags

336

- copy: Whether to copy message data

337

- track: Whether to return MessageTracker

338

339

Returns:

340

- MessageTracker: If track=True, tracker for send completion

341

"""

342

"""

343

Send a multipart message.

344

345

Parameters:

346

- msg_parts: List of message parts (bytes, strings, or Frames)

347

- flags: Send flags

348

- copy: Whether to copy message data

349

- track: Whether to return MessageTracker

350

351

Returns:

352

- MessageTracker: If track=True, tracker for send completion

353

"""

354

```

355

356

### Message Receiving

357

358

Methods for receiving various data types with optional flags and timeout handling.

359

360

```python { .api }

361

def recv(self, flags: int = 0, copy: bool = True, track: bool = False) -> Union[bytes, Frame]:

362

"""

363

Receive a message.

364

365

Parameters:

366

- flags: Receive flags (NOBLOCK)

367

- copy: Whether to copy the message data

368

- track: Whether to return a Frame with tracking

369

370

Returns:

371

- bytes or Frame: Received message data

372

"""

373

374

def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:

375

"""

376

Receive a string message.

377

378

Parameters:

379

- flags: Receive flags (NOBLOCK)

380

- encoding: String encoding (default: utf-8)

381

382

Returns:

383

- str: Received string

384

"""

385

386

def recv_pyobj(self, flags: int = 0) -> Any:

387

"""

388

Receive a Python object using pickle.

389

390

Parameters:

391

- flags: Receive flags

392

393

Returns:

394

- Any: Unpickled Python object

395

"""

396

397

def recv_json(self, flags: int = 0, **kwargs) -> Any:

398

"""

399

Receive a JSON object.

400

401

Parameters:

402

- flags: Receive flags

403

- kwargs: Additional arguments for json.loads()

404

405

Returns:

406

- Any: Deserialized JSON object

407

"""

408

409

def recv_multipart(self, flags: int = 0, copy: bool = True, track: bool = False) -> list:

410

"""

411

Receive a multipart message.

412

413

Parameters:

414

- flags: Receive flags

415

- copy: Whether to copy message data

416

- track: Whether to return Frames with tracking

417

418

Returns:

419

- list: List of message parts

420

"""

421

422

def recv_serialized(self, deserialize: Callable, flags: int = 0, copy: bool = True) -> Any:

423

"""

424

Receive a message with custom deserialization.

425

426

Parameters:

427

- deserialize: Deserialization function

428

- flags: Receive flags

429

- copy: Whether to copy message data

430

431

Returns:

432

- Any: Deserialized message

433

"""

434

435

def recv_into(self, buf: Any, flags: int = 0, copy: bool = True, track: bool = False) -> int:

436

"""

437

Receive a message into an existing buffer.

438

439

Parameters:

440

- buf: Buffer to receive into

441

- flags: Receive flags

442

- copy: Whether to copy message data

443

- track: Whether to return Frame with tracking

444

445

Returns:

446

- int: Number of bytes received

447

"""

448

```

449

450

### Socket Configuration

451

452

Methods for getting and setting socket options that control behavior, performance, and protocol settings.

453

454

```python { .api }

455

def set(self, option: int, value: int | bytes | str) -> None:

456

"""

457

Set a socket option (preferred method name).

458

459

Parameters:

460

- option: Socket option constant (LINGER, RCVHWM, SNDHWM, etc.)

461

- value: Option value (type depends on option)

462

"""

463

464

def setsockopt(self, option: int, value: int | bytes | str) -> None:

465

"""

466

Set a socket option.

467

468

Parameters:

469

- option: Socket option constant (LINGER, RCVHWM, SNDHWM, etc.)

470

- value: Option value (type depends on option)

471

"""

472

473

def get(self, option: int) -> int | bytes:

474

"""

475

Get a socket option value (preferred method name).

476

477

Parameters:

478

- option: Socket option constant

479

480

Returns:

481

- int or bytes: Current option value

482

"""

483

484

def getsockopt(self, option: int) -> int | bytes:

485

"""

486

Get a socket option value.

487

488

Parameters:

489

- option: Socket option constant

490

491

Returns:

492

- int or bytes: Current option value

493

"""

494

495

def set_string(self, option: int, value: str, encoding: str = 'utf-8') -> None:

496

"""

497

Set a socket option with string value (preferred method name).

498

499

Parameters:

500

- option: Socket option constant

501

- value: String value

502

- encoding: String encoding

503

"""

504

505

def setsockopt_string(self, option: int, value: str, encoding: str = 'utf-8') -> None:

506

"""

507

Set a socket option with string value.

508

509

Parameters:

510

- option: Socket option constant

511

- value: String value

512

- encoding: String encoding

513

"""

514

515

def get_string(self, option: int, encoding: str = 'utf-8') -> str:

516

"""

517

Get a socket option as string (preferred method name).

518

519

Parameters:

520

- option: Socket option constant

521

- encoding: String encoding

522

523

Returns:

524

- str: Option value as string

525

"""

526

527

def getsockopt_string(self, option: int, encoding: str = 'utf-8') -> str:

528

"""

529

Get a socket option as string.

530

531

Parameters:

532

- option: Socket option constant

533

- encoding: String encoding

534

535

Returns:

536

- str: Option value as string

537

"""

538

539

@property

540

def hwm(self) -> int:

541

"""High water mark for both send and receive."""

542

543

@hwm.setter

544

def hwm(self, value: int) -> None:

545

"""Set high water mark for both send and receive."""

546

547

@property

548

def linger(self) -> int:

549

"""Linger period for socket closure."""

550

551

@linger.setter

552

def linger(self, value: int) -> None:

553

"""Set linger period for socket closure."""

554

```

555

556

### Socket Monitoring

557

558

Methods for monitoring socket events and state changes.

559

560

```python { .api }

561

def monitor(self, address: str, events: int = EVENT_ALL) -> None:

562

"""

563

Start monitoring socket events.

564

565

Parameters:

566

- address: Address for monitor socket (inproc://monitor.socket)

567

- events: Bitmask of events to monitor

568

"""

569

570

def get_monitor_socket(self, events: int = EVENT_ALL, addr: str | None = None) -> Socket:

571

"""

572

Get a PAIR socket for receiving monitor events.

573

574

Parameters:

575

- events: Bitmask of events to monitor

576

- addr: Optional address for monitor socket

577

578

Returns:

579

- Socket: PAIR socket for receiving events

580

"""

581

582

def disable_monitor(self) -> None:

583

"""Stop monitoring socket events."""

584

```

585

586

## Usage Examples

587

588

### Request-Reply Pattern

589

590

```python

591

import zmq

592

593

# Server

594

with zmq.Context() as context:

595

socket = context.socket(zmq.REP)

596

socket.bind("tcp://*:5555")

597

598

while True:

599

message = socket.recv_string()

600

print(f"Received: {message}")

601

socket.send_string(f"Echo: {message}")

602

603

# Client

604

with zmq.Context() as context:

605

socket = context.socket(zmq.REQ)

606

socket.connect("tcp://localhost:5555")

607

608

socket.send_string("Hello World")

609

reply = socket.recv_string()

610

print(f"Reply: {reply}")

611

```

612

613

### Publisher-Subscriber Pattern

614

615

```python

616

import time

617

import zmq

618

619

# Publisher

620

with zmq.Context() as context:

621

socket = context.socket(zmq.PUB)

622

socket.bind("tcp://*:5556")

623

624

for i in range(100):

625

topic = "weather" if i % 2 else "news"

626

message = f"Update {i}"

627

socket.send_string(f"{topic} {message}")

628

time.sleep(0.1)

629

630

# Subscriber

631

with zmq.Context() as context:

632

socket = context.socket(zmq.SUB)

633

socket.connect("tcp://localhost:5556")

634

socket.setsockopt_string(zmq.SUBSCRIBE, "weather")

635

636

while True:

637

message = socket.recv_string()

638

print(f"Received: {message}")

639

```

640

641

## Types

642

643

```python { .api }

644

from typing import Union, Optional, Any, List, Callable

645

646

# Message data types

647

MessageData = Union[bytes, str, memoryview, Frame]

648

MultipartMessage = List[MessageData]

649

650

# Socket option value types

651

OptionValue = Union[int, bytes, str]

652

653

# Address types

654

Address = str

655

656

# Context manager type

657

SocketContext = Any # Context manager for bind/connect operations

658

659

# Serialization function types

660

Serializer = Callable[[Any], bytes]

661

Deserializer = Callable[[bytes], Any]

662

```