Middleware correlating project logs to individual requests
npx @tessl/cli install tessl/pypi-asgi-correlation-id@4.3.0Middleware for reading or generating correlation IDs for each incoming request. Correlation IDs can then be added to your logs, making it simple to retrieve all logs generated from a single HTTP request. The middleware automatically detects correlation ID headers from incoming requests or generates new ones when none are present, then makes these IDs available throughout the request lifecycle for logging and tracing purposes.
pip install asgi-correlation-idfrom asgi_correlation_id import (
CorrelationIdMiddleware,
CorrelationIdFilter,
CeleryTracingIdsFilter,
correlation_id,
celery_current_id,
celery_parent_id,
)For extensions:
from asgi_correlation_id.extensions.celery import (
load_correlation_ids,
load_celery_current_and_parent_ids,
)
from asgi_correlation_id.extensions.sentry import (
get_sentry_extension,
set_transaction_id,
)from fastapi import FastAPI
from asgi_correlation_id import CorrelationIdMiddleware, CorrelationIdFilter
import logging
# Add middleware to your ASGI app
app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)
# Configure logging with correlation ID filter
logging.basicConfig(
level=logging.INFO,
format='%(levelname)s [%(correlation_id)s] %(name)s %(message)s'
)
# Add the correlation ID filter to your logger
logger = logging.getLogger(__name__)
correlation_filter = CorrelationIdFilter()
logger.addFilter(correlation_filter)
@app.get("/")
async def root():
logger.info("This log will include the correlation ID")
return {"message": "Hello World"}The library follows a layered architecture:
CorrelationIdMiddleware intercepts requests, extracts or generates correlation IDs, and makes them available via context variablescorrelation_id, celery_current_id, celery_parent_id) store IDs throughout request/task lifecycleCorrelationIdFilter, CeleryTracingIdsFilter) automatically attach correlation IDs to log recordsThis design enables seamless request tracing across async applications, background tasks, and external service integrations.
Core middleware functionality for intercepting HTTP requests, extracting or generating correlation IDs, and making them available throughout the request lifecycle via context variables.
class CorrelationIdMiddleware:
def __init__(
self,
app,
header_name: str = 'X-Request-ID',
update_request_header: bool = True,
generator: Callable[[], str] = lambda: uuid4().hex,
validator: Optional[Callable[[str], bool]] = is_valid_uuid4,
transformer: Optional[Callable[[str], str]] = lambda a: a
): ...
async def __call__(self, scope, receive, send) -> None: ...
def is_valid_uuid4(uuid_: str) -> bool: ...Context variables for storing and accessing correlation IDs throughout async request/task execution, providing thread-safe access to tracking identifiers.
correlation_id: ContextVar[Optional[str]]
celery_parent_id: ContextVar[Optional[str]]
celery_current_id: ContextVar[Optional[str]]Log filters that automatically attach correlation IDs to log records, enabling seamless log correlation without manual ID injection.
class CorrelationIdFilter(logging.Filter):
def __init__(
self,
name: str = '',
uuid_length: Optional[int] = None,
default_value: Optional[str] = None
): ...
def filter(self, record) -> bool: ...
class CeleryTracingIdsFilter(logging.Filter):
def __init__(
self,
name: str = '',
uuid_length: Optional[int] = None,
default_value: Optional[str] = None
): ...
def filter(self, record) -> bool: ...Integration with Celery for propagating correlation IDs from HTTP requests to background tasks, enabling end-to-end request tracing across async and task processing layers.
def load_correlation_ids(
header_key: str = 'CORRELATION_ID',
generator: Callable[[], str] = uuid_hex_generator
) -> None: ...
def load_celery_current_and_parent_ids(
header_key: str = 'CELERY_PARENT_ID',
generator: Callable[[], str] = uuid_hex_generator,
use_internal_celery_task_id: bool = False
) -> None: ...
uuid_hex_generator: Callable[[], str]Integration with Sentry SDK for tagging events with correlation IDs, enabling correlation between application logs and error monitoring.
def get_sentry_extension() -> Callable[[str], None]: ...
def set_transaction_id(correlation_id: str) -> None: ...