The AWS X-Ray SDK for Python enables Python developers to record and emit information from within their applications to the AWS X-Ray service for distributed tracing.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Specialized integration for AWS services through boto3/botocore patching. Automatically traces AWS API calls with service-specific metadata, error handling, and comprehensive request/response information for effective monitoring of AWS service interactions.
Enable automatic tracing of all AWS service calls through boto3/botocore patching.
# Import and patch botocore for AWS service tracing
from aws_xray_sdk.core import patch
patch(['botocore'])
# For async AWS calls
patch(['aiobotocore'])

AWS service calls automatically include comprehensive metadata:
All AWS services supported by boto3/botocore are automatically traced, including:
from aws_xray_sdk.core import patch, xray_recorder
import boto3
# Patch botocore for automatic AWS tracing
patch(['botocore'])
# Configure X-Ray
xray_recorder.configure(
service='AWS Integration Demo',
daemon_address='127.0.0.1:2000'
)
with xray_recorder.in_segment('aws-operations') as segment:
# All AWS calls are automatically traced
# S3 operations
s3 = boto3.client('s3')
buckets = s3.list_buckets() # Automatically traced as subsegment
# DynamoDB operations
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('users')
response = table.get_item(Key={'id': '123'}) # Automatically traced
# Lambda invocation
lambda_client = boto3.client('lambda')
response = lambda_client.invoke(
FunctionName='my-function',
Payload=json.dumps({'key': 'value'})
) # Automatically traced

from aws_xray_sdk.core import patch, xray_recorder
import boto3
patch(['botocore'])
with xray_recorder.in_segment('s3-operations') as segment:
s3 = boto3.client('s3')
# Each operation creates a subsegment with detailed metadata
# Upload file - traces upload progress for large files
s3.upload_file('local-file.txt', 'my-bucket', 'remote-file.txt')
# Download file - traces download progress
s3.download_file('my-bucket', 'remote-file.txt', 'downloaded-file.txt')
# List objects - includes pagination tracing
response = s3.list_objects_v2(Bucket='my-bucket', Prefix='data/')
# Multipart upload - traces each part
response = s3.create_multipart_upload(Bucket='my-bucket', Key='large-file.dat')
upload_id = response['UploadId']
# Each part upload is traced separately
part1 = s3.upload_part(
Bucket='my-bucket',
Key='large-file.dat',
PartNumber=1,
UploadId=upload_id,
Body=b'part1 data'
)

from aws_xray_sdk.core import patch, xray_recorder
import boto3
patch(['botocore'])
with xray_recorder.in_segment('dynamodb-operations') as segment:
# Resource interface
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('users')
# Item operations - automatically traced with table metadata
response = table.put_item(
Item={'id': '123', 'name': 'John Doe', 'email': 'john@example.com'}
)
user = table.get_item(Key={'id': '123'})
# Query operations - includes index and pagination info
response = table.query(
IndexName='email-index',
KeyConditionExpression=Key('email').eq('john@example.com')
)
# Scan operations - includes parallel scan metadata
response = table.scan(
FilterExpression=Attr('active').eq(True),
Limit=100
)
# Batch operations - traces batch efficiency
with table.batch_writer() as batch:
for i in range(100):
batch.put_item(Item={'id': str(i), 'data': f'item{i}'})

from aws_xray_sdk.core import patch, xray_recorder
import boto3
import json
patch(['botocore'])
with xray_recorder.in_segment('lambda-operations') as segment:
lambda_client = boto3.client('lambda')
# Synchronous invocation - includes execution time and response
response = lambda_client.invoke(
FunctionName='data-processor',
InvocationType='RequestResponse',
Payload=json.dumps({'data': 'process this'})
)
result = json.loads(response['Payload'].read())
# Asynchronous invocation - traces invocation only
lambda_client.invoke(
FunctionName='async-processor',
InvocationType='Event',
Payload=json.dumps({'data': 'process async'})
)
# List functions - includes pagination
functions = lambda_client.list_functions(MaxItems=50)

from aws_xray_sdk.core import patch, xray_recorder
import boto3
from botocore.exceptions import ClientError
patch(['botocore'])
with xray_recorder.in_segment('aws-error-handling') as segment:
s3 = boto3.client('s3')
try:
# Operation that might fail
response = s3.get_object(Bucket='non-existent-bucket', Key='file.txt')
except ClientError as e:
# AWS errors are automatically captured with:
# - Error code (NoSuchBucket, AccessDenied, etc.)
# - Error message
# - Request ID for AWS support
# - HTTP status code
# - Retry information
error_code = e.response['Error']['Code']
xray_recorder.put_annotation('aws_error_code', error_code)
if error_code == 'NoSuchBucket':
# Handle specific error
segment.add_error_flag() # Mark as client error
else:
# Handle other errors
segment.add_fault_flag() # Mark as service fault
raise

from aws_xray_sdk.core import patch, xray_recorder
import boto3
patch(['botocore'])
with xray_recorder.in_segment('batch-operations') as segment:
# DynamoDB batch operations
dynamodb = boto3.resource('dynamodb')
with xray_recorder.in_subsegment('batch-write') as subsegment:
# Batch write automatically handles retries and traces each batch
with dynamodb.batch_writer() as batch:
for i in range(1000): # Automatically batched into 25-item chunks
batch.put_item(Item={'id': str(i), 'data': f'item{i}'})
subsegment.put_annotation('items_written', '1000')
# S3 paginated operations
s3 = boto3.client('s3')
with xray_recorder.in_subsegment('list-all-objects') as subsegment:
paginator = s3.get_paginator('list_objects_v2')
object_count = 0
for page in paginator.paginate(Bucket='my-large-bucket'):
# Each page request is traced as a separate subsegment
object_count += len(page.get('Contents', []))
subsegment.put_annotation('total_objects', str(object_count))

from aws_xray_sdk.core import patch, xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext
import aiobotocore.session
import asyncio
# Patch aiobotocore for async AWS tracing
patch(['aiobotocore'])
# Configure recorder with async context
xray_recorder.configure(
service='Async AWS Demo',
context=AsyncContext()
)
async def async_aws_operations():
    """Run traced async AWS calls under a single X-Ray segment.

    Requires ``patch(['aiobotocore'])`` and a recorder configured with
    ``AsyncContext`` (both shown above) so that subsegments propagate
    correctly across ``await`` boundaries.
    """
    async with xray_recorder.in_segment_async('async-aws-ops') as segment:
        session = aiobotocore.session.get_session()
        # Async S3 operations: each awaited call is traced as a subsegment.
        async with session.create_client('s3') as s3:
            buckets = await s3.list_buckets()
            # Concurrent operations are traced individually even when gathered.
            tasks = [
                s3.head_object(Bucket='my-bucket', Key=f'file{i}.txt')
                for i in range(10)
            ]
            # return_exceptions=True keeps one failed HEAD from cancelling the rest.
            results = await asyncio.gather(*tasks, return_exceptions=True)
        # Async DynamoDB operations.
        async with session.create_client('dynamodb') as dynamodb:
            response = await dynamodb.describe_table(TableName='users')
            # Batch async operations: fan out 50 writes concurrently;
            # each put_item is traced on its own.
            put_requests = [
                dynamodb.put_item(
                    TableName='users',
                    Item={'id': {'S': str(i)}, 'name': {'S': f'User{i}'}}
                )
                for i in range(50)
            ]
            await asyncio.gather(*put_requests)
# Run async operations
asyncio.run(async_aws_operations())

from aws_xray_sdk.core import patch, xray_recorder
import boto3
patch(['botocore'])
with xray_recorder.in_segment('custom-aws-metadata') as segment:
# Add custom annotations for filtering
xray_recorder.put_annotation('environment', 'production')
xray_recorder.put_annotation('service_version', '1.2.3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('user_sessions')
with xray_recorder.in_subsegment('user-session-cleanup') as subsegment:
# Add business context
subsegment.put_annotation('cleanup_type', 'expired_sessions')
subsegment.put_annotation('table_name', 'user_sessions')
# Scan for expired sessions
response = table.scan(
FilterExpression=Attr('expires_at').lt(int(time.time())),
ProjectionExpression='session_id'
)
expired_sessions = response['Items']
subsegment.put_annotation('expired_count', str(len(expired_sessions)))
# Add detailed metadata
subsegment.put_metadata('cleanup_info', {
'total_scanned': response['ScannedCount'],
'expired_found': len(expired_sessions),
'consumed_capacity': response.get('ConsumedCapacity', {}),
'scan_time': time.time()
}, namespace='cleanup')
# Batch delete expired sessions
if expired_sessions:
with table.batch_writer() as batch:
for session in expired_sessions:
batch.delete_item(Key={'session_id': session['session_id']})

from aws_xray_sdk.core import patch, xray_recorder
# Custom metadata processor for AWS calls
def aws_metadata_processor(wrapped, instance, args, kwargs, name, namespace, meta_processor):
    """Wrap an AWS call and record custom timing metadata on the trace.

    Matches the patcher's processor signature; only ``wrapped``, ``args``
    and ``kwargs`` are used here — the remaining parameters are accepted
    for interface compatibility.

    Returns whatever ``wrapped`` returns; re-raises any exception it
    raises after recording error metadata.
    """
    start_time = time.time()
    try:
        result = wrapped(*args, **kwargs)
        # Success path: attach duration metadata to the active (sub)segment.
        xray_recorder.put_metadata('aws_call_timing', {
            'duration_ms': (time.time() - start_time) * 1000,
            'success': True
        }, namespace='custom')
        return result
    except Exception as e:
        # Failure path: record how long the failed call took and why it failed,
        # then propagate the original exception unchanged.
        xray_recorder.put_metadata('aws_call_error', {
            'duration_ms': (time.time() - start_time) * 1000,
            'error_type': type(e).__name__,
            'error_message': str(e)
        }, namespace='custom')
        raise
patch(['botocore'])

from aws_xray_sdk.core import patch, xray_recorder
# Configure different services differently
xray_recorder.configure(
service='Multi-Service App',
sampling=True,
sampling_rules='sampling-rules.json'
)
# Custom sampling rules can target specific AWS services
# Example sampling-rules.json:
# {
# "rules": [
# {
# "description": "High-volume DynamoDB operations",
# "service_name": "*",
# "http_method": "*",
# "url_path": "*",
# "fixed_target": 1,
# "rate": 0.1,
# "service_type": "AWS::DynamoDB::Table"
# },
# {
# "description": "S3 operations",
# "service_name": "*",
# "http_method": "*",
# "url_path": "*",
# "fixed_target": 2,
# "rate": 0.5,
# "service_type": "AWS::S3::Bucket"
# }
# ]
# }
patch(['botocore'])

# Use sampling for high-volume AWS operations
xray_recorder.configure(sampling=True)
# Conditional expensive metadata
with xray_recorder.in_segment('aws-operations') as segment:
if xray_recorder.is_sampled():
# Only add expensive metadata for sampled traces
segment.put_metadata('detailed_aws_config', get_detailed_aws_config())
# Lightweight annotations for all traces
xray_recorder.put_annotation('aws_region', 'us-east-1')

# AWS SDK automatically sanitizes sensitive data
# But you can add additional sanitization
def sanitize_aws_parameters(params):
    """Return a copy of *params* with likely-sensitive values redacted.

    Any dict key whose lowercased name contains one of the sensitive
    markers ('password', 'secret', 'token', 'key') has its value replaced
    with '***REDACTED***'. Dicts and lists are walked recursively; any
    other value is returned unchanged.

    NOTE(review): the 'key' marker also matches innocuous AWS parameter
    names such as S3 'Key' — intentional here, since over-redaction is
    the safe direction for trace metadata.
    """
    sensitive_keys = ['password', 'secret', 'token', 'key']
    if isinstance(params, dict):
        return {
            k: '***REDACTED***' if any(sensitive in k.lower() for sensitive in sensitive_keys)
            else sanitize_aws_parameters(v)
            for k, v in params.items()
        }
    elif isinstance(params, list):
        return [sanitize_aws_parameters(item) for item in params]
    else:
        return params
# Use in custom metadata
xray_recorder.put_metadata('sanitized_params', sanitize_aws_parameters(aws_params))

from botocore.exceptions import ClientError, NoCredentialsError
with xray_recorder.in_segment('robust-aws-operations') as segment:
try:
# AWS operations
result = perform_aws_operations()
except NoCredentialsError:
# Credential issues
segment.add_fault_flag()
xray_recorder.put_annotation('error_category', 'credentials')
raise
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code in ['ThrottlingException', 'ProvisionedThroughputExceededException']:
# Throttling - may want to retry
segment.add_throttle_flag()
xray_recorder.put_annotation('error_category', 'throttling')
elif error_code.startswith('4'):
# Client errors
segment.add_error_flag()
xray_recorder.put_annotation('error_category', 'client_error')
else:
# Service errors
segment.add_fault_flag()
xray_recorder.put_annotation('error_category', 'service_error')
raise

Install with Tessl CLI
npx tessl i tessl/pypi-aws-xray-sdk