# Task Management

Complete task lifecycle management including execution, monitoring, filtering, search, and control operations for Celery distributed tasks.

## Capabilities

### Task Filtering and Iteration

Advanced task filtering and iteration capabilities for processing large numbers of tasks with various criteria.

```python { .api }
def iter_tasks(events, limit=None, offset=0, type=None, worker=None,
               state=None, sort_by=None, received_start=None, received_end=None,
               started_start=None, started_end=None, search=None):
    """
    Iterator for filtered tasks with pagination and sorting.

    Args:
        events: Events instance containing task data
        limit (int, optional): Maximum number of tasks to return
        offset (int, optional): Number of tasks to skip (pagination)
        type (str, optional): Filter by task name/type
        worker (str, optional): Filter by worker hostname
        state (str, optional): Filter by task state (PENDING, SUCCESS, FAILURE, etc.)
        sort_by (str, optional): Sort field ('name', 'state', 'received', 'started', 'runtime')
        received_start (datetime, optional): Filter tasks received after this time
        received_end (datetime, optional): Filter tasks received before this time
        started_start (datetime, optional): Filter tasks started after this time
        started_end (datetime, optional): Filter tasks started before this time
        search (str, optional): Search term for task content

    Yields:
        dict: Filtered task objects matching the specified criteria

    Supports complex filtering combinations and efficient pagination
    for large task datasets.
    """

def sort_tasks(tasks, sort_by):
    """
    Sort tasks by specified field.

    Args:
        tasks (list): List of task objects to sort
        sort_by (str): Sort field ('name', 'state', 'received', 'started', 'runtime')

    Returns:
        list: Sorted list of tasks

    Supports sorting by multiple fields with proper handling of None values
    and different data types.
    """

def get_task_by_id(events, task_id):
    """
    Retrieve specific task by UUID.

    Args:
        events: Events instance containing task data
        task_id (str): Task UUID to retrieve

    Returns:
        dict or None: Task object if found, None otherwise
    """

def as_dict(task):
    """
    Convert task object to dictionary representation.

    Args:
        task: Task object from Celery state

    Returns:
        dict: Task data as dictionary with all relevant fields
    """
```
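
The sketch below shows one way these helpers might be chained, following the signatures documented above; `events` is assumed to be a populated `Events` instance (see the usage examples later in this document), and the example UUID is hypothetical.

```python
# Minimal sketch, not part of the API above.
from flower.utils.tasks import iter_tasks, sort_tasks, get_task_by_id, as_dict

# Bounded, filtered slice: failed tasks on one worker, newest first
failed = list(iter_tasks(
    events,
    state='FAILURE',
    worker='celery@worker1',
    limit=20,
    sort_by='received',
))

# Re-order the slice by runtime for a "slowest failures" view
slowest = sort_tasks(failed, 'runtime')

# Fetch a single task by UUID (hypothetical id) and serialize it
task = get_task_by_id(events, 'd9078da5-9915-40a0-bfa1-392c7bde42ed')
if task is not None:
    print(as_dict(task))
```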

### Task Search and Filtering

Advanced search capabilities for finding tasks based on various criteria.

```python { .api }
def parse_search_terms(raw_search_value):
    """
    Parse search query string into structured search terms.

    Args:
        raw_search_value (str): Raw search query string

    Returns:
        list: Parsed search terms with field specifications

    Supports field-specific searches like 'name:my_task' or 'state:FAILURE'
    and general text searches across task content.
    """

def satisfies_search_terms(task, search_terms):
    """
    Check if task matches search criteria.

    Args:
        task (dict): Task object to check
        search_terms (list): Parsed search terms from parse_search_terms

    Returns:
        bool: True if task matches all search terms

    Performs comprehensive search across task name, arguments, result,
    traceback, and other task metadata.
    """

def task_args_contains_search_args(task_args, search_args):
    """
    Check if task arguments contain search terms.

    Args:
        task_args (list): Task arguments
        search_args (list): Search terms to find

    Returns:
        bool: True if arguments contain search terms

    Searches within task arguments and keyword arguments for specified terms.
    """
```
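
A short, hedged sketch of argument-level matching: it follows the signatures above and assumes the helpers are importable from `flower.utils.search` (the module used by the search example later in this document); the task name, query values, and customer id are illustrative.

```python
# Sketch only: find failed orders, then narrow to those mentioning one customer.
from flower.utils.search import (
    parse_search_terms,
    satisfies_search_terms,
    task_args_contains_search_args,  # assumed to live alongside the other helpers
)

terms = parse_search_terms('name:process_order state:FAILURE')
failed_orders = [
    task for task in iter_tasks(events, limit=500)
    if satisfies_search_terms(task, terms)
]

# Narrow further to tasks whose arguments mention a particular customer id
hits = [
    task for task in failed_orders
    if task_args_contains_search_args(task.get('args', []), ['customer_42'])
]
```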

### Task State Constants

Standard Celery task states used throughout the system.

```python { .api }
# Task states from Celery
TASK_STATES = [
    'PENDING',   # Task is waiting for execution
    'STARTED',   # Task has been started
    'SUCCESS',   # Task completed successfully
    'FAILURE',   # Task failed with an exception
    'RETRY',     # Task is being retried
    'REVOKED',   # Task has been revoked/cancelled
]

# Sort key mappings for task sorting
sort_keys = {
    'name': lambda task: task.name or '',
    'state': lambda task: task.state or '',
    'received': lambda task: task.received or 0,
    'started': lambda task: task.started or 0,
    'runtime': lambda task: task.runtime or 0,
    'worker': lambda task: task.worker.hostname if task.worker else '',
}
```
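
As a brief illustration (a sketch, not part of the spec), `TASK_STATES` can validate user-supplied filters and `sort_keys` can be handed directly to `sorted()`. Note that the lambdas above use attribute access, so `tasks` is assumed to be a list of task state objects exposing `.name`, `.state`, `.runtime`, and so on.

```python
# Sketch: validate a requested state filter, then order task objects by runtime.
def validate_state(state):
    if state not in TASK_STATES:
        raise ValueError(f"Unknown task state: {state!r}")
    return state

validate_state('FAILURE')   # ok
# validate_state('DONE')    # would raise ValueError

# `tasks` is assumed to be a list of task state objects (see note above)
slowest_first = sorted(tasks, key=sort_keys['runtime'], reverse=True)
```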

## Task Control Operations

### Task Execution

Execute tasks remotely with various execution modes and options.

```python { .api }
def task_apply(task_name, args=None, kwargs=None, **options):
    """
    Execute task synchronously and wait for result.

    Args:
        task_name (str): Name of task to execute
        args (list, optional): Task arguments
        kwargs (dict, optional): Task keyword arguments
        **options: Additional task options (queue, countdown, eta, etc.)

    Returns:
        dict: Task result and metadata

    Executes the task and waits for completion, returning the result
    or raising an exception if the task fails.
    """

def task_async_apply(task_name, args=None, kwargs=None, **options):
    """
    Execute task asynchronously without waiting for result.

    Args:
        task_name (str): Name of task to execute
        args (list, optional): Task arguments
        kwargs (dict, optional): Task keyword arguments
        **options: Additional task options

    Returns:
        dict: Task ID and submission metadata

    Submits the task for execution and returns immediately with task ID.
    """

def task_send_task(task_name, args=None, kwargs=None, **options):
    """
    Send task without requiring task definition on sender.

    Args:
        task_name (str): Name of task to send
        args (list, optional): Task arguments
        kwargs (dict, optional): Task keyword arguments
        **options: Additional task options

    Returns:
        dict: Task ID and submission metadata

    Sends task using Celery's send_task, which doesn't require the
    task to be registered locally.
    """
```
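
A minimal sketch of the three execution helpers, using the signatures documented above; the task names, queue, and countdown values are illustrative, and the exact shape of each returned dict depends on the implementation.

```python
# Synchronous: block until the task finishes and the result is available
result = task_apply('myapp.tasks.add', args=[2, 3], queue='default')

# Asynchronous: submit and return immediately with the task id
submission = task_async_apply(
    'myapp.tasks.generate_report',
    kwargs={'report_id': 42},
    countdown=60,   # standard Celery option: delay execution by 60 seconds
)

# send_task: only the registered task name is needed, no local definition
remote = task_send_task('remote_app.tasks.sync_catalog', args=['full'])
```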

### Task Result Management

Retrieve and manage task results and execution status.

```python { .api }
def get_task_result(task_id, timeout=None):
    """
    Get task result by ID.

    Args:
        task_id (str): Task UUID
        timeout (float, optional): Maximum time to wait for result

    Returns:
        dict: Task result data including:
            - result: Task return value
            - state: Current task state
            - traceback: Error traceback if failed
            - success: Boolean success status

    Retrieves result from the configured result backend.
    """

def task_abort(task_id):
    """
    Abort running task.

    Args:
        task_id (str): Task UUID to abort

    Returns:
        dict: Abort operation status

    Attempts to abort a running task if it supports abortion.
    Only works with AbortableTask instances.
    """
```
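
The sketch below waits briefly for a result and falls back to aborting the task. It assumes the functions above are in scope, that a timed-out `get_task_result` call still returns a state dict (rather than raising), and that the task uses Celery's `AbortableTask` base so `task_abort` can take effect; the UUID is hypothetical.

```python
# Sketch: short wait for a result, then request an abort if still running.
task_id = 'd9078da5-9915-40a0-bfa1-392c7bde42ed'

outcome = get_task_result(task_id, timeout=5)

if outcome['state'] == 'SUCCESS':
    print('result:', outcome['result'])
elif outcome['state'] in ('PENDING', 'STARTED'):
    status = task_abort(task_id)   # only effective for AbortableTask-based tasks
    print('abort requested:', status)
else:
    print('task failed:', outcome.get('traceback'))
```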

### Task Control Commands

Remote control operations for managing task execution and behavior.

```python { .api }
def task_revoke(task_id, terminate=False, signal='SIGTERM'):
    """
    Revoke/cancel a task.

    Args:
        task_id (str): Task UUID to revoke
        terminate (bool): Whether to terminate if already running
        signal (str): Signal to send if terminating ('SIGTERM', 'SIGKILL')

    Returns:
        dict: Revocation status

    Revokes a task, optionally terminating it if already executing.
    """

def task_rate_limit(task_name, rate_limit, workername=None):
    """
    Set rate limit for task type.

    Args:
        task_name (str): Name of task to limit
        rate_limit (str): Rate limit specification (e.g., '10/m', '1/s')
        workername (str, optional): Specific worker to apply limit

    Returns:
        dict: Rate limit operation status

    Sets execution rate limit for the specified task type.
    """

def task_timeout(task_name, soft=None, hard=None, workername=None):
    """
    Set timeout limits for task type.

    Args:
        task_name (str): Name of task to configure
        soft (float, optional): Soft timeout in seconds
        hard (float, optional): Hard timeout in seconds
        workername (str, optional): Specific worker to apply timeouts

    Returns:
        dict: Timeout configuration status

    Configures soft and hard timeout limits for task execution.
    """
```
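
A hedged sketch of the control commands above; the UUID, task name, worker name, and rate values are illustrative.

```python
# Revoke one task and force-terminate it if it is already running
task_revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', terminate=True, signal='SIGKILL')

# Allow at most 10 executions per minute of this task type on one worker
task_rate_limit('myapp.tasks.send_email', '10/m', workername='celery@worker1')

# Soft limit lets the task handle SoftTimeLimitExceeded; hard limit kills it
task_timeout('myapp.tasks.send_email', soft=30, hard=60)
```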

## Task Data Structure

### Core Task Information

Comprehensive task data structure containing all relevant execution and metadata information.

```python { .api }
TaskInfo = {
    # Basic identification
    'uuid': str,            # Unique task identifier
    'name': str,            # Task name/type
    'state': str,           # Current task state
    'hostname': str,        # Worker hostname executing task

    # Timing information
    'timestamp': float,     # Task event timestamp
    'received': float,      # Time task was received by worker
    'started': float,       # Time task execution started
    'succeeded': float,     # Time task completed successfully
    'failed': float,        # Time task failed
    'retried': float,       # Time task was retried
    'revoked': float,       # Time task was revoked
    'runtime': float,       # Total execution time in seconds

    # Task parameters
    'args': list,           # Task positional arguments
    'kwargs': dict,         # Task keyword arguments
    'retries': int,         # Number of retry attempts
    'eta': str,             # Estimated time of arrival
    'expires': str,         # Task expiration time

    # Execution results
    'result': Any,          # Task return value (if successful)
    'traceback': str,       # Exception traceback (if failed)
    'exception': str,       # Exception message (if failed)

    # Routing information
    'queue': str,           # Queue name task was sent to
    'exchange': str,        # Exchange name
    'routing_key': str,     # Routing key used
    'priority': int,        # Message priority

    # Worker information
    'worker': {
        'hostname': str,    # Worker hostname
        'pid': int,         # Worker process ID
        'sw_ident': str,    # Software identifier
        'sw_ver': str,      # Software version
        'sw_sys': str,      # System information
    },

    # Additional metadata
    'clock': int,           # Logical clock value
    'client': str,          # Client that sent the task
    'root_id': str,         # Root task ID (for task chains)
    'parent_id': str,       # Parent task ID (for task groups)
    'children': [str],      # Child task IDs
}
```
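
Because the timing fields are plain timestamps, simple latency metrics can be derived directly from a task dict shaped like `TaskInfo`. The helper below is a sketch; the field names come from the structure above, and missing timestamps are treated as unknown.

```python
# Sketch: derive queue wait and end-to-end latency from TaskInfo-shaped dicts.
def task_latency(task):
    received = task.get('received')
    started = task.get('started')
    runtime = task.get('runtime') or 0.0

    queue_wait = (started - received) if (received and started) else None
    return {
        'uuid': task.get('uuid'),
        'queue_wait_seconds': queue_wait,
        'total_seconds': (queue_wait or 0.0) + runtime,
    }
```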

### Task Event Types

Different event types that can occur during task execution lifecycle.

```python { .api }
TaskEventTypes = {
    'task-sent': {
        'description': 'Task was sent to broker',
        'fields': ['uuid', 'name', 'args', 'kwargs', 'eta', 'expires']
    },
    'task-received': {
        'description': 'Worker received task from broker',
        'fields': ['uuid', 'name', 'hostname', 'timestamp']
    },
    'task-started': {
        'description': 'Worker started executing task',
        'fields': ['uuid', 'hostname', 'timestamp', 'pid']
    },
    'task-succeeded': {
        'description': 'Task completed successfully',
        'fields': ['uuid', 'result', 'runtime', 'hostname', 'timestamp']
    },
    'task-failed': {
        'description': 'Task execution failed',
        'fields': ['uuid', 'exception', 'traceback', 'hostname', 'timestamp']
    },
    'task-retried': {
        'description': 'Task is being retried',
        'fields': ['uuid', 'reason', 'traceback', 'hostname', 'timestamp']
    },
    'task-revoked': {
        'description': 'Task was revoked/cancelled',
        'fields': ['uuid', 'hostname', 'timestamp']
    }
}
```
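
One common way to consume these events is a small dispatch table keyed by event type. The handlers below are hypothetical; only the event type strings and their fields come from the table above (Celery event dicts also carry a `type` key).

```python
# Sketch: route task events to handlers by their type string.
def on_task_failed(event):
    print(f"task {event['uuid']} failed on {event['hostname']}: {event['exception']}")

def on_task_succeeded(event):
    print(f"task {event['uuid']} finished in {event['runtime']:.2f}s")

HANDLERS = {
    'task-failed': on_task_failed,
    'task-succeeded': on_task_succeeded,
}

def dispatch(event):
    handler = HANDLERS.get(event.get('type'))
    if handler is not None:
        handler(event)
```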

## Usage Examples

### Basic Task Filtering

```python
from flower.utils.tasks import iter_tasks, get_task_by_id
from flower.events import Events

# Assume we have an events instance with task data
events = Events(celery_app, io_loop)

# Get all failed tasks
failed_tasks = list(iter_tasks(
    events,
    state='FAILURE',
    limit=100
))

print(f"Found {len(failed_tasks)} failed tasks")

# Get tasks from specific worker
worker_tasks = list(iter_tasks(
    events,
    worker='celery@worker1',
    limit=50
))

# Get recent tasks
from datetime import datetime, timedelta
recent_tasks = list(iter_tasks(
    events,
    received_start=datetime.now() - timedelta(hours=1),
    sort_by='received'
))
```

### Advanced Task Search

```python
from flower.utils.search import parse_search_terms, satisfies_search_terms

# Parse search query
search_terms = parse_search_terms('name:my_task state:FAILURE')

# Find matching tasks
matching_tasks = []
for task in iter_tasks(events):
    if satisfies_search_terms(task, search_terms):
        matching_tasks.append(task)

# Search with text query
text_search = parse_search_terms('error database connection')
error_tasks = [
    task for task in iter_tasks(events, state='FAILURE')
    if satisfies_search_terms(task, text_search)
]
```

### Task Execution and Control

```python
from flower.api.tasks import TaskApply, TaskAsyncApply, TaskRevoke

# Execute task synchronously
async def execute_task():
    handler = TaskApply()
    result = await handler.post(
        'my_task',
        args=[1, 2, 3],
        kwargs={'timeout': 30}
    )
    print(f"Task result: {result}")

# Execute task asynchronously
async def async_execute():
    handler = TaskAsyncApply()
    response = await handler.post(
        'long_running_task',
        args=['data'],
        kwargs={'priority': 5}
    )
    task_id = response['task-id']
    print(f"Task submitted: {task_id}")

# Revoke task
async def revoke_task():
    handler = TaskRevoke()
    await handler.post('task-uuid-here', terminate=True)
```

### Task Result Monitoring

```python
from flower.api.tasks import TaskResult, TaskInfo

# Get task result
async def get_result():
    handler = TaskResult()
    result = await handler.get('task-uuid-here', timeout=10)

    if result['state'] == 'SUCCESS':
        print(f"Task completed: {result['result']}")
    elif result['state'] == 'FAILURE':
        print(f"Task failed: {result['traceback']}")

# Get detailed task information
async def get_task_info():
    handler = TaskInfo()
    info = await handler.get('task-uuid-here')

    print(f"Task: {info['name']}")
    print(f"State: {info['state']}")
    print(f"Worker: {info['hostname']}")
    print(f"Runtime: {info.get('runtime', 'N/A')} seconds")
```

### Bulk Task Operations

```python
# Process tasks in batches
def process_tasks_batch(events, batch_size=1000):
    offset = 0

    while True:
        batch = list(iter_tasks(
            events,
            limit=batch_size,
            offset=offset,
            sort_by='received'
        ))

        if not batch:
            break

        # Process batch
        for task in batch:
            process_single_task(task)

        offset += batch_size

# Find and revoke failed tasks
async def cleanup_failed_tasks():
    failed_tasks = iter_tasks(events, state='FAILURE')

    for task in failed_tasks:
        if should_revoke_task(task):
            await task_revoke(task['uuid'])
```

### Task Analytics

```python
from collections import Counter
from datetime import datetime, timedelta

def analyze_task_performance(events):
    """Generate task performance analytics."""

    # Get tasks from last 24 hours
    yesterday = datetime.now() - timedelta(days=1)
    recent_tasks = list(iter_tasks(
        events,
        received_start=yesterday,
        sort_by='received'
    ))

    # Task counts by state
    state_counts = Counter(task['state'] for task in recent_tasks)

    # Task counts by name
    name_counts = Counter(task['name'] for task in recent_tasks)

    # Average runtime by task name
    runtime_by_name = {}
    for task_name in name_counts:
        runtimes = [
            task['runtime'] for task in recent_tasks
            if task['name'] == task_name and task.get('runtime')
        ]
        if runtimes:
            runtime_by_name[task_name] = sum(runtimes) / len(runtimes)

    return {
        'total_tasks': len(recent_tasks),
        'state_distribution': dict(state_counts),
        'task_distribution': dict(name_counts),
        'average_runtimes': runtime_by_name,
        'success_rate': state_counts['SUCCESS'] / len(recent_tasks) if recent_tasks else 0
    }
```

## Error Handling and Edge Cases

Task management includes comprehensive error handling for various scenarios:

```python
# Handle task execution errors
async def apply_with_error_handling():
    try:
        result = await task_apply('my_task', args=[1, 2, 3])
    except Exception as e:
        if 'timeout' in str(e).lower():
            print("Task execution timed out")
        elif 'not registered' in str(e).lower():
            print("Task not found on workers")
        else:
            print(f"Task execution failed: {e}")

# Handle missing task results
async def fetch_result_safely():
    try:
        result = await get_task_result('task-id')
    except Exception as e:
        if 'no such task' in str(e).lower():
            print("Task not found in result backend")
        else:
            print(f"Could not retrieve result: {e}")

# Handle search and filtering edge cases
def safe_task_search(events, **filters):
    try:
        return list(iter_tasks(events, **filters))
    except Exception as e:
        print(f"Task search failed: {e}")
        return []
```

## Performance Considerations

- Use pagination (`limit` and `offset`) for large task datasets (see the sketch after this list)
- Consider memory usage when processing many tasks
- Use specific filters to reduce dataset size before processing
- Cache frequent queries when possible
- Monitor search performance with complex queries
- Use appropriate sorting fields based on query patterns
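
The sketch below makes the first three points concrete: it filters on task state before processing, pages through results with `limit`/`offset`, and yields one task at a time instead of materializing the full dataset. It reuses `iter_tasks` and an `events` instance from the earlier examples; the failed-task use case is illustrative.

```python
# Memory-conscious sketch: narrow the dataset with a state filter, page through
# it, and stream results lazily so only one page is held in memory at a time.
def stream_failed_tasks(events, page_size=500):
    offset = 0
    while True:
        page = list(iter_tasks(
            events,
            state='FAILURE',      # filter before processing
            limit=page_size,      # pagination keeps memory bounded
            offset=offset,
            sort_by='received'
        ))
        if not page:
            return
        for task in page:
            yield task
        offset += page_size

# Consume lazily; nothing beyond the current page is kept around
for task in stream_failed_tasks(events):
    print(task['uuid'], task.get('runtime'))
```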