OpenTelemetry Python API providing core abstractions for distributed tracing, metrics collection, and context propagation
—
Comprehensive context propagation system for distributed tracing and baggage across service boundaries. OpenTelemetry propagation enables automatic context extraction and injection using standard HTTP headers with support for W3C Trace Context and W3C Baggage specifications.
Extract and inject context using the globally configured propagator.
def extract(
carrier: CarrierT,
context: Optional[Context] = None,
getter: Getter[CarrierT] = default_getter,
) -> Context:
"""
Uses the configured propagator to extract a Context from the carrier.
Parameters:
- carrier: Object containing values used to construct a Context
- context: Optional Context to use, defaults to root context if not set
- getter: Object that can retrieve values from the carrier
Returns:
Context extracted from the carrier
"""
def inject(
carrier: CarrierT,
context: Optional[Context] = None,
setter: Setter[CarrierT] = default_setter,
) -> None:
"""
Uses the configured propagator to inject a Context into the carrier.
Parameters:
- carrier: Medium used by propagators to write values
- context: Optional Context to use, defaults to current context if not set
- setter: Object that can set values on the carrier
"""
def get_global_textmap() -> TextMapPropagator:
"""
Returns the global text map propagator.
Returns:
The current global TextMapPropagator instance
"""
def set_global_textmap(http_text_format: TextMapPropagator) -> None:
"""
Sets the global text map propagator.
Parameters:
- http_text_format: The TextMapPropagator to set as global
"""Base interface for implementing propagators that work with text-based carriers.
class TextMapPropagator(ABC):
"""Abstract base class for text map propagators."""
def extract(
self,
carrier: CarrierT,
context: Optional[Context] = None,
getter: Getter[CarrierT] = default_getter,
) -> Context:
"""
Extract context from a carrier.
Parameters:
- carrier: The carrier containing context data
- context: Optional context to merge extracted data into
- getter: Object to retrieve values from carrier
Returns:
Context with extracted data
"""
def inject(
self,
carrier: CarrierT,
context: Optional[Context] = None,
setter: Setter[CarrierT] = default_setter,
) -> None:
"""
Inject context into a carrier.
Parameters:
- carrier: The carrier to inject context into
- context: Optional context to inject, defaults to current context
- setter: Object to set values in carrier
"""
@property
def fields(self) -> Set[str]:
"""
Returns the fields this propagator will read or write.
Returns:
Set of field names used by this propagator
"""Generic interfaces for reading from and writing to different carrier types.
class Getter(Generic[CarrierT], ABC):
"""Abstract base class for retrieving values from carriers."""
def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]:
"""
Retrieve the value(s) associated with the key from the carrier.
Parameters:
- carrier: The carrier to retrieve values from
- key: The key of the value to retrieve
Returns:
List of values associated with the key, or None if not found
"""
def keys(self, carrier: CarrierT) -> List[str]:
"""
Retrieve all keys from the carrier.
Parameters:
- carrier: The carrier to retrieve keys from
Returns:
List of all keys in the carrier
"""
class Setter(Generic[CarrierT], ABC):
"""Abstract base class for setting values in carriers."""
def set(self, carrier: CarrierT, key: str, value: str) -> None:
"""
Set a value in the carrier.
Parameters:
- carrier: The carrier to set the value in
- key: The key to set
- value: The value to set
"""Default implementations for common carrier patterns.
class DefaultGetter(Getter[Dict[str, str]]):
"""Default getter for dict-like carriers."""
def get(self, carrier: Dict[str, str], key: str) -> Optional[List[str]]:
"""Get value from dict carrier."""
def keys(self, carrier: Dict[str, str]) -> List[str]:
"""Get all keys from dict carrier."""
class DefaultSetter(Setter[Dict[str, str]]):
"""Default setter for dict-like carriers."""
def set(self, carrier: Dict[str, str], key: str, value: str) -> None:
"""Set value in dict carrier."""
default_getter: DefaultGetter
default_setter: DefaultSetterCombine multiple propagators for comprehensive context propagation.
class CompositePropagator(TextMapPropagator):
"""Propagator that combines multiple propagators."""
def __init__(self, propagators: Sequence[TextMapPropagator]) -> None:
"""
Initialize with a list of propagators.
Parameters:
- propagators: Sequence of TextMapPropagator instances to combine
"""
def extract(
self,
carrier: CarrierT,
context: Optional[Context] = None,
getter: Getter[CarrierT] = default_getter,
) -> Context:
"""Extract using all propagators in sequence."""
def inject(
self,
carrier: CarrierT,
context: Optional[Context] = None,
setter: Setter[CarrierT] = default_setter,
) -> None:
"""Inject using all propagators."""
@property
def fields(self) -> Set[str]:
"""Return combined fields from all propagators."""Standard W3C Trace Context propagation implementation.
class TraceContextTextMapPropagator(TextMapPropagator):
"""W3C Trace Context propagator for trace correlation."""
def extract(
self,
carrier: CarrierT,
context: Optional[Context] = None,
getter: Getter[CarrierT] = default_getter,
) -> Context:
"""
Extract trace context from traceparent and tracestate headers.
Parameters:
- carrier: The carrier containing HTTP headers or equivalent
- context: Optional context to merge extracted data into
- getter: Object to retrieve header values from carrier
Returns:
Context with extracted trace information
"""
def inject(
self,
carrier: CarrierT,
context: Optional[Context] = None,
setter: Setter[CarrierT] = default_setter,
) -> None:
"""
Inject trace context into traceparent and tracestate headers.
Parameters:
- carrier: The carrier to inject headers into
- context: Optional context to inject, defaults to current context
- setter: Object to set header values in carrier
"""
@property
def fields(self) -> Set[str]:
"""Returns {'traceparent', 'tracestate'}."""Standard W3C Baggage propagation implementation.
class W3CBaggagePropagator(TextMapPropagator):
"""W3C Baggage propagator for cross-service data propagation."""
def extract(
self,
carrier: CarrierT,
context: Optional[Context] = None,
getter: Getter[CarrierT] = default_getter,
) -> Context:
"""
Extract baggage from baggage header.
Parameters:
- carrier: The carrier containing HTTP headers or equivalent
- context: Optional context to merge extracted data into
- getter: Object to retrieve header values from carrier
Returns:
Context with extracted baggage information
"""
def inject(
self,
carrier: CarrierT,
context: Optional[Context] = None,
setter: Setter[CarrierT] = default_setter,
) -> None:
"""
Inject baggage into baggage header.
Parameters:
- carrier: The carrier to inject headers into
- context: Optional context to inject, defaults to current context
- setter: Object to set header values in carrier
"""
@property
def fields(self) -> Set[str]:
"""Returns {'baggage'}."""Manage baggage key-value pairs for cross-service context propagation.
def get_all(context: Optional[Context] = None) -> Mapping[str, object]:
"""
Returns all name/value pairs in the Baggage.
Parameters:
- context: The Context to use, if not set uses current Context
Returns:
The name/value pairs in the Baggage as a read-only mapping
"""
def get_baggage(name: str, context: Optional[Context] = None) -> Optional[object]:
"""
Provides access to the value for a name/value pair in the Baggage.
Parameters:
- name: The name of the value to retrieve
- context: The Context to use, if not set uses current Context
Returns:
The value associated with the given name, or None if not present
"""
def set_baggage(
name: str,
value: object,
context: Optional[Context] = None
) -> Context:
"""
Sets a value in the Baggage.
Parameters:
- name: The name of the value to set
- value: The value to set
- context: The Context to use, if not set uses current Context
Returns:
A Context with the value updated
"""
def remove_baggage(name: str, context: Optional[Context] = None) -> Context:
"""
Removes a value from the Baggage.
Parameters:
- name: The name of the value to remove
- context: The Context to use, if not set uses current Context
Returns:
A Context with the name/value removed
"""
def clear(context: Optional[Context] = None) -> Context:
"""
Removes all values from the Baggage.
Parameters:
- context: The Context to use, if not set uses current Context
Returns:
A Context with all baggage entries removed
"""from opentelemetry import propagate, trace, baggage
import requests
# Server side - extract context from incoming request
def handle_request(request):
# Extract context from HTTP headers
context = propagate.extract(request.headers)
# Activate the extracted context
token = context.attach(context)
try:
# Process request with trace context
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("handle-request") as span:
span.set_attribute("http.method", request.method)
# Baggage is automatically available
user_id = baggage.get_baggage("user.id")
if user_id:
span.set_attribute("user.id", user_id)
return process_request()
finally:
context.detach(token)
# Client side - inject context into outgoing request
def make_request():
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("http-request") as span:
# Set baggage for propagation
ctx = baggage.set_baggage("user.id", "12345")
token = context.attach(ctx)
try:
# Prepare outgoing request
headers = {}
# Inject current context into headers
propagate.inject(headers)
# Make request with propagated context
response = requests.get(
"http://api.example.com/data",
headers=headers
)
span.set_attribute("http.status_code", response.status_code)
return response.json()
finally:
context.detach(token)from opentelemetry import propagate
from opentelemetry.propagators.textmap import Getter, Setter
from typing import Dict, List, Optional
# Custom carrier for message queue metadata
class MessageMetadata:
def __init__(self):
self.headers: Dict[str, str] = {}
def get_header(self, key: str) -> Optional[str]:
return self.headers.get(key)
def set_header(self, key: str, value: str) -> None:
self.headers[key] = value
def get_all_headers(self) -> Dict[str, str]:
return self.headers.copy()
# Custom getter for message metadata
class MessageGetter(Getter[MessageMetadata]):
def get(self, carrier: MessageMetadata, key: str) -> Optional[List[str]]:
value = carrier.get_header(key)
return [value] if value is not None else None
def keys(self, carrier: MessageMetadata) -> List[str]:
return list(carrier.get_all_headers().keys())
# Custom setter for message metadata
class MessageSetter(Setter[MessageMetadata]):
def set(self, carrier: MessageMetadata, key: str, value: str) -> None:
carrier.set_header(key, value)
# Usage with custom carrier
def publish_message(data):
metadata = MessageMetadata()
# Inject context into message metadata
propagate.inject(
metadata,
setter=MessageSetter()
)
# Publish with propagated context
message_broker.publish(data, metadata)
def consume_message(data, metadata):
# Extract context from message metadata
context = propagate.extract(
metadata,
getter=MessageGetter()
)
# Process with extracted context
token = context.attach(context)
try:
process_message(data)
finally:
context.detach(token)from flask import Flask, request, g
from opentelemetry import propagate, trace, baggage
from opentelemetry.propagators.textmap import Getter
app = Flask(__name__)
# Custom getter for Flask request headers
class FlaskRequestGetter(Getter):
def get(self, carrier, key: str):
return carrier.headers.getlist(key)
def keys(self, carrier):
return list(carrier.headers.keys())
@app.before_request
def before_request():
# Extract context from incoming request
context = propagate.extract(
request,
getter=FlaskRequestGetter()
)
# Store context in Flask's g object
g.otel_context = context
g.otel_token = context.attach(context)
@app.after_request
def after_request(response):
# Clean up context
if hasattr(g, 'otel_token'):
context.detach(g.otel_token)
return response
@app.route('/api/users')
def get_users():
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("get-users") as span:
# Context and baggage automatically available
user_role = baggage.get_baggage("user.role")
span.set_attribute("http.method", request.method)
span.set_attribute("http.url", request.url)
if user_role:
span.set_attribute("user.role", user_role)
return {"users": get_user_list()}
def make_external_request(url):
"""Make external request with context propagation."""
headers = {}
# Inject current context
propagate.inject(headers)
# Make request
response = requests.get(url, headers=headers)
return response.json()import os
from opentelemetry import propagate
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator
# Configure propagators based on environment
def setup_propagators():
# Default propagators
propagators = [
TraceContextTextMapPropagator(),
W3CBaggagePropagator(),
]
# Check environment configuration
env_propagators = os.environ.get("OTEL_PROPAGATORS", "tracecontext,baggage")
if env_propagators == "none":
# Disable all propagation
propagators = []
elif "jaeger" in env_propagators:
# Add Jaeger propagator if available
try:
from opentelemetry.propagators.jaeger import JaegerPropagator
propagators.append(JaegerPropagator())
except ImportError:
pass
# Set global propagator
composite = CompositePropagator(propagators)
propagate.set_global_textmap(composite)
# Initialize propagators
setup_propagators()from opentelemetry import propagate, trace, baggage
from opentelemetry.propagators.textmap import DefaultGetter, DefaultSetter
def test_context_propagation():
"""Test context extraction and injection."""
tracer = trace.get_tracer(__name__)
# Create a span and set baggage
with tracer.start_as_current_span("test-span") as span:
ctx = baggage.set_baggage("test.key", "test.value")
token = context.attach(ctx)
try:
# Inject into headers
headers = {}
propagate.inject(headers)
print("Injected headers:")
for key, value in headers.items():
print(f" {key}: {value}")
# Extract from headers (simulating different service)
extracted_context = propagate.extract(headers)
# Verify extraction
token2 = context.attach(extracted_context)
try:
# Should have baggage and trace context
test_value = baggage.get_baggage("test.key")
current_span = trace.get_current_span()
print(f"Extracted baggage: {test_value}")
print(f"Extracted span: {current_span.get_span_context()}")
assert test_value == "test.value"
assert current_span.get_span_context().is_valid
finally:
context.detach(token2)
finally:
context.detach(token)
# Run test
test_context_propagation()from typing import TypeVar, Generic, Dict, List, Optional, Set
CarrierT = TypeVar("CarrierT") # Type variable for carrier objectsInstall with Tessl CLI
npx tessl i tessl/pypi-opentelemetry-api