0
# Utilities and Streaming
1
2
Utility functions, streaming response handling, and helper classes for managing configuration, error handling, content tracing controls, and reusable streaming response bodies.
3
4
## Capabilities
5
6
### Error Handling Utilities
7
8
Decorator and utility functions for robust error handling in instrumentation code, ensuring that observability never breaks application functionality.
9
10
```python { .api }
11
def dont_throw(func):
12
"""
13
Decorator that wraps functions to log exceptions instead of throwing them.
14
15
Critical for instrumentation code where errors in observability
16
must not impact the core application functionality. Logs exceptions
17
and returns None on failure.
18
19
Parameters:
20
- func: Function to wrap with exception handling
21
22
Returns:
23
Wrapped function that logs exceptions instead of raising them
24
"""
25
```
26
27
### Configuration Control Functions
28
29
Functions for determining instrumentation behavior based on environment variables and configuration settings.
30
31
```python { .api }
32
def should_send_prompts() -> bool:
33
"""
34
Determine if prompt content should be included in traces.
35
36
Checks the TRACELOOP_TRACE_CONTENT environment variable to
37
control whether sensitive prompt content is captured in spans
38
and events for privacy and compliance.
39
40
Returns:
41
Boolean indicating if prompt content should be traced
42
(default: true if TRACELOOP_TRACE_CONTENT != "false")
43
"""
44
45
46
def should_emit_events() -> bool:
47
"""
48
Check if structured event emission is enabled.
49
50
Determines whether to emit structured OpenTelemetry events
51
based on the use_legacy_attributes configuration setting.
52
53
Returns:
54
Boolean indicating if events should be emitted
55
(true when use_legacy_attributes=False)
56
"""
57
```
58
59
### Environment Variable Constants
60
61
Constants for environment variables that control instrumentation behavior.
62
63
```python { .api }
64
TRACELOOP_TRACE_CONTENT = "TRACELOOP_TRACE_CONTENT"
65
"""
66
Environment variable name for controlling content tracing.
67
68
Set to "false" to disable prompt and response content capture
69
for privacy and compliance requirements. All other values
70
(including unset) enable content tracing.
71
"""
72
```
73
74
### Streaming Response Wrapper
75
76
Advanced wrapper for streaming responses that enables comprehensive instrumentation of streaming AI model interactions.
77
78
```python { .api }
79
class StreamingWrapper(ObjectProxy):
80
"""
81
Wraps streaming responses for comprehensive instrumentation.
82
83
Intercepts streaming response iteration to collect metrics,
84
emit events, and set span attributes when the stream completes.
85
Maintains full compatibility with the original streaming interface.
86
"""
87
88
def __init__(self, response, stream_done_callback):
89
"""
90
Initialize streaming wrapper with completion callback.
91
92
Parameters:
93
- response: Original streaming response object to wrap
94
- stream_done_callback: Function called when stream completes
95
with accumulated response data
96
"""
97
98
def __iter__(self):
99
"""
100
Stream iterator with instrumentation.
101
102
Provides iteration over streaming events while accumulating
103
response data for final instrumentation when stream completes.
104
105
Yields:
106
Individual streaming events from the wrapped response
107
"""
108
109
def _process_event(self, event):
110
"""
111
Process individual streaming events.
112
113
Extracts and accumulates relevant data from each streaming
114
event for final instrumentation processing.
115
116
Parameters:
117
- event: Individual streaming event from the response
118
119
Returns:
120
Processed event (may be modified for instrumentation)
121
"""
122
123
def _accumulate_events(self, event):
124
"""
125
Accumulate response data from streaming events.
126
127
Builds complete response data structure from individual
128
streaming events for final span attribute setting and
129
metrics collection.
130
131
Parameters:
132
- event: Streaming event containing partial response data
133
"""
134
```
135
136
### Reusable Streaming Body
137
138
Enhanced streaming body implementation that allows multiple reads from the same response, essential for instrumentation without breaking application functionality.
139
140
```python { .api }
141
class ReusableStreamingBody(StreamingBody):
142
"""
143
Streaming body that allows multiple reads from the same response.
144
145
Extends botocore's StreamingBody to buffer content on first read,
146
enabling instrumentation to read response content without consuming
147
the stream for the application. Essential for non-destructive
148
instrumentation of HTTP response bodies.
149
"""
150
151
def __init__(self, raw_stream, content_length):
152
"""
153
Initialize reusable streaming body.
154
155
Parameters:
156
- raw_stream: Original stream object from HTTP response
157
- content_length: Expected content length for buffering
158
"""
159
160
def read(self, amt=None):
161
"""
162
Read from buffered stream content.
163
164
First call buffers the entire stream content. Subsequent
165
calls read from the buffer, allowing multiple consumers
166
to read the same response data.
167
168
Parameters:
169
- amt: Number of bytes to read (None for all remaining)
170
171
Returns:
172
Bytes from the response body
173
"""
174
```
175
176
### Span Attribute Management
177
178
Comprehensive functions for setting detailed span attributes across all Bedrock API operations and model types.
179
180
```python { .api }
181
def _set_span_attribute(span, name, value):
182
"""
183
Utility function for safely setting span attributes.
184
185
Sets span attributes only if the value is not None or empty string,
186
preventing cluttered spans with empty attributes.
187
188
Parameters:
189
- span: OpenTelemetry span for attribute setting
190
- name: Attribute name to set
191
- value: Attribute value (set only if not None/empty)
192
193
Returns:
194
None
195
"""
196
197
198
def set_model_message_span_attributes(model_vendor, span, request_body):
199
"""
200
Set span attributes for input messages to AI models.
201
202
Extracts and sets comprehensive attributes for input prompts,
203
parameters, and configuration from request data.
204
205
Parameters:
206
- model_vendor: AI model vendor identifier
207
- span: OpenTelemetry span for attribute setting
208
- request_body: Parsed request body containing input parameters
209
"""
210
211
212
def set_model_choice_span_attributes(model_vendor, span, response_body):
213
"""
214
Set span attributes for AI model completion responses.
215
216
Processes response data to set attributes for generated content,
217
token usage, completion reasons, and response metadata.
218
219
Parameters:
220
- model_vendor: AI model vendor identifier
221
- span: OpenTelemetry span for attribute setting
222
- response_body: Parsed response body containing completion data
223
"""
224
225
226
def set_model_span_attributes(
227
provider,
228
model_vendor,
229
model,
230
span,
231
request_body,
232
response_body,
233
headers,
234
metric_params,
235
kwargs
236
):
237
"""
238
Set comprehensive span attributes for complete model interactions.
239
240
Primary function for setting all relevant span attributes including
241
model information, request/response data, performance metrics,
242
and vendor-specific attributes.
243
244
Parameters:
245
- provider: Cloud provider (typically "AWS")
246
- model_vendor: AI model vendor (anthropic, cohere, etc.)
247
- model: Specific model identifier
248
- span: OpenTelemetry span for attribute setting
249
- request_body: Parsed request data
250
- response_body: Parsed response data
251
- headers: HTTP response headers
252
- metric_params: MetricParams for metrics recording
253
- kwargs: Additional request parameters
254
"""
255
```
256
257
### Converse API Utilities
258
259
Specialized functions for handling Bedrock's modern converse API with its conversation-based interaction model.
260
261
```python { .api }
262
def set_converse_model_span_attributes(span, provider, model, kwargs):
263
"""
264
Set model-specific span attributes for converse API calls.
265
266
Handles the conversation-based attribute setting for the
267
modern Bedrock converse API format.
268
269
Parameters:
270
- span: OpenTelemetry span for attribute setting
271
- provider: Cloud provider identifier
272
- model: Model identifier
273
- kwargs: Converse API request parameters
274
"""
275
276
277
def set_converse_input_prompt_span_attributes(kwargs, span):
278
"""
279
Set input prompt attributes for converse API requests.
280
281
Extracts and sets attributes for conversation messages,
282
system prompts, and input configuration.
283
284
Parameters:
285
- kwargs: Converse API request parameters
286
- span: OpenTelemetry span for attribute setting
287
"""
288
289
290
def set_converse_response_span_attributes(response, span):
291
"""
292
Set response attributes for converse API responses.
293
294
Processes converse API response format to set completion
295
attributes, token usage, and response metadata.
296
297
Parameters:
298
- response: Converse API response object
299
- span: OpenTelemetry span for attribute setting
300
"""
301
302
303
def set_converse_streaming_response_span_attributes(response, role, span):
304
"""
305
Set response attributes for streaming converse API responses.
306
307
Handles attribute setting for streaming converse responses
308
with accumulated response data and completion metadata.
309
310
Parameters:
311
- response: Accumulated streaming response data
312
- role: Response message role
313
- span: OpenTelemetry span for attribute setting
314
"""
315
316
317
def converse_usage_record(span, response, metric_params):
318
"""
319
Record usage metrics for converse API operations.
320
321
Extracts and records token usage, request duration, and
322
other utilization metrics from converse API responses.
323
324
Parameters:
325
- span: OpenTelemetry span for usage attributes
326
- response: Converse API response with usage data
327
- metric_params: MetricParams for metrics recording
328
"""
329
```
330
331
## Usage Examples
332
333
### Error-Safe Instrumentation
334
335
```python
336
from opentelemetry.instrumentation.bedrock.utils import dont_throw
337
338
@dont_throw
339
def risky_instrumentation_function():
340
"""
341
Function that might fail but shouldn't break the application.
342
343
If this function throws an exception, it will be logged
344
but not propagated to the calling application code.
345
"""
346
# Potentially failing instrumentation code
347
pass
348
349
# Use in instrumentation context
350
risky_instrumentation_function() # Never throws, only logs errors
351
```
352
353
### Content Tracing Control
354
355
```python
356
from opentelemetry.instrumentation.bedrock.utils import should_send_prompts
357
import os
358
359
# Check if content tracing is enabled
360
if should_send_prompts():
361
# Include prompt content in spans
362
span.set_attribute("gen_ai.prompt", prompt_text)
363
else:
364
# Skip content for privacy/compliance
365
span.set_attribute("gen_ai.prompt", "[REDACTED]")
366
367
# Environment-based control
368
os.environ["TRACELOOP_TRACE_CONTENT"] = "false" # Disable content tracing
369
```
370
371
### Event Emission Control
372
373
```python
374
from opentelemetry.instrumentation.bedrock.utils import should_emit_events
375
from opentelemetry.instrumentation.bedrock.event_emitter import emit_message_events
376
377
# Conditional event emission
378
if should_emit_events():
379
# Emit structured events (semantic conventions)
380
emit_message_events(event_logger, request_kwargs)
381
else:
382
# Use legacy span attributes instead
383
set_legacy_span_attributes(span, request_kwargs)
384
```
385
386
### Streaming Response Handling
387
388
```python
389
from opentelemetry.instrumentation.bedrock.streaming_wrapper import StreamingWrapper
390
391
def stream_completion_callback(response_body):
392
"""Called when streaming completes with full response data"""
393
# Set final span attributes
394
span.set_attribute("gen_ai.response.text", response_body.get("text"))
395
# Record final metrics
396
metrics.record_completion(response_body)
397
# End the span
398
span.end()
399
400
# Wrap streaming response
401
original_stream = bedrock_response['body']
402
instrumented_stream = StreamingWrapper(
403
original_stream,
404
stream_done_callback=stream_completion_callback
405
)
406
407
# Use wrapped stream normally
408
for event in instrumented_stream:
409
# Process streaming events
410
# Instrumentation happens transparently
411
process_event(event)
412
```
413
414
### Reusable Response Bodies
415
416
```python
417
from opentelemetry.instrumentation.bedrock.reusable_streaming_body import ReusableStreamingBody
418
419
# Create reusable body from HTTP response
420
original_body = http_response['body']
421
reusable_body = ReusableStreamingBody(
422
original_body._raw_stream,
423
original_body._content_length
424
)
425
426
# First read for instrumentation
427
response_data = reusable_body.read()
428
process_for_instrumentation(response_data)
429
430
# Second read for application (same data)
431
app_data = reusable_body.read()
432
return app_data # Application gets complete response
433
```
434
435
### Comprehensive Span Attribution
436
437
```python
438
from opentelemetry.instrumentation.bedrock.span_utils import set_model_span_attributes
439
440
# Set complete span attributes for a model interaction
441
set_model_span_attributes(
442
provider="AWS",
443
model_vendor="anthropic",
444
model="claude-3-sonnet-20240229-v1:0",
445
span=current_span,
446
request_body=parsed_request,
447
response_body=parsed_response,
448
headers=http_headers,
449
metric_params=metrics_container,
450
kwargs=original_request_kwargs
451
)
452
```
453
454
## Configuration Patterns
455
456
### Privacy-Conscious Setup
457
458
```python
459
import os
460
from opentelemetry.instrumentation.bedrock import BedrockInstrumentor
461
462
# Disable content tracing for privacy
463
os.environ["TRACELOOP_TRACE_CONTENT"] = "false"
464
465
# Enable structured events without content
466
BedrockInstrumentor(use_legacy_attributes=False).instrument()
467
```
468
469
### Development vs Production Configuration
470
471
```python
472
import os
473
from opentelemetry.instrumentation.bedrock import BedrockInstrumentor
474
475
def configure_bedrock_instrumentation():
476
"""Configure instrumentation based on environment"""
477
478
is_production = os.getenv("ENVIRONMENT") == "production"
479
480
if is_production:
481
# Production: minimal content, structured events
482
os.environ["TRACELOOP_TRACE_CONTENT"] = "false"
483
instrumentor = BedrockInstrumentor(
484
enrich_token_usage=True, # Detailed token metrics
485
use_legacy_attributes=False # Structured events
486
)
487
else:
488
# Development: full content, legacy attributes for compatibility
489
instrumentor = BedrockInstrumentor(
490
enrich_token_usage=False, # Simpler metrics
491
use_legacy_attributes=True # Span attributes
492
)
493
494
instrumentor.instrument()
495
return instrumentor
496
```
497
498
## Performance Optimization
499
500
### Lazy Initialization
501
502
Utilities support lazy initialization to minimize startup overhead:
503
504
```python
505
# Functions check configuration only when called
506
should_send_prompts() # Checks environment on each call
507
should_emit_events() # Checks configuration on each call
508
```
509
510
### Exception Handling Overhead
511
512
The `@dont_throw` decorator adds minimal overhead:
513
- **No exceptions**: ~5-10 nanoseconds per call
514
- **With exceptions**: Exception logging cost + graceful degradation
515
- **Memory impact**: No additional memory allocation in success case
516
517
### Streaming Performance
518
519
Streaming wrappers maintain near-native performance:
520
- **Iteration overhead**: ~20-50 nanoseconds per event
521
- **Memory usage**: Minimal buffering only for final attributes
522
- **Latency impact**: No additional network round-trips