CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-eclipse-zenoh

The python API for Eclipse zenoh - a high-performance networking library providing pub/sub, store/query and compute framework for zero-overhead communication

Pending
Overview
Eval results
Files

data-types.mddocs/

Data Types

Zenoh provides a rich set of data types for efficient data handling, addressing, and metadata management. These types enable zero-copy operations, flexible encoding specifications, and precise temporal ordering while maintaining type safety across the distributed system.

Capabilities

Data Containers

Core data structures for efficient payload handling and zero-copy operations.

class ZBytes:
    """Serialized bytes container for zero-copy data handling"""
    
    def __init__(self, data):
        """
        Create ZBytes from various data types.
        
        Parameters:
        - data: str, bytes, bytearray, or other serializable data
        """
    
    def to_bytes(self) -> bytes:
        """Convert to Python bytes"""
    
    def to_string(self) -> str:
        """Convert to Python string (UTF-8 decoded)"""
    
    def __bool__(self) -> bool:
        """Check if ZBytes is non-empty"""
    
    def __len__(self) -> int:
        """Get length in bytes"""
    
    def __bytes__(self) -> bytes:
        """Convert to bytes via bytes() function"""
    
    def __str__(self) -> str:
        """String representation"""
    
    def __eq__(self, other) -> bool:
        """Equality comparison"""
    
    def __hash__(self) -> int:
        """Hash for use in sets and dicts"""

Encoding Specification

Data encoding and schema information for type-safe communication.

class Encoding:
    """Data encoding specification"""
    
    def __init__(self, encoding_type: str):
        """Create encoding from string specification"""
    
    def with_schema(self, schema: str) -> Encoding:
        """Add schema information to encoding"""
    
    def __eq__(self, other) -> bool:
        """Equality comparison"""
    
    def __hash__(self) -> int:
        """Hash for use in sets and dicts"""
    
    def __str__(self) -> str:
        """String representation"""
    
    # Standard encoding constants
    ZENOH_BYTES = ...               # Raw bytes
    ZENOH_STRING = ...              # UTF-8 string
    ZENOH_SERIALIZED = ...          # Zenoh serialized data
    APPLICATION_JSON = ...          # JSON format
    APPLICATION_CBOR = ...          # CBOR format
    APPLICATION_YAML = ...          # YAML format
    APPLICATION_PYTHON_SERIALIZED_OBJECT = ...  # Python pickle
    APPLICATION_PROTOBUF = ...      # Protocol Buffers
    APPLICATION_JAVA_SERIALIZED_OBJECT = ...    # Java serialization
    APPLICATION_OPENMETRICS_TEXT = ... # OpenMetrics text format
    IMAGE_PNG = ...                 # PNG image
    IMAGE_JPEG = ...                # JPEG image
    TEXT_PLAIN = ...                # Plain text
    TEXT_JSON = ...                 # JSON as text
    TEXT_HTML = ...                 # HTML
    TEXT_XML = ...                  # XML
    TEXT_CSS = ...                  # CSS
    TEXT_CSV = ...                  # CSV
    TEXT_JAVASCRIPT = ...           # JavaScript

Key Expressions

Hierarchical addressing scheme for resource identification and pattern matching.

class KeyExpr:
    """Key expression for addressing resources"""
    
    def __init__(self, key: str):
        """Create key expression from string"""
    
    @staticmethod
    def autocanonize(key: str) -> KeyExpr:
        """Create and canonicalize key expression"""
    
    def intersects(self, other: KeyExpr) -> bool:
        """Check if this key expression intersects with another"""
    
    def includes(self, other: KeyExpr) -> bool:
        """Check if this key expression includes another"""
    
    def relation_to(self, other: KeyExpr) -> SetIntersectionLevel:
        """Get relationship to another key expression"""
    
    def join(self, other: KeyExpr) -> KeyExpr:
        """Join two key expressions"""
    
    def concat(self, suffix: str) -> KeyExpr:
        """Concatenate string suffix to key expression"""
    
    def __str__(self) -> str:
        """String representation"""

class SetIntersectionLevel:
    """Key expression set relations (unstable)"""
    DISJOINT = ...      # No intersection
    INTERSECTS = ...    # Some intersection
    INCLUDES = ...      # This includes other
    EQUALS = ...        # Exactly equal

Parameters and Selectors

Key-value parameters and selector combinations for queries and configuration.

class Parameters:
    """Key-value parameters"""
    
    def __init__(self, params: str = None):
        """Create parameters from string (key1=value1;key2=value2)"""
    
    def is_empty(self) -> bool:
        """Check if parameters are empty"""
    
    def get(self, key: str):
        """Get parameter value by key"""
    
    def values(self, key: str) -> list:
        """Get all values for a key (for multi-value parameters)"""
    
    def insert(self, key: str, value: str) -> Parameters:
        """Insert key-value pair"""
    
    def remove(self, key: str) -> Parameters:
        """Remove parameter by key"""
    
    def extend(self, other: Parameters) -> Parameters:
        """Extend with another Parameters object"""
    
    def is_ordered(self) -> bool:
        """Check if parameters maintain insertion order"""
    
    # Operators for parameter manipulation
    def __eq__(self, other) -> bool:
        """Equality comparison"""
    
    def __str__(self) -> str:
        """String representation"""

class Selector:
    """Key expression + parameters combination"""
    
    def __init__(self, selector: str):
        """Create selector from string (keyexpr?param1=val1)"""
    
    @property
    def key_expr(self) -> KeyExpr:
        """Get key expression part"""
    
    @key_expr.setter
    def key_expr(self, value: KeyExpr):
        """Set key expression part"""
    
    @property
    def parameters(self) -> Parameters:
        """Get parameters part"""
    
    @parameters.setter
    def parameters(self, value: Parameters):
        """Set parameters part"""
    
    def __str__(self) -> str:
        """String representation"""

Time and Identity

Temporal ordering and unique identification across the distributed system.

class Timestamp:
    """Timestamping for samples and operations"""
    
    def __init__(self, time: int, id: TimestampId):
        """Create timestamp with time value and ID"""
    
    def get_time(self) -> int:
        """Get time component as NTP64 timestamp"""
    
    def get_id(self) -> TimestampId:
        """Get timestamp ID component"""
    
    def get_diff_duration(self, other: Timestamp) -> float:
        """Get duration difference in seconds"""
    
    def to_string_rfc3339_lossy(self) -> str:
        """Convert to RFC3339 string (may lose precision)"""
    
    @staticmethod
    def parse_rfc3339(timestamp: str) -> Timestamp:
        """Parse RFC3339 timestamp string"""
    
    # Comparison operators
    def __eq__(self, other) -> bool:
        """Equality comparison"""
    
    def __lt__(self, other) -> bool:
        """Less than comparison"""
    
    def __le__(self, other) -> bool:
        """Less than or equal comparison"""
    
    def __gt__(self, other) -> bool:
        """Greater than comparison"""
    
    def __ge__(self, other) -> bool:
        """Greater than or equal comparison"""

class TimestampId:
    """Timestamp identifier for uniqueness"""
    
    def __init__(self, id_bytes: bytes):
        """Create timestamp ID from bytes"""
    
    def __bytes__(self) -> bytes:
        """Convert to bytes"""
    
    # Comparison operators
    def __eq__(self, other) -> bool:
        """Equality comparison"""
    
    def __lt__(self, other) -> bool:
        """Less than comparison"""

class ZenohId:
    """Global unique peer identifier"""
    
    def __str__(self) -> str:
        """String representation of ZenohId"""

Entity Identification

Unique identifiers for entities within sessions and across the network.

class EntityGlobalId:
    """Entity global identifier (unstable)"""
    
    @property
    def zid(self) -> ZenohId:
        """Get ZenohId component"""
    
    @property
    def eid(self) -> int:
        """Get entity ID component"""

# Type aliases for input flexibility
EntityId = int  # Integer entity identifier

Input Type Aliases

Type aliases that define acceptable input types for various Zenoh operations.

# Input type aliases for API flexibility
_IntoEncoding = str | Encoding        # Accept string or Encoding
_IntoKeyExpr = str | KeyExpr         # Accept string or KeyExpr  
_IntoParameters = str | Parameters    # Accept string or Parameters
_IntoSelector = str | Selector       # Accept string or Selector
_IntoTimestampId = bytes | TimestampId  # Accept bytes or TimestampId
_IntoWhatAmIMatcher = WhatAmI | WhatAmIMatcher  # Accept node type or matcher
_IntoZBytes = str | bytes | bytearray | ZBytes  # Accept various data types
_IntoQueryConsolidation = ConsolidationMode | QueryConsolidation  # Accept mode or config

Usage Examples

Working with ZBytes

import zenoh

# Create ZBytes from different data types
text_data = zenoh.ZBytes("Hello, Zenoh!")
byte_data = zenoh.ZBytes(b"Binary data")
number_data = zenoh.ZBytes(str(42))

# Convert back to Python types
print(text_data.to_string())     # "Hello, Zenoh!"
print(byte_data.to_bytes())      # b"Binary data"
print(len(number_data))          # Length in bytes

# Use in boolean context
if text_data:
    print("Data is not empty")

# Use as dictionary key (hashable)
data_cache = {
    text_data: "cached_result_1",
    byte_data: "cached_result_2"
}

Encoding Examples

import zenoh

# Create encodings
json_encoding = zenoh.Encoding(zenoh.Encoding.APPLICATION_JSON)
text_encoding = zenoh.Encoding(zenoh.Encoding.TEXT_PLAIN)

# Encoding with schema
protobuf_encoding = zenoh.Encoding(zenoh.Encoding.APPLICATION_PROTOBUF).with_schema("user.proto")

# Use in publisher
session = zenoh.open()
publisher = session.declare_publisher("data/json", encoding=json_encoding)

import json
data = {"temperature": 23.5, "humidity": 65}
publisher.put(json.dumps(data))

publisher.undeclare()
session.close()

Key Expression Operations

import zenoh

# Create key expressions
sensor_key = zenoh.KeyExpr("sensors/temperature/room1")
pattern_key = zenoh.KeyExpr("sensors/**")
config_key = zenoh.KeyExpr("config/network")

# Test relationships
print(pattern_key.includes(sensor_key))    # True - pattern includes specific key
print(sensor_key.intersects(config_key))   # False - different hierarchies

# Join key expressions
base_key = zenoh.KeyExpr("sensors")
specific_key = base_key.concat("/temperature/room2")
print(specific_key)  # "sensors/temperature/room2"

# Combine keys
room_key = zenoh.KeyExpr("room1")
full_key = sensor_key.join(room_key)

Parameters and Selectors

import zenoh

# Create parameters
params = zenoh.Parameters("region=north;limit=10;format=json")

print(params.get("region"))  # "north"
print(params.get("limit"))   # "10"

# Add parameters
new_params = params.insert("timeout", "30")
extended_params = params.extend(zenoh.Parameters("debug=true"))

# Create selector
selector = zenoh.Selector("sensors/temperature?region=north&limit=5")
print(selector.key_expr)     # KeyExpr("sensors/temperature")
print(selector.parameters.get("region"))  # "north"

# Use in queries
session = zenoh.open()
replies = session.get(selector)
for reply in replies:
    if reply.ok:
        print(reply.ok.payload.to_string())
session.close()

Timestamps

import zenoh

session = zenoh.open()

# Create timestamp
timestamp = session.new_timestamp()
print(f"Time: {timestamp.get_time()}")
print(f"ID: {timestamp.get_id()}")

# RFC3339 format
rfc3339_str = timestamp.to_string_rfc3339_lossy()
print(f"RFC3339: {rfc3339_str}")

# Parse timestamp
parsed = zenoh.Timestamp.parse_rfc3339("2023-01-01T12:00:00Z")

# Compare timestamps
if timestamp > parsed:
    print("Current timestamp is later")

# Duration between timestamps
duration = timestamp.get_diff_duration(parsed)
print(f"Difference: {duration} seconds")

session.close()

Complete Data Types Example

import zenoh
import json
import time

class SensorData:
    def __init__(self, sensor_id: str, value: float, unit: str):
        self.sensor_id = sensor_id
        self.value = value
        self.unit = unit
        self.timestamp = time.time()
    
    def to_json(self) -> str:
        return json.dumps({
            "sensor_id": self.sensor_id,
            "value": self.value,
            "unit": self.unit,
            "timestamp": self.timestamp
        })
    
    @classmethod
    def from_json(cls, json_str: str):
        data = json.loads(json_str)
        return cls(data["sensor_id"], data["value"], data["unit"])

def main():
    session = zenoh.open()
    
    # Create sensor data
    sensor = SensorData("temp_01", 23.5, "°C")
    
    # Create key expression for this sensor
    key_expr = zenoh.KeyExpr(f"sensors/temperature/{sensor.sensor_id}")
    
    # Create encoding for JSON data
    encoding = zenoh.Encoding(zenoh.Encoding.APPLICATION_JSON)
    
    # Declare publisher with encoding
    publisher = session.declare_publisher(key_expr, encoding=encoding)
    
    # Create ZBytes payload
    payload = zenoh.ZBytes(sensor.to_json())
    
    # Create timestamp
    timestamp = session.new_timestamp()
    
    # Publish with metadata
    publisher.put(
        payload,
        timestamp=timestamp,
        attachment={"source": "building_a", "floor": "2"}
    )
    
    print(f"Published sensor data:")
    print(f"  Key: {key_expr}")
    print(f"  Payload: {payload.to_string()}")
    print(f"  Encoding: {encoding}")
    print(f"  Timestamp: {timestamp.to_string_rfc3339_lossy()}")
    
    # Query with parameters
    query_params = zenoh.Parameters("unit=°C;recent=true")
    selector = zenoh.Selector("sensors/temperature/**")
    selector.parameters = query_params
    
    print(f"\nQuerying with selector: {selector}")
    
    # Simulate some delay
    time.sleep(0.1)
    
    # Query the data back
    replies = session.get(selector)
    for reply in replies:
        if reply.ok:
            sample = reply.ok
            received_data = SensorData.from_json(sample.payload.to_string())
            print(f"Received: {received_data.sensor_id} = {received_data.value} {received_data.unit}")
    
    # Cleanup
    publisher.undeclare()
    session.close()

if __name__ == "__main__":
    main()

Install with Tessl CLI

npx tessl i tessl/pypi-eclipse-zenoh

docs

advanced.md

data-types.md

extensions.md

handlers.md

index.md

pubsub.md

query.md

session-management.md

tile.json