The Python API for Eclipse Zenoh - a high-performance networking library providing a pub/sub, store/query and compute framework for zero-overhead communication
npx @tessl/cli install tessl/pypi-eclipse-zenoh@1.5.0
Eclipse Zenoh is a high-performance networking library that provides Python bindings for the Zenoh protocol, a cutting-edge pub/sub, store/query and compute framework designed for zero-overhead communication. It enables Python developers to build distributed applications with unified data-in-motion, data-at-rest, and computational capabilities that blend traditional publish-subscribe patterns with geo-distributed storage, queries, and computations while maintaining exceptional time and space efficiency.
pip install eclipse-zenoh
import zenoh
For specific functionality:
from zenoh import Config, Session, Priority, CongestionControl
For extensions (serialization and advanced features):
import zenoh.ext
from zenoh.ext import z_serialize, z_deserialize
For handlers:
from zenoh.handlers import DefaultHandler, FifoChannel, RingChannel, Callback
import zenoh
# Create and open a session
config = zenoh.Config()
session = zenoh.open(config)
# Publisher pattern
publisher = session.declare_publisher("demo/example")
publisher.put("Hello, Zenoh!")
# Subscriber pattern
def listener(sample):
print(f"Received: {sample.payload.to_string()} on {sample.key_expr}")
subscriber = session.declare_subscriber("demo/example", listener)
# Query pattern
replies = session.get("demo/**")
for reply in replies:
if reply.ok:
print(f"Got: {reply.ok.payload.to_string()}")
# Cleanup
publisher.undeclare()
subscriber.undeclare()
session.close()
Zenoh's architecture is built around several key concepts:
ZBytes container for high-performance data handling
The Python API provides bindings to a high-performance Rust implementation, ensuring excellent performance while maintaining Python's ease of use.
Core session lifecycle including configuration, connection establishment, and discovery operations.
def open(config: Config) -> Session: ...
def scout(what: WhatAmIMatcher = None, **kwargs) -> Scout: ...
class Config:
def __init__(self): ...
@staticmethod
def from_file(path: str) -> Config: ...
@staticmethod
def from_json5(config: str) -> Config: ...
class Session:
def close(self) -> None: ...
def declare_publisher(self, key_expr, **kwargs) -> Publisher: ...
def declare_subscriber(self, key_expr, handler=None, **kwargs) -> Subscriber: ...
def declare_queryable(self, key_expr, handler, **kwargs) -> Queryable: ...
def declare_querier(self, key_expr, **kwargs) -> Querier: ...
def get(self, selector, **kwargs): ...
def put(self, key_expr, payload, **kwargs): ...
def delete(self, key_expr, **kwargs): ...
Real-time data streaming with publish/subscribe messaging patterns.
class Publisher:
key_expr: KeyExpr
encoding: Encoding
congestion_control: CongestionControl
priority: Priority
def put(self, payload, **kwargs) -> None: ...
def delete(self, **kwargs) -> None: ...
def undeclare(self) -> None: ...
def declare_matching_listener(self, handler) -> MatchingListener: ...
class Subscriber:
key_expr: KeyExpr
handler: Handler
def undeclare(self) -> None: ...
def try_recv(self): ...
def recv(self): ...
def __iter__(self): ...
class Sample:
key_expr: KeyExpr
payload: ZBytes
encoding: Encoding
kind: SampleKind
timestamp: Timestamp
Request-response messaging for querying distributed data and services.
def get(self, selector: str, **kwargs): ...
class Query:
selector: Selector
key_expr: KeyExpr
parameters: Parameters
def reply(self, payload, **kwargs) -> None: ...
def reply_err(self, payload, **kwargs) -> None: ...
def reply_del(self, **kwargs) -> None: ...
class Reply:
ok: Sample
err: ReplyError
class Queryable:
def undeclare(self) -> None: ...
def try_recv(self): ...
def recv(self): ...
class Querier:
def get(self, **kwargs): ...
def undeclare(self) -> None: ...
Core data structures for efficient data handling and type safety.
class ZBytes:
def __init__(self, data): ...
def to_bytes(self) -> bytes: ...
def to_string(self) -> str: ...
class Encoding:
def __init__(self, encoding_type: str): ...
def with_schema(self, schema: str) -> Encoding: ...
class KeyExpr:
def __init__(self, key: str): ...
def intersects(self, other: KeyExpr) -> bool: ...
class Selector:
key_expr: KeyExpr
parameters: Parameters
class Parameters:
def get(self, key: str): ...
def insert(self, key: str, value: str): ...
Flexible handler system for processing asynchronous data streams with channels and callbacks.
from zenoh.handlers import DefaultHandler, FifoChannel, RingChannel, Callback
class FifoChannel:
def __init__(self, capacity: int): ...
def try_recv(self): ...
def recv(self): ...
class Callback:
def __init__(self, callback: callable, drop: callable = None): ...
Liveliness detection, matching listeners, and advanced networking features.
class Liveliness:
def declare_token(self, key_expr) -> LivelinessToken: ...
def declare_subscriber(self, key_expr, handler) -> Subscriber: ...
class MatchingStatus:
matching: bool
class MatchingListener:
def undeclare(self) -> None: ...
Serialization utilities and advanced publisher/subscriber features with additional reliability and caching.
from zenoh.ext import z_serialize, z_deserialize
from zenoh.ext import AdvancedPublisher, AdvancedSubscriber
def z_serialize(obj) -> ZBytes: ...
def z_deserialize(target_type, data: ZBytes): ...
class Priority:
REAL_TIME = ...
INTERACTIVE_HIGH = ...
INTERACTIVE_LOW = ...
DATA_HIGH = ...
DATA = ...
DATA_LOW = ...
BACKGROUND = ...
DEFAULT = DATA
class CongestionControl:
DROP = ...
BLOCK = ...
DEFAULT = DROP
class Reliability: # Unstable
BEST_EFFORT = ...
RELIABLE = ...
class ZError(Exception):
"""Base exception for all Zenoh errors"""
class ReplyError:
payload: ZBytes
encoding: Encoding
All Zenoh operations may raise ZError exceptions. Handle them appropriately in production code:
try:
session = zenoh.open(config)
# ... operations
except zenoh.ZError as e:
print(f"Zenoh error: {e}")