or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.md · data-types.md · extensions.md · handlers.md · index.md · pubsub.md · query.md · session-management.md

docs/advanced.md

0

# Advanced Features

1

2

Advanced features in Zenoh provide sophisticated networking capabilities including liveliness detection, matching status monitoring, custom handler systems, and network administration tools. These features enable building robust, self-monitoring distributed systems with fine-grained control over communication patterns.

3

4

## Capabilities

5

6

### Liveliness Management

7

8

Monitor and advertise the availability of distributed system components.

9

10

```python { .api }

11

class Liveliness:
    """Liveliness management for distributed system components"""

    def declare_token(self, key_expr) -> LivelinessToken:
        """
        Declare a liveliness token for a key expression.

        Parameters:
        - key_expr: Key expression to declare liveliness for

        Returns:
        LivelinessToken that maintains liveliness while active
        """

    def get(
        self,
        key_expr,
        handler = None,
        timeout: float = None
    ):
        """
        Query current liveliness status.

        Parameters:
        - key_expr: Key expression pattern to query
        - handler: Handler for liveliness replies
        - timeout: Query timeout in seconds

        Returns:
        Iterator over liveliness replies if no handler provided
        """

    def declare_subscriber(
        self,
        key_expr,
        handler,
        history: bool = False
    ) -> Subscriber:
        """
        Subscribe to liveliness changes.

        Parameters:
        - key_expr: Key expression pattern to monitor
        - handler: Handler for liveliness change notifications
        - history: Whether to receive historical liveliness data

        Returns:
        Subscriber for liveliness changes
        """

60

61

class LivelinessToken:
    """Liveliness token that maintains component availability"""

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - automatically undeclares token"""

    def undeclare(self) -> None:
        """Undeclare the liveliness token"""

73

```

74

75

### Matching Status Monitoring

76

77

Monitor whether publishers/subscribers and queriers/queryables have matching peers.

78

79

```python { .api }

80

class MatchingStatus:
    """Entity matching status"""

    @property
    def matching(self) -> bool:
        """Whether there are matching entities on the network"""

86

87

class MatchingListener:
    """Matching status listener for monitoring connection changes"""

    @property
    def handler(self):
        """Get the listener's handler"""

    def undeclare(self) -> None:
        """Undeclare the matching listener"""

    def try_recv(self):
        """Try to receive matching status update without blocking"""

    def recv(self):
        """Receive matching status update (blocking)"""

    def __iter__(self):
        """Iterate over matching status updates"""

105

```

106

107

### Handler System

108

109

Flexible handler system supporting various callback patterns and channel types.

110

111

```python { .api }

112

from zenoh.handlers import Handler, DefaultHandler, FifoChannel, RingChannel, Callback

113

114

class Handler:
    """Generic handler interface for receiving data"""

    def try_recv(self):
        """Try to receive data without blocking"""

    def recv(self):
        """Receive data (blocking)"""

    def __iter__(self):
        """Iterate over received data"""

    def __next__(self):
        """Get next item in iteration"""

128

129

class DefaultHandler:
    """Default FIFO handler with unlimited capacity"""
    pass

132

133

class FifoChannel:
    """FIFO channel with configurable capacity"""

    def __init__(self, capacity: int):
        """
        Create FIFO channel with specified capacity.

        Parameters:
        - capacity: Maximum number of items to buffer
        """

143

144

class RingChannel:
    """Ring channel that overwrites oldest data when full"""

    def __init__(self, capacity: int):
        """
        Create ring channel with fixed capacity.

        Parameters:
        - capacity: Fixed size of the ring buffer
        """

154

155

class Callback:
    """Callback handler wrapper for Python functions"""

    @property
    def callback(self):
        """Get the callback function"""

    @property
    def drop(self):
        """Get the drop callback (called when handler is dropped)"""

    @property
    def indirect(self) -> bool:
        """Whether callback is called indirectly"""

    def __init__(
        self,
        callback,
        drop = None,
        indirect: bool = False
    ):
        """
        Create callback handler.

        Parameters:
        - callback: Function to call with received data
        - drop: Optional cleanup function
        - indirect: Whether to use indirect calling
        """

    def __call__(self, *args, **kwargs):
        """Call the wrapped callback function"""

187

```

188

189

### Network Administration

190

191

Session-level helpers for accessing the liveliness interface, undeclaring entities, and declaring key expressions optimized for repeated use.

192

193

```python { .api }

194

# Access session's liveliness interface

195

@property
def liveliness(self) -> Liveliness:
    """Get session's liveliness interface"""

198

199

# Session operations for entity management

200

def undeclare(self, entity) -> None:
    """
    Undeclare any Zenoh entity (publisher, subscriber, etc.)

    Parameters:
    - entity: The entity to undeclare
    """

207

208

def declare_keyexpr(self, key_expr: str) -> KeyExpr:
    """
    Declare a key expression for optimized repeated usage.

    Parameters:
    - key_expr: Key expression string to optimize

    Returns:
    Optimized KeyExpr object
    """

218

```

219

220

### Error Handling

221

222

All Zenoh errors derive from a single base exception, `ZError`, so applications can catch them uniformly.

223

224

```python { .api }

225

class ZError(Exception):
    """Base exception for all Zenoh errors"""
    pass

228

```

229

230

## Usage Examples

231

232

### Basic Liveliness

233

234

```python

235

import zenoh

236

import time

237

238

session = zenoh.open()
try:
    # Declare liveliness token
    token = session.liveliness.declare_token("services/temperature_monitor")
    try:
        print("Service is alive...")

        # Service runs for some time
        time.sleep(10)
    finally:
        # Cleanup - this signals that service is no longer alive.
        # Done in `finally` so an interrupted sleep still withdraws the token.
        token.undeclare()
finally:
    session.close()

252

```

253

254

### Liveliness with Context Manager

255

256

```python

257

import zenoh

258

import time

259

260

session = zenoh.open()
try:
    # Use context manager for automatic cleanup
    with session.liveliness.declare_token("services/data_processor") as token:
        print("Data processor service is running...")

        # Simulate service work
        for i in range(5):
            print(f"Processing batch {i+1}...")
            time.sleep(2)

        print("Service work complete")
    # Token is automatically undeclared when exiting the context
finally:
    # Close the session even if the simulated work is interrupted
    session.close()

275

```

276

277

### Monitoring Liveliness

278

279

```python

280

import zenoh

281

import time

282

283

def liveliness_handler(sample):
    """Print an ONLINE/OFFLINE line for a liveliness sample.

    PUT samples are reported as ONLINE and DELETE samples as OFFLINE; any
    other kind is ignored.

    Parameters:
    - sample: Object exposing `kind` and `key_expr`
    """
    if sample.kind == zenoh.SampleKind.PUT:
        print(f"Service ONLINE: {sample.key_expr}")
    elif sample.kind == zenoh.SampleKind.DELETE:
        print(f"Service OFFLINE: {sample.key_expr}")

288

289

session = zenoh.open()
try:
    # Subscribe to liveliness changes for all services
    subscriber = session.liveliness.declare_subscriber(
        "services/**",
        liveliness_handler,
        history=True  # Get current liveliness state
    )
    try:
        print("Monitoring service liveliness...")

        # Let it monitor for a while
        time.sleep(30)
    finally:
        subscriber.undeclare()
finally:
    # Always release the session, even when monitoring is interrupted
    session.close()

305

```

306

307

### Querying Liveliness Status

308

309

```python

310

import zenoh

311

312

session = zenoh.open()
try:
    # Query current liveliness status
    print("Querying current service status...")
    replies = session.liveliness.get("services/**", timeout=5.0)

    active_services = []
    for reply in replies:
        # Only successful replies carry a sample with the service key
        if reply.ok:
            service_key = str(reply.ok.key_expr)
            active_services.append(service_key)
            print(f"ACTIVE: {service_key}")

    print(f"\nTotal active services: {len(active_services)}")
finally:
    # Close the session even if the query or iteration fails
    session.close()

328

```

329

330

### Matching Status Monitoring

331

332

```python

333

import zenoh

334

import time

335

336

def matching_handler(status):
    """Print whether the publisher currently has matching subscribers.

    Parameters:
    - status: Object exposing a boolean `matching` attribute
    """
    if status.matching:
        print("Publisher has subscribers!")
    else:
        print("No subscribers found")

341

342

session = zenoh.open()
try:
    # Create publisher with matching monitoring
    publisher = session.declare_publisher("sensor/data")
    try:
        # Monitor matching status
        listener = publisher.declare_matching_listener(matching_handler)
        try:
            # Check initial status
            print(f"Initial matching status: {publisher.matching_status.matching}")

            # Publish some data
            for i in range(5):
                publisher.put(f"Data point {i}")
                time.sleep(1)
        finally:
            listener.undeclare()
    finally:
        publisher.undeclare()
finally:
    # Cleanup runs innermost-first: listener, publisher, then session
    session.close()

362

```

363

364

### Custom Handler Examples

365

366

```python

367

import zenoh

368

from zenoh.handlers import FifoChannel, RingChannel, Callback

369

import threading

370

import time

371

372

session = zenoh.open()

# Example 1: FIFO Channel Handler
print("Example 1: FIFO Channel")
fifo_handler = FifoChannel(capacity=10)
subscriber1 = session.declare_subscriber("data/fifo", fifo_handler)

# Simulate some data
fifo_publisher = session.declare_publisher("data/fifo")
for i in range(5):
    fifo_publisher.put(f"FIFO message {i}")

# Receive from FIFO
try:
    sample = subscriber1.recv()  # Blocks until data available
    print(f"FIFO received: {sample.payload.to_string()}")
except zenoh.ZError as e:
    # Report the failure instead of silently swallowing every exception
    print(f"FIFO receive failed: {e}")

subscriber1.undeclare()
fifo_publisher.undeclare()

# Example 2: Ring Channel Handler
print("\nExample 2: Ring Channel")
ring_handler = RingChannel(capacity=3)
subscriber2 = session.declare_subscriber("data/ring", ring_handler)

# Publish on the key the ring subscriber listens to; data published on
# "data/fifo" would never reach it and the loop below would block forever.
ring_publisher = session.declare_publisher("data/ring")

# Send more data than ring capacity
for i in range(5):
    ring_publisher.put(f"Ring message {i}")

# Ring channel only keeps latest 3 messages
for sample in subscriber2:
    print(f"Ring received: {sample.payload.to_string()}")
    break  # Just get first one for demo

subscriber2.undeclare()
ring_publisher.undeclare()

# Example 3: Callback Handler
print("\nExample 3: Callback Handler")


def my_callback(sample):
    """Print the payload of each received sample."""
    print(f"Callback received: {sample.payload.to_string()}")


def cleanup_callback():
    """Announce that the callback handler has been dropped."""
    print("Callback handler cleanup")


callback_handler = Callback(
    callback=my_callback,
    drop=cleanup_callback
)

subscriber3 = session.declare_subscriber("data/callback", callback_handler)

# Send test data on the key the callback subscriber listens to
callback_publisher = session.declare_publisher("data/callback")
callback_publisher.put("Callback test message")
time.sleep(0.1)  # Let callback process

subscriber3.undeclare()  # This will trigger cleanup_callback

callback_publisher.undeclare()
session.close()

432

```

433

434

### Advanced Error Handling

435

436

```python

437

import zenoh

438

import logging

439

440

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

442

443

def robust_session_example():
    """Open a session, publish ten messages, and always release resources.

    A failure to publish a single message is logged and the loop moves on;
    a failure anywhere else aborts the run.

    Returns:
        True when the run completes, False when a ZError or any other
        exception escapes the main body. Cleanup runs in both cases.
    """
    session = None
    publisher = None

    try:
        # Open session with error handling
        config = zenoh.Config()
        session = zenoh.open(config)
        logger.info("Session opened successfully")

        # Declare publisher
        publisher = session.declare_publisher("robust/example")
        logger.info("Publisher declared")

        # Publish with error handling: one bad message does not end the run
        for i in range(10):
            try:
                publisher.put(f"Message {i}")
                logger.debug(f"Published message {i}")
            except zenoh.ZError as e:
                logger.error(f"Failed to publish message {i}: {e}")

    except zenoh.ZError as e:
        logger.error(f"Zenoh error: {e}")
        return False

    except Exception as e:
        # Keep the traceback: an unexpected error is a bug worth diagnosing
        logger.error(f"Unexpected error: {e}", exc_info=True)
        return False

    finally:
        # Cleanup with error handling. Compare against None explicitly so a
        # handle is released regardless of how the object defines truthiness.
        if publisher is not None:
            try:
                publisher.undeclare()
                logger.info("Publisher undeclared")
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring publisher: {e}")

        if session is not None:
            try:
                session.close()
                logger.info("Session closed")
            except zenoh.ZError as e:
                logger.error(f"Error closing session: {e}")

    return True

491

492

# Run the robust example

493

success = robust_session_example()
print(f"Example completed successfully: {success}")

495

```

496

497

### Complete Advanced Features Example

498

499

```python

500

import zenoh

501

from zenoh.handlers import FifoChannel, Callback

502

import threading

503

import time

504

import logging

505

506

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

508

509

class DistributedService:
    """Service that advertises liveliness and relays processed samples.

    Samples read from ``input/<service_name>`` are upper-cased, tagged with
    the service name, and published on ``output/<service_name>``.
    """

    # (attribute, release method, log prefix) in the order stop() releases them
    _TEARDOWN = (
        ("matching_listener", "undeclare", "Error undeclaring matching listener"),
        ("subscriber", "undeclare", "Error undeclaring subscriber"),
        ("publisher", "undeclare", "Error undeclaring publisher"),
        ("liveliness_token", "undeclare", "Error undeclaring liveliness"),
        ("session", "close", "Error closing session"),
    )

    def __init__(self, service_name: str):
        """Record the service name; no Zenoh resource exists until start()."""
        self.service_name = service_name
        self.session = None
        self.liveliness_token = None
        self.publisher = None
        self.subscriber = None
        self.matching_listener = None
        self.running = False

    def start(self):
        """Start the distributed service with full monitoring.

        Opens a session, declares the liveliness token, the output publisher
        with its matching listener, and the input subscriber, then marks the
        service as running.

        Raises:
            zenoh.ZError: re-raised after stop() has released whatever had
                already been declared.
        """
        try:
            # Open session
            self.session = zenoh.open()
            logger.info(f"Service {self.service_name} session started")

            # Declare liveliness
            self.liveliness_token = self.session.liveliness.declare_token(
                f"services/{self.service_name}"
            )
            logger.info(f"Liveliness declared for {self.service_name}")

            # Setup publisher with matching monitoring
            self.publisher = self.session.declare_publisher(
                f"output/{self.service_name}"
            )

            def matching_handler(status):
                if status.matching:
                    logger.info(f"{self.service_name}: Consumers connected")
                else:
                    logger.warning(f"{self.service_name}: No consumers")

            self.matching_listener = self.publisher.declare_matching_listener(
                matching_handler
            )

            # Setup subscriber with custom handler
            handler = FifoChannel(capacity=100)
            self.subscriber = self.session.declare_subscriber(
                f"input/{self.service_name}",
                handler
            )

            self.running = True
            logger.info(f"Service {self.service_name} fully started")

        except zenoh.ZError as e:
            logger.error(f"Failed to start service {self.service_name}: {e}")
            self.stop()
            raise

    def process_data(self):
        """Main processing loop; returns once `running` is cleared.

        Polls the subscriber without blocking, sleeping briefly when idle.
        """
        while self.running:
            # Snapshot the handles: stop() may clear them from another thread
            subscriber, publisher = self.subscriber, self.publisher
            if subscriber is None or publisher is None:
                break

            try:
                # Try to receive input data
                sample = subscriber.try_recv()
                if sample is None:
                    # No data available, short sleep
                    time.sleep(0.1)
                    continue

                input_data = sample.payload.to_string()
                logger.debug(f"{self.service_name} processing: {input_data}")

                # Process the data (simulate some work)
                processed = f"PROCESSED[{self.service_name}]: {input_data.upper()}"

                # Publish result
                publisher.put(processed)
                logger.debug(f"{self.service_name} output: {processed}")

            except zenoh.ZError as e:
                logger.error(f"Processing error in {self.service_name}: {e}")
                time.sleep(1)  # Brief pause before retry

            except KeyboardInterrupt:
                logger.info(f"Shutdown requested for {self.service_name}")
                break

    def stop(self):
        """Stop the service and cleanup resources.

        Safe to call more than once: each handle is cleared before it is
        released, so a repeated call does not touch an already-released
        entity.
        """
        self.running = False

        # Cleanup in reverse order
        for attr, release, message in self._TEARDOWN:
            resource = getattr(self, attr)
            if resource is None:
                continue
            setattr(self, attr, None)
            try:
                getattr(resource, release)()
            except zenoh.ZError as e:
                logger.error(f"{message}: {e}")

        logger.info(f"Service {self.service_name} stopped")

627

628

def monitor_services():
    """Monitor all service liveliness for 20 seconds, then shut down.

    Subscribes to ``services/**`` with history enabled and logs each service
    as ONLINE (PUT sample) or OFFLINE (any other sample kind).
    """
    session = zenoh.open()

    def liveliness_handler(sample):
        # The service name is the last chunk of the key expression
        service_name = str(sample.key_expr).split('/')[-1]
        if sample.kind == zenoh.SampleKind.PUT:
            logger.info(f"MONITOR: Service {service_name} is ONLINE")
        else:
            logger.warning(f"MONITOR: Service {service_name} went OFFLINE")

    try:
        # Subscribe to all service liveliness with history
        monitor = session.liveliness.declare_subscriber(
            "services/**",
            liveliness_handler,
            history=True
        )
        try:
            # Let it monitor
            time.sleep(20)
        finally:
            monitor.undeclare()
    finally:
        # Release the session even if subscribing or sleeping fails
        session.close()

    logger.info("Service monitor stopped")

652

653

def main():
    """Run two services and a background liveliness monitor, then shut down.

    Feeds five test messages to ``input/processor_a``, lets the services run
    for a few more seconds, and tears everything down.
    """
    # Start service monitor in background
    monitor_thread = threading.Thread(target=monitor_services)
    monitor_thread.start()

    # Create and start services
    service1 = DistributedService("processor_a")
    service2 = DistributedService("processor_b")
    workers = []

    try:
        service1.start()
        service2.start()

        # Start processing in separate threads
        workers = [
            threading.Thread(target=service1.process_data),
            threading.Thread(target=service2.process_data),
        ]
        for worker in workers:
            worker.start()

        # Send some test data
        test_session = zenoh.open()
        try:
            test_pub = test_session.declare_publisher("input/processor_a")
            for i in range(5):
                test_pub.put(f"test_data_{i}")
                time.sleep(2)
            test_pub.undeclare()
        finally:
            test_session.close()

        # Let services run for a bit
        time.sleep(5)

    except KeyboardInterrupt:
        logger.info("Shutdown requested")

    finally:
        # Ask the processing loops to exit and wait for them first, so no
        # worker is still using an entity while it is being undeclared.
        service1.running = False
        service2.running = False
        for worker in workers:
            if worker.is_alive():
                worker.join(timeout=5)

        # Stop services
        service1.stop()
        service2.stop()

        # Wait for threads to finish
        monitor_thread.join(timeout=5)

    logger.info("All services stopped")

699

700

if __name__ == "__main__":
    main()

702

```