or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.md data-types.md extensions.md handlers.md index.md pubsub.md query.md session-management.md

docs/query.md

0

# Query/Queryable Pattern

1

2

The Query/Queryable pattern enables request-response messaging in Zenoh applications. Queries request data from distributed sources, while Queryables provide responses. This pattern supports both simple data retrieval and complex distributed computations with flexible targeting and consolidation options.

3

4

## Capabilities

5

6

### Querying Data

7

8

Send queries to retrieve data from distributed sources.

9

10

```python { .api }

11

def get(
    self,
    selector,
    handler=None,
    target: QueryTarget = None,
    consolidation: QueryConsolidation = None,
    value=None,
    encoding: Encoding = None,
    attachment=None,
    timeout: float = None,
):
    """Issue a query for data on the network.

    Args:
        selector: Selector expression (a key plus optional parameters).
        handler: Handler that receives the replies.
        target: Strategy used to target the query.
        consolidation: Mode used to consolidate the replies.
        value: Optional payload sent along with the query.
        encoding: Encoding of the query payload.
        attachment: Additional metadata.
        timeout: Timeout of the query, in seconds.

    Returns:
        An iterator over Reply objects when no handler is provided.
    """

38

```

39

40

### Query Replies

41

42

Handle responses to queries with success and error cases.

43

44

```python { .api }

45

class Reply:
    """A reply to a query."""

    @property
    def result(self):
        """The result of the reply (success or error)."""

    @property
    def ok(self) -> Sample:
        """The data of a successful reply (None if the reply is an error)."""

    @property
    def err(self) -> ReplyError:
        """The data of an error reply (None if the reply is a success)."""

    @property
    def replier_id(self) -> ZenohId:
        """The ID of the replier."""

63

64

class ReplyError:
    """An error returned in reply to a query."""

    @property
    def payload(self) -> ZBytes:
        """The payload of the error."""

    @property
    def encoding(self) -> Encoding:
        """The encoding of the error."""

74

```

75

76

### Query Targeting

77

78

Control which queryables should respond to queries.

79

80

```python { .api }

81

class QueryTarget:
    """Modes for targeting a query."""

    BEST_MATCHING = ...  # Target the best matching queryable
    ALL = ...  # Target every matching queryable
    ALL_COMPLETE = ...  # Target all, wait for complete responses

    DEFAULT = ...

88

89

class QueryConsolidation:
    """Configuration of query consolidation."""

    AUTO = ...  # Automatic consolidation
    DEFAULT = ...  # Default consolidation

    @property
    def mode(self) -> ConsolidationMode:
        """The consolidation mode."""

98

99

class ConsolidationMode:
    """The available consolidation modes."""

    AUTO = ...  # Automatic consolidation
    NONE = ...  # No consolidation
    MONOTONIC = ...  # Monotonic consolidation
    LATEST = ...  # Latest value only

    DEFAULT = ...

107

```

108

109

### Queryable Services

110

111

Provide data or services in response to queries.

112

113

```python { .api }

114

def declare_queryable(
    self,
    key_expr,
    handler,
    complete: bool = False,
) -> Queryable:
    """Declare a queryable on a key expression.

    Args:
        key_expr: Key expression pattern to handle queries for.
        handler: Handler for the received queries.
        complete: Whether this queryable provides complete answers.

    Returns:
        A Queryable object for handling queries.
    """

131

132

class Queryable:
    """A queryable with a generic handler."""

    @property
    def key_expr(self) -> KeyExpr:
        """The key expression of the queryable."""

    @property
    def handler(self):
        """The handler of the queryable."""

    def undeclare(self) -> None:
        """Undeclare the queryable and release its resources."""

    def try_recv(self):
        """Try to receive a query without blocking."""

    def recv(self):
        """Receive a query (blocking)."""

    def __iter__(self):
        """Iterate over the received queries."""

154

```

155

156

### Query Handling

157

158

Process incoming queries and send appropriate replies.

159

160

```python { .api }

161

class Query:
    """A query received by a queryable."""

    @property
    def selector(self) -> Selector:
        """The selector of the query (key expression + parameters)."""

    @property
    def key_expr(self) -> KeyExpr:
        """The key expression of the query."""

    @property
    def parameters(self) -> Parameters:
        """The parameters of the query."""

    @property
    def payload(self) -> ZBytes:
        """The payload of the query (optional)."""

    @property
    def encoding(self) -> Encoding:
        """The encoding of the query payload."""

    @property
    def attachment(self):
        """The attachment metadata of the query."""

    def reply(
        self,
        payload,
        encoding: Encoding = None,
        timestamp: Timestamp = None,
        attachment=None,
    ) -> None:
        """Reply to the query with a success.

        Args:
            payload: The reply data.
            encoding: The encoding of the data.
            timestamp: The timestamp of the reply.
            attachment: Additional metadata.
        """

    def reply_err(
        self,
        payload,
        encoding: Encoding = None,
    ) -> None:
        """Reply to the query with an error.

        Args:
            payload: The error message or data.
            encoding: The encoding of the error.
        """

    def reply_del(
        self,
        timestamp: Timestamp = None,
        attachment=None,
    ) -> None:
        """Reply to the query with a delete.

        Args:
            timestamp: The timestamp of the delete.
            attachment: Additional metadata.
        """

    def drop(self) -> None:
        """Drop the query without replying."""

233

```

234

235

### Advanced Querying

236

237

Enhanced querying capabilities with additional features.

238

239

```python { .api }

240

def declare_querier(
    self,
    key_expr,
    target: QueryTarget = None,
    consolidation: QueryConsolidation = None,
    timeout: float = None,
) -> Querier:
    """Declare a querier to be used for repeated queries.

    Args:
        key_expr: Key expression for the queries.
        target: Default targeting of the queries.
        consolidation: Default consolidation mode.
        timeout: Default timeout of the queries.

    Returns:
        A Querier object for sending queries.
    """

259

260

class Querier:
    """A querier, used for sending queries."""

    @property
    def key_expr(self) -> KeyExpr:
        """The key expression of the querier."""

    @property
    def matching_status(self) -> MatchingStatus:
        """The current matching status."""

    def get(
        self,
        parameters=None,
        handler=None,
        target: QueryTarget = None,
        consolidation: QueryConsolidation = None,
        value=None,
        encoding: Encoding = None,
        attachment=None,
        timeout: float = None,
    ):
        """Send a query through this querier.

        Args:
            parameters: Query parameters to add to the key expression.
            handler: Handler for the replies.
            target: Overrides the default targeting.
            consolidation: Overrides the default consolidation.
            value: The query payload.
            encoding: The encoding of the payload.
            attachment: Additional metadata.
            timeout: Overrides the default timeout.

        Returns:
            An iterator over the replies when no handler is provided.
        """

    def undeclare(self) -> None:
        """Undeclare the querier."""

    def declare_matching_listener(self, handler) -> MatchingListener:
        """Declare a listener for changes of the matching status."""

304

```

305

306

## Usage Examples

307

308

### Simple Query

309

310

```python

311

import zenoh

session = zenoh.open()

# Query all data under a key and walk through the replies.
for reply in session.get("sensors/**"):
    if not reply.ok:
        print(f"Error: {reply.err.payload.to_string()}")
        continue
    sample = reply.ok
    print(f"Data from {sample.key_expr}: {sample.payload.to_string()}")

session.close()

326

```

327

328

### Query with Parameters

329

330

```python

331

import zenoh

session = zenoh.open()

# The parameters are part of the selector, after the "?".
selector = "sensors/temperature?region=north&limit=10"

for reply in session.get(selector):
    if not reply.ok:
        continue
    sample = reply.ok
    print(f"Temperature: {sample.payload.to_string()}")

session.close()

344

```

345

346

### Query with Payload

347

348

```python

349

import json

import zenoh

session = zenoh.open()

# Send a query that carries data.  The payload is serialized with
# json.dumps() so that it really is the JSON document announced by the
# APPLICATION_JSON encoding: str() on a dict produces a Python repr with
# single-quoted strings, which JSON parsers reject.
query_data = {"operation": "compute", "params": [1, 2, 3]}
replies = session.get(
    "compute/service",
    value=json.dumps(query_data),
    encoding=zenoh.Encoding.APPLICATION_JSON,
)

for reply in replies:
    if reply.ok:
        result = reply.ok.payload.to_string()
        print(f"Computation result: {result}")

session.close()

367

```

368

369

### Simple Queryable

370

371

```python

372

import time

import zenoh


def query_handler(query):
    """Reply to a query with a temperature reading or with an error.

    A success reply is sent when the query's key expression contains
    "temperature"; any other key expression gets an error reply.

    Args:
        query: The received query.  Its "region" parameter, if any, is
            echoed in the reply text.
    """
    print(f"Query on {query.key_expr}")

    # Extract the only parameter this handler uses.
    region = query.parameters.get("region")

    # Generate response based on query
    if "temperature" in str(query.key_expr):
        data = f"Temperature: 23.5°C (region: {region})"
        query.reply(data)
    else:
        query.reply_err("Unknown sensor type")


session = zenoh.open()

# Declare queryable
queryable = session.declare_queryable("sensors/**", query_handler)

# Let it run
time.sleep(30)

queryable.undeclare()
session.close()

399

```

400

401

### Queryable with Complex Logic

402

403

```python

404

import itertools
import json
import time

import zenoh


class TemperatureService:
    """Answer temperature queries from an in-memory table of sensors."""

    def __init__(self):
        # Temperature readings, keyed by the sensor's key.
        self.sensors = {
            "sensors/temperature/room1": 23.5,
            "sensors/temperature/room2": 24.1,
            "sensors/temperature/outside": 18.3,
        }

    def handle_query(self, query):
        """Reply with one JSON document per matching sensor.

        The optional "region" parameter keeps only the sensors whose key
        contains it; the optional "limit" parameter caps the number of
        replies.  An error reply is sent when "limit" is not a positive
        integer or when no sensor matches.

        Args:
            query: The received query.
        """
        print(f"Query: {query.selector}")

        # Check if query has parameters
        region = query.parameters.get("region")
        limit = query.parameters.get("limit")

        # Validate the limit once, up front: it is supplied by the caller,
        # so a malformed value must produce an error reply rather than an
        # exception raised inside the handler.
        max_results = None
        if limit:
            try:
                max_results = int(limit)
            except ValueError:
                max_results = None
            if max_results is None or max_results <= 0:
                query.reply_err(f"Invalid limit: {limit!r}")
                return

        # Filter sensors based on parameters; islice() stops at the limit
        # (a stop of None means "no limit").
        matching = (
            {"key": key, "temperature": temp}
            for key, temp in self.sensors.items()
            if not region or region in key
        )
        results = list(itertools.islice(matching, max_results))

        if not results:
            query.reply_err("No sensors found matching criteria")
            return

        # Send one reply per result.
        for result in results:
            query.reply(
                json.dumps(result),
                encoding=zenoh.Encoding.APPLICATION_JSON,
            )


service = TemperatureService()
session = zenoh.open()

queryable = session.declare_queryable(
    "sensors/**",
    service.handle_query,
    complete=True,  # This queryable provides complete answers
)

print("Temperature service running...")
time.sleep(60)

queryable.undeclare()
session.close()

457

```

458

459

### Querier for Repeated Queries

460

461

```python

462

import time

import zenoh

session = zenoh.open()

# One querier, declared once and reused for every check.
querier = session.declare_querier(
    "status/services",
    target=zenoh.QueryTarget.ALL,
    timeout=5.0,
)


def check_services():
    """Query through the shared querier and return the successful payloads.

    Returns:
        A list with the payload, as a string, of every successful reply.
    """
    return [
        reply.ok.payload.to_string()
        for reply in querier.get()
        if reply.ok
    ]


# Run the same query several times.
for i in range(5):
    services = check_services()
    print(f"Check {i+1}: Found {len(services)} services")
    time.sleep(10)

querier.undeclare()
session.close()

492

```

493

494

### Query with Consolidation

495

496

```python

497

import zenoh

session = zenoh.open()

# Issue the query with an explicit target and consolidation mode.
replies = session.get(
    "sensors/temperature/**",
    target=zenoh.QueryTarget.ALL,
    consolidation=zenoh.QueryConsolidation.DEFAULT,
)

# Collect the value of every successful (consolidated) reply.
temperatures = [
    float(reply.ok.payload.to_string())
    for reply in replies
    if reply.ok
]

if temperatures:
    avg_temp = sum(temperatures) / len(temperatures)
    print(f"Average temperature: {avg_temp:.1f}°C")

session.close()

520

```

521

522

### Complete Query/Queryable Example

523

524

```python

525

import json
import threading
import time

import zenoh


class DataStore:
    """In-memory store that answers queries by exact key or by prefix."""

    def __init__(self):
        self.data = {
            "users/alice": {"name": "Alice", "age": 30},
            "users/bob": {"name": "Bob", "age": 25},
            "config/timeout": 30,
            "config/retries": 3,
        }

    def handle_query(self, query):
        """Reply with the stored entries selected by the query's key.

        An exact key gets a single reply holding the stored value.
        Otherwise trailing "*" characters are stripped from the key and
        every entry starting with the remainder gets its own reply; an
        error reply is sent when nothing matches.

        Args:
            query: The received query.
        """
        key = str(query.key_expr)

        # Exact hit: reply with the stored value alone.
        if key in self.data:
            response = json.dumps(self.data[key])
            query.reply(response, encoding=zenoh.Encoding.APPLICATION_JSON)
            return

        # Pattern matching for wildcard queries
        prefix = key.rstrip('*')
        matches = [stored for stored in self.data if stored.startswith(prefix)]
        if not matches:
            query.reply_err(f"No data found for {key}")
            return

        for match in matches:
            response = json.dumps({match: self.data[match]})
            query.reply(response, encoding=zenoh.Encoding.APPLICATION_JSON)


def queryable_thread():
    """Serve a DataStore on every key for 15 seconds, then shut down."""
    store = DataStore()
    session = zenoh.open()

    queryable = session.declare_queryable("**", store.handle_query)
    print("Data store queryable running...")

    time.sleep(15)

    queryable.undeclare()
    session.close()
    print("Queryable stopped")


def client_thread():
    """Query one user, then all users, then all config, printing replies."""
    time.sleep(1)  # Let queryable start first

    session = zenoh.open()

    # Query specific user
    print("Querying specific user...")
    for reply in session.get("users/alice"):
        if not reply.ok:
            continue
        user_data = reply.ok.payload.to_string()
        print(f"User data: {user_data}")

    time.sleep(1)

    # Query all users
    print("Querying all users...")
    for reply in session.get("users/*", target=zenoh.QueryTarget.ALL):
        if not reply.ok:
            continue
        data = reply.ok.payload.to_string()
        print(f"Found: {data}")

    time.sleep(1)

    # Query all config
    print("Querying config...")
    for reply in session.get("config/**"):
        if not reply.ok:
            continue
        config = reply.ok.payload.to_string()
        print(f"Config: {config}")

    session.close()
    print("Client done")


# Run both threads
queryable_t = threading.Thread(target=queryable_thread)
client_t = threading.Thread(target=client_thread)
workers = (queryable_t, client_t)

for worker in workers:
    worker.start()

for worker in workers:
    worker.join()

613

```