The AWS X-Ray SDK for Python enables Python developers to record and emit information from within their applications to the AWS X-Ray service for distributed tracing.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Primary tracing functionality for creating and managing X-Ray segments and subsegments. Provides both synchronous and asynchronous recording capabilities through context managers, decorators, and manual management methods.
The SDK provides a global recorder instance that serves as the primary interface for X-Ray tracing operations.
xray_recorder: AsyncAWSXRayRecorder
The global xray_recorder is an instance of AsyncAWSXRayRecorder that provides both synchronous and asynchronous tracing capabilities.
Configure the X-Ray recorder with sampling settings, plugins, context behavior, and daemon connection details.
def configure(
sampling: bool = None,
plugins: tuple = None,
context_missing: str = None,
sampling_rules: str = None,
daemon_address: str = None,
service: str = None,
context: object = None,
emitter: object = None,
streaming: bool = None,
dynamic_naming: str = None,
streaming_threshold: int = None,
max_trace_back: int = None,
sampler: object = None,
stream_sql: bool = None
) -> None:
"""
Configure the X-Ray recorder.
Args:
sampling (bool): Enable/disable sampling
plugins (tuple): Plugin names to load ('EC2Plugin', 'ECSPlugin', 'ElasticBeanstalkPlugin')
context_missing (str): Behavior when no active segment ('LOG_ERROR', 'RUNTIME_ERROR', 'IGNORE_ERROR')
sampling_rules (str): Path to sampling rules JSON file
daemon_address (str): X-Ray daemon address (default: '127.0.0.1:2000')
service (str): Service name override
context (object): Context implementation for trace storage
emitter (object): Custom emitter for sending traces
streaming (bool): Enable subsegment streaming
dynamic_naming (str): Pattern for dynamic segment naming
streaming_threshold (int): Subsegment count threshold for streaming
max_trace_back (int): Maximum stack trace depth
sampler (object): Custom sampler implementation
stream_sql (bool): Enable SQL query streaming
"""
Create and manage segments using context managers for automatic lifecycle management.
def in_segment(name: str, **segment_kwargs) -> SegmentContextManager:
"""
Context manager for creating and managing a segment.
Args:
name (str): Segment name
**segment_kwargs: Additional segment parameters
Returns:
SegmentContextManager: Context manager yielding the segment
Usage:
with xray_recorder.in_segment('my-segment') as segment:
# Traced code here
"""
def in_segment_async(name: str, **segment_kwargs) -> AsyncSegmentContextManager:
"""
Async context manager for creating and managing a segment.
Args:
name (str): Segment name
**segment_kwargs: Additional segment parameters
Returns:
AsyncSegmentContextManager: Async context manager yielding the segment
Usage:
async with xray_recorder.in_segment_async('my-segment') as segment:
# Traced async code here
"""
Create and manage subsegments within segments for detailed tracing of operations.
def in_subsegment(name: str, **subsegment_kwargs) -> SubsegmentContextManager:
"""
Context manager for creating and managing a subsegment.
Args:
name (str): Subsegment name
**subsegment_kwargs: Additional subsegment parameters
Returns:
SubsegmentContextManager: Context manager yielding the subsegment
Usage:
with xray_recorder.in_subsegment('database-query') as subsegment:
# Database operation here
"""
def in_subsegment_async(name: str, **subsegment_kwargs) -> AsyncSubsegmentContextManager:
"""
Async context manager for creating and managing a subsegment.
Args:
name (str): Subsegment name
**subsegment_kwargs: Additional subsegment parameters
Returns:
AsyncSubsegmentContextManager: Async context manager yielding the subsegment
Usage:
async with xray_recorder.in_subsegment_async('async-operation') as subsegment:
# Async operation here
"""
Automatically trace function execution using decorators.
def capture(name: str) -> Callable:
"""
Decorator for automatic function tracing in a subsegment.
Args:
name (str): Subsegment name for the traced function
Returns:
Callable: Decorated function
Usage:
@xray_recorder.capture('process-data')
def process_data(data):
return processed_data
"""
def capture_async(name: str) -> Callable:
"""
Decorator for automatic async function tracing in a subsegment.
Args:
name (str): Subsegment name for the traced async function
Returns:
Callable: Decorated async function
Usage:
@xray_recorder.capture_async('async-process')
async def async_process_data(data):
return await process_async(data)
"""
Manually create and manage segments for fine-grained control over trace lifecycle.
def begin_segment(
name: str,
traceid: str = None,
parent_id: str = None,
sampling: int = None
) -> Segment:
"""
Begin a new segment.
Args:
name (str): Segment name
traceid (str): Optional trace ID (auto-generated if not provided)
parent_id (str): Optional parent segment ID
sampling (int): Sampling decision (1=sampled, 0=not sampled)
Returns:
Segment: The created segment
"""
def end_segment(end_time: float = None) -> None:
"""
End the current segment.
Args:
end_time (float): Optional end timestamp (defaults to current time)
"""
def current_segment() -> Segment:
"""
Get the current active segment.
Returns:
Segment: Current active segment
Raises:
SegmentNotFoundException: If no segment is active
"""
Manually create and manage subsegments within segments.
def begin_subsegment(name: str, namespace: str = None) -> Subsegment:
"""
Begin a new subsegment under the current segment.
Args:
name (str): Subsegment name
namespace (str): Optional namespace for categorization
Returns:
Subsegment: The created subsegment
"""
def begin_subsegment_without_sampling(name: str) -> Subsegment:
"""
Begin a new unsampled subsegment under the current segment.
Args:
name (str): Subsegment name
Returns:
Subsegment: The created unsampled subsegment
"""
def end_subsegment(end_time: float = None) -> None:
"""
End the current subsegment.
Args:
end_time (float): Optional end timestamp (defaults to current time)
"""
def current_subsegment() -> Subsegment:
"""
Get the current active subsegment.
Returns:
Subsegment: Current active subsegment
Raises:
SegmentNotFoundException: If no subsegment is active
"""
Control the current trace context and entity hierarchy.
def get_trace_entity() -> Union[Segment, Subsegment]:
"""
Get the current active trace entity (segment or subsegment).
Returns:
Union[Segment, Subsegment]: Current active trace entity
"""
def set_trace_entity(trace_entity: Union[Segment, Subsegment]) -> None:
"""
Set the current active trace entity.
Args:
trace_entity (Union[Segment, Subsegment]): Trace entity to set as active
"""
def clear_trace_entities() -> None:
"""
Clear all trace entities from the current context.
Useful for preventing thread pollution in multi-threaded applications.
"""
Manage subsegment streaming for large traces.
def stream_subsegments() -> None:
"""
Stream closed subsegments to reduce memory usage.
Automatically called when streaming is enabled and thresholds are met.
"""
Low-level recording functionality for custom integrations.
def record_subsegment(
wrapped: Callable,
instance: object,
args: tuple,
kwargs: dict,
name: str,
namespace: str = None,
meta_processor: Callable = None
) -> Any:
"""
Record function execution in a subsegment with metadata processing.
Args:
wrapped (Callable): Function to wrap and trace
instance (object): Instance object for method calls
args (tuple): Function arguments
kwargs (dict): Function keyword arguments
name (str): Subsegment name
namespace (str): Optional namespace
meta_processor (Callable): Optional metadata processor function
Returns:
Any: Function result
"""
def record_subsegment_async(
wrapped: Callable,
instance: object,
args: tuple,
kwargs: dict,
name: str,
namespace: str = None,
meta_processor: Callable = None
) -> Any:
"""
Record async function execution in a subsegment with metadata processing.
Args:
wrapped (Callable): Async function to wrap and trace
instance (object): Instance object for method calls
args (tuple): Function arguments
kwargs (dict): Function keyword arguments
name (str): Subsegment name
namespace (str): Optional namespace
meta_processor (Callable): Optional metadata processor function
Returns:
Any: Function result
"""

from aws_xray_sdk.core import xray_recorder
# Configure the recorder
xray_recorder.configure(
sampling=False,
context_missing='LOG_ERROR'
)
# Using context managers
with xray_recorder.in_segment('web-request') as segment:
segment.put_annotation('user_id', '12345')
segment.put_metadata('request_info', {'method': 'GET', 'path': '/api/data'})
with xray_recorder.in_subsegment('database-query') as subsegment:
subsegment.put_annotation('table', 'users')
# Database query here
with xray_recorder.in_subsegment('external-api-call') as subsegment:
subsegment.put_annotation('service', 'payment-api')
# External API call here

import asyncio
from aws_xray_sdk.core import xray_recorder
async def async_operation():
async with xray_recorder.in_segment_async('async-request') as segment:
segment.put_annotation('operation', 'data-processing')
async with xray_recorder.in_subsegment_async('async-db-query') as subsegment:
# Async database operation
await asyncio.sleep(0.1) # Simulate async work
async with xray_recorder.in_subsegment_async('async-computation') as subsegment:
# Async computation
await asyncio.sleep(0.2) # Simulate async work

from aws_xray_sdk.core import xray_recorder
@xray_recorder.capture('data-processing')
def process_user_data(user_id, data):
# Add annotations within the traced function
xray_recorder.put_annotation('user_id', user_id)
xray_recorder.put_metadata('data_size', len(data))
# Process data
return processed_data
@xray_recorder.capture_async('async-data-processing')
async def async_process_user_data(user_id, data):
# Async processing with automatic tracing
xray_recorder.put_annotation('user_id', user_id)
result = await process_async(data)
return result

from aws_xray_sdk.core import xray_recorder
# Manual segment management
segment = xray_recorder.begin_segment('manual-segment')
segment.put_annotation('type', 'manual')
try:
# Start subsegment
subsegment = xray_recorder.begin_subsegment('manual-subsegment')
subsegment.put_metadata('operation', 'manual-process')
# Your traced code here
result = perform_operation()
except Exception as e:
# Exception will be automatically captured
raise
finally:
# Clean up
xray_recorder.end_subsegment()
xray_recorder.end_segment()

import concurrent.futures
from aws_xray_sdk.core import xray_recorder
def worker_function(data, trace_entity):
# Set the trace entity for the worker thread
xray_recorder.set_trace_entity(trace_entity)
try:
with xray_recorder.in_subsegment('worker-operation') as subsegment:
# Process data in worker thread
result = process_data(data)
return result
finally:
# Prevent thread pollution
xray_recorder.clear_trace_entities()
# Main thread
with xray_recorder.in_segment('threaded-operation') as segment:
current_entity = xray_recorder.get_trace_entity()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(worker_function, data, current_entity)
for data in data_list
]
results = [future.result() for future in futures]

Install with Tessl CLI
npx tessl i tessl/pypi-aws-xray-sdk