or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.mdbrokers.mdcomposition.mdindex.mdmessages.mdmiddleware.mdrate-limiting.mdresults.mdworkers.md

composition.mddocs/

0

# Composition

Task composition in Dramatiq enables complex workflows by chaining tasks sequentially with pipelines or executing multiple tasks in parallel with groups. This allows building sophisticated data processing workflows from simple actor building blocks.

## Capabilities

### Pipeline

Sequential task execution where each task's output becomes the next task's input, creating processing chains.

```python { .api }
class pipeline:
    def __init__(self, children: Iterable[Message | pipeline], *, broker: Broker = None):
        """
        Create a pipeline from message objects or other pipelines.

        Parameters:
        - children: Iterable of Message objects or pipeline objects
        - broker: Broker instance (uses global broker if None)
        """

    def run(self, *, delay: int = None) -> pipeline:
        """
        Execute the pipeline by sending all messages to the broker.

        Parameters:
        - delay: Delay in milliseconds before starting execution

        Returns:
        Self (for method chaining)
        """

    def get_result(self, *, block: bool = False, timeout: int = None):
        """
        Get the result of the final task in the pipeline.

        Parameters:
        - block: Whether to block waiting for result
        - timeout: Timeout in milliseconds when blocking

        Returns:
        Result of the last task in the pipeline

        Raises:
        ResultMissing: If result is not available
        ResultTimeout: If timeout exceeded while blocking
        """

    def get_results(self, *, block: bool = False, timeout: int = None) -> List:
        """
        Get results from all tasks in the pipeline.

        Parameters:
        - block: Whether to block waiting for results
        - timeout: Timeout in milliseconds when blocking

        Returns:
        List of results from all pipeline tasks
        """

    def __or__(self, other) -> pipeline:
        """
        Chain this pipeline with another message or pipeline using | operator.

        Parameters:
        - other: Message or pipeline to chain with

        Returns:
        New pipeline containing both sequences
        """

    def __len__(self) -> int:
        """
        Get the number of tasks in the pipeline.

        Returns:
        Number of messages in the pipeline
        """

    # Properties
    completed: bool # True if all tasks completed
    completed_count: int # Number of completed tasks
    messages: List[Message] # List of pipeline messages
```

**Usage:**

```python
@dramatiq.actor
def fetch_data(url):
    """Fetch data from URL"""
    return {"data": f"content from {url}", "size": 1024}

@dramatiq.actor
def process_data(data_info):
    """Process the fetched data"""
    processed = data_info["data"].upper()
    return {"processed": processed, "original_size": data_info["size"]}

@dramatiq.actor
def save_data(processed_info):
    """Save processed data"""
    print(f"Saving: {processed_info['processed']}")
    return {"saved": True, "id": "12345"}

# Create pipeline using | operator
pipeline = (
    fetch_data.message("https://api.example.com/data") |
    process_data.message() | # Will receive output from previous task
    save_data.message()
)

# Execute pipeline
pipeline.run()

# Get final result (blocking)
final_result = pipeline.get_result(block=True, timeout=30000)
print(f"Pipeline result: {final_result}")

# Get all results
all_results = pipeline.get_results(block=True)
print(f"All results: {all_results}")
```

### Group

Parallel task execution where multiple tasks run concurrently and can be synchronized.

```python { .api }
class group:
    def __init__(self, children: Iterable[Message], *, broker: Broker = None):
        """
        Create a group from message objects.

        Parameters:
        - children: Iterable of Message objects
        - broker: Broker instance (uses global broker if None)
        """

    def run(self, *, delay: int = None) -> group:
        """
        Execute the group by sending all messages to the broker.

        Parameters:
        - delay: Delay in milliseconds before starting execution

        Returns:
        Self (for method chaining)
        """

    def get_results(self, *, block: bool = False, timeout: int = None) -> List:
        """
        Get results from all tasks in the group.

        Parameters:
        - block: Whether to block waiting for all results
        - timeout: Timeout in milliseconds when blocking

        Returns:
        List of results from all group tasks

        Raises:
        ResultTimeout: If timeout exceeded while blocking
        """

    def wait(self, *, timeout: int = None):
        """
        Wait for all tasks in the group to complete.

        Parameters:
        - timeout: Timeout in milliseconds

        Raises:
        ResultTimeout: If timeout exceeded
        """

    def add_completion_callback(self, message: Message):
        """
        Add a callback to be executed when all group tasks complete.

        Parameters:
        - message: Message to execute as completion callback
        """

    def __len__(self) -> int:
        """
        Get the number of tasks in the group.

        Returns:
        Number of messages in the group
        """

    # Properties
    completed: bool # True if all tasks completed
    completed_count: int # Number of completed tasks
```

**Usage:**

```python
@dramatiq.actor
def process_item(item_id, item_data):
    """Process individual item"""
    print(f"Processing item {item_id}: {item_data}")
    return {"id": item_id, "processed": True, "result": item_data * 2}

@dramatiq.actor
def group_completion_handler(group_id):
    """Handle group completion"""
    print(f"Group {group_id} completed!")

# Create group of parallel tasks
items = [
    {"id": 1, "data": 10},
    {"id": 2, "data": 20},
    {"id": 3, "data": 30},
    {"id": 4, "data": 40}
]

task_group = group([
    process_item.message(item["id"], item["data"])
    for item in items
])

# Add completion callback
task_group.add_completion_callback(
    group_completion_handler.message("batch_001")
)

# Execute group
task_group.run()

# Wait for completion
task_group.wait(timeout=60000) # 1 minute timeout

# Get all results
results = task_group.get_results(block=True)
print(f"Group results: {results}")
```

### Advanced Composition Patterns

#### Mixed Pipeline and Group Composition

```python
@dramatiq.actor
def fetch_urls(urls):
    """Fetch multiple URLs"""
    return [{"url": url, "data": f"content from {url}"} for url in urls]

@dramatiq.actor
def process_single_url(url_data):
    """Process single URL data"""
    return {
        "url": url_data["url"],
        "processed": url_data["data"].upper(),
        "length": len(url_data["data"])
    }

@dramatiq.actor
def aggregate_results(results):
    """Aggregate all processed results"""
    total_length = sum(r["length"] for r in results)
    return {"total_items": len(results), "total_length": total_length}

# Complex composition: pipeline -> group -> pipeline
urls = ["https://api1.com", "https://api2.com", "https://api3.com"]

# Step 1: Fetch all URLs (single task)
fetch_step = fetch_urls.message(urls)

# Step 2: Process each URL in parallel (group)
# Note: This would require custom logic to split fetch results into group
# For demonstration, we'll create the group directly
process_group = group([
    process_single_url.message({"url": url, "data": f"content from {url}"})
    for url in urls
])

# Step 3: Aggregate results (single task)
aggregate_step = aggregate_results.message()

# Execute steps sequentially
fetch_step.send()
# Wait for fetch to complete, then create group with actual data
process_group.run()
process_group.wait()
aggregate_step.send()
```

#### Pipeline with Error Handling

```python
@dramatiq.actor
def safe_fetch(url):
    """Fetch with error handling"""
    try:
        # Simulate fetch operation
        if "error" in url:
            raise ValueError("Simulated fetch error")
        return {"url": url, "data": f"content from {url}", "success": True}
    except Exception as e:
        return {"url": url, "error": str(e), "success": False}

@dramatiq.actor
def process_or_skip(fetch_result):
    """Process successful fetches, skip errors"""
    if fetch_result["success"]:
        return {
            "processed": fetch_result["data"].upper(),
            "original_url": fetch_result["url"]
        }
    else:
        print(f"Skipping failed fetch: {fetch_result['error']}")
        return {"skipped": True, "error": fetch_result["error"]}

@dramatiq.actor
def finalize_result(process_result):
    """Finalize the processing result"""
    if process_result.get("skipped"):
        return {"status": "failed", "reason": process_result["error"]}
    else:
        return {
            "status": "success",
            "result": process_result["processed"],
            "url": process_result["original_url"]
        }

# Error-resilient pipeline
error_pipeline = (
    safe_fetch.message("https://error.example.com/data") |
    process_or_skip.message() |
    finalize_result.message()
)

error_pipeline.run()
result = error_pipeline.get_result(block=True)
print(f"Pipeline handled error gracefully: {result}")
```

#### Dynamic Group Creation

```python
@dramatiq.actor
def create_tasks_for_batch(batch_data):
    """Dynamically create tasks based on batch data"""
    tasks = []
    for item in batch_data["items"]:
        if item["type"] == "email":
            tasks.append(send_email.message(item["to"], item["subject"], item["body"]))
        elif item["type"] == "sms":
            tasks.append(send_sms.message(item["to"], item["message"]))
        elif item["type"] == "push":
            tasks.append(send_push.message(item["device_id"], item["message"]))

    # Create and run group
    notification_group = group(tasks)
    notification_group.run()

    return {"batch_id": batch_data["batch_id"], "task_count": len(tasks)}

# Usage
batch_data = {
    "batch_id": "batch_123",
    "items": [
        {"type": "email", "to": "user1@example.com", "subject": "Hello", "body": "Message"},
        {"type": "sms", "to": "+1234567890", "message": "Hello via SMS"},
        {"type": "push", "device_id": "device123", "message": "Hello via push"}
    ]
}

create_tasks_for_batch.send(batch_data)
```

#### Conditional Pipeline Execution

```python
@dramatiq.actor
def check_condition(data):
    """Check if pipeline should continue"""
    return {"continue": data["value"] > 10, "data": data}

@dramatiq.actor
def conditional_processor(check_result):
    """Process only if condition was met"""
    if check_result["continue"]:
        return {"processed": check_result["data"]["value"] * 2}
    else:
        return {"skipped": True, "reason": "Condition not met"}

@dramatiq.actor
def final_handler(process_result):
    """Handle final result regardless of path taken"""
    if process_result.get("skipped"):
        return {"status": "skipped", "reason": process_result["reason"]}
    else:
        return {"status": "completed", "result": process_result["processed"]}

# Conditional pipeline
conditional_pipeline = (
    check_condition.message({"value": 5}) | # Will not meet condition
    conditional_processor.message() |
    final_handler.message()
)

conditional_pipeline.run()
result = conditional_pipeline.get_result(block=True)
print(f"Conditional result: {result}")
```

### Composition with Results Storage

When using the Results middleware, composition objects can retrieve and work with stored results:

```python
# Enable results storage
from dramatiq.middleware import Results
from dramatiq.results.backends import RedisBackend

result_backend = RedisBackend()
results_middleware = Results(backend=result_backend, store_results=True)
broker.add_middleware(results_middleware)

@dramatiq.actor(store_results=True)
def data_processor(data):
    return {"processed": data, "timestamp": time.time()}

@dramatiq.actor(store_results=True)
def data_validator(processed_data):
    return {"valid": True, "data": processed_data}

# Pipeline with result storage
result_pipeline = (
    data_processor.message({"input": "test_data"}) |
    data_validator.message()
)

result_pipeline.run()

# Get individual step results
step_results = result_pipeline.get_results(block=True, timeout=30000)
print(f"Each step result: {step_results}")

# Get final result
final_result = result_pipeline.get_result(block=True)
print(f"Final result: {final_result}")
```

### Composition Monitoring

```python
import time

def monitor_composition(composition, name):
    """Monitor composition progress"""
    print(f"Starting {name} with {len(composition)} tasks")

    start_time = time.time()
    while not composition.completed:
        elapsed = time.time() - start_time
        print(f"{name}: {composition.completed_count}/{len(composition)} completed ({elapsed:.1f}s)")
        time.sleep(1.0)

    total_time = time.time() - start_time
    print(f"{name} completed in {total_time:.1f}s")

# Usage with monitoring
large_group = group([
    process_item.message(i, f"data_{i}")
    for i in range(100)
])

large_group.run()
monitor_composition(large_group, "Large Group Processing")
```