CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-eclipse-zenoh

The Python API for Eclipse Zenoh - a high-performance networking library providing a pub/sub, store/query, and compute framework for zero-overhead communication

Pending
Overview
Eval results
Files

docs/query.md

Query/Queryable Pattern

The Query/Queryable pattern enables request-response messaging in Zenoh applications. Queries request data from distributed sources, while Queryables provide responses. This pattern supports both simple data retrieval and complex distributed computations with flexible targeting and consolidation options.

Capabilities

Querying Data

Send queries to retrieve data from distributed sources.

def get(
    self,
    selector,
    handler = None,
    target: QueryTarget = None,
    consolidation: QueryConsolidation = None,
    value = None,
    encoding: Encoding = None,
    attachment = None,
    timeout: float = None
):
    """
    Send a query to the network and collect the replies.

    Signature stub only (no body). The usage examples call it on a
    session object, e.g. ``session.get("sensors/**")``.

    Parameters:
    - selector: Selector expression: a key expression, optionally followed
      by ``?`` and ``&``-separated parameters, e.g.
      ``"sensors/temperature?region=north&limit=10"``
    - handler: Handler for receiving replies (optional)
    - target: Query targeting strategy (a QueryTarget member)
    - consolidation: Reply consolidation mode (a QueryConsolidation)
    - value: Optional payload to send with the query
      NOTE(review): recent zenoh-python releases appear to name this
      keyword ``payload`` -- confirm against the installed version.
    - encoding: Encoding of the query payload
    - attachment: Additional metadata sent with the query
    - timeout: Query timeout in seconds

    Every optional parameter defaults to None.

    Returns:
    Iterator over Reply objects if no handler is provided. The return
    value when a handler is given is not specified here.
    """

Query Replies

Handle responses to queries with success and error cases.

class Reply:
    """
    One reply received for a query.

    A reply is either a success or an error. The usage examples test
    ``ok`` first and fall back to ``err`` when it is falsy.
    """
    
    @property
    def result(self):
        """Reply result (success or error)"""
    
    @property
    def ok(self) -> Sample:
        """Sample carried by a successful reply (None if the reply is an error)"""
    
    @property
    def err(self) -> ReplyError:
        """Error carried by a failed reply (None if the reply is a success)"""
    
    @property
    def replier_id(self) -> ZenohId:
        """ID of the replier"""

class ReplyError:
    """
    Error content of a query reply, exposed through ``Reply.err``.

    The usage examples render it with ``reply.err.payload.to_string()``.
    """
    
    @property
    def payload(self) -> ZBytes:
        """Error payload"""
    
    @property
    def encoding(self) -> Encoding:
        """Encoding of the error payload"""

Query Targeting

Control which queryables should respond to queries.

class QueryTarget:
    """Query targeting modes: select which matching queryables should answer."""
    BEST_MATCHING = ...     # Target the best matching queryable
    ALL = ...               # Target all matching queryables
    # NOTE(review): presumably relates to queryables declared with
    # complete=True -- confirm the exact semantics.
    ALL_COMPLETE = ...      # Target all, wait for complete responses
    
    DEFAULT = ...           # Default targeting mode (concrete value not shown in this stub)

class QueryConsolidation:
    """
    Query consolidation configuration.

    Passed as the ``consolidation`` argument of a query, e.g.
    ``consolidation=zenoh.QueryConsolidation.DEFAULT``.
    """
    
    @property
    def mode(self) -> ConsolidationMode:
        """Consolidation mode held by this configuration"""
    
    AUTO = ...      # Automatic consolidation
    DEFAULT = ...   # Default consolidation (concrete value not shown in this stub)

class ConsolidationMode:
    """Consolidation modes, as reported by ``QueryConsolidation.mode``."""
    AUTO = ...       # Automatic consolidation
    NONE = ...       # No consolidation
    MONOTONIC = ...  # Monotonic consolidation
    LATEST = ...     # Latest value only
    
    DEFAULT = ...    # Default mode (concrete value not shown in this stub)

Queryable Services

Provide data or services in response to queries.

def declare_queryable(
    self,
    key_expr,
    handler,
    complete: bool = False
) -> Queryable:
    """
    Declare a queryable that answers queries on a key expression.

    Signature stub only (no body). The usage examples call it on a
    session object and pass a plain function or bound method as handler,
    e.g. ``session.declare_queryable("sensors/**", query_handler)``.

    Parameters:
    - key_expr: Key expression pattern to handle queries for
    - handler: Handler for received queries; the examples use a callable
      that takes the Query as its only argument
    - complete: Whether this queryable provides complete answers
      (defaults to False)

    Returns:
    Queryable object for handling queries; call ``undeclare()`` on it
    when done
    """

class Queryable:
    """
    Queryable with a generic handler, as returned by ``declare_queryable``.

    Besides handler-based delivery, the stub exposes ``try_recv``,
    ``recv`` and iteration for receiving queries from the object itself.
    """
    
    @property
    def key_expr(self) -> KeyExpr:
        """Get the queryable's key expression"""
    
    @property
    def handler(self):
        """Get the queryable's handler"""
    
    def undeclare(self) -> None:
        """Undeclare the queryable and release resources"""
    
    def try_recv(self):
        """Try to receive a query without blocking"""
    
    def recv(self):
        """Receive a query (blocking)"""
    
    def __iter__(self):
        """Iterate over received queries"""

Query Handling

Process incoming queries and send appropriate replies.

class Query:
    """
    Query received by a queryable.

    A handler answers it with ``reply`` (success), ``reply_err`` (error)
    or ``reply_del`` (delete), or discards it with ``drop``. The usage
    examples call ``reply`` several times on the same query to return
    multiple results.
    """
    
    @property
    def selector(self) -> Selector:
        """Query selector (key expression + parameters)"""
    
    @property
    def key_expr(self) -> KeyExpr:
        """Key expression part of the selector"""
    
    @property
    def parameters(self) -> Parameters:
        """Query parameters; the examples read them with ``parameters.get(name)``"""
    
    @property
    def payload(self) -> ZBytes:
        """Query payload (optional)"""
    
    @property
    def encoding(self) -> Encoding:
        """Query payload encoding"""
    
    @property
    def attachment(self):
        """Query attachment metadata"""
    
    def reply(
        self,
        payload,
        encoding: Encoding = None,
        timestamp: Timestamp = None,
        attachment = None
    ) -> None:
        """
        Send a successful reply to the query.

        NOTE(review): recent zenoh-python releases appear to take the
        reply key expression as a first positional argument before
        ``payload`` -- confirm against the installed version.

        Parameters:
        - payload: Reply data
        - encoding: Encoding of the reply data
        - timestamp: Reply timestamp
        - attachment: Additional metadata
        """
    
    def reply_err(
        self,
        payload,
        encoding: Encoding = None
    ) -> None:
        """
        Send an error reply to the query.

        Parameters:
        - payload: Error message or data
        - encoding: Encoding of the error payload
        """
    
    def reply_del(
        self,
        timestamp: Timestamp = None,
        attachment = None
    ) -> None:
        """
        Send a delete reply to the query.

        Parameters:
        - timestamp: Delete timestamp
        - attachment: Additional metadata
        """
    
    def drop(self) -> None:
        """Drop the query without replying"""

Advanced Querying

Declare a reusable Querier to send repeated queries on the same key expression, with shared defaults for targeting, consolidation, and timeout.

def declare_querier(
    self,
    key_expr,
    target: QueryTarget = None,
    consolidation: QueryConsolidation = None,
    timeout: float = None
) -> Querier:
    """
    Declare a querier for sending repeated queries on one key expression.

    Signature stub only (no body). The usage examples call it on a
    session object, e.g.
    ``session.declare_querier("status/services", timeout=5.0)``.

    Parameters:
    - key_expr: Key expression for queries
    - target: Default query targeting
    - consolidation: Default consolidation mode
    - timeout: Default query timeout (the examples pass a float such as
      5.0; presumably seconds, as for ``get`` -- confirm)

    Returns:
    Querier object for sending queries; call ``undeclare()`` on it when
    done
    """

class Querier:
    """
    Querier for sending queries, as returned by ``declare_querier``.

    It is bound to one key expression. ``get`` accepts per-call overrides
    for the targeting, consolidation and timeout defaults.
    """
    
    @property
    def key_expr(self) -> KeyExpr:
        """Get the querier's key expression"""
    
    @property
    def matching_status(self) -> MatchingStatus:
        """Get current matching status"""
    
    def get(
        self,
        parameters = None,
        handler = None,
        target: QueryTarget = None,
        consolidation: QueryConsolidation = None,
        value = None,
        encoding: Encoding = None,
        attachment = None,
        timeout: float = None
    ):
        """
        Send a query using this querier.

        All arguments are optional; the examples call ``querier.get()``
        with none.

        Parameters:
        - parameters: Query parameters to add to the key expression
        - handler: Handler for replies
        - target: Override default targeting
        - consolidation: Override default consolidation
        - value: Query payload
        - encoding: Payload encoding
        - attachment: Additional metadata
        - timeout: Override default timeout

        Returns:
        Iterator over replies if no handler is provided
        """
    
    def undeclare(self) -> None:
        """Undeclare the querier"""
    
    def declare_matching_listener(self, handler) -> MatchingListener:
        """Declare a listener for matching status changes"""

Usage Examples

Simple Query

import zenoh

# Open a session with an explicit default configuration.
session = zenoh.open(zenoh.Config())
try:
    # Simple query for all data under a key
    for reply in session.get("sensors/**"):
        sample = reply.ok  # None when the reply carries an error
        if sample is not None:
            print(f"Data from {sample.key_expr}: {sample.payload.to_string()}")
        else:
            print(f"Error: {reply.err.payload.to_string()}")
finally:
    # Always release the session, even if processing a reply raises.
    session.close()

Query with Parameters

import zenoh

# Open a session with an explicit default configuration.
session = zenoh.open(zenoh.Config())
try:
    # Query with parameters: everything after "?" is passed to the
    # queryable as query parameters.
    replies = session.get("sensors/temperature?region=north&limit=10")

    for reply in replies:
        sample = reply.ok  # None when the reply carries an error
        if sample is not None:
            print(f"Temperature: {sample.payload.to_string()}")
        else:
            # Report error replies instead of silently discarding them.
            print(f"Error: {reply.err.payload.to_string()}")
finally:
    # Always release the session, even if processing a reply raises.
    session.close()

Query with Payload

import json

import zenoh

# Open a session with an explicit default configuration.
session = zenoh.open(zenoh.Config())
try:
    # Send query with data. The payload is declared as JSON, so it must be
    # serialized with json.dumps: str() on a dict produces a Python repr
    # (single quotes), which is not valid JSON.
    query_data = {"operation": "compute", "params": [1, 2, 3]}
    replies = session.get(
        "compute/service",
        value=json.dumps(query_data),
        encoding=zenoh.Encoding.APPLICATION_JSON,
    )

    for reply in replies:
        if reply.ok:
            result = reply.ok.payload.to_string()
            print(f"Computation result: {result}")
finally:
    # Always release the session, even if processing a reply raises.
    session.close()

Simple Queryable

import zenoh

def query_handler(query):
    """
    Answer a query with a canned temperature reading.

    Parameters:
    - query: Received query; its key expression, ``parameters.get``,
      ``reply`` and ``reply_err`` are used

    Sends an error reply when the key expression does not contain
    "temperature"; otherwise sends one reply that echoes the optional
    "region" query parameter.
    """
    print(f"Query on {query.key_expr}")

    # This example only knows about temperature sensors.
    if "temperature" not in str(query.key_expr):
        query.reply_err("Unknown sensor type")
        return

    # "region" is optional; a missing value is rendered as-is in the reply.
    region = query.parameters.get("region")
    query.reply(f"Temperature: 23.5°C (region: {region})")

import time

# Open a session with an explicit default configuration.
session = zenoh.open(zenoh.Config())
try:
    # Declare queryable
    queryable = session.declare_queryable("sensors/**", query_handler)
    try:
        # Keep the process alive so the handler can serve incoming queries.
        time.sleep(30)
    finally:
        queryable.undeclare()
finally:
    # Release the session even if declaring or running the queryable fails.
    session.close()

Queryable with Complex Logic

import zenoh
import json

class TemperatureService:
    """In-memory temperature store that answers queries in the example."""

    def __init__(self):
        # Current reading per sensor key.
        self.sensors = {
            "sensors/temperature/room1": 23.5,
            "sensors/temperature/room2": 24.1,
            "sensors/temperature/outside": 18.3
        }

    def handle_query(self, query):
        """
        Reply with one JSON document per matching sensor.

        Parameters:
        - query: Received query. Optional query parameters:
          "region" keeps only sensors whose key contains that substring;
          "limit" caps the number of results and must be a positive
          integer.

        Sends an error reply when "limit" is invalid or when no sensor
        matches, so the query is always answered.
        """
        print(f"Query: {query.selector}")

        region = query.parameters.get("region")
        raw_limit = query.parameters.get("limit")

        # Parse the limit once, up front. A malformed or non-positive value
        # becomes an error reply rather than an exception in the handler.
        limit = None
        if raw_limit:
            try:
                limit = int(raw_limit)
            except (TypeError, ValueError):
                limit = 0
            if limit < 1:
                query.reply_err(f"Invalid limit parameter: {raw_limit!r}")
                return

        # Filter sensors based on parameters
        results = []
        for key, temp in self.sensors.items():
            if region and region not in key:
                continue
            results.append({"key": key, "temperature": temp})
            if limit is not None and len(results) >= limit:
                break

        if not results:
            query.reply_err("No sensors found matching criteria")
            return

        # Send one reply per result
        for result in results:
            query.reply(
                json.dumps(result),
                encoding=zenoh.Encoding.APPLICATION_JSON
            )

import time

service = TemperatureService()
# Open a session with an explicit default configuration.
session = zenoh.open(zenoh.Config())
try:
    queryable = session.declare_queryable(
        "sensors/**",
        service.handle_query,
        complete=True  # This queryable provides complete answers
    )
    try:
        print("Temperature service running...")
        # Keep the process alive so the handler can serve incoming queries.
        time.sleep(60)
    finally:
        queryable.undeclare()
finally:
    # Release the session even if declaring or running the queryable fails.
    session.close()

Querier for Repeated Queries

import time

import zenoh

CHECKS = 5            # number of status checks to run
CHECK_INTERVAL = 10   # seconds to wait between two checks


def check_services():
    """Send one query through the module-level ``querier`` and return the
    payload text of every successful reply (error replies are skipped)."""
    return [
        reply.ok.payload.to_string()
        for reply in querier.get()
        if reply.ok
    ]


# Open a session with an explicit default configuration.
session = zenoh.open(zenoh.Config())
try:
    # Declare querier for repeated use
    querier = session.declare_querier(
        "status/services",
        target=zenoh.QueryTarget.ALL,
        timeout=5.0
    )
    try:
        # Use querier multiple times
        for i in range(CHECKS):
            services = check_services()
            print(f"Check {i+1}: Found {len(services)} services")
            # No need to wait after the final check.
            if i < CHECKS - 1:
                time.sleep(CHECK_INTERVAL)
    finally:
        querier.undeclare()
finally:
    # Release the session even if a check raises.
    session.close()

Query with Consolidation

import zenoh

# Open a session with an explicit default configuration.
session = zenoh.open(zenoh.Config())
try:
    # Query with specific consolidation mode
    replies = session.get(
        "sensors/temperature/**",
        target=zenoh.QueryTarget.ALL,
        consolidation=zenoh.QueryConsolidation.DEFAULT
    )

    # Process consolidated replies
    temperatures = []
    for reply in replies:
        if not reply.ok:
            continue
        text = reply.ok.payload.to_string()
        try:
            temperatures.append(float(text))
        except ValueError:
            # Queryables may answer with text or JSON rather than a bare
            # number; skip those instead of aborting the whole average.
            print(f"Skipping non-numeric payload: {text!r}")

    if temperatures:
        avg_temp = sum(temperatures) / len(temperatures)
        print(f"Average temperature: {avg_temp:.1f}°C")
finally:
    # Always release the session, even if processing a reply raises.
    session.close()

Complete Query/Queryable Example

import zenoh
import threading
import time
import json

class DataStore:
    """Small in-memory key/value store exposed through a queryable."""

    def __init__(self):
        # Stored values keyed by their key expression.
        self.data = {
            "users/alice": {"name": "Alice", "age": 30},
            "users/bob": {"name": "Bob", "age": 25},
            "config/timeout": 30,
            "config/retries": 3
        }

    def handle_query(self, query):
        """
        Answer a query from the stored data.

        An exact key match yields one reply with the stored value. Otherwise
        trailing '*' characters are stripped from the key expression and
        every stored key starting with the remainder yields one
        ``{key: value}`` reply. An error reply is sent when nothing matches.
        """
        requested = str(query.key_expr)

        # Exact hit: reply with the stored value on its own.
        if requested in self.data:
            query.reply(
                json.dumps(self.data[requested]),
                encoding=zenoh.Encoding.APPLICATION_JSON,
            )
            return

        # Pattern matching for wildcard queries: plain prefix comparison.
        prefix = requested.rstrip('*')
        hits = [name for name in self.data.keys() if name.startswith(prefix)]
        if not hits:
            query.reply_err(f"No data found for {requested}")
            return

        for name in hits:
            query.reply(
                json.dumps({name: self.data[name]}),
                encoding=zenoh.Encoding.APPLICATION_JSON,
            )

def queryable_thread():
    """Serve the DataStore on every key expression ("**") for 15 seconds."""
    store = DataStore()
    # Open a session with an explicit default configuration.
    session = zenoh.open(zenoh.Config())
    try:
        queryable = session.declare_queryable("**", store.handle_query)
        try:
            print("Data store queryable running...")
            # Stay alive long enough for the client thread to run its queries.
            time.sleep(15)
        finally:
            queryable.undeclare()
    finally:
        # Release the session even if declaring the queryable fails.
        session.close()
    print("Queryable stopped")

def _print_replies(replies, label):
    """Print the payload text of every successful reply, prefixed by label."""
    for reply in replies:
        if reply.ok:
            print(f"{label}: {reply.ok.payload.to_string()}")


def client_thread():
    """Run three example queries against the data store and print the replies."""
    time.sleep(1)  # Let queryable start first

    # Open a session with an explicit default configuration.
    session = zenoh.open(zenoh.Config())
    try:
        # Query specific user
        print("Querying specific user...")
        _print_replies(session.get("users/alice"), "User data")

        time.sleep(1)

        # Query all users
        print("Querying all users...")
        _print_replies(
            session.get("users/*", target=zenoh.QueryTarget.ALL),
            "Found",
        )

        time.sleep(1)

        # Query all config
        print("Querying config...")
        _print_replies(session.get("config/**"), "Config")
    finally:
        # Release the session even if one of the queries raises.
        session.close()
    print("Client done")

# Launch the queryable first and the client second, then wait for both
# in the same order.
workers = [
    threading.Thread(target=entry_point)
    for entry_point in (queryable_thread, client_thread)
]

for worker in workers:
    worker.start()

for worker in workers:
    worker.join()

Install with Tessl CLI

npx tessl i tessl/pypi-eclipse-zenoh

docs

advanced.md

data-types.md

extensions.md

handlers.md

index.md

pubsub.md

query.md

session-management.md

tile.json