Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library
—
Native async/await support with AsyncIO-compatible Context, Socket, and Poller classes that integrate seamlessly with Python's event loop for non-blocking messaging operations.
Async-compatible context manager that integrates with Python's asyncio event loop for non-blocking socket operations.
class Context:
def __init__(self, io_threads: int | zmq.Context = 1, shadow: zmq.Context | int = 0) -> None:
"""
Create a new async ZMQ context.
Parameters:
- io_threads: Number of I/O threads or existing Context to shadow (default: 1)
- shadow: Context or address to shadow (default: 0)
"""
def socket(self, socket_type: int, **kwargs) -> Socket:
"""
Create an async socket of the specified type.
Parameters:
- socket_type: ZMQ socket type constant
Returns:
- Socket: New async socket instance
"""
def term(self) -> None:
"""Terminate the context and close all sockets."""
def destroy(self, linger: int = None) -> None:
"""
Close all sockets and terminate context.
Parameters:
- linger: Linger period in milliseconds
"""
def __enter__(self) -> Context:
"""Context manager entry."""
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Context manager exit - destroys context."""
@property
def closed(self) -> bool:
"""True if the context has been terminated."""Async socket class providing non-blocking send/receive operations that work with Python's async/await syntax. Inherits all methods from the base Socket class with async versions where applicable.
class Socket:
def bind(self, address: str) -> None:
"""Bind socket to an address."""
def connect(self, address: str) -> None:
"""Connect socket to an address."""
async def send(self, data: bytes | Frame, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
"""
Send a message asynchronously.
Parameters:
- data: Message data
- flags: Send flags (NOBLOCK, SNDMORE)
- copy: Whether to copy data
- track: Whether to return MessageTracker
Returns:
- MessageTracker: If track=True
"""
async def recv(self, flags: int = 0, copy: bool = True, track: bool = False) -> bytes | Frame:
"""
Receive a message asynchronously.
Parameters:
- flags: Receive flags (NOBLOCK)
- copy: Whether to copy data
- track: Whether to return Frame with tracking
Returns:
- bytes or Frame: Received message data
"""
async def send_string(self, string: str, flags: int = 0, encoding: str = 'utf-8', copy: bool = True, track: bool = False) -> MessageTracker | None:
"""
Send a string message asynchronously.
Parameters:
- string: String to send
- flags: Send flags
- encoding: String encoding
"""
async def recv_string(self, flags: int = 0, encoding: str = 'utf-8', copy: bool = True) -> str:
"""
Receive a string message asynchronously.
Parameters:
- flags: Receive flags
- encoding: String encoding
Returns:
- str: Received string
"""
async def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
"""
Send a JSON object asynchronously.
Parameters:
- obj: JSON-serializable object
- flags: Send flags
- kwargs: Additional json.dumps() arguments
"""
async def recv_json(self, flags: int = 0, **kwargs) -> Any:
"""
Receive a JSON object asynchronously.
Parameters:
- flags: Receive flags
- kwargs: Additional json.loads() arguments
Returns:
- Any: Deserialized JSON object
"""
async def send_pyobj(self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL) -> None:
"""
Send a Python object asynchronously using pickle.
Parameters:
- obj: Python object to send
- flags: Send flags
- protocol: Pickle protocol version
"""
async def recv_pyobj(self, flags: int = 0) -> Any:
"""
Receive a Python object asynchronously using pickle.
Parameters:
- flags: Receive flags
Returns:
- Any: Unpickled Python object
"""
async def send_multipart(self, msg_parts: list, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
"""
Send a multipart message asynchronously.
Parameters:
- msg_parts: List of message parts
- flags: Send flags
- copy: Whether to copy data
- track: Whether to return MessageTracker
Returns:
- MessageTracker: If track=True
"""
async def recv_multipart(self, flags: int = 0, copy: bool = True, track: bool = False) -> list:
"""
Receive a multipart message asynchronously.
Parameters:
- flags: Receive flags
- copy: Whether to copy data
- track: Whether to return Frames with tracking
Returns:
- list: List of message parts
"""
def close(self, linger: int = None) -> None:
"""Close the socket."""
@property
def closed(self) -> bool:
"""True if socket is closed."""Async poller for monitoring multiple sockets with non-blocking event detection.
class Poller:
def register(self, socket: Socket | zmq.Socket, flags: int = POLLIN | POLLOUT) -> None:
"""
Register a socket for polling.
Parameters:
- socket: Socket to monitor
- flags: Event flags (POLLIN, POLLOUT, POLLERR)
"""
def unregister(self, socket: Socket | zmq.Socket) -> None:
"""
Unregister a socket from polling.
Parameters:
- socket: Socket to unregister
"""
async def poll(self, timeout: int = -1) -> list[tuple[Socket, int]]:
"""
Poll for events asynchronously.
Parameters:
- timeout: Timeout in milliseconds (-1 for infinite)
Returns:
- list: List of (socket, events) tuples
"""import asyncio
import zmq.asyncio
async def server():
context = zmq.asyncio.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
try:
while True:
# Non-blocking receive
message = await socket.recv_string()
print(f"Received: {message}")
# Simulate async processing
await asyncio.sleep(0.1)
# Non-blocking send
await socket.send_string(f"Echo: {message}")
finally:
socket.close()
context.term()
# Run the server
asyncio.run(server())import asyncio
import zmq.asyncio
async def client():
context = zmq.asyncio.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
try:
# Send multiple requests concurrently
tasks = []
for i in range(10):
task = send_request(socket, f"Request {i}")
tasks.append(task)
responses = await asyncio.gather(*tasks)
for response in responses:
print(f"Response: {response}")
finally:
socket.close()
context.term()
async def send_request(socket, message):
await socket.send_string(message)
return await socket.recv_string()
asyncio.run(client())import asyncio
import zmq.asyncio
async def publisher():
context = zmq.asyncio.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
try:
i = 0
while True:
topic = "weather" if i % 2 else "news"
message = f"Update {i}"
# Non-blocking send
await socket.send_string(f"{topic} {message}")
print(f"Published: {topic} {message}")
# Async sleep allows other coroutines to run
await asyncio.sleep(1)
i += 1
finally:
socket.close()
context.term()
asyncio.run(publisher())import asyncio
import zmq.asyncio
async def subscriber():
context = zmq.asyncio.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, "weather")
try:
while True:
# Non-blocking receive
message = await socket.recv_string()
print(f"Received: {message}")
# Process message asynchronously
await process_weather_data(message)
finally:
socket.close()
context.term()
async def process_weather_data(message):
# Simulate async processing
await asyncio.sleep(0.1)
print(f"Processed: {message}")
asyncio.run(subscriber())import asyncio
import zmq.asyncio
async def multi_socket_handler():
context = zmq.asyncio.Context()
# Create multiple sockets
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5555")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5556")
# Create async poller
poller = zmq.asyncio.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
try:
while True:
# Poll for events asynchronously
events = await poller.poll()
for socket, event in events:
if socket is frontend and event & zmq.POLLIN:
# Handle frontend message
message = await frontend.recv_multipart()
print(f"Frontend: {message}")
await backend.send_multipart(message)
elif socket is backend and event & zmq.POLLIN:
# Handle backend message
message = await backend.recv_multipart()
print(f"Backend: {message}")
await frontend.send_multipart(message)
finally:
frontend.close()
backend.close()
context.term()
asyncio.run(multi_socket_handler())import asyncio
import aiohttp
import zmq.asyncio
async def web_service_integration():
"""Example integrating ZMQ with aiohttp web service"""
context = zmq.asyncio.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
async with aiohttp.ClientSession() as session:
try:
# Send ZMQ message
await socket.send_json({"action": "get_data", "id": 123})
response = await socket.recv_json()
# Use response in HTTP request
async with session.get(f"https://api.example.com/data/{response['id']}") as resp:
web_data = await resp.json()
# Send web data back via ZMQ
await socket.send_json({"web_data": web_data})
result = await socket.recv_json()
return result
finally:
socket.close()
context.term()
# Run with asyncio
result = asyncio.run(web_service_integration())PyZMQ's async support automatically integrates with the current asyncio event loop:
import asyncio
import zmq.asyncio
async def main():
# Context automatically uses current event loop
context = zmq.asyncio.Context()
socket = context.socket(zmq.REQ)
# All operations are non-blocking and event-loop aware
await socket.send_string("Hello")
response = await socket.recv_string()
socket.close()
context.term()
# Works with any asyncio event loop
if __name__ == "__main__":
asyncio.run(main())import zmq
import zmq.asyncio
from zmq import Frame, MessageTracker, POLLIN, POLLOUT, DEFAULT_PROTOCOLfrom typing import Union, Optional, Any, List, Tuple, Awaitable
# Async message data types
AsyncMessageData = Union[bytes, str, memoryview, Frame]
AsyncMultipartMessage = List[AsyncMessageData]
# Async polling result
PollResult = List[Tuple[Socket, int]]
# Coroutine types
SendCoroutine = Awaitable[Optional[MessageTracker]]
RecvCoroutine = Awaitable[Union[bytes, Frame]]
StringCoroutine = Awaitable[str]
JsonCoroutine = Awaitable[Any]
MultipartCoroutine = Awaitable[List[AsyncMessageData]]
PollCoroutine = Awaitable[PollResult]Install with Tessl CLI
npx tessl i tessl/pypi-pyzmq