The python API for Eclipse zenoh - a high-performance networking library providing pub/sub, store/query and compute framework for zero-overhead communication
—
The Query/Queryable pattern enables request-response messaging in Zenoh applications. Queries request data from distributed sources, while Queryables provide responses. This pattern supports both simple data retrieval and complex distributed computations with flexible targeting and consolidation options.
Send queries to retrieve data from distributed sources.
def get(
self,
selector,
handler = None,
target: QueryTarget = None,
consolidation: QueryConsolidation = None,
value = None,
encoding: Encoding = None,
attachment = None,
timeout: float = None
):
"""
Query data from the network.
Parameters:
- selector: Selector expression (key + optional parameters)
- handler: Handler for receiving replies
- target: Query targeting strategy
- consolidation: Reply consolidation mode
- value: Optional payload to send with query
- encoding: Encoding for the query payload
- attachment: Additional metadata
- timeout: Query timeout in seconds
Returns:
Iterator over Reply objects if no handler provided
"""Handle responses to queries with success and error cases.
class Reply:
"""Query reply"""
@property
def result(self):
"""Reply result (success or error)"""
@property
def ok(self) -> Sample:
"""Success reply data (None if error)"""
@property
def err(self) -> ReplyError:
"""Error reply data (None if success)"""
@property
def replier_id(self) -> ZenohId:
"""ID of the replier"""
class ReplyError:
"""Query reply error"""
@property
def payload(self) -> ZBytes:
"""Error payload"""
@property
def encoding(self) -> Encoding:
"""Error encoding"""Control which queryables should respond to queries.
class QueryTarget:
"""Query targeting modes"""
BEST_MATCHING = ... # Target best matching queryable
ALL = ... # Target all matching queryables
ALL_COMPLETE = ... # Target all, wait for complete responses
DEFAULT = ...
class QueryConsolidation:
"""Query consolidation configuration"""
@property
def mode(self) -> ConsolidationMode:
"""Consolidation mode"""
AUTO = ... # Automatic consolidation
DEFAULT = ... # Default consolidation
class ConsolidationMode:
"""Consolidation modes"""
AUTO = ... # Automatic consolidation
NONE = ... # No consolidation
MONOTONIC = ... # Monotonic consolidation
LATEST = ... # Latest value only
DEFAULT = ...Provide data or services in response to queries.
def declare_queryable(
self,
key_expr,
handler,
complete: bool = False
) -> Queryable:
"""
Declare a queryable for a key expression.
Parameters:
- key_expr: Key expression pattern to handle queries for
- handler: Handler for received queries
- complete: Whether this queryable provides complete answers
Returns:
Queryable object for handling queries
"""
class Queryable:
"""Queryable with generic handler"""
@property
def key_expr(self) -> KeyExpr:
"""Get the queryable's key expression"""
@property
def handler(self):
"""Get the queryable's handler"""
def undeclare(self) -> None:
"""Undeclare the queryable and release resources"""
def try_recv(self):
"""Try to receive a query without blocking"""
def recv(self):
"""Receive a query (blocking)"""
def __iter__(self):
"""Iterate over received queries"""Process incoming queries and send appropriate replies.
class Query:
"""Query received by queryable"""
@property
def selector(self) -> Selector:
"""Query selector (key expression + parameters)"""
@property
def key_expr(self) -> KeyExpr:
"""Query key expression"""
@property
def parameters(self) -> Parameters:
"""Query parameters"""
@property
def payload(self) -> ZBytes:
"""Query payload (optional)"""
@property
def encoding(self) -> Encoding:
"""Query payload encoding"""
@property
def attachment(self):
"""Query attachment metadata"""
def reply(
self,
payload,
encoding: Encoding = None,
timestamp: Timestamp = None,
attachment = None
) -> None:
"""
Send a successful reply to the query.
Parameters:
- payload: Reply data
- encoding: Data encoding
- timestamp: Reply timestamp
- attachment: Additional metadata
"""
def reply_err(
self,
payload,
encoding: Encoding = None
) -> None:
"""
Send an error reply to the query.
Parameters:
- payload: Error message or data
- encoding: Error encoding
"""
def reply_del(
self,
timestamp: Timestamp = None,
attachment = None
) -> None:
"""
Send a delete reply to the query.
Parameters:
- timestamp: Delete timestamp
- attachment: Additional metadata
"""
def drop(self) -> None:
"""Drop the query without replying"""Enhanced querying capabilities with additional features.
def declare_querier(
self,
key_expr,
target: QueryTarget = None,
consolidation: QueryConsolidation = None,
timeout: float = None
) -> Querier:
"""
Declare a querier for repeated queries.
Parameters:
- key_expr: Key expression for queries
- target: Default query targeting
- consolidation: Default consolidation mode
- timeout: Default query timeout
Returns:
Querier object for sending queries
"""
class Querier:
"""Querier for sending queries"""
@property
def key_expr(self) -> KeyExpr:
"""Get the querier's key expression"""
@property
def matching_status(self) -> MatchingStatus:
"""Get current matching status"""
def get(
self,
parameters = None,
handler = None,
target: QueryTarget = None,
consolidation: QueryConsolidation = None,
value = None,
encoding: Encoding = None,
attachment = None,
timeout: float = None
):
"""
Send a query using this querier.
Parameters:
- parameters: Query parameters to add to key expression
- handler: Handler for replies
- target: Override default targeting
- consolidation: Override default consolidation
- value: Query payload
- encoding: Payload encoding
- attachment: Additional metadata
- timeout: Override default timeout
Returns:
Iterator over replies if no handler provided
"""
def undeclare(self) -> None:
"""Undeclare the querier"""
def declare_matching_listener(self, handler) -> MatchingListener:
"""Declare a listener for matching status changes"""import zenoh
session = zenoh.open()
# Simple query for all data under a key
replies = session.get("sensors/**")
for reply in replies:
if reply.ok:
sample = reply.ok
print(f"Data from {sample.key_expr}: {sample.payload.to_string()}")
else:
print(f"Error: {reply.err.payload.to_string()}")
session.close()import zenoh
session = zenoh.open()
# Query with parameters
replies = session.get("sensors/temperature?region=north&limit=10")
for reply in replies:
if reply.ok:
sample = reply.ok
print(f"Temperature: {sample.payload.to_string()}")
session.close()import zenoh
session = zenoh.open()
# Send query with data
query_data = {"operation": "compute", "params": [1, 2, 3]}
replies = session.get(
"compute/service",
value=str(query_data),
encoding=zenoh.Encoding.APPLICATION_JSON
)
for reply in replies:
if reply.ok:
result = reply.ok.payload.to_string()
print(f"Computation result: {result}")
session.close()import zenoh
def query_handler(query):
print(f"Query on {query.key_expr}")
# Extract parameters
region = query.parameters.get("region")
limit = query.parameters.get("limit")
# Generate response based on query
if "temperature" in str(query.key_expr):
data = f"Temperature: 23.5°C (region: {region})"
query.reply(data)
else:
query.reply_err("Unknown sensor type")
session = zenoh.open()
# Declare queryable
queryable = session.declare_queryable("sensors/**", query_handler)
# Let it run
import time
time.sleep(30)
queryable.undeclare()
session.close()import zenoh
import json
class TemperatureService:
def __init__(self):
self.sensors = {
"sensors/temperature/room1": 23.5,
"sensors/temperature/room2": 24.1,
"sensors/temperature/outside": 18.3
}
def handle_query(self, query):
print(f"Query: {query.selector}")
# Check if query has parameters
region = query.parameters.get("region")
limit = query.parameters.get("limit")
# Filter sensors based on parameters
results = []
for key, temp in self.sensors.items():
if region and region not in key:
continue
results.append({"key": key, "temperature": temp})
if limit and len(results) >= int(limit):
break
if results:
# Send multiple replies for each result
for result in results:
query.reply(
json.dumps(result),
encoding=zenoh.Encoding.APPLICATION_JSON
)
else:
query.reply_err("No sensors found matching criteria")
service = TemperatureService()
session = zenoh.open()
queryable = session.declare_queryable(
"sensors/**",
service.handle_query,
complete=True # This queryable provides complete answers
)
print("Temperature service running...")
import time
time.sleep(60)
queryable.undeclare()
session.close()import zenoh
import time
session = zenoh.open()
# Declare querier for repeated use
querier = session.declare_querier(
"status/services",
target=zenoh.QueryTarget.ALL,
timeout=5.0
)
def check_services():
replies = querier.get()
services = []
for reply in replies:
if reply.ok:
services.append(reply.ok.payload.to_string())
return services
# Use querier multiple times
for i in range(5):
services = check_services()
print(f"Check {i+1}: Found {len(services)} services")
time.sleep(10)
querier.undeclare()
session.close()import zenoh
session = zenoh.open()
# Query with specific consolidation mode
replies = session.get(
"sensors/temperature/**",
target=zenoh.QueryTarget.ALL,
consolidation=zenoh.QueryConsolidation.DEFAULT
)
# Process consolidated replies
temperatures = []
for reply in replies:
if reply.ok:
temp = float(reply.ok.payload.to_string())
temperatures.append(temp)
if temperatures:
avg_temp = sum(temperatures) / len(temperatures)
print(f"Average temperature: {avg_temp:.1f}°C")
session.close()import zenoh
import threading
import time
import json
class DataStore:
def __init__(self):
self.data = {
"users/alice": {"name": "Alice", "age": 30},
"users/bob": {"name": "Bob", "age": 25},
"config/timeout": 30,
"config/retries": 3
}
def handle_query(self, query):
key = str(query.key_expr)
if key in self.data:
response = json.dumps(self.data[key])
query.reply(response, encoding=zenoh.Encoding.APPLICATION_JSON)
else:
# Pattern matching for wildcard queries
matches = [k for k in self.data.keys() if k.startswith(key.rstrip('*'))]
if matches:
for match in matches:
response = json.dumps({match: self.data[match]})
query.reply(response, encoding=zenoh.Encoding.APPLICATION_JSON)
else:
query.reply_err(f"No data found for {key}")
def queryable_thread():
store = DataStore()
session = zenoh.open()
queryable = session.declare_queryable("**", store.handle_query)
print("Data store queryable running...")
time.sleep(15)
queryable.undeclare()
session.close()
print("Queryable stopped")
def client_thread():
time.sleep(1) # Let queryable start first
session = zenoh.open()
# Query specific user
print("Querying specific user...")
replies = session.get("users/alice")
for reply in replies:
if reply.ok:
user_data = reply.ok.payload.to_string()
print(f"User data: {user_data}")
time.sleep(1)
# Query all users
print("Querying all users...")
replies = session.get("users/*", target=zenoh.QueryTarget.ALL)
for reply in replies:
if reply.ok:
data = reply.ok.payload.to_string()
print(f"Found: {data}")
time.sleep(1)
# Query all config
print("Querying config...")
replies = session.get("config/**")
for reply in replies:
if reply.ok:
config = reply.ok.payload.to_string()
print(f"Config: {config}")
session.close()
print("Client done")
# Run both threads
queryable_t = threading.Thread(target=queryable_thread)
client_t = threading.Thread(target=client_thread)
queryable_t.start()
client_t.start()
queryable_t.join()
client_t.join()Install with Tessl CLI
npx tessl i tessl/pypi-eclipse-zenoh