or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md core-application.md exceptions.md index.md results-state.md scheduling-beat.md signals-events.md workflow-primitives.md

docs/results-state.md

0

# Results and State Management

1

2

Task result handling and state monitoring capabilities for tracking task execution, retrieving results, and managing task lifecycle. These components provide comprehensive result access, state inspection, and task control functionality.

3

4

## Capabilities

5

6

### AsyncResult

7

8

Represents the result of an asynchronously executed task, providing methods to check status, retrieve results, and control task execution.

9

10

```python { .api }

11

class AsyncResult:

12

def __init__(self, id, backend=None, task_name=None, app=None, parent=None):

13

"""

14

Create AsyncResult instance.

15

16

Args:

17

id (str): Task ID

18

backend: Result backend instance

19

task_name (str): Task name

20

app: Celery app instance

21

parent: Parent result

22

"""

23

24

def get(

25

self,

26

timeout=None,

27

propagate=True,

28

interval=0.5,

29

no_ack=True,

30

follow_parents=True,

31

callback=None,

32

on_message=None,

33

on_interval=None,

34

disable_sync_subtasks=True,

35

EXCEPTION_STATES=None,

36

PROPAGATE_STATES=None

37

):

38

"""

39

Get task result, waiting if necessary.

40

41

Args:

42

timeout (float): Maximum time to wait in seconds

43

propagate (bool): Re-raise task exceptions

44

interval (float): Polling interval in seconds

45

no_ack (bool): Don't acknowledge result

46

follow_parents (bool): Follow parent results

47

callback (callable): Called with result when ready

48

on_message (callable): Called for each message received

49

on_interval (callable): Called on each polling interval

50

disable_sync_subtasks (bool): Raise RuntimeError if called from inside a task, to prevent blocking on subtasks

51

52

Returns:

53

Task result value

54

55

Raises:

56

celery.exceptions.TimeoutError: If timeout exceeded (this is not the builtin TimeoutError)

57

Exception: Task exception if propagate=True

58

"""

59

60

def ready(self):

61

"""

62

Check if task has finished executing.

63

64

Returns:

65

bool: True if task has finished (succeeded, failed, or was revoked)

66

"""

67

68

def successful(self):

69

"""

70

Check if task completed successfully.

71

72

Returns:

73

bool: True if task succeeded, False otherwise

74

75

Note:

76

Does not raise; returns False while the task has not finished

77

"""

78

79

def failed(self):

80

"""

81

Check if task execution failed.

82

83

Returns:

84

bool: True if task failed, False otherwise

85

86

Note:

87

Does not raise; returns False while the task has not finished

88

"""

89

90

def retry(self):

91

"""

92

Check if task is waiting for retry.

93

94

Returns:

95

bool: True if task will be retried

96

"""

97

98

def revoke(self, connection=None, terminate=False, signal='SIGTERM', wait=False, timeout=None):

99

"""

100

Revoke/cancel task execution.

101

102

Args:

103

connection: Broker connection

104

terminate (bool): Terminate worker process

105

signal (str): Signal to send if terminating

106

wait (bool): Wait for termination confirmation

107

timeout (float): Termination timeout

108

"""

109

110

def forget(self):

111

"""

112

Remove result from backend storage.

113

"""

114

115

def build_graph(self, intermediate=False, formatter=None):

116

"""

117

Build task dependency graph.

118

119

Args:

120

intermediate (bool): Include intermediate results

121

formatter (callable): Result formatter function

122

123

Returns:

124

dict: Dependency graph structure

125

"""

126

127

@property

128

def result(self):

129

"""Task result value or exception."""

130

131

@property

132

def return_value(self):

133

"""Alias for result property."""

134

135

@property

136

def state(self):

137

"""Current task state."""

138

139

@property

140

def status(self):

141

"""Alias for state property."""

142

143

@property

144

def info(self):

145

"""Additional state information."""

146

147

@property

148

def traceback(self):

149

"""Exception traceback if task failed."""

150

151

@property

152

def id(self):

153

"""Task ID."""

154

155

@property

156

def task_id(self):

157

"""Alias for id property."""

158

159

@property

160

def name(self):

161

"""Task name."""

162

163

@property

164

def args(self):

165

"""Task positional arguments."""

166

167

@property

168

def kwargs(self):

169

"""Task keyword arguments."""

170

171

@property

172

def backend(self):

173

"""Result backend instance."""

174

175

@property

176

def children(self):

177

"""Child task results."""

178

```

179

180

### GroupResult

181

182

Result collection for group task execution, providing methods to check group completion status and retrieve all results.

183

184

```python { .api }

185

class GroupResult:

186

def __init__(self, id=None, results=None, **kwargs):

187

"""

188

Create GroupResult instance.

189

190

Args:

191

id (str): Group ID

192

results (list): List of AsyncResult instances

193

"""

194

195

def get(self, timeout=None, propagate=True, interval=0.5, callback=None, no_ack=True):

196

"""

197

Get results from all tasks in group.

198

199

Args:

200

timeout (float): Maximum time to wait

201

propagate (bool): Re-raise task exceptions

202

interval (float): Polling interval

203

callback (callable): Result callback

204

no_ack (bool): Don't acknowledge results

205

206

Returns:

207

list: Results from all group tasks

208

"""

209

210

def ready(self):

211

"""

212

Check if all group tasks are complete.

213

214

Returns:

215

bool: True if all tasks finished

216

"""

217

218

def successful(self):

219

"""

220

Check if all group tasks succeeded.

221

222

Returns:

223

bool: True if all tasks successful

224

"""

225

226

def failed(self):

227

"""

228

Check if any group tasks failed.

229

230

Returns:

231

bool: True if any task failed

232

"""

233

234

def waiting(self):

235

"""

236

Check if any group tasks are still waiting.

237

238

Returns:

239

bool: True if any tasks not ready

240

"""

241

242

def revoke(self, connection=None, terminate=False, signal='SIGTERM'):

243

"""

244

Revoke all tasks in group.

245

246

Args:

247

connection: Broker connection

248

terminate (bool): Terminate worker processes

249

signal (str): Termination signal

250

"""

251

252

def forget(self):

253

"""Remove all results from backend."""

254

255

def iterate(self, timeout=None, propagate=True, interval=0.5):

256

"""

257

Iterate over results as they become ready.

258

259

Args:

260

timeout (float): Overall timeout

261

propagate (bool): Re-raise exceptions

262

interval (float): Polling interval

263

264

Yields:

265

Task results as they complete

266

"""

267

268

@property

269

def results(self):

270

"""List of AsyncResult instances."""

271

272

@property

273

def children(self):

274

"""Alias for results property."""

275

```

276

277

### ResultSet

278

279

Collection of results that can be managed as a single unit, providing batch operations over multiple AsyncResult instances.

280

281

```python { .api }

282

class ResultSet:

283

def __init__(self, results=None, **kwargs):

284

"""

285

Create ResultSet instance.

286

287

Args:

288

results (list): AsyncResult instances

289

"""

290

291

def get(self, timeout=None, propagate=True, interval=0.5, callback=None, no_ack=True):

292

"""

293

Get all results in set.

294

295

Args:

296

timeout (float): Maximum wait time

297

propagate (bool): Re-raise exceptions

298

interval (float): Polling interval

299

callback (callable): Result callback

300

no_ack (bool): Don't acknowledge results

301

302

Returns:

303

list: All results

304

"""

305

306

def ready(self):

307

"""

308

Check if all results are ready.

309

310

Returns:

311

bool: True if all complete

312

"""

313

314

def successful(self):

315

"""

316

Check if all results are successful.

317

318

Returns:

319

bool: True if all succeeded

320

"""

321

322

def failed(self):

323

"""

324

Check if any results failed.

325

326

Returns:

327

bool: True if any failed

328

"""

329

330

def revoke(self, connection=None, terminate=False, signal='SIGTERM'):

331

"""

332

Revoke all results.

333

334

Args:

335

connection: Broker connection

336

terminate (bool): Terminate processes

337

signal (str): Termination signal

338

"""

339

340

def iterate(self, timeout=None, propagate=True, interval=0.5):

341

"""

342

Iterate over results as ready.

343

344

Args:

345

timeout (float): Overall timeout

346

propagate (bool): Re-raise exceptions

347

interval (float): Check interval

348

349

Yields:

350

Results as they complete

351

"""

352

353

def add(self, result):

354

"""

355

Add result to set.

356

357

Args:

358

result (AsyncResult): Result to add

359

"""

360

361

def remove(self, result):

362

"""

363

Remove result from set.

364

365

Args:

366

result (AsyncResult): Result to remove

367

"""

368

369

@property

370

def results(self):

371

"""List of AsyncResult instances in set."""

372

```

373

374

### Task States

375

376

State constants and utilities for tracking task execution status and lifecycle.

377

378

```python { .api }

379

# State constants

380

PENDING = 'PENDING' # Task waiting for execution or unknown

381

RECEIVED = 'RECEIVED' # Task received by worker

382

STARTED = 'STARTED' # Task started by worker

383

SUCCESS = 'SUCCESS' # Task executed successfully

384

FAILURE = 'FAILURE' # Task execution failed

385

REVOKED = 'REVOKED' # Task revoked/cancelled

386

RETRY = 'RETRY' # Task will be retried

387

IGNORED = 'IGNORED' # Task result ignored

388

389

# State collections

390

READY_STATES = frozenset([SUCCESS, FAILURE, REVOKED])

391

UNREADY_STATES = frozenset([PENDING, RECEIVED, STARTED, RETRY])

392

EXCEPTION_STATES = frozenset([RETRY, FAILURE, REVOKED])

393

PROPAGATE_STATES = frozenset([FAILURE, REVOKED])

394

395

class state(str):

396

"""

397

String subclass with state comparison methods.

398

"""

399

400

def __lt__(self, other):

401

"""Compare state precedence."""

402

403

def __gt__(self, other):

404

"""Compare state precedence."""

405

```

406

407

### State Utilities

408

409

Helper functions for working with task states and result objects.

410

411

```python { .api }

412

def result_from_tuple(r, app=None):

413

"""

414

Create result object from tuple representation.

415

416

Args:

417

r (tuple): Serialized result as produced by as_tuple(), i.e. ((task_id, parent), children); an existing result object is returned unchanged

418

app: Celery app instance

419

420

Returns:

421

AsyncResult instance, or GroupResult when the tuple carries child results

422

"""

423

```

424

425

## Usage Examples

426

427

### Basic Result Handling

428

429

```python

430

from celery import Celery

431

432

app = Celery('example', broker='redis://localhost:6379', backend='redis://localhost:6379')

433

434

@app.task

435

def add(x, y):

436

import time

437

time.sleep(2) # Simulate work

438

return x + y

439

440

@app.task

441

def divide(x, y):

442

if y == 0:

443

raise ValueError("Cannot divide by zero")

444

return x / y

445

446

# Execute task and get result

447

result = add.delay(4, 4)

448

449

# Check status without blocking

450

print(f"Task ID: {result.id}")

451

print(f"Ready: {result.ready()}")

452

print(f"State: {result.state}")

453

454

# Get result with timeout

455

try:

456

value = result.get(timeout=10)

457

print(f"Result: {value}")

458

except result.TimeoutError:  # celery.exceptions.TimeoutError, not the builtin

459

print("Task took too long")

460

461

# Check if successful

462

if result.successful():

463

print("Task completed successfully")

464

else:

465

print("Task failed or not ready")

466

```

467

468

### Error Handling and Propagation

469

470

```python

471

# Task that may fail

472

result = divide.delay(10, 0)

473

474

# Get result with error handling

475

try:

476

value = result.get(propagate=True)

477

print(f"Result: {value}")

478

except ValueError as exc:

479

print(f"Task failed: {exc}")

480

print(f"Traceback: {result.traceback}")

481

482

# Get result without propagating errors

483

value = result.get(propagate=False)

484

if result.failed():

485

print(f"Task failed with: {result.result}")

486

print(f"Info: {result.info}")

487

```

488

489

### Group Results

490

491

```python

492

from celery import group

493

494

# Create and execute group

495

job = group([

496

add.s(2, 2),

497

add.s(4, 4),

498

add.s(8, 8)

499

])

500

result = job.apply_async()

501

502

# Check group status

503

print(f"Group ready: {result.ready()}")

504

print(f"Group successful: {result.successful() if result.ready() else 'Not ready'}")

505

506

# Get all results

507

try:

508

results = result.get(timeout=30)

509

print(f"All results: {results}") # [4, 8, 16]

510

except Exception as exc:

511

print(f"Group failed: {exc}")

512

513

# Iterate over results as they complete

514

print("Results as they complete:")

515

for task_result in result.iterate(timeout=30):

516

print(f"Got result: {task_result}")

517

```

518

519

### ResultSet Operations

520

521

```python

522

from celery.result import ResultSet

523

524

# Create multiple tasks

525

tasks = [add.delay(i, i) for i in range(5)]

526

527

# Create result set

528

result_set = ResultSet(tasks)

529

530

# Batch operations

531

print(f"All ready: {result_set.ready()}")

532

533

# Get results with error handling

534

results = []

535

for result in result_set.iterate(timeout=60):

536

try:

537

value = result

538

results.append(value)

539

print(f"Task completed with result: {value}")

540

except Exception as exc:

541

print(f"Task failed: {exc}")

542

543

print(f"Final results: {results}")

544

```

545

546

### Task Revocation and Control

547

548

```python

549

import time

550

551

# Start long-running task

552

result = add.delay(1000000, 2000000)

553

554

# Check if we can revoke it

555

if not result.ready():

556

print("Revoking task...")

557

result.revoke(terminate=True)

558

559

# Wait a moment and check

560

time.sleep(1)

561

if result.state == 'REVOKED':

562

print("Task was revoked")

563

564

# Forget about result

565

result.forget()

566

```

567

568

### Advanced Result Monitoring

569

570

```python

571

import time

572

573

def monitor_task(result, name="Task"):

574

"""Monitor task execution with detailed status."""

575

576

print(f"Monitoring {name} (ID: {result.id})")

577

578

# Polling loop

579

while not result.ready():

580

print(f" Status: {result.state}")

581

if hasattr(result, 'info') and result.info:

582

print(f" Info: {result.info}")

583

time.sleep(1)

584

585

# Final status

586

if result.successful():

587

print(f" ✓ Completed: {result.result}")

588

else:

589

print(f" ✗ Failed: {result.result}")

590

if result.traceback:

591

print(f" Traceback: {result.traceback}")

592

593

# Monitor task execution

594

result = add.delay(10, 20)

595

monitor_task(result, "Addition")

596

```

597

598

### Working with Task Metadata

599

600

```python

601

@app.task(bind=True)

602

def task_with_progress(self, total_items):

603

"""Task that reports progress."""

604

605

for i in range(total_items):

606

# Update task state with progress info

607

self.update_state(

608

state='PROGRESS',

609

meta={'current': i + 1, 'total': total_items}

610

)

611

time.sleep(0.1)

612

613

return {'status': 'Complete', 'processed': total_items}

614

615

# Execute and monitor progress

616

result = task_with_progress.delay(10)

617

618

while not result.ready():

619

if result.state == 'PROGRESS':

620

meta = result.info

621

progress = (meta['current'] / meta['total']) * 100

622

print(f"Progress: {progress:.1f}% ({meta['current']}/{meta['total']})")

623

time.sleep(0.5)

624

625

print(f"Final result: {result.get()}")

626

```

627

628

### Handling Task Dependencies

629

630

```python

631

from celery import chain

632

633

# Create dependent tasks

634

workflow = chain(

635

add.s(2, 3), # First task: 2 + 3 = 5

636

add.s(10), # Second task: 5 + 10 = 15

637

add.s(5) # Third task: 15 + 5 = 20

638

)

639

640

result = workflow.apply_async()

641

642

# The result represents the final task in the chain

643

print(f"Final result: {result.get()}") # 20

644

645

# Access parent results if needed

646

if hasattr(result, 'parent') and result.parent:

647

print(f"Parent result: {result.parent.get()}") # 15

648

if hasattr(result.parent, 'parent') and result.parent.parent:

649

print(f"Grandparent result: {result.parent.parent.get()}") # 5

650

```