The AWS X-Ray SDK for Python enables Python developers to record and emit information from within their applications to the AWS X-Ray service for distributed tracing.
—
Quality: Pending — not yet reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
Database-specific tracing for SQL and NoSQL databases. Captures query information, connection details, and performance metrics while respecting security best practices. Supports major Python database libraries and ORMs.
Comprehensive integration with SQLAlchemy Core and ORM for automatic query tracing.
from aws_xray_sdk.ext.sqlalchemy.query import XRaySessionMaker
# Create X-Ray enabled session maker
Session = XRaySessionMaker(bind=engine)
session = Session()from aws_xray_sdk.ext.flask_sqlalchemy.query import XRayFlaskSqlAlchemy
# Replace Flask-SQLAlchemy with X-Ray enabled version
db = XRayFlaskSqlAlchemy(app)Automatic tracing for popular Python database drivers through patching.
# Patch database drivers
from aws_xray_sdk.core import patch
# SQL databases
patch(['sqlite3']) # SQLite
patch(['mysql']) # MySQLdb
patch(['pymysql']) # PyMySQL
patch(['psycopg2']) # PostgreSQL psycopg2
patch(['pg8000']) # PostgreSQL pg8000
# NoSQL databases
patch(['pymongo']) # MongoDB
patch(['pynamodb']) # DynamoDB via PynamoDBControl SQL query capture and streaming for performance optimization.
# Configure SQL streaming
xray_recorder.configure(stream_sql=True)

# Or in Django settings
XRAY_RECORDER = {
    'STREAM_SQL': True,  # Enable SQL query streaming
}

from aws_xray_sdk.core import patch, xray_recorder
import sqlite3

# Patch SQLite for automatic tracing
patch(['sqlite3'])

with xray_recorder.in_segment('sqlite-operations') as segment:
    # Database operations are automatically traced
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    # Each query creates a subsegment with SQL metadata
    cursor.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)')

    # INSERT operations
    cursor.execute('INSERT INTO users (name) VALUES (?)', ('John Doe',))

    # SELECT operations with query details
    cursor.execute('SELECT * FROM users WHERE name LIKE ?', ('%John%',))
    results = cursor.fetchall()

    # Batch operations
    users_data = [('Alice',), ('Bob',), ('Charlie',)]
    cursor.executemany('INSERT INTO users (name) VALUES (?)', users_data)

    conn.commit()
    conn.close()

from aws_xray_sdk.core import patch, xray_recorder
import pymysql
from datetime import datetime, timedelta  # required by the date-filtered query below

# Patch PyMySQL for automatic tracing
patch(['pymysql'])

with xray_recorder.in_segment('mysql-operations') as segment:
    # Connection and queries are automatically traced
    connection = pymysql.connect(
        host='localhost',
        user='user',
        password='password',
        database='testdb',
        charset='utf8mb4'
    )
    try:
        with connection.cursor() as cursor:
            # Each query is traced with execution details
            cursor.execute('SELECT * FROM users WHERE active = %s', (True,))
            users = cursor.fetchall()

            # Complex queries with joins
            cursor.execute('''
                SELECT u.id, u.name, p.email
                FROM users u
                JOIN profiles p ON u.id = p.user_id
                WHERE u.created_at > %s
            ''', (datetime.now() - timedelta(days=30),))
            recent_users = cursor.fetchall()
    finally:
        connection.close()

from aws_xray_sdk.core import patch, xray_recorder
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
from datetime import datetime  # required for activity timestamps below

# Patch psycopg2 for automatic tracing
patch(['psycopg2'])

with xray_recorder.in_segment('postgresql-operations') as segment:
    # Connection is traced with connection metadata
    conn = psycopg2.connect(
        host='localhost',
        database='mydb',
        user='user',
        password='password'
    )
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            # Queries are traced with PostgreSQL-specific metadata
            cursor.execute('''
                SELECT u.*, COUNT(o.id) as order_count
                FROM users u
                LEFT JOIN orders o ON u.id = o.user_id
                WHERE u.status = %s
                GROUP BY u.id
                ORDER BY order_count DESC
                LIMIT %s
            ''', ('active', 50))
            top_users = cursor.fetchall()

            # Batch insert: execute_values expands the single VALUES %s
            # placeholder into a multi-row insert (plain execute() cannot
            # take a list of tuples for one %s placeholder)
            execute_values(cursor, '''
                INSERT INTO user_activity (user_id, activity_type, timestamp)
                VALUES %s
            ''', [
                (user['id'], 'login', datetime.now())
                for user in top_users
            ])
            conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

from aws_xray_sdk.core import patch, xray_recorder
import pymongo
from datetime import datetime, timedelta  # required for documents and queries below

# Patch pymongo for automatic tracing
patch(['pymongo'])

with xray_recorder.in_segment('mongodb-operations') as segment:
    # MongoDB operations are automatically traced
    client = pymongo.MongoClient('mongodb://localhost:27017/')
    db = client.myapp

    # Collection operations with query details
    users_collection = db.users

    # Insert operations
    user_doc = {
        'name': 'John Doe',
        'email': 'john@example.com',
        'created_at': datetime.now()
    }
    result = users_collection.insert_one(user_doc)

    # Query operations with MongoDB-specific metadata
    active_users = users_collection.find(
        {'status': 'active'},
        {'name': 1, 'email': 1, 'last_login': 1}
    ).limit(100)

    # Aggregation pipelines are traced with pipeline details
    pipeline = [
        {'$match': {'status': 'active'}},
        {'$group': {
            '_id': '$department',
            'user_count': {'$sum': 1},
            'avg_age': {'$avg': '$age'}
        }},
        {'$sort': {'user_count': -1}}
    ]
    department_stats = list(users_collection.aggregate(pipeline))

    # Update operations
    users_collection.update_many(
        {'last_login': {'$lt': datetime.now() - timedelta(days=90)}},
        {'$set': {'status': 'inactive'}}
    )

    client.close()

from aws_xray_sdk.core import patch, xray_recorder
from aws_xray_sdk.ext.sqlalchemy.query import XRaySessionMaker
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Patch SQLAlchemy Core
patch(['sqlalchemy_core'])

# Create engine and X-Ray enabled session
engine = create_engine('postgresql://user:pass@localhost/db')
Session = XRaySessionMaker(bind=engine)

with xray_recorder.in_segment('sqlalchemy-operations') as segment:
    session = Session()
    try:
        # Raw SQL queries are traced
        result = session.execute(text('SELECT * FROM users WHERE active = :active'),
                                 {'active': True})
        users = result.fetchall()

        # ORM queries are traced with relationship loading info
        from myapp.models import User, Order

        # Query with joins - traces JOIN details
        users_with_orders = session.query(User).join(Order).filter(
            Order.status == 'completed'
        ).all()

        # Complex queries with subqueries
        subquery = session.query(Order.user_id).filter(
            Order.total > 1000
        ).subquery()
        high_value_users = session.query(User).filter(
            User.id.in_(subquery)
        ).all()

        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

from flask import Flask
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.ext.flask.middleware import XRayMiddleware
from aws_xray_sdk.ext.flask_sqlalchemy.query import XRayFlaskSqlAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/db'

# Configure X-Ray
xray_recorder.configure(service='Flask App with DB')
XRayMiddleware(app, xray_recorder)

# Use X-Ray enabled Flask-SQLAlchemy
db = XRayFlaskSqlAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    orders = db.relationship('Order', backref='user', lazy=True)

class Order(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    total = db.Column(db.Float, nullable=False)
    status = db.Column(db.String(20), nullable=False)

@app.route('/users/<int:user_id>')
def get_user_with_orders(user_id):
    # Database queries are automatically traced
    user = User.query.get_or_404(user_id)
    # Relationship loading is traced
    orders = user.orders
    return {
        'user': {'id': user.id, 'username': user.username},
        'orders': [{'id': o.id, 'total': o.total} for o in orders]
    }

@app.route('/users')
def list_users():
    # Complex queries with filters and joins are traced
    users = db.session.query(User).join(Order).filter(
        Order.status == 'completed'
    ).distinct().all()
    return {'users': [{'id': u.id, 'username': u.username} for u in users]}

from aws_xray_sdk.core import patch, xray_recorder
from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute, UTCDateTimeAttribute
from datetime import datetime  # required for created_at values below

# Patch PynamoDB for automatic DynamoDB tracing
patch(['pynamodb'])

class User(Model):
    class Meta:
        table_name = 'users'
        region = 'us-east-1'
    user_id = UnicodeAttribute(hash_key=True)
    username = UnicodeAttribute()
    email = UnicodeAttribute()
    created_at = UTCDateTimeAttribute()
    login_count = NumberAttribute(default=0)

with xray_recorder.in_segment('dynamodb-operations') as segment:
    # Model operations are automatically traced with DynamoDB metadata
    # Create item
    user = User(
        user_id='123',
        username='john_doe',
        email='john@example.com',
        created_at=datetime.now()
    )
    user.save()  # Traced as DynamoDB PutItem

    # Get item
    retrieved_user = User.get('123')  # Traced as DynamoDB GetItem

    # Query operations
    users = list(User.query('123'))  # Traced as DynamoDB Query

    # Scan operations
    all_users = list(User.scan())  # Traced as DynamoDB Scan

    # Update operations
    retrieved_user.login_count += 1
    retrieved_user.save()  # Traced as DynamoDB UpdateItem

    # Batch operations
    with User.batch_write() as batch:
        for i in range(100):
            batch.save(User(
                user_id=str(i),
                username=f'user_{i}',
                email=f'user{i}@example.com',
                created_at=datetime.now()
            ))
    # Batch operations are traced with batch efficiency metrics

from aws_xray_sdk.core import xray_recorder
import time  # required for the timing measurements below

with xray_recorder.in_segment('custom-sql-metadata') as segment:
    with xray_recorder.in_subsegment('complex-query') as subsegment:
        # Add custom database annotations
        subsegment.put_annotation('query_type', 'analytical')
        subsegment.put_annotation('table_count', '3')
        subsegment.put_annotation('expected_rows', '10000')

        # Add query performance metadata
        start_time = time.time()
        # Execute complex query
        result = execute_complex_analytical_query()
        execution_time = time.time() - start_time

        # Add performance metadata
        subsegment.put_metadata('query_performance', {
            'execution_time_ms': execution_time * 1000,
            'rows_returned': len(result),
            'rows_per_second': len(result) / execution_time if execution_time > 0 else 0,
            'query_plan': get_query_execution_plan(),
            'index_usage': get_index_usage_stats()
        }, namespace='database')

from aws_xray_sdk.core import xray_recorder
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Create engine with connection pool
engine = create_engine(
    'postgresql://user:pass@localhost/db',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20
)

with xray_recorder.in_segment('connection-pool-demo') as segment:
    # Add connection pool metadata
    pool = engine.pool
    segment.put_metadata('connection_pool', {
        'pool_size': pool.size(),
        'checked_in_connections': pool.checkedin(),
        'checked_out_connections': pool.checkedout(),
        'overflow_connections': pool.overflow(),
        'invalid_connections': pool.invalid()
    }, namespace='database')

    # Use connection; text() is required for raw SQL strings on SQLAlchemy 1.4+/2.0
    with engine.connect() as conn:
        result = conn.execute(text('SELECT COUNT(*) FROM users'))
        count = result.scalar()

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch  # patch() is used below but was not imported originally
import psycopg2

patch(['psycopg2'])

with xray_recorder.in_segment('transaction-operations') as segment:
    conn = psycopg2.connect('postgresql://user:pass@localhost/db')
    try:
        with xray_recorder.in_subsegment('database-transaction') as subsegment:
            subsegment.put_annotation('transaction_type', 'user_registration')
            cursor = conn.cursor()

            # Transaction operations are grouped under one subsegment
            cursor.execute('INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id',
                           ('John Doe', 'john@example.com'))
            user_id = cursor.fetchone()[0]

            cursor.execute('INSERT INTO user_profiles (user_id, bio) VALUES (%s, %s)',
                           (user_id, 'Software developer'))

            cursor.execute('INSERT INTO user_preferences (user_id, notifications) VALUES (%s, %s)',
                           (user_id, True))

            # Commit transaction
            conn.commit()

            subsegment.put_annotation('transaction_status', 'committed')
            subsegment.put_metadata('transaction_details', {
                'user_id': user_id,
                'operations_count': 3,
                'transaction_time_ms': get_transaction_time()
            })
    except Exception:
        conn.rollback()
        # NOTE(review): subsegment is only bound if in_subsegment() was entered
        # successfully — confirm this handler cannot run before that point
        subsegment.put_annotation('transaction_status', 'rolled_back')
        subsegment.add_fault_flag()
        raise
    finally:
        conn.close()

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch  # patch() is used below but was not imported originally
import psycopg2
from psycopg2 import IntegrityError, OperationalError

patch(['psycopg2'])

with xray_recorder.in_segment('database-error-handling') as segment:
    conn = None  # sentinel: clearer than probing locals() in the finally block
    try:
        conn = psycopg2.connect('postgresql://user:pass@localhost/db')
        cursor = conn.cursor()
        # Operation that might fail
        cursor.execute('INSERT INTO users (email) VALUES (%s)', ('duplicate@example.com',))
        conn.commit()
    except IntegrityError as e:
        # Database constraint violation
        segment.add_error_flag()  # Client error
        xray_recorder.put_annotation('error_type', 'integrity_constraint')
        xray_recorder.put_metadata('database_error', {
            'error_code': e.pgcode,
            'error_message': str(e),
            'constraint_type': 'unique_violation'
        })
        raise
    except OperationalError as e:
        # Database connection or operational error
        segment.add_fault_flag()  # Service fault
        xray_recorder.put_annotation('error_type', 'operational_error')
        xray_recorder.put_metadata('database_error', {
            'error_code': e.pgcode,
            'error_message': str(e),
            'connection_status': 'failed'
        })
        raise
    finally:
        if conn is not None:
            conn.close()

from aws_xray_sdk.core import xray_recorder
import os  # required for the environment check below

# Configure SQL streaming based on environment
if os.getenv('ENVIRONMENT') == 'production':
    xray_recorder.configure(stream_sql=False)  # Disable in prod for performance
else:
    xray_recorder.configure(stream_sql=True)  # Enable in dev for debugging

# Or conditionally based on sampling
with xray_recorder.in_segment('database-operations') as segment:
    if xray_recorder.is_sampled():
        # Add detailed SQL metadata only for sampled traces
        segment.put_metadata('sql_queries', collect_sql_queries())
    # Execute database operations
    perform_database_operations()

from aws_xray_sdk.core import xray_recorder
with xray_recorder.in_segment('optimized-batch-operations') as segment:
    with xray_recorder.in_subsegment('batch-insert') as subsegment:
        # Batch operations are more efficient and create fewer subsegments
        batch_size = 1000
        total_records = 10000
        subsegment.put_annotation('batch_size', str(batch_size))
        subsegment.put_annotation('total_records', str(total_records))

        for i in range(0, total_records, batch_size):
            batch = records[i:i + batch_size]
            # Each batch creates one database subsegment instead of 1000
            cursor.executemany(
                'INSERT INTO users (name, email) VALUES (%s, %s)',
                [(r['name'], r['email']) for r in batch]
            )
        conn.commit()

# SQL queries are automatically sanitized by X-Ray SDK
# But you can add additional protection
from aws_xray_sdk.core import xray_recorder
def sanitize_sql_params(params):
    """Remove sensitive data from SQL parameters.

    Lists/tuples come back as a list with sensitive items redacted;
    dicts keep their keys but redact sensitive values; anything else
    is returned unchanged.
    """
    redacted = '***REDACTED***'
    if isinstance(params, dict):
        return {key: redacted if is_sensitive(val) else val
                for key, val in params.items()}
    if isinstance(params, (list, tuple)):
        return [redacted if is_sensitive(item) else item for item in params]
    return params

def is_sensitive(value):
    """Check if value contains sensitive information (string keyword match)."""
    if not isinstance(value, str):
        return False
    lowered = value.lower()
    return any(keyword in lowered for keyword in ('password', 'secret', 'token'))
# Use in custom metadata
with xray_recorder.in_subsegment('secure-query') as subsegment:
    subsegment.put_metadata('sanitized_params', sanitize_sql_params(query_params))

from aws_xray_sdk.core import xray_recorder
class DatabasePerformanceMonitor:
def __init__(self, slow_query_threshold=1.0):
self.slow_query_threshold = slow_query_threshold
def monitor_query(self, query_func, *args, **kwargs):
start_time = time.time()
try:
result = query_func(*args, **kwargs)
execution_time = time.time() - start_time
# Add performance annotations
if execution_time > self.slow_query_threshold:
xray_recorder.put_annotation('slow_query', 'true')
xray_recorder.put_annotation('execution_time', f'{execution_time:.3f}s')
return result
except Exception as e:
execution_time = time.time() - start_time
xray_recorder.put_annotation('query_failed', 'true')
xray_recorder.put_annotation('failure_time', f'{execution_time:.3f}s')
raise
# Usage
monitor = DatabasePerformanceMonitor(slow_query_threshold=0.5)
with xray_recorder.in_segment('monitored-queries') as segment:
result = monitor.monitor_query(execute_complex_query, query_params)Install with Tessl CLI
npx tessl i tessl/pypi-aws-xray-sdk