OpenTelemetry instrumentation for AWS Bedrock runtime services, providing automatic tracing, metrics, and event emission for AI model invocations.
—
Utility functions, streaming response handling, and helper classes for managing configuration, error handling, content tracing controls, and reusable streaming response bodies.
Decorator and utility functions for robust error handling in instrumentation code, ensuring that observability never breaks application functionality.
def dont_throw(func):
"""
Decorator that wraps functions to log exceptions instead of throwing them.
Critical for instrumentation code where errors in observability
must not impact the core application functionality. Logs exceptions
and returns None on failure.
Parameters:
- func: Function to wrap with exception handling
Returns:
Wrapped function that logs exceptions instead of raising them
"""Functions for determining instrumentation behavior based on environment variables and configuration settings.
def should_send_prompts() -> bool:
"""
Determine if prompt content should be included in traces.
Checks the TRACELOOP_TRACE_CONTENT environment variable to
control whether sensitive prompt content is captured in spans
and events for privacy and compliance.
Returns:
Boolean indicating if prompt content should be traced
(default: true if TRACELOOP_TRACE_CONTENT != "false")
"""
def should_emit_events() -> bool:
"""
Check if structured event emission is enabled.
Determines whether to emit structured OpenTelemetry events
based on the use_legacy_attributes configuration setting.
Returns:
Boolean indicating if events should be emitted
(true when use_legacy_attributes=False)
"""Constants for environment variables that control instrumentation behavior.
TRACELOOP_TRACE_CONTENT = "TRACELOOP_TRACE_CONTENT"
"""
Environment variable name for controlling content tracing.
Set to "false" to disable prompt and response content capture
for privacy and compliance requirements. All other values
(including unset) enable content tracing.
"""Advanced wrapper for streaming responses that enables comprehensive instrumentation of streaming AI model interactions.
class StreamingWrapper(ObjectProxy):
"""
Wraps streaming responses for comprehensive instrumentation.
Intercepts streaming response iteration to collect metrics,
emit events, and set span attributes when the stream completes.
Maintains full compatibility with the original streaming interface.
"""
def __init__(self, response, stream_done_callback):
"""
Initialize streaming wrapper with completion callback.
Parameters:
- response: Original streaming response object to wrap
- stream_done_callback: Function called when stream completes
with accumulated response data
"""
def __iter__(self):
"""
Stream iterator with instrumentation.
Provides iteration over streaming events while accumulating
response data for final instrumentation when stream completes.
Yields:
Individual streaming events from the wrapped response
"""
def _process_event(self, event):
"""
Process individual streaming events.
Extracts and accumulates relevant data from each streaming
event for final instrumentation processing.
Parameters:
- event: Individual streaming event from the response
Returns:
Processed event (may be modified for instrumentation)
"""
def _accumulate_events(self, event):
"""
Accumulate response data from streaming events.
Builds complete response data structure from individual
streaming events for final span attribute setting and
metrics collection.
Parameters:
- event: Streaming event containing partial response data
"""Enhanced streaming body implementation that allows multiple reads from the same response, essential for instrumentation without breaking application functionality.
class ReusableStreamingBody(StreamingBody):
"""
Streaming body that allows multiple reads from the same response.
Extends botocore's StreamingBody to buffer content on first read,
enabling instrumentation to read response content without consuming
the stream for the application. Essential for non-destructive
instrumentation of HTTP response bodies.
"""
def __init__(self, raw_stream, content_length):
"""
Initialize reusable streaming body.
Parameters:
- raw_stream: Original stream object from HTTP response
- content_length: Expected content length for buffering
"""
def read(self, amt=None):
"""
Read from buffered stream content.
First call buffers the entire stream content. Subsequent
calls read from the buffer, allowing multiple consumers
to read the same response data.
Parameters:
- amt: Number of bytes to read (None for all remaining)
Returns:
Bytes from the response body
"""Comprehensive functions for setting detailed span attributes across all Bedrock API operations and model types.
def _set_span_attribute(span, name, value):
"""
Utility function for safely setting span attributes.
Sets span attributes only if the value is not None or empty string,
preventing cluttered spans with empty attributes.
Parameters:
- span: OpenTelemetry span for attribute setting
- name: Attribute name to set
- value: Attribute value (set only if not None/empty)
Returns:
None
"""
def set_model_message_span_attributes(model_vendor, span, request_body):
"""
Set span attributes for input messages to AI models.
Extracts and sets comprehensive attributes for input prompts,
parameters, and configuration from request data.
Parameters:
- model_vendor: AI model vendor identifier
- span: OpenTelemetry span for attribute setting
- request_body: Parsed request body containing input parameters
"""
def set_model_choice_span_attributes(model_vendor, span, response_body):
"""
Set span attributes for AI model completion responses.
Processes response data to set attributes for generated content,
token usage, completion reasons, and response metadata.
Parameters:
- model_vendor: AI model vendor identifier
- span: OpenTelemetry span for attribute setting
- response_body: Parsed response body containing completion data
"""
def set_model_span_attributes(
provider,
model_vendor,
model,
span,
request_body,
response_body,
headers,
metric_params,
kwargs
):
"""
Set comprehensive span attributes for complete model interactions.
Primary function for setting all relevant span attributes including
model information, request/response data, performance metrics,
and vendor-specific attributes.
Parameters:
- provider: Cloud provider (typically "AWS")
- model_vendor: AI model vendor (anthropic, cohere, etc.)
- model: Specific model identifier
- span: OpenTelemetry span for attribute setting
- request_body: Parsed request data
- response_body: Parsed response data
- headers: HTTP response headers
- metric_params: MetricParams for metrics recording
- kwargs: Additional request parameters
"""Specialized functions for handling Bedrock's modern converse API with its conversation-based interaction model.
def set_converse_model_span_attributes(span, provider, model, kwargs):
"""
Set model-specific span attributes for converse API calls.
Handles the conversation-based attribute setting for the
modern Bedrock converse API format.
Parameters:
- span: OpenTelemetry span for attribute setting
- provider: Cloud provider identifier
- model: Model identifier
- kwargs: Converse API request parameters
"""
def set_converse_input_prompt_span_attributes(kwargs, span):
"""
Set input prompt attributes for converse API requests.
Extracts and sets attributes for conversation messages,
system prompts, and input configuration.
Parameters:
- kwargs: Converse API request parameters
- span: OpenTelemetry span for attribute setting
"""
def set_converse_response_span_attributes(response, span):
"""
Set response attributes for converse API responses.
Processes converse API response format to set completion
attributes, token usage, and response metadata.
Parameters:
- response: Converse API response object
- span: OpenTelemetry span for attribute setting
"""
def set_converse_streaming_response_span_attributes(response, role, span):
"""
Set response attributes for streaming converse API responses.
Handles attribute setting for streaming converse responses
with accumulated response data and completion metadata.
Parameters:
- response: Accumulated streaming response data
- role: Response message role
- span: OpenTelemetry span for attribute setting
"""
def converse_usage_record(span, response, metric_params):
"""
Record usage metrics for converse API operations.
Extracts and records token usage, request duration, and
other utilization metrics from converse API responses.
Parameters:
- span: OpenTelemetry span for usage attributes
- response: Converse API response with usage data
- metric_params: MetricParams for metrics recording
"""from opentelemetry.instrumentation.bedrock.utils import dont_throw
@dont_throw
def risky_instrumentation_function():
    """
    Function that might fail but shouldn't break the application.
    If this function throws an exception, it will be logged
    but not propagated to the calling application code.
    """
    # Potentially failing instrumentation code
    pass

# Use in instrumentation context
risky_instrumentation_function()  # Never throws, only logs errors

from opentelemetry.instrumentation.bedrock.utils import should_send_prompts
import os

# Check if content tracing is enabled
if should_send_prompts():
    # Include prompt content in spans
    span.set_attribute("gen_ai.prompt", prompt_text)
else:
    # Skip content for privacy/compliance
    span.set_attribute("gen_ai.prompt", "[REDACTED]")

# Environment-based control
os.environ["TRACELOOP_TRACE_CONTENT"] = "false"  # Disable content tracing

from opentelemetry.instrumentation.bedrock.utils import should_emit_events
from opentelemetry.instrumentation.bedrock.event_emitter import emit_message_events

# Conditional event emission
if should_emit_events():
    # Emit structured events (semantic conventions)
    emit_message_events(event_logger, request_kwargs)
else:
    # Use legacy span attributes instead
    set_legacy_span_attributes(span, request_kwargs)

from opentelemetry.instrumentation.bedrock.streaming_wrapper import StreamingWrapper
def stream_completion_callback(response_body):
    """Called when streaming completes with full response data"""
    # Set final span attributes
    span.set_attribute("gen_ai.response.text", response_body.get("text"))
    # Record final metrics
    metrics.record_completion(response_body)
    # End the span
    span.end()

# Wrap streaming response
original_stream = bedrock_response['body']
instrumented_stream = StreamingWrapper(
    original_stream,
    stream_done_callback=stream_completion_callback
)

# Use wrapped stream normally
for event in instrumented_stream:
    # Process streaming events
    # Instrumentation happens transparently
    process_event(event)

from opentelemetry.instrumentation.bedrock.reusable_streaming_body import ReusableStreamingBody
# Create reusable body from HTTP response
original_body = http_response['body']
reusable_body = ReusableStreamingBody(
    original_body._raw_stream,
    original_body._content_length
)

# First read for instrumentation
response_data = reusable_body.read()
process_for_instrumentation(response_data)

# Second read for application (same data)
app_data = reusable_body.read()
return app_data  # Application gets complete response

from opentelemetry.instrumentation.bedrock.span_utils import set_model_span_attributes
# Set complete span attributes for a model interaction
set_model_span_attributes(
    provider="AWS",
    model_vendor="anthropic",
    model="claude-3-sonnet-20240229-v1:0",
    span=current_span,
    request_body=parsed_request,
    response_body=parsed_response,
    headers=http_headers,
    metric_params=metrics_container,
    kwargs=original_request_kwargs
)

import os
from opentelemetry.instrumentation.bedrock import BedrockInstrumentor
# Disable content tracing for privacy
os.environ["TRACELOOP_TRACE_CONTENT"] = "false"

# Enable structured events without content
BedrockInstrumentor(use_legacy_attributes=False).instrument()

import os
from opentelemetry.instrumentation.bedrock import BedrockInstrumentor
def configure_bedrock_instrumentation():
    """Configure instrumentation based on environment"""
    is_production = os.getenv("ENVIRONMENT") == "production"
    if is_production:
        # Production: minimal content, structured events
        os.environ["TRACELOOP_TRACE_CONTENT"] = "false"
        instrumentor = BedrockInstrumentor(
            enrich_token_usage=True,      # Detailed token metrics
            use_legacy_attributes=False   # Structured events
        )
    else:
        # Development: full content, legacy attributes for compatibility
        instrumentor = BedrockInstrumentor(
            enrich_token_usage=False,     # Simpler metrics
            use_legacy_attributes=True    # Span attributes
        )
    instrumentor.instrument()
    return instrumentor

Utilities support lazy initialization to minimize startup overhead:
# Functions check configuration only when called
should_send_prompts()   # Checks environment on each call
should_emit_events()    # Checks configuration on each call

The @dont_throw decorator adds minimal overhead.
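The cost is a single try/except per call. A minimal sketch of the pattern behind @dont_throw — illustrative, not the library's exact source:

```python
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def dont_throw(func):
    # Log-and-swallow wrapper: a failure in instrumentation becomes a
    # log line instead of an exception in the host application
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            logger.exception("instrumentation error in %s", func.__name__)
            return None
    return wrapper

@dont_throw
def broken():
    raise RuntimeError("boom")
```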
Streaming wrappers maintain near-native performance.
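The reason is that events are yielded to the caller as they arrive, and the only per-event work is an accumulator append. A self-contained sketch of the pass-through pattern — class and variable names here are illustrative, and the real wrapper proxies the botocore stream via ObjectProxy:

```python
# Pass-through iteration with accumulation: each event is yielded
# unchanged; the done-callback fires once the stream is exhausted.
class SketchStreamingWrapper:
    def __init__(self, stream, stream_done_callback):
        self._stream = stream
        self._callback = stream_done_callback
        self._accumulated = []

    def __iter__(self):
        for event in self._stream:
            self._accumulated.append(event)  # one list append per event
            yield event
        self._callback(self._accumulated)

seen_by_app = []
seen_by_callback = []
wrapped = SketchStreamingWrapper(iter(["a", "b"]), seen_by_callback.extend)
for event in wrapped:
    seen_by_app.append(event)
```

The application sees the same events in the same order; the callback fires exactly once, after the last event.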
Install with Tessl CLI
npx tessl i tessl/pypi-opentelemetry-instrumentation-bedrock