# Celery Extension

Integration with Celery that propagates correlation IDs from HTTP requests to background tasks. This enables end-to-end request tracing across the ASGI application and its Celery workers. The extension automatically transfers the correlation context of a web request to any Celery task spawned while handling it.

## Capabilities

### Basic Correlation ID Transfer

Automatically transfers the correlation ID to the Celery worker when a task is spawned from within a request context.

```python { .api }
def load_correlation_ids(
    header_key: str = 'CORRELATION_ID',
    generator: Callable[[], str] = uuid_hex_generator,
) -> None:
    """
    Transfer correlation IDs from an HTTP request to a Celery worker.

    This function is called automatically when Celery is installed and
    CorrelationIdMiddleware is instantiated.
    It connects signal handlers that:
    1. Copy the correlation ID from the request context into the task headers
    2. Load the correlation ID from the headers in the worker process
    3. Generate a new ID if the headers contain none
    4. Clear the context when the task completes

    Parameters:
    - header_key: Header key used to pass the correlation ID (default: 'CORRELATION_ID')
    - generator: Function that generates new correlation IDs (default: uuid_hex_generator)
    """
```

This function connects to three Celery signals:

1. **`before_task_publish`**: Reads the correlation ID from the current request context and adds it to the task headers
2. **`task_prerun`**: Reads the correlation ID from the task headers, or generates a new one, and sets it in the worker context
3. **`task_postrun`**: Clears the correlation ID context so the value is not reused by the next task
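The following is a standard-library-only sketch of the flow above. It does not import Celery or the extension. A plain `headers` dict stands in for the task message headers, and an empty `contextvars.Context` stands in for the separate worker process. The handler names (`on_before_task_publish`, `on_task_prerun`, `on_task_postrun`, `run_task`) are hypothetical and are not the extension's internals.

```python
import contextvars
from typing import Callable, Dict, Optional
from uuid import uuid4

# Stand-in for the extension's correlation_id context var
correlation_id = contextvars.ContextVar("correlation_id", default=None)
HEADER_KEY = "CORRELATION_ID"


def uuid_hex_generator() -> str:
    return uuid4().hex


def on_before_task_publish(headers: Dict[str, str]) -> None:
    # Publisher: copy the current correlation ID into the message headers
    cid = correlation_id.get()
    if cid:
        headers[HEADER_KEY] = cid


def on_task_prerun(
    headers: Dict[str, str], generator: Callable[[], str] = uuid_hex_generator
) -> None:
    # Worker: reuse the ID from the headers, or generate a fresh one
    correlation_id.set(headers.get(HEADER_KEY) or generator())


def on_task_postrun() -> None:
    # Worker: clear the value so the next task starts clean
    correlation_id.set(None)


def run_task(headers: Dict[str, str]) -> Optional[str]:
    # One task execution: prerun, task body, postrun
    on_task_prerun(headers)
    seen_by_task = correlation_id.get()  # what task code and log lines would see
    on_task_postrun()
    return seen_by_task


# Request side: the middleware would have set this from the incoming request
correlation_id.set("3b162382e1375bd9b98c2d7ee9fb3bb1")
message_headers: Dict[str, str] = {}
on_before_task_publish(message_headers)

# Worker side: an empty context stands in for the separate worker process
worker_ctx = contextvars.Context()
from_request = worker_ctx.run(run_task, message_headers)
without_header = worker_ctx.run(run_task, {})
```

The first task sees the request's ID. The second task is published without a header, so it gets a freshly generated 32-character hex ID. The worker-side value is reset to `None` after each task.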
### Hierarchical Task Tracing

Tracks parent-child relationships between Celery tasks and the processes that spawned them.

```python { .api }
def load_celery_current_and_parent_ids(
    header_key: str = 'CELERY_PARENT_ID',
    generator: Callable[[], str] = uuid_hex_generator,
    use_internal_celery_task_id: bool = False,
) -> None:
    """
    Configure Celery event hooks for generating tracing IDs with depth.

    This function is not called automatically; call it during application startup.
    It enables hierarchical task tracing by tracking:
    - Parent ID: the current ID of the process that spawned this task
    - Current ID: a unique ID for the current task

    Parameters:
    - header_key: Header key used to pass the parent ID (default: 'CELERY_PARENT_ID')
    - generator: Function that generates new task IDs (default: uuid_hex_generator)
    - use_internal_celery_task_id: Use Celery's own task_id as the current ID instead of a generated one
    """
```

This enables task tracing patterns such as:
- Web request → Celery task: the task gets its own current ID. The request context sets no current ID, so the parent ID stays empty. The task is linked back to the request through the correlation ID transferred by `load_correlation_ids`
- Celery task → child task (parent: the spawning task's current ID, current: the child task's ID)
- Task chains and workflows, where every task records the ID of the task that spawned it
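The sketch below simulates a web request spawning a task, which in turn spawns a subtask. It uses only the standard library and makes some simplifying assumptions. Each task runs in a fresh, empty `contextvars.Context` to mimic a separate worker process. The subtask is "delivered" immediately instead of through a broker. The helpers `publish` and `run_task` are hypothetical stand-ins for the signal handlers; only the `use_internal_celery_task_id` flag mirrors the real parameter.

```python
import contextvars
from typing import Callable, Dict, List, Optional, Tuple
from uuid import uuid4

# Stand-ins for the extension's celery_current_id / celery_parent_id context vars
celery_current_id = contextvars.ContextVar("celery_current", default=None)
celery_parent_id = contextvars.ContextVar("celery_parent", default=None)
HEADER_KEY = "CELERY_PARENT_ID"

# (parent ID, current ID) recorded by each simulated task, in start order
trace: List[Tuple[Optional[str], Optional[str]]] = []


def publish(headers: Dict[str, str]) -> None:
    """before_task_publish: the spawner's current ID becomes the child's parent ID."""
    current = celery_current_id.get()
    if current:
        headers[HEADER_KEY] = current


def run_task(
    task_id: str,
    headers: Dict[str, str],
    body: Callable[[], None],
    use_internal_celery_task_id: bool = False,
) -> None:
    """Execute one task in a fresh, empty context (mimicking a separate worker)."""

    def _execute() -> None:
        # task_prerun: parent from the header (if any), plus a new current ID
        parent = headers.get(HEADER_KEY)
        if parent:
            celery_parent_id.set(parent)
        celery_current_id.set(task_id if use_internal_celery_task_id else uuid4().hex)
        trace.append((celery_parent_id.get(), celery_current_id.get()))
        body()
        # task_postrun: clear both values
        celery_current_id.set(None)
        celery_parent_id.set(None)

    contextvars.Context().run(_execute)


def child_body() -> None:
    pass  # the subtask's real work would go here


def parent_body() -> None:
    # Spawn a subtask from inside a task; the simulation delivers it immediately
    headers: Dict[str, str] = {}
    publish(headers)
    run_task("child-task-id", headers, child_body, use_internal_celery_task_id=True)


# A web request has no celery_current_id, so no parent header is added
request_headers: Dict[str, str] = {}
publish(request_headers)
run_task("parent-task-id", request_headers, parent_body)
```

The first task records no parent, because it came from a request. The child records the first task's generated current ID as its parent. Because the child passes `use_internal_celery_task_id=True`, its own current ID is the task ID it was given.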
### UUID Generator

The default generator function for creating correlation IDs in Celery contexts.

```python { .api }
uuid_hex_generator: Callable[[], str]
```

It is a lambda that returns a UUID4 hex string: `lambda: uuid4().hex`

It can be replaced with a custom generator to produce a different ID format:

```python
from uuid import uuid4

from asgi_correlation_id.extensions.celery import load_correlation_ids


def custom_generator() -> str:
    # Short, prefixed IDs such as "task-1a2b3c4d"
    return f"task-{uuid4().hex[:8]}"


load_correlation_ids(generator=custom_generator)
```
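Below is a quick standard-library check of the two ID formats. It does not import the extension; `uuid_hex_generator` here is a `def` equivalent of the documented lambda. The default produces 32 lowercase hex digits. The custom generator shown above keeps only 8 hex digits, which is just 32 random bits. By the birthday bound, a collision then becomes more likely than not after roughly 77,000 IDs, so a longer slice may be safer for high-volume workers.

```python
import re
from uuid import uuid4


def uuid_hex_generator() -> str:
    # Equivalent to the documented default: lambda: uuid4().hex
    return uuid4().hex


def custom_generator() -> str:
    # The custom format from the example above
    return f"task-{uuid4().hex[:8]}"


default_id = uuid_hex_generator()
custom_id = custom_generator()

# 32 lowercase hex digits; index 12 is the UUID version nibble ("4")
default_ok = re.fullmatch(r"[0-9a-f]{32}", default_id) is not None and default_id[12] == "4"
# "task-" followed by 8 hex digits, i.e. only 32 random bits
custom_ok = re.fullmatch(r"task-[0-9a-f]{8}", custom_id) is not None
```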
## Usage Examples

### Basic Setup (Automatic)

```python
from fastapi import FastAPI

from asgi_correlation_id import CorrelationIdMiddleware

app = FastAPI()
# When Celery is installed, the middleware registers the basic Celery
# extension (load_correlation_ids) when it is instantiated
app.add_middleware(CorrelationIdMiddleware)

# Celery tasks spawned while handling a request inherit its correlation ID
```
### Manual Hierarchical Setup

```python
import logging

from celery import Celery

from asgi_correlation_id.context import celery_current_id, celery_parent_id
from asgi_correlation_id.extensions.celery import load_celery_current_and_parent_ids

logger = logging.getLogger(__name__)

app = Celery('myapp')

# Enable hierarchical task tracking during app startup
load_celery_current_and_parent_ids()


def process(data):
    # Hypothetical placeholder for the real work
    return data


@app.task
def process_data(data):
    current = celery_current_id.get()
    parent = celery_parent_id.get()

    logger.info("Task %s spawned by %s", current, parent)
    return process(data)


@app.task
def spawn_subtasks(batch_data):
    # This task's current ID becomes the parent ID of each subtask
    for item in batch_data:
        process_data.delay(item)
```
### Custom Configuration

```python
from uuid import uuid4

from asgi_correlation_id.extensions.celery import (
    load_celery_current_and_parent_ids,
    load_correlation_ids,
)


# Custom correlation ID configuration
def custom_generator() -> str:
    return f"req-{uuid4().hex[:12]}"


load_correlation_ids(
    header_key='CUSTOM_CORRELATION_ID',
    generator=custom_generator,
)

# Custom hierarchical tracing
load_celery_current_and_parent_ids(
    header_key='CUSTOM_PARENT_ID',
    use_internal_celery_task_id=True,  # Use Celery's internal task ID as the current ID
)
```
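Each side of the transfer uses its own configured `header_key`. The publisher writes the ID under that key, and the worker reads it back under its own key. The standard-library sketch below shows what happens if the web process uses a custom key while the worker process is left on the default. The ID is not found, a fresh one is generated, and the link to the request is lost. The `publish` and `resolve` helpers are hypothetical stand-ins for the two signal handlers.

```python
from typing import Callable, Dict
from uuid import uuid4


def publish(headers: Dict[str, str], cid: str, header_key: str) -> None:
    # Publisher side (before_task_publish) writes under its configured key
    headers[header_key] = cid


def resolve(
    headers: Dict[str, str],
    header_key: str,
    generator: Callable[[], str] = lambda: uuid4().hex,
) -> str:
    # Worker side (task_prerun) reads under its own key, else generates an ID
    return headers.get(header_key) or generator()


headers: Dict[str, str] = {}
publish(headers, "req-0123456789ab", header_key="CUSTOM_CORRELATION_ID")

matched = resolve(headers, header_key="CUSTOM_CORRELATION_ID")
mismatched = resolve(headers, header_key="CORRELATION_ID")  # worker left on the default
```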
### Integration with FastAPI

```python
import logging

from celery import Celery
from fastapi import FastAPI

from asgi_correlation_id import CorrelationIdMiddleware, correlation_id
from asgi_correlation_id.extensions.celery import load_celery_current_and_parent_ids

logger = logging.getLogger(__name__)

# Set up FastAPI with the correlation middleware
app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)

# Set up Celery with hierarchical tracing
celery_app = Celery('tasks')
load_celery_current_and_parent_ids()


@celery_app.task
def process_order(order_id):
    # Runs with the correlation ID of the originating request
    logger.info("Processing order %s", order_id)
    return order_id  # Placeholder for the real order processing


@app.post("/orders")
async def create_order(order_data: dict):
    # The correlation ID set by the middleware is available here
    request_id = correlation_id.get()
    logger.info("Creating order in request %s", request_id)

    # Spawn a Celery task; it inherits the request's correlation ID
    process_order.delay(order_data['id'])

    return {"status": "order created", "correlation_id": request_id}
```
## Signal Handlers

The extension connects handlers to three Celery signals. The snippets below are excerpts from inside `load_correlation_ids()`. There, `header_key`, `generator`, `correlation_id` and `sentry_extension` are in scope.

### before_task_publish

Adds the correlation context to the task headers before the task is sent to the broker:

```python
@before_task_publish.connect(weak=False)
def transfer_correlation_id(headers, **kwargs):
    """Add the correlation ID to the task headers before publishing."""
    cid = correlation_id.get()
    if cid:
        headers[header_key] = cid
```
### task_prerun

Sets up the correlation context when a task starts executing:

```python
@task_prerun.connect(weak=False)
def load_correlation_id(task, **kwargs):
    """Load the correlation ID from the headers into the worker context."""
    id_value = task.request.get(header_key)
    if id_value:
        correlation_id.set(id_value)
        sentry_extension(id_value)  # Integrate with Sentry if available
    else:
        # No ID in the headers (e.g. task not spawned from a request): generate one
        generated_id = generator()
        correlation_id.set(generated_id)
        sentry_extension(generated_id)
```
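### task_postrun

Cleans up the context when a task completes. Context variables are reset automatically at the end of an HTTP request. A worker process, however, runs many tasks in sequence, so the values must be reset explicitly:

```python
@task_postrun.connect(weak=False)
def cleanup(**kwargs):
    """Clear context vars so values are not reused by the next task."""
    correlation_id.set(None)
```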
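The cleanup matters most for values that a prerun handler sets only conditionally. Suppose the parent ID is set only when the parent header is present, as in the hierarchical tracing. Without a postrun reset, a task that has no parent would then report the previous task's parent. The standard-library sketch below reproduces this by running tasks one after another in a single context, which plays the role of one long-lived worker process. `worker_prerun`, `worker_postrun` and `run_tasks` are hypothetical names, not the extension's code.

```python
import contextvars
from typing import Dict, List, Optional

HEADER_KEY = "CELERY_PARENT_ID"
parent_id = contextvars.ContextVar("celery_parent", default=None)


def worker_prerun(headers: Dict[str, str]) -> None:
    # Only set the parent when the message carries one
    value = headers.get(HEADER_KEY)
    if value:
        parent_id.set(value)


def worker_postrun() -> None:
    parent_id.set(None)


def run_tasks(messages: List[Dict[str, str]], cleanup: bool) -> List[Optional[str]]:
    """Run tasks one after another in one context, like a single worker process."""
    seen: List[Optional[str]] = []

    def _loop() -> None:
        for headers in messages:
            worker_prerun(headers)
            seen.append(parent_id.get())  # what the task body would log
            if cleanup:
                worker_postrun()

    contextvars.Context().run(_loop)
    return seen


messages = [{HEADER_KEY: "parent-a"}, {}]  # the second task has no parent
leaky = run_tasks(messages, cleanup=False)
clean = run_tasks(messages, cleanup=True)
```

Without cleanup, the second task wrongly reports `"parent-a"` as its parent. With cleanup, it correctly reports `None`.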
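## Extension Loading

The basic correlation transfer is registered automatically when `CorrelationIdMiddleware` is instantiated and Celery can be imported:

```python
# In CorrelationIdMiddleware.__post_init__()
try:
    import celery  # noqa: F401  (only checks that Celery is installed)

    from asgi_correlation_id.extensions.celery import load_correlation_ids

    load_correlation_ids()
except ImportError:
    pass  # Celery not installed, skip the extension
```

Hierarchical tracing must be enabled manually:

```python
from asgi_correlation_id.extensions.celery import load_celery_current_and_parent_ids

# Call during application startup
load_celery_current_and_parent_ids()
```

Celery signal handlers are registered per process. `before_task_publish` runs in the process that sends the task, while `task_prerun` and `task_postrun` run in the worker. The handlers therefore need to be registered in every process that publishes or executes tasks. Based on the loading code above, a worker that never instantiates `CorrelationIdMiddleware` does not get the automatic registration.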
## Types

```python { .api }
from typing import TYPE_CHECKING, Any, Callable, Dict
from uuid import uuid4

if TYPE_CHECKING:
    from celery import Task

# Type definitions
TaskSignalHandler = Callable[[Any], None]
HeaderDict = Dict[str, str]
IdGenerator = Callable[[], str]
```