0
# Message Handling
1
2
Zero-copy message objects (Frame/Message) for efficient data transfer, with support for metadata, copying control, and memory-mapped operations.
3
4
## Capabilities
5
6
### Frame Objects
7
8
Frame objects provide zero-copy message handling with optional tracking and metadata support.
9
10
```python { .api }
11
class Frame:
12
def __init__(self, data: Union[bytes, str, int] = b'', track: bool = True, copy: bool = None) -> None:
13
"""
14
Create a new Frame.
15
16
Parameters:
17
- data: Initial data (bytes, string, or buffer size)
18
- track: Whether to track the Frame lifecycle
19
- copy: Whether to copy the data (None for auto-detect)
20
"""
21
22
@property
23
def bytes(self) -> bytes:
24
"""Get frame data as bytes."""
25
26
@property
27
def buffer(self) -> memoryview:
28
"""Get frame data as memoryview buffer."""
29
30
def copy(self) -> Frame:
31
"""
32
Create a copy of the frame.
33
34
Returns:
35
- Frame: New Frame with copied data
36
"""
37
38
def __len__(self) -> int:
39
"""Get frame size in bytes."""
40
41
def __bytes__(self) -> bytes:
42
"""Convert frame to bytes."""
43
44
def __str__(self) -> str:
45
"""Convert frame to string using utf-8."""
46
47
@property
48
def more(self) -> bool:
49
"""True if this frame is part of a multipart message with more parts."""
50
51
@property
52
def tracker(self) -> Optional[MessageTracker]:
53
"""MessageTracker for this frame, if tracking is enabled."""
54
```
55
56
### Message Objects
57
58
Message objects extend Frame with additional ZMQ message properties and metadata.
59
60
```python { .api }
61
class Message(Frame):
62
def __init__(self, data: Union[bytes, str, int] = b'', track: bool = True, copy: bool = None) -> None:
63
"""
64
Create a new Message.
65
66
Parameters:
67
- data: Initial data
68
- track: Whether to track message lifecycle
69
- copy: Whether to copy the data
70
"""
71
72
def get(self, property: int) -> int:
73
"""
74
Get a message property.
75
76
Parameters:
77
- property: Message property constant (MORE, SRCFD, SHARED, etc.)
78
79
Returns:
80
- int: Property value
81
"""
82
83
def set(self, property: int, value: int) -> None:
84
"""
85
Set a message property.
86
87
Parameters:
88
- property: Message property constant
89
- value: Property value
90
"""
91
92
def gets(self, property: str) -> Optional[str]:
93
"""
94
Get a message metadata property as string.
95
96
Parameters:
97
- property: Property name
98
99
Returns:
100
- str or None: Property value as string
101
"""
102
103
def routing_id(self) -> Optional[bytes]:
104
"""
105
Get the routing ID for this message.
106
107
Returns:
108
- bytes or None: Routing ID if available
109
"""
110
111
def group(self) -> Optional[str]:
112
"""
113
Get the group for this message.
114
115
Returns:
116
- str or None: Group name if set
117
"""
118
```
119
120
### Message Tracking
121
122
MessageTracker objects allow monitoring the lifecycle of sent messages.
123
124
```python { .api }
125
class MessageTracker:
126
@property
127
def done(self) -> bool:
128
"""True if all tracked messages have been sent/received."""
129
130
def wait(self, timeout: int = -1) -> bool:
131
"""
132
Wait for tracked messages to complete.
133
134
Parameters:
135
- timeout: Maximum time to wait, in seconds (-1 waits forever)
136
137
Returns:
138
- bool: True if completed, False if timeout
139
"""
140
```
141
142
## Usage Examples
143
144
### Basic Frame Operations
145
146
```python
147
import zmq
148
149
# Create frames from different data types
150
frame1 = zmq.Frame(b"Hello World")
151
frame2 = zmq.Frame("Hello World") # Auto-encoded as UTF-8
152
frame3 = zmq.Frame(1024) # Create frame with 1024 bytes capacity
153
154
# Access frame data
155
data = frame1.bytes
156
buffer = frame1.buffer
157
size = len(frame1)
158
159
print(f"Frame data: {frame1}")
160
print(f"Frame size: {size} bytes")
161
162
# Copy frames
163
frame_copy = frame1.copy()
164
```
165
166
### Zero-Copy Message Sending
167
168
```python
169
import zmq
170
171
context = zmq.Context()
172
socket = context.socket(zmq.PUSH)
173
socket.bind("tcp://*:5555")
174
175
# Create large message
176
large_data = b"x" * 1000000 # 1MB of data
177
178
# Send with zero-copy (no data duplication)
179
frame = zmq.Frame(large_data, copy=False)
180
tracker = socket.send(frame, copy=False, track=True)
181
182
# Wait for message to be sent
183
if tracker.wait(timeout=5000):
184
print("Large message sent successfully")
185
else:
186
print("Send timeout")
187
188
socket.close()
189
context.term()
190
```
191
192
### Message Properties and Metadata
193
194
```python
195
import zmq
196
197
# Create message with tracking
198
msg = zmq.Message(b"Hello with metadata", track=True)
199
200
# Check message properties
201
has_more = msg.get(zmq.MORE)
202
is_shared = msg.get(zmq.SHARED)
203
204
print(f"Has more parts: {bool(has_more)}")
205
print(f"Is shared: {bool(is_shared)}")
206
207
# Access metadata (if available)
208
routing_id = msg.routing_id()
209
group = msg.group()
210
211
if routing_id:
212
print(f"Routing ID: {routing_id}")
213
if group:
214
print(f"Group: {group}")
215
```
216
217
### Multipart Message Construction
218
219
```python
220
import zmq
221
222
context = zmq.Context()
223
socket = context.socket(zmq.DEALER)
224
socket.connect("tcp://localhost:5555")
225
226
# Create multipart message with frames
227
header = zmq.Frame(b"HEADER")
228
body = zmq.Frame(b"Message body content")
229
footer = zmq.Frame(b"FOOTER")
230
231
# Send as multipart message
232
parts = [header, body, footer]
233
tracker = socket.send_multipart(parts, copy=False, track=True)
234
235
# Wait for completion
236
if tracker.wait():
237
print("Multipart message sent")
238
239
socket.close()
240
context.term()
241
```
242
243
### Receiving and Processing Frames
244
245
```python
246
import zmq
247
248
context = zmq.Context()
249
socket = context.socket(zmq.PULL)
250
socket.connect("tcp://localhost:5555")
251
252
while True:
253
# Receive frame (zero-copy)
254
frame = socket.recv(copy=False)
255
256
# Check if it's part of multipart message
257
if frame.more:
258
print("This frame has more parts")
259
260
# Process frame data
261
data = frame.bytes
262
print(f"Received {len(data)} bytes")
263
264
# Access as buffer for efficient processing
265
buffer = frame.buffer
266
# Process buffer without copying...
267
268
if not frame.more:
269
print("Complete message received")
270
break
271
272
socket.close()
273
context.term()
274
```
275
276
### Memory-Mapped Frame Creation
277
278
```python
279
import zmq
280
import mmap
281
import os
282
283
# Create memory-mapped file
284
filename = "large_data.bin"
285
with open(filename, "wb") as f:
286
f.write(b"x" * 1000000) # 1MB file
287
288
# Memory-map the file
289
with open(filename, "rb") as f:
290
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
291
# Create frame from memory-mapped data (zero-copy)
292
frame = zmq.Frame(mm, copy=False)
293
294
context = zmq.Context()
295
socket = context.socket(zmq.PUSH)
296
socket.bind("tcp://*:5555")
297
298
# Send memory-mapped data efficiently
299
tracker = socket.send(frame, copy=False, track=True)
300
301
if tracker.wait():
302
print("Memory-mapped data sent")
303
304
socket.close()
305
context.term()
306
307
# Clean up
308
os.unlink(filename)
309
```
310
311
### Custom Frame Subclassing
312
313
```python
314
import zmq
315
import time
316
317
class TimestampedFrame(zmq.Frame):
318
"""Frame with timestamp metadata"""
319
320
def __init__(self, data: bytes = b'', timestamp: float = None):
321
super().__init__(data)
322
self._timestamp = timestamp or time.time()
323
324
@property
325
def timestamp(self) -> float:
326
return self._timestamp
327
328
def age(self) -> float:
329
"""Get age of frame in seconds"""
330
return time.time() - self._timestamp
331
332
# Usage
333
import time
334
335
frame = TimestampedFrame(b"Hello World")
336
time.sleep(1)
337
print(f"Frame age: {frame.age():.2f} seconds")
338
```
339
340
### Frame Buffer Operations
341
342
```python
343
import zmq
344
import numpy as np
345
346
# Create frame from numpy array
347
array = np.arange(1000, dtype=np.float64)
348
frame = zmq.Frame(array.tobytes(), copy=False)
349
350
context = zmq.Context()
351
socket = context.socket(zmq.PUSH)
352
socket.bind("tcp://*:5555")
353
354
# Send numpy array efficiently
355
socket.send(frame, copy=False)
356
357
socket.close()
358
context.term()
359
360
# Receiving end
361
context = zmq.Context()
362
socket = context.socket(zmq.PULL)
363
socket.connect("tcp://localhost:5555")
364
365
# Receive and reconstruct numpy array
366
frame = socket.recv(copy=False)
367
received_array = np.frombuffer(frame.buffer, dtype=np.float64)
368
369
print(f"Received array shape: {received_array.shape}")
370
print(f"Array data: {received_array[:10]}...") # First 10 elements
371
372
socket.close()
373
context.term()
374
```
375
376
## Performance Considerations
377
378
### Zero-Copy Operations
379
380
```python
381
import zmq
382
383
# Efficient: Zero-copy sending
384
frame = zmq.Frame(large_data, copy=False)
385
socket.send(frame, copy=False)
386
387
# Less efficient: Data is copied twice
388
socket.send(large_data, copy=True) # Default behavior
389
390
# Efficient: Zero-copy receiving
391
frame = socket.recv(copy=False)
392
data = frame.buffer # Access as memoryview
393
394
# Less efficient: Data is copied
395
data = socket.recv(copy=True) # Returns bytes copy
396
```
397
398
### Message Tracking
399
400
```python
401
import zmq
402
403
# Track message lifecycle for reliability
404
tracker = socket.send(frame, track=True)
405
406
# Non-blocking check
407
if tracker.done:
408
print("Message sent")
409
410
# Blocking wait with timeout
411
if tracker.wait(timeout=1000):
412
print("Message confirmed sent")
413
else:
414
print("Send timeout - message may be lost")
415
```
416
417
## Types
418
419
```python { .api }
420
from typing import Union, Optional, Any
421
# memoryview is a built-in type and needs no import
422
423
# Frame data types
424
FrameData = Union[bytes, str, memoryview, int]
425
BufferLike = Union[bytes, memoryview, bytearray]
426
427
# Message property types
428
MessageProperty = int
429
PropertyValue = Union[int, str, None]
430
431
# Tracking types
432
TrackerResult = bool
433
```