0
# Core Messaging
1
2
The Context and Socket classes form the foundation of all ZMQ communication. These classes provide synchronous messaging operations and support all ZMQ socket types and messaging patterns.
3
4
## Capabilities
5
6
### Context Management
7
8
The Context class manages ZMQ contexts, which are containers for all sockets in a single process. Contexts handle I/O threads, socket limits, and global settings.
9
10
```python { .api }
11
class Context:
12
def __init__(self, io_threads: int | Context = 1, shadow: Context | int = 0) -> None:
13
"""
14
Create a new ZMQ context.
15
16
Parameters:
17
- io_threads: Number of I/O threads or existing Context to shadow (default: 1)
18
- shadow: Context or address to shadow (default: 0)
19
"""
20
21
def socket(self, socket_type: int) -> Socket:
22
"""
23
Create a socket of the specified type.
24
25
Parameters:
26
- socket_type: ZMQ socket type constant (REQ, REP, PUB, SUB, etc.)
27
28
Returns:
29
- Socket: New socket instance
30
"""
31
32
def term(self) -> None:
33
"""Terminate the context. Blocks until all associated sockets have been closed; use destroy() to close them as well."""
34
35
def destroy(self, linger: int = None) -> None:
36
"""
37
Close all sockets and terminate context with optional linger period.
38
39
Parameters:
40
- linger: Time in milliseconds to wait for messages to be sent
41
"""
42
43
def set(self, option: int, value: int) -> None:
44
"""
45
Set a context option.
46
47
Parameters:
48
- option: Context option constant (IO_THREADS, MAX_SOCKETS, etc.)
49
- value: Option value
50
"""
51
52
def get(self, option: int) -> int:
53
"""
54
Get a context option value.
55
56
Parameters:
57
- option: Context option constant
58
59
Returns:
60
- int: Current option value
61
"""
62
63
def __enter__(self) -> Context:
64
"""Context manager entry."""
65
66
def __exit__(self, exc_type, exc_value, traceback) -> None:
67
"""Context manager exit - destroys context."""
68
69
@classmethod
70
def instance(cls, io_threads: int = 1) -> Context:
71
"""
72
Return a global Context instance.
73
74
Parameters:
75
- io_threads: Number of I/O threads for new instance
76
77
Returns:
78
- Context: Global singleton context instance
79
"""
80
81
@classmethod
82
def shadow(cls, address: int | Context) -> Context:
83
"""
84
Shadow an existing libzmq context.
85
86
Parameters:
87
- address: Context or integer address to shadow
88
89
Returns:
90
- Context: New context shadowing the existing one
91
"""
92
93
@property
94
def underlying(self) -> int:
95
"""Integer address of the underlying libzmq context."""
96
97
@property
98
def closed(self) -> bool:
99
"""True if the context has been terminated."""
100
```
101
102
Usage example:
103
104
```python
105
import zmq
106
107
# Create context with 2 I/O threads
108
context = zmq.Context(io_threads=2)
109
110
# Set context options
111
context.set(zmq.MAX_SOCKETS, 1024)
112
113
# Use as context manager for automatic cleanup
114
with zmq.Context() as ctx:
115
socket = ctx.socket(zmq.REQ)
116
# Socket operations...
117
# The context is terminated automatically when leaving the with block
118
```
119
120
### Socket Operations
121
122
The Socket class provides methods for connecting, binding, sending, and receiving messages across all ZMQ socket types.
123
124
```python { .api }
125
class Socket:
126
def bind(self, address: str) -> SocketContext:
127
"""
128
Bind socket to an address. Returns context manager for automatic unbind.
129
130
Parameters:
131
- address: Address string (tcp://*:5555, ipc:///tmp/socket, inproc://workers)
132
133
Returns:
134
- SocketContext: Context manager for automatic unbind on exit
135
"""
136
137
def bind_to_random_port(self, address: str, min_port: int = 49152, max_port: int = 65536, max_tries: int = 100) -> int:
138
"""
139
Bind socket to a random port in the specified range.
140
141
Parameters:
142
- address: Address to bind, without the port (e.g. tcp://*); the selected port is appended
143
- min_port: Minimum port number
144
- max_port: Maximum port number (exclusive)
145
- max_tries: Maximum binding attempts
146
147
Returns:
148
- int: The port number that was bound
149
"""
150
151
def connect(self, address: str) -> SocketContext:
152
"""
153
Connect socket to an address. Returns context manager for automatic disconnect.
154
155
Parameters:
156
- address: Address string (tcp://localhost:5555, ipc:///tmp/socket)
157
158
Returns:
159
- SocketContext: Context manager for automatic disconnect on exit
160
"""
161
162
def disconnect(self, address: str) -> None:
163
"""
164
Disconnect socket from an address.
165
166
Parameters:
167
- address: Address string to disconnect from
168
"""
169
170
def unbind(self, address: str) -> None:
171
"""
172
Unbind socket from an address.
173
174
Parameters:
175
- address: Address string to unbind from
176
"""
177
178
def close(self, linger: int = None) -> None:
179
"""
180
Close the socket.
181
182
Parameters:
183
- linger: Linger period in milliseconds (None for default)
184
"""
185
186
def __enter__(self) -> Socket:
187
"""Context manager entry."""
188
189
def __exit__(self, exc_type, exc_value, traceback) -> None:
190
"""Context manager exit - closes socket."""
191
192
def poll(self, timeout: int = -1, flags: int = POLLIN) -> int:
193
"""
194
Poll socket for events.
195
196
Parameters:
197
- timeout: Timeout in milliseconds (-1 for infinite)
198
- flags: Poll flags (POLLIN, POLLOUT, POLLERR)
199
200
Returns:
201
- int: Events that occurred (bitmask)
202
"""
203
204
def fileno(self) -> int:
205
"""
206
Get file descriptor for socket integration with select/poll.
207
208
Returns:
209
- int: File descriptor
210
"""
211
212
def subscribe(self, topic: str | bytes) -> None:
213
"""
214
Subscribe to a topic (SUB sockets only).
215
216
Parameters:
217
- topic: Topic to subscribe to
218
"""
219
220
def unsubscribe(self, topic: str | bytes) -> None:
221
"""
222
Unsubscribe from a topic (SUB sockets only).
223
224
Parameters:
225
- topic: Topic to unsubscribe from
226
"""
227
228
@classmethod
229
def shadow(cls, address: int | Socket) -> Socket:
230
"""
231
Shadow an existing libzmq socket.
232
233
Parameters:
234
- address: Socket or integer address to shadow
235
236
Returns:
237
- Socket: New socket shadowing the existing one
238
"""
239
240
@property
241
def underlying(self) -> int:
242
"""Integer address of the underlying libzmq socket."""
243
244
@property
245
def type(self) -> int:
246
"""Socket type (REQ, REP, PUB, SUB, etc.)."""
247
248
@property
249
def last_endpoint(self) -> str:
250
"""Last bound or connected endpoint address."""
251
252
@property
253
def copy_threshold(self) -> int:
254
"""Message size in bytes below which messages are always copied, even when copy=False is requested."""
255
256
@copy_threshold.setter
257
def copy_threshold(self, value: int) -> None:
258
"""Set copy threshold."""
259
260
@property
261
def closed(self) -> bool:
262
"""True if the socket has been closed."""
263
```
264
265
### Message Sending
266
267
Methods for sending various data types with optional flags and routing information.
268
269
```python { .api }
270
def send(self, data: Union[bytes, Frame], flags: int = 0, copy: bool = True, track: bool = False) -> Optional[MessageTracker]:
271
"""
272
Send a message.
273
274
Parameters:
275
- data: Message data as bytes or Frame
276
- flags: Send flags (NOBLOCK, SNDMORE)
277
- copy: Whether to copy the message data
278
- track: Whether to return a MessageTracker
279
280
Returns:
281
- MessageTracker: If track=True, tracker for send completion
282
"""
283
284
def send_string(self, string: str, flags: int = 0, encoding: str = 'utf-8', copy: bool = True, track: bool = False) -> MessageTracker | None:
285
"""
286
Send a string message.
287
288
Parameters:
289
- string: String to send
290
- flags: Send flags (NOBLOCK, SNDMORE)
291
- encoding: String encoding (default: utf-8)
292
"""
293
294
def send_pyobj(self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, copy: bool = True, track: bool = False) -> MessageTracker | None:
295
"""
296
Send a Python object using pickle.
297
298
Parameters:
299
- obj: Python object to send
300
- flags: Send flags
301
- protocol: Pickle protocol version
302
"""
303
304
def send_json(self, obj: Any, flags: int = 0, copy: bool = True, track: bool = False, **kwargs) -> MessageTracker | None:
305
"""
306
Send a JSON-serializable object.
307
308
Parameters:
309
- obj: JSON-serializable object
310
- flags: Send flags
311
- kwargs: Additional arguments for json.dumps()
312
"""
313
314
def send_multipart(self, msg_parts: list, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
315
"""
316
Send a multipart message.
317
318
Parameters:
319
- msg_parts: List of message parts (bytes, strings, or Frames)
320
- flags: Send flags
321
- copy: Whether to copy message data
322
- track: Whether to return MessageTracker
323
324
Returns:
325
- MessageTracker: If track=True, tracker for send completion
326
"""
327
328
def send_serialized(self, msg: Any, serialize: Callable, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
329
"""
330
Send a message with custom serialization.
331
332
Parameters:
333
- msg: Message to serialize and send
334
- serialize: Serialization function
335
- flags: Send flags
336
- copy: Whether to copy message data
337
- track: Whether to return MessageTracker
338
339
Returns:
340
- MessageTracker: If track=True, tracker for send completion
341
"""
342
"""
343
Send a multipart message.
344
345
Parameters:
346
- msg_parts: List of message parts (bytes, strings, or Frames)
347
- flags: Send flags
348
- copy: Whether to copy message data
349
- track: Whether to return MessageTracker
350
351
Returns:
352
- MessageTracker: If track=True, tracker for send completion
353
"""
354
```
355
356
### Message Receiving
357
358
Methods for receiving various data types with optional flags and timeout handling.
359
360
```python { .api }
361
def recv(self, flags: int = 0, copy: bool = True, track: bool = False) -> Union[bytes, Frame]:
362
"""
363
Receive a message.
364
365
Parameters:
366
- flags: Receive flags (NOBLOCK)
367
- copy: Whether to copy the message data
368
- track: Whether to return a Frame with tracking
369
370
Returns:
371
- bytes or Frame: Received message data
372
"""
373
374
def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
375
"""
376
Receive a string message.
377
378
Parameters:
379
- flags: Receive flags (NOBLOCK)
380
- encoding: String encoding (default: utf-8)
381
382
Returns:
383
- str: Received string
384
"""
385
386
def recv_pyobj(self, flags: int = 0) -> Any:
387
"""
388
Receive a Python object using pickle.
389
390
Parameters:
391
- flags: Receive flags
392
393
Returns:
394
- Any: Unpickled Python object
395
"""
396
397
def recv_json(self, flags: int = 0, **kwargs) -> Any:
398
"""
399
Receive a JSON object.
400
401
Parameters:
402
- flags: Receive flags
403
- kwargs: Additional arguments for json.loads()
404
405
Returns:
406
- Any: Deserialized JSON object
407
"""
408
409
def recv_multipart(self, flags: int = 0, copy: bool = True, track: bool = False) -> list:
410
"""
411
Receive a multipart message.
412
413
Parameters:
414
- flags: Receive flags
415
- copy: Whether to copy message data
416
- track: Whether to return Frames with tracking
417
418
Returns:
419
- list: List of message parts
420
"""
421
422
def recv_serialized(self, deserialize: Callable, flags: int = 0, copy: bool = True) -> Any:
423
"""
424
Receive a message with custom deserialization.
425
426
Parameters:
427
- deserialize: Deserialization function
428
- flags: Receive flags
429
- copy: Whether to copy message data
430
431
Returns:
432
- Any: Deserialized message
433
"""
434
435
def recv_into(self, buf: Any, flags: int = 0, copy: bool = True, track: bool = False) -> int:
436
"""
437
Receive a message into an existing buffer.
438
439
Parameters:
440
- buf: Buffer to receive into
441
- flags: Receive flags
442
- copy: Whether to copy message data
443
- track: Whether to return Frame with tracking
444
445
Returns:
446
- int: Number of bytes received
447
"""
448
```
449
450
### Socket Configuration
451
452
Methods for getting and setting socket options that control behavior, performance, and protocol settings.
453
454
```python { .api }
455
def set(self, option: int, value: int | bytes | str) -> None:
456
"""
457
Set a socket option (preferred method name).
458
459
Parameters:
460
- option: Socket option constant (LINGER, RCVHWM, SNDHWM, etc.)
461
- value: Option value (type depends on option)
462
"""
463
464
def setsockopt(self, option: int, value: int | bytes | str) -> None:
465
"""
466
Set a socket option.
467
468
Parameters:
469
- option: Socket option constant (LINGER, RCVHWM, SNDHWM, etc.)
470
- value: Option value (type depends on option)
471
"""
472
473
def get(self, option: int) -> int | bytes:
474
"""
475
Get a socket option value (preferred method name).
476
477
Parameters:
478
- option: Socket option constant
479
480
Returns:
481
- int or bytes: Current option value
482
"""
483
484
def getsockopt(self, option: int) -> int | bytes:
485
"""
486
Get a socket option value.
487
488
Parameters:
489
- option: Socket option constant
490
491
Returns:
492
- int or bytes: Current option value
493
"""
494
495
def set_string(self, option: int, value: str, encoding: str = 'utf-8') -> None:
496
"""
497
Set a socket option with string value (preferred method name).
498
499
Parameters:
500
- option: Socket option constant
501
- value: String value
502
- encoding: String encoding
503
"""
504
505
def setsockopt_string(self, option: int, value: str, encoding: str = 'utf-8') -> None:
506
"""
507
Set a socket option with string value.
508
509
Parameters:
510
- option: Socket option constant
511
- value: String value
512
- encoding: String encoding
513
"""
514
515
def get_string(self, option: int, encoding: str = 'utf-8') -> str:
516
"""
517
Get a socket option as string (preferred method name).
518
519
Parameters:
520
- option: Socket option constant
521
- encoding: String encoding
522
523
Returns:
524
- str: Option value as string
525
"""
526
527
def getsockopt_string(self, option: int, encoding: str = 'utf-8') -> str:
528
"""
529
Get a socket option as string.
530
531
Parameters:
532
- option: Socket option constant
533
- encoding: String encoding
534
535
Returns:
536
- str: Option value as string
537
"""
538
539
@property
540
def hwm(self) -> int:
541
"""High water mark for both send and receive."""
542
543
@hwm.setter
544
def hwm(self, value: int) -> None:
545
"""Set high water mark for both send and receive."""
546
547
@property
548
def linger(self) -> int:
549
"""Linger period for socket closure."""
550
551
@linger.setter
552
def linger(self, value: int) -> None:
553
"""Set linger period for socket closure."""
554
```
555
556
### Socket Monitoring
557
558
Methods for monitoring socket events and state changes.
559
560
```python { .api }
561
def monitor(self, address: str, events: int = EVENT_ALL) -> None:
562
"""
563
Start monitoring socket events.
564
565
Parameters:
566
- address: Address for monitor socket (inproc://monitor.socket)
567
- events: Bitmask of events to monitor
568
"""
569
570
def get_monitor_socket(self, events: int = EVENT_ALL, addr: str = None) -> Socket:
571
"""
572
Get a PAIR socket for receiving monitor events.
573
574
Parameters:
575
- events: Bitmask of events to monitor
576
- addr: Optional address for monitor socket
577
578
Returns:
579
- Socket: PAIR socket for receiving events
580
"""
581
582
def disable_monitor(self) -> None:
583
"""Stop monitoring socket events."""
584
```
585
586
## Usage Examples
587
588
### Request-Reply Pattern
589
590
```python
591
import zmq
592
593
# Server
594
with zmq.Context() as context:
595
socket = context.socket(zmq.REP)
596
socket.bind("tcp://*:5555")
597
598
while True:
599
message = socket.recv_string()
600
print(f"Received: {message}")
601
socket.send_string(f"Echo: {message}")
602
603
# Client
604
with zmq.Context() as context:
605
socket = context.socket(zmq.REQ)
606
socket.connect("tcp://localhost:5555")
607
608
socket.send_string("Hello World")
609
reply = socket.recv_string()
610
print(f"Reply: {reply}")
611
```
612
613
### Publisher-Subscriber Pattern
614
615
```python
616
import time
617
import zmq
618
619
# Publisher
620
with zmq.Context() as context:
621
socket = context.socket(zmq.PUB)
622
socket.bind("tcp://*:5556")
623
624
for i in range(100):
625
topic = "weather" if i % 2 else "news"
626
message = f"Update {i}"
627
socket.send_string(f"{topic} {message}")
628
time.sleep(0.1)
629
630
# Subscriber
631
with zmq.Context() as context:
632
socket = context.socket(zmq.SUB)
633
socket.connect("tcp://localhost:5556")
634
socket.setsockopt_string(zmq.SUBSCRIBE, "weather")
635
636
while True:
637
message = socket.recv_string()
638
print(f"Received: {message}")
639
```
640
641
## Types
642
643
```python { .api }
644
from typing import Union, Optional, Any, List, Callable
645
646
# Message data types
647
MessageData = Union[bytes, str, memoryview, Frame]
648
MultipartMessage = List[MessageData]
649
650
# Socket option value types
651
OptionValue = Union[int, bytes, str]
652
653
# Address types
654
Address = str
655
656
# Context manager type
657
SocketContext = Any # Context manager for bind/connect operations
658
659
# Serialization function types
660
Serializer = Callable[[Any], bytes]
661
Deserializer = Callable[[bytes], Any]
662
```