or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.mdauthentication.mdconstants.mdcore-messaging.mddevices.mderror-handling.mdindex.mdmessage-handling.mdpolling.md

message-handling.mddocs/

0

# Message Handling

1

2

Zero-copy message objects (Frame/Message) for efficient data transfer, with support for metadata, copying control, and memory-mapped operations.

3

4

## Capabilities

5

6

### Frame Objects

7

8

Frame objects provide zero-copy message handling with optional tracking and metadata support.

9

10

```python { .api }

11

class Frame:

12

def __init__(self, data: Union[bytes, str, int] = b'', track: bool = True, copy: bool = None) -> None:

13

"""

14

Create a new Frame.

15

16

Parameters:

17

- data: Initial data (bytes, string, or buffer size)

18

- track: Whether to track the Frame lifecycle

19

- copy: Whether to copy the data (None for auto-detect)

20

"""

21

22

@property

23

def bytes(self) -> bytes:

24

"""Get frame data as bytes."""

25

26

@property

27

def buffer(self) -> memoryview:

28

"""Get frame data as memoryview buffer."""

29

30

def copy(self) -> Frame:

31

"""

32

Create a copy of the frame.

33

34

Returns:

35

- Frame: New Frame with copied data

36

"""

37

38

def __len__(self) -> int:

39

"""Get frame size in bytes."""

40

41

def __bytes__(self) -> bytes:

42

"""Convert frame to bytes."""

43

44

def __str__(self) -> str:

45

"""Convert frame to string using utf-8."""

46

47

@property

48

def more(self) -> bool:

49

"""True if this frame is part of a multipart message with more parts."""

50

51

@property

52

def tracker(self) -> Optional[MessageTracker]:

53

"""MessageTracker for this frame, if tracking is enabled."""

54

```

55

56

### Message Objects

57

58

Message objects extend Frame with additional ZMQ message properties and metadata.

59

60

```python { .api }

61

class Message(Frame):

62

def __init__(self, data: Union[bytes, str, int] = b'', track: bool = True, copy: bool = None) -> None:

63

"""

64

Create a new Message.

65

66

Parameters:

67

- data: Initial data

68

- track: Whether to track message lifecycle

69

- copy: Whether to copy the data

70

"""

71

72

def get(self, property: int) -> int:

73

"""

74

Get a message property.

75

76

Parameters:

77

- property: Message property constant (MORE, SRCFD, SHARED, etc.)

78

79

Returns:

80

- int: Property value

81

"""

82

83

def set(self, property: int, value: int) -> None:

84

"""

85

Set a message property.

86

87

Parameters:

88

- property: Message property constant

89

- value: Property value

90

"""

91

92

def gets(self, property: str) -> Optional[str]:

93

"""

94

Get a message metadata property as string.

95

96

Parameters:

97

- property: Property name

98

99

Returns:

100

- str or None: Property value as string

101

"""

102

103

def routing_id(self) -> Optional[bytes]:

104

"""

105

Get the routing ID for this message.

106

107

Returns:

108

- bytes or None: Routing ID if available

109

"""

110

111

def group(self) -> Optional[str]:

112

"""

113

Get the group for this message.

114

115

Returns:

116

- str or None: Group name if set

117

"""

118

```

119

120

### Message Tracking

121

122

MessageTracker objects allow monitoring the lifecycle of sent messages.

123

124

```python { .api }

125

class MessageTracker:

126

@property

127

def done(self) -> bool:

128

"""True if all tracked messages have been sent/received."""

129

130

def wait(self, timeout: int = -1) -> bool:

131

"""

132

Wait for tracked messages to complete.

133

134

Parameters:

135

- timeout: Timeout in milliseconds (-1 for infinite)

136

137

Returns:

138

- bool: True if completed, False if timeout

139

"""

140

```

141

142

## Usage Examples

143

144

### Basic Frame Operations

145

146

```python

147

import zmq

148

149

# Create frames from different data types

150

frame1 = zmq.Frame(b"Hello World")

151

frame2 = zmq.Frame("Hello World") # Auto-encoded as UTF-8

152

frame3 = zmq.Frame(1024) # Create frame with 1024 bytes capacity

153

154

# Access frame data

155

data = frame1.bytes

156

buffer = frame1.buffer

157

size = len(frame1)

158

159

print(f"Frame data: {frame1}")

160

print(f"Frame size: {size} bytes")

161

162

# Copy frames

163

frame_copy = frame1.copy()

164

```

165

166

### Zero-Copy Message Sending

167

168

```python

169

import zmq

170

171

context = zmq.Context()

172

socket = context.socket(zmq.PUSH)

173

socket.bind("tcp://*:5555")

174

175

# Create large message

176

large_data = b"x" * 1000000 # 1MB of data

177

178

# Send with zero-copy (no data duplication)

179

frame = zmq.Frame(large_data, copy=False)

180

tracker = socket.send(frame, copy=False, track=True)

181

182

# Wait for message to be sent

183

if tracker.wait(timeout=5000):

184

print("Large message sent successfully")

185

else:

186

print("Send timeout")

187

188

socket.close()

189

context.term()

190

```

191

192

### Message Properties and Metadata

193

194

```python

195

import zmq

196

197

# Create message with tracking

198

msg = zmq.Message(b"Hello with metadata", track=True)

199

200

# Check message properties

201

has_more = msg.get(zmq.MORE)

202

is_shared = msg.get(zmq.SHARED)

203

204

print(f"Has more parts: {bool(has_more)}")

205

print(f"Is shared: {bool(is_shared)}")

206

207

# Access metadata (if available)

208

routing_id = msg.routing_id()

209

group = msg.group()

210

211

if routing_id:

212

print(f"Routing ID: {routing_id}")

213

if group:

214

print(f"Group: {group}")

215

```

216

217

### Multipart Message Construction

218

219

```python

220

import zmq

221

222

context = zmq.Context()

223

socket = context.socket(zmq.DEALER)

224

socket.connect("tcp://localhost:5555")

225

226

# Create multipart message with frames

227

header = zmq.Frame(b"HEADER")

228

body = zmq.Frame(b"Message body content")

229

footer = zmq.Frame(b"FOOTER")

230

231

# Send as multipart message

232

parts = [header, body, footer]

233

tracker = socket.send_multipart(parts, copy=False, track=True)

234

235

# Wait for completion

236

if tracker.wait():

237

print("Multipart message sent")

238

239

socket.close()

240

context.term()

241

```

242

243

### Receiving and Processing Frames

244

245

```python

246

import zmq

247

248

context = zmq.Context()

249

socket = context.socket(zmq.PULL)

250

socket.connect("tcp://localhost:5555")

251

252

while True:

253

# Receive frame (zero-copy)

254

frame = socket.recv(copy=False)

255

256

# Check if it's part of multipart message

257

if frame.more:

258

print("This frame has more parts")

259

260

# Process frame data

261

data = frame.bytes

262

print(f"Received {len(data)} bytes")

263

264

# Access as buffer for efficient processing

265

buffer = frame.buffer

266

# Process buffer without copying...

267

268

if not frame.more:

269

print("Complete message received")

270

break

271

272

socket.close()

273

context.term()

274

```

275

276

### Memory-Mapped Frame Creation

277

278

```python

279

import zmq

280

import mmap

281

import os

282

283

# Create memory-mapped file

284

filename = "large_data.bin"

285

with open(filename, "wb") as f:

286

f.write(b"x" * 1000000) # 1MB file

287

288

# Memory-map the file

289

with open(filename, "rb") as f:

290

with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:

291

# Create frame from memory-mapped data (zero-copy)

292

frame = zmq.Frame(mm, copy=False)

293

294

context = zmq.Context()

295

socket = context.socket(zmq.PUSH)

296

socket.bind("tcp://*:5555")

297

298

# Send memory-mapped data efficiently

299

tracker = socket.send(frame, copy=False, track=True)

300

301

if tracker.wait():

302

print("Memory-mapped data sent")

303

304

socket.close()

305

context.term()

306

307

# Clean up

308

os.unlink(filename)

309

```

310

311

### Custom Frame Subclassing

312

313

```python

314

import zmq

315

from typing import Any

316

317

class TimestampedFrame(zmq.Frame):

318

"""Frame with timestamp metadata"""

319

320

def __init__(self, data: bytes = b'', timestamp: float = None):

321

super().__init__(data)

322

self._timestamp = timestamp or time.time()

323

324

@property

325

def timestamp(self) -> float:

326

return self._timestamp

327

328

def age(self) -> float:

329

"""Get age of frame in seconds"""

330

return time.time() - self._timestamp

331

332

# Usage

333

import time

334

335

frame = TimestampedFrame(b"Hello World")

336

time.sleep(1)

337

print(f"Frame age: {frame.age():.2f} seconds")

338

```

339

340

### Frame Buffer Operations

341

342

```python

343

import zmq

344

import numpy as np

345

346

# Create frame from numpy array

347

array = np.arange(1000, dtype=np.float64)

348

frame = zmq.Frame(array.tobytes(), copy=False)

349

350

context = zmq.Context()

351

socket = context.socket(zmq.PUSH)

352

socket.bind("tcp://*:5555")

353

354

# Send numpy array efficiently

355

socket.send(frame, copy=False)

356

357

socket.close()

358

context.term()

359

360

# Receiving end

361

context = zmq.Context()

362

socket = context.socket(zmq.PULL)

363

socket.connect("tcp://localhost:5555")

364

365

# Receive and reconstruct numpy array

366

frame = socket.recv(copy=False)

367

received_array = np.frombuffer(frame.buffer, dtype=np.float64)

368

369

print(f"Received array shape: {received_array.shape}")

370

print(f"Array data: {received_array[:10]}...") # First 10 elements

371

372

socket.close()

373

context.term()

374

```

375

376

## Performance Considerations

377

378

### Zero-Copy Operations

379

380

```python

381

import zmq

382

383

# Efficient: Zero-copy sending

384

frame = zmq.Frame(large_data, copy=False)

385

socket.send(frame, copy=False)

386

387

# Less efficient: Data is copied twice

388

socket.send(large_data, copy=True) # Default behavior

389

390

# Efficient: Zero-copy receiving

391

frame = socket.recv(copy=False)

392

data = frame.buffer # Access as memoryview

393

394

# Less efficient: Data is copied

395

data = socket.recv(copy=True) # Returns bytes copy

396

```

397

398

### Message Tracking

399

400

```python

401

import zmq

402

403

# Track message lifecycle for reliability

404

tracker = socket.send(frame, track=True)

405

406

# Non-blocking check

407

if tracker.done:

408

print("Message sent")

409

410

# Blocking wait with timeout

411

if tracker.wait(timeout=1000):

412

print("Message confirmed sent")

413

else:

414

print("Send timeout - message may be lost")

415

```

416

417

## Types

418

419

```python { .api }

420

from typing import Union, Optional, Any

421

import memoryview

422

423

# Frame data types

424

FrameData = Union[bytes, str, memoryview, int]

425

BufferLike = Union[bytes, memoryview, bytearray]

426

427

# Message property types

428

MessageProperty = int

429

PropertyValue = Union[int, str, None]

430

431

# Tracking types

432

TrackerResult = bool

433

```