tessl/pypi-asgi-correlation-id

Middleware correlating project logs to individual requests

Celery Extension

Integration with Celery for propagating correlation IDs from HTTP requests to background tasks, enabling end-to-end request tracing across async and task processing layers. This extension automatically transfers correlation context from web requests to Celery workers.

Capabilities

Basic Correlation ID Transfer

Automatically transfers correlation IDs from HTTP requests to Celery workers when tasks are spawned from request contexts.

def load_correlation_ids(
    header_key: str = 'CORRELATION_ID',
    generator: Callable[[], str] = uuid_hex_generator
) -> None:
    """
    Transfer correlation IDs from HTTP request to Celery worker.
    
    This function is called automatically by CorrelationIdMiddleware when Celery is installed.
    It sets up signal handlers to:
    1. Transfer correlation ID from request thread to Celery worker headers
    2. Load correlation ID in worker process from headers  
    3. Generate new ID if none exists
    4. Clean up context when task completes
    
    Parameters:
    - header_key: Header key for passing correlation ID (default: 'CORRELATION_ID')
    - generator: Function to generate new correlation IDs (default: uuid_hex_generator)
    """

This function connects to Celery signals to enable automatic correlation:

  1. before_task_publish: Captures correlation ID from current request context and adds it to task headers
  2. task_prerun: Extracts correlation ID from task headers and sets it in worker context, generating a new ID if the header is absent
  3. task_postrun: Cleans up correlation ID context to prevent reuse
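
The three steps above can be simulated without Celery. The following is a minimal standard-library sketch, not the library's code: `on_publish`, `on_prerun`, `on_postrun`, and `simulate_task` are hypothetical names, the module-level `correlation_id` is a stand-in for the library's context variable, and a plain dict stands in for the task headers. In a real deployment the publisher and the worker are separate processes; here they share one.

```python
from contextvars import ContextVar
from typing import Callable, Dict, Optional
from uuid import uuid4

# Stand-in for the library's correlation_id context variable (assumption).
correlation_id: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)

HEADER_KEY = "CORRELATION_ID"


def on_publish(headers: Dict[str, str]) -> None:
    """Publisher side: copy the current ID, if any, into the task headers."""
    cid = correlation_id.get()
    if cid:
        headers[HEADER_KEY] = cid


def on_prerun(headers: Dict[str, str], generator: Callable[[], str]) -> None:
    """Worker side: restore the ID from the headers, or generate a fresh one."""
    correlation_id.set(headers.get(HEADER_KEY) or generator())


def on_postrun() -> None:
    """Worker side: reset the context so the next task starts clean."""
    correlation_id.set(None)


def simulate_task(headers: Dict[str, str]) -> Optional[str]:
    """Run the worker-side hooks around a task body that just reads the ID."""
    on_prerun(headers, lambda: uuid4().hex)
    try:
        return correlation_id.get()
    finally:
        on_postrun()


# Publisher: a request with ID "req-123" enqueues a task.
correlation_id.set("req-123")
task_headers: Dict[str, str] = {}
on_publish(task_headers)
correlation_id.set(None)  # the request has finished

seen_by_task = simulate_task(task_headers)   # ID restored from the headers
seen_without_request = simulate_task({})     # no header, so an ID is generated
```

The task published from the request sees the request's ID, while a task published with no header still gets a generated ID, so every log line in a worker can be attributed to something.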

Hierarchical Task Tracing

Enables tracking of parent-child relationships between Celery tasks and their spawning processes.

def load_celery_current_and_parent_ids(
    header_key: str = 'CELERY_PARENT_ID',
    generator: Callable[[], str] = uuid_hex_generator,
    use_internal_celery_task_id: bool = False
) -> None:
    """
    Configure Celery event hooks for generating tracing IDs with depth.
    
    This function must be called manually during application startup.
    It enables hierarchical task tracing by tracking:
    - Parent ID: The correlation ID of the process that spawned this task
    - Current ID: A unique ID for the current task process
    
    Parameters:
    - header_key: Header key for passing parent ID (default: 'CELERY_PARENT_ID')
    - generator: Function to generate new task IDs (default: uuid_hex_generator)
    - use_internal_celery_task_id: Use Celery's task_id instead of a generated ID (default: False)
    """

With both IDs available, logs can show which process spawned each task:

  • Web request → Celery task (parent: request ID, current: task ID)
  • Celery task → Child task (parent: parent task ID, current: child task ID)
  • Task chains and workflows with full hierarchical visibility
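
The task-to-child-task case can be illustrated with a small in-memory queue. This is a sketch under stated assumptions rather than the extension's implementation: `delay`, `work`, `TASKS`, and the two context variables are hypothetical stand-ins, IDs come from a counter so the trace is readable, and the web-request case is not modelled (the first task is published from outside any task).

```python
from collections import deque
from contextvars import ContextVar
from itertools import count
from typing import Callable, Deque, Dict, List, Optional, Tuple

# Stand-ins for the library's celery_current_id / celery_parent_id (assumption).
current_id: ContextVar[Optional[str]] = ContextVar("current_id", default=None)
parent_id: ContextVar[Optional[str]] = ContextVar("parent_id", default=None)

HEADER_KEY = "CELERY_PARENT_ID"
queue: Deque[Tuple[str, Dict[str, str]]] = deque()
trace: List[Tuple[str, Optional[str], Optional[str]]] = []
_counter = count(1)


def delay(task_name: str) -> None:
    """Publish step: the spawner's current ID travels as the parent header."""
    headers: Dict[str, str] = {}
    spawner = current_id.get()
    if spawner:
        headers[HEADER_KEY] = spawner
    queue.append((task_name, headers))


def batch() -> None:
    """A task that spawns two subtasks."""
    delay("item")
    delay("item")


def item() -> None:
    """A leaf task with no children."""


TASKS: Dict[str, Callable[[], None]] = {"batch": batch, "item": item}


def work() -> None:
    """Worker loop: set both IDs before each task and clear them afterwards."""
    while queue:
        task_name, headers = queue.popleft()
        parent_id.set(headers.get(HEADER_KEY))
        current_id.set(f"task-{next(_counter)}")
        try:
            trace.append((task_name, current_id.get(), parent_id.get()))
            TASKS[task_name]()
        finally:
            current_id.set(None)
            parent_id.set(None)


delay("batch")  # published outside any task, so no parent header is sent
work()
# trace holds (task name, current ID, parent ID) for each executed task
```

Both `item` tasks record the `batch` task's current ID as their parent, which is the relationship that lets a log viewer rebuild the task tree.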

UUID Generator

Default generator function for creating correlation IDs in Celery contexts.

uuid_hex_generator: Callable[[], str]

This is a lambda function that generates UUID4 hex strings: lambda: uuid4().hex

Any zero-argument callable that returns a string can replace it to produce a different ID format:

from uuid import uuid4

from asgi_correlation_id.extensions.celery import load_correlation_ids

def custom_generator():
    return f"task-{uuid4().hex[:8]}"

load_correlation_ids(generator=custom_generator)
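
When several ID formats are needed, a small factory keeps them consistent. The sketch below uses only the standard library; `make_prefixed_generator` is a hypothetical helper, not part of the package. Note that truncating the hex string shortens the random part, which raises the chance of collisions.

```python
from typing import Callable
from uuid import uuid4


def make_prefixed_generator(prefix: str, length: int = 8) -> Callable[[], str]:
    """Return a zero-argument generator producing '<prefix>-<hex>' IDs."""
    # uuid4().hex is 32 characters long, so longer slices are not possible.
    if not 1 <= length <= 32:
        raise ValueError("length must be between 1 and 32")

    def generate() -> str:
        return f"{prefix}-{uuid4().hex[:length]}"

    return generate


task_id_generator = make_prefixed_generator("task")
sample = task_id_generator()
```

The resulting callable has the same shape as `custom_generator` above, so it could be passed as the `generator` argument in the same way.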

Usage Examples

Basic Setup (Automatic)

# Celery extension is loaded automatically when Celery is installed
from fastapi import FastAPI
from asgi_correlation_id import CorrelationIdMiddleware

app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)

# Celery tasks will automatically inherit correlation IDs from requests

Manual Hierarchical Setup

import logging

from celery import Celery
from asgi_correlation_id.extensions.celery import load_celery_current_and_parent_ids

logger = logging.getLogger(__name__)

app = Celery('myapp')

# Enable hierarchical task tracking during app startup
load_celery_current_and_parent_ids()

@app.task
def process_data(data):
    from asgi_correlation_id import celery_current_id, celery_parent_id

    current = celery_current_id.get()
    parent = celery_parent_id.get()

    logger.info(f"Task {current} spawned by {parent}")
    return process(data)  # process() stands in for your own application logic

@app.task  
def spawn_subtasks(batch_data):
    # This task's current ID becomes parent ID for subtasks
    for item in batch_data:
        process_data.delay(item)

Custom Configuration

from uuid import uuid4

from asgi_correlation_id.extensions.celery import (
    load_correlation_ids,
    load_celery_current_and_parent_ids
)

# Custom correlation ID configuration
def custom_generator():
    return f"req-{uuid4().hex[:12]}"

load_correlation_ids(
    header_key='CUSTOM_CORRELATION_ID',
    generator=custom_generator
)

# Custom hierarchical tracing
load_celery_current_and_parent_ids(
    header_key='CUSTOM_PARENT_ID',
    use_internal_celery_task_id=True  # Use Celery's internal task ID
)

Integration with FastAPI

import logging

from fastapi import FastAPI
from celery import Celery
from asgi_correlation_id import CorrelationIdMiddleware, correlation_id
from asgi_correlation_id.extensions.celery import load_celery_current_and_parent_ids

logger = logging.getLogger(__name__)

# Setup FastAPI with correlation middleware
app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)

# Setup Celery with hierarchical tracing
celery_app = Celery('tasks')
load_celery_current_and_parent_ids()

@celery_app.task
def process_order(order_id):
    # Will have correlation context from originating request
    logger.info(f"Processing order {order_id}")
    return process(order_id)  # process() stands in for your own application logic

@app.post("/orders")
async def create_order(order_data: dict):
    # Correlation ID from middleware is available
    request_id = correlation_id.get()
    logger.info(f"Creating order in request {request_id}")
    
    # Spawn Celery task - will inherit correlation ID
    process_order.delay(order_data['id'])
    
    return {"status": "order created", "correlation_id": request_id}
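
The log calls above only show the correlation ID if the logging setup adds it to each record. As a rough illustration of that idea using only the standard library, the sketch below attaches a context variable's value to log records through a filter. `ContextIdFilter` and `ListHandler` are hypothetical names for this example, and the module-level `correlation_id` is a stand-in for the library's context variable; the package's own logging support is described in its logging documentation.

```python
import logging
from contextvars import ContextVar
from typing import List, Optional

# Stand-in for the library's correlation_id context variable (assumption).
correlation_id: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)


class ContextIdFilter(logging.Filter):
    """Hypothetical filter: copy the current ID onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id.get() or "-"
        return True  # never drop the record, only annotate it


class ListHandler(logging.Handler):
    """Collect formatted messages in memory so the output can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))


handler = ListHandler()
handler.addFilter(ContextIdFilter())
handler.setFormatter(logging.Formatter("[%(correlation_id)s] %(message)s"))

logger = logging.getLogger("orders_demo")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo output out of the root logger
logger.addHandler(handler)

correlation_id.set("req-123")
logger.info("Processing order %s", 42)
correlation_id.set(None)
logger.info("Idle")
```

With this in place the application code never passes the ID around explicitly; whichever ID is set in the current context is stamped on each line.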

Signal Handlers

The extension connects to three Celery signals:

before_task_publish

Captures correlation context before task is sent to broker:

@before_task_publish.connect(weak=False)
def transfer_correlation_id(headers, **kwargs):
    """Add correlation ID to task headers before publishing."""
    cid = correlation_id.get()
    if cid:
        headers[header_key] = cid

task_prerun

Sets up correlation context when task starts executing:

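@task_prerun.connect(weak=False)
def load_correlation_id(task, **kwargs):
    """Load correlation ID from headers into worker context, or generate one."""
    id_value = task.request.get(header_key)
    if id_value:
        correlation_id.set(id_value)
        sentry_extension(id_value)  # Integrate with Sentry if available
    else:
        # No header was sent, so generate a fresh ID for this task
        generated_id = generator()
        correlation_id.set(generated_id)
        sentry_extension(generated_id)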

task_postrun

Cleans up context when task completes:

@task_postrun.connect(weak=False)
def cleanup(**kwargs):
    """Clear context vars to avoid reuse in next task."""
    correlation_id.set(None)
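
Why the cleanup matters can be shown with a small simulation. Worker processes are reused across tasks, so a value left in a context variable is still visible to the next task. The sketch below is an illustration only: `run_tasks` is a hypothetical helper, and to isolate the effect its pre-run step sets the ID only when a header is present and omits any fallback generator.

```python
from contextvars import ContextVar
from typing import Dict, List, Optional

# Stand-in for the library's correlation_id context variable (assumption).
correlation_id: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)


def run_tasks(task_headers: List[Dict[str, str]], cleanup: bool) -> List[Optional[str]]:
    """Run tasks back to back in one context, as a reused worker would."""
    seen: List[Optional[str]] = []
    for headers in task_headers:
        incoming = headers.get("CORRELATION_ID")
        if incoming:
            correlation_id.set(incoming)       # pre-run step
        seen.append(correlation_id.get())      # what the task body observes
        if cleanup:
            correlation_id.set(None)           # post-run step
    correlation_id.set(None)  # leave the context clean for the next call
    return seen


# The first task carries an ID; the second arrives with no header.
tasks = [{"CORRELATION_ID": "req-1"}, {}]
without_cleanup = run_tasks(tasks, cleanup=False)
with_cleanup = run_tasks(tasks, cleanup=True)
```

Without the post-run reset, the second task is wrongly attributed to `req-1`; with it, the second task starts from an empty context.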

Extension Loading

The basic correlation transfer is loaded automatically:

# In CorrelationIdMiddleware.__post_init__()
try:
    import celery
    from asgi_correlation_id.extensions.celery import load_correlation_ids
    load_correlation_ids()
except ImportError:
    pass  # Celery not installed, skip extension
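
The same optional-dependency pattern can be written as a reusable helper. This is a generic standard-library sketch, not how the middleware is implemented: `load_if_available` is a hypothetical function, and the `json` module and a deliberately nonexistent package name are used only so the example runs anywhere.

```python
from importlib import import_module
from typing import Callable, List


def load_if_available(module_name: str, setup: Callable[[], None]) -> bool:
    """Run setup() only if module_name can be imported; report whether it ran."""
    try:
        import_module(module_name)
    except ImportError:
        # Optional dependency is missing, so the extension is skipped silently.
        return False
    setup()
    return True


loaded: List[str] = []
json_ran = load_if_available("json", lambda: loaded.append("json"))
missing_ran = load_if_available(
    "no_such_package_for_this_demo", lambda: loaded.append("missing")
)
```

Catching only `ImportError` keeps genuine errors raised inside `setup()` visible instead of hiding them behind the optional-dependency check.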

Hierarchical tracing must be enabled manually:

# Call during application startup
load_celery_current_and_parent_ids()

Types

from typing import TYPE_CHECKING, Any, Callable, Dict
from uuid import uuid4

if TYPE_CHECKING:
    from celery import Task

# Type definitions
TaskSignalHandler = Callable[[Any], None]
HeaderDict = Dict[str, str]
IdGenerator = Callable[[], str]

Install with Tessl CLI

npx tessl i tessl/pypi-asgi-correlation-id
