Middleware correlating project logs to individual requests
Integration with Celery that propagates correlation IDs from HTTP requests to background tasks, enabling end-to-end request tracing across the web and task-processing layers. The extension transfers correlation context from web requests to Celery workers automatically.
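The flow above can be modelled with nothing but the standard library. The sketch below is a simplified stand-in, not the extension's code. A module-level ContextVar plays the role of asgi_correlation_id's correlation_id, and a plain dict plays the task headers. The hypothetical functions handle_request(), publish() and run_task() mimic the middleware step, the before_task_publish handler, and the task_prerun/task_postrun pair.

```python
from contextvars import ContextVar, copy_context
from typing import Dict, Optional
from uuid import uuid4

# Stand-in for the library's correlation_id (assumption: a ContextVar holding Optional[str])
correlation_id: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)
HEADER_KEY = "CORRELATION_ID"  # default header_key of load_correlation_ids()


def handle_request(request_id: str) -> Dict[str, str]:
    """Middleware step (hypothetical): set the ID, then publish a task from the request."""
    correlation_id.set(request_id)
    return publish({})


def publish(headers: Dict[str, str]) -> Dict[str, str]:
    """before_task_publish step: copy the current ID into the outgoing task headers."""
    cid = correlation_id.get()
    if cid:
        headers[HEADER_KEY] = cid
    return headers


def task_body() -> str:
    """What the task sees: the ID it would attach to its log lines."""
    return correlation_id.get() or "<missing>"


def run_task(headers: Dict[str, str]) -> str:
    """task_prerun + task_postrun steps wrapped around the task body in the worker."""
    correlation_id.set(headers.get(HEADER_KEY) or uuid4().hex)  # generate one if missing
    try:
        return task_body()
    finally:
        correlation_id.set(None)  # clean up so the next task cannot inherit the ID


# Each request gets its own copy of the context, as under an ASGI server.
headers = copy_context().run(handle_request, "abc123")
print(headers)               # {'CORRELATION_ID': 'abc123'}
print(run_task(headers))     # abc123
print(correlation_id.get())  # None
```

Running the request inside copy_context() mirrors how an ASGI server gives each request its own context. In a worker, context values would otherwise persist from one task to the next. That is why the model clears the value in a finally block, as task_postrun does.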
Automatically transfers correlation IDs from HTTP requests to Celery workers when tasks are spawned from request contexts.
def load_correlation_ids(
    header_key: str = 'CORRELATION_ID',
    generator: Callable[[], str] = uuid_hex_generator
) -> None:
    """
    Transfer correlation IDs from HTTP request to Celery worker.

    This function is called automatically by CorrelationIdMiddleware when Celery is installed.
    It sets up signal handlers to:
    1. Transfer correlation ID from request thread to Celery worker headers
    2. Load correlation ID in worker process from headers
    3. Generate new ID if none exists
    4. Clean up context when task completes

    Parameters:
    - header_key: Header key for passing correlation ID (default: 'CORRELATION_ID')
    - generator: Function to generate new correlation IDs (default: uuid_hex_generator)
    """
This function connects to three Celery signals to enable automatic correlation:
- before_task_publish: captures the correlation ID from the current request context and adds it to the task headers
- task_prerun: extracts the correlation ID from the task headers (or generates a new one) and sets it in the worker context
- task_postrun: clears the correlation ID context so the next task cannot reuse it
Enables tracking of parent-child relationships between Celery tasks and their spawning processes.
def load_celery_current_and_parent_ids(
    header_key: str = 'CELERY_PARENT_ID',
    generator: Callable[[], str] = uuid_hex_generator,
    use_internal_celery_task_id: bool = False
) -> None:
    """
    Configure Celery event hooks for generating tracing IDs with depth.

    This function must be called manually during application startup.
    It enables hierarchical task tracing by tracking:
    - Parent ID: The current ID of the process that spawned this task
    - Current ID: A unique ID for the current task process

    Parameters:
    - header_key: Header key for passing parent ID (default: 'CELERY_PARENT_ID')
    - generator: Function to generate new task IDs (default: uuid_hex_generator)
    - use_internal_celery_task_id: Use Celery's task_id as the current ID instead of a generated one
    """
This enables hierarchical task tracing: every task records both its own ID and the ID of the process that spawned it, so chains of tasks can be reconstructed from the logs.
Default generator function for creating correlation IDs in Celery contexts.
uuid_hex_generator: Callable[[], str]
It is a lambda that returns a UUID4 hex string (32 lowercase hexadecimal characters):
uuid_hex_generator = lambda: uuid4().hex
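The generator parameter accepts any zero-argument callable that returns a string, so a small factory can keep custom formats in one place. The sketch below reproduces the documented default. It adds a hypothetical prefixed_generator() helper, which is not part of the library. The callable it returns could be passed as generator= to load_correlation_ids() or load_celery_current_and_parent_ids().

```python
from typing import Callable
from uuid import uuid4

IdGenerator = Callable[[], str]  # any zero-argument callable returning a string

# Same expression as the documented default generator
uuid_hex_generator: IdGenerator = lambda: uuid4().hex


def prefixed_generator(prefix: str, length: int = 12) -> IdGenerator:
    """Hypothetical helper: returns a generator producing IDs such as 'task-1a2b3c4d'."""
    if not 1 <= length <= 32:
        raise ValueError("length must be between 1 and 32 hex characters")

    def generate() -> str:
        return f"{prefix}-{uuid4().hex[:length]}"

    return generate


task_id_generator = prefixed_generator("task", length=8)
print(len(uuid_hex_generator()))  # 32
print(task_id_generator()[:5])    # task-
```

Truncating the hex string trades uniqueness for readability. Eight hex characters carry only 32 bits of randomness, so collisions become realistic once a system has issued tens of thousands of IDs. That can matter if the IDs are used to join logs across a large task volume.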
Can be replaced with custom generators for different ID formats:
from uuid import uuid4
from asgi_correlation_id.extensions.celery import load_correlation_ids

def custom_generator():
    return f"task-{uuid4().hex[:8]}"

load_correlation_ids(generator=custom_generator)

Basic usage:
# The Celery extension is loaded automatically by the middleware when Celery is installed
from fastapi import FastAPI
from asgi_correlation_id import CorrelationIdMiddleware

app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)
# Celery tasks will automatically inherit correlation IDs from requests

Hierarchical task tracing:
from celery import Celery
import logging

from asgi_correlation_id import celery_current_id, celery_parent_id
from asgi_correlation_id.extensions.celery import load_celery_current_and_parent_ids

logger = logging.getLogger(__name__)
app = Celery('myapp')

# Enable hierarchical task tracking during app startup
load_celery_current_and_parent_ids()

@app.task
def process_data(data):
    current = celery_current_id.get()
    parent = celery_parent_id.get()
    logger.info(f"Task {current} spawned by {parent}")
    return process(data)  # process() stands in for your own business logic

@app.task
def spawn_subtasks(batch_data):
    # This task's current ID becomes the parent ID of each subtask
    for item in batch_data:
        process_data.delay(item)

Custom configuration:
from asgi_correlation_id.extensions.celery import (
    load_correlation_ids,
    load_celery_current_and_parent_ids
)
from uuid import uuid4

# Custom correlation ID configuration
def custom_generator():
    return f"req-{uuid4().hex[:12]}"

load_correlation_ids(
    header_key='CUSTOM_CORRELATION_ID',
    generator=custom_generator
)

# Custom hierarchical tracing
load_celery_current_and_parent_ids(
    header_key='CUSTOM_PARENT_ID',
    use_internal_celery_task_id=True  # Use Celery's task_id as the current ID
)

Integration with FastAPI:
from fastapi import FastAPI
import logging

from celery import Celery
from asgi_correlation_id import CorrelationIdMiddleware, correlation_id
from asgi_correlation_id.extensions.celery import load_celery_current_and_parent_ids

logger = logging.getLogger(__name__)

# Setup FastAPI with correlation middleware
app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)

# Setup Celery with hierarchical tracing
celery_app = Celery('tasks')
load_celery_current_and_parent_ids()

@celery_app.task
def process_order(order_id):
    # Will have correlation context from originating request
    logger.info(f"Processing order {order_id}")
    return process(order_id)  # process() stands in for your own business logic

@app.post("/orders")
async def create_order(order_data: dict):
    # Correlation ID from middleware is available
    request_id = correlation_id.get()
    logger.info(f"Creating order in request {request_id}")
    # Spawn Celery task - will inherit correlation ID
    process_order.delay(order_data['id'])
    return {"status": "order created", "correlation_id": request_id}

The extension connects to three Celery signals. The handlers below are defined inside load_correlation_ids(), so header_key and generator refer to its parameters:
before_task_publish: captures correlation context before the task is sent to the broker:
@before_task_publish.connect(weak=False)
def transfer_correlation_id(headers, **kwargs):
    """Add correlation ID to task headers before publishing."""
    cid = correlation_id.get()
    if cid:
        headers[header_key] = cid

task_prerun: sets up correlation context when the task starts executing:
@task_prerun.connect(weak=False)
def load_correlation_id(task, **kwargs):
    """Load correlation ID from headers into worker context, or generate one."""
    id_value = task.request.get(header_key)
    if not id_value:
        id_value = generator()  # No ID was transferred: generate a fresh one
    correlation_id.set(id_value)
    sentry_extension(id_value)  # Integrate with Sentry if available

task_postrun: cleans up context when the task completes:
@task_postrun.connect(weak=False)
def cleanup(**kwargs):
    """Clear context vars to avoid reuse in next task."""
    correlation_id.set(None)

The basic correlation transfer is loaded automatically when CorrelationIdMiddleware is initialized and Celery is installed:
# In CorrelationIdMiddleware.__post_init__()
try:
    import celery
    from asgi_correlation_id.extensions.celery import load_correlation_ids
    load_correlation_ids()
except ImportError:
    pass  # Celery not installed, skip extension

Hierarchical tracing must be enabled manually:
# Call during application startup
load_celery_current_and_parent_ids()

Supporting types:
from typing import TYPE_CHECKING, Any, Callable, Dict
from uuid import uuid4

if TYPE_CHECKING:
    from celery import Task

# Type definitions
TaskSignalHandler = Callable[[Any], None]
HeaderDict = Dict[str, str]
IdGenerator = Callable[[], str]
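The signal handlers in the sections above are registered with connect(weak=False). Celery's Signal.connect() holds receivers through weak references unless told otherwise. These handlers are closures created inside load_correlation_ids(), so once that function returns nothing else keeps them alive. The standard-library sketch below illustrates the effect without Celery; make_handler() and the two register functions are hypothetical. A function held only by a weak reference is collected as soon as the call returns (in CPython, through reference counting), while a strongly held one keeps working.

```python
import gc
import weakref
from typing import Callable, Dict, List

Handler = Callable[[Dict[str, str]], None]


def make_handler(header_key: str) -> Handler:
    """Builds a closure, like the handlers defined inside load_correlation_ids()."""
    def handler(headers: Dict[str, str]) -> None:
        headers[header_key] = "abc123"
    return handler


def register_weakly() -> "weakref.ReferenceType[Handler]":
    # Only a weak reference survives this call, so the closure can be collected.
    return weakref.ref(make_handler("CORRELATION_ID"))


strong_receivers: List[Handler] = []


def register_strongly() -> None:
    # A strong reference (what weak=False amounts to) keeps the closure alive.
    strong_receivers.append(make_handler("CORRELATION_ID"))


weak = register_weakly()
register_strongly()
gc.collect()

print(weak() is None)  # True: the weakly held handler is already gone
headers: Dict[str, str] = {}
strong_receivers[0](headers)
print(headers)  # {'CORRELATION_ID': 'abc123'}
```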