CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aws-xray-sdk

The AWS X-Ray SDK for Python enables Python developers to record and emit information from within their applications to the AWS X-Ray service for distributed tracing.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

database-integration.mddocs/

Database Integration

Database-specific tracing for SQL and NoSQL databases. Captures query information, connection details, and performance metrics while respecting security best practices. Supports major Python database libraries and ORMs.

Capabilities

SQLAlchemy Integration

Comprehensive integration with SQLAlchemy Core and ORM for automatic query tracing.

SQLAlchemy Core Integration

from aws_xray_sdk.ext.sqlalchemy.query import XRaySessionMaker

# Create X-Ray enabled session maker
Session = XRaySessionMaker(bind=engine)
session = Session()

Flask-SQLAlchemy Integration

from aws_xray_sdk.ext.flask_sqlalchemy.query import XRayFlaskSqlAlchemy

# Replace Flask-SQLAlchemy with X-Ray enabled version
db = XRayFlaskSqlAlchemy(app)

Direct Database Driver Integration

Automatic tracing for popular Python database drivers through patching.

# Patch database drivers
from aws_xray_sdk.core import patch

# SQL databases
patch(['sqlite3'])      # SQLite
patch(['mysql'])        # MySQLdb
patch(['pymysql'])      # PyMySQL  
patch(['psycopg2'])     # PostgreSQL psycopg2
patch(['pg8000'])       # PostgreSQL pg8000

# NoSQL databases
patch(['pymongo'])      # MongoDB
patch(['pynamodb'])     # DynamoDB via PynamoDB

SQL Query Streaming

Control SQL query capture and streaming for performance optimization.

# Configure SQL streaming
xray_recorder.configure(stream_sql=True)

# Or in Django settings
XRAY_RECORDER = {
    'STREAM_SQL': True,  # Enable SQL query streaming
}

Database-Specific Usage

SQLite Integration

from aws_xray_sdk.core import patch, xray_recorder
import sqlite3

# Patch SQLite for automatic tracing
patch(['sqlite3'])

with xray_recorder.in_segment('sqlite-operations') as segment:
    # Database operations are automatically traced
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    
    # Each query creates a subsegment with SQL metadata
    cursor.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)')
    
    # INSERT operations
    cursor.execute('INSERT INTO users (name) VALUES (?)', ('John Doe',))
    
    # SELECT operations with query details
    cursor.execute('SELECT * FROM users WHERE name LIKE ?', ('%John%',))
    results = cursor.fetchall()
    
    # Batch operations
    users_data = [('Alice',), ('Bob',), ('Charlie',)]
    cursor.executemany('INSERT INTO users (name) VALUES (?)', users_data)
    
    conn.commit()
    conn.close()

MySQL Integration

from aws_xray_sdk.core import patch, xray_recorder
import pymysql

# Patch PyMySQL for automatic tracing
patch(['pymysql'])

with xray_recorder.in_segment('mysql-operations') as segment:
    # Connection and queries are automatically traced
    connection = pymysql.connect(
        host='localhost',
        user='user',
        password='password',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            # Each query is traced with execution details
            cursor.execute('SELECT * FROM users WHERE active = %s', (True,))
            users = cursor.fetchall()
            
            # Complex queries with joins
            cursor.execute('''
                SELECT u.id, u.name, p.email 
                FROM users u 
                JOIN profiles p ON u.id = p.user_id 
                WHERE u.created_at > %s
            ''', (datetime.now() - timedelta(days=30),))
            
            recent_users = cursor.fetchall()
            
    finally:
        connection.close()

PostgreSQL Integration

from aws_xray_sdk.core import patch, xray_recorder
import psycopg2
from psycopg2.extras import RealDictCursor

# Patch psycopg2 for automatic tracing
patch(['psycopg2'])

with xray_recorder.in_segment('postgresql-operations') as segment:
    # Connection is traced with connection metadata
    conn = psycopg2.connect(
        host='localhost',
        database='mydb',
        user='user',
        password='password'
    )
    
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            # Queries are traced with PostgreSQL-specific metadata
            cursor.execute('''
                SELECT u.*, COUNT(o.id) as order_count
                FROM users u
                LEFT JOIN orders o ON u.id = o.user_id
                WHERE u.status = %s
                GROUP BY u.id
                ORDER BY order_count DESC
                LIMIT %s
            ''', ('active', 50))
            
            top_users = cursor.fetchall()
            
            # Batch operations
            cursor.execute('''
                INSERT INTO user_activity (user_id, activity_type, timestamp)
                VALUES %s
            ''', [
                (user['id'], 'login', datetime.now())
                for user in top_users
            ])
            
        conn.commit()
        
    except Exception as e:
        conn.rollback()
        raise
    finally:
        conn.close()

MongoDB Integration

from aws_xray_sdk.core import patch, xray_recorder
import pymongo

# Patch pymongo for automatic tracing
patch(['pymongo'])

with xray_recorder.in_segment('mongodb-operations') as segment:
    # MongoDB operations are automatically traced
    client = pymongo.MongoClient('mongodb://localhost:27017/')
    db = client.myapp
    
    # Collection operations with query details
    users_collection = db.users
    
    # Insert operations
    user_doc = {
        'name': 'John Doe',
        'email': 'john@example.com',
        'created_at': datetime.now()
    }
    result = users_collection.insert_one(user_doc)
    
    # Query operations with MongoDB-specific metadata
    active_users = users_collection.find(
        {'status': 'active'},
        {'name': 1, 'email': 1, 'last_login': 1}
    ).limit(100)
    
    # Aggregation pipelines are traced with pipeline details
    pipeline = [
        {'$match': {'status': 'active'}},
        {'$group': {
            '_id': '$department',
            'user_count': {'$sum': 1},
            'avg_age': {'$avg': '$age'}
        }},
        {'$sort': {'user_count': -1}}
    ]
    
    department_stats = list(users_collection.aggregate(pipeline))
    
    # Update operations
    users_collection.update_many(
        {'last_login': {'$lt': datetime.now() - timedelta(days=90)}},
        {'$set': {'status': 'inactive'}}
    )
    
    client.close()

SQLAlchemy Core Integration

from aws_xray_sdk.core import patch, xray_recorder
from aws_xray_sdk.ext.sqlalchemy.query import XRaySessionMaker
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Patch SQLAlchemy Core
patch(['sqlalchemy_core'])

# Create engine and X-Ray enabled session
engine = create_engine('postgresql://user:pass@localhost/db')
Session = XRaySessionMaker(bind=engine)

with xray_recorder.in_segment('sqlalchemy-operations') as segment:
    session = Session()
    
    try:
        # Raw SQL queries are traced
        result = session.execute(text('SELECT * FROM users WHERE active = :active'), 
                               {'active': True})
        users = result.fetchall()
        
        # ORM queries are traced with relationship loading info
        from myapp.models import User, Order
        
        # Query with joins - traces JOIN details
        users_with_orders = session.query(User).join(Order).filter(
            Order.status == 'completed'
        ).all()
        
        # Complex queries with subqueries
        subquery = session.query(Order.user_id).filter(
            Order.total > 1000
        ).subquery()
        
        high_value_users = session.query(User).filter(
            User.id.in_(subquery)
        ).all()
        
        session.commit()
        
    except Exception as e:
        session.rollback()
        raise
    finally:
        session.close()

Flask-SQLAlchemy Integration

from flask import Flask
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.ext.flask.middleware import XRayMiddleware
from aws_xray_sdk.ext.flask_sqlalchemy.query import XRayFlaskSqlAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/db'

# Configure X-Ray
xray_recorder.configure(service='Flask App with DB')
XRayMiddleware(app, xray_recorder)

# Use X-Ray enabled Flask-SQLAlchemy
db = XRayFlaskSqlAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    orders = db.relationship('Order', backref='user', lazy=True)

class Order(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    total = db.Column(db.Float, nullable=False)
    status = db.Column(db.String(20), nullable=False)

@app.route('/users/<int:user_id>')
def get_user_with_orders(user_id):
    # Database queries are automatically traced
    user = User.query.get_or_404(user_id)
    
    # Relationship loading is traced
    orders = user.orders
    
    return {
        'user': {'id': user.id, 'username': user.username},
        'orders': [{'id': o.id, 'total': o.total} for o in orders]
    }

@app.route('/users')
def list_users():
    # Complex queries with filters and joins are traced
    users = db.session.query(User).join(Order).filter(
        Order.status == 'completed'
    ).distinct().all()
    
    return {'users': [{'id': u.id, 'username': u.username} for u in users]}

PynamoDB Integration (DynamoDB)

from aws_xray_sdk.core import patch, xray_recorder
from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute, UTCDateTimeAttribute

# Patch PynamoDB for automatic DynamoDB tracing
patch(['pynamodb'])

class User(Model):
    class Meta:
        table_name = 'users'
        region = 'us-east-1'
    
    user_id = UnicodeAttribute(hash_key=True)
    username = UnicodeAttribute()
    email = UnicodeAttribute()
    created_at = UTCDateTimeAttribute()
    login_count = NumberAttribute(default=0)

with xray_recorder.in_segment('dynamodb-operations') as segment:
    # Model operations are automatically traced with DynamoDB metadata
    
    # Create item
    user = User(
        user_id='123',
        username='john_doe',
        email='john@example.com',
        created_at=datetime.now()
    )
    user.save()  # Traced as DynamoDB PutItem
    
    # Get item
    retrieved_user = User.get('123')  # Traced as DynamoDB GetItem
    
    # Query operations
    users = list(User.query('123'))  # Traced as DynamoDB Query
    
    # Scan operations
    all_users = list(User.scan())  # Traced as DynamoDB Scan
    
    # Update operations
    retrieved_user.login_count += 1
    retrieved_user.save()  # Traced as DynamoDB UpdateItem
    
    # Batch operations
    with User.batch_write() as batch:
        for i in range(100):
            batch.save(User(
                user_id=str(i),
                username=f'user_{i}',
                email=f'user{i}@example.com',
                created_at=datetime.now()
            ))
    # Batch operations are traced with batch efficiency metrics

Advanced Database Features

Custom SQL Metadata

from aws_xray_sdk.core import xray_recorder

with xray_recorder.in_segment('custom-sql-metadata') as segment:
    with xray_recorder.in_subsegment('complex-query') as subsegment:
        # Add custom database annotations
        subsegment.put_annotation('query_type', 'analytical')
        subsegment.put_annotation('table_count', '3')
        subsegment.put_annotation('expected_rows', '10000')
        
        # Add query performance metadata
        start_time = time.time()
        
        # Execute complex query
        result = execute_complex_analytical_query()
        
        execution_time = time.time() - start_time
        
        # Add performance metadata
        subsegment.put_metadata('query_performance', {
            'execution_time_ms': execution_time * 1000,
            'rows_returned': len(result),
            'rows_per_second': len(result) / execution_time if execution_time > 0 else 0,
            'query_plan': get_query_execution_plan(),
            'index_usage': get_index_usage_stats()
        }, namespace='database')

Connection Pool Monitoring

from aws_xray_sdk.core import xray_recorder
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Create engine with connection pool
engine = create_engine(
    'postgresql://user:pass@localhost/db',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20
)

with xray_recorder.in_segment('connection-pool-demo') as segment:
    # Add connection pool metadata
    pool = engine.pool
    
    segment.put_metadata('connection_pool', {
        'pool_size': pool.size(),
        'checked_in_connections': pool.checkedin(),
        'checked_out_connections': pool.checkedout(),
        'overflow_connections': pool.overflow(),
        'invalid_connections': pool.invalid()
    }, namespace='database')
    
    # Use connection
    with engine.connect() as conn:
        result = conn.execute('SELECT COUNT(*) FROM users')
        count = result.scalar()

Transaction Tracing

from aws_xray_sdk.core import xray_recorder
import psycopg2

patch(['psycopg2'])

with xray_recorder.in_segment('transaction-operations') as segment:
    conn = psycopg2.connect('postgresql://user:pass@localhost/db')
    
    try:
        with xray_recorder.in_subsegment('database-transaction') as subsegment:
            subsegment.put_annotation('transaction_type', 'user_registration')
            
            cursor = conn.cursor()
            
            # Transaction operations are grouped under one subsegment
            cursor.execute('INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id',
                         ('John Doe', 'john@example.com'))
            user_id = cursor.fetchone()[0]
            
            cursor.execute('INSERT INTO user_profiles (user_id, bio) VALUES (%s, %s)',
                         (user_id, 'Software developer'))
            
            cursor.execute('INSERT INTO user_preferences (user_id, notifications) VALUES (%s, %s)',
                         (user_id, True))
            
            # Commit transaction
            conn.commit()
            
            subsegment.put_annotation('transaction_status', 'committed')
            subsegment.put_metadata('transaction_details', {
                'user_id': user_id,
                'operations_count': 3,
                'transaction_time_ms': get_transaction_time()
            })
            
    except Exception as e:
        conn.rollback()
        subsegment.put_annotation('transaction_status', 'rolled_back')
        subsegment.add_fault_flag()
        raise
    finally:
        conn.close()

Database Error Handling

from aws_xray_sdk.core import xray_recorder
import psycopg2
from psycopg2 import IntegrityError, OperationalError

patch(['psycopg2'])

with xray_recorder.in_segment('database-error-handling') as segment:
    try:
        conn = psycopg2.connect('postgresql://user:pass@localhost/db')
        cursor = conn.cursor()
        
        # Operation that might fail
        cursor.execute('INSERT INTO users (email) VALUES (%s)', ('duplicate@example.com',))
        conn.commit()
        
    except IntegrityError as e:
        # Database constraint violation
        segment.add_error_flag()  # Client error
        xray_recorder.put_annotation('error_type', 'integrity_constraint')
        xray_recorder.put_metadata('database_error', {
            'error_code': e.pgcode,
            'error_message': str(e),
            'constraint_type': 'unique_violation'
        })
        raise
        
    except OperationalError as e:
        # Database connection or operational error
        segment.add_fault_flag()  # Service fault
        xray_recorder.put_annotation('error_type', 'operational_error')
        xray_recorder.put_metadata('database_error', {
            'error_code': e.pgcode,
            'error_message': str(e),
            'connection_status': 'failed'
        })
        raise
        
    finally:
        if 'conn' in locals():
            conn.close()

Performance Optimization

Conditional SQL Streaming

from aws_xray_sdk.core import xray_recorder

# Configure SQL streaming based on environment
if os.getenv('ENVIRONMENT') == 'production':
    xray_recorder.configure(stream_sql=False)  # Disable in prod for performance
else:
    xray_recorder.configure(stream_sql=True)   # Enable in dev for debugging

# Or conditionally based on sampling
with xray_recorder.in_segment('database-operations') as segment:
    if xray_recorder.is_sampled():
        # Add detailed SQL metadata only for sampled traces
        segment.put_metadata('sql_queries', collect_sql_queries())
    
    # Execute database operations
    perform_database_operations()

Batch Operation Optimization

from aws_xray_sdk.core import xray_recorder

with xray_recorder.in_segment('optimized-batch-operations') as segment:
    with xray_recorder.in_subsegment('batch-insert') as subsegment:
        # Batch operations are more efficient and create fewer subsegments
        batch_size = 1000
        total_records = 10000
        
        subsegment.put_annotation('batch_size', str(batch_size))
        subsegment.put_annotation('total_records', str(total_records))
        
        for i in range(0, total_records, batch_size):
            batch = records[i:i + batch_size]
            
            # Each batch creates one database subsegment instead of 1000
            cursor.executemany(
                'INSERT INTO users (name, email) VALUES (%s, %s)',
                [(r['name'], r['email']) for r in batch]
            )
            
        conn.commit()

Best Practices

Security and Sensitive Data

# SQL queries are automatically sanitized by X-Ray SDK
# But you can add additional protection

from aws_xray_sdk.core import xray_recorder

def sanitize_sql_params(params):
    """Remove sensitive data from SQL parameters."""
    if isinstance(params, (list, tuple)):
        return ['***REDACTED***' if is_sensitive(p) else p for p in params]
    elif isinstance(params, dict):
        return {k: '***REDACTED***' if is_sensitive(v) else v for k, v in params.items()}
    return params

def is_sensitive(value):
    """Check if value contains sensitive information."""
    if isinstance(value, str):
        return any(keyword in value.lower() for keyword in ['password', 'secret', 'token'])
    return False

# Use in custom metadata
with xray_recorder.in_subsegment('secure-query') as subsegment:
    subsegment.put_metadata('sanitized_params', sanitize_sql_params(query_params))

Query Performance Monitoring

from aws_xray_sdk.core import xray_recorder

class DatabasePerformanceMonitor:
    def __init__(self, slow_query_threshold=1.0):
        self.slow_query_threshold = slow_query_threshold
    
    def monitor_query(self, query_func, *args, **kwargs):
        start_time = time.time()
        
        try:
            result = query_func(*args, **kwargs)
            execution_time = time.time() - start_time
            
            # Add performance annotations
            if execution_time > self.slow_query_threshold:
                xray_recorder.put_annotation('slow_query', 'true')
                xray_recorder.put_annotation('execution_time', f'{execution_time:.3f}s')
            
            return result
            
        except Exception as e:
            execution_time = time.time() - start_time
            xray_recorder.put_annotation('query_failed', 'true')
            xray_recorder.put_annotation('failure_time', f'{execution_time:.3f}s')
            raise

# Usage
monitor = DatabasePerformanceMonitor(slow_query_threshold=0.5)

with xray_recorder.in_segment('monitored-queries') as segment:
    result = monitor.monitor_query(execute_complex_query, query_params)

Install with Tessl CLI

npx tessl i tessl/pypi-aws-xray-sdk

docs

annotations-metadata.md

aws-services.md

configuration-plugins.md

core-recording.md

database-integration.md

http-utilities.md

index.md

library-patching.md

sampling.md

web-frameworks.md

tile.json