or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

celery-extension.mdcontext.mdindex.mdlogging.mdmiddleware.mdsentry-extension.md

celery-extension.mddocs/

0

# Celery Extension

1

2

Integration with Celery for propagating correlation IDs from HTTP requests to background tasks, enabling end-to-end request tracing across async and task processing layers. This extension automatically transfers correlation context from web requests to Celery workers.

3

4

## Capabilities

5

6

### Basic Correlation ID Transfer

7

8

Automatically transfers correlation IDs from HTTP requests to Celery workers when tasks are spawned from request contexts.

9

10

```python { .api }

11

def load_correlation_ids(

12

header_key: str = 'CORRELATION_ID',

13

generator: Callable[[], str] = uuid_hex_generator

14

) -> None:

15

"""

16

Transfer correlation IDs from HTTP request to Celery worker.

17

18

This function is called automatically when Celery is installed.

19

It sets up signal handlers to:

20

1. Transfer correlation ID from request thread to Celery worker headers

21

2. Load correlation ID in worker process from headers

22

3. Generate new ID if none exists

23

4. Clean up context when task completes

24

25

Parameters:

26

- header_key: Header key for passing correlation ID (default: 'CORRELATION_ID')

27

- generator: Function to generate new correlation IDs (default: uuid_hex_generator)

28

"""

29

```

30

31

This function connects to Celery signals to enable automatic correlation:

32

33

1. **`before_task_publish`**: Captures correlation ID from current request context and adds it to task headers

34

2. **`task_prerun`**: Extracts correlation ID from task headers and sets it in worker context

35

3. **`task_postrun`**: Cleans up correlation ID context to prevent reuse

36

37

### Hierarchical Task Tracing

38

39

Enables tracking of parent-child relationships between Celery tasks and their spawning processes.

40

41

```python { .api }

42

def load_celery_current_and_parent_ids(

43

header_key: str = 'CELERY_PARENT_ID',

44

generator: Callable[[], str] = uuid_hex_generator,

45

use_internal_celery_task_id: bool = False

46

) -> None:

47

"""

48

Configure Celery event hooks for generating tracing IDs with depth.

49

50

This function must be called manually during application startup.

51

It enables hierarchical task tracing by tracking:

52

- Parent ID: The correlation ID of the process that spawned this task

53

- Current ID: A unique ID for the current task process

54

55

Parameters:

56

- header_key: Header key for passing parent ID (default: 'CELERY_PARENT_ID')

57

- generator: Function to generate new task IDs (default: uuid_hex_generator)

58

- use_internal_celery_task_id: Use Celery's task_id instead of generated ID

59

"""

60

```

61

62

This enables sophisticated task tracing patterns:

63

- Web request → Celery task (parent: request ID, current: task ID)

64

- Celery task → Child task (parent: parent task ID, current: child task ID)

65

- Task chains and workflows with full hierarchical visibility

66

67

### UUID Generator

68

69

Default generator function for creating correlation IDs in Celery contexts.

70

71

```python { .api }

72

uuid_hex_generator: Callable[[], str]

73

```

74

75

This is a lambda function that generates UUID4 hex strings: `lambda: uuid4().hex`

76

77

Can be replaced with custom generators for different ID formats:

78

79

```python

80

def custom_generator():

81

return f"task-{uuid4().hex[:8]}"

82

83

load_correlation_ids(generator=custom_generator)

84

```

85

86

## Usage Examples

87

88

### Basic Setup (Automatic)

89

90

```python

91

# Celery extension is loaded automatically when Celery is installed

92

from fastapi import FastAPI

93

from asgi_correlation_id import CorrelationIdMiddleware

94

95

app = FastAPI()

96

app.add_middleware(CorrelationIdMiddleware)

97

98

# Celery tasks will automatically inherit correlation IDs from requests

99

```

100

101

### Manual Hierarchical Setup

102

103

```python

104

from celery import Celery

105

from asgi_correlation_id.extensions.celery import load_celery_current_and_parent_ids

106

107

app = Celery('myapp')

108

109

# Enable hierarchical task tracking during app startup

110

load_celery_current_and_parent_ids()

111

112

@app.task

113

def process_data(data):

114

from asgi_correlation_id import celery_current_id, celery_parent_id

115

116

current = celery_current_id.get()

117

parent = celery_parent_id.get()

118

119

logger.info(f"Task {current} spawned by {parent}")

120

return process(data)

121

122

@app.task

123

def spawn_subtasks(batch_data):

124

# This task's current ID becomes parent ID for subtasks

125

for item in batch_data:

126

process_data.delay(item)

127

```

128

129

### Custom Configuration

130

131

```python

132

from asgi_correlation_id.extensions.celery import (

133

load_correlation_ids,

134

load_celery_current_and_parent_ids

135

)

136

137

# Custom correlation ID configuration

138

def custom_generator():

139

return f"req-{uuid4().hex[:12]}"

140

141

load_correlation_ids(

142

header_key='CUSTOM_CORRELATION_ID',

143

generator=custom_generator

144

)

145

146

# Custom hierarchical tracing

147

load_celery_current_and_parent_ids(

148

header_key='CUSTOM_PARENT_ID',

149

use_internal_celery_task_id=True # Use Celery's internal task ID

150

)

151

```

152

153

### Integration with FastAPI

154

155

```python

156

from fastapi import FastAPI, BackgroundTasks

157

from celery import Celery

158

from asgi_correlation_id import CorrelationIdMiddleware, correlation_id

159

from asgi_correlation_id.extensions.celery import load_celery_current_and_parent_ids

160

161

# Setup FastAPI with correlation middleware

162

app = FastAPI()

163

app.add_middleware(CorrelationIdMiddleware)

164

165

# Setup Celery with hierarchical tracing

166

celery_app = Celery('tasks')

167

load_celery_current_and_parent_ids()

168

169

@celery_app.task

170

def process_order(order_id):

171

# Will have correlation context from originating request

172

logger.info(f"Processing order {order_id}")

173

return process(order_id)

174

175

@app.post("/orders")

176

async def create_order(order_data: dict):

177

# Correlation ID from middleware is available

178

request_id = correlation_id.get()

179

logger.info(f"Creating order in request {request_id}")

180

181

# Spawn Celery task - will inherit correlation ID

182

process_order.delay(order_data['id'])

183

184

return {"status": "order created", "correlation_id": request_id}

185

```

186

187

## Signal Handlers

188

189

The extension connects to three Celery signals:

190

191

### before_task_publish

192

193

Captures correlation context before task is sent to broker:

194

195

```python

196

@before_task_publish.connect(weak=False)

197

def transfer_correlation_id(headers, **kwargs):

198

"""Add correlation ID to task headers before publishing."""

199

cid = correlation_id.get()

200

if cid:

201

headers[header_key] = cid

202

```

203

204

### task_prerun

205

206

Sets up correlation context when task starts executing:

207

208

```python

209

@task_prerun.connect(weak=False)

210

def load_correlation_id(task, **kwargs):

211

"""Load correlation ID from headers into worker context."""

212

id_value = task.request.get(header_key)

213

if id_value:

214

correlation_id.set(id_value)

215

sentry_extension(id_value) # Integrate with Sentry if available

216

```

217

218

### task_postrun

219

220

Cleans up context when task completes:

221

222

```python

223

@task_postrun.connect(weak=False)

224

def cleanup(**kwargs):

225

"""Clear context vars to avoid reuse in next task."""

226

correlation_id.set(None)

227

```

228

229

## Extension Loading

230

231

The basic correlation transfer is loaded automatically:

232

233

```python

234

# In CorrelationIdMiddleware.__post_init__()

235

try:

236

import celery

237

from asgi_correlation_id.extensions.celery import load_correlation_ids

238

load_correlation_ids()

239

except ImportError:

240

pass # Celery not installed, skip extension

241

```

242

243

Hierarchical tracing must be enabled manually:

244

245

```python

246

# Call during application startup

247

load_celery_current_and_parent_ids()

248

```

249

250

## Types

251

252

```python { .api }

253

from typing import TYPE_CHECKING, Any, Callable, Dict

254

from uuid import uuid4

255

256

if TYPE_CHECKING:

257

from celery import Task

258

259

# Type definitions

260

TaskSignalHandler = Callable[[Any], None]

261

HeaderDict = Dict[str, str]

262

IdGenerator = Callable[[], str]

263

```