or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin-interface.md database-scheduler.md index.md schedule-models.md task-management.md validation-utilities.md

task-management.md (docs/)

0

# Task Management

1

2

Core models and utilities for defining, configuring, and tracking periodic tasks, including task arguments, routing, scheduling, and execution management.

3

4

## Capabilities

5

6

### Periodic Task Model

7

8

Central model for defining periodic tasks with full configuration options.

9

10

```python { .api }

11

class PeriodicTask(models.Model):

12

"""

13

Model representing a periodic task to be executed by Celery.

14

15

Must be associated with exactly one schedule type (interval, crontab, solar, or clocked).

16

"""

17

# Task identification

18

name: str # Unique task name, max 200 chars

19

task: str # Celery task name to execute, max 200 chars

20

21

# Schedule associations (exactly one must be set)

22

interval: Optional[ForeignKey[IntervalSchedule]]

23

crontab: Optional[ForeignKey[CrontabSchedule]]

24

solar: Optional[ForeignKey[SolarSchedule]]

25

clocked: Optional[ForeignKey[ClockedSchedule]]

26

27

# Task arguments

28

args: str # JSON encoded positional arguments, default '[]'

29

kwargs: str # JSON encoded keyword arguments, default '{}'

30

31

# Routing and delivery

32

queue: Optional[str] # Queue override, max 200 chars

33

exchange: Optional[str] # AMQP exchange override, max 200 chars

34

routing_key: Optional[str] # AMQP routing key override, max 200 chars

35

headers: str # JSON encoded AMQP headers, default '{}'

36

priority: Optional[int] # Task priority 0-255

37

38

# Execution control

39

expires: Optional[datetime.datetime] # Absolute expiry time

40

expire_seconds: Optional[int] # Expiry in seconds from now

41

one_off: bool # Execute only once, default False

42

start_time: Optional[datetime.datetime] # When to start executing

43

enabled: bool # Whether task is enabled, default True

44

45

# Execution tracking (auto-managed)

46

last_run_at: Optional[datetime.datetime] # Last execution time

47

total_run_count: int # Number of times executed, default 0

48

date_changed: datetime.datetime # Last modified timestamp

49

50

# Documentation

51

description: str # Task description

52

53

# Manager and internal attributes

54

objects: PeriodicTaskQuerySet # Custom QuerySet manager

55

no_changes: bool # Internal flag for change tracking

56

57

@property

58

def scheduler(self) -> Union[IntervalSchedule, CrontabSchedule, SolarSchedule, ClockedSchedule]: ...

59

60

@property

61

def schedule(self) -> schedules.BaseSchedule: ...

62

63

@property

64

def expires_(self) -> Optional[datetime.datetime]: ...

65

66

def due_start_time(self, tz: Optional[tzinfo.tzinfo] = None) -> datetime.datetime: ...

67

68

def validate_unique(self): ...

69

```

70

71

**Usage Examples:**

72

73

```python

74

from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule

75

import json

76

from datetime import datetime, timedelta

77

78

# Create task with interval schedule

79

schedule = IntervalSchedule.objects.create(every=30, period=IntervalSchedule.MINUTES)

80

task = PeriodicTask.objects.create(

81

name='Process queue every 30 minutes',

82

task='myapp.tasks.process_queue',

83

interval=schedule,

84

args=json.dumps([]),

85

kwargs=json.dumps({'priority': 'high'}),

86

enabled=True

87

)

88

89

# Create task with cron schedule and expiry

90

cron_schedule = CrontabSchedule.objects.create(

91

minute='0',

92

hour='2',

93

day_of_week='*',

94

day_of_month='*',

95

month_of_year='*'

96

)

97

task = PeriodicTask.objects.create(

98

name='Daily backup at 2 AM',

99

task='myapp.tasks.backup_database',

100

crontab=cron_schedule,

101

expires=datetime.utcnow() + timedelta(days=30),

102

queue='backup_queue',

103

description='Automated daily database backup'

104

)

105

106

# Create one-off task

107

task = PeriodicTask.objects.create(

108

name='One-time data migration',

109

task='myapp.tasks.migrate_data',

110

interval=IntervalSchedule.objects.create(every=1, period=IntervalSchedule.SECONDS),

111

one_off=True,

112

args=json.dumps(['migration_v2'])

113

)

114

```

115

116

### Task Change Tracking

117

118

Model that records when periodic tasks were last modified, so that the scheduler can detect the change and pick up the updated tasks.

119

120

```python { .api }

121

class PeriodicTasks(models.Model):

122

"""

123

Helper model to track when periodic task schedules change.

124

125

Contains a single row with ID=1 that gets updated whenever tasks are modified.

126

"""

127

ident: int # Primary key, always 1

128

last_update: datetime.datetime # Timestamp of last change

129

130

@classmethod

131

def changed(cls, instance: PeriodicTask, **kwargs): ...

132

133

@classmethod

134

def update_changed(cls, **kwargs): ...

135

136

@classmethod

137

def last_change(cls) -> Optional[datetime.datetime]: ...

138

```

139

140

**Usage Examples:**

141

142

```python

143

from django_celery_beat.models import PeriodicTasks, PeriodicTask

144

145

# Check when tasks were last changed

146

last_change = PeriodicTasks.last_change()

147

print(f"Tasks last modified: {last_change}")

148

149

# Manually update change timestamp (for bulk operations)

150

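# Note: the timestamp is updated automatically when individual tasks are saved/deleted; QuerySet.update() bypasses save() and model signals, so notify the scheduler manually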

151

PeriodicTask.objects.filter(enabled=False).update(enabled=True)

152

PeriodicTasks.update_changed() # Notify scheduler of bulk changes

153

154

# The changed() method is automatically called by Django signals

155

# when individual tasks are saved or deleted

156

```

157

158

### Custom QuerySet

159

160

Optimized queryset for periodic task queries that prefetches the related schedule objects (interval, crontab, solar, clocked).

161

162

```python { .api }

163

class PeriodicTaskQuerySet(QuerySet):

164

"""

165

Custom queryset for PeriodicTask with optimizations.

166

"""

167

def enabled(self) -> 'PeriodicTaskQuerySet': ...

168

```

169

170

**Usage Examples:**

171

172

```python

173

from django_celery_beat.models import PeriodicTask

174

175

# Get enabled tasks with optimized queries

176

enabled_tasks = PeriodicTask.objects.enabled()

177

178

# The enabled() method automatically includes:

179

# - prefetch_related('interval', 'crontab', 'solar', 'clocked')

180

# - filter(enabled=True)

181

182

for task in enabled_tasks:

183

print(f"Task: {task.name}, Schedule: {task.scheduler}")

184

# No additional queries needed due to prefetch_related

185

```

186

187

## Task Configuration Patterns

188

189

### JSON Argument Handling

190

191

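Positional arguments, keyword arguments, and AMQP headers are stored in the database as JSON-encoded strings, so serialize them with `json.dumps` before assigning them to a task.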

192

193

```python

194

import json

195

196

# Positional arguments

197

args = json.dumps(['arg1', 'arg2', 123])

198

199

# Keyword arguments

200

kwargs = json.dumps({

201

'param1': 'value1',

202

'param2': True,

203

'param3': {'nested': 'dict'}

204

})

205

206

# AMQP headers

207

headers = json.dumps({

208

'priority': 9,

209

'retry_policy': {'max_retries': 3}

210

})

211

212

task = PeriodicTask.objects.create(

213

name='Complex task',

214

task='myapp.tasks.complex_task',

215

interval=schedule,

216

args=args,

217

kwargs=kwargs,

218

headers=headers

219

)

220

```

221

222

### Routing Configuration

223

224

Tasks can be routed to specific queues, exchanges, and routing keys.

225

226

```python

227

# Route to specific queue

228

task = PeriodicTask.objects.create(

229

name='High priority task',

230

task='myapp.tasks.urgent_task',

231

interval=schedule,

232

queue='high_priority',

233

priority=9

234

)

235

236

# Use custom exchange and routing key

237

task = PeriodicTask.objects.create(

238

name='Custom routing task',

239

task='myapp.tasks.custom_task',

240

interval=schedule,

241

exchange='custom_exchange',

242

routing_key='custom.routing.key'

243

)

244

```

245

246

### Task Lifecycle Management

247

248

```python

249

from datetime import datetime, timedelta

250

251

# Temporarily disable task

252

task = PeriodicTask.objects.get(name='my_task')

253

task.enabled = False

254

task.save()

255

256

# Set task expiry

257

task.expires = datetime.utcnow() + timedelta(hours=24)

258

task.save()

259

260

# Create task that starts in the future

261

task = PeriodicTask.objects.create(

262

name='Future task',

263

task='myapp.tasks.future_task',

264

interval=schedule,

265

start_time=datetime.utcnow() + timedelta(hours=2)

266

)

267

268

# Reset task execution history

269

task.last_run_at = None

270

task.total_run_count = 0

271

task.save()

272

```

273

274

## Error Handling

275

276

### Validation Errors

277

278

The PeriodicTask model enforces several validation rules:

279

280

- Exactly one schedule type must be specified

281

- Task name must be unique

282

- JSON fields must contain valid JSON

283

- Priority must be between 0 and 255 (inclusive) if specified

284

285

```python

286

from django.core.exceptions import ValidationError

287

288

try:

289

# This will raise ValidationError - no schedule specified

290

task = PeriodicTask(

291

name='Invalid task',

292

task='myapp.tasks.test'

293

)

294

task.full_clean() # Triggers validation

295

except ValidationError as e:

296

print(f"Validation error: {e}")

297

298

try:

299

# This will raise ValidationError - multiple schedules

300

task = PeriodicTask(

301

name='Invalid task',

302

task='myapp.tasks.test',

303

interval=interval_schedule,

304

crontab=crontab_schedule # Can't have both

305

)

306

task.full_clean()

307

except ValidationError as e:

308

print(f"Validation error: {e}")

309

```

310

311

### Common Patterns

312

313

```python

314

# Safe task creation with error handling

315

def create_periodic_task(name, task_name, schedule, **kwargs):

316

try:

317

task = PeriodicTask.objects.create(

318

name=name,

319

task=task_name,

320

**{schedule_type: schedule for schedule_type in ['interval', 'crontab', 'solar', 'clocked']

321

if schedule_type in kwargs},

322

**{k: v for k, v in kwargs.items()

323

if k not in ['interval', 'crontab', 'solar', 'clocked']}

324

)

325

return task

326

except ValidationError as e:

327

print(f"Failed to create task {name}: {e}")

328

return None

329

except Exception as e:

330

print(f"Unexpected error creating task {name}: {e}")

331

return None

332

333

# Get or create pattern with proper error handling

334

def get_or_create_task(name, task_name, schedule_kwargs, task_kwargs=None):

335

try:

336

task, created = PeriodicTask.objects.get_or_create(

337

name=name,

338

defaults={

339

'task': task_name,

340

**schedule_kwargs,

341

**(task_kwargs or {})

342

}

343

)

344

return task, created

345

except Exception as e:

346

print(f"Error getting/creating task {name}: {e}")

347

return None, False

348

```