or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.md · data-types.md · extensions.md · handlers.md · index.md · pubsub.md · query.md · session-management.md

extensions.mddocs/

0

# Extensions

1

2

The Zenoh extensions module (`zenoh.ext`) provides additional functionality including type-safe serialization utilities, custom numeric types, and advanced publisher/subscriber features with enhanced reliability and caching capabilities. These extensions enable more sophisticated data handling and communication patterns while maintaining compatibility with the core Zenoh API.

3

4

## Capabilities

5

6

### Serialization System

7

8

Type-safe serialization and deserialization of Python objects to/from ZBytes.

9

10

```python { .api }

11

from zenoh.ext import z_serialize, z_deserialize, ZDeserializeError

12

13

def z_serialize(obj) -> ZBytes:
    """
    Serialize Python objects to ZBytes with automatic type detection.

    Parameters:
    - obj: Python object to serialize (supports basic types, collections, custom objects)

    Returns:
        ZBytes containing serialized data

    Raises:
        ZError: If serialization fails
    """

26

27

def z_deserialize(target_type, data: ZBytes):
    """
    Deserialize ZBytes to Python objects with type validation.

    Parameters:
    - target_type: Expected type for deserialization validation
    - data: ZBytes containing serialized data

    Returns:
        Deserialized Python object of specified type

    Raises:
        ZDeserializeError: If deserialization fails or type doesn't match
    """

41

42

class ZDeserializeError(Exception):
    """Exception raised when deserialization fails"""

45

```

46

47

### Custom Numeric Types

48

49

Precise numeric types for cross-platform data exchange and scientific computing.

50

51

```python { .api }

52

# Signed Integer Types

53

class Int8:
    """8-bit signed integer (-128 to 127)"""

    def __init__(self, value: int): ...

56

57

class Int16:
    """16-bit signed integer (-32,768 to 32,767)"""

    def __init__(self, value: int): ...

60

61

class Int32:
    """32-bit signed integer (-2,147,483,648 to 2,147,483,647)"""

    def __init__(self, value: int): ...

64

65

class Int64:
    """64-bit signed integer"""

    def __init__(self, value: int): ...

68

69

class Int128:
    """128-bit signed integer for very large numbers"""

    def __init__(self, value: int): ...

72

73

# Unsigned Integer Types

74

class UInt8:
    """8-bit unsigned integer (0 to 255)"""

    def __init__(self, value: int): ...

77

78

class UInt16:
    """16-bit unsigned integer (0 to 65,535)"""

    def __init__(self, value: int): ...

81

82

class UInt32:
    """32-bit unsigned integer (0 to 4,294,967,295)"""

    def __init__(self, value: int): ...

85

86

class UInt64:
    """64-bit unsigned integer"""

    def __init__(self, value: int): ...

89

90

class UInt128:
    """128-bit unsigned integer for very large numbers"""

    def __init__(self, value: int): ...

93

94

# Floating Point Types

95

class Float32:
    """32-bit IEEE 754 floating point number"""

    def __init__(self, value: float): ...

98

99

class Float64:
    """64-bit IEEE 754 floating point number"""

    def __init__(self, value: float): ...

102

```

103

104

### Advanced Publisher (Unstable)

105

106

Enhanced publisher with additional features for reliable communication.

107

108

```python { .api }

109

def declare_advanced_publisher(
    session,
    key_expr,
    encoding: Encoding = None,
    congestion_control: CongestionControl = None,
    priority: Priority = None,
    cache: CacheConfig = None,
    subscriber_detection: bool = None
) -> AdvancedPublisher:
    """
    Declare an advanced publisher with enhanced features (unstable).

    Parameters:
    - session: Zenoh session
    - key_expr: Key expression to publish on
    - encoding: Data encoding specification
    - congestion_control: Congestion control mode
    - priority: Message priority
    - cache: Cache configuration for late-joining subscribers
    - subscriber_detection: Enable subscriber detection

    Returns:
        AdvancedPublisher with enhanced capabilities
    """

133

134

class AdvancedPublisher:
    """Advanced publisher with additional features (unstable)"""

    @property
    def key_expr(self) -> KeyExpr:
        """Get the publisher's key expression"""

    @property
    def encoding(self) -> Encoding:
        """Get the publisher's encoding"""

    @property
    def congestion_control(self) -> CongestionControl:
        """Get congestion control setting"""

    @property
    def priority(self) -> Priority:
        """Get priority setting"""

    def put(
        self,
        payload,
        encoding: Encoding = None,
        timestamp: Timestamp = None,
        attachment=None
    ) -> None:
        """
        Send data through the advanced publisher.

        Parameters:
        - payload: Data to send
        - encoding: Override default encoding
        - timestamp: Custom timestamp
        - attachment: Additional metadata
        """

    def delete(
        self,
        timestamp: Timestamp = None,
        attachment=None
    ) -> None:
        """Send a delete operation"""

    def undeclare(self) -> None:
        """Undeclare the advanced publisher"""

179

```

180

181

### Advanced Subscriber (Unstable)

182

183

Enhanced subscriber with miss detection, recovery, and publisher detection.

184

185

```python { .api }

186

def declare_advanced_subscriber(
    session,
    key_expr,
    handler=None,
    reliability: Reliability = None,
    recovery: RecoveryConfig = None,
    history: HistoryConfig = None,
    miss_detection: MissDetectionConfig = None
) -> AdvancedSubscriber:
    """
    Declare an advanced subscriber with enhanced features (unstable).

    Parameters:
    - session: Zenoh session
    - key_expr: Key expression pattern to subscribe to
    - handler: Handler for received samples
    - reliability: Reliability mode
    - recovery: Recovery configuration for missed samples
    - history: History configuration for late-joining
    - miss_detection: Configuration for detecting missed samples

    Returns:
        AdvancedSubscriber with enhanced capabilities
    """

210

211

class AdvancedSubscriber:
    """Advanced subscriber with additional features (unstable)"""

    @property
    def key_expr(self) -> KeyExpr:
        """Get the subscriber's key expression"""

    @property
    def handler(self):
        """Get the subscriber's handler"""

    def sample_miss_listener(self, handler) -> SampleMissListener:
        """
        Declare a listener for sample miss detection.

        Parameters:
        - handler: Handler for miss notifications

        Returns:
            SampleMissListener for monitoring missed samples
        """

    def detect_publishers(self, handler):
        """
        Enable publisher detection with callback.

        Parameters:
        - handler: Handler for publisher detection events
        """

    def undeclare(self) -> None:
        """Undeclare the advanced subscriber"""

    def try_recv(self):
        """Try to receive a sample without blocking"""

    def recv(self):
        """Receive a sample (blocking)"""

    def __iter__(self):
        """Iterate over received samples"""

252

```

253

254

### Advanced Configuration

255

256

Configuration objects for advanced publisher/subscriber features.

257

258

```python { .api }

259

class CacheConfig:
    """Cache configuration for late-joining subscribers (unstable)"""
    # Configuration details depend on implementation

262

263

class HistoryConfig:
    """History configuration for subscriber catch-up (unstable)"""
    # Configuration details depend on implementation

266

267

class MissDetectionConfig:
    """Configuration for detecting missed samples (unstable)"""
    # Configuration details depend on implementation

270

271

class RecoveryConfig:
    """Recovery configuration for missed sample recovery (unstable)"""
    # Configuration details depend on implementation

274

275

class RepliesConfig:
    """Configuration for query replies handling (unstable)"""
    # Configuration details depend on implementation

278

```

279

280

### Miss Detection

281

282

Monitor and handle missed samples in data streams.

283

284

```python { .api }

285

class Miss:
    """Information about missed samples (unstable)"""

    @property
    def source(self) -> ZenohId:
        """Source the missed samples came from"""

    @property
    def nb(self) -> int:
        """Number of missed samples"""

295

296

class SampleMissListener:
    """Listener for sample miss detection (unstable)"""

    def undeclare(self) -> None:
        """Undeclare the miss listener"""

    def try_recv(self):
        """Try to receive a miss notification without blocking"""

    def recv(self):
        """Receive a miss notification (blocking)"""

    def __iter__(self):
        """Iterate over miss notifications"""

310

```

311

312

## Usage Examples

313

314

### Basic Serialization

315

316

```python

317

import time

import zenoh
import zenoh.ext as zext

# Serialize various data types
data_list = [1, 2, 3, "hello", {"key": "value"}]
serialized = zext.z_serialize(data_list)

# Deserialize with type checking
# NOTE(review): the zenoh-python reference describes z_deserialize targets as
# fully specified generics (e.g. list[int]); confirm that a bare `list` and a
# mixed-type list round-trip as shown here.
try:
    deserialized = zext.z_deserialize(list, serialized)
    print(f"Deserialized: {deserialized}")
except zext.ZDeserializeError as e:
    print(f"Deserialization failed: {e}")

# Use in Zenoh communication
session = zenoh.open()
try:
    def handler(sample):
        """Decode a received sample and print it."""
        try:
            data = zext.z_deserialize(list, sample.payload)
            print(f"Received: {data}")
        except zext.ZDeserializeError as e:
            print(f"Failed to deserialize: {e}")

    # Declare the subscriber first: a plain subscriber declared after the put
    # would never see the sample published below.
    subscriber = session.declare_subscriber("data/serialized", handler)
    publisher = session.declare_publisher("data/serialized")

    # Publish serialized data
    publisher.put(serialized)

    # Give the callback a moment to run before tearing everything down
    time.sleep(1)

    # Cleanup
    subscriber.undeclare()
    publisher.undeclare()
finally:
    session.close()

353

```

354

355

### Custom Numeric Types

356

357

```python

358

import time  # required by time.sleep() below; the original example omitted it

import zenoh
import zenoh.ext as zext

session = zenoh.open()
try:
    # Create precise numeric values
    temperature = zext.Float32(23.567)  # 32-bit precision
    sensor_id = zext.UInt16(1024)  # 16-bit unsigned
    timestamp = zext.Int64(1640995200)  # 64-bit signed

    # Serialize numeric data
    sensor_data = {
        "temperature": temperature,
        "sensor_id": sensor_id,
        "timestamp": timestamp,
    }

    serialized_data = zext.z_serialize(sensor_data)

    def precise_handler(sample):
        """Decode a received sensor reading and print each field."""
        try:
            data = zext.z_deserialize(dict, sample.payload)
            print("Precise sensor reading:")
            print(f" Temperature: {data['temperature']} (32-bit float)")
            print(f" Sensor ID: {data['sensor_id']} (16-bit uint)")
            print(f" Timestamp: {data['timestamp']} (64-bit int)")
        except zext.ZDeserializeError as e:
            print(f"Failed to deserialize sensor data: {e}")

    # Declare the subscriber first: a plain subscriber declared after the put
    # would never see the sample published below.
    subscriber = session.declare_subscriber("sensors/precise", precise_handler)

    # Publish precise sensor data
    publisher = session.declare_publisher(
        "sensors/precise",
        encoding=zenoh.Encoding.ZENOH_SERIALIZED,
    )
    publisher.put(serialized_data)

    # Give the callback a moment to run before tearing everything down
    time.sleep(1)
    subscriber.undeclare()
    publisher.undeclare()
finally:
    session.close()

398

```

399

400

### Advanced Publisher with Cache

401

402

```python

403

import time

import zenoh
import zenoh.ext as zext

session = zenoh.open()

# Note: This is unstable API - actual configuration may differ
try:
    # Declare advanced publisher with caching for late joiners
    advanced_pub = zext.declare_advanced_publisher(
        session,
        "data/cached",
        cache=zext.CacheConfig(),  # Enable caching
        subscriber_detection=True,  # Detect subscribers
    )

    print("Advanced publisher declared with caching")

    # Publish data that will be cached
    for i in range(5):
        data = {"message": f"Cached message {i}", "timestamp": time.time()}
        serialized = zext.z_serialize(data)
        advanced_pub.put(serialized)
        print(f"Published cached message {i}")
        time.sleep(1)

    # Late-joining subscriber should receive cached data
    def late_subscriber_handler(sample):
        """Print the message carried by a sample replayed from the cache."""
        try:
            data = zext.z_deserialize(dict, sample.payload)
            print(f"Late subscriber received: {data['message']}")
        except zext.ZDeserializeError:
            print("Failed to deserialize cached data")

    print("Creating late-joining subscriber...")
    # Use an advanced subscriber with a history configuration: that is the
    # late-joining mechanism this page documents, whereas a plain subscriber
    # only sees samples published after it is declared.
    # NOTE(review): confirm HistoryConfig() defaults are enough to fetch the cache.
    late_subscriber = zext.declare_advanced_subscriber(
        session,
        "data/cached",
        handler=late_subscriber_handler,
        history=zext.HistoryConfig(),
    )

    time.sleep(2)

    # Cleanup
    late_subscriber.undeclare()
    advanced_pub.undeclare()

except Exception as e:
    # Deliberately broad: the advanced API is unstable and may be missing
    print(f"Advanced publisher features may not be available: {e}")

finally:
    # Close the session on every exit path, including Ctrl-C
    session.close()

450

```

451

452

### Advanced Subscriber with Miss Detection

453

454

```python

455

import threading
import time

import zenoh
import zenoh.ext as zext

session = zenoh.open()

# Note: This is unstable API - actual configuration may differ
try:
    def sample_handler(sample):
        """Print the sequence number carried by each received sample."""
        try:
            data = zext.z_deserialize(dict, sample.payload)
            print(f"Received sample: {data['sequence']}")
        except zext.ZDeserializeError:
            print("Failed to deserialize sample")

    def miss_handler(miss):
        """Report how many samples were missed and from which source."""
        print(f"MISSED {miss.nb} samples from source {miss.source}")

    # Declare advanced subscriber with miss detection
    advanced_sub = zext.declare_advanced_subscriber(
        session,
        "data/reliable",
        handler=sample_handler,
        miss_detection=zext.MissDetectionConfig(),
    )

    # Setup miss detection listener
    miss_listener = advanced_sub.sample_miss_listener(miss_handler)

    print("Advanced subscriber with miss detection ready")

    # Simulate publisher sending data with intentional gaps
    def publisher_thread():
        """Publish sequences 0-9, leaving out sequence 5."""
        pub = session.declare_publisher("data/reliable")

        for i in range(10):
            # Leave a gap in the application-level sequence numbers.
            # NOTE(review): not calling put() is not a transport-level loss;
            # confirm whether this actually triggers the miss listener.
            if i == 5:
                continue

            data = {"sequence": i, "payload": f"data_{i}"}
            serialized = zext.z_serialize(data)
            pub.put(serialized)
            time.sleep(0.5)

        pub.undeclare()

    # Start publisher in separate thread
    pub_thread = threading.Thread(target=publisher_thread)
    pub_thread.start()

    # Let it run
    time.sleep(6)

    # Cleanup
    pub_thread.join()
    miss_listener.undeclare()
    advanced_sub.undeclare()

except Exception as e:
    # Deliberately broad: the advanced API is unstable and may be missing
    print(f"Advanced subscriber features may not be available: {e}")

finally:
    # Close the session on every exit path, including Ctrl-C
    session.close()

518

```

519

520

### Complete Extension Example

521

522

```python

523

import zenoh

524

import zenoh.ext as zext

525

import threading

526

import time

527

import random

528

529

class AdvancedDataProcessor:
    """Example using multiple extension features.

    Subscribes to "input/data", processes each received dict and publishes
    the result on "results/processor_<id>".
    """

    def __init__(self, processor_id: int):
        """Open a session and initialise the typed counters.

        Parameters:
        - processor_id: Numeric id used in the result key and in log output
        """
        self.processor_id = processor_id
        self.session = zenoh.open()
        self.running = False

        # Use precise numeric types
        self.id = zext.UInt16(processor_id)
        self.processed_count = zext.UInt64(0)

    def start_processing(self):
        """Start the data processing service.

        Tries the advanced publisher/subscriber first and falls back to the
        basic ones if any part of the advanced setup raises.
        """
        self.running = True

        try:
            # Setup advanced publisher
            self.publisher = zext.declare_advanced_publisher(
                self.session,
                f"results/processor_{self.processor_id}",
                cache=zext.CacheConfig(),  # Cache results for late subscribers
            )

            # Setup advanced subscriber with miss detection
            def input_handler(sample):
                try:
                    data = zext.z_deserialize(dict, sample.payload)
                    self.process_data(data)
                except zext.ZDeserializeError as e:
                    print(f"Processor {self.processor_id}: Deserialization error: {e}")

            def miss_handler(miss):
                print(f"Processor {self.processor_id}: MISSED {miss.nb} inputs!")

            self.subscriber = zext.declare_advanced_subscriber(
                self.session,
                "input/data",
                handler=input_handler,
                miss_detection=zext.MissDetectionConfig(),
            )

            self.miss_listener = self.subscriber.sample_miss_listener(miss_handler)

            print(f"Advanced processor {self.processor_id} started")

        except Exception as e:
            print(f"Failed to start advanced features: {e}")
            # Undeclare whatever the failed attempt already declared, so a
            # half-built advanced subscriber does not keep delivering inputs
            # alongside the basic one declared below.
            self._release_entities()
            # Fallback to basic functionality
            self.publisher = self.session.declare_publisher(f"results/processor_{self.processor_id}")
            self.subscriber = self.session.declare_subscriber("input/data", self.basic_handler)

    def basic_handler(self, sample):
        """Fallback handler for basic functionality"""
        try:
            data = zext.z_deserialize(dict, sample.payload)
            self.process_data(data)
        except zext.ZDeserializeError as e:
            print(f"Processor {self.processor_id}: Basic deserialization error: {e}")

    def process_data(self, input_data):
        """Process input data and publish results.

        Parameters:
        - input_data: Decoded input dict; "sequence" and "value" are read
          with defaults when absent
        """
        # Simulate processing time
        time.sleep(random.uniform(0.1, 0.3))

        # Increment counter with precise type
        self.processed_count = zext.UInt64(int(self.processed_count) + 1)

        # Create result with precise numeric types
        result = {
            "processor_id": self.id,
            "input_sequence": input_data.get("sequence", 0),
            "processed_count": self.processed_count,
            "processing_time": zext.Float32(random.uniform(0.1, 0.3)),
            "result": f"PROCESSED_{input_data.get('value', 'unknown')}",
        }

        # Serialize and publish result
        serialized_result = zext.z_serialize(result)
        self.publisher.put(serialized_result)

        print(f"Processor {self.processor_id}: Processed #{int(self.processed_count)}")

    def _release_entities(self):
        """Undeclare the miss listener, subscriber and publisher that exist."""
        for attr in ("miss_listener", "subscriber", "publisher"):
            entity = getattr(self, attr, None)
            if entity is not None:
                # Drop the reference first so a repeated call is a no-op
                delattr(self, attr)
                entity.undeclare()

    def stop(self):
        """Stop the processor and cleanup"""
        self.running = False

        self._release_entities()

        self.session.close()
        print(f"Processor {self.processor_id} stopped")

625

626

def data_generator():
    """Generate test data.

    Publishes 20 serialized items on "input/data", one every 0.5 seconds.
    """
    session = zenoh.open()
    try:
        publisher = session.declare_publisher("input/data")

        for i in range(20):
            data = {
                "sequence": zext.UInt32(i),
                "value": f"data_item_{i}",
                "timestamp": zext.Float64(time.time()),
            }

            serialized = zext.z_serialize(data)
            publisher.put(serialized)

            print(f"Generated data item {i}")
            time.sleep(0.5)

        publisher.undeclare()
    finally:
        # Close the session even when publishing fails part-way through
        session.close()
    print("Data generator finished")

647

648

def result_monitor():
    """Monitor processing results.

    Subscribes to "results/**" and prints each decoded result for 15 seconds.
    """
    session = zenoh.open()
    try:
        def result_handler(sample):
            try:
                result = zext.z_deserialize(dict, sample.payload)
                print(f"MONITOR: Processor {int(result['processor_id'])} "
                      f"completed #{int(result['processed_count'])}: {result['result']}")
            except zext.ZDeserializeError as e:
                print(f"Monitor deserialization error: {e}")

        subscriber = session.declare_subscriber("results/**", result_handler)

        time.sleep(15)  # Monitor for 15 seconds

        subscriber.undeclare()
    finally:
        # Close the session even when the subscription fails
        session.close()
    print("Result monitor stopped")

667

668

def main():
    """Main example execution.

    Starts two processors, then runs the result monitor and the data
    generator in separate threads and stops the processors at the end.
    """
    print("Starting advanced Zenoh extension example...")

    # Create processors
    processor1 = AdvancedDataProcessor(1)
    processor2 = AdvancedDataProcessor(2)

    try:
        # Start processors
        processor1.start_processing()
        processor2.start_processing()

        # Start monitoring and data generation in separate threads
        monitor_thread = threading.Thread(target=result_monitor)
        generator_thread = threading.Thread(target=data_generator)

        monitor_thread.start()
        time.sleep(1)  # Let monitor start first
        generator_thread.start()

        # Wait for completion
        generator_thread.join()
        monitor_thread.join(timeout=5)

    except KeyboardInterrupt:
        print("Shutting down...")

    finally:
        # Runs on normal completion and on Ctrl-C alike
        processor1.stop()
        processor2.stop()

    print("Advanced extension example completed")

701

702

# Run the example only when executed as a script, not when imported
if __name__ == "__main__":
    main()

704

```