# Workflow Primitives (Canvas)

Canvas workflow primitives enable complex task composition and orchestration patterns. These building blocks allow creating sophisticated distributed workflows, including sequential chains, parallel execution, callbacks, and functional programming patterns over tasks.

## Capabilities

### Signature

Task signature that wraps a task call with its arguments and execution options, forming the foundation for all Canvas workflow patterns.

```python { .api }
class Signature:
    def __init__(self, task=None, args=None, kwargs=None, options=None, **ex):
        """
        Create task signature.

        Args:
            task (str|Task): Task name or task instance
            args (tuple): Positional arguments
            kwargs (dict): Keyword arguments
            options (dict): Execution options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute signature asynchronously.

        Args:
            args (tuple): Additional positional arguments
            kwargs (dict): Additional keyword arguments

        Returns:
            AsyncResult instance
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute signature synchronously.

        Args:
            args (tuple): Additional positional arguments
            kwargs (dict): Additional keyword arguments

        Returns:
            Task result
        """

    def clone(self, args=None, kwargs=None, **opts):
        """
        Create copy with modified arguments or options.

        Args:
            args (tuple): New positional arguments
            kwargs (dict): New keyword arguments
            **opts: New execution options

        Returns:
            New Signature instance
        """

    def freeze(self, id=None):
        """
        Make signature immutable with optional custom ID.

        Args:
            id (str): Custom task ID

        Returns:
            Immutable signature
        """

    def set(self, immutable=None, **options):
        """
        Set signature options.

        Args:
            immutable (bool): Make signature immutable
            **options: Execution options to set

        Returns:
            Self for chaining
        """

    def link(self, callback):
        """
        Add success callback.

        Args:
            callback (Signature): Callback signature

        Returns:
            Self for chaining
        """

    def link_error(self, errback):
        """
        Add error callback.

        Args:
            errback (Signature): Error callback signature

        Returns:
            Self for chaining
        """

def signature(task, args=None, kwargs=None, **options):
    """
    Create task signature.

    Args:
        task (str|Task): Task name or instance
        args (tuple): Positional arguments
        kwargs (dict): Keyword arguments
        **options: Execution options

    Returns:
        Signature instance
    """
```
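
A minimal sketch of the `clone()`, `set()`, and `freeze()` methods above, assuming the `add` task defined in the usage examples below and a configured broker/result backend:

```python
# Base signature; clone() copies it so the original stays untouched
base = add.s(2, 2)
delayed = base.clone()
delayed.set(countdown=10)                 # run 10 seconds later

# set() returns self, so options can be chained
routed = add.s(4, 4).set(queue='math', priority=5)

# freeze() fixes the task id before sending, so the result can be
# tracked (or passed around) ahead of time
pending = routed.freeze()
print(pending.id)

routed.apply_async()                      # executes under the frozen id
```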

### Chain

Execute tasks sequentially, passing the result of each task as the first argument to the next task in the chain.

```python { .api }
class chain:
    def __init__(self, *tasks, **kwargs):
        """
        Create task chain.

        Args:
            *tasks: Task signatures to chain
            **kwargs: Chain options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute chain asynchronously.

        Args:
            args (tuple): Arguments for first task
            kwargs (dict): Keyword arguments for first task

        Returns:
            AsyncResult for final task
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute chain synchronously.

        Args:
            args (tuple): Arguments for first task
            kwargs (dict): Keyword arguments for first task

        Returns:
            Final task result
        """

def chain(*tasks, **kwargs):
    """
    Create sequential task chain.

    Args:
        *tasks: Task signatures to execute in order
        **kwargs: Chain execution options

    Returns:
        chain instance
    """
```
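
Chains can also be built with the `|` operator, which is equivalent to calling `chain()`. A short sketch using the `add` and `mul` tasks from the usage examples below:

```python
# add.s(2, 3) | mul.s(4) builds the same chain as chain(add.s(2, 3), mul.s(4))
pipeline = add.s(2, 3) | mul.s(4) | add.s(10)
result = pipeline.apply_async()
print(result.get())  # ((2 + 3) * 4) + 10 = 30
```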

### Group

Execute multiple tasks in parallel, collecting results when all tasks complete.

```python { .api }
class group:
    def __init__(self, *tasks, **kwargs):
        """
        Create task group.

        Args:
            *tasks: Task signatures to execute in parallel
            **kwargs: Group options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute group asynchronously.

        Args:
            args (tuple): Arguments to add to each task
            kwargs (dict): Keyword arguments to add to each task

        Returns:
            GroupResult instance
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute group synchronously.

        Args:
            args (tuple): Arguments to add to each task
            kwargs (dict): Keyword arguments to add to each task

        Returns:
            List of task results
        """

def group(*tasks, **kwargs):
    """
    Create parallel task group.

    Args:
        *tasks: Task signatures to execute in parallel
        **kwargs: Group execution options

    Returns:
        group instance
    """
```
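
Groups accept any iterable of signatures, including generator expressions, which keeps large fan-outs compact. A sketch assuming the `add` task from the usage examples below:

```python
# One task per item; group() consumes the generator into a parallel group
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()

print(result.get())               # [0, 2, 4, ..., 18]
print(result.completed_count())   # number of finished tasks so far
```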

### Chord

Execute a group of tasks in parallel, then execute a callback task with the results when all tasks in the group complete.

```python { .api }
class chord:
    def __init__(self, header, body, **kwargs):
        """
        Create task chord.

        Args:
            header: Group of tasks to execute in parallel
            body (Signature): Callback task to execute with results
            **kwargs: Chord options
        """

    def apply_async(self, args=None, kwargs=None, **options):
        """
        Execute chord asynchronously.

        Args:
            args (tuple): Arguments for header tasks
            kwargs (dict): Keyword arguments for header tasks

        Returns:
            AsyncResult for callback task
        """

    def apply(self, args=None, kwargs=None, **options):
        """
        Execute chord synchronously.

        Args:
            args (tuple): Arguments for header tasks
            kwargs (dict): Keyword arguments for header tasks

        Returns:
            Callback task result
        """

def chord(header, body, **kwargs):
    """
    Create chord (group + callback).

    Args:
        header: Group or list of tasks for parallel execution
        body (Signature): Callback task executed with group results
        **kwargs: Chord execution options

    Returns:
        chord instance
    """
```
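
Chaining a group into a callback with `|` is automatically upgraded to a chord, which is often more readable than the constructor form. A sketch assuming the `add` and `sum_results` tasks from the usage examples below:

```python
# group | callback is promoted to chord(group, callback)
workflow = group(add.s(i, i) for i in range(5)) | sum_results.s()
result = workflow.apply_async()
print(result.get())  # sum([0, 2, 4, 6, 8]) = 20
```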

### Chunks

Split an iterable into chunks and create tasks to process each chunk in parallel.

```python { .api }
class chunks:
    def __init__(self, task, it, n):
        """
        Create chunked task processing.

        Args:
            task (Signature): Task applied to each item in a chunk
            it: Iterable of argument tuples to split into chunks
            n (int): Chunk size
        """

    def apply_async(self, **options):
        """
        Execute chunks asynchronously.

        Returns:
            GroupResult instance
        """

def chunks(task, it, n):
    """
    Split iterable into chunks for parallel processing.

    Args:
        task (Signature): Task applied to each item in a chunk
        it: Iterable of argument tuples to split
        n (int): Size of each chunk

    Returns:
        chunks instance
    """
```
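
Chunks are usually built through the task's `.chunks()` shortcut. Note that each element of the iterable is a tuple of positional arguments for one call; a sketch assuming the single-argument `double` task defined in the chunked-processing usage example below:

```python
# 100 calls split into 10 chunks; each worker task runs one whole chunk
# and returns the list of its 10 results
job = double.chunks(zip(range(100)), 10)
result = job.apply_async()
print(len(result.get()))  # 10

# A chunks instance can also be converted to a plain group
as_group = job.group()
```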

### Map Operations

Functional programming style operations for mapping tasks over iterables.

```python { .api }
def xmap(task, it):
    """
    Map task over an iterable, calling the task once per item with a
    single argument.

    Args:
        task (Signature): Task to map
        it: Iterable of single arguments

    Returns:
        xmap signature
    """

def xstarmap(task, it):
    """
    Map task over an iterable with argument unpacking.

    Args:
        task (Signature): Task to map
        it: Iterable of argument tuples to unpack

    Returns:
        xstarmap signature
    """
```
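
The `.map()` and `.starmap()` task shortcuts build these same primitives. A sketch assuming the `add` task from the usage examples below:

```python
# add.starmap(it) is equivalent to xstarmap(add.s(), it)
job = add.starmap(zip(range(5), range(5)))
result = job.apply_async()
print(result.get())  # [0, 2, 4, 6, 8]
```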

### Utility Functions

Helper functions for working with signatures and Canvas primitives.

```python { .api }
def maybe_signature(d, app=None):
    """
    Convert signature-like object to actual signature.

    Args:
        d: Object that might be a signature (dict, signature, etc.)
        app: Celery app instance

    Returns:
        Signature instance or original object
    """
```
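
`maybe_signature` is mainly useful when signatures arrive as plain dicts, for example after being serialized into a message. A minimal sketch (the task name `tasks.add` is illustrative, and `app` is the application from the usage examples below):

```python
from celery import maybe_signature, signature

sig = signature('tasks.add', args=(2, 2))
as_dict = dict(sig)                            # signatures serialize to plain dicts

restored = maybe_signature(as_dict, app=app)   # dict -> Signature
print(type(restored).__name__)                 # Signature
print(maybe_signature(None, app=app))          # None passes through unchanged
```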

## Usage Examples

### Basic Signature Usage

```python
from celery import signature, Celery

app = Celery('example')

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

# Create and execute a signature (a registered task name string also works)
sig = signature(add, args=(2, 3))
result = sig.apply_async()
print(result.get())  # 5

# Using the task's shortcut method
sig = add.s(2, 3)  # equivalent signature
result = sig.delay()
print(result.get())  # 5

# Immutable signature
sig = add.si(2, 3)  # won't accept additional arguments
```

### Chain Workflows

```python
from celery import chain

# Sequential processing - result of each becomes first arg of next
workflow = chain(
    add.s(2, 3),   # 2 + 3 = 5
    mul.s(4),      # 5 * 4 = 20
    add.s(10)      # 20 + 10 = 30
)
result = workflow()
print(result.get())  # 30

# Partial chain application
partial_chain = chain(mul.s(2), add.s(10))
result = partial_chain.apply_async(args=(5,))  # (5 * 2) + 10 = 20
print(result.get())  # 20
```

### Parallel Groups

```python
from celery import group

# Execute tasks in parallel
job = group([
    add.s(2, 2),
    add.s(4, 4),
    add.s(8, 8),
    add.s(16, 16)
])
result = job.apply_async()

# Get all results
results = result.get()
print(results)  # [4, 8, 16, 32]

# Check completion status
print(result.ready())       # True when all complete
print(result.successful())  # True when all successful
```

### Chord Patterns

```python
from celery import chord

@app.task
def sum_results(numbers):
    return sum(numbers)

# Parallel execution with a callback; calling the chord sends it and
# returns the AsyncResult of the callback task
result = chord([
    add.s(2, 2),
    add.s(4, 4),
    add.s(8, 8)
])(sum_results.s())

print(result.get())  # 28 (4 + 8 + 16)

# Chord with a chained callback and error handling; the callback
# receives the list of header results
try:
    workflow = chord(
        [add.s(1, 1), add.s(2, 2)],
        sum_results.s() | mul.s(5)
    )
    result = workflow.apply_async()

    final_result = result.get()  # (1+1 + 2+2) * 5 = 30
except Exception as exc:
    print(f"Chord failed: {exc}")
```

468

469

### Chunked Processing

470

471

```python

472

from celery import chunks

473

474

@app.task

475

def process_batch(items):

476

return [item * 2 for item in items]

477

478

# Process large dataset in chunks

479

data = list(range(100))

480

job = chunks(data, 10, process_batch.s())

481

result = job.apply_async()

482

483

# Get all batch results

484

batch_results = result.get()

485

print(len(batch_results)) # 10 batches

486

```

487

488

### Functional Map Operations

489

490

```python

491

from celery import xmap, xstarmap

492

493

# Map task over arguments

494

arguments = [(1, 2), (3, 4), (5, 6)]

495

job = xmap(add.s(), arguments)

496

results = job.apply_async().get()

497

print(results) # [3, 7, 11]

498

499

# Map with argument unpacking

500

job = xstarmap(add.s(), arguments)

501

results = job.apply_async().get()

502

print(results) # [3, 7, 11] - same result

503

504

# More complex mapping

505

@app.task

506

def process_user(user_id, action, **options):

507

return f"User {user_id}: {action}"

508

509

user_actions = [

510

(1, 'login', {'timestamp': '2023-01-01'}),

511

(2, 'logout', {'timestamp': '2023-01-02'})

512

]

513

514

job = xstarmap(process_user.s(), user_actions)

515

results = job.apply_async().get()

516

```

517

518

### Complex Workflow Composition

519

520

```python

521

# Combine multiple patterns

522

from celery import chain, group, chord

523

524

@app.task

525

def fetch_data(source):

526

return f"data_from_{source}"

527

528

@app.task

529

def process_data(data):

530

return f"processed_{data}"

531

532

@app.task

533

def aggregate_results(results):

534

return f"aggregated_{len(results)}_items"

535

536

# Complex nested workflow

537

workflow = chain(

538

# Step 1: Fetch data from multiple sources in parallel

539

group([

540

fetch_data.s('db'),

541

fetch_data.s('api'),

542

fetch_data.s('cache')

543

]),

544

545

# Step 2: Process each result and aggregate

546

chord(

547

group([process_data.s() for _ in range(3)]),

548

aggregate_results.s()

549

)

550

)

551

552

result = workflow.apply_async()

553

print(result.get()) # Final aggregated result

554

```

555

556

### Error Handling and Callbacks

557

558

```python

559

from celery import signature

560

561

@app.task

562

def may_fail(x):

563

if x < 0:

564

raise ValueError("Negative numbers not allowed")

565

return x * 2

566

567

@app.task

568

def handle_success(result):

569

print(f"Success: {result}")

570

return result

571

572

@app.task

573

def handle_failure(task_id, error, traceback):

574

print(f"Task {task_id} failed: {error}")

575

576

# Add callbacks to signature

577

sig = may_fail.s(5)

578

sig.link(handle_success.s())

579

sig.link_error(handle_failure.s())

580

581

result = sig.apply_async()

582

```