CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-opentelemetry-instrumentation-bedrock

OpenTelemetry instrumentation for AWS Bedrock runtime services providing automatic tracing, metrics, and event emission for AI model invocations

Pending
Overview
Eval results
Files

utilities.mddocs/

Utilities and Streaming

Utility functions, streaming response handling, and helper classes for managing configuration, error handling, content tracing controls, and reusable streaming response bodies.

Capabilities

Error Handling Utilities

Decorator and utility functions for robust error handling in instrumentation code, ensuring that observability never breaks application functionality.

def dont_throw(func):
    """
    Decorator that wraps functions to log exceptions instead of throwing them.
    
    Critical for instrumentation code where errors in observability
    must not impact the core application functionality. Logs exceptions
    and returns None on failure.
    
    Parameters:
    - func: Function to wrap with exception handling
    
    Returns:
    Wrapped function that logs exceptions instead of raising them
    """

Configuration Control Functions

Functions for determining instrumentation behavior based on environment variables and configuration settings.

def should_send_prompts() -> bool:
    """
    Determine if prompt content should be included in traces.
    
    Checks the TRACELOOP_TRACE_CONTENT environment variable to
    control whether sensitive prompt content is captured in spans
    and events for privacy and compliance.
    
    Returns:
    Boolean indicating if prompt content should be traced
    (default: true if TRACELOOP_TRACE_CONTENT != "false")
    """


def should_emit_events() -> bool:
    """
    Check if structured event emission is enabled.
    
    Determines whether to emit structured OpenTelemetry events
    based on the use_legacy_attributes configuration setting.
    
    Returns:
    Boolean indicating if events should be emitted
    (true when use_legacy_attributes=False)
    """

Environment Variable Constants

Constants for environment variables that control instrumentation behavior.

TRACELOOP_TRACE_CONTENT = "TRACELOOP_TRACE_CONTENT"
"""
Environment variable name for controlling content tracing.

Set to "false" to disable prompt and response content capture
for privacy and compliance requirements. All other values 
(including unset) enable content tracing.
"""

Streaming Response Wrapper

Advanced wrapper for streaming responses that enables comprehensive instrumentation of streaming AI model interactions.

class StreamingWrapper(ObjectProxy):
    """
    Wraps streaming responses for comprehensive instrumentation.
    
    Intercepts streaming response iteration to collect metrics,
    emit events, and set span attributes when the stream completes.
    Maintains full compatibility with the original streaming interface.
    """
    
    def __init__(self, response, stream_done_callback):
        """
        Initialize streaming wrapper with completion callback.
        
        Parameters:
        - response: Original streaming response object to wrap
        - stream_done_callback: Function called when stream completes
                               with accumulated response data
        """
    
    def __iter__(self):
        """
        Stream iterator with instrumentation.
        
        Provides iteration over streaming events while accumulating
        response data for final instrumentation when stream completes.
        
        Yields:
        Individual streaming events from the wrapped response
        """
    
    def _process_event(self, event):
        """
        Process individual streaming events.
        
        Extracts and accumulates relevant data from each streaming
        event for final instrumentation processing.
        
        Parameters:
        - event: Individual streaming event from the response
        
        Returns:
        Processed event (may be modified for instrumentation)
        """
    
    def _accumulate_events(self, event):
        """
        Accumulate response data from streaming events.
        
        Builds complete response data structure from individual
        streaming events for final span attribute setting and
        metrics collection.
        
        Parameters:
        - event: Streaming event containing partial response data
        """

Reusable Streaming Body

Enhanced streaming body implementation that allows multiple reads from the same response, essential for instrumentation without breaking application functionality.

class ReusableStreamingBody(StreamingBody):
    """
    Streaming body that allows multiple reads from the same response.
    
    Extends botocore's StreamingBody to buffer content on first read,
    enabling instrumentation to read response content without consuming
    the stream for the application. Essential for non-destructive
    instrumentation of HTTP response bodies.
    """
    
    def __init__(self, raw_stream, content_length):
        """
        Initialize reusable streaming body.
        
        Parameters:
        - raw_stream: Original stream object from HTTP response
        - content_length: Expected content length for buffering
        """
    
    def read(self, amt=None):
        """
        Read from buffered stream content.
        
        First call buffers the entire stream content. Subsequent
        calls read from the buffer, allowing multiple consumers
        to read the same response data.
        
        Parameters:
        - amt: Number of bytes to read (None for all remaining)
        
        Returns:
        Bytes from the response body
        """

Span Attribute Management

Comprehensive functions for setting detailed span attributes across all Bedrock API operations and model types.

def _set_span_attribute(span, name, value):
    """
    Utility function for safely setting span attributes.
    
    Sets span attributes only if the value is not None or empty string,
    preventing cluttered spans with empty attributes.
    
    Parameters:
    - span: OpenTelemetry span for attribute setting
    - name: Attribute name to set
    - value: Attribute value (set only if not None/empty)
    
    Returns:
    None
    """


def set_model_message_span_attributes(model_vendor, span, request_body):
    """
    Set span attributes for input messages to AI models.
    
    Extracts and sets comprehensive attributes for input prompts,
    parameters, and configuration from request data.
    
    Parameters:
    - model_vendor: AI model vendor identifier
    - span: OpenTelemetry span for attribute setting
    - request_body: Parsed request body containing input parameters
    """


def set_model_choice_span_attributes(model_vendor, span, response_body):
    """
    Set span attributes for AI model completion responses.
    
    Processes response data to set attributes for generated content,
    token usage, completion reasons, and response metadata.
    
    Parameters:
    - model_vendor: AI model vendor identifier  
    - span: OpenTelemetry span for attribute setting
    - response_body: Parsed response body containing completion data
    """


def set_model_span_attributes(
    provider,
    model_vendor, 
    model,
    span,
    request_body,
    response_body,
    headers,
    metric_params,
    kwargs
):
    """
    Set comprehensive span attributes for complete model interactions.
    
    Primary function for setting all relevant span attributes including
    model information, request/response data, performance metrics,
    and vendor-specific attributes.
    
    Parameters:
    - provider: Cloud provider (typically "AWS")
    - model_vendor: AI model vendor (anthropic, cohere, etc.)
    - model: Specific model identifier
    - span: OpenTelemetry span for attribute setting
    - request_body: Parsed request data
    - response_body: Parsed response data
    - headers: HTTP response headers
    - metric_params: MetricParams for metrics recording
    - kwargs: Additional request parameters
    """

Converse API Utilities

Specialized functions for handling Bedrock's modern converse API with its conversation-based interaction model.

def set_converse_model_span_attributes(span, provider, model, kwargs):
    """
    Set model-specific span attributes for converse API calls.
    
    Handles the conversation-based attribute setting for the
    modern Bedrock converse API format.
    
    Parameters:
    - span: OpenTelemetry span for attribute setting
    - provider: Cloud provider identifier
    - model: Model identifier
    - kwargs: Converse API request parameters
    """


def set_converse_input_prompt_span_attributes(kwargs, span):
    """
    Set input prompt attributes for converse API requests.
    
    Extracts and sets attributes for conversation messages,
    system prompts, and input configuration.
    
    Parameters:
    - kwargs: Converse API request parameters
    - span: OpenTelemetry span for attribute setting
    """


def set_converse_response_span_attributes(response, span):
    """
    Set response attributes for converse API responses.
    
    Processes converse API response format to set completion
    attributes, token usage, and response metadata.
    
    Parameters:
    - response: Converse API response object
    - span: OpenTelemetry span for attribute setting
    """


def set_converse_streaming_response_span_attributes(response, role, span):
    """
    Set response attributes for streaming converse API responses.
    
    Handles attribute setting for streaming converse responses
    with accumulated response data and completion metadata.
    
    Parameters:
    - response: Accumulated streaming response data
    - role: Response message role
    - span: OpenTelemetry span for attribute setting
    """


def converse_usage_record(span, response, metric_params):
    """
    Record usage metrics for converse API operations.
    
    Extracts and records token usage, request duration, and
    other utilization metrics from converse API responses.
    
    Parameters:
    - span: OpenTelemetry span for usage attributes
    - response: Converse API response with usage data
    - metric_params: MetricParams for metrics recording
    """

Usage Examples

Error-Safe Instrumentation

from opentelemetry.instrumentation.bedrock.utils import dont_throw

@dont_throw
def risky_instrumentation_function():
    """
    Function that might fail but shouldn't break the application.
    
    If this function throws an exception, it will be logged
    but not propagated to the calling application code.
    """
    # Potentially failing instrumentation code
    pass

# Use in instrumentation context
risky_instrumentation_function()  # Never throws, only logs errors

Content Tracing Control

from opentelemetry.instrumentation.bedrock.utils import should_send_prompts
import os

# Check if content tracing is enabled
if should_send_prompts():
    # Include prompt content in spans
    span.set_attribute("gen_ai.prompt", prompt_text)
else:
    # Skip content for privacy/compliance
    span.set_attribute("gen_ai.prompt", "[REDACTED]")

# Environment-based control
os.environ["TRACELOOP_TRACE_CONTENT"] = "false"  # Disable content tracing

Event Emission Control

from opentelemetry.instrumentation.bedrock.utils import should_emit_events
from opentelemetry.instrumentation.bedrock.event_emitter import emit_message_events

# Conditional event emission
if should_emit_events():
    # Emit structured events (semantic conventions)
    emit_message_events(event_logger, request_kwargs)
else:
    # Use legacy span attributes instead
    set_legacy_span_attributes(span, request_kwargs)

Streaming Response Handling

from opentelemetry.instrumentation.bedrock.streaming_wrapper import StreamingWrapper

def stream_completion_callback(response_body):
    """Called when streaming completes with full response data"""
    # Set final span attributes
    span.set_attribute("gen_ai.response.text", response_body.get("text"))
    # Record final metrics
    metrics.record_completion(response_body)
    # End the span
    span.end()

# Wrap streaming response
original_stream = bedrock_response['body']
instrumented_stream = StreamingWrapper(
    original_stream, 
    stream_done_callback=stream_completion_callback
)

# Use wrapped stream normally
for event in instrumented_stream:
    # Process streaming events
    # Instrumentation happens transparently
    process_event(event)

Reusable Response Bodies

from opentelemetry.instrumentation.bedrock.reusable_streaming_body import ReusableStreamingBody

# Create reusable body from HTTP response
original_body = http_response['body']
reusable_body = ReusableStreamingBody(
    original_body._raw_stream,
    original_body._content_length
)

# First read for instrumentation
response_data = reusable_body.read()
process_for_instrumentation(response_data)

# Second read for application (same data)
app_data = reusable_body.read()
return app_data  # Application gets complete response

Comprehensive Span Attribution

from opentelemetry.instrumentation.bedrock.span_utils import set_model_span_attributes

# Set complete span attributes for a model interaction
set_model_span_attributes(
    provider="AWS",
    model_vendor="anthropic", 
    model="claude-3-sonnet-20240229-v1:0",
    span=current_span,
    request_body=parsed_request,
    response_body=parsed_response,
    headers=http_headers,
    metric_params=metrics_container,
    kwargs=original_request_kwargs
)

Configuration Patterns

Privacy-Conscious Setup

import os
from opentelemetry.instrumentation.bedrock import BedrockInstrumentor

# Disable content tracing for privacy
os.environ["TRACELOOP_TRACE_CONTENT"] = "false"

# Enable structured events without content
BedrockInstrumentor(use_legacy_attributes=False).instrument()

Development vs Production Configuration

import os
from opentelemetry.instrumentation.bedrock import BedrockInstrumentor

def configure_bedrock_instrumentation():
    """Configure instrumentation based on environment"""
    
    is_production = os.getenv("ENVIRONMENT") == "production"
    
    if is_production:
        # Production: minimal content, structured events
        os.environ["TRACELOOP_TRACE_CONTENT"] = "false"
        instrumentor = BedrockInstrumentor(
            enrich_token_usage=True,  # Detailed token metrics
            use_legacy_attributes=False  # Structured events
        )
    else:
        # Development: full content, legacy attributes for compatibility
        instrumentor = BedrockInstrumentor(
            enrich_token_usage=False,  # Simpler metrics
            use_legacy_attributes=True  # Span attributes
        )
    
    instrumentor.instrument()
    return instrumentor

Performance Optimization

Lazy Initialization

Utilities support lazy initialization to minimize startup overhead:

# Functions check configuration only when called
should_send_prompts()  # Checks environment on each call
should_emit_events()   # Checks configuration on each call

Exception Handling Overhead

The @dont_throw decorator adds minimal overhead:

  • No exceptions: ~5-10 nanoseconds per call
  • With exceptions: Exception logging cost + graceful degradation
  • Memory impact: No additional memory allocation in success case

Streaming Performance

Streaming wrappers maintain near-native performance:

  • Iteration overhead: ~20-50 nanoseconds per event
  • Memory usage: Minimal buffering only for final attributes
  • Latency impact: No additional network round-trips

Install with Tessl CLI

npx tessl i tessl/pypi-opentelemetry-instrumentation-bedrock

docs

events.md

index.md

instrumentation.md

metrics.md

utilities.md

tile.json