CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pyzmq

Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library

Pending
Overview
Eval results
Files

docs/async-support.md

Async Support

Native async/await support with AsyncIO-compatible Context, Socket, and Poller classes that integrate seamlessly with Python's event loop for non-blocking messaging operations.

Capabilities

AsyncIO Context

Async-compatible context manager that integrates with Python's asyncio event loop for non-blocking socket operations.

class Context:
    """
    Async-compatible ZMQ context that creates async Socket instances.

    This listing documents the interface only: every method body below is a
    docstring, with no implementation shown.
    """

    def __init__(self, io_threads: int | zmq.Context = 1, shadow: zmq.Context | int = 0) -> None:
        """
        Create a new async ZMQ context.

        Parameters:
        - io_threads: Number of I/O threads, or an existing Context to shadow (default: 1)
        - shadow: Context or address to shadow (default: 0)
        """

    def socket(self, socket_type: int, **kwargs) -> Socket:
        """
        Create an async socket of the specified type.

        Parameters:
        - socket_type: ZMQ socket type constant (the examples on this page
          use zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB, zmq.ROUTER and zmq.DEALER)
        - kwargs: Additional keyword arguments; their meaning is not shown
          in this listing

        Returns:
        - Socket: New async socket instance
        """

    def term(self) -> None:
        """
        Terminate the context.

        NOTE(review): this was previously described as also closing all
        sockets, which is how destroy() is described. libzmq context
        termination normally waits for sockets to be closed rather than
        closing them -- confirm against the pyzmq documentation.
        """

    def destroy(self, linger: int | None = None) -> None:
        """
        Close all sockets and terminate context.

        Parameters:
        - linger: Linger period in milliseconds (default: None; presumably
          leaves each socket's own linger setting in effect -- confirm)
        """

    def __enter__(self) -> Context:
        """Context manager entry."""

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit - destroys context."""

    @property
    def closed(self) -> bool:
        """True if the context has been terminated."""

AsyncIO Socket

Async socket class providing non-blocking send/receive operations that work with Python's async/await syntax. Inherits all methods from the base Socket class with async versions where applicable.

class Socket:
    """
    Async ZMQ socket whose send/receive methods are coroutines.

    This listing documents the interface only: every method body below is a
    docstring, with no implementation shown. bind(), connect() and close()
    are plain (non-async) methods; the send_*/recv_* family must be awaited.
    """

    def bind(self, address: str) -> None:
        """
        Bind socket to an address.

        Parameters:
        - address: Endpoint to bind, e.g. "tcp://*:5555"
        """

    def connect(self, address: str) -> None:
        """
        Connect socket to an address.

        Parameters:
        - address: Endpoint to connect to, e.g. "tcp://localhost:5555"
        """

    async def send(self, data: bytes | Frame, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
        """
        Send a message asynchronously.

        Parameters:
        - data: Message data
        - flags: Send flags (NOBLOCK, SNDMORE)
        - copy: Whether to copy data
        - track: Whether to return MessageTracker

        Returns:
        - MessageTracker: If track=True, otherwise None
        """

    async def recv(self, flags: int = 0, copy: bool = True, track: bool = False) -> bytes | Frame:
        """
        Receive a message asynchronously.

        Parameters:
        - flags: Receive flags (NOBLOCK)
        - copy: Whether to copy data
        - track: Whether to return Frame with tracking

        Returns:
        - bytes or Frame: Received message data (presumably a Frame when
          copy=False -- confirm)
        """

    async def send_string(self, string: str, flags: int = 0, encoding: str = 'utf-8', copy: bool = True, track: bool = False) -> MessageTracker | None:
        """
        Send a string message asynchronously.

        Parameters:
        - string: String to send
        - flags: Send flags
        - encoding: String encoding (default: 'utf-8')
        - copy: Whether to copy data
        - track: Whether to return MessageTracker

        Returns:
        - MessageTracker: If track=True, otherwise None
        """

    async def recv_string(self, flags: int = 0, encoding: str = 'utf-8', copy: bool = True) -> str:
        """
        Receive a string message asynchronously.

        Parameters:
        - flags: Receive flags
        - encoding: String encoding (default: 'utf-8')
        - copy: Whether to copy data

        Returns:
        - str: Received string
        """

    async def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
        """
        Send a JSON object asynchronously.

        Parameters:
        - obj: JSON-serializable object
        - flags: Send flags
        - kwargs: Additional json.dumps() arguments
        """

    async def recv_json(self, flags: int = 0, **kwargs) -> Any:
        """
        Receive a JSON object asynchronously.

        Parameters:
        - flags: Receive flags
        - kwargs: Additional json.loads() arguments

        Returns:
        - Any: Deserialized JSON object
        """

    async def send_pyobj(self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL) -> None:
        """
        Send a Python object asynchronously using pickle.

        Parameters:
        - obj: Python object to send
        - flags: Send flags
        - protocol: Pickle protocol version (default: DEFAULT_PROTOCOL)
        """

    async def recv_pyobj(self, flags: int = 0) -> Any:
        """
        Receive a Python object asynchronously using pickle.

        WARNING: unpickling can execute arbitrary code, so only use this
        with peers you trust.

        Parameters:
        - flags: Receive flags

        Returns:
        - Any: Unpickled Python object
        """

    async def send_multipart(self, msg_parts: list, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
        """
        Send a multipart message asynchronously.

        Parameters:
        - msg_parts: List of message parts
        - flags: Send flags
        - copy: Whether to copy data
        - track: Whether to return MessageTracker

        Returns:
        - MessageTracker: If track=True, otherwise None
        """

    async def recv_multipart(self, flags: int = 0, copy: bool = True, track: bool = False) -> list:
        """
        Receive a multipart message asynchronously.

        Parameters:
        - flags: Receive flags
        - copy: Whether to copy data
        - track: Whether to return Frames with tracking

        Returns:
        - list: List of message parts
        """

    def close(self, linger: int | None = None) -> None:
        """
        Close the socket.

        Parameters:
        - linger: Default is None; presumably a linger period in
          milliseconds, as for Context.destroy() -- confirm
        """

    @property
    def closed(self) -> bool:
        """True if socket is closed."""

AsyncIO Polling

Async poller for monitoring multiple sockets with non-blocking event detection.

class Poller:
    """
    Async poller for monitoring multiple sockets.

    This listing documents the interface only: every method body below is a
    docstring, with no implementation shown. register() and unregister()
    are plain methods; poll() must be awaited.
    """

    def register(self, socket: Socket | zmq.Socket, flags: int = POLLIN | POLLOUT) -> None:
        """
        Register a socket for polling.

        Parameters:
        - socket: Socket to monitor (async Socket or plain zmq.Socket)
        - flags: Event flags (POLLIN, POLLOUT, POLLERR), combined with |
          (default: POLLIN | POLLOUT)
        """

    def unregister(self, socket: Socket | zmq.Socket) -> None:
        """
        Unregister a socket from polling.

        Parameters:
        - socket: Socket to unregister
        """

    async def poll(self, timeout: int = -1) -> list[tuple[Socket, int]]:
        """
        Poll for events asynchronously.

        Parameters:
        - timeout: Timeout in milliseconds (-1 for infinite)

        Returns:
        - list: List of (socket, events) tuples; events is a bitmask that
          can be tested with &, e.g. events & zmq.POLLIN
        """

Usage Examples

Async Request-Reply Server

import asyncio
import zmq.asyncio

async def server():
    """Echo every request received on a REP socket bound to TCP port 5555."""
    ctx = zmq.asyncio.Context()
    rep_socket = ctx.socket(zmq.REP)
    rep_socket.bind("tcp://*:5555")

    try:
        while True:
            # Awaiting the receive yields to the event loop until a request arrives.
            message = await rep_socket.recv_string()
            print(f"Received: {message}")

            # Stand-in for real asynchronous work.
            await asyncio.sleep(0.1)

            # Answer the request that was just received.
            reply = f"Echo: {message}"
            await rep_socket.send_string(reply)
    finally:
        # Close the socket first, then terminate its context.
        rep_socket.close()
        ctx.term()

# Start the event loop and serve until interrupted.
asyncio.run(server())

Async Request-Reply Client

import asyncio
import zmq.asyncio

async def client():
    """
    Send ten requests to the REP server on localhost:5555 and print the replies.

    A REQ socket enforces strict send/recv alternation: after a send, the
    next operation on that socket must be the matching recv. Scheduling all
    ten round trips at once with asyncio.gather() on one REQ socket attempts
    a second send while the first reply is still outstanding, which fails.
    The round trips are therefore awaited one after another. For genuinely
    concurrent requests, give each task its own REQ socket or use a DEALER
    socket instead.
    """
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")

    try:
        # One complete request/reply round trip at a time on the shared socket.
        responses = [
            await send_request(socket, f"Request {i}")
            for i in range(10)
        ]

        for response in responses:
            print(f"Response: {response}")
    finally:
        socket.close()
        context.term()

async def send_request(socket, message):
    """
    Perform one request/reply round trip and return the reply string.

    Parameters:
    - socket: Connected async REQ socket; it must not have another request
      in flight
    - message: Request text to send

    Returns:
    - str: Reply received for this request
    """
    await socket.send_string(message)
    return await socket.recv_string()

asyncio.run(client())

Async Publisher

import asyncio
import zmq.asyncio

async def publisher():
    """Publish alternating "news" and "weather" updates, one per second, on TCP port 5556."""
    ctx = zmq.asyncio.Context()
    pub_socket = ctx.socket(zmq.PUB)
    pub_socket.bind("tcp://*:5556")

    i = 0
    try:
        while True:
            # Odd counters are published as "weather", even ones as "news".
            if i % 2:
                topic = "weather"
            else:
                topic = "news"
            message = f"Update {i}"

            # The topic is the leading word of the payload.
            await pub_socket.send_string(f"{topic} {message}")
            print(f"Published: {topic} {message}")

            # Sleeping via asyncio lets other coroutines run in the meantime.
            await asyncio.sleep(1)
            i += 1
    finally:
        # Close the socket first, then terminate its context.
        pub_socket.close()
        ctx.term()

asyncio.run(publisher())

Async Subscriber

import asyncio
import zmq.asyncio

async def subscriber():
    """Receive "weather" messages from the publisher on localhost:5556 and process each one."""
    ctx = zmq.asyncio.Context()
    sub_socket = ctx.socket(zmq.SUB)
    sub_socket.connect("tcp://localhost:5556")
    # Subscribe to the "weather" topic.
    sub_socket.setsockopt_string(zmq.SUBSCRIBE, "weather")

    try:
        while True:
            # Awaiting the receive yields to the event loop until a message arrives.
            message = await sub_socket.recv_string()
            print(f"Received: {message}")

            # Hand the message to the asynchronous processing step.
            await process_weather_data(message)
    finally:
        # Close the socket first, then terminate its context.
        sub_socket.close()
        ctx.term()

async def process_weather_data(message):
    """Pretend to process one weather message, then report it as processed."""
    # Stand-in for real asynchronous work.
    await asyncio.sleep(0.1)
    print(f"Processed: {message}")

asyncio.run(subscriber())

Async Polling Multiple Sockets

import asyncio
import zmq.asyncio

async def multi_socket_handler():
    """Relay multipart messages between a ROUTER frontend (5555) and a DEALER backend (5556)."""
    ctx = zmq.asyncio.Context()

    # Client-facing side.
    frontend = ctx.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")

    # Worker-facing side.
    backend = ctx.socket(zmq.DEALER)
    backend.bind("tcp://*:5556")

    # Watch both sockets for incoming messages.
    poller = zmq.asyncio.Poller()
    for watched in (frontend, backend):
        poller.register(watched, zmq.POLLIN)

    try:
        while True:
            # Wait, without blocking the event loop, until a socket is readable.
            ready = await poller.poll()

            for socket, event in ready:
                # Only readable sockets are of interest here.
                if not event & zmq.POLLIN:
                    continue

                if socket is frontend:
                    # Forward a frontend message to the backend.
                    message = await frontend.recv_multipart()
                    print(f"Frontend: {message}")
                    await backend.send_multipart(message)
                elif socket is backend:
                    # Forward a backend message to the frontend.
                    message = await backend.recv_multipart()
                    print(f"Backend: {message}")
                    await frontend.send_multipart(message)
    finally:
        # Close both sockets before terminating their context.
        frontend.close()
        backend.close()
        ctx.term()

asyncio.run(multi_socket_handler())

Integration with Other Async Libraries

import asyncio
import aiohttp
import zmq.asyncio

async def web_service_integration():
    """
    Combine a ZMQ request/reply exchange with an aiohttp HTTP request.

    Asks the ZMQ service for a record, fetches the matching data over HTTP,
    sends that data back over ZMQ and returns the service's final reply.
    """
    ctx = zmq.asyncio.Context()
    req_socket = ctx.socket(zmq.REQ)
    req_socket.connect("tcp://localhost:5555")

    async with aiohttp.ClientSession() as session:
        try:
            # First round trip: ask the ZMQ service for the record.
            await req_socket.send_json({"action": "get_data", "id": 123})
            response = await req_socket.recv_json()

            # Fetch the matching data over HTTP.
            url = f"https://api.example.com/data/{response['id']}"
            async with session.get(url) as resp:
                web_data = await resp.json()

            # Second round trip: pass the HTTP payload back over ZMQ.
            await req_socket.send_json({"web_data": web_data})
            return await req_socket.recv_json()
        finally:
            # Close the socket first, then terminate its context.
            req_socket.close()
            ctx.term()

# Drive the coroutine to completion on a fresh event loop.
result = asyncio.run(web_service_integration())

Event Loop Integration

PyZMQ's async support automatically integrates with the current asyncio event loop:

import asyncio
import zmq.asyncio

async def main():
    """
    Send "Hello" on a REQ socket and return the reply.

    The socket must be connected before sending: a REQ socket with no peer
    never completes its send, so the coroutine would wait forever. Cleanup
    runs in a finally block so the socket and context are released even if
    the exchange fails or is cancelled.

    Returns:
    - str: Reply received from the server
    """
    # Context automatically uses current event loop
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.REQ)

    try:
        # Same endpoint as the request-reply server example.
        socket.connect("tcp://localhost:5555")

        # All operations are non-blocking and event-loop aware
        await socket.send_string("Hello")
        return await socket.recv_string()
    finally:
        socket.close()
        context.term()

# Works with any asyncio event loop
if __name__ == "__main__":
    asyncio.run(main())

Imports

import zmq
import zmq.asyncio
from zmq import Frame, MessageTracker, POLLIN, POLLOUT, DEFAULT_PROTOCOL

Types

from typing import Union, Optional, Any, List, Tuple, Awaitable

# Async message data types
# A single message part, and a multipart message as a list of such parts.
# NOTE(review): the Socket.send() stub is annotated as accepting only
# bytes | Frame; confirm that str and memoryview belong in this union.
AsyncMessageData = Union[bytes, str, memoryview, Frame]
AsyncMultipartMessage = List[AsyncMessageData]

# Async polling result
# Same shape as the return annotation of Poller.poll(): (socket, events) pairs.
PollResult = List[Tuple[Socket, int]]

# Coroutine types
# Awaitable wrappers around the result types of the async Socket/Poller methods:
# send*/send_multipart -> MessageTracker or None, recv -> bytes or Frame,
# recv_string -> str, recv_json -> Any, recv_multipart -> list of parts,
# poll -> PollResult.
SendCoroutine = Awaitable[Optional[MessageTracker]]
RecvCoroutine = Awaitable[Union[bytes, Frame]]
StringCoroutine = Awaitable[str]
JsonCoroutine = Awaitable[Any]
MultipartCoroutine = Awaitable[List[AsyncMessageData]]
PollCoroutine = Awaitable[PollResult]

Install with Tessl CLI

npx tessl i tessl/pypi-pyzmq

docs

async-support.md

authentication.md

constants.md

core-messaging.md

devices.md

error-handling.md

index.md

message-handling.md

polling.md

tile.json