0
# Async Support
1
2
Native async/await support with AsyncIO-compatible Context, Socket, and Poller classes that integrate seamlessly with Python's event loop for non-blocking messaging operations.
3
4
## Capabilities
5
6
### AsyncIO Context
7
8
Async-compatible context manager that integrates with Python's asyncio event loop for non-blocking socket operations.
9
10
```python { .api }
11
class Context:
12
def __init__(self, io_threads: int | zmq.Context = 1, shadow: zmq.Context | int = 0) -> None:
13
"""
14
Create a new async ZMQ context.
15
16
Parameters:
17
- io_threads: Number of I/O threads or existing Context to shadow (default: 1)
18
- shadow: Context or address to shadow (default: 0)
19
"""
20
21
def socket(self, socket_type: int, **kwargs) -> Socket:
22
"""
23
Create an async socket of the specified type.
24
25
Parameters:
26
- socket_type: ZMQ socket type constant
27
28
Returns:
29
- Socket: New async socket instance
30
"""
31
32
def term(self) -> None:
33
"""Terminate the context and close all sockets."""
34
35
def destroy(self, linger: int = None) -> None:
36
"""
37
Close all sockets and terminate context.
38
39
Parameters:
40
- linger: Linger period in milliseconds
41
"""
42
43
def __enter__(self) -> Context:
44
"""Context manager entry."""
45
46
def __exit__(self, exc_type, exc_value, traceback) -> None:
47
"""Context manager exit - destroys context."""
48
49
@property
50
def closed(self) -> bool:
51
"""True if the context has been terminated."""
52
```
53
54
### AsyncIO Socket
55
56
Async socket class providing non-blocking send/receive operations that work with Python's async/await syntax. Inherits all methods from the base Socket class with async versions where applicable.
57
58
```python { .api }
59
class Socket:
60
def bind(self, address: str) -> None:
61
"""Bind socket to an address."""
62
63
def connect(self, address: str) -> None:
64
"""Connect socket to an address."""
65
66
async def send(self, data: bytes | Frame, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
67
"""
68
Send a message asynchronously.
69
70
Parameters:
71
- data: Message data
72
- flags: Send flags (NOBLOCK, SNDMORE)
73
- copy: Whether to copy data
74
- track: Whether to return MessageTracker
75
76
Returns:
77
- MessageTracker: If track=True
78
"""
79
80
async def recv(self, flags: int = 0, copy: bool = True, track: bool = False) -> bytes | Frame:
81
"""
82
Receive a message asynchronously.
83
84
Parameters:
85
- flags: Receive flags (NOBLOCK)
86
- copy: Whether to copy data
87
- track: Whether to return Frame with tracking
88
89
Returns:
90
- bytes or Frame: Received message data
91
"""
92
93
async def send_string(self, string: str, flags: int = 0, encoding: str = 'utf-8', copy: bool = True, track: bool = False) -> MessageTracker | None:
94
"""
95
Send a string message asynchronously.
96
97
Parameters:
98
- string: String to send
99
- flags: Send flags
100
- encoding: String encoding
101
"""
102
103
async def recv_string(self, flags: int = 0, encoding: str = 'utf-8', copy: bool = True) -> str:
104
"""
105
Receive a string message asynchronously.
106
107
Parameters:
108
- flags: Receive flags
109
- encoding: String encoding
110
111
Returns:
112
- str: Received string
113
"""
114
115
async def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
116
"""
117
Send a JSON object asynchronously.
118
119
Parameters:
120
- obj: JSON-serializable object
121
- flags: Send flags
122
- kwargs: Additional json.dumps() arguments
123
"""
124
125
async def recv_json(self, flags: int = 0, **kwargs) -> Any:
126
"""
127
Receive a JSON object asynchronously.
128
129
Parameters:
130
- flags: Receive flags
131
- kwargs: Additional json.loads() arguments
132
133
Returns:
134
- Any: Deserialized JSON object
135
"""
136
137
async def send_pyobj(self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL) -> None:
138
"""
139
Send a Python object asynchronously using pickle.
140
141
Parameters:
142
- obj: Python object to send
143
- flags: Send flags
144
- protocol: Pickle protocol version
145
"""
146
147
async def recv_pyobj(self, flags: int = 0) -> Any:
148
"""
149
Receive a Python object asynchronously using pickle.
150
151
Parameters:
152
- flags: Receive flags
153
154
Returns:
155
- Any: Unpickled Python object
156
"""
157
158
async def send_multipart(self, msg_parts: list, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
159
"""
160
Send a multipart message asynchronously.
161
162
Parameters:
163
- msg_parts: List of message parts
164
- flags: Send flags
165
- copy: Whether to copy data
166
- track: Whether to return MessageTracker
167
168
Returns:
169
- MessageTracker: If track=True
170
"""
171
172
async def recv_multipart(self, flags: int = 0, copy: bool = True, track: bool = False) -> list:
173
"""
174
Receive a multipart message asynchronously.
175
176
Parameters:
177
- flags: Receive flags
178
- copy: Whether to copy data
179
- track: Whether to return Frames with tracking
180
181
Returns:
182
- list: List of message parts
183
"""
184
185
def close(self, linger: int = None) -> None:
186
"""Close the socket."""
187
188
@property
189
def closed(self) -> bool:
190
"""True if socket is closed."""
191
```
192
193
### AsyncIO Polling
194
195
Async poller for monitoring multiple sockets with non-blocking event detection.
196
197
```python { .api }
198
class Poller:
199
def register(self, socket: Socket | zmq.Socket, flags: int = POLLIN | POLLOUT) -> None:
200
"""
201
Register a socket for polling.
202
203
Parameters:
204
- socket: Socket to monitor
205
- flags: Event flags (POLLIN, POLLOUT, POLLERR)
206
"""
207
208
def unregister(self, socket: Socket | zmq.Socket) -> None:
209
"""
210
Unregister a socket from polling.
211
212
Parameters:
213
- socket: Socket to unregister
214
"""
215
216
async def poll(self, timeout: int = -1) -> list[tuple[Socket, int]]:
217
"""
218
Poll for events asynchronously.
219
220
Parameters:
221
- timeout: Timeout in milliseconds (-1 for infinite)
222
223
Returns:
224
- list: List of (socket, events) tuples
225
"""
226
```
227
228
## Usage Examples
229
230
### Async Request-Reply Server
231
232
```python
233
import asyncio
234
import zmq.asyncio
235
236
async def server():
237
context = zmq.asyncio.Context()
238
socket = context.socket(zmq.REP)
239
socket.bind("tcp://*:5555")
240
241
try:
242
while True:
243
# Non-blocking receive
244
message = await socket.recv_string()
245
print(f"Received: {message}")
246
247
# Simulate async processing
248
await asyncio.sleep(0.1)
249
250
# Non-blocking send
251
await socket.send_string(f"Echo: {message}")
252
finally:
253
socket.close()
254
context.term()
255
256
# Run the server
257
asyncio.run(server())
258
```
259
260
### Async Request-Reply Client
261
262
```python
263
import asyncio
264
import zmq.asyncio
265
266
async def client():
267
context = zmq.asyncio.Context()
268
socket = context.socket(zmq.REQ)
269
socket.connect("tcp://localhost:5555")
270
271
try:
272
# Send multiple requests concurrently
273
tasks = []
274
for i in range(10):
275
task = send_request(socket, f"Request {i}")
276
tasks.append(task)
277
278
responses = await asyncio.gather(*tasks)
279
for response in responses:
280
print(f"Response: {response}")
281
finally:
282
socket.close()
283
context.term()
284
285
async def send_request(socket, message):
286
await socket.send_string(message)
287
return await socket.recv_string()
288
289
asyncio.run(client())
290
```
291
292
### Async Publisher
293
294
```python
295
import asyncio
296
import zmq.asyncio
297
298
async def publisher():
299
context = zmq.asyncio.Context()
300
socket = context.socket(zmq.PUB)
301
socket.bind("tcp://*:5556")
302
303
try:
304
i = 0
305
while True:
306
topic = "weather" if i % 2 else "news"
307
message = f"Update {i}"
308
309
# Non-blocking send
310
await socket.send_string(f"{topic} {message}")
311
print(f"Published: {topic} {message}")
312
313
# Async sleep allows other coroutines to run
314
await asyncio.sleep(1)
315
i += 1
316
finally:
317
socket.close()
318
context.term()
319
320
asyncio.run(publisher())
321
```
322
323
### Async Subscriber
324
325
```python
326
import asyncio
327
import zmq.asyncio
328
329
async def subscriber():
330
context = zmq.asyncio.Context()
331
socket = context.socket(zmq.SUB)
332
socket.connect("tcp://localhost:5556")
333
socket.setsockopt_string(zmq.SUBSCRIBE, "weather")
334
335
try:
336
while True:
337
# Non-blocking receive
338
message = await socket.recv_string()
339
print(f"Received: {message}")
340
341
# Process message asynchronously
342
await process_weather_data(message)
343
finally:
344
socket.close()
345
context.term()
346
347
async def process_weather_data(message):
348
# Simulate async processing
349
await asyncio.sleep(0.1)
350
print(f"Processed: {message}")
351
352
asyncio.run(subscriber())
353
```
354
355
### Async Polling Multiple Sockets
356
357
```python
358
import asyncio
359
import zmq.asyncio
360
361
async def multi_socket_handler():
362
context = zmq.asyncio.Context()
363
364
# Create multiple sockets
365
frontend = context.socket(zmq.ROUTER)
366
frontend.bind("tcp://*:5555")
367
368
backend = context.socket(zmq.DEALER)
369
backend.bind("tcp://*:5556")
370
371
# Create async poller
372
poller = zmq.asyncio.Poller()
373
poller.register(frontend, zmq.POLLIN)
374
poller.register(backend, zmq.POLLIN)
375
376
try:
377
while True:
378
# Poll for events asynchronously
379
events = await poller.poll()
380
381
for socket, event in events:
382
if socket is frontend and event & zmq.POLLIN:
383
# Handle frontend message
384
message = await frontend.recv_multipart()
385
print(f"Frontend: {message}")
386
await backend.send_multipart(message)
387
388
elif socket is backend and event & zmq.POLLIN:
389
# Handle backend message
390
message = await backend.recv_multipart()
391
print(f"Backend: {message}")
392
await frontend.send_multipart(message)
393
finally:
394
frontend.close()
395
backend.close()
396
context.term()
397
398
asyncio.run(multi_socket_handler())
399
```
400
401
### Integration with Other Async Libraries
402
403
```python
404
import asyncio
405
import aiohttp
406
import zmq.asyncio
407
408
async def web_service_integration():
409
"""Example integrating ZMQ with aiohttp web service"""
410
context = zmq.asyncio.Context()
411
socket = context.socket(zmq.REQ)
412
socket.connect("tcp://localhost:5555")
413
414
async with aiohttp.ClientSession() as session:
415
try:
416
# Send ZMQ message
417
await socket.send_json({"action": "get_data", "id": 123})
418
response = await socket.recv_json()
419
420
# Use response in HTTP request
421
async with session.get(f"https://api.example.com/data/{response['id']}") as resp:
422
web_data = await resp.json()
423
424
# Send web data back via ZMQ
425
await socket.send_json({"web_data": web_data})
426
result = await socket.recv_json()
427
428
return result
429
finally:
430
socket.close()
431
context.term()
432
433
# Run with asyncio
434
result = asyncio.run(web_service_integration())
435
```
436
437
## Event Loop Integration
438
439
PyZMQ's async support automatically integrates with the current asyncio event loop:
440
441
```python
442
import asyncio
443
import zmq.asyncio
444
445
async def main():
446
# Context automatically uses current event loop
447
context = zmq.asyncio.Context()
448
socket = context.socket(zmq.REQ)
449
450
# All operations are non-blocking and event-loop aware
451
await socket.send_string("Hello")
452
response = await socket.recv_string()
453
454
socket.close()
455
context.term()
456
457
# Works with any asyncio event loop
458
if __name__ == "__main__":
459
asyncio.run(main())
460
```
461
462
## Imports
463
464
```python
465
import zmq
466
import zmq.asyncio
467
from zmq import Frame, MessageTracker, POLLIN, POLLOUT, DEFAULT_PROTOCOL
468
```
469
470
## Types
471
472
```python { .api }
473
from typing import Union, Optional, Any, List, Tuple, Awaitable
474
475
# Async message data types
476
AsyncMessageData = Union[bytes, str, memoryview, Frame]
477
AsyncMultipartMessage = List[AsyncMessageData]
478
479
# Async polling result
480
PollResult = List[Tuple[Socket, int]]
481
482
# Coroutine types
483
SendCoroutine = Awaitable[Optional[MessageTracker]]
484
RecvCoroutine = Awaitable[Union[bytes, Frame]]
485
StringCoroutine = Awaitable[str]
486
JsonCoroutine = Awaitable[Any]
487
MultipartCoroutine = Awaitable[List[AsyncMessageData]]
488
PollCoroutine = Awaitable[PollResult]
489
```