0
# Query/Queryable Pattern
1
2
The Query/Queryable pattern enables request-response messaging in Zenoh applications. Queries request data from distributed sources, while Queryables provide responses. This pattern supports both simple data retrieval and complex distributed computations with flexible targeting and consolidation options.
3
4
## Capabilities
5
6
### Querying Data
7
8
Send queries to retrieve data from distributed sources.
9
10
```python { .api }
11
def get(
12
self,
13
selector,
14
handler = None,
15
target: QueryTarget = None,
16
consolidation: QueryConsolidation = None,
17
value = None,
18
encoding: Encoding = None,
19
attachment = None,
20
timeout: float = None
21
):
22
"""
23
Query data from the network.
24
25
Parameters:
26
- selector: Selector expression (key + optional parameters)
27
- handler: Handler for receiving replies
28
- target: Query targeting strategy
29
- consolidation: Reply consolidation mode
30
- value: Optional payload to send with query
31
- encoding: Encoding for the query payload
32
- attachment: Additional metadata
33
- timeout: Query timeout in seconds
34
35
Returns:
36
Iterator over Reply objects if no handler provided
37
"""
38
```
39
40
### Query Replies
41
42
Handle responses to queries with success and error cases.
43
44
```python { .api }
45
class Reply:
46
"""Query reply"""
47
48
@property
49
def result(self):
50
"""Reply result (success or error)"""
51
52
@property
53
def ok(self) -> Sample:
54
"""Success reply data (None if error)"""
55
56
@property
57
def err(self) -> ReplyError:
58
"""Error reply data (None if success)"""
59
60
@property
61
def replier_id(self) -> ZenohId:
62
"""ID of the replier"""
63
64
class ReplyError:
65
"""Query reply error"""
66
67
@property
68
def payload(self) -> ZBytes:
69
"""Error payload"""
70
71
@property
72
def encoding(self) -> Encoding:
73
"""Error encoding"""
74
```
75
76
### Query Targeting
77
78
Control which queryables should respond to queries.
79
80
```python { .api }
81
class QueryTarget:
82
"""Query targeting modes"""
83
BEST_MATCHING = ... # Target best matching queryable
84
ALL = ... # Target all matching queryables
85
ALL_COMPLETE = ...      # Target all matching queryables declared as complete
86
87
DEFAULT = ...
88
89
class QueryConsolidation:
90
"""Query consolidation configuration"""
91
92
@property
93
def mode(self) -> ConsolidationMode:
94
"""Consolidation mode"""
95
96
AUTO = ... # Automatic consolidation
97
DEFAULT = ... # Default consolidation
98
99
class ConsolidationMode:
100
"""Consolidation modes"""
101
AUTO = ... # Automatic consolidation
102
NONE = ... # No consolidation
103
MONOTONIC = ... # Monotonic consolidation
104
LATEST = ... # Latest value only
105
106
DEFAULT = ...
107
```
108
109
### Queryable Services
110
111
Provide data or services in response to queries.
112
113
```python { .api }
114
def declare_queryable(
115
self,
116
key_expr,
117
handler,
118
complete: bool = False
119
) -> Queryable:
120
"""
121
Declare a queryable for a key expression.
122
123
Parameters:
124
- key_expr: Key expression pattern to handle queries for
125
- handler: Handler for received queries
126
- complete: Whether this queryable provides complete answers
127
128
Returns:
129
Queryable object for handling queries
130
"""
131
132
class Queryable:
133
"""Queryable with generic handler"""
134
135
@property
136
def key_expr(self) -> KeyExpr:
137
"""Get the queryable's key expression"""
138
139
@property
140
def handler(self):
141
"""Get the queryable's handler"""
142
143
def undeclare(self) -> None:
144
"""Undeclare the queryable and release resources"""
145
146
def try_recv(self):
147
"""Try to receive a query without blocking"""
148
149
def recv(self):
150
"""Receive a query (blocking)"""
151
152
def __iter__(self):
153
"""Iterate over received queries"""
154
```
155
156
### Query Handling
157
158
Process incoming queries and send appropriate replies.
159
160
```python { .api }
161
class Query:
162
"""Query received by queryable"""
163
164
@property
165
def selector(self) -> Selector:
166
"""Query selector (key expression + parameters)"""
167
168
@property
169
def key_expr(self) -> KeyExpr:
170
"""Query key expression"""
171
172
@property
173
def parameters(self) -> Parameters:
174
"""Query parameters"""
175
176
@property
177
def payload(self) -> ZBytes:
178
"""Query payload (optional)"""
179
180
@property
181
def encoding(self) -> Encoding:
182
"""Query payload encoding"""
183
184
@property
185
def attachment(self):
186
"""Query attachment metadata"""
187
188
def reply(
189
self,
190
payload,
191
encoding: Encoding = None,
192
timestamp: Timestamp = None,
193
attachment = None
194
) -> None:
195
"""
196
Send a successful reply to the query.
197
198
Parameters:
199
- payload: Reply data
200
- encoding: Data encoding
201
- timestamp: Reply timestamp
202
- attachment: Additional metadata
203
"""
204
205
def reply_err(
206
self,
207
payload,
208
encoding: Encoding = None
209
) -> None:
210
"""
211
Send an error reply to the query.
212
213
Parameters:
214
- payload: Error message or data
215
- encoding: Error encoding
216
"""
217
218
def reply_del(
219
self,
220
timestamp: Timestamp = None,
221
attachment = None
222
) -> None:
223
"""
224
Send a delete reply to the query.
225
226
Parameters:
227
- timestamp: Delete timestamp
228
- attachment: Additional metadata
229
"""
230
231
def drop(self) -> None:
232
"""Drop the query without replying"""
233
```
234
235
### Advanced Querying
236
237
Enhanced querying capabilities with additional features.
238
239
```python { .api }
240
def declare_querier(
241
self,
242
key_expr,
243
target: QueryTarget = None,
244
consolidation: QueryConsolidation = None,
245
timeout: float = None
246
) -> Querier:
247
"""
248
Declare a querier for repeated queries.
249
250
Parameters:
251
- key_expr: Key expression for queries
252
- target: Default query targeting
253
- consolidation: Default consolidation mode
254
- timeout: Default query timeout
255
256
Returns:
257
Querier object for sending queries
258
"""
259
260
class Querier:
261
"""Querier for sending queries"""
262
263
@property
264
def key_expr(self) -> KeyExpr:
265
"""Get the querier's key expression"""
266
267
@property
268
def matching_status(self) -> MatchingStatus:
269
"""Get current matching status"""
270
271
def get(
272
self,
273
parameters = None,
274
handler = None,
275
target: QueryTarget = None,
276
consolidation: QueryConsolidation = None,
277
value = None,
278
encoding: Encoding = None,
279
attachment = None,
280
timeout: float = None
281
):
282
"""
283
Send a query using this querier.
284
285
Parameters:
286
- parameters: Query parameters to add to key expression
287
- handler: Handler for replies
288
- target: Override default targeting
289
- consolidation: Override default consolidation
290
- value: Query payload
291
- encoding: Payload encoding
292
- attachment: Additional metadata
293
- timeout: Override default timeout
294
295
Returns:
296
Iterator over replies if no handler provided
297
"""
298
299
def undeclare(self) -> None:
300
"""Undeclare the querier"""
301
302
def declare_matching_listener(self, handler) -> MatchingListener:
303
"""Declare a listener for matching status changes"""
304
```
305
306
## Usage Examples
307
308
### Simple Query
309
310
```python
311
import zenoh
312
313
session = zenoh.open()

try:
    # Simple query for all data under a key
    replies = session.get("sensors/**")

    for reply in replies:
        if reply.ok:
            sample = reply.ok
            print(f"Data from {sample.key_expr}: {sample.payload.to_string()}")
        else:
            print(f"Error: {reply.err.payload.to_string()}")
finally:
    # Release the session even if processing a reply raises
    session.close()
326
```
327
328
### Query with Parameters
329
330
```python
331
import zenoh
332
333
session = zenoh.open()

try:
    # Query with parameters
    replies = session.get("sensors/temperature?region=north&limit=10")

    for reply in replies:
        if reply.ok:
            sample = reply.ok
            print(f"Temperature: {sample.payload.to_string()}")
        else:
            # Report error replies instead of silently dropping them
            print(f"Error: {reply.err.payload.to_string()}")
finally:
    # Release the session even if processing a reply raises
    session.close()
344
```
345
346
### Query with Payload
347
348
```python
349
import json

import zenoh

session = zenoh.open()

try:
    # Send query with data
    query_data = {"operation": "compute", "params": [1, 2, 3]}
    replies = session.get(
        "compute/service",
        # json.dumps, not str(): str() of a dict is Python repr (single
        # quotes), which is not valid JSON for the declared encoding
        value=json.dumps(query_data),
        encoding=zenoh.Encoding.APPLICATION_JSON,
    )

    for reply in replies:
        if reply.ok:
            result = reply.ok.payload.to_string()
            print(f"Computation result: {result}")
finally:
    # Release the session even if processing a reply raises
    session.close()
367
```
368
369
### Simple Queryable
370
371
```python
372
import zenoh
373
374
def query_handler(query):
    """Answer a query with a fixed temperature reading, or an error reply.

    Parameters:
    - query: Incoming query. Its key expression decides the reply type; the
      optional "region" parameter is echoed back in the reply text.
    """
    print(f"Query on {query.key_expr}")

    # Extract parameters
    region = query.parameters.get("region")

    # Generate response based on query
    if "temperature" in str(query.key_expr):
        data = f"Temperature: 23.5°C (region: {region})"
        query.reply(data)
    else:
        query.reply_err("Unknown sensor type")
387
388
import time

session = zenoh.open()

try:
    # Declare queryable
    queryable = session.declare_queryable("sensors/**", query_handler)

    try:
        # Let it run
        time.sleep(30)
    finally:
        # Runs even if the sleep is interrupted (e.g. Ctrl-C)
        queryable.undeclare()
finally:
    session.close()
399
```
400
401
### Queryable with Complex Logic
402
403
```python
404
import zenoh
405
import json
406
407
class TemperatureService:
    """In-memory temperature readings served through a queryable handler."""

    def __init__(self):
        # Reading per sensor, keyed by the sensor's key expression
        self.sensors = {
            "sensors/temperature/room1": 23.5,
            "sensors/temperature/room2": 24.1,
            "sensors/temperature/outside": 18.3,
        }

    def handle_query(self, query):
        """Reply with one JSON message per matching sensor, or an error.

        Parameters:
        - query: Incoming query. Optional parameters:
          "region" keeps only sensors whose key contains that substring;
          "limit" caps the number of replies and must be a positive integer.

        An invalid "limit" gets an error reply rather than raising inside
        the handler, which would leave the query unanswered.
        """
        print(f"Query: {query.selector}")

        # Check if query has parameters
        region = query.parameters.get("region")
        limit = query.parameters.get("limit")

        # Validate the limit once, before doing any work
        max_results = None
        if limit:
            try:
                max_results = int(limit)
            except ValueError:
                max_results = 0  # falls through to the rejection below
            if max_results < 1:
                query.reply_err(f"Invalid limit: {limit}")
                return

        # Filter sensors based on parameters
        results = [
            {"key": key, "temperature": temp}
            for key, temp in self.sensors.items()
            if not region or region in key
        ]
        if max_results is not None:
            results = results[:max_results]

        if not results:
            query.reply_err("No sensors found matching criteria")
            return

        # Send one reply per result
        for result in results:
            query.reply(
                json.dumps(result),
                encoding=zenoh.Encoding.APPLICATION_JSON,
            )
441
442
import time

service = TemperatureService()
session = zenoh.open()

try:
    queryable = session.declare_queryable(
        "sensors/**",
        service.handle_query,
        complete=True,  # This queryable provides complete answers
    )

    try:
        print("Temperature service running...")
        time.sleep(60)
    finally:
        # Runs even if the sleep is interrupted (e.g. Ctrl-C)
        queryable.undeclare()
finally:
    session.close()
457
```
458
459
### Querier for Repeated Queries
460
461
```python
462
import zenoh
463
import time
464
465
def check_services():
    """Send one query through the querier and return the successful payloads."""
    return [
        reply.ok.payload.to_string()
        for reply in querier.get()
        if reply.ok
    ]


session = zenoh.open()

try:
    # Declare querier for repeated use
    querier = session.declare_querier(
        "status/services",
        target=zenoh.QueryTarget.ALL,
        timeout=5.0,
    )

    try:
        # Use querier multiple times
        for i in range(5):
            services = check_services()
            print(f"Check {i+1}: Found {len(services)} services")
            time.sleep(10)
    finally:
        # Undeclare even if a check raises or the loop is interrupted
        querier.undeclare()
finally:
    session.close()
492
```
493
494
### Query with Consolidation
495
496
```python
497
import zenoh
498
499
session = zenoh.open()

try:
    # Query with specific consolidation mode
    replies = session.get(
        "sensors/temperature/**",
        target=zenoh.QueryTarget.ALL,
        consolidation=zenoh.QueryConsolidation.DEFAULT,
    )

    # Process consolidated replies
    temperatures = []
    for reply in replies:
        if reply.ok:
            text = reply.ok.payload.to_string()
            try:
                temperatures.append(float(text))
            except ValueError:
                # Replies are not guaranteed to be numeric; skip the ones
                # that cannot be parsed instead of aborting the whole loop
                print(f"Skipping non-numeric reply: {text}")

    if temperatures:
        avg_temp = sum(temperatures) / len(temperatures)
        print(f"Average temperature: {avg_temp:.1f}°C")
finally:
    # Release the session even if processing a reply raises
    session.close()
520
```
521
522
### Complete Query/Queryable Example
523
524
```python
525
import zenoh
526
import threading
527
import time
528
import json
529
530
class DataStore:
    """In-memory key/value store that answers queries with JSON replies."""

    def __init__(self):
        self.data = {
            "users/alice": {"name": "Alice", "age": 30},
            "users/bob": {"name": "Bob", "age": 25},
            "config/timeout": 30,
            "config/retries": 3,
        }

    def handle_query(self, query):
        """Reply with the stored value(s) selected by the query's key expression.

        Parameters:
        - query: Incoming query. An exact key gets that key's value; a key
          ending in "*" gets one reply per stored key sharing its prefix;
          anything else gets an error reply.
        """
        key = str(query.key_expr)

        if key in self.data:
            response = json.dumps(self.data[key])
            query.reply(response, encoding=zenoh.Encoding.APPLICATION_JSON)
            return

        # Simplified wildcard handling: strip the trailing "*" and prefix-match
        # the stored keys. Only done for wildcard keys, so that an unknown plain
        # key (e.g. "users/al") does not match longer keys ("users/alice").
        matches = []
        if key.endswith("*"):
            prefix = key.rstrip("*")
            matches = [k for k in self.data if k.startswith(prefix)]

        if not matches:
            query.reply_err(f"No data found for {key}")
            return

        for match in matches:
            response = json.dumps({match: self.data[match]})
            query.reply(response, encoding=zenoh.Encoding.APPLICATION_JSON)
554
555
def queryable_thread():
    """Serve a DataStore on every key for 15 seconds, then shut down."""
    store = DataStore()
    session = zenoh.open()

    try:
        queryable = session.declare_queryable("**", store.handle_query)
        print("Data store queryable running...")

        try:
            time.sleep(15)
        finally:
            # Undeclare even if the sleep is interrupted
            queryable.undeclare()
    finally:
        session.close()

    print("Queryable stopped")
567
568
def _print_replies(replies, label):
    """Print the payload of every successful reply, prefixed with ``label``."""
    for reply in replies:
        if reply.ok:
            print(f"{label}: {reply.ok.payload.to_string()}")


def client_thread():
    """Run three sample queries (one user, all users, all config) and print the results."""
    time.sleep(1)  # Let queryable start first

    session = zenoh.open()

    try:
        # Query specific user
        print("Querying specific user...")
        _print_replies(session.get("users/alice"), "User data")

        time.sleep(1)

        # Query all users
        print("Querying all users...")
        _print_replies(
            session.get("users/*", target=zenoh.QueryTarget.ALL),
            "Found",
        )

        time.sleep(1)

        # Query all config
        print("Querying config...")
        _print_replies(session.get("config/**"), "Config")
    finally:
        # Release the session even if a query or reply raises
        session.close()

    print("Client done")
603
604
# Run both threads
605
# Queryable first, then the client that queries it
workers = [
    threading.Thread(target=queryable_thread),
    threading.Thread(target=client_thread),
]

for worker in workers:
    worker.start()

# Wait for both sides to finish before the script exits
for worker in workers:
    worker.join()
613
```