or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, core-application.md, exceptions.md, index.md, results-state.md, scheduling-beat.md, signals-events.md, workflow-primitives.md

docs/scheduling-beat.md

0

# Scheduling and Beat

1

2

Periodic task scheduling capabilities including cron-like scheduling, interval-based schedules, solar event timing, and beat scheduler management for running recurring tasks in distributed environments.

3

4

## Capabilities

5

6

### Crontab Scheduling

7

8

Unix cron-style scheduling with minute, hour, day, and month specifications for precise timing control.

9

10

```python { .api }

11

class crontab:

12

def __init__(

13

self,

14

minute='*',

15

hour='*',

16

day_of_week='*',

17

day_of_month='*',

18

month_of_year='*',

19

nowfun=None,

20

app=None

21

):

22

"""

23

Create crontab schedule.

24

25

Args:

26

minute (str): Minute field (0-59, '*', '*/5', '10,20,30')

27

hour (str): Hour field (0-23, '*', '*/2', '9-17')

28

day_of_week (str): Day of week (0-6 where 0 is Sunday, mon-sun, '*')

29

day_of_month (str): Day of month (1-31, '*', '1,15')

30

month_of_year (str): Month (1-12, jan-dec, '*')

31

nowfun (callable): Function returning current time

32

app: Celery app instance

33

"""

34

35

def is_due(self, last_run_at):

36

"""

37

Check if schedule is due for execution.

38

39

Args:

40

last_run_at (datetime): Last execution time

41

42

Returns:

43

tuple: (is_due, next_time_to_run)

44

"""

45

46

def remaining_estimate(self, last_run_at):

47

"""

48

Estimate time until next execution.

49

50

Args:

51

last_run_at (datetime): Last execution time

52

53

Returns:

54

timedelta: Time remaining until next run

55

"""

56

57

def crontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'):

58

"""

59

Create cron-style schedule.

60

61

Args:

62

minute (str): Minutes (0-59, *, */N, ranges, lists)

63

hour (str): Hours (0-23, *, */N, ranges, lists)

64

day_of_week (str): Days of week (0-6 or names, *, */N, ranges, lists)

65

day_of_month (str): Days of month (1-31, *, */N, ranges, lists)

66

month_of_year (str): Months (1-12 or names, *, */N, ranges, lists)

67

68

Returns:

69

crontab instance

70

"""

71

```

72

73

### Interval Scheduling

74

75

Simple interval-based scheduling using timedelta objects for regular recurring execution.

76

77

```python { .api }

78

class schedule:

79

def __init__(self, run_every, relative=False, nowfun=None, app=None):

80

"""

81

Create interval schedule.

82

83

Args:

84

run_every (timedelta): Interval between executions

85

relative (bool): If True, schedule relative to last run

86

nowfun (callable): Function returning current time

87

app: Celery app instance

88

"""

89

90

def is_due(self, last_run_at):

91

"""

92

Check if schedule is due.

93

94

Args:

95

last_run_at (datetime): Last execution time

96

97

Returns:

98

tuple: (is_due, next_time_to_run)

99

"""

100

101

def remaining_estimate(self, last_run_at):

102

"""

103

Estimate time until next execution.

104

105

Args:

106

last_run_at (datetime): Last execution time

107

108

Returns:

109

timedelta: Time remaining

110

"""

111

112

def schedule(run_every, relative=False):

113

"""

114

Create interval-based schedule.

115

116

Args:

117

run_every (timedelta): Time between executions

118

relative (bool): Schedule relative to last run vs absolute intervals

119

120

Returns:

121

schedule instance

122

"""

123

```

124

125

### Solar Event Scheduling

126

127

Astronomical event-based scheduling for sunrise, sunset, and other solar events at specific geographic locations.

128

129

```python { .api }

130

class solar:

131

def __init__(self, event, lat, lon, nowfun=None, app=None):

132

"""

133

Create solar event schedule.

134

135

Args:

136

event (str): Solar event ('sunrise', 'sunset', 'dawn_civil', 'dusk_civil')

137

lat (float): Latitude coordinate

138

lon (float): Longitude coordinate

139

nowfun (callable): Function returning current time

140

app: Celery app instance

141

"""

142

143

def is_due(self, last_run_at):

144

"""

145

Check if solar event schedule is due.

146

147

Args:

148

last_run_at (datetime): Last execution time

149

150

Returns:

151

tuple: (is_due, next_time_to_run)

152

"""

153

154

def remaining_estimate(self, last_run_at):

155

"""

156

Estimate time until next solar event.

157

158

Args:

159

last_run_at (datetime): Last execution time

160

161

Returns:

162

timedelta: Time remaining

163

"""

164

165

def solar(event, lat, lon):

166

"""

167

Create solar event schedule.

168

169

Args:

170

event (str): Solar event type

171

lat (float): Latitude (-90 to 90)

172

lon (float): Longitude (-180 to 180)

173

174

Returns:

175

solar instance

176

"""

177

```

178

179

### Beat Scheduler Classes

180

181

Core beat scheduler components for managing and executing periodic tasks.

182

183

```python { .api }

184

class Scheduler:

185

def __init__(self, app, max_interval=None, Producer=None, lazy=False, **kwargs):

186

"""

187

Base scheduler class.

188

189

Args:

190

app: Celery application instance

191

max_interval (int): Maximum sleep interval between checks

192

Producer: Message producer class

193

lazy (bool): Don't start immediately

194

"""

195

196

def setup_schedule(self):

197

"""Initialize schedule from configuration."""

198

199

def sync(self):

200

"""Sync schedule with storage backend."""

201

202

def close(self):

203

"""Close scheduler and cleanup resources."""

204

205

def add(self, **kwargs):

206

"""

207

Add schedule entry.

208

209

Args:

210

**kwargs: Schedule entry parameters

211

"""

212

213

def tick(self):

214

"""

215

Check schedule and run due tasks.

216

217

Returns:

218

float: Seconds until next check needed

219

"""

220

221

@property

222

def schedule(self):

223

"""Current schedule dictionary."""

224

225

class PersistentScheduler(Scheduler):

226

def __init__(self, *args, **kwargs):

227

"""Scheduler that persists schedule to disk."""

228

229

def setup_schedule(self):

230

"""Load schedule from persistent storage."""

231

232

def sync(self):

233

"""Save schedule to persistent storage."""

234

235

class ScheduleEntry:

236

def __init__(

237

self,

238

task=None,

239

schedule=None,

240

args=None,

241

kwargs=None,

242

options=None,

243

name=None,

244

**kw

245

):

246

"""

247

Individual schedule entry.

248

249

Args:

250

task (str): Task name to execute

251

schedule: Schedule object (crontab, schedule, solar)

252

args (tuple): Task arguments

253

kwargs (dict): Task keyword arguments

254

options (dict): Task execution options

255

name (str): Entry name/identifier

256

"""

257

258

def is_due(self):

259

"""

260

Check if entry is due for execution.

261

262

Returns:

263

tuple: (is_due, next_time_to_run)

264

"""

265

266

def __call__(self, sender=None):

267

"""Execute the scheduled task."""

268

269

def update(self, other):

270

"""

271

Update entry with values from another entry.

272

273

Args:

274

other (ScheduleEntry): Entry to copy from

275

"""

276

277

class Service:

278

def __init__(self, app, max_interval=None, schedule_filename=None, scheduler_cls=None):

279

"""

280

Beat service for running periodic tasks.

281

282

Args:

283

app: Celery application instance

284

max_interval (int): Maximum sleep between schedule checks

285

schedule_filename (str): Persistent schedule file path

286

scheduler_cls: Custom scheduler class

287

"""

288

289

def start(self, embedded_process=False):

290

"""

291

Start beat service.

292

293

Args:

294

embedded_process (bool): Run as embedded process

295

"""

296

297

def sync(self):

298

"""Sync scheduler state."""

299

300

def stop(self, wait=False):

301

"""

302

Stop beat service.

303

304

Args:

305

wait (bool): Block until the service has finished shutting down

306

"""

307

```

308

309

### Beat Configuration

310

311

Configuration options and utilities for periodic task management.

312

313

```python { .api }

314

# Configuration via app.conf.beat_schedule

315

BEAT_SCHEDULE = {

316

'task-name': {

317

'task': 'myapp.tasks.my_task',

318

'schedule': crontab(minute=0, hour=4), # Execute daily at 4:00 AM

319

'args': (16, 16),

320

'kwargs': {'verbose': True},

321

'options': {'expires': 15.0}

322

}

323

}

324

325

# Add periodic task programmatically

326

app.add_periodic_task(

327

crontab(hour=7, minute=30, day_of_week=1),

328

send_weekly_report.s('Weekly Report'),

329

name='weekly report'

330

)

331

```

332

333

## Usage Examples

334

335

### Crontab Scheduling

336

337

```python

338

from celery import Celery
from celery.schedules import crontab

app = Celery('scheduler_example')


# Tasks are registered under explicit names so that the 'task' strings in
# beat_schedule below resolve. Without name=..., Celery derives the task name
# from the defining module ('<module>.<function>'), which a bare function
# name does not match.
@app.task(name='cleanup_temp_files')
def cleanup_temp_files():
    """Remove temporary files (placeholder body)."""
    # Cleanup logic here
    return "Cleanup completed"


@app.task(name='generate_report')
def generate_report():
    """Generate the report (placeholder body)."""
    return "Report generated"


@app.task(name='backup_database')
def backup_database():
    """Back up the database (placeholder body)."""
    return "Database backed up"


# Configure periodic tasks
app.conf.beat_schedule = {
    # Run every day at 2:30 AM
    'daily-cleanup': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(hour=2, minute=30),
    },

    # Run every Monday at 7:30 AM (day_of_week: 0 = Sunday, 1 = Monday)
    'weekly-report': {
        'task': 'generate_report',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    },

    # Run every 15 minutes during business hours (09:00 through 17:45)
    'frequent-check': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(minute='*/15', hour='9-17'),
    },

    # Run on specific days of month (the 1st and the 15th, at 1:00 AM)
    'monthly-backup': {
        'task': 'backup_database',
        'schedule': crontab(hour=1, minute=0, day_of_month='1,15'),
    },

    # Run multiple times per day (06:00, 12:00 and 18:00)
    'multiple-daily': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(hour='6,12,18', minute=0),
    },
}

# Crontab fields above are interpreted in this timezone.
app.conf.timezone = 'UTC'

390

```

391

392

### Interval Scheduling

393

394

```python

395

from datetime import timedelta
from celery.schedules import schedule


# Explicit names make the 'task' strings in beat_schedule below resolve;
# Celery's automatic name would be '<module>.<function>'.
@app.task(name='periodic_health_check')
def periodic_health_check():
    """Run the health check (placeholder body)."""
    return "Health check completed"


@app.task(name='process_queue')
def process_queue(priority='normal'):
    """Process the queue (placeholder body).

    Args:
        priority (str): Label supplied by the schedule entry. The
            'process-queue' entry below passes 'high_priority' positionally,
            so the task must accept one argument.
    """
    return "Queue processed"


# Interval-based scheduling
app.conf.beat_schedule = {
    # Run every 30 seconds
    'health-check': {
        'task': 'periodic_health_check',
        'schedule': schedule(run_every=timedelta(seconds=30)),
    },

    # Run every 5 minutes
    'process-queue': {
        'task': 'process_queue',
        'args': ('high_priority',),
        'schedule': schedule(run_every=timedelta(minutes=5)),
        'options': {'expires': 60},  # Task expires after 60 seconds
    },

    # Run every hour with relative timing (run time is rounded to the
    # resolution of the interval).
    # NOTE: assumes a task registered as 'cleanup_temp_files' exists
    # elsewhere in the application.
    'hourly-task': {
        'task': 'cleanup_temp_files',
        'schedule': schedule(run_every=timedelta(hours=1), relative=True),
    },
}

428

```

429

430

### Solar Event Scheduling

431

432

```python

433

from celery.schedules import solar


# Explicit names make the 'task' strings in beat_schedule below resolve;
# Celery's automatic name would be '<module>.<function>'.
@app.task(name='morning_activation')
def morning_activation():
    """Switch the morning systems on (placeholder body)."""
    return "Morning systems activated"


@app.task(name='evening_shutdown')
def evening_shutdown():
    """Switch the evening systems off (placeholder body)."""
    return "Evening systems shutdown"


# Solar event scheduling (San Francisco coordinates)
# NOTE: solar schedules depend on the optional `ephem` package being installed.
SAN_FRANCISCO = (37.7749, -122.4194)  # (latitude, longitude)

app.conf.beat_schedule = {
    # Run at sunrise
    'morning-startup': {
        'task': 'morning_activation',
        'schedule': solar('sunrise', *SAN_FRANCISCO),
    },

    # Run at sunset
    'evening-shutdown': {
        'task': 'evening_shutdown',
        'schedule': solar('sunset', *SAN_FRANCISCO),
    },
}

457

```

458

459

### Programmatic Schedule Management

460

461

```python

462

from celery.schedules import crontab
from datetime import timedelta


@app.task
def dynamic_task(message):
    """Return a confirmation string that echoes *message*."""
    return f"Executed: {message}"


# Add periodic tasks programmatically
def setup_dynamic_schedule():
    """Register the example periodic tasks on ``app``.

    Two entries are always added (daily at 08:00 and every 10 minutes). A
    third, Monday-only entry is added only when this function happens to be
    called on a Monday.
    """
    import datetime

    # Add daily task
    daily_at_eight = crontab(hour=8, minute=0)
    app.add_periodic_task(
        daily_at_eight,
        dynamic_task.s('Daily morning task'),
        name='morning task',
    )

    # Add interval task
    every_ten_minutes = timedelta(minutes=10)
    app.add_periodic_task(
        every_ten_minutes,
        dynamic_task.s('Every 10 minutes'),
        name='frequent task',
    )

    # Add conditional task
    # NOTE: the condition is evaluated once, when this function runs, so a
    # process started on any other weekday never registers the Monday entry.
    called_on_monday = datetime.datetime.now().weekday() == 0  # Monday
    if called_on_monday:
        app.add_periodic_task(
            crontab(hour=9, minute=0, day_of_week=1),
            dynamic_task.s('Monday special task'),
            name='monday task',
        )


# Call during app initialization
setup_dynamic_schedule()

496

```

497

498

### Custom Scheduler

499

500

```python

501

from celery.beat import Scheduler, ScheduleEntry
import json
import os


class DatabaseScheduler(Scheduler):
    """Custom scheduler that stores schedule in database."""

    def __init__(self, *args, **kwargs):
        # Remove our own option before delegating, so the base class only
        # receives the keyword arguments it knows about.
        self.database_url = kwargs.pop('database_url', 'sqlite:///schedule.db')
        super().__init__(*args, **kwargs)

    def setup_schedule(self):
        """Load schedule from database."""
        # Implementation would load from database
        loaded_entries = self.load_schedule_from_db()
        self.schedule = loaded_entries

    def sync(self):
        """Save schedule to database."""
        # Implementation would save to database
        self.save_schedule_to_db()

    def load_schedule_from_db(self):
        """Load schedule entries from database.

        Returns:
            dict: The loaded entries; this placeholder always returns an
            empty dict.
        """
        # Database loading logic
        return {}

    def save_schedule_to_db(self):
        """Save schedule entries to database (placeholder, does nothing)."""
        # Database saving logic
        return None


# Use custom scheduler
app.conf.beat_scheduler = 'myapp.schedulers:DatabaseScheduler'

534

```

535

536

### Beat Service Management

537

538

```python

539

from celery.beat import Service

540

541

def run_beat_service():
    """Run beat service programmatically.

    Builds a ``Service`` for ``app``, runs it in the calling thread and
    always calls ``stop()`` afterwards, whether ``start()`` returned
    normally or was interrupted.
    """
    # Create beat service
    service_options = {
        'app': app,
        'max_interval': 60,  # Check schedule every 60 seconds max
        'schedule_filename': 'celerybeat-schedule',
    }
    beat_service = Service(**service_options)

    try:
        # Start beat service (blocking)
        beat_service.start()
    except KeyboardInterrupt:
        print("Beat service interrupted")
    finally:
        # Cleanup
        beat_service.stop()

559

560

# Run beat as embedded process

561

def run_embedded_beat():
    """Run beat embedded in the current process, on a background thread.

    ``Service.start()`` runs the scheduler loop in the calling thread and
    only returns once the service has been shut down, so code placed after it
    would not run while beat is active. ``celery.beat.EmbeddedService`` wraps
    the service in a thread (``thread=True``) so the caller can carry on and
    stop beat later.
    """
    # Imported here so the function does not depend on the snippet's
    # top-level import line.
    from celery.beat import EmbeddedService

    beat_service = EmbeddedService(app, thread=True)
    beat_service.start()  # returns immediately; beat runs in the thread

    # Continue with other work...

    # Stop when needed
    beat_service.stop()

571

```

572

573

### Advanced Scheduling Patterns

574

575

```python

576

from celery.schedules import crontab, schedule
from datetime import datetime, timedelta


# Registered under an explicit name so the 'task' string in beat_schedule
# below resolves; Celery's automatic name would be '<module>.<function>'.
@app.task(name='conditional_task')
def conditional_task():
    """Report whether the call happened during business hours.

    Returns:
        str: One of two fixed messages, chosen by the current local hour
        (09 through 17 inclusive counts as business hours).
    """
    # Task that only runs under certain conditions
    current_hour = datetime.now().hour
    if 9 <= current_hour <= 17:  # Business hours
        return "Task executed during business hours"
    return "Task skipped outside business hours"


@app.task
def escalating_task(attempt=1):
    """Simulate an operation that fails twice and re-queues itself.

    Args:
        attempt (int): 1-based attempt counter.

    Returns:
        str | None: A success message from attempt 3 onwards; ``None`` when
        the attempt failed and a follow-up call was queued.
    """
    # Task with escalating retry intervals
    try:
        # Some operation that might fail
        if attempt < 3:
            raise Exception(f"Simulated failure, attempt {attempt}")
        return f"Success on attempt {attempt}"
    except Exception:
        # Retry with increasing delay: 60 s after attempt 1, 120 s after
        # attempt 2 (attempt 3 succeeds, so no longer delay is reached).
        escalating_task.apply_async(
            args=(attempt + 1,),
            countdown=attempt * 60,
        )


# Complex scheduling configuration
app.conf.beat_schedule = {
    # Task with complex cron expression:
    # every 30 minutes, 09:00-17:30, Monday through Friday
    'business-hours-only': {
        'task': 'conditional_task',
        'schedule': crontab(minute='*/30', hour='9-17', day_of_week='1-5'),
        'options': {
            'expires': 15 * 60,  # Expire after 15 minutes
            'retry': True,
            'retry_policy': {
                'max_retries': 3,
                'interval_start': 0,
                'interval_step': 0.2,
                'interval_max': 0.2,
            },
        },
    },

    # Weekend-only task
    # NOTE: assumes a task registered as 'cleanup_temp_files' exists and
    # accepts a `deep_clean` keyword argument.
    'weekend-maintenance': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(hour=3, minute=0, day_of_week='6,0'),  # Saturday and Sunday
        'kwargs': {'deep_clean': True},
    },

    # End of month reporting
    # NOTE: '28-31' fires at 23:59 on every one of those days, not only on
    # the final day of the month; the task has to check for the real month
    # end itself. Assumes a task registered as 'generate_report' exists and
    # accepts a `report_type` keyword argument.
    'month-end-report': {
        'task': 'generate_report',
        'schedule': crontab(hour=23, minute=59, day_of_month='28-31'),  # Last days of month
        'kwargs': {'report_type': 'monthly'},
    },
}

635

```

636

637

### Monitoring and Debugging

638

639

```python

640

from datetime import datetime

from celery.schedules import crontab


# name=...        -> matches the 'task' string used in beat_schedule below
#                    (the automatic name would be '<module>.<function>').
# track_started   -> per-task switch for reporting the STARTED state; it is
#                    task configuration, not a message option.
@app.task(bind=True, name='monitored_periodic_task', track_started=True)
def monitored_periodic_task(self):
    """Periodic task with built-in monitoring.

    Records a state update with a timestamp on success and on failure.

    Returns:
        str: The success message.

    Raises:
        Exception: Whatever the task logic raised, re-raised after the
        failure state has been recorded.
    """
    try:
        # Task logic here
        result = "Task completed successfully"

        # Update task state
        self.update_state(
            state='SUCCESS',
            meta={'result': result, 'timestamp': datetime.now().isoformat()},
        )

        return result

    except Exception as exc:
        # Handle errors
        self.update_state(
            state='FAILURE',
            meta={'error': str(exc), 'timestamp': datetime.now().isoformat()},
        )
        raise


# Configure with detailed options
# 'options' is forwarded to apply_async(), so it takes message options such as
# 'queue' and 'serializer'. Setting names like task_track_started,
# task_serializer or task_routes belong in app.conf and do not have the
# intended effect here.
app.conf.beat_schedule = {
    'monitored-task': {
        'task': 'monitored_periodic_task',
        'schedule': crontab(minute='*/5'),
        'options': {
            'queue': 'periodic',
            'serializer': 'json',
        },
    },
}

676

```