or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

advanced.mddata-types.mdextensions.mdhandlers.mdindex.mdpubsub.mdquery.mdsession-management.md
tile.json

tessl/pypi-eclipse-zenoh

The python API for Eclipse zenoh - a high-performance networking library providing pub/sub, store/query and compute framework for zero-overhead communication

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/eclipse-zenoh@1.5.x

To install, run

npx @tessl/cli install tessl/pypi-eclipse-zenoh@1.5.0

index.mddocs/

Eclipse Zenoh Python API

Eclipse Zenoh is a high-performance networking library that provides Python bindings for the Zenoh protocol, a cutting-edge pub/sub, store/query and compute framework designed for zero-overhead communication. It enables Python developers to build distributed applications with unified data-in-motion, data-at-rest, and computational capabilities that blend traditional publish-subscribe patterns with geo-distributed storage, queries, and computations while maintaining exceptional time and space efficiency.

Package Information

  • Package Name: eclipse-zenoh
  • Language: Python
  • Installation: pip install eclipse-zenoh
  • Version: 1.5.1
  • Platform Support: Linux, macOS, Windows
  • Python Version: 3.8+

Core Imports

import zenoh

For specific functionality:

from zenoh import Config, Session, Priority, CongestionControl

For extensions (serialization and advanced features):

import zenoh.ext
from zenoh.ext import z_serialize, z_deserialize

For handlers:

from zenoh.handlers import DefaultHandler, FifoChannel, RingChannel, Callback

Basic Usage

import zenoh

# Create and open a session
config = zenoh.Config()
session = zenoh.open(config)

# Publisher pattern
publisher = session.declare_publisher("demo/example")
publisher.put("Hello, Zenoh!")

# Subscriber pattern
def listener(sample):
    print(f"Received: {sample.payload.to_string()} on {sample.key_expr}")

subscriber = session.declare_subscriber("demo/example", listener)

# Query pattern
replies = session.get("demo/**")
for reply in replies:
    if reply.ok:
        print(f"Got: {reply.ok.payload.to_string()}")

# Cleanup
publisher.undeclare()
subscriber.undeclare()
session.close()

Architecture

Zenoh's architecture is built around several key concepts:

  • Session: Central communication hub that manages all operations
  • Key Expressions: Hierarchical addressing scheme for resources (e.g., "sensors/temperature/room1")
  • Data Flow Patterns: Publisher/Subscriber for streaming, Query/Queryable for request-response
  • Zero-copy Data: Efficient ZBytes container for high-performance data handling
  • Quality of Service: Fine-grained control over priority, congestion control, and reliability
  • Handlers: Flexible callback system supporting Rust channels and Python callbacks

The Python API provides bindings to a high-performance Rust implementation, ensuring excellent performance while maintaining Python's ease of use.

Capabilities

Session Management

Core session lifecycle including configuration, connection establishment, and discovery operations.

def open(config: Config) -> Session: ...
def scout(what: WhatAmIMatcher = None, **kwargs) -> Scout: ...

class Config:
    def __init__(self): ...
    @staticmethod
    def from_file(path: str) -> Config: ...
    @staticmethod
    def from_json5(config: str) -> Config: ...

class Session:
    def close(self) -> None: ...
    def declare_publisher(self, key_expr, **kwargs) -> Publisher: ...
    def declare_subscriber(self, key_expr, handler=None, **kwargs) -> Subscriber: ...
    def declare_queryable(self, key_expr, handler, **kwargs) -> Queryable: ...
    def declare_querier(self, key_expr, **kwargs) -> Querier: ...
    def get(self, selector, **kwargs): ...
    def put(self, key_expr, payload, **kwargs): ...
    def delete(self, key_expr, **kwargs): ...

Session Management

Publisher/Subscriber Pattern

Real-time data streaming with publish/subscribe messaging patterns.

class Publisher:
    key_expr: KeyExpr
    encoding: Encoding
    congestion_control: CongestionControl
    priority: Priority
    def put(self, payload, **kwargs) -> None: ...
    def delete(self, **kwargs) -> None: ...
    def undeclare(self) -> None: ...
    def declare_matching_listener(self, handler) -> MatchingListener: ...

class Subscriber:
    key_expr: KeyExpr
    handler: Handler
    def undeclare(self) -> None: ...
    def try_recv(self): ...
    def recv(self): ...
    def __iter__(self): ...

class Sample:
    key_expr: KeyExpr
    payload: ZBytes
    encoding: Encoding
    kind: SampleKind
    timestamp: Timestamp

Publisher/Subscriber

Query/Queryable Pattern

Request-response messaging for querying distributed data and services.

def get(self, selector: str, **kwargs): ...

class Query:
    selector: Selector
    key_expr: KeyExpr
    parameters: Parameters
    def reply(self, payload, **kwargs) -> None: ...
    def reply_err(self, payload, **kwargs) -> None: ...
    def reply_del(self, **kwargs) -> None: ...

class Reply:
    ok: Sample
    err: ReplyError

class Queryable:
    def undeclare(self) -> None: ...
    def try_recv(self): ...
    def recv(self): ...

class Querier:
    def get(self, **kwargs): ...
    def undeclare(self) -> None: ...

Query/Queryable

Data Types

Core data structures for efficient data handling and type safety.

class ZBytes:
    def __init__(self, data): ...
    def to_bytes(self) -> bytes: ...
    def to_string(self) -> str: ...

class Encoding:
    def __init__(self, encoding_type: str): ...
    def with_schema(self, schema: str) -> Encoding: ...

class KeyExpr:
    def __init__(self, key: str): ...
    def intersects(self, other: KeyExpr) -> bool: ...

class Selector:
    key_expr: KeyExpr
    parameters: Parameters

class Parameters:
    def get(self, key: str): ...
    def insert(self, key: str, value: str): ...

Data Types

Handler System

Flexible handler system for processing asynchronous data streams with channels and callbacks.

from zenoh.handlers import DefaultHandler, FifoChannel, RingChannel, Callback

class FifoChannel:
    def __init__(self, capacity: int): ...
    def try_recv(self): ...
    def recv(self): ...

class Callback:
    def __init__(self, callback: callable, drop: callable = None): ...

Handler System

Advanced Features

Liveliness detection, matching listeners, and advanced networking features.

class Liveliness:
    def declare_token(self, key_expr) -> LivelinessToken: ...
    def declare_subscriber(self, key_expr, handler) -> Subscriber: ...

class MatchingStatus:
    matching: bool

class MatchingListener:
    def undeclare(self) -> None: ...

Advanced Features

Extensions

Serialization utilities and advanced publisher/subscriber features with additional reliability and caching.

from zenoh.ext import z_serialize, z_deserialize
from zenoh.ext import AdvancedPublisher, AdvancedSubscriber

def z_serialize(obj) -> ZBytes: ...
def z_deserialize(target_type, data: ZBytes): ...

Extensions

Quality of Service

class Priority:
    REAL_TIME = ...
    INTERACTIVE_HIGH = ...
    INTERACTIVE_LOW = ...
    DATA_HIGH = ...
    DATA = ...
    DATA_LOW = ...
    BACKGROUND = ...
    DEFAULT = DATA

class CongestionControl:
    DROP = ...
    BLOCK = ...
    DEFAULT = DROP

class Reliability:  # Unstable
    BEST_EFFORT = ...
    RELIABLE = ...

Error Handling

class ZError(Exception):
    """Base exception for all Zenoh errors"""

class ReplyError:
    payload: ZBytes
    encoding: Encoding

All Zenoh operations may raise ZError exceptions. Handle them appropriately in production code:

try:
    session = zenoh.open(config)
    # ... operations
except zenoh.ZError as e:
    print(f"Zenoh error: {e}")