# Task Result Models

Django models for storing and querying Celery task results, group results, and chord coordination data. These models provide full ORM capabilities for result management and include custom managers with transaction retry logic.

## Capabilities

### Task Result Storage

The primary model for storing individual Celery task results with comprehensive metadata and execution information.

```python { .api }
class TaskResult(models.Model):
    """Task result and status storage model."""

    # Identity and naming
    task_id: models.CharField  # max_length=255, unique=True
    task_name: models.CharField  # max_length=255, null=True
    periodic_task_name: models.CharField  # max_length=255, null=True

    # Task parameters and execution context
    task_args: models.TextField  # null=True, JSON serialized positional arguments
    task_kwargs: models.TextField  # null=True, JSON serialized keyword arguments
    worker: models.CharField  # max_length=100, null=True, default=None

    # Result and status information
    status: models.CharField  # max_length=50, default='PENDING'
    result: models.TextField  # null=True, default=None, editable=False
    traceback: models.TextField  # blank=True, null=True
    meta: models.TextField  # null=True, default=None, editable=False

    # Content encoding and type
    content_type: models.CharField  # max_length=128
    content_encoding: models.CharField  # max_length=64

    # Timestamps
    date_created: models.DateTimeField  # auto_now_add=True
    date_started: models.DateTimeField  # null=True, default=None
    date_done: models.DateTimeField  # auto_now=True

    # Custom manager
    objects: TaskResultManager

    def as_dict(self):
        """
        Convert task result to dictionary.

        Returns:
            dict: Task data as dictionary with all fields
        """

    def __str__(self):
        """String representation showing task ID and status."""

    class Meta:
        ordering = ['-date_done']
        verbose_name = 'task result'
        verbose_name_plural = 'task results'
```

#### Usage Example

```python
from django_celery_results.models import TaskResult
from django.utils import timezone
from datetime import timedelta

# Query recent successful tasks
recent_success = TaskResult.objects.filter(
    status='SUCCESS',
    date_done__gte=timezone.now() - timedelta(hours=1)
)

# Find failed tasks with traceback
failed_tasks = TaskResult.objects.filter(
    status='FAILURE',
    traceback__isnull=False
).order_by('-date_done')[:10]

# Get task by ID
try:
    task = TaskResult.objects.get(task_id='abc123')
    print(f"Task status: {task.status}")
    print(f"Task result: {task.result}")
    task_data = task.as_dict()
except TaskResult.DoesNotExist:
    print("Task not found")

# Filter by task name
specific_tasks = TaskResult.objects.filter(
    task_name='myapp.tasks.process_data',
    status='SUCCESS'
)
```
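
The `result` field holds serialized text, not a Python object, so it usually needs decoding before use. The sketch below assumes the rows were written with a JSON serializer (`content_type` of `application/json`). `decode_result` is a hypothetical helper for illustration and is not part of django-celery-results.

```python
import json


def decode_result(raw, content_type="application/json"):
    """Decode the text stored in TaskResult.result (hypothetical helper).

    Only JSON payloads are handled; any other content type is returned
    unchanged so the caller can decide how to deserialize it.
    """
    if raw is None:
        return None
    if content_type == "application/json":
        return json.loads(raw)
    return raw


# Stand-in values for task.result and task.content_type
print(decode_result('{"rows": 42}'))
print(decode_result("opaque-payload", "application/x-python-serialize"))
```

With a real row, the call would look like `decode_result(task.result, task.content_type)`.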

### Group Result Storage

Model for storing results of Celery group operations, enabling tracking of multiple related tasks.

```python { .api }
class GroupResult(models.Model):
    """Task group result and status storage model."""

    # Identity
    group_id: models.CharField  # max_length=255, unique=True

    # Result information
    result: models.TextField  # null=True, default=None, editable=False
    content_type: models.CharField  # max_length=128
    content_encoding: models.CharField  # max_length=64

    # Timestamps
    date_created: models.DateTimeField  # auto_now_add=True
    date_done: models.DateTimeField  # auto_now=True

    # Custom manager
    objects: GroupResultManager

    def as_dict(self):
        """
        Convert group result to dictionary.

        Returns:
            dict: Group data as dictionary with all fields
        """

    def __str__(self):
        """String representation showing group ID."""

    class Meta:
        ordering = ['-date_done']
        verbose_name = 'group result'
        verbose_name_plural = 'group results'
```

#### Usage Example

```python
from datetime import timedelta

from django.utils import timezone
from django_celery_results.models import GroupResult

# Query recent group results
recent_groups = GroupResult.objects.filter(
    date_done__gte=timezone.now() - timedelta(days=1)
).order_by('-date_done')

# Get specific group
try:
    group = GroupResult.objects.get(group_id='group-abc123')
    group_data = group.as_dict()
    print(f"Group completed: {group.date_done}")
except GroupResult.DoesNotExist:
    print("Group not found")
```

### Chord Coordination

Model for managing Celery chord synchronization, tracking completion of chord header tasks.

```python { .api }
class ChordCounter(models.Model):
    """Chord synchronization and coordination model."""

    # Identity and coordination
    group_id: models.CharField  # max_length=255, unique=True
    sub_tasks: models.TextField  # JSON serialized list of task result tuples
    count: models.PositiveIntegerField  # Starts at chord header length, decrements

    def group_result(self, app=None):
        """
        Return the GroupResult instance for this chord.

        Args:
            app: Celery app instance (optional)

        Returns:
            celery.result.GroupResult: Group result with all sub-task results
        """
```

#### Usage Example

```python
from django_celery_results.models import ChordCounter
from celery import current_app

# Check chord progress
try:
    chord = ChordCounter.objects.get(group_id='chord-group-123')
    print(f"Remaining tasks: {chord.count}")

    # Get full group result
    group_result = chord.group_result(app=current_app)
    print(f"Group ready: {group_result.ready()}")

    if group_result.ready():
        results = group_result.results
        print(f"All {len(results)} tasks completed")
except ChordCounter.DoesNotExist:
    print("Chord already completed or not found")
```
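
The `count` field drives chord synchronization: it starts at the number of header tasks and is decremented as each one finishes, so the chord body can run once it reaches zero. The in-memory sketch below illustrates that countdown only. `InMemoryChordCounter` and `on_task_done` are hypothetical names, and nothing here is taken from the library's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryChordCounter:
    """Hypothetical stand-in for a ChordCounter row."""
    group_id: str
    count: int  # starts at the chord header length
    finished: list = field(default_factory=list)

    def on_task_done(self, task_id):
        """Record one finished header task; return True when all are done."""
        if self.count <= 0:
            raise ValueError("chord already completed")
        self.finished.append(task_id)
        self.count -= 1
        return self.count == 0


counter = InMemoryChordCounter(group_id="chord-group-123", count=3)
for task_id in ("t1", "t2", "t3"):
    if counter.on_task_done(task_id):
        print(f"Header complete after {task_id}; chord body can run")
```

A database-backed version would need the decrement to be atomic, since several workers may finish header tasks at the same time.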

## Custom Managers

### ResultManager

Base manager class providing common functionality for both TaskResult and GroupResult managers.

```python { .api }
class ResultManager(models.Manager):
    """Base manager for celery result models."""

    def connection_for_write(self):
        """Get database connection for write operations."""

    def connection_for_read(self):
        """Get database connection for read operations."""

    def current_engine(self):
        """Get current database engine name."""

    def get_all_expired(self, expires):
        """
        Get all expired results.

        Args:
            expires: Expiration time delta

        Returns:
            QuerySet: Expired result instances
        """

    def delete_expired(self, expires):
        """
        Delete all expired results.

        Args:
            expires: Expiration time delta
        """

    def warn_if_repeatable_read(self):
        """Warn if MySQL transaction isolation level is suboptimal."""
```
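
`get_all_expired` and `delete_expired` both take an expiration delta. One plausible reading, sketched here with plain dictionaries in place of model rows, is that a result counts as expired when its `date_done` is older than the current time minus `expires`. `expired_results` is a hypothetical helper, and the comparison rule is an assumption for illustration.

```python
from datetime import datetime, timedelta, timezone


def expired_results(rows, expires, now=None):
    """Return rows whose date_done is older than now - expires (hypothetical)."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - expires
    return [row for row in rows if row["date_done"] < cutoff]


# Fixed reference time so the example is deterministic
now = datetime(2024, 1, 10, tzinfo=timezone.utc)
rows = [
    {"task_id": "old", "date_done": now - timedelta(days=3)},
    {"task_id": "fresh", "date_done": now - timedelta(hours=2)},
]
stale = expired_results(rows, timedelta(days=1), now=now)
print([row["task_id"] for row in stale])
```

With the manager itself, the equivalent calls would be `TaskResult.objects.get_all_expired(timedelta(days=1))` to inspect the rows and `delete_expired` with the same argument to remove them.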

### TaskResultManager

Custom manager for the `TaskResult` model, adding task lookup and result storage with transaction retry logic.

```python { .api }
class TaskResultManager(ResultManager):
    """Manager for TaskResult model with retry logic."""

    _last_id: str  # Last queried task ID for optimization

    def get_task(self, task_id):
        """
        Get or create task result by task ID.

        Args:
            task_id (str): Task ID to retrieve

        Returns:
            TaskResult: Existing or new TaskResult instance
        """

    def store_result(self, content_type, content_encoding, task_id, result,
                     status, traceback=None, meta=None, periodic_task_name=None,
                     task_name=None, task_args=None, task_kwargs=None,
                     worker=None, using=None, **kwargs):
        """
        Store task result with transaction retry logic.

        Args:
            content_type (str): MIME type of result content
            content_encoding (str): Encoding type
            task_id (str): Unique task identifier
            result (str): Serialized task result
            status (str): Task status
            traceback (str, optional): Exception traceback
            meta (str, optional): Serialized metadata
            periodic_task_name (str, optional): Periodic task name
            task_name (str, optional): Task name
            task_args (str, optional): Serialized arguments
            task_kwargs (str, optional): Serialized keyword arguments
            worker (str, optional): Worker hostname
            using (str, optional): Database connection name
            **kwargs: Additional fields including date_started

        Returns:
            TaskResult: Created or updated TaskResult instance
        """

    def get_all_expired(self, expires):
        """
        Get all expired task results.

        Args:
            expires: Expiration time delta

        Returns:
            QuerySet: Expired TaskResult instances
        """

    def delete_expired(self, expires):
        """
        Delete expired task results.

        Args:
            expires: Expiration time delta
        """

    def warn_if_repeatable_read(self):
        """Warn if MySQL transaction isolation level is suboptimal."""
```
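
`store_result` is described as using transaction retry logic. The general pattern is a decorator that re-runs the write when a transient error occurs and re-raises once the retry budget is spent. The sketch below illustrates that pattern only: `retry_on`, `TransientError`, and `flaky_store` are hypothetical names, and the library's own mechanism may differ in which errors it catches and how often it retries.

```python
import functools


class TransientError(Exception):
    """Hypothetical stand-in for a retryable database error."""


def retry_on(exc_type, max_retries=2):
    """Retry the wrapped call up to max_retries extra times on exc_type."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exc_type:
                    if attempt == max_retries:
                        raise  # retry budget spent: surface the error
        return wrapper
    return decorator


calls = {"n": 0}


@retry_on(TransientError, max_retries=2)
def flaky_store(task_id):
    """Fail on the first two attempts, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("deadlock detected")
    return f"stored {task_id}"


print(flaky_store("abc123"), "after", calls["n"], "attempts")
```

Retrying only a specific exception type keeps programming errors, such as a bad field name, from being masked by the retry loop.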

### GroupResultManager

Custom manager for the `GroupResult` model, adding group lookup and result storage with transaction retry logic.

```python { .api }
class GroupResultManager(ResultManager):
    """Manager for GroupResult model with retry logic."""

    _last_id: str  # Last queried group ID for optimization

    def get_group(self, group_id):
        """
        Get or create group result by group ID.

        Args:
            group_id (str): Group ID to retrieve

        Returns:
            GroupResult: Existing or new GroupResult instance
        """

    def store_group_result(self, content_type, content_encoding, group_id,
                           result, using=None):
        """
        Store group result with transaction retry logic.

        Args:
            content_type (str): MIME type of result content
            content_encoding (str): Encoding type
            group_id (str): Unique group identifier
            result (str): Serialized group result
            using (str, optional): Database connection name

        Returns:
            GroupResult: Created or updated GroupResult instance
        """

    def get_all_expired(self, expires):
        """
        Get all expired group results.

        Args:
            expires: Expiration time delta

        Returns:
            QuerySet: Expired GroupResult instances
        """

    def delete_expired(self, expires):
        """
        Delete expired group results.

        Args:
            expires: Expiration time delta
        """

    def warn_if_repeatable_read(self):
        """Warn if MySQL transaction isolation level is suboptimal."""
```

## Database Schema

### Indexes

The models define database indexes on the fields most often used in queries:

**TaskResult indexes:**
- `task_name` - For filtering by task type
- `status` - For filtering by task status
- `worker` - For filtering by worker
- `date_created` - For chronological ordering
- `date_done` - For completion time queries
- `periodic_task_name` - For periodic task queries

**GroupResult indexes:**
- `date_created` - For chronological ordering
- `date_done` - For completion time queries

### Configuration Settings

- **DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH**: Maximum length for task_id fields (default: 255)
- **DJANGO_CELERY_RESULTS['ALLOW_EDITS']**: Enable editing in Django admin (default: False)
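
Both settings go in the Django settings module. The fragment below is a sketch with illustrative values, not recommendations; 191 is a length sometimes chosen to stay within index size limits on MySQL with `utf8mb4`, which is an assumption about the deployment rather than a requirement.

```python
# settings.py (fragment; values are illustrative assumptions)

# Shorten task_id columns, e.g. to fit index size limits on some databases.
DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH = 191

# Allow result rows to be edited in the Django admin (off by default).
DJANGO_CELERY_RESULTS = {
    "ALLOW_EDITS": True,
}
```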