
# Results

Result storage in Dramatiq enables tasks to store and retrieve their return values, making it possible to build complex workflows where tasks depend on the results of previous tasks. The system supports multiple storage backends and provides both synchronous and asynchronous result retrieval.
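As a quick orientation, the sketch below shows both retrieval styles against a single actor. It assumes a broker with the `Results` middleware already configured (see Capabilities below); the `fetch_report` actor is hypothetical.

```python
import time

import dramatiq
from dramatiq.results import ResultMissing

# Hypothetical actor; requires a broker with the Results middleware installed.
@dramatiq.actor(store_results=True)
def fetch_report(report_id):
    return {"report_id": report_id, "status": "ready"}

message = fetch_report.send(7)

# Style 1: synchronous retrieval, block until the result arrives or the timeout expires.
report = message.get_result(block=True, timeout=10000)

# Style 2: asynchronous-style retrieval, poll without blocking and do other work in between.
report = None
while report is None:
    try:
        report = message.get_result(block=False)
    except ResultMissing:
        time.sleep(0.1)  # placeholder for other work between polls
```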

## Capabilities

### Results Middleware

The core middleware component that handles result storage and retrieval.

```python { .api }
class Results(Middleware):
    def __init__(self, *, backend: ResultBackend = None, store_results: bool = False):
        """
        Initialize results middleware.

        Parameters:
        - backend: Result storage backend (uses StubBackend if None)
        - store_results: Whether to store results by default for all actors
        """

    @property
    def actor_options(self) -> Set[str]:
        return {"store_results"}
```

**Usage:**

```python
import dramatiq
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend

# Set up result backend
result_backend = RedisBackend()

# Create results middleware
results_middleware = Results(
    backend=result_backend,
    store_results=False  # Only store when explicitly requested
)

# Add to broker ("broker" is an already-configured broker instance)
broker.add_middleware(results_middleware)

# Actors with result storage
@dramatiq.actor(store_results=True)
def compute_task(x, y):
    result = x * y + 42
    return {"computation": result, "inputs": [x, y]}

# Send task and get result
message = compute_task.send(10, 20)
result = message.get_result(block=True, timeout=30000)
print(f"Task result: {result}")
```

### Result Backend Base Class

Abstract base class for result storage backends.

```python { .api }
class ResultBackend:
    def __init__(self, *, namespace: str = "dramatiq-results", encoder: Encoder = None):
        """
        Initialize result backend.

        Parameters:
        - namespace: Key namespace for storing results
        - encoder: Encoder for serializing results (uses JSON if None)
        """

    def get_result(
        self,
        message: Message,
        *,
        block: bool = False,
        timeout: int = 10000
    ):
        """
        Get result for a message.

        Parameters:
        - message: Message to get result for
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking

        Returns:
        Task result or Missing sentinel

        Raises:
        ResultTimeout: If timeout exceeded while blocking
        ResultMissing: If result not found (when not blocking)
        ResultFailure: If task failed with exception
        """

    def store_result(self, message: Message, result, ttl: int):
        """
        Store result for a message.

        Parameters:
        - message: Message to store result for
        - result: Result value to store
        - ttl: Time-to-live in milliseconds
        """

    def delete_result(self, message: Message):
        """
        Delete stored result for a message.

        Parameters:
        - message: Message to delete result for
        """
```
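The built-in implementations are listed below. As an illustration of the interface, a custom backend can subclass `ResultBackend` and provide the three methods above; the dict-backed sketch that follows is illustrative only (the `DictBackend` name, the keying scheme, and the in-process storage are assumptions, and TTL expiry is omitted).

```python
import time

from dramatiq.results import ResultBackend, ResultMissing, ResultTimeout


class DictBackend(ResultBackend):
    """Illustrative in-process backend; not part of the library."""

    def __init__(self, *, namespace: str = "dramatiq-results", encoder=None):
        super().__init__(namespace=namespace, encoder=encoder)
        self._results = {}

    def _key(self, message):
        # Keying scheme chosen for this sketch only.
        return f"{self.namespace}:{message.message_id}"

    def store_result(self, message, result, ttl):
        # TTL is ignored here; a real backend would expire entries after ttl milliseconds.
        self._results[self._key(message)] = result

    def get_result(self, message, *, block=False, timeout=10000):
        deadline = time.monotonic() + timeout / 1000
        while True:
            try:
                return self._results[self._key(message)]
            except KeyError:
                if not block:
                    raise ResultMissing(message)
                if time.monotonic() >= deadline:
                    raise ResultTimeout(message)
                time.sleep(0.05)

    def delete_result(self, message):
        self._results.pop(self._key(message), None)
```

A backend like this only works when caller and worker share a process, which is the niche the built-in StubBackend below covers for tests.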

### Result Backend Implementations

#### Redis Backend

Production backend using Redis for result storage.

```python { .api }
class RedisBackend(ResultBackend):
    def __init__(
        self,
        client: redis.Redis,
        *,
        namespace: str = "dramatiq-results",
        encoder: Encoder = None
    ):
        """
        Create Redis result backend.

        Parameters:
        - client: Redis client instance
        - namespace: Key namespace for results (default: "dramatiq-results")
        - encoder: Result encoder (uses JSON if None)
        """
```

**Usage:**

```python
import redis
from dramatiq.results.backends import RedisBackend

# Create Redis client
redis_client = redis.Redis(host="localhost", port=6379, db=2, decode_responses=True)

# Create Redis backend
redis_backend = RedisBackend(
    redis_client,
    namespace="myapp-results"
)

# Use with Results middleware
results = Results(backend=redis_backend, store_results=True)
broker.add_middleware(results)
```

#### Memcached Backend

Memcached backend for result storage.

```python { .api }
class MemcachedBackend(ResultBackend):
    def __init__(
        self,
        client,
        *,
        namespace: str = "dramatiq-results",
        encoder: Encoder = None
    ):
        """
        Create Memcached result backend.

        Parameters:
        - client: Memcached client instance
        - namespace: Key namespace for results
        - encoder: Result encoder (uses JSON if None)
        """
```

**Usage:**

```python
import pylibmc
from dramatiq.results.backends import MemcachedBackend

# Create Memcached client
mc_client = pylibmc.Client(["127.0.0.1:11211"])

# Create Memcached backend
mc_backend = MemcachedBackend(mc_client)

# Use with Results middleware
results = Results(backend=mc_backend)
broker.add_middleware(results)
```

#### Stub Backend

In-memory backend for testing and development.

```python { .api }
class StubBackend(ResultBackend):
    def __init__(self, *, namespace: str = "dramatiq-results", encoder: Encoder = None):
        """
        Create in-memory result backend for testing.

        Parameters:
        - namespace: Key namespace for results
        - encoder: Result encoder (uses JSON if None)
        """
```

**Usage:**

```python
from dramatiq.results.backends import StubBackend

# Create stub backend for testing
stub_backend = StubBackend()

# Use in tests
results = Results(backend=stub_backend, store_results=True)
broker.add_middleware(results)

# Test result storage
@dramatiq.actor(store_results=True)
def test_task(value):
    return value * 2

message = test_task.send(21)
result = message.get_result(block=True)
assert result == 42
```

### Message Result Interface

Messages provide direct access to result operations.

```python { .api }
class Message:
    def get_result(
        self,
        *,
        backend: ResultBackend = None,
        block: bool = False,
        timeout: int = None
    ):
        """
        Get result for this message.

        Parameters:
        - backend: Result backend to use (uses broker's backend if None)
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking

        Returns:
        Task result

        Raises:
        ResultMissing: If result not available (when not blocking)
        ResultTimeout: If timeout exceeded while blocking
        ResultFailure: If task failed with exception
        """
```

**Usage:**

```python
@dramatiq.actor(store_results=True)
def long_running_task(duration):
    import time
    time.sleep(duration)
    return {"completed_after": duration, "timestamp": time.time()}

# Send task
message = long_running_task.send(5)

# Non-blocking result check
try:
    result = message.get_result(block=False)
    print(f"Task completed: {result}")
except dramatiq.ResultMissing:
    print("Task still running...")

# Blocking result retrieval
result = message.get_result(block=True, timeout=10000)
print(f"Final result: {result}")
```

### Result Errors

Specialized exceptions for result operations.

```python { .api }
class ResultError(Exception):
    """Base exception for result operations."""

class ResultMissing(ResultError):
    """Raised when result is not available."""

class ResultTimeout(ResultError):
    """Raised when timeout exceeded while waiting for result."""

class ResultFailure(ResultError):
    """
    Raised when task failed with an exception.

    Contains the original exception information.
    """
    def __init__(self, exception_type, exception_value, traceback):
        self.exception_type = exception_type
        self.exception_value = exception_value
        self.traceback = traceback

# Missing sentinel value
class Missing:
    """Sentinel value indicating missing result."""

Missing = Missing()
```

**Usage:**

```python
@dramatiq.actor(store_results=True)
def failing_task(should_fail):
    if should_fail:
        raise ValueError("Task intentionally failed")
    return "Success"

# Handle different result scenarios
message = failing_task.send(True)  # Will fail

try:
    result = message.get_result(block=True, timeout=5000)
    print(f"Success: {result}")
except dramatiq.ResultTimeout:
    print("Task timed out")
except dramatiq.ResultFailure as e:
    print(f"Task failed: {e.exception_type.__name__}: {e.exception_value}")
except dramatiq.ResultMissing:
    print("Result not found")
```

### Advanced Result Patterns

#### Pipeline Results

```python
@dramatiq.actor(store_results=True)
def step_one(data):
    processed = data.upper()
    return {"step": 1, "data": processed, "length": len(processed)}

@dramatiq.actor(store_results=True)
def step_two(step_one_result):
    data = step_one_result["data"]
    return {"step": 2, "data": data + "!!!", "prev_length": step_one_result["length"]}

@dramatiq.actor(store_results=True)
def step_three(step_two_result):
    return {
        "step": 3,
        "final_data": step_two_result["data"],
        "total_transformations": 3
    }

# Create pipeline with result dependencies
msg1 = step_one.send("hello world")
result1 = msg1.get_result(block=True)

msg2 = step_two.send(result1)
result2 = msg2.get_result(block=True)

msg3 = step_three.send(result2)
final_result = msg3.get_result(block=True)

print(f"Pipeline result: {final_result}")
```
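The version above blocks on every intermediate result from the caller. On the upstream Dramatiq library, the `pipeline` composition helper (see composition.md) chains the same actors broker-side, appending each return value to the next message's arguments, so only the final result is fetched. A sketch under that assumption, reusing the actors above:

```python
import dramatiq

# Assumes dramatiq's pipeline helper: each actor's return value is passed to the
# next actor, and the caller only waits on the last step's result.
pipe = dramatiq.pipeline([
    step_one.message("hello world"),
    step_two.message(),
    step_three.message(),
]).run()

final_result = pipe.get_result(block=True, timeout=30000)
print(f"Pipeline result: {final_result}")
```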

#### Batch Result Collection

```python
@dramatiq.actor(store_results=True)
def process_item(item_id, item_data):
    # Simulate processing
    import time, random
    time.sleep(random.uniform(0.1, 0.5))
    return {
        "item_id": item_id,
        "processed_data": item_data.upper(),
        "processing_time": random.uniform(0.1, 0.5)
    }

# Send batch of tasks
batch_items = [
    {"id": i, "data": f"item_{i}"}
    for i in range(10)
]

messages = []
for item in batch_items:
    msg = process_item.send(item["id"], item["data"])
    messages.append(msg)

# Collect all results
results = []
for msg in messages:
    try:
        result = msg.get_result(block=True, timeout=10000)
        results.append(result)
    except dramatiq.ResultTimeout:
        print(f"Message {msg.message_id} timed out")
    except dramatiq.ResultFailure as e:
        print(f"Message {msg.message_id} failed: {e}")

print(f"Collected {len(results)} results from {len(messages)} tasks")
```
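With the upstream Dramatiq library, the `group` composition helper (see composition.md) expresses the same fan-out and collect pattern more compactly. A sketch under that assumption, reusing `process_item` and `batch_items` from above:

```python
import dramatiq

# Assumes dramatiq's group helper: run the batch, then iterate results in send order.
g = dramatiq.group([
    process_item.message(item["id"], item["data"]) for item in batch_items
]).run()

for result in g.get_results(block=True, timeout=10000):
    print(result["item_id"], result["processed_data"])
```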

#### Result Caching and TTL Management

```python
@dramatiq.actor(store_results=True)
def expensive_computation(input_data):
    """Expensive computation with result caching"""
    import time, hashlib

    # Simulate expensive operation
    time.sleep(2)

    # Generate deterministic result
    hash_input = str(input_data).encode()
    result_hash = hashlib.md5(hash_input).hexdigest()

    return {
        "input": input_data,
        "result": result_hash,
        "computed_at": time.time()
    }

# Custom result storage with longer TTL
def store_with_custom_ttl(message, result, ttl_hours=24):
    """Store result with custom TTL"""
    backend = broker.get_results_backend()
    ttl_ms = ttl_hours * 3600 * 1000  # Convert to milliseconds
    backend.store_result(message, result, ttl_ms)

# Usage with custom TTL
message = expensive_computation.send({"complex": "data"})
result = message.get_result(block=True)

# Store with longer TTL for caching
store_with_custom_ttl(message, result, ttl_hours=48)
```

#### Result Aggregation

```python
@dramatiq.actor(store_results=True)
def partial_computation(chunk_id, data_chunk):
    """Process a chunk of data"""
    return {
        "chunk_id": chunk_id,
        "sum": sum(data_chunk),
        "count": len(data_chunk),
        "min": min(data_chunk),
        "max": max(data_chunk)
    }

@dramatiq.actor(store_results=True)
def aggregate_results(message_ids):
    """Aggregate results from multiple partial computations"""
    backend = broker.get_results_backend()

    partial_results = []
    for msg_id in message_ids:
        # Create dummy message for result retrieval
        msg = dramatiq.Message(
            queue_name="default",
            actor_name="partial_computation",
            args=(), kwargs={},
            options={},
            message_id=msg_id,
            message_timestamp=0
        )

        try:
            result = backend.get_result(msg, block=False)
            if result != dramatiq.results.Missing:
                partial_results.append(result)
        except dramatiq.ResultMissing:
            continue

    # Aggregate all partial results
    total_sum = sum(r["sum"] for r in partial_results)
    total_count = sum(r["count"] for r in partial_results)
    overall_min = min(r["min"] for r in partial_results)
    overall_max = max(r["max"] for r in partial_results)

    return {
        "total_sum": total_sum,
        "total_count": total_count,
        "average": total_sum / total_count if total_count > 0 else 0,
        "min": overall_min,
        "max": overall_max,
        "chunks_processed": len(partial_results)
    }

# Usage
large_dataset = list(range(1000))
chunk_size = 100
chunks = [large_dataset[i:i+chunk_size] for i in range(0, len(large_dataset), chunk_size)]

# Process chunks
message_ids = []
for i, chunk in enumerate(chunks):
    msg = partial_computation.send(i, chunk)
    message_ids.append(msg.message_id)

# Wait for partial results to complete
import time
time.sleep(5)

# Aggregate results
aggregation_msg = aggregate_results.send(message_ids)
final_result = aggregation_msg.get_result(block=True)
print(f"Aggregated result: {final_result}")
```

#### Result-Based Conditional Execution

```python
@dramatiq.actor(store_results=True)
def data_quality_check(data):
    """Check data quality and return score"""
    import random
    quality_score = random.uniform(0, 1)

    return {
        "data": data,
        "quality_score": quality_score,
        "passed": quality_score > 0.7
    }

@dramatiq.actor(store_results=True)
def high_quality_processing(quality_result):
    """Process high-quality data"""
    if not quality_result["passed"]:
        return {"status": "skipped", "reason": "Low quality data"}

    return {
        "status": "processed",
        "result": f"High quality processing of: {quality_result['data']}",
        "quality_score": quality_result["quality_score"]
    }

@dramatiq.actor(store_results=True)
def basic_processing(quality_result):
    """Basic processing for any data"""
    return {
        "status": "basic_processed",
        "result": f"Basic processing of: {quality_result['data']}",
        "quality_score": quality_result["quality_score"]
    }

# Conditional processing based on results
def process_with_quality_check(data):
    # Step 1: Quality check
    quality_msg = data_quality_check.send(data)
    quality_result = quality_msg.get_result(block=True)

    # Step 2: Conditional processing based on quality
    if quality_result["passed"]:
        processing_msg = high_quality_processing.send(quality_result)
    else:
        processing_msg = basic_processing.send(quality_result)

    final_result = processing_msg.get_result(block=True)
    return final_result

# Usage
test_data = "sample data for processing"
result = process_with_quality_check(test_data)
print(f"Processing result: {result}")
```

### Result Monitoring and Debugging

```python
def monitor_task_results(messages, timeout=30000):
    """Monitor multiple task results with progress tracking"""
    import time

    start_time = time.time()
    completed = {}

    while len(completed) < len(messages):
        for i, msg in enumerate(messages):
            if i in completed:
                continue

            try:
                result = msg.get_result(block=False)
                completed[i] = result
                print(f"Task {i} completed: {type(result)}")
            except dramatiq.ResultMissing:
                continue
            except dramatiq.ResultFailure as e:
                completed[i] = e
                print(f"Task {i} failed: {e.exception_type.__name__}")

        elapsed = (time.time() - start_time) * 1000
        if elapsed > timeout:
            print(f"Timeout: {len(completed)}/{len(messages)} completed")
            break

        if len(completed) < len(messages):
            time.sleep(0.1)

    return completed

# Usage
messages = [compute_task.send(i, i*2) for i in range(10)]
results = monitor_task_results(messages)
print(f"Final results: {len(results)} tasks completed")
```