The AWS X-Ray SDK for Python enables Python developers to record and emit information from within their applications to the AWS X-Ray service for distributed tracing.
npx @tessl/cli install tessl/pypi-aws-xray-sdk@2.14.0
The AWS X-Ray SDK for Python enables Python developers to record and emit information from within their applications to the AWS X-Ray service for distributed tracing. It provides comprehensive distributed tracing capabilities for monitoring and troubleshooting complex distributed Python applications in AWS environments.
pip install aws-xray-sdk
Dependencies: wrapt, botocore>=1.11.3
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch, patch_all
from typing import Union
For configuration:
from aws_xray_sdk import global_sdk_config
For model classes:
from aws_xray_sdk.core.models.segment import Segment
from aws_xray_sdk.core.models.subsegment import Subsegment
from aws_xray_sdk.core.models.trace_header import TraceHeader
from aws_xray_sdk.core import xray_recorder, patch_all
# Configure X-Ray recorder
xray_recorder.configure(
sampling=False,
context_missing='LOG_ERROR',
plugins=('EC2Plugin', 'ECSPlugin'),
daemon_address='127.0.0.1:2000',
dynamic_naming='*mysite.com*'
)
# Patch all supported libraries for automatic instrumentation
patch_all()
# Use context managers for tracing
with xray_recorder.in_segment('my-application') as segment:
segment.put_metadata('key', {'data': 'value'})
with xray_recorder.in_subsegment('database-operation') as subsegment:
subsegment.put_annotation('table', 'users')
# Your database operation here
# Use decorators for function tracing
@xray_recorder.capture('process-data')
def process_data(data):
# Your function logic here
return processed_data
The AWS X-Ray SDK follows a hierarchical tracing model:
The SDK provides both synchronous and asynchronous recorders, automatic instrumentation through patching, and extensive integration with Python web frameworks and AWS services.
Primary tracing functionality for creating and managing segments and subsegments. Includes synchronous and asynchronous recorders, context managers, decorators, and manual trace management.
# Global recorder instance
xray_recorder: AsyncAWSXRayRecorder
# Context managers
def in_segment(name: str, **segment_kwargs) -> SegmentContextManager: ...
def in_subsegment(name: str, **subsegment_kwargs) -> SubsegmentContextManager: ...
def in_segment_async(name: str, **segment_kwargs) -> AsyncSegmentContextManager: ...
def in_subsegment_async(name: str, **subsegment_kwargs) -> AsyncSubsegmentContextManager: ...
# Decorators
def capture(name: str) -> Callable: ...
def capture_async(name: str) -> Callable: ...
# Manual management - Segments
def begin_segment(name: str = None, traceid: str = None, parent_id: str = None, sampling: int = None) -> Segment: ...
def end_segment(end_time: float = None) -> None: ...
def current_segment() -> Segment: ...
# Manual management - Subsegments
def begin_subsegment(name: str, namespace: str = 'local') -> Subsegment: ...
def begin_subsegment_without_sampling(name: str) -> Subsegment: ...
def end_subsegment(end_time: float = None) -> None: ...
def current_subsegment() -> Subsegment: ...
# Trace entity management
def get_trace_entity() -> Union[Segment, Subsegment]: ...
def set_trace_entity(trace_entity: Union[Segment, Subsegment]) -> None: ...
def clear_trace_entities() -> None: ...
# Streaming
def stream_subsegments() -> None: ...
Automatic instrumentation for popular Python libraries and frameworks. Provides seamless tracing integration without code changes for supported libraries.
def patch(modules_to_patch: tuple, raise_errors: bool = True, ignore_module_patterns: list = None) -> None: ...
def patch_all(double_patch: bool = False) -> None: ...
Supported Libraries: aiobotocore, botocore, pynamodb, requests, sqlite3, mysql, httplib, pymongo, pymysql, psycopg2, pg8000, sqlalchemy_core, httpx
Data attachment capabilities for enriching traces with searchable annotations and debugging metadata. Annotations are indexed for filtering and searching in the X-Ray console.
def put_annotation(key: str, value: str) -> None: ...
def put_metadata(key: str, value: Any, namespace: str = 'default') -> None: ...
def is_sampled() -> bool: ...
Middleware and integration components for popular Python web frameworks including Django, Flask, Bottle, and aiohttp. Provides automatic request tracing and HTTP metadata collection.
# Flask integration
class XRayMiddleware:
def __init__(self, app: Flask, recorder: AWSXRayRecorder): ...
# Django integration
MIDDLEWARE = ['aws_xray_sdk.ext.django.middleware.XRayMiddleware']
# aiohttp integration
def middleware(request: web.Request, handler: Callable) -> web.Response: ...
Specialized integration for AWS services through boto3/botocore patching. Automatically traces AWS API calls with service-specific metadata and error handling.
# Automatic AWS service call tracing via patching
import boto3
from aws_xray_sdk.core import patch
patch(['botocore'])
client = boto3.client('dynamodb') # Automatically traced
Database-specific tracing for SQL and NoSQL databases. Captures query information, connection details, and performance metrics while respecting security best practices.
# SQLAlchemy integration
from aws_xray_sdk.ext.sqlalchemy.query import XRaySessionMaker
Session = XRaySessionMaker(bind=engine)
# Flask-SQLAlchemy integration
from aws_xray_sdk.ext.flask_sqlalchemy.query import XRayFlaskSqlAlchemy
db = XRayFlaskSqlAlchemy(app)
Recorder configuration options, plugin system for AWS metadata collection, and global SDK settings. Includes sampling configuration, context management, and environment-specific plugins.
def configure(
sampling: bool = None,
plugins: tuple = None,
context_missing: str = None,
sampling_rules: str = None,
daemon_address: str = None,
service: str = None,
context: object = None,
emitter: object = None,
streaming: bool = None,
dynamic_naming: str = None,
streaming_threshold: int = None,
max_trace_back: int = None,
sampler: object = None,
stream_sql: bool = True
) -> None: ...
# Plugin classes
class EC2Plugin: ...
class ECSPlugin: ...
class ElasticBeanstalkPlugin: ...
Trace entity context management for controlling the active segment or subsegment in multi-threaded environments and manual trace control scenarios.
def get_trace_entity() -> Union[Segment, Subsegment]: ...
def set_trace_entity(trace_entity: Union[Segment, Subsegment]) -> None: ...
def clear_trace_entities() -> None: ...
Detailed in Core Recording under the Trace Entity Management section.
Exception and error tracking capabilities for adding fault, error, and throttle information to traces, along with detailed exception metadata.
# Segment/Subsegment methods for error tracking
def add_exception(self, exception: Exception, stack: list = None, remote: bool = False) -> None: ...
def add_fault_flag(self) -> None: ...
def add_error_flag(self) -> None: ...
def add_throttle_flag(self) -> None: ...
def apply_status_code(self, status_code: int) -> None: ...
HTTP-specific metadata collection for web requests, including status codes, request/response headers, and URL information.
# Segment/Subsegment methods for HTTP metadata
def put_http_meta(self, key: str, value: Any) -> None: ...
def set_user(self, user: str) -> None: ...
def apply_status_code(self, status_code: int) -> None: ...
Advanced sampling control and decision inspection for optimizing trace collection and performance.
def is_sampled() -> bool: ...
def begin_subsegment_without_sampling(name: str) -> Subsegment: ...
# Sampling configuration via configure() method
class AWSXRayRecorder:
"""Synchronous X-Ray recorder for managing traces."""
class AsyncAWSXRayRecorder(AWSXRayRecorder):
"""Asynchronous X-Ray recorder extending synchronous functionality."""
class Segment:
"""Represents a segment - compute resources running application logic."""
def __init__(self, name: str, entityid: str = None, traceid: str = None, parent_id: str = None, sampled: bool = True): ...
# Subsegment management
def add_subsegment(self, subsegment: 'Subsegment') -> None: ...
def remove_subsegment(self, subsegment: 'Subsegment') -> None: ...
def increment(self) -> None: ...
def decrement_ref_counter(self) -> None: ...
def ready_to_send(self) -> bool: ...
# Annotations and metadata
def put_annotation(self, key: str, value: Any) -> None: ...
def put_metadata(self, key: str, value: Any, namespace: str = 'default') -> None: ...
def put_http_meta(self, key: str, value: Any) -> None: ...
# Error handling
def add_exception(self, exception: Exception, stack: list = None, remote: bool = False) -> None: ...
def add_fault_flag(self) -> None: ...
def add_error_flag(self) -> None: ...
def add_throttle_flag(self) -> None: ...
def apply_status_code(self, status_code: int) -> None: ...
# AWS and service metadata
def set_aws(self, aws_meta: dict) -> None: ...
def set_service(self, service_info: dict) -> None: ...
def set_user(self, user: str) -> None: ...
# Serialization
def serialize(self) -> str: ...
def to_dict(self) -> dict: ...
def close(self, end_time: float = None) -> None: ...
class Subsegment:
"""Represents a subsegment - granular timing and details about downstream calls."""
def __init__(self, name: str, namespace: str = 'local', segment: Segment = None): ...
# Subsegment management
def add_subsegment(self, subsegment: 'Subsegment') -> None: ...
def remove_subsegment(self, subsegment: 'Subsegment') -> None: ...
# Annotations and metadata
def put_annotation(self, key: str, value: Any) -> None: ...
def put_metadata(self, key: str, value: Any, namespace: str = 'default') -> None: ...
def put_http_meta(self, key: str, value: Any) -> None: ...
def set_sql(self, sql: dict) -> None: ...
# Error handling
def add_exception(self, exception: Exception, stack: list = None, remote: bool = False) -> None: ...
def add_fault_flag(self) -> None: ...
def add_error_flag(self) -> None: ...
def add_throttle_flag(self) -> None: ...
def apply_status_code(self, status_code: int) -> None: ...
# AWS metadata
def set_aws(self, aws_meta: dict) -> None: ...
# Serialization
def serialize(self) -> str: ...
def to_dict(self) -> dict: ...
def close(self, end_time: float = None) -> None: ...
class TraceHeader:
"""Represents X-Ray trace header for HTTP request propagation."""
def __init__(self, root: str = None, parent: str = None, sampled: int = None, data: str = None): ...
@classmethod
def from_header_str(cls, header: str) -> 'TraceHeader': ...
def to_header_str(self) -> str: ...
class SDKConfig:
"""Global SDK configuration management."""
@classmethod
def sdk_enabled(cls) -> bool: ...
@classmethod
def set_sdk_enabled(cls, value: bool) -> None: ...
# Context manager classes
class SegmentContextManager:
"""Context manager for segment lifecycle management."""
def __enter__(self) -> Segment: ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
class SubsegmentContextManager:
"""Context manager for subsegment lifecycle management."""
def __enter__(self) -> Subsegment: ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
class AsyncSegmentContextManager:
"""Async context manager for segment lifecycle management."""
async def __aenter__(self) -> Segment: ...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
class AsyncSubsegmentContextManager:
"""Async context manager for subsegment lifecycle management."""
async def __aenter__(self) -> Subsegment: ...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
# Dummy entities for unsampled traces
class DummySegment(Segment):
"""Dummy segment used when traces are not sampled."""
pass
class DummySubsegment(Subsegment):
"""Dummy subsegment used when traces are not sampled."""
pass