The AWS X-Ray SDK for Python enables Python developers to record and emit information from within their applications to the AWS X-Ray service for distributed tracing.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Recorder configuration options, plugin system for AWS metadata collection, and global SDK settings. Includes sampling configuration, context management, environment-specific plugins, and comprehensive customization options.
Configure the X-Ray recorder with comprehensive settings for sampling, plugins, error handling, and performance optimization.
def configure(
sampling: bool = None,
plugins: tuple = None,
context_missing: str = None,
sampling_rules: str = None,
daemon_address: str = None,
service: str = None,
context: object = None,
emitter: object = None,
streaming: bool = None,
dynamic_naming: str = None,
streaming_threshold: int = None,
max_trace_back: int = None,
sampler: object = None,
stream_sql: bool = None
) -> None:
"""
Configure the X-Ray recorder with various settings.
Args:
sampling (bool): Enable/disable sampling (default: True)
plugins (tuple): Plugin names to load ('EC2Plugin', 'ECSPlugin', 'ElasticBeanstalkPlugin')
context_missing (str): Behavior when no active segment ('LOG_ERROR', 'RUNTIME_ERROR', 'IGNORE_ERROR')
sampling_rules (str): Path to JSON file containing sampling rules
daemon_address (str): X-Ray daemon address in format 'host:port' (default: '127.0.0.1:2000')
service (str): Service name override for dynamic segment naming
context (object): Context implementation for trace entity storage
emitter (object): Custom emitter for sending traces to daemon
streaming (bool): Enable subsegment streaming for large traces
dynamic_naming (str): Pattern for dynamic segment naming (e.g., '*.example.com*')
streaming_threshold (int): Number of subsegments before streaming (default: 20)
max_trace_back (int): Maximum stack trace depth for exceptions (default: 10)
sampler (object): Custom sampler implementation
stream_sql (bool): Enable SQL query streaming for database traces
"""Global SDK settings that control overall X-Ray functionality across the application.
class SDKConfig:
"""Global SDK configuration management."""
@classmethod
def sdk_enabled(cls) -> bool:
"""
Check if the X-Ray SDK is globally enabled.
Returns:
bool: True if SDK is enabled, False otherwise
Notes:
- Controlled by AWS_XRAY_SDK_ENABLED environment variable
- When disabled, all X-Ray operations become no-ops
- AWS_XRAY_NOOP_ID controls trace ID generation (default: True for no-op IDs in unsampled requests)
"""
@classmethod
def set_sdk_enabled(cls, value: bool) -> None:
"""
Enable or disable the X-Ray SDK globally.
Args:
value (bool): True to enable, False to disable
Notes:
- Environment variables take precedence over programmatic settings
- Useful for testing and development environments
"""AWS environment plugins that automatically collect metadata about the runtime environment.
class EC2Plugin:
"""
AWS EC2 metadata plugin.
Collects:
- Instance ID
- Availability Zone
- Instance Type
- AMI ID
- Security Groups
"""
class ECSPlugin:
"""
AWS ECS metadata plugin.
Collects:
- Container ID
- Container Name
- Task ARN
- Task Definition
- Cluster Name
"""
class ElasticBeanstalkPlugin:
"""
AWS Elastic Beanstalk metadata plugin.
Collects:
- Environment Name
- Version Label
- Deployment ID
"""from aws_xray_sdk.core import xray_recorder
# Simple configuration for development
xray_recorder.configure(
sampling=False, # Disable sampling for dev
context_missing='LOG_ERROR', # Log missing context instead of throwing
daemon_address='127.0.0.1:2000' # Local daemon address
)from aws_xray_sdk.core import xray_recorder
# Production configuration with sampling and plugins
xray_recorder.configure(
sampling=True, # Enable sampling
plugins=('EC2Plugin', 'ECSPlugin'), # Load environment plugins
context_missing='LOG_ERROR', # Handle missing context gracefully
daemon_address='xray-daemon:2000', # Daemon service address
service='production-api', # Service name
streaming=True, # Enable streaming for large traces
streaming_threshold=50, # Stream after 50 subsegments
max_trace_back=20, # Capture more stack trace
stream_sql=True # Enable SQL streaming
)from aws_xray_sdk.core import xray_recorder
# Enable sampling with default rules
xray_recorder.configure(sampling=True)
# Disable sampling (trace everything)
xray_recorder.configure(sampling=False)from aws_xray_sdk.core import xray_recorder
# Load sampling rules from file
xray_recorder.configure(
sampling=True,
sampling_rules='/path/to/sampling-rules.json'
)Example sampling rules file:
{
"version": 2,
"default": {
"fixed_target": 1,
"rate": 0.1
},
"rules": [
{
"description": "High priority endpoints",
"service_name": "my-service",
"http_method": "*",
"url_path": "/api/critical/*",
"fixed_target": 2,
"rate": 1.0
},
{
"description": "Health checks",
"service_name": "*",
"http_method": "GET",
"url_path": "/health",
"fixed_target": 0,
"rate": 0.01
},
{
"description": "Database operations",
"service_name": "database-service",
"http_method": "*",
"url_path": "*",
"fixed_target": 1,
"rate": 0.5
}
]
}Configure dynamic segment naming based on request characteristics.
from aws_xray_sdk.core import xray_recorder
# Dynamic naming based on hostname
xray_recorder.configure(
service='my-service',
dynamic_naming='*.example.com*' # Matches any subdomain of example.com
)
# Multiple patterns
xray_recorder.configure(
service='multi-domain-service',
dynamic_naming='*.example.com*,*.test.com*,localhost'
)from aws_xray_sdk.core import xray_recorder
# Default context for synchronous applications
xray_recorder.configure() # Uses default contextfrom aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext
# Async context for asynchronous applications
xray_recorder.configure(
context=AsyncContext()
)from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.context import Context
class CustomContext(Context):
"""Custom context implementation for special requirements."""
def __init__(self):
super().__init__()
self.custom_storage = {}
def put_segment(self, segment):
# Custom segment storage logic
self.custom_storage[threading.current_thread().ident] = segment
def get_trace_entity(self):
# Custom trace entity retrieval
return self.custom_storage.get(threading.current_thread().ident)
# Use custom context
xray_recorder.configure(context=CustomContext())from aws_xray_sdk.core import xray_recorder
# Different error handling strategies
xray_recorder.configure(context_missing='LOG_ERROR') # Log and continue
xray_recorder.configure(context_missing='RUNTIME_ERROR') # Raise exception
xray_recorder.configure(context_missing='IGNORE_ERROR') # Silent ignorePlugins automatically detect the runtime environment and collect relevant metadata.
from aws_xray_sdk.core import xray_recorder
# Load all available plugins
xray_recorder.configure(
plugins=('EC2Plugin', 'ECSPlugin', 'ElasticBeanstalkPlugin')
)
# Plugins automatically detect environment and collect metadata:
# - EC2Plugin: Only activates on EC2 instances
# - ECSPlugin: Only activates in ECS containers
# - ElasticBeanstalkPlugin: Only activates in Elastic Beanstalk environmentsfrom aws_xray_sdk.core import xray_recorder
# EC2 plugin automatically collects:
xray_recorder.configure(plugins=('EC2Plugin',))
# Metadata collected:
# {
# "ec2": {
# "instance_id": "i-1234567890abcdef0",
# "availability_zone": "us-west-2a",
# "instance_type": "m5.large",
# "ami_id": "ami-12345678",
# "instance_size": "m5.large"
# }
# }from aws_xray_sdk.core import xray_recorder
# ECS plugin for containerized applications
xray_recorder.configure(plugins=('ECSPlugin',))
# Metadata collected:
# {
# "ecs": {
# "container": "my-app-container",
# "container_id": "abcd1234",
# "task_arn": "arn:aws:ecs:us-west-2:123456789012:task/my-task",
# "task_family": "my-task-family",
# "cluster_name": "my-cluster"
# }
# }from aws_xray_sdk.core import xray_recorder
# Elastic Beanstalk plugin
xray_recorder.configure(plugins=('ElasticBeanstalkPlugin',))
# Metadata collected:
# {
# "elastic_beanstalk": {
# "environment_name": "my-app-prod",
# "version_label": "v1.2.3",
# "deployment_id": "12345"
# }
# }from aws_xray_sdk.core import xray_recorder
# Check which plugins are loaded and active
xray_recorder.configure(plugins=('EC2Plugin', 'ECSPlugin'))
# Plugin metadata is automatically added to segments
with xray_recorder.in_segment('plugin-demo') as segment:
# Plugin metadata is automatically attached
# You can access it programmatically if needed
segment_dict = segment.to_dict()
aws_metadata = segment_dict.get('aws', {})
if 'ec2' in aws_metadata:
print(f"Running on EC2 instance: {aws_metadata['ec2']['instance_id']}")
if 'ecs' in aws_metadata:
print(f"Running in ECS container: {aws_metadata['ecs']['container']}")from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.emitters.udp_emitter import UDPEmitter
class CustomEmitter(UDPEmitter):
"""Custom emitter with additional processing."""
def send_entity(self, entity):
# Add custom processing before sending
self.preprocess_entity(entity)
# Call parent implementation
super().send_entity(entity)
def preprocess_entity(self, entity):
# Custom preprocessing logic
entity_dict = entity.to_dict()
# Add custom metadata
if 'metadata' not in entity_dict:
entity_dict['metadata'] = {}
entity_dict['metadata']['custom'] = {
'processed_at': time.time(),
'version': '1.0'
}
# Use custom emitter
custom_emitter = CustomEmitter()
xray_recorder.configure(emitter=custom_emitter)from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.sampling.sampler import Sampler
class CustomSampler(Sampler):
"""Custom sampler with business logic."""
def should_trace(self, sampling_req):
# Custom sampling logic based on request
service_name = sampling_req.get('service_name', '')
url_path = sampling_req.get('url_path', '')
# Always trace errors
if 'error' in url_path.lower():
return True
# Never trace health checks
if url_path == '/health':
return False
# Custom rate based on service
if service_name == 'critical-service':
return random.random() < 0.8 # 80% sampling
# Default rate
return random.random() < 0.1 # 10% sampling
# Use custom sampler
custom_sampler = CustomSampler()
xray_recorder.configure(sampler=custom_sampler)import os
from aws_xray_sdk.core import xray_recorder
def configure_xray_for_environment():
"""Configure X-Ray based on environment variables."""
environment = os.getenv('ENVIRONMENT', 'development')
debug_mode = os.getenv('DEBUG', 'false').lower() == 'true'
if environment == 'production':
# Production configuration
xray_recorder.configure(
sampling=True,
plugins=('EC2Plugin', 'ECSPlugin'),
context_missing='LOG_ERROR',
daemon_address=os.getenv('XRAY_DAEMON_ADDRESS', 'xray-daemon:2000'),
service=os.getenv('SERVICE_NAME', 'production-service'),
streaming=True,
streaming_threshold=100,
stream_sql=False # Disable SQL streaming in production
)
elif environment == 'staging':
# Staging configuration
xray_recorder.configure(
sampling=True,
plugins=('EC2Plugin',),
context_missing='LOG_ERROR',
daemon_address=os.getenv('XRAY_DAEMON_ADDRESS', '127.0.0.1:2000'),
service=f"staging-{os.getenv('SERVICE_NAME', 'service')}",
stream_sql=True
)
else:
# Development configuration
xray_recorder.configure(
sampling=not debug_mode, # Disable sampling in debug mode
context_missing='LOG_ERROR',
daemon_address='127.0.0.1:2000',
service=f"dev-{os.getenv('SERVICE_NAME', 'service')}",
stream_sql=True
)
# Apply environment-based configuration
configure_xray_for_environment()# settings.py
import os
XRAY_RECORDER = {
'AWS_XRAY_TRACING_NAME': os.getenv('SERVICE_NAME', 'Django App'),
'PLUGINS': ('EC2Plugin', 'ECSPlugin'),
'DAEMON_ADDRESS': os.getenv('XRAY_DAEMON_ADDRESS', '127.0.0.1:2000'),
'CONTEXT_MISSING': 'LOG_ERROR',
'SAMPLING': os.getenv('ENVIRONMENT') == 'production',
'SAMPLING_RULES': os.getenv('XRAY_SAMPLING_RULES'),
'STREAM_SQL': os.getenv('XRAY_STREAM_SQL', 'true').lower() == 'true',
'AUTO_INSTRUMENT': True,
'PATCH_MODULES': [
'boto3',
'requests',
'sqlalchemy_core',
],
'IGNORE_MODULE_PATTERNS': [
'.*test.*',
'.*mock.*',
],
}import os
from aws_xray_sdk.core import xray_recorder
class XRayConfig:
"""X-Ray configuration for Flask applications."""
@staticmethod
def init_app(app):
# Environment-based configuration
environment = app.config.get('ENVIRONMENT', 'development')
config = {
'context_missing': 'LOG_ERROR',
'daemon_address': app.config.get('XRAY_DAEMON_ADDRESS', '127.0.0.1:2000'),
'service': app.config.get('SERVICE_NAME', 'flask-app'),
}
if environment == 'production':
config.update({
'sampling': True,
'plugins': ('EC2Plugin', 'ECSPlugin'),
'streaming': True,
'stream_sql': False
})
else:
config.update({
'sampling': False,
'stream_sql': True
})
# Apply configuration
xray_recorder.configure(**config)
return xray_recorder
# Usage in Flask app
from flask import Flask
from aws_xray_sdk.ext.flask.middleware import XRayMiddleware
app = Flask(__name__)
recorder = XRayConfig.init_app(app)
XRayMiddleware(app, recorder)import os
# Control SDK enabled state via environment
os.environ['AWS_XRAY_SDK_ENABLED'] = 'false' # Disable SDK
os.environ['AWS_XRAY_NOOP_ID'] = 'false' # Generate secure random IDs for all requests
# Check SDK state
from aws_xray_sdk import global_sdk_config
if global_sdk_config.sdk_enabled():
print("X-Ray SDK is enabled")
else:
print("X-Ray SDK is disabled")from aws_xray_sdk import global_sdk_config
# Disable SDK programmatically
global_sdk_config.set_sdk_enabled(False)
# Re-enable SDK (must also clear environment variable)
import os
if 'AWS_XRAY_SDK_ENABLED' in os.environ:
del os.environ['AWS_XRAY_SDK_ENABLED']
global_sdk_config.set_sdk_enabled(True)import pytest
from aws_xray_sdk import global_sdk_config
@pytest.fixture
def disable_xray():
"""Disable X-Ray for testing."""
global_sdk_config.set_sdk_enabled(False)
yield
global_sdk_config.set_sdk_enabled(True)
def test_my_function(disable_xray):
# X-Ray is disabled during this test
result = my_function_that_uses_xray()
assert result == expected_valuefrom aws_xray_sdk.core import xray_recorder, patch_all
# 1. Configure recorder first
xray_recorder.configure(
sampling=True,
plugins=('EC2Plugin',),
context_missing='LOG_ERROR'
)
# 2. Then patch libraries
patch_all()
# 3. Finally import and use libraries
import requests
import boto3from aws_xray_sdk.core import xray_recorder
# Optimize for high-throughput applications
xray_recorder.configure(
sampling=True, # Use sampling to reduce overhead
streaming=True, # Stream large traces
streaming_threshold=20, # Lower threshold for faster streaming
max_trace_back=5, # Reduce stack trace depth
stream_sql=False # Disable SQL streaming in production
)from aws_xray_sdk.core import xray_recorder
# Graceful error handling
xray_recorder.configure(context_missing='LOG_ERROR')
# Verify configuration
try:
with xray_recorder.in_segment('test-segment'):
print("X-Ray is working correctly")
except Exception as e:
print(f"X-Ray configuration issue: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-aws-xray-sdk