0
# Data Types
1
2
Zenoh provides a rich set of data types for efficient data handling, addressing, and metadata management. These types enable zero-copy operations, flexible encoding specifications, and precise temporal ordering while maintaining type safety across the distributed system.
3
4
## Capabilities
5
6
### Data Containers
7
8
Core data structures for efficient payload handling and zero-copy operations.
9
10
```python { .api }
11
class ZBytes:
12
"""Serialized bytes container for zero-copy data handling"""
13
14
def __init__(self, data):
15
"""
16
Create ZBytes from various data types.
17
18
Parameters:
19
- data: str, bytes, bytearray, or other serializable data
20
"""
21
22
def to_bytes(self) -> bytes:
23
"""Convert to Python bytes"""
24
25
def to_string(self) -> str:
26
"""Convert to Python string (UTF-8 decoded)"""
27
28
def __bool__(self) -> bool:
29
"""Check if ZBytes is non-empty"""
30
31
def __len__(self) -> int:
32
"""Get length in bytes"""
33
34
def __bytes__(self) -> bytes:
35
"""Convert to bytes via bytes() function"""
36
37
def __str__(self) -> str:
38
"""String representation"""
39
40
def __eq__(self, other) -> bool:
41
"""Equality comparison"""
42
43
def __hash__(self) -> int:
44
"""Hash for use in sets and dicts"""
45
```
46
47
### Encoding Specification
48
49
Data encoding and schema information for type-safe communication.
50
51
```python { .api }
52
class Encoding:
53
"""Data encoding specification"""
54
55
def __init__(self, encoding_type: str):
56
"""Create encoding from string specification"""
57
58
def with_schema(self, schema: str) -> Encoding:
59
"""Add schema information to encoding"""
60
61
def __eq__(self, other) -> bool:
62
"""Equality comparison"""
63
64
def __hash__(self) -> int:
65
"""Hash for use in sets and dicts"""
66
67
def __str__(self) -> str:
68
"""String representation"""
69
70
# Standard encoding constants
71
ZENOH_BYTES = ... # Raw bytes
72
ZENOH_STRING = ... # UTF-8 string
73
ZENOH_SERIALIZED = ... # Zenoh serialized data
74
APPLICATION_JSON = ... # JSON format
75
APPLICATION_CBOR = ... # CBOR format
76
APPLICATION_YAML = ... # YAML format
77
APPLICATION_PYTHON_SERIALIZED_OBJECT = ... # Python pickle
78
APPLICATION_PROTOBUF = ... # Protocol Buffers
79
APPLICATION_JAVA_SERIALIZED_OBJECT = ... # Java serialization
80
APPLICATION_OPENMETRICS_TEXT = ... # OpenMetrics text format
81
IMAGE_PNG = ... # PNG image
82
IMAGE_JPEG = ... # JPEG image
83
TEXT_PLAIN = ... # Plain text
84
TEXT_JSON = ... # JSON as text
85
TEXT_HTML = ... # HTML
86
TEXT_XML = ... # XML
87
TEXT_CSS = ... # CSS
88
TEXT_CSV = ... # CSV
89
TEXT_JAVASCRIPT = ... # JavaScript
90
```
91
92
### Key Expressions
93
94
Hierarchical addressing scheme for resource identification and pattern matching.
95
96
```python { .api }
97
class KeyExpr:
98
"""Key expression for addressing resources"""
99
100
def __init__(self, key: str):
101
"""Create key expression from string"""
102
103
@staticmethod
104
def autocanonize(key: str) -> KeyExpr:
105
"""Create and canonicalize key expression"""
106
107
def intersects(self, other: KeyExpr) -> bool:
108
"""Check if this key expression intersects with another"""
109
110
def includes(self, other: KeyExpr) -> bool:
111
"""Check if this key expression includes another"""
112
113
def relation_to(self, other: KeyExpr) -> SetIntersectionLevel:
114
"""Get relationship to another key expression"""
115
116
def join(self, other: KeyExpr) -> KeyExpr:
117
"""Join two key expressions"""
118
119
def concat(self, suffix: str) -> KeyExpr:
120
"""Concatenate string suffix to key expression"""
121
122
def __str__(self) -> str:
123
"""String representation"""
124
125
class SetIntersectionLevel:
126
"""Key expression set relations (unstable)"""
127
DISJOINT = ... # No intersection
128
INTERSECTS = ... # Some intersection
129
INCLUDES = ... # This includes other
130
EQUALS = ... # Exactly equal
131
```
132
133
### Parameters and Selectors
134
135
Key-value parameters and selector combinations for queries and configuration.
136
137
```python { .api }
138
class Parameters:
139
"""Key-value parameters"""
140
141
def __init__(self, params: str = None):
142
"""Create parameters from string (key1=value1;key2=value2)"""
143
144
def is_empty(self) -> bool:
145
"""Check if parameters are empty"""
146
147
def get(self, key: str):
148
"""Get parameter value by key"""
149
150
def values(self, key: str) -> list:
151
"""Get all values for a key (for multi-value parameters)"""
152
153
def insert(self, key: str, value: str) -> Parameters:
154
"""Insert key-value pair"""
155
156
def remove(self, key: str) -> Parameters:
157
"""Remove parameter by key"""
158
159
def extend(self, other: Parameters) -> Parameters:
160
"""Extend with another Parameters object"""
161
162
def is_ordered(self) -> bool:
163
"""Check if parameters maintain insertion order"""
164
165
# Comparison and string conversion
166
def __eq__(self, other) -> bool:
167
"""Equality comparison"""
168
169
def __str__(self) -> str:
170
"""String representation"""
171
172
class Selector:
173
"""Key expression + parameters combination"""
174
175
def __init__(self, selector: str):
176
"""Create selector from string (keyexpr?param1=val1)"""
177
178
@property
179
def key_expr(self) -> KeyExpr:
180
"""Get key expression part"""
181
182
@key_expr.setter
183
def key_expr(self, value: KeyExpr):
184
"""Set key expression part"""
185
186
@property
187
def parameters(self) -> Parameters:
188
"""Get parameters part"""
189
190
@parameters.setter
191
def parameters(self, value: Parameters):
192
"""Set parameters part"""
193
194
def __str__(self) -> str:
195
"""String representation"""
196
```
197
198
### Time and Identity
199
200
Temporal ordering and unique identification across the distributed system.
201
202
```python { .api }
203
class Timestamp:
204
"""Timestamping for samples and operations"""
205
206
def __init__(self, time: int, id: TimestampId):
207
"""Create timestamp with time value and ID"""
208
209
def get_time(self) -> int:
210
"""Get time component as NTP64 timestamp"""
211
212
def get_id(self) -> TimestampId:
213
"""Get timestamp ID component"""
214
215
def get_diff_duration(self, other: Timestamp) -> float:
216
"""Get duration difference in seconds"""
217
218
def to_string_rfc3339_lossy(self) -> str:
219
"""Convert to RFC3339 string (may lose precision)"""
220
221
@staticmethod
222
def parse_rfc3339(timestamp: str) -> Timestamp:
223
"""Parse RFC3339 timestamp string"""
224
225
# Comparison operators
226
def __eq__(self, other) -> bool:
227
"""Equality comparison"""
228
229
def __lt__(self, other) -> bool:
230
"""Less than comparison"""
231
232
def __le__(self, other) -> bool:
233
"""Less than or equal comparison"""
234
235
def __gt__(self, other) -> bool:
236
"""Greater than comparison"""
237
238
def __ge__(self, other) -> bool:
239
"""Greater than or equal comparison"""
240
241
class TimestampId:
242
"""Timestamp identifier for uniqueness"""
243
244
def __init__(self, id_bytes: bytes):
245
"""Create timestamp ID from bytes"""
246
247
def __bytes__(self) -> bytes:
248
"""Convert to bytes"""
249
250
# Comparison operators
251
def __eq__(self, other) -> bool:
252
"""Equality comparison"""
253
254
def __lt__(self, other) -> bool:
255
"""Less than comparison"""
256
257
class ZenohId:
258
"""Global unique peer identifier"""
259
260
def __str__(self) -> str:
261
"""String representation of ZenohId"""
262
```
263
264
### Entity Identification
265
266
Unique identifiers for entities within sessions and across the network.
267
268
```python { .api }
269
class EntityGlobalId:
270
"""Entity global identifier (unstable)"""
271
272
@property
273
def zid(self) -> ZenohId:
274
"""Get ZenohId component"""
275
276
@property
277
def eid(self) -> int:
278
"""Get entity ID component"""
279
280
# Type aliases for input flexibility
281
EntityId = int # Integer entity identifier
282
```
283
284
### Input Type Aliases
285
286
Type aliases that define acceptable input types for various Zenoh operations.
287
288
```python { .api }
289
# Input type aliases for API flexibility
290
_IntoEncoding = str | Encoding # Accept string or Encoding
291
_IntoKeyExpr = str | KeyExpr # Accept string or KeyExpr
292
_IntoParameters = str | Parameters # Accept string or Parameters
293
_IntoSelector = str | Selector # Accept string or Selector
294
_IntoTimestampId = bytes | TimestampId # Accept bytes or TimestampId
295
_IntoWhatAmIMatcher = WhatAmI | WhatAmIMatcher # Accept node type or matcher
296
_IntoZBytes = str | bytes | bytearray | ZBytes # Accept various data types
297
_IntoQueryConsolidation = ConsolidationMode | QueryConsolidation # Accept mode or config
298
```
299
300
## Usage Examples
301
302
### Working with ZBytes
303
304
```python
305
import zenoh
306
307
# Create ZBytes from different data types
308
text_data = zenoh.ZBytes("Hello, Zenoh!")
309
byte_data = zenoh.ZBytes(b"Binary data")
310
number_data = zenoh.ZBytes(str(42))
311
312
# Convert back to Python types
313
print(text_data.to_string()) # "Hello, Zenoh!"
314
print(byte_data.to_bytes()) # b"Binary data"
315
print(len(number_data)) # Length in bytes
316
317
# Use in boolean context
318
if text_data:
319
print("Data is not empty")
320
321
# Use as dictionary key (hashable)
322
data_cache = {
323
text_data: "cached_result_1",
324
byte_data: "cached_result_2"
325
}
326
```
327
328
### Encoding Examples
329
330
```python
331
import json

import zenoh

# Use the predefined encoding constants directly: the Encoding constructor
# is documented as taking a string specification, not another encoding.
json_encoding = zenoh.Encoding.APPLICATION_JSON
text_encoding = zenoh.Encoding.TEXT_PLAIN

# Encoding with schema
protobuf_encoding = zenoh.Encoding.APPLICATION_PROTOBUF.with_schema("user.proto")

# Use in publisher
session = zenoh.open()
publisher = session.declare_publisher("data/json", encoding=json_encoding)

data = {"temperature": 23.5, "humidity": 65}
publisher.put(json.dumps(data))

publisher.undeclare()
session.close()
350
```
351
352
### Key Expression Operations
353
354
```python
355
import zenoh
356
357
# Create key expressions
358
sensor_key = zenoh.KeyExpr("sensors/temperature/room1")
359
pattern_key = zenoh.KeyExpr("sensors/**")
360
config_key = zenoh.KeyExpr("config/network")
361
362
# Test relationships
363
print(pattern_key.includes(sensor_key)) # True - pattern includes specific key
364
print(sensor_key.intersects(config_key)) # False - different hierarchies
365
366
# Join key expressions
367
base_key = zenoh.KeyExpr("sensors")
368
specific_key = base_key.concat("/temperature/room2")
369
print(specific_key) # "sensors/temperature/room2"
370
371
# Combine keys
372
room_key = zenoh.KeyExpr("room1")
373
full_key = sensor_key.join(room_key)
374
```
375
376
### Parameters and Selectors
377
378
```python
379
import zenoh

# Create parameters (format: key1=value1;key2=value2)
params = zenoh.Parameters("region=north;limit=10;format=json")

print(params.get("region"))  # "north"
print(params.get("limit"))   # "10"

# Add parameters
new_params = params.insert("timeout", "30")
extended_params = params.extend(zenoh.Parameters("debug=true"))

# Create selector: the key expression and the parameters are split by '?',
# and individual parameters use the same ';' separator as Parameters.
selector = zenoh.Selector("sensors/temperature?region=north;limit=5")
print(selector.key_expr)                  # "sensors/temperature"
print(selector.parameters.get("region"))  # "north"

# Use in queries
session = zenoh.open()
replies = session.get(selector)
for reply in replies:
    if reply.ok:
        print(reply.ok.payload.to_string())
session.close()
403
```
404
405
### Timestamps
406
407
```python
408
import zenoh
409
410
session = zenoh.open()
411
412
# Create timestamp
413
timestamp = session.new_timestamp()
414
print(f"Time: {timestamp.get_time()}")
415
print(f"ID: {timestamp.get_id()}")
416
417
# RFC3339 format
418
rfc3339_str = timestamp.to_string_rfc3339_lossy()
419
print(f"RFC3339: {rfc3339_str}")
420
421
# Parse timestamp
422
parsed = zenoh.Timestamp.parse_rfc3339("2023-01-01T12:00:00Z")
423
424
# Compare timestamps
425
if timestamp > parsed:
426
print("Current timestamp is later")
427
428
# Duration between timestamps
429
duration = timestamp.get_diff_duration(parsed)
430
print(f"Difference: {duration} seconds")
431
432
session.close()
433
```
434
435
### Complete Data Types Example
436
437
```python
438
import zenoh
439
import json
440
import time
441
442
class SensorData:
443
def __init__(self, sensor_id: str, value: float, unit: str):
444
self.sensor_id = sensor_id
445
self.value = value
446
self.unit = unit
447
self.timestamp = time.time()
448
449
def to_json(self) -> str:
450
return json.dumps({
451
"sensor_id": self.sensor_id,
452
"value": self.value,
453
"unit": self.unit,
454
"timestamp": self.timestamp
455
})
456
457
@classmethod
458
def from_json(cls, json_str: str):
459
data = json.loads(json_str)
460
return cls(data["sensor_id"], data["value"], data["unit"])
461
462
def main():
    """Publish one JSON-encoded sensor reading, then query it back.

    Walks through the data types together: KeyExpr for addressing, Encoding
    for the payload format, ZBytes for the payload, Timestamp for ordering,
    and Parameters plus Selector for the query. The publisher and the session
    are released even if one of the steps raises.
    """
    session = zenoh.open()
    try:
        # Create sensor data
        sensor = SensorData("temp_01", 23.5, "°C")

        # Create key expression for this sensor
        key_expr = zenoh.KeyExpr(f"sensors/temperature/{sensor.sensor_id}")

        # Use the predefined JSON encoding constant directly; the Encoding
        # constructor is documented as taking a string specification.
        encoding = zenoh.Encoding.APPLICATION_JSON

        # Declare publisher with encoding
        publisher = session.declare_publisher(key_expr, encoding=encoding)
        try:
            # Create ZBytes payload
            payload = zenoh.ZBytes(sensor.to_json())

            # Create timestamp
            timestamp = session.new_timestamp()

            # Publish with metadata
            publisher.put(
                payload,
                timestamp=timestamp,
                attachment={"source": "building_a", "floor": "2"},
            )

            print("Published sensor data:")
            print(f"  Key: {key_expr}")
            print(f"  Payload: {payload.to_string()}")
            print(f"  Encoding: {encoding}")
            print(f"  Timestamp: {timestamp.to_string_rfc3339_lossy()}")

            # Query with parameters
            query_params = zenoh.Parameters("unit=°C;recent=true")
            selector = zenoh.Selector("sensors/temperature/**")
            selector.parameters = query_params

            print(f"\nQuerying with selector: {selector}")

            # Simulate some delay
            time.sleep(0.1)

            # Query the data back
            replies = session.get(selector)
            for reply in replies:
                if reply.ok:
                    sample = reply.ok
                    received_data = SensorData.from_json(sample.payload.to_string())
                    print(f"Received: {received_data.sensor_id} = {received_data.value} {received_data.unit}")
        finally:
            # Cleanup: undeclare the publisher before closing its session
            publisher.undeclare()
    finally:
        session.close()
517
518
if __name__ == "__main__":
519
main()
520
```