or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md  core-application.md  exceptions.md  index.md  results-state.md  scheduling-beat.md  signals-events.md  workflow-primitives.md

docs/core-application.md

0

# Core Application

1

2

Core Celery application classes and task creation mechanisms that form the foundation of distributed task processing. These components provide the primary interface for creating, configuring, and managing Celery applications and tasks.

3

4

## Capabilities

5

6

### Celery Application Class

7

8

Main application class that serves as the central coordination point for task management, configuration, and worker communication.

9

10

```python { .api }

11

class Celery:

12

def __init__(

13

self,

14

main=None,

15

loader=None,

16

backend=None,

17

amqp=None,

18

events=None,

19

log=None,

20

control=None,

21

set_as_current=True,

22

tasks=None,

23

broker=None,

24

include=None,

25

changes=None,

26

config_source=None,

27

fixups=None,

28

task_cls=None,

29

autofinalize=True,

30

namespace=None,

31

strict_typing=True,

32

**kwargs

33

):

34

"""

35

Create a Celery application instance.

36

37

Args:

38

main (str): Name of main module if running as __main__

39

loader: Custom loader class for configuration

40

backend (str): Result backend URL or class

41

broker (str): Message broker URL

42

include (list): Modules to import when worker starts

43

set_as_current (bool): Make this the current app

44

autofinalize (bool): Auto-finalize app on first use

45

namespace (str): Configuration key namespace

46

"""

47

48

def task(self, *args, **opts):

49

"""

50

Decorator to create task from any callable.

51

52

Args:

53

bind (bool): Create bound task with self parameter

54

name (str): Custom task name

55

base (class): Custom task base class

56

serializer (str): Task argument serializer

57

max_retries (int): Maximum retry attempts

58

default_retry_delay (int): Default retry delay in seconds

59

rate_limit (str): Rate limit (e.g., '100/m' for 100/minute)

60

time_limit (int): Hard time limit in seconds

61

soft_time_limit (int): Soft time limit in seconds

62

ignore_result (bool): Don't store task results

63

store_errors_even_if_ignored (bool): Store errors even when ignoring results

64

65

Returns:

66

Task class instance

67

"""

68

69

def send_task(

70

self,

71

name,

72

args=None,

73

kwargs=None,

74

countdown=None,

75

eta=None,

76

task_id=None,

77

producer=None,

78

connection=None,

79

router=None,

80

result_cls=None,

81

expires=None,

82

publisher=None,

83

link=None,

84

link_error=None,

85

add_to_parent=True,

86

group_id=None,

87

group_index=None,

88

retries=0,

89

chord=None,

90

reply_to=None,

91

time_limit=None,

92

soft_time_limit=None,

93

root_id=None,

94

parent_id=None,

95

route_name=None,

96

shadow=None,

97

chain=None,

98

task_type=None,

99

**options

100

):

101

"""

102

Send task by name without having the task function imported.

103

104

Args:

105

name (str): Task name to execute

106

args (tuple): Positional arguments for task

107

kwargs (dict): Keyword arguments for task

108

countdown (int): Delay execution for N seconds

109

eta (datetime): Specific execution time

110

task_id (str): Custom task ID

111

expires (datetime|int): Task expiration time

112

link (Signature): Success callback

113

link_error (Signature): Failure callback

114

115

Returns:

116

AsyncResult instance

117

"""

118

119

def signature(self, *args, **kwargs):

120

"""

121

Create signature bound to this app.

122

123

Returns:

124

Signature instance

125

"""

126

127

def start(self, argv=None):

128

"""

129

Run celery using command line arguments.

130

131

Args:

132

argv (list): Command line arguments

133

"""

134

135

def worker_main(self, argv=None):

136

"""

137

Run celery worker using command line arguments.

138

139

Args:

140

argv (list): Command line arguments

141

"""

142

143

def config_from_object(self, obj, silent=False, force=False, namespace=None):

144

"""

145

Load configuration from object.

146

147

Args:

148

obj: Configuration object, module, or string

149

silent (bool): Don't raise on import errors

150

force (bool): Read the configuration immediately instead of lazily on first access

151

namespace (str): Only load keys with this prefix

152

"""

153

154

def config_from_envvar(self, variable_name, silent=False, force=False):

155

"""

156

Load configuration from environment variable.

157

158

Args:

159

variable_name (str): Environment variable name

160

silent (bool): Don't raise if variable not found

161

force (bool): Read the configuration immediately instead of lazily on first access

162

"""

163

164

def autodiscover_tasks(self, packages=None, related_name='tasks', force=False):

165

"""

166

Automatically discover tasks from packages.

167

168

Args:

169

packages (list): Packages to search (when omitted, taken from the app's fixups, e.g. Django's INSTALLED_APPS)

170

related_name (str): Module name to search for tasks

171

force (bool): Run discovery immediately instead of lazily when the app imports its default modules

172

"""

173

174

def finalize(self, auto=False):

175

"""

176

Finalize the app configuration.

177

178

Args:

179

auto (bool): Called automatically during first use

180

"""

181

182

def close(self):

183

"""Clean up after the application."""

184

185

def connection_for_read(self, url=None, **kwargs):

186

"""

187

Get connection for consuming messages.

188

189

Args:

190

url (str): Broker URL override

191

192

Returns:

193

Connection instance

194

"""

195

196

def connection_for_write(self, url=None, **kwargs):

197

"""

198

Get connection for producing messages.

199

200

Args:

201

url (str): Broker URL override

202

203

Returns:

204

Connection instance

205

"""

206

207

def add_periodic_task(

208

self,

209

schedule,

210

sig,

211

args=(),

212

kwargs=(),

213

name=None,

214

**opts

215

):

216

"""

217

Add periodic task to beat schedule.

218

219

Args:

220

schedule: Schedule instance (crontab, schedule)

221

sig (Signature): Task to execute

222

args (tuple): Arguments for task

223

kwargs (dict): Keyword arguments for task

224

name (str): Schedule entry name

225

"""

226

227

@property

228

def conf(self):

229

"""Current configuration namespace."""

230

231

@property

232

def tasks(self):

233

"""Task registry containing all registered tasks."""

234

235

@property

236

def backend(self):

237

"""Current result backend instance."""

238

239

@property

240

def control(self):

241

"""Remote control interface for workers."""

242

243

@property

244

def events(self):

245

"""Events interface for monitoring."""

246

247

@property

248

def current_task(self):

249

"""Currently executing task."""

250

```

251

252

### Task Base Class

253

254

Base class for all Celery tasks, providing execution methods and task context access.

255

256

```python { .api }

257

class Task:

258

def __init__(self):

259

"""Initialize task instance."""

260

261

def delay(self, *args, **kwargs):

262

"""

263

Shortcut to apply_async that forwards *args and **kwargs to the task; does not accept execution options (countdown, eta, etc.).

264

265

Args:

266

*args: Positional arguments for task

267

**kwargs: Keyword arguments for task

268

269

Returns:

270

AsyncResult instance

271

"""

272

273

def apply_async(

274

self,

275

args=None,

276

kwargs=None,

277

task_id=None,

278

producer=None,

279

link=None,

280

link_error=None,

281

shadow=None,

282

**options

283

):

284

"""

285

Apply task asynchronously.

286

287

Args:

288

args (tuple): Positional arguments

289

kwargs (dict): Keyword arguments

290

task_id (str): Custom task ID

291

producer: Message producer

292

link (Signature): Success callback

293

link_error (Signature): Error callback

294

shadow (str): Override task name in logs

295

countdown (int): Delay execution N seconds

296

eta (datetime): Execute at specific time

297

expires (datetime|int): Task expiration

298

retry (bool): Retry publishing the task message if the broker connection fails

299

retry_policy (dict): Message-publish retry configuration (max_retries, interval_start, interval_step, interval_max)

300

301

Returns:

302

AsyncResult instance

303

"""

304

305

def apply(self, args=None, kwargs=None, **options):

306

"""

307

Execute task synchronously in current process.

308

309

Args:

310

args (tuple): Positional arguments

311

kwargs (dict): Keyword arguments

312

313

Returns:

314

EagerResult instance (call .get() to obtain the task's return value)

315

"""

316

317

def retry(

318

self,

319

args=None,

320

kwargs=None,

321

exc=None,

322

throw=True,

323

eta=None,

324

countdown=None,

325

max_retries=None,

326

**options

327

):

328

"""

329

Retry current task.

330

331

Args:

332

args (tuple): New positional arguments

333

kwargs (dict): New keyword arguments

334

exc (Exception): Exception that caused retry

335

throw (bool): Re-raise Retry exception

336

eta (datetime): Retry at specific time

337

countdown (int): Retry after N seconds

338

max_retries (int): Override max retries

339

340

Raises:

341

Retry: To trigger task retry

342

"""

343

344

def signature(self, args=None, kwargs=None, **options):

345

"""

346

Create signature for this task.

347

348

Args:

349

args (tuple): Positional arguments

350

kwargs (dict): Keyword arguments

351

352

Returns:

353

Signature instance

354

"""

355

356

def s(self, *args, **kwargs):

357

"""

358

Shortcut for signature creation.

359

360

Args:

361

*args: Positional arguments

362

**kwargs: Keyword arguments

363

364

Returns:

365

Signature instance

366

"""

367

368

def si(self, *args, **kwargs):

369

"""

370

Create immutable signature.

371

372

Args:

373

*args: Positional arguments

374

**kwargs: Keyword arguments

375

376

Returns:

377

Immutable signature instance

378

"""

379

380

def chunks(self, it, n):

381

"""

382

Split iterator into chunks for parallel processing.

383

384

Args:

385

it: Iterator to chunk

386

n (int): Chunk size

387

388

Returns:

389

Chunks instance

390

"""

391

392

@property

393

def name(self):

394

"""Task name."""

395

396

@property

397

def app(self):

398

"""Celery app instance this task is bound to."""

399

400

@property

401

def request(self):

402

"""Current task request context."""

403

```

404

405

### Shared Task Decorator

406

407

Decorator for creating tasks that work with any Celery app, useful for reusable libraries and Django integration.

408

409

```python { .api }

410

def shared_task(*args, **kwargs):

411

"""

412

Create task that works with any Celery app instance.

413

414

Args:

415

bind (bool): Create bound task with self parameter

416

name (str): Custom task name

417

base (class): Custom task base class

418

serializer (str): Argument serializer

419

max_retries (int): Maximum retry attempts

420

default_retry_delay (int): Default retry delay

421

rate_limit (str): Task rate limit

422

ignore_result (bool): Don't store results

423

424

Returns:

425

Task decorator function

426

"""

427

428

def current_app():

429

"""

430

Get the current Celery application instance.

431

432

Returns:

433

Celery: Current application instance

434

435

Raises:

436

RuntimeError: Only in strict-app mode (C_STRICT_APP set); otherwise a default fallback app is returned when no app is current

437

"""

438

439

def current_task():

440

"""

441

Get the currently executing task.

442

443

Returns:

444

Task: Current task instance or None if not in task context

445

"""

446

```

447

448

### Task Request Context

449

450

Context object providing access to current task metadata and execution information.

451

452

```python { .api }

453

class Context:

454

"""

455

Task execution context available via Task.request.

456

457

Attributes:

458

id (str): Unique task ID

459

args (tuple): Task positional arguments

460

kwargs (dict): Task keyword arguments

461

retries (int): Number of retries attempted

462

is_eager (bool): True if executed synchronously

463

eta (datetime): Scheduled execution time

464

expires (datetime): Task expiration time

465

headers (dict): Message headers

466

delivery_info (dict): Message delivery information

467

reply_to (str): Reply queue name

468

correlation_id (str): Message correlation ID

469

root_id (str): Root task ID in chain

470

parent_id (str): Parent task ID

471

group (str): Group ID if part of group

472

group_index (int): Position in group

473

chord (str): Chord ID if part of chord

474

chain (list): Chain information

475

hostname (str): Worker hostname

476

logfile (str): Worker log file

477

loglevel (int): Worker log level

478

utc (bool): Use UTC times

479

called_directly (bool): True when the task was invoked as a plain function call rather than executed by a worker or via apply()

480

callbacks (list): Success callbacks

481

errbacks (list): Error callbacks

482

timelimit (tuple): Time limits (hard, soft)

483

origin (str): Message origin

484

"""

485

```

486

487

## Usage Examples

488

489

### Basic Application Setup

490

491

```python

492

from celery import Celery

493

494

# Create application with Redis broker and backend

495

app = Celery(

496

'myapp',

497

broker='redis://localhost:6379/0',

498

backend='redis://localhost:6379/1'

499

)

500

501

# Configure from object

502

app.config_from_object({

503

'task_serializer': 'json',

504

'accept_content': ['json'],

505

'result_serializer': 'json',

506

'timezone': 'UTC',

507

'enable_utc': True,

508

})

509

510

# Auto-discover tasks

511

app.autodiscover_tasks(['myapp.tasks', 'myapp.utils'])

512

```

513

514

### Task Creation Patterns

515

516

```python

517

# Basic task

518

@app.task

519

def add(x, y):

520

return x + y

521

522

# Bound task with retry logic

523

@app.task(bind=True, max_retries=3)

524

def process_data(self, data_id):

525

try:

526

# Process data

527

return process(data_id)

528

except Exception as exc:

529

# Retry with exponential backoff

530

self.retry(countdown=2 ** self.request.retries, exc=exc)

531

532

# Shared task for libraries

533

@shared_task

534

def send_email(recipient, subject, body):

535

# Email sending logic

536

pass

537

538

# Custom task class

539

class DatabaseTask(app.Task):

540

def on_failure(self, exc, task_id, args, kwargs, einfo):

541

# Custom failure handling

542

logger.error(f"Task {task_id} failed: {exc}")

543

544

@app.task(base=DatabaseTask)

545

def update_user(user_id, data):

546

# Database operation

547

pass

548

```

549

550

### Task Execution

551

552

```python

553

# Synchronous execution

554

result = add.apply(args=(4, 4))

555

print(result.get()) # 8 (apply() returns an EagerResult)

556

557

# Asynchronous execution

558

result = add.delay(4, 4)

559

print(result.get()) # Wait for result: 8

560

561

# Advanced async execution

562

result = add.apply_async(

563

args=(4, 4),

564

countdown=10, # Execute in 10 seconds

565

expires=60, # Expire after 60 seconds

566

retry=True,

567

retry_policy={

568

'max_retries': 3,

569

'interval_start': 0,

570

'interval_step': 0.2,

571

'interval_max': 0.2,

572

}

573

)

574

575

# Send task by name

576

result = app.send_task('myapp.tasks.add', args=(4, 4))

577

```