or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.md brokers.md composition.md index.md messages.md middleware.md rate-limiting.md results.md workers.md

docs/messages.md

0

# Messages

1

2

Messages in Dramatiq represent tasks to be executed by workers. They contain all the information needed to invoke an actor, including arguments, metadata, and processing options. The message system also handles serialization, routing, and integration with composition features.

3

4

## Capabilities

5

6

### Message Class

7

8

The core message data structure that represents a task.

9

10

```python { .api }

11

class Message:

12

def __init__(

13

self,

14

queue_name: str,

15

actor_name: str,

16

args: tuple,

17

kwargs: Dict[str, Any],

18

options: Dict[str, Any],

19

message_id: str = None,

20

message_timestamp: int = None

21

):

22

"""

23

Create a message instance.

24

25

Parameters:

26

- queue_name: Name of the queue for this message

27

- actor_name: Name of the actor to execute

28

- args: Positional arguments for the actor

29

- kwargs: Keyword arguments for the actor

30

- options: Message options and metadata

31

- message_id: Unique message identifier (auto-generated if None)

32

- message_timestamp: Unix timestamp in milliseconds (auto-generated if None)

33

"""

34

35

def encode(self) -> bytes:

36

"""

37

Encode message to bytes for transmission.

38

39

Returns:

40

Serialized message data

41

"""

42

43

@classmethod

44

def decode(cls, data: bytes) -> 'Message':

45

"""

46

Decode message from bytes.

47

48

Parameters:

49

- data: Serialized message data

50

51

Returns:

52

Message instance

53

54

Raises:

55

DecodeError: If data cannot be decoded

56

"""

57

58

def copy(self, **attributes) -> 'Message':

59

"""

60

Create a copy of the message with modified attributes.

61

62

Parameters:

63

- **attributes: Attributes to modify in the copy

64

65

Returns:

66

New message instance with modifications

67

"""

68

69

def get_result(

70

self,

71

*,

72

backend: ResultBackend = None,

73

block: bool = False,

74

timeout: int = None

75

):

76

"""

77

Get result for this message (requires Results middleware).

78

79

Parameters:

80

- backend: Result backend to use (uses broker's backend if None)

81

- block: Whether to block waiting for result

82

- timeout: Timeout in milliseconds when blocking

83

84

Returns:

85

Task result

86

87

Raises:

88

ResultMissing: If result not available

89

ResultTimeout: If timeout exceeded while blocking

90

ResultFailure: If task failed with exception

91

"""

92

93

def asdict(self) -> Dict[str, Any]:

94

"""

95

Convert message to dictionary representation.

96

97

Returns:

98

Dictionary containing message data

99

"""

100

101

def __or__(self, other) -> 'pipeline':

102

"""

103

Create pipeline with this message using | operator.

104

105

Parameters:

106

- other: Message or pipeline to chain with

107

108

Returns:

109

Pipeline containing both messages

110

"""

111

112

# Properties

113

queue_name: str # Queue name

114

actor_name: str # Actor name

115

args: tuple # Positional arguments

116

kwargs: Dict[str, Any] # Keyword arguments

117

options: Dict[str, Any] # Message options

118

message_id: str # Unique identifier

119

message_timestamp: int # Creation timestamp

120

```

121

122

**Usage:**

123

124

```python

125

import dramatiq

126

127

@dramatiq.actor

128

def example_task(x, y, multiplier=1):

129

return (x + y) * multiplier

130

131

# Create message manually

132

message = dramatiq.Message(

133

queue_name="default",

134

actor_name="example_task",

135

args=(10, 20),

136

kwargs={"multiplier": 2},

137

options={"max_retries": 5}

138

)

139

140

# Send message

141

broker = dramatiq.get_broker()

142

broker.enqueue(message)

143

144

# Message created by actor

145

auto_message = example_task.message(10, 20, multiplier=2)

146

print(f"Message ID: {auto_message.message_id}")

147

print(f"Message data: {auto_message.asdict()}")

148

149

# Or create and send a new message in a single step

150

auto_message = example_task.send(10, 20, multiplier=2)

151

```

152

153

### Message Encoding

154

155

The encoding system handles serialization and deserialization of messages for transmission between brokers and workers.

156

157

#### Encoder Base Class

158

159

```python { .api }

160

class Encoder:

161

"""

162

Abstract base class for message encoders.

163

164

Encoders handle serialization of message data for storage

165

and transmission through brokers.

166

"""

167

168

def encode(self, data: MessageData) -> bytes:

169

"""

170

Encode message data to bytes.

171

172

Parameters:

173

- data: Message data to encode

174

175

Returns:

176

Encoded message as bytes

177

"""

178

raise NotImplementedError

179

180

def decode(self, data: bytes) -> MessageData:

181

"""

182

Decode bytes to message data.

183

184

Parameters:

185

- data: Encoded message bytes

186

187

Returns:

188

Decoded message data

189

190

Raises:

191

DecodeError: If data cannot be decoded

192

"""

193

raise NotImplementedError

194

```

195

196

#### JSON Encoder

197

198

Default encoder using JSON serialization.

199

200

```python { .api }

201

class JSONEncoder(Encoder):

202

def __init__(self):

203

"""

204

Create JSON encoder for message serialization.

205

206

Uses JSON for cross-language compatibility and human readability.

207

Handles standard Python types: str, int, float, bool, list, dict, None.

208

"""

209

210

def encode(self, data: MessageData) -> bytes:

211

"""

212

Encode message data to JSON bytes.

213

214

Returns:

215

UTF-8 encoded JSON data

216

"""

217

218

def decode(self, data: bytes) -> MessageData:

219

"""

220

Decode JSON bytes to message data.

221

222

Returns:

223

Decoded Python objects

224

"""

225

```

226

227

**Usage:**

228

229

```python

230

from dramatiq import JSONEncoder

231

232

# JSON encoder (default)

233

json_encoder = JSONEncoder()

234

235

@dramatiq.actor

236

def json_compatible_task(data):

237

# Data must be JSON-serializable

238

return {

239

"processed": data["input"],

240

"timestamp": time.time(),

241

"items": [1, 2, 3, 4, 5]

242

}

243

244

# Send JSON-compatible data

245

json_compatible_task.send({

246

"input": "test data",

247

"config": {"option1": True, "option2": "value"}

248

})

249

```

250

251

#### Pickle Encoder

252

253

Encoder using Python's pickle for complex object serialization.

254

255

```python { .api }

256

class PickleEncoder(Encoder):

257

def __init__(self):

258

"""

259

Create Pickle encoder for Python object serialization.

260

261

WARNING: Pickle can execute arbitrary code during deserialization.

262

Only use with trusted data sources.

263

"""

264

265

def encode(self, data: MessageData) -> bytes:

266

"""

267

Encode message data using pickle.

268

269

Returns:

270

Pickled data bytes

271

"""

272

273

def decode(self, data: bytes) -> MessageData:

274

"""

275

Decode pickled bytes to message data.

276

277

Returns:

278

Unpickled Python objects

279

"""

280

```

281

282

**Usage:**

283

284

```python

285

from dramatiq import PickleEncoder, set_encoder

286

import datetime

287

288

# Set pickle encoder globally (use with caution)

289

pickle_encoder = PickleEncoder()

290

dramatiq.set_encoder(pickle_encoder)

291

292

class CustomObject:

293

def __init__(self, value):

294

self.value = value

295

self.created = datetime.datetime.now()

296

297

@dramatiq.actor

298

def pickle_compatible_task(obj):

299

# Can handle complex Python objects

300

return {

301

"object_value": obj.value,

302

"created": obj.created,

303

"processed": datetime.datetime.now()

304

}

305

306

# Send complex objects

307

custom_obj = CustomObject("test value")

308

pickle_compatible_task.send(custom_obj)

309

```

310

311

### Global Encoder Management

312

313

Functions for managing the global message encoder.

314

315

```python { .api }

316

def get_encoder() -> Encoder:

317

"""

318

Get the current global message encoder.

319

320

Returns:

321

Current encoder instance (defaults to JSONEncoder)

322

"""

323

324

def set_encoder(encoder: Encoder):

325

"""

326

Set the global message encoder.

327

328

Parameters:

329

- encoder: Encoder instance to use globally

330

"""

331

```

332

333

**Usage:**

334

335

```python

336

# Check current encoder

337

current_encoder = dramatiq.get_encoder()

338

print(f"Current encoder: {type(current_encoder).__name__}")

339

340

# Switch to custom encoder

341

custom_encoder = MyCustomEncoder()

342

dramatiq.set_encoder(custom_encoder)

343

344

# All messages will now use the custom encoder

345

@dramatiq.actor

346

def task_with_custom_encoding(data):

347

return process_with_custom_format(data)

348

```

349

350

### Advanced Message Patterns

351

352

#### Message Metadata and Options

353

354

```python

355

@dramatiq.actor

356

def metadata_aware_task(data):

357

# Access the message currently being processed (requires the CurrentMessage middleware on the broker)

358

from dramatiq.middleware import CurrentMessage

359

360

try:

361

current_msg = CurrentMessage.get_current_message()

362

363

return {

364

"data": data,

365

"message_id": current_msg.message_id,

366

"retry_count": current_msg.options.get("retries", 0),

367

"queue": current_msg.queue_name,

368

"created_at": current_msg.message_timestamp

369

}

370

except Exception:

371

# Fallback when no current message available

372

return {"data": data, "no_metadata": True}

373

374

# Send with custom options

375

message = metadata_aware_task.message_with_options(

376

args=("test_data",),

377

delay=5000, # 5 second delay

378

max_retries=3,

379

custom_option="custom_value"

380

)

381

message_id = broker.enqueue(message).message_id

382

```

383

384

#### Message Copying and Modification

385

386

```python

387

@dramatiq.actor

388

def original_task(data, multiplier=1):

389

return data * multiplier

390

391

# Create base message

392

base_message = original_task.message("hello", multiplier=2)

393

394

# Create variations

395

urgent_message = base_message.copy(

396

queue_name="urgent",

397

options={**base_message.options, "priority": 0}

398

)

399

400

delayed_message = base_message.copy(

401

args=("goodbye",),

402

options={**base_message.options, "delay": 30000}

403

)

404

405

# Send variations

406

broker.enqueue(urgent_message)

407

broker.enqueue(delayed_message)

408

```

409

410

#### Conditional Message Creation

411

412

```python

413

def create_conditional_message(data, priority="normal"):

414

"""Create message based on priority level"""

415

416

if priority == "urgent":

417

return urgent_task.message_with_options(

418

args=(data,),

419

queue_name="urgent",

420

priority=0,

421

max_retries=1

422

)

423

elif priority == "high":

424

return high_priority_task.message_with_options(

425

args=(data,),

426

queue_name="high_priority",

427

priority=1,

428

max_retries=3

429

)

430

else:

431

return normal_task.message_with_options(

432

args=(data,),

433

queue_name="normal",

434

priority=5,

435

max_retries=5

436

)

437

438

# Usage

439

urgent_msg = create_conditional_message("critical_data", "urgent")

440

normal_msg = create_conditional_message("regular_data", "normal")

441

442

broker.enqueue(urgent_msg)

443

broker.enqueue(normal_msg)

444

```

445

446

### Message Lifecycle and Tracking

447

448

#### Message State Tracking

449

450

```python

451

import time

452

from enum import Enum

453

454

class MessageState(Enum):

455

CREATED = "created"

456

ENQUEUED = "enqueued"

457

PROCESSING = "processing"

458

COMPLETED = "completed"

459

FAILED = "failed"

460

RETRYING = "retrying"

461

462

class TrackedMessage:

463

def __init__(self, message):

464

self.message = message

465

self.state = MessageState.CREATED

466

self.state_history = [(MessageState.CREATED, time.time())]

467

self.result = None

468

self.error = None

469

470

def update_state(self, new_state, error=None):

471

self.state = new_state

472

self.state_history.append((new_state, time.time()))

473

if error:

474

self.error = error

475

476

def get_duration(self):

477

if len(self.state_history) < 2:

478

return 0

479

start_time = self.state_history[0][1]

480

end_time = self.state_history[-1][1]

481

return end_time - start_time

482

483

# Middleware for message tracking

484

class MessageTrackingMiddleware(dramatiq.Middleware):

485

def __init__(self):

486

self.tracked_messages = {}

487

488

def before_enqueue(self, broker, message, delay):

489

tracked = TrackedMessage(message)

490

tracked.update_state(MessageState.ENQUEUED)

491

self.tracked_messages[message.message_id] = tracked

492

493

def before_process_message(self, broker, message):

494

if message.message_id in self.tracked_messages:

495

tracked = self.tracked_messages[message.message_id]

496

tracked.update_state(MessageState.PROCESSING)

497

498

def after_process_message(self, broker, message, *, result=None, exception=None):

499

if message.message_id in self.tracked_messages:

500

tracked = self.tracked_messages[message.message_id]

501

if exception:

502

tracked.update_state(MessageState.FAILED, error=str(exception))

503

else:

504

tracked.update_state(MessageState.COMPLETED)

505

tracked.result = result

506

507

# Usage

508

tracking_middleware = MessageTrackingMiddleware()

509

broker.add_middleware(tracking_middleware)

510

511

@dramatiq.actor

512

def tracked_task(data):

513

time.sleep(1) # Simulate work

514

return f"Processed: {data}"

515

516

# Send tracked message

517

message = tracked_task.send("test_data")

518

519

# Check tracking later

520

time.sleep(2)

521

tracked = tracking_middleware.tracked_messages.get(message.message_id)

522

if tracked:

523

print(f"Message state: {tracked.state}")

524

print(f"Duration: {tracked.get_duration():.2f}s")

525

```

526

527

#### Message Batching

528

529

```python

530

class MessageBatch:

531

def __init__(self, broker):

532

self.broker = broker

533

self.messages = []

534

535

def add(self, message):

536

"""Add message to batch"""

537

self.messages.append(message)

538

539

def send_all(self, delay=None):

540

"""Send all messages in the batch"""

541

sent_messages = []

542

543

for message in self.messages:

544

if delay:

545

message = message.copy(options={**message.options, "delay": delay})

546

547

sent_message = self.broker.enqueue(message)

548

sent_messages.append(sent_message)

549

550

self.messages.clear()

551

return sent_messages

552

553

def __len__(self):

554

return len(self.messages)

555

556

# Usage

557

batch = MessageBatch(broker)

558

559

# Add messages to batch

560

for i in range(10):

561

msg = process_item.message(f"item_{i}", {"config": "batch_mode"})

562

batch.add(msg)

563

564

# Send entire batch with delay

565

sent_messages = batch.send_all(delay=1000) # 1 second delay

566

print(f"Sent {len(sent_messages)} messages")

567

```

568

569

### Custom Message Serialization

570

571

#### Custom Encoder Implementation

572

573

```python

574

import msgpack

575

from dramatiq import Encoder, DecodeError

576

577

class MessagePackEncoder(Encoder):

578

"""Custom encoder using MessagePack for efficient serialization"""

579

580

def encode(self, data):

581

try:

582

return msgpack.packb(data, use_bin_type=True)

583

except Exception as e:

584

raise DecodeError(f"Failed to encode with MessagePack: {e}")

585

586

def decode(self, data):

587

try:

588

return msgpack.unpackb(data, raw=False, strict_map_key=False)

589

except Exception as e:

590

raise DecodeError(f"Failed to decode with MessagePack: {e}")

591

592

# Use custom encoder

593

msgpack_encoder = MessagePackEncoder()

594

dramatiq.set_encoder(msgpack_encoder)

595

596

@dramatiq.actor

597

def msgpack_task(data):

598

"""Task using MessagePack encoding"""

599

return {

600

"input_size": len(str(data)),

601

"processed": True,

602

"binary_data": b"binary content"

603

}

604

605

# Send binary-friendly data

606

msgpack_task.send({

607

"text": "Hello World",

608

"binary": b"\x00\x01\x02\x03",

609

"numbers": [1, 2, 3, 4, 5]

610

})

611

```

612

613

#### Compression Support

614

615

```python

616

import gzip

617

import json

618

619

class CompressedJSONEncoder(dramatiq.Encoder):

620

"""JSON encoder with gzip compression"""

621

622

def __init__(self, compression_level=6):

623

self.compression_level = compression_level

624

625

def encode(self, data):

626

try:

627

json_data = json.dumps(data).encode('utf-8')

628

return gzip.compress(json_data, compresslevel=self.compression_level)

629

except Exception as e:

630

raise dramatiq.DecodeError(f"Failed to encode/compress: {e}")

631

632

def decode(self, data):

633

try:

634

decompressed = gzip.decompress(data)

635

return json.loads(decompressed.decode('utf-8'))

636

except Exception as e:

637

raise dramatiq.DecodeError(f"Failed to decompress/decode: {e}")

638

639

# Use compressed encoder for large messages

640

compressed_encoder = CompressedJSONEncoder(compression_level=9)

641

dramatiq.set_encoder(compressed_encoder)

642

643

@dramatiq.actor

644

def large_data_task(large_data):

645

"""Task handling large data with compression"""

646

return {

647

"processed_items": len(large_data),

648

"sample": large_data[:10] if large_data else [],

649

"compression_effective": True

650

}

651

652

# Send large dataset

653

large_dataset = list(range(10000))

654

large_data_task.send(large_dataset)

655

```

656

657

### Message Debugging and Inspection

658

659

#### Message Inspector

660

661

```python

662

import pprint

663

664

class MessageInspector:

665

def __init__(self, broker):

666

self.broker = broker

667

668

def inspect_message(self, message):

669

"""Detailed message inspection"""

670

print(f"=== Message Inspection ===")

671

print(f"ID: {message.message_id}")

672

print(f"Actor: {message.actor_name}")

673

print(f"Queue: {message.queue_name}")

674

print(f"Timestamp: {message.message_timestamp}")

675

print(f"Args: {message.args}")

676

print(f"Kwargs: {message.kwargs}")

677

print(f"Options:")

678

pprint.pprint(message.options, indent=2)

679

680

# Size information

681

encoded = message.encode()

682

print(f"Encoded size: {len(encoded)} bytes")

683

684

# Validate encoding/decoding

685

try:

686

decoded = dramatiq.Message.decode(encoded)

687

print("✓ Encoding/decoding successful")

688

except Exception as e:

689

print(f"✗ Encoding/decoding failed: {e}")

690

691

def compare_messages(self, msg1, msg2):

692

"""Compare two messages"""

693

print(f"=== Message Comparison ===")

694

695

fields = ['message_id', 'actor_name', 'queue_name', 'args', 'kwargs', 'options']

696

for field in fields:

697

val1 = getattr(msg1, field)

698

val2 = getattr(msg2, field)

699

700

if val1 == val2:

701

print(f"✓ {field}: MATCH")

702

else:

703

print(f"✗ {field}: DIFFER")

704

print(f" Message 1: {val1}")

705

print(f" Message 2: {val2}")

706

707

# Usage

708

inspector = MessageInspector(broker)

709

710

@dramatiq.actor

711

def debug_task(data, option=None):

712

return f"Debug: {data} with {option}"

713

714

# Create and inspect message

715

message = debug_task.message("test_data", option="debug_mode")

716

inspector.inspect_message(message)

717

718

# Create modified copy and compare

719

modified = message.copy(queue_name="debug", options={"debug": True})

720

inspector.compare_messages(message, modified)

721

```

722

723

#### Message Queue Monitoring

724

725

```python

726

def monitor_message_queues(broker, interval=5):

727

"""Monitor message queues for debugging"""

728

729

def queue_stats():

730

while True:

731

try:

732

# Get queue information (broker-specific)

733

if hasattr(broker, 'client') and hasattr(broker.client, 'info'):

734

# Redis broker

735

for queue_name in broker.queues:

736

key = f"{broker.namespace}:{queue_name}"

737

length = broker.client.llen(key)

738

print(f"Queue '{queue_name}': {length} messages")

739

740

print("---")

741

time.sleep(interval)

742

743

except Exception as e:

744

print(f"Error monitoring queues: {e}")

745

time.sleep(interval)

746

747

monitor_thread = threading.Thread(target=queue_stats, daemon=True)

748

monitor_thread.start()

749

return monitor_thread

750

751

# Start queue monitoring

752

monitor_thread = monitor_message_queues(broker)

753

754

# Send some messages to observe

755

for i in range(20):

756

debug_task.send(f"message_{i}")

757

```

758

759

This page covers working with messages in Dramatiq, from basic usage to more advanced patterns for tracking, batching, custom serialization, and debugging.