CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-opentelemetry-api

OpenTelemetry Python API providing core abstractions for distributed tracing, metrics collection, and context propagation

Pending
Overview
Eval results
Files

docs/propagation.md

Cross-Service Propagation

This module provides a comprehensive context propagation system that carries distributed-tracing state and baggage across service boundaries. OpenTelemetry propagation enables automatic context extraction and injection through standard HTTP headers, with support for the W3C Trace Context and W3C Baggage specifications.

Capabilities

Global Propagation Operations

Extract and inject context using the globally configured propagator.

def extract(
    carrier: CarrierT,
    context: Optional[Context] = None,
    getter: Getter[CarrierT] = default_getter,
) -> Context:
    """
    Extract a Context from the carrier using the globally configured propagator.

    Parameters:
    - carrier: Object containing the values used to construct a Context
      (for example, the headers of an incoming request)
    - context: Optional Context to use; defaults to the root context if not set
    - getter: Object that can retrieve values from the carrier; defaults to
      default_getter

    Returns:
    Context extracted from the carrier
    """

def inject(
    carrier: CarrierT,
    context: Optional[Context] = None,
    setter: Setter[CarrierT] = default_setter,
) -> None:
    """
    Inject a Context into the carrier using the globally configured propagator.

    The carrier is modified in place; nothing is returned.

    Parameters:
    - carrier: Medium the propagator writes values to (for example, a dict of
      outgoing request headers)
    - context: Optional Context to use; defaults to the current context if not set
    - setter: Object that can set values on the carrier; defaults to
      default_setter
    """

def get_global_textmap() -> TextMapPropagator:
    """
    Return the propagator currently registered as the global text map propagator.

    Returns:
    The current global TextMapPropagator instance
    """

def set_global_textmap(http_text_format: TextMapPropagator) -> None:
    """
    Register a propagator as the global text map propagator.

    Parameters:
    - http_text_format: The TextMapPropagator to set as global. Despite the
      parameter name, any TextMapPropagator is accepted (the configuration
      example passes a CompositePropagator)
    """

Text Map Propagator Interface

Base interface for implementing propagators that work with text-based carriers.

class TextMapPropagator(ABC):
    """
    Abstract base class for text map propagators.

    Concrete propagators must implement extract(), inject() and the fields
    property; all three are marked abstract so an incomplete subclass cannot
    be instantiated.
    """

    @abstractmethod
    def extract(
        self,
        carrier: CarrierT,
        context: Optional[Context] = None,
        getter: Getter[CarrierT] = default_getter,
    ) -> Context:
        """
        Extract context from a carrier.

        Parameters:
        - carrier: The carrier containing context data
        - context: Optional context to merge extracted data into
        - getter: Object to retrieve values from carrier

        Returns:
        Context with extracted data
        """

    @abstractmethod
    def inject(
        self,
        carrier: CarrierT,
        context: Optional[Context] = None,
        setter: Setter[CarrierT] = default_setter,
    ) -> None:
        """
        Inject context into a carrier.

        Parameters:
        - carrier: The carrier to inject context into
        - context: Optional context to inject, defaults to current context
        - setter: Object to set values in carrier
        """

    @property
    @abstractmethod
    def fields(self) -> Set[str]:
        """
        Returns the fields this propagator will read or write.

        Returns:
        Set of field names used by this propagator
        """

Carrier Access Patterns

Generic interfaces for reading from and writing to different carrier types.

class Getter(Generic[CarrierT], ABC):
    """
    Abstract base class for retrieving values from carriers.

    Subclasses adapt one carrier type (CarrierT) and must implement both
    get() and keys(); both are marked abstract.
    """

    @abstractmethod
    def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]:
        """
        Retrieve the value(s) associated with the key from the carrier.

        Parameters:
        - carrier: The carrier to retrieve values from
        - key: The key of the value to retrieve

        Returns:
        List of values associated with the key, or None if not found
        """

    @abstractmethod
    def keys(self, carrier: CarrierT) -> List[str]:
        """
        Retrieve all keys from the carrier.

        Parameters:
        - carrier: The carrier to retrieve keys from

        Returns:
        List of all keys in the carrier
        """

class Setter(Generic[CarrierT], ABC):
    """
    Abstract base class for setting values in carriers.

    Subclasses adapt one carrier type (CarrierT) and must implement set(),
    which is marked abstract.
    """

    @abstractmethod
    def set(self, carrier: CarrierT, key: str, value: str) -> None:
        """
        Set a value in the carrier.

        Parameters:
        - carrier: The carrier to set the value in
        - key: The key to set
        - value: The value to set
        """

Default Carrier Implementations

Default implementations for common carrier patterns.

class DefaultGetter(Getter[Dict[str, str]]):
    """
    Default getter for dict-like carriers.

    An instance of this class (default_getter) is the default value of the
    getter parameter of the extract functions and methods.
    """
    
    def get(self, carrier: Dict[str, str], key: str) -> Optional[List[str]]:
        """
        Get value from dict carrier.

        Parameters:
        - carrier: The dict to read from
        - key: The key to look up

        Returns:
        List of values for the key, or None
        """
    
    def keys(self, carrier: Dict[str, str]) -> List[str]:
        """
        Get all keys from dict carrier.

        Parameters:
        - carrier: The dict whose keys are listed

        Returns:
        List of the keys in the carrier
        """

class DefaultSetter(Setter[Dict[str, str]]):
    """
    Default setter for dict-like carriers.

    An instance of this class (default_setter) is the default value of the
    setter parameter of the inject functions and methods.
    """
    
    def set(self, carrier: Dict[str, str], key: str, value: str) -> None:
        """
        Set value in dict carrier.

        Parameters:
        - carrier: The dict to write to
        - key: The key to set
        - value: The value to store under the key
        """

# Module-level instances used as the default getter/setter arguments of the
# extract and inject signatures shown in this document.
default_getter: DefaultGetter
default_setter: DefaultSetter

Composite Propagator

Combine multiple propagators for comprehensive context propagation.

class CompositePropagator(TextMapPropagator):
    """
    Propagator that combines multiple propagators.

    Imported from opentelemetry.propagators.composite in the configuration
    example.
    """
    
    def __init__(self, propagators: Sequence[TextMapPropagator]) -> None:
        """
        Initialize with a list of propagators.
        
        Parameters:
        - propagators: Sequence of TextMapPropagator instances to combine
        """
    
    def extract(
        self,
        carrier: CarrierT,
        context: Optional[Context] = None,
        getter: Getter[CarrierT] = default_getter,
    ) -> Context:
        """
        Extract using all propagators in sequence.

        Parameters:
        - carrier: The carrier containing context data
        - context: Optional context to merge extracted data into
        - getter: Object to retrieve values from carrier

        Returns:
        Context with extracted data
        """
    
    def inject(
        self,
        carrier: CarrierT,
        context: Optional[Context] = None,
        setter: Setter[CarrierT] = default_setter,
    ) -> None:
        """
        Inject using all propagators.

        Parameters:
        - carrier: The carrier to inject context into
        - context: Optional context to inject, defaults to current context
        - setter: Object to set values in carrier
        """
    
    @property
    def fields(self) -> Set[str]:
        """
        Return combined fields from all propagators.

        Returns:
        Set of field names used by the combined propagators
        """

W3C Trace Context Propagator

Standard W3C Trace Context propagation implementation.

class TraceContextTextMapPropagator(TextMapPropagator):
    """
    W3C Trace Context propagator for trace correlation.

    Reads and writes the traceparent and tracestate headers. Imported from
    opentelemetry.trace.propagation.tracecontext in the configuration example.
    """
    
    def extract(
        self,
        carrier: CarrierT,
        context: Optional[Context] = None,
        getter: Getter[CarrierT] = default_getter,
    ) -> Context:
        """
        Extract trace context from traceparent and tracestate headers.
        
        Parameters:
        - carrier: The carrier containing HTTP headers or equivalent
        - context: Optional context to merge extracted data into
        - getter: Object to retrieve header values from carrier
        
        Returns:
        Context with extracted trace information
        """
    
    def inject(
        self,
        carrier: CarrierT,
        context: Optional[Context] = None,
        setter: Setter[CarrierT] = default_setter,
    ) -> None:
        """
        Inject trace context into traceparent and tracestate headers.
        
        Parameters:
        - carrier: The carrier to inject headers into
        - context: Optional context to inject, defaults to current context
        - setter: Object to set header values in carrier
        """
    
    @property
    def fields(self) -> Set[str]:
        """
        Return the header names used by this propagator.

        Returns:
        {'traceparent', 'tracestate'}
        """

W3C Baggage Propagator

Standard W3C Baggage propagation implementation.

class W3CBaggagePropagator(TextMapPropagator):
    """
    W3C Baggage propagator for cross-service data propagation.

    Reads and writes the baggage header. Imported from
    opentelemetry.baggage.propagation in the configuration example.
    """
    
    def extract(
        self,
        carrier: CarrierT,
        context: Optional[Context] = None,
        getter: Getter[CarrierT] = default_getter,
    ) -> Context:
        """
        Extract baggage from baggage header.
        
        Parameters:
        - carrier: The carrier containing HTTP headers or equivalent
        - context: Optional context to merge extracted data into
        - getter: Object to retrieve header values from carrier
        
        Returns:
        Context with extracted baggage information
        """
    
    def inject(
        self,
        carrier: CarrierT,
        context: Optional[Context] = None,
        setter: Setter[CarrierT] = default_setter,
    ) -> None:
        """
        Inject baggage into baggage header.
        
        Parameters:
        - carrier: The carrier to inject headers into
        - context: Optional context to inject, defaults to current context
        - setter: Object to set header values in carrier
        """
    
    @property  
    def fields(self) -> Set[str]:
        """
        Return the header names used by this propagator.

        Returns:
        {'baggage'}
        """

Baggage Operations

Manage baggage key-value pairs for cross-service context propagation.

def get_all(context: Optional[Context] = None) -> Mapping[str, object]:
    """
    Return all name/value pairs in the Baggage.

    Parameters:
    - context: The Context to read from; if not set, the current Context is used

    Returns:
    The name/value pairs in the Baggage as a read-only mapping
    """

def get_baggage(name: str, context: Optional[Context] = None) -> Optional[object]:
    """
    Return the value of a single name/value pair in the Baggage.

    Parameters:
    - name: The name of the value to retrieve
    - context: The Context to read from; if not set, the current Context is used

    Returns:
    The value associated with the given name, or None if not present
    """

def set_baggage(
    name: str, 
    value: object, 
    context: Optional[Context] = None
) -> Context:
    """
    Set a value in the Baggage.

    The result is returned as a Context; the usage examples attach that
    Context before injecting so the new entry is propagated.

    Parameters:
    - name: The name of the value to set
    - value: The value to set
    - context: The Context to use; if not set, the current Context is used

    Returns:
    A Context with the value updated
    """

def remove_baggage(name: str, context: Optional[Context] = None) -> Context:
    """
    Remove a single value from the Baggage.

    Parameters:
    - name: The name of the value to remove
    - context: The Context to use; if not set, the current Context is used

    Returns:
    A Context with the name/value removed
    """

def clear(context: Optional[Context] = None) -> Context:
    """
    Remove all values from the Baggage.

    Parameters:
    - context: The Context to use; if not set, the current Context is used

    Returns:
    A Context with all baggage entries removed
    """

Usage Examples

Basic HTTP Header Propagation

from opentelemetry import propagate, trace, baggage
import requests

# Server side - extract context from incoming request
def handle_request(request):
    """
    Handle an incoming request inside the context propagated by the caller.

    Parameters:
    - request: Incoming request; its `headers` are used as the carrier and
      its `method` is recorded on the span

    Returns:
    The result of process_request()
    """
    # Imported under an alias at function scope: the snippet's import line
    # does not bring in the context module, and attach()/detach() live on
    # that module, not on the extracted Context object.
    from opentelemetry import context as otel_context

    # Extract context from HTTP headers
    extracted = propagate.extract(request.headers)

    # Activate the extracted context
    token = otel_context.attach(extracted)
    try:
        # Process request with trace context
        tracer = trace.get_tracer(__name__)
        with tracer.start_as_current_span("handle-request") as span:
            span.set_attribute("http.method", request.method)

            # Baggage is automatically available
            user_id = baggage.get_baggage("user.id")
            if user_id:
                span.set_attribute("user.id", user_id)

            return process_request()
    finally:
        # Restore the previous context even if processing raised
        otel_context.detach(token)

# Client side - inject context into outgoing request  
def make_request():
    """
    Call a downstream service with the current trace context and baggage.

    Returns:
    The decoded JSON body of the response
    """
    # Function-scope import: the snippet's import line does not bring in the
    # context module that provides attach()/detach().
    from opentelemetry import context as otel_context

    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("http-request") as span:
        # Set baggage for propagation
        ctx = baggage.set_baggage("user.id", "12345")
        token = otel_context.attach(ctx)

        try:
            # Prepare outgoing request
            headers = {}

            # Inject current context into headers
            propagate.inject(headers)

            # Make request with propagated context
            response = requests.get(
                "http://api.example.com/data",
                headers=headers
            )

            span.set_attribute("http.status_code", response.status_code)
            return response.json()
        finally:
            # Restore the previous context even if the request failed
            otel_context.detach(token)

Custom Carrier Implementation

from opentelemetry import propagate
from opentelemetry.propagators.textmap import Getter, Setter
from typing import Dict, List, Optional

# Custom carrier for message queue metadata
class MessageMetadata:
    """Message-queue metadata object that stores string headers."""

    def __init__(self):
        # Header name -> header value; starts out empty
        self.headers: Dict[str, str] = dict()

    def get_header(self, key: str) -> Optional[str]:
        """Return the value stored under `key`, or None when it is absent."""
        if key in self.headers:
            return self.headers[key]
        return None

    def set_header(self, key: str, value: str) -> None:
        """Store `value` under `key`, replacing any previous value."""
        self.headers.update({key: value})

    def get_all_headers(self) -> Dict[str, str]:
        """Return a shallow copy of all headers."""
        return dict(self.headers)

# Custom getter for message metadata
class MessageGetter(Getter[MessageMetadata]):
    """Getter that reads propagation values from MessageMetadata headers."""

    def get(self, carrier: MessageMetadata, key: str) -> Optional[List[str]]:
        """Return the header stored under `key` as a one-element list, or None."""
        header = carrier.get_header(key)
        if header is None:
            return None
        return [header]

    def keys(self, carrier: MessageMetadata) -> List[str]:
        """Return the names of all headers held by the carrier."""
        all_headers = carrier.get_all_headers()
        return list(all_headers)

# Custom setter for message metadata  
class MessageSetter(Setter[MessageMetadata]):
    """Setter that writes propagation values into MessageMetadata headers."""

    def set(self, carrier: MessageMetadata, key: str, value: str) -> None:
        """Store `value` under header `key` by delegating to carrier.set_header()."""
        carrier.set_header(key, value)

# Usage with custom carrier
def publish_message(data):
    """
    Publish `data` together with metadata carrying the propagated context.

    Parameters:
    - data: Payload handed to message_broker.publish()
    """
    carrier = MessageMetadata()
    header_writer = MessageSetter()

    # No context argument is given, so inject() uses its default context
    propagate.inject(carrier, setter=header_writer)

    # Hand the payload and the context-bearing metadata to the broker
    message_broker.publish(data, carrier)

def consume_message(data, metadata):
    """
    Process a message inside the context carried by its metadata.

    Parameters:
    - data: Message payload passed to process_message()
    - metadata: MessageMetadata carrier the context is extracted from
    """
    # Imported under an alias at function scope: the snippet's imports do not
    # include the context module, and attach()/detach() live on that module,
    # not on the extracted Context object.
    from opentelemetry import context as otel_context

    # Extract context from message metadata
    extracted = propagate.extract(
        metadata,
        getter=MessageGetter()
    )

    # Process with extracted context
    token = otel_context.attach(extracted)
    try:
        process_message(data)
    finally:
        # Restore the previous context even if processing raised
        otel_context.detach(token)

Flask Integration Example

from flask import Flask, request, g
from opentelemetry import propagate, trace, baggage
from opentelemetry.propagators.textmap import Getter

# Flask application that the request hooks and routes below are registered on
app = Flask(__name__)

# Custom getter for Flask request headers
class FlaskRequestGetter(Getter):
    """Getter that reads propagation headers from a Flask request object."""

    def get(self, carrier, key: str):
        """
        Return all values of header `key`, or None when the header is absent.

        Parameters:
        - carrier: Request object whose `headers` are queried
        - key: Header name to look up

        Returns:
        List of header values, or None if the header is not present
        """
        # getlist() yields an empty list for a missing header; the Getter
        # contract documents None for "not found", so normalise it.
        values = carrier.headers.getlist(key)
        return values or None

    def keys(self, carrier):
        """
        Return the names of all headers on the request.

        Parameters:
        - carrier: Request object whose `headers` are listed
        """
        return list(carrier.headers.keys())

@app.before_request
def before_request():
    """Extract the caller's context from the request and make it current."""
    # Imported under an alias at function scope: the snippet's imports do not
    # include the context module, and attach() lives on that module, not on
    # the extracted Context object.
    from opentelemetry import context as otel_context

    # Extract context from incoming request
    extracted = propagate.extract(
        request,
        getter=FlaskRequestGetter()
    )

    # Store context and its detach token in Flask's g object so the
    # after-request hook can restore the previous context
    g.otel_context = extracted
    g.otel_token = otel_context.attach(extracted)

@app.after_request  
def after_request(response):
    """
    Detach the context attached for this request and pass the response on.

    Parameters:
    - response: The response being returned to the client

    Returns:
    The same response object, unchanged
    """
    # Function-scope import: the snippet's imports do not include the context
    # module that provides detach().
    from opentelemetry import context as otel_context

    # Clean up context; the token is absent if no context was attached
    if hasattr(g, 'otel_token'):
        otel_context.detach(g.otel_token)
    return response

@app.route('/api/users')
def get_users():
    """Return the user list, recording request details on a "get-users" span."""
    with trace.get_tracer(__name__).start_as_current_span("get-users") as active_span:
        # Read baggage first, then record the request attributes
        role = baggage.get_baggage("user.role")

        active_span.set_attribute("http.method", request.method)
        active_span.set_attribute("http.url", request.url)

        # Only record the role when one is present in the baggage
        if role:
            active_span.set_attribute("user.role", role)

        payload = {"users": get_user_list()}
        return payload

def make_external_request(url, timeout=None):
    """
    Make external request with context propagation.

    Parameters:
    - url: URL to send the GET request to
    - timeout: Optional timeout forwarded to requests.get(); the default of
      None keeps the previous behaviour of waiting indefinitely

    Returns:
    The decoded JSON body of the response
    """
    # Imported locally: this snippet's import block does not include requests.
    import requests

    headers = {}

    # Inject current context
    propagate.inject(headers)

    # Make request
    response = requests.get(url, headers=headers, timeout=timeout)
    return response.json()

Environment-Based Propagator Configuration

import os
from opentelemetry import propagate
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator

# Configure propagators based on environment
def setup_propagators():
    """
    Install the global propagator selected by the OTEL_PROPAGATORS variable.

    OTEL_PROPAGATORS is read as a comma-separated list of propagator names
    (default "tracecontext,baggage"). Names are matched exactly after
    trimming whitespace and lower-casing, and duplicates are ignored. The
    single value "none" disables propagation. "jaeger" is honoured only when
    the optional Jaeger propagator package can be imported. Unknown or
    unavailable names are logged and skipped.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)
    default_spec = "tracecontext,baggage"

    # An unset or blank variable falls back to the defaults.
    spec = (os.environ.get("OTEL_PROPAGATORS") or "").strip() or default_spec

    # dict.fromkeys de-duplicates while preserving the configured order.
    names = list(
        dict.fromkeys(
            part.strip().lower() for part in spec.split(",") if part.strip()
        )
    )
    if not names:
        # Only separators were given (e.g. ","); treat that as unset too.
        names = default_spec.split(",")

    propagators = []
    if names != ["none"]:
        for name in names:
            if name == "tracecontext":
                propagators.append(TraceContextTextMapPropagator())
            elif name == "baggage":
                propagators.append(W3CBaggagePropagator())
            elif name == "jaeger":
                # Add Jaeger propagator if available
                try:
                    from opentelemetry.propagators.jaeger import JaegerPropagator
                except ImportError:
                    logger.warning(
                        "OTEL_PROPAGATORS requests 'jaeger' but the Jaeger "
                        "propagator package is not installed; skipping it"
                    )
                else:
                    propagators.append(JaegerPropagator())
            else:
                logger.warning(
                    "Ignoring unsupported propagator %r in OTEL_PROPAGATORS", name
                )

    # Set global propagator
    composite = CompositePropagator(propagators)
    propagate.set_global_textmap(composite)

# Initialize propagators as soon as this module is imported
setup_propagators()

Testing Propagation

from opentelemetry import propagate, trace, baggage
from opentelemetry.propagators.textmap import DefaultGetter, DefaultSetter

def test_context_propagation():
    """
    Test context extraction and injection.

    Injects the current span and a baggage entry into a dict, extracts them
    again as a different service would, and asserts both survived.

    NOTE(review): the final assertion needs a valid span context; presumably
    that requires a configured SDK tracer provider rather than the API's
    default tracer. Confirm before running.
    """
    # Function-scope import: the snippet's import line does not bring in the
    # context module that provides attach()/detach().
    from opentelemetry import context as otel_context

    tracer = trace.get_tracer(__name__)

    # Create a span and set baggage
    with tracer.start_as_current_span("test-span") as span:
        ctx = baggage.set_baggage("test.key", "test.value")
        token = otel_context.attach(ctx)

        try:
            # Inject into headers
            headers = {}
            propagate.inject(headers)

            print("Injected headers:")
            for key, value in headers.items():
                print(f"  {key}: {value}")

            # Extract from headers (simulating different service)
            extracted_context = propagate.extract(headers)

            # Verify extraction
            token2 = otel_context.attach(extracted_context)
            try:
                # Should have baggage and trace context
                test_value = baggage.get_baggage("test.key")
                current_span = trace.get_current_span()

                print(f"Extracted baggage: {test_value}")
                print(f"Extracted span: {current_span.get_span_context()}")

                assert test_value == "test.value"
                assert current_span.get_span_context().is_valid

            finally:
                # Detach in reverse order of attachment
                otel_context.detach(token2)
        finally:
            otel_context.detach(token)

# Run the test immediately when this snippet is executed
test_context_propagation()

Type Definitions

from abc import ABC, abstractmethod
from typing import Dict, Generic, List, Mapping, Optional, Sequence, Set, TypeVar

# Type variable for carrier objects; Getter and Setter are generic over it
CarrierT = TypeVar("CarrierT")

Install with Tessl CLI

npx tessl i tessl/pypi-opentelemetry-api

docs

context.md

index.md

logging.md

metrics.md

propagation.md

tracing.md

tile.json