tessl/pypi-eclipse-zenoh

The Python API for Eclipse Zenoh, a high-performance networking library that provides a pub/sub, store/query, and compute framework for zero-overhead communication.

Eclipse Zenoh Python API

The eclipse-zenoh package provides Python bindings for Zenoh, a pub/sub, store/query, and compute framework designed for zero-overhead communication. It lets Python developers build distributed applications that combine data in motion (publish/subscribe), data at rest (geo-distributed storage and queries), and computations behind a single API, with an emphasis on time and space efficiency.

Package Information

  • Package Name: eclipse-zenoh
  • Language: Python
  • Installation: pip install eclipse-zenoh
  • Version: 1.5.1
  • Platform Support: Linux, macOS, Windows
  • Python Version: 3.8+

Core Imports

import zenoh

For specific functionality:

from zenoh import Config, Session, Priority, CongestionControl

For extensions (serialization and advanced features):

import zenoh.ext
from zenoh.ext import z_serialize, z_deserialize

For handlers:

from zenoh.handlers import DefaultHandler, FifoChannel, RingChannel, Callback

Basic Usage

import zenoh

# Create and open a session
config = zenoh.Config()
session = zenoh.open(config)

# Subscriber pattern: declare it first so it can receive the sample published below
def listener(sample):
    print(f"Received: {sample.payload.to_string()} on {sample.key_expr}")

subscriber = session.declare_subscriber("demo/example", listener)

# Publisher pattern
publisher = session.declare_publisher("demo/example")
publisher.put("Hello, Zenoh!")

# Query pattern
replies = session.get("demo/**")
for reply in replies:
    if reply.ok:
        print(f"Got: {reply.ok.payload.to_string()}")

# Cleanup
publisher.undeclare()
subscriber.undeclare()
session.close()

Architecture

Zenoh's architecture is built around several key concepts:

  • Session: Central communication hub that manages all operations
  • Key Expressions: Hierarchical addressing scheme for resources (e.g., "sensors/temperature/room1")
  • Data Flow Patterns: Publisher/Subscriber for streaming, Query/Queryable for request-response
  • Zero-copy Data: Efficient ZBytes container for high-performance data handling
  • Quality of Service: Fine-grained control over priority, congestion control, and reliability
  • Handlers: Flexible callback system supporting Rust channels and Python callbacks

The Python API provides bindings to a high-performance Rust implementation, ensuring excellent performance while maintaining Python's ease of use.
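To make the key-expression addressing concrete, here is a simplified pure-Python model of how a wildcard key expression such as "demo/**" matches concrete keys. It assumes the usual Zenoh wildcard semantics (`*` matches exactly one chunk, `**` matches any number of chunks, including none) and ignores the rest of the key-expression language. `key_matches` is a hypothetical helper for illustration, not part of the zenoh package.

```python
def key_matches(key_expr: str, key: str) -> bool:
    """Return True if the concrete `key` is matched by `key_expr`.

    Simplified model: only the "*" and "**" chunk wildcards are handled.
    """
    pattern = key_expr.split("/")
    chunks = key.split("/")

    def match(p: int, c: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: succeed only if the key is exhausted too.
            return c == len(chunks)
        if pattern[p] == "**":
            # "**" may absorb zero or more chunks; try every split point.
            return any(match(p + 1, k) for k in range(c, len(chunks) + 1))
        if c == len(chunks):
            return False
        if pattern[p] == "*" or pattern[p] == chunks[c]:
            return match(p + 1, c + 1)
        return False

    return match(0, 0)
```

For real applications, prefer the library's own KeyExpr methods (for example `intersects`), which implement the full rules.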

Capabilities

Session Management

Core session lifecycle including configuration, connection establishment, and discovery operations.

def open(config: Config) -> Session: ...
def scout(what: WhatAmIMatcher = None, **kwargs) -> Scout: ...

class Config:
    def __init__(self): ...
    @staticmethod
    def from_file(path: str) -> Config: ...
    @staticmethod
    def from_json5(config: str) -> Config: ...

class Session:
    def close(self) -> None: ...
    def declare_publisher(self, key_expr, **kwargs) -> Publisher: ...
    def declare_subscriber(self, key_expr, handler=None, **kwargs) -> Subscriber: ...
    def declare_queryable(self, key_expr, handler, **kwargs) -> Queryable: ...
    def declare_querier(self, key_expr, **kwargs) -> Querier: ...
    def get(self, selector, **kwargs): ...
    def put(self, key_expr, payload, **kwargs): ...
    def delete(self, key_expr, **kwargs): ...
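The lifecycle described above (configure, open, use, close) can be sketched as follows. This is a minimal sketch: `client_config_json` and `with_session` are hypothetical helper names, and the configuration keys "mode" and "connect"/"endpoints" are assumptions about the Zenoh configuration schema that should be checked against the configuration reference.

```python
import json


def client_config_json(endpoint: str) -> str:
    """Build a JSON document for Config.from_json5 (JSON is valid JSON5).

    Assumption: "mode" and "connect"/"endpoints" are the configuration
    keys for a client that connects to a single router endpoint.
    """
    return json.dumps({"mode": "client", "connect": {"endpoints": [endpoint]}})


def with_session(open_session, work):
    """Open a session, run work(session), and always close the session."""
    session = open_session()
    try:
        return work(session)
    finally:
        session.close()


# Intended use with the real library (requires a reachable router):
#   import zenoh
#   cfg = zenoh.Config.from_json5(client_config_json("tcp/127.0.0.1:7447"))
#   with_session(lambda: zenoh.open(cfg), lambda s: s.put("demo/example", "hi"))
```

Passing the opener as a callable keeps the helper independent of how the session is configured, and the `finally` clause ensures `close()` runs even if the work raises.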

Publisher/Subscriber Pattern

Real-time data streaming with publish/subscribe messaging patterns.

class Publisher:
    key_expr: KeyExpr
    encoding: Encoding
    congestion_control: CongestionControl
    priority: Priority
    def put(self, payload, **kwargs) -> None: ...
    def delete(self, **kwargs) -> None: ...
    def undeclare(self) -> None: ...
    def declare_matching_listener(self, handler) -> MatchingListener: ...

class Subscriber:
    key_expr: KeyExpr
    handler: Handler
    def undeclare(self) -> None: ...
    def try_recv(self): ...
    def recv(self): ...
    def __iter__(self): ...

class Sample:
    key_expr: KeyExpr
    payload: ZBytes
    encoding: Encoding
    kind: SampleKind
    timestamp: Timestamp
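As an illustration of how these classes fit together, the sketch below publishes a batch of values and polls a subscriber without blocking. The helper names `publish_all` and `drain` are hypothetical, and `drain` assumes that `try_recv()` returns None when no sample is queued, which should be verified against the installed version.

```python
def publish_all(session, key_expr, values):
    """Publish each value on key_expr, undeclaring the publisher afterwards."""
    publisher = session.declare_publisher(key_expr)
    try:
        for value in values:
            publisher.put(str(value))
    finally:
        publisher.undeclare()


def drain(subscriber, limit=100):
    """Collect up to `limit` already-queued samples without blocking.

    Assumption: try_recv() returns None when no sample is available.
    """
    samples = []
    while len(samples) < limit:
        sample = subscriber.try_recv()
        if sample is None:
            break
        samples.append(sample)
    return samples
```

`drain` is meant for a subscriber declared without a callback, so that samples are queued in a channel handler rather than delivered to a function.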

Query/Queryable Pattern

Request-response messaging for querying distributed data and services.

# Method of Session: sends a query for the given selector
def get(self, selector: str, **kwargs): ...

class Query:
    selector: Selector
    key_expr: KeyExpr
    parameters: Parameters
    def reply(self, key_expr, payload, **kwargs) -> None: ...
    def reply_err(self, payload, **kwargs) -> None: ...
    def reply_del(self, key_expr, **kwargs) -> None: ...

class Reply:
    ok: Sample
    err: ReplyError

class Queryable:
    def undeclare(self) -> None: ...
    def try_recv(self): ...
    def recv(self): ...

class Querier:
    def get(self, **kwargs): ...
    def undeclare(self) -> None: ...
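Because each Reply carries either a Sample in `ok` or a ReplyError in `err`, callers usually separate the two. The following is a small sketch of that pattern; `collect_replies` is a hypothetical helper, and it assumes the unused side of a reply is None.

```python
def collect_replies(replies):
    """Split an iterable of replies into decoded values and decoded errors.

    Assumption: reply.ok is None for error replies, in which case
    reply.err holds the error payload.
    """
    values, errors = [], []
    for reply in replies:
        if reply.ok is not None:
            values.append(reply.ok.payload.to_string())
        else:
            errors.append(reply.err.payload.to_string())
    return values, errors


# Intended use with the real library:
#   values, errors = collect_replies(session.get("demo/**"))
```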

Data Types

Core data structures for efficient data handling and type safety.

class ZBytes:
    def __init__(self, data): ...
    def to_bytes(self) -> bytes: ...
    def to_string(self) -> str: ...

class Encoding:
    def __init__(self, encoding_type: str): ...
    def with_schema(self, schema: str) -> Encoding: ...

class KeyExpr:
    def __init__(self, key: str): ...
    def intersects(self, other: KeyExpr) -> bool: ...

class Selector:
    key_expr: KeyExpr
    parameters: Parameters

class Parameters:
    def get(self, key: str): ...
    def insert(self, key: str, value: str): ...

Handler System

Flexible handler system for processing asynchronous data streams with channels and callbacks.

from zenoh.handlers import DefaultHandler, FifoChannel, RingChannel, Callback

class FifoChannel:
    def __init__(self, capacity: int): ...
    def try_recv(self): ...
    def recv(self): ...

class Callback:
    def __init__(self, callback: callable, drop: callable = None): ...
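The practical difference between channel handlers is what happens when the consumer falls behind. The pure-Python model below illustrates the drop-oldest behaviour that a ring buffer provides. It is a conceptual sketch only: `RingModel` is a hypothetical class, and it assumes RingChannel discards the oldest sample when full, whereas FifoChannel keeps arrival order and applies back-pressure instead.

```python
from collections import deque


class RingModel:
    """Bounded buffer that discards the oldest item when full."""

    def __init__(self, capacity: int):
        # deque(maxlen=...) evicts from the left when a new item is appended
        # to a full buffer, which models drop-oldest semantics.
        self._items = deque(maxlen=capacity)

    def push(self, item) -> None:
        self._items.append(item)

    def try_recv(self):
        """Return the oldest retained item, or None if the buffer is empty."""
        return self._items.popleft() if self._items else None
```

A ring-style handler suits consumers that only care about recent data, such as the latest sensor reading; a FIFO handler suits consumers that must see every sample in order.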

Advanced Features

Liveliness detection, matching listeners, and advanced networking features.

class Liveliness:
    def declare_token(self, key_expr) -> LivelinessToken: ...
    def declare_subscriber(self, key_expr, handler) -> Subscriber: ...

class MatchingStatus:
    matching: bool

class MatchingListener:
    def undeclare(self) -> None: ...

Extensions

Serialization utilities and advanced publisher/subscriber features with additional reliability and caching.

from zenoh.ext import z_serialize, z_deserialize
from zenoh.ext import AdvancedPublisher, AdvancedSubscriber

def z_serialize(obj) -> ZBytes: ...
def z_deserialize(target_type, data: ZBytes): ...

Quality of Service

class Priority:
    REAL_TIME = ...
    INTERACTIVE_HIGH = ...
    INTERACTIVE_LOW = ...
    DATA_HIGH = ...
    DATA = ...
    DATA_LOW = ...
    BACKGROUND = ...
    DEFAULT = DATA

class CongestionControl:
    DROP = ...
    BLOCK = ...
    DEFAULT = DROP

class Reliability:  # Unstable
    BEST_EFFORT = ...
    RELIABLE = ...
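These values are typically supplied when a publisher is declared. The sketch below assumes that `declare_publisher` accepts `priority` and `congestion_control` keyword arguments, mirroring the Publisher attributes of the same names; `declare_alert_publisher` is a hypothetical helper.

```python
def declare_alert_publisher(session, key_expr, priority, congestion_control):
    """Declare a publisher with explicit QoS settings.

    Assumption: declare_publisher accepts `priority` and
    `congestion_control` keyword arguments.
    """
    return session.declare_publisher(
        key_expr, priority=priority, congestion_control=congestion_control
    )


# Intended use with the real library:
#   import zenoh
#   pub = declare_alert_publisher(
#       session, "alerts/fire",
#       zenoh.Priority.REAL_TIME, zenoh.CongestionControl.BLOCK,
#   )
```

Choosing BLOCK trades latency for delivery under congestion, while the default DROP keeps the publisher from stalling at the cost of losing samples.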

Error Handling

class ZError(Exception):
    """Base exception for all Zenoh errors"""

class ReplyError:
    payload: ZBytes
    encoding: Encoding

Zenoh operations can raise ZError. In production code, catch it around session setup and the operations that follow:

try:
    session = zenoh.open(config)
    # ... operations
except zenoh.ZError as e:
    print(f"Zenoh error: {e}")
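One common way to handle a failed `zenoh.open` is to retry a bounded number of times before giving up. The following is a minimal sketch of that idea, not a pattern prescribed by the library; `open_with_retry` and its parameters are hypothetical names, and the exception type is passed in so the helper does not depend on the zenoh package.

```python
import time


def open_with_retry(open_session, error_type, attempts=3, delay=0.5):
    """Call open_session() until it succeeds or `attempts` is exhausted."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return open_session()
        except error_type:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            time.sleep(delay)


# Intended use with the real library:
#   import zenoh
#   session = open_with_retry(lambda: zenoh.open(zenoh.Config()), zenoh.ZError)
```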
