# Database Integration

Database-specific tracing for SQL and NoSQL databases. Captures query information, connection details, and performance metrics while respecting security best practices. Supports major Python database libraries and ORMs.

## Capabilities

### SQLAlchemy Integration

Comprehensive integration with SQLAlchemy Core and ORM for automatic query tracing.

#### SQLAlchemy Core Integration
```python { .api }
13
from aws_xray_sdk.ext.sqlalchemy.query import XRaySessionMaker
14
15
# Create X-Ray enabled session maker
16
Session = XRaySessionMaker(bind=engine)
17
session = Session()
18
```
19
20
#### Flask-SQLAlchemy Integration
21
22
```python { .api }
23
from aws_xray_sdk.ext.flask_sqlalchemy.query import XRayFlaskSqlAlchemy
24
25
# Replace Flask-SQLAlchemy with X-Ray enabled version
26
db = XRayFlaskSqlAlchemy(app)
27
```
28
29
### Direct Database Driver Integration
30
31
Automatic tracing for popular Python database drivers through patching.
32
33
```python { .api }
# Patch database drivers
from aws_xray_sdk.core import patch

# SQL databases
patch(['sqlite3'])   # SQLite (stdlib sqlite3)
patch(['mysql'])     # mysql-connector-python
patch(['pymysql'])   # PyMySQL
patch(['psycopg2'])  # PostgreSQL psycopg2
patch(['pg8000'])    # PostgreSQL pg8000

# NoSQL databases
patch(['pymongo'])   # MongoDB
patch(['pynamodb'])  # DynamoDB via PynamoDB
```
48
49
### SQL Query Streaming
50
51
Control SQL query capture and streaming for performance optimization.
52
53
```python { .api }
54
# Configure SQL streaming
55
xray_recorder.configure(stream_sql=True)
56
57
# Or in Django settings
58
XRAY_RECORDER = {
59
'STREAM_SQL': True, # Enable SQL query streaming
60
}
61
```
62
63
## Database-Specific Usage
64
65
### SQLite Integration
66
67
```python
68
from aws_xray_sdk.core import patch, xray_recorder
69
import sqlite3
70
71
# Patch SQLite for automatic tracing
72
patch(['sqlite3'])
73
74
with xray_recorder.in_segment('sqlite-operations') as segment:
75
# Database operations are automatically traced
76
conn = sqlite3.connect('app.db')
77
cursor = conn.cursor()
78
79
# Each query creates a subsegment with SQL metadata
80
cursor.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)')
81
82
# INSERT operations
83
cursor.execute('INSERT INTO users (name) VALUES (?)', ('John Doe',))
84
85
# SELECT operations with query details
86
cursor.execute('SELECT * FROM users WHERE name LIKE ?', ('%John%',))
87
results = cursor.fetchall()
88
89
# Batch operations
90
users_data = [('Alice',), ('Bob',), ('Charlie',)]
91
cursor.executemany('INSERT INTO users (name) VALUES (?)', users_data)
92
93
conn.commit()
94
conn.close()
95
```
96
97
### MySQL Integration
98
99
```python
from datetime import datetime, timedelta

from aws_xray_sdk.core import patch, xray_recorder
import pymysql

# Patch PyMySQL for automatic tracing
patch(['pymysql'])

with xray_recorder.in_segment('mysql-operations') as segment:
    # Connection and queries are automatically traced
    connection = pymysql.connect(
        host='localhost',
        user='user',
        password='password',
        database='testdb',
        charset='utf8mb4'
    )

    try:
        with connection.cursor() as cursor:
            # Each query is traced with execution details
            cursor.execute('SELECT * FROM users WHERE active = %s', (True,))
            users = cursor.fetchall()

            # Complex queries with joins
            cursor.execute('''
                SELECT u.id, u.name, p.email
                FROM users u
                JOIN profiles p ON u.id = p.user_id
                WHERE u.created_at > %s
            ''', (datetime.now() - timedelta(days=30),))

            recent_users = cursor.fetchall()

    finally:
        connection.close()
```
135
136
### PostgreSQL Integration
137
138
```python
from datetime import datetime

from aws_xray_sdk.core import patch, xray_recorder
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values

# Patch psycopg2 for automatic tracing
patch(['psycopg2'])

with xray_recorder.in_segment('postgresql-operations') as segment:
    # Connection is traced with connection metadata
    conn = psycopg2.connect(
        host='localhost',
        database='mydb',
        user='user',
        password='password'
    )

    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            # Queries are traced with PostgreSQL-specific metadata
            cursor.execute('''
                SELECT u.*, COUNT(o.id) as order_count
                FROM users u
                LEFT JOIN orders o ON u.id = o.user_id
                WHERE u.status = %s
                GROUP BY u.id
                ORDER BY order_count DESC
                LIMIT %s
            ''', ('active', 50))

            top_users = cursor.fetchall()

            # Batch operations: execute_values expands the single VALUES %s
            # placeholder into a multi-row insert (plain execute() cannot)
            execute_values(cursor, '''
                INSERT INTO user_activity (user_id, activity_type, timestamp)
                VALUES %s
            ''', [
                (user['id'], 'login', datetime.now())
                for user in top_users
            ])

            conn.commit()

    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
```
187
188
### MongoDB Integration
189
190
```python
from datetime import datetime, timedelta

from aws_xray_sdk.core import patch, xray_recorder
import pymongo

# Patch pymongo for automatic tracing
patch(['pymongo'])

with xray_recorder.in_segment('mongodb-operations') as segment:
    # MongoDB operations are automatically traced
    client = pymongo.MongoClient('mongodb://localhost:27017/')
    db = client.myapp

    # Collection operations with query details
    users_collection = db.users

    # Insert operations
    user_doc = {
        'name': 'John Doe',
        'email': 'john@example.com',
        'created_at': datetime.now()
    }
    result = users_collection.insert_one(user_doc)

    # Query operations with MongoDB-specific metadata
    active_users = users_collection.find(
        {'status': 'active'},
        {'name': 1, 'email': 1, 'last_login': 1}
    ).limit(100)

    # Aggregation pipelines are traced with pipeline details
    pipeline = [
        {'$match': {'status': 'active'}},
        {'$group': {
            '_id': '$department',
            'user_count': {'$sum': 1},
            'avg_age': {'$avg': '$age'}
        }},
        {'$sort': {'user_count': -1}}
    ]

    department_stats = list(users_collection.aggregate(pipeline))

    # Update operations
    users_collection.update_many(
        {'last_login': {'$lt': datetime.now() - timedelta(days=90)}},
        {'$set': {'status': 'inactive'}}
    )

    client.close()
```
240
241
### SQLAlchemy Core Integration
242
243
```python
244
from aws_xray_sdk.core import patch, xray_recorder
245
from aws_xray_sdk.ext.sqlalchemy.query import XRaySessionMaker
246
from sqlalchemy import create_engine, text
247
from sqlalchemy.orm import sessionmaker
248
249
# Patch SQLAlchemy Core
250
patch(['sqlalchemy_core'])
251
252
# Create engine and X-Ray enabled session
253
engine = create_engine('postgresql://user:pass@localhost/db')
254
Session = XRaySessionMaker(bind=engine)
255
256
with xray_recorder.in_segment('sqlalchemy-operations') as segment:
257
session = Session()
258
259
try:
260
# Raw SQL queries are traced
261
result = session.execute(text('SELECT * FROM users WHERE active = :active'),
262
{'active': True})
263
users = result.fetchall()
264
265
# ORM queries are traced with relationship loading info
266
from myapp.models import User, Order
267
268
# Query with joins - traces JOIN details
269
users_with_orders = session.query(User).join(Order).filter(
270
Order.status == 'completed'
271
).all()
272
273
# Complex queries with subqueries
274
subquery = session.query(Order.user_id).filter(
275
Order.total > 1000
276
).subquery()
277
278
high_value_users = session.query(User).filter(
279
User.id.in_(subquery)
280
).all()
281
282
session.commit()
283
284
except Exception as e:
285
session.rollback()
286
raise
287
finally:
288
session.close()
289
```
290
291
### Flask-SQLAlchemy Integration
292
293
```python
294
from flask import Flask
295
from aws_xray_sdk.core import xray_recorder
296
from aws_xray_sdk.ext.flask.middleware import XRayMiddleware
297
from aws_xray_sdk.ext.flask_sqlalchemy.query import XRayFlaskSqlAlchemy
298
299
app = Flask(__name__)
300
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/db'
301
302
# Configure X-Ray
303
xray_recorder.configure(service='Flask App with DB')
304
XRayMiddleware(app, xray_recorder)
305
306
# Use X-Ray enabled Flask-SQLAlchemy
307
db = XRayFlaskSqlAlchemy(app)
308
309
class User(db.Model):
310
id = db.Column(db.Integer, primary_key=True)
311
username = db.Column(db.String(80), unique=True, nullable=False)
312
email = db.Column(db.String(120), unique=True, nullable=False)
313
orders = db.relationship('Order', backref='user', lazy=True)
314
315
class Order(db.Model):
316
id = db.Column(db.Integer, primary_key=True)
317
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
318
total = db.Column(db.Float, nullable=False)
319
status = db.Column(db.String(20), nullable=False)
320
321
@app.route('/users/<int:user_id>')
322
def get_user_with_orders(user_id):
323
# Database queries are automatically traced
324
user = User.query.get_or_404(user_id)
325
326
# Relationship loading is traced
327
orders = user.orders
328
329
return {
330
'user': {'id': user.id, 'username': user.username},
331
'orders': [{'id': o.id, 'total': o.total} for o in orders]
332
}
333
334
@app.route('/users')
335
def list_users():
336
# Complex queries with filters and joins are traced
337
users = db.session.query(User).join(Order).filter(
338
Order.status == 'completed'
339
).distinct().all()
340
341
return {'users': [{'id': u.id, 'username': u.username} for u in users]}
342
```
343
344
### PynamoDB Integration (DynamoDB)
345
346
```python
from datetime import datetime

from aws_xray_sdk.core import patch, xray_recorder
from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute, UTCDateTimeAttribute

# Patch PynamoDB for automatic DynamoDB tracing
patch(['pynamodb'])

class User(Model):
    class Meta:
        table_name = 'users'
        region = 'us-east-1'

    user_id = UnicodeAttribute(hash_key=True)
    username = UnicodeAttribute()
    email = UnicodeAttribute()
    created_at = UTCDateTimeAttribute()
    login_count = NumberAttribute(default=0)

with xray_recorder.in_segment('dynamodb-operations') as segment:
    # Model operations are automatically traced with DynamoDB metadata

    # Create item
    user = User(
        user_id='123',
        username='john_doe',
        email='john@example.com',
        created_at=datetime.now()
    )
    user.save()  # Traced as DynamoDB PutItem

    # Get item
    retrieved_user = User.get('123')  # Traced as DynamoDB GetItem

    # Query operations
    users = list(User.query('123'))  # Traced as DynamoDB Query

    # Scan operations
    all_users = list(User.scan())  # Traced as DynamoDB Scan

    # Update operations
    retrieved_user.login_count += 1
    retrieved_user.save()  # Traced as DynamoDB UpdateItem

    # Batch operations
    with User.batch_write() as batch:
        for i in range(100):
            batch.save(User(
                user_id=str(i),
                username=f'user_{i}',
                email=f'user{i}@example.com',
                created_at=datetime.now()
            ))
    # Batch operations are traced with batch efficiency metrics
```
401
402
## Advanced Database Features
403
404
### Custom SQL Metadata
405
406
```python
import time

from aws_xray_sdk.core import xray_recorder

with xray_recorder.in_segment('custom-sql-metadata') as segment:
    with xray_recorder.in_subsegment('complex-query') as subsegment:
        # Add custom database annotations
        subsegment.put_annotation('query_type', 'analytical')
        subsegment.put_annotation('table_count', '3')
        subsegment.put_annotation('expected_rows', '10000')

        # Add query performance metadata
        start_time = time.time()

        # Execute complex query
        result = execute_complex_analytical_query()

        execution_time = time.time() - start_time

        # Add performance metadata
        subsegment.put_metadata('query_performance', {
            'execution_time_ms': execution_time * 1000,
            'rows_returned': len(result),
            'rows_per_second': len(result) / execution_time if execution_time > 0 else 0,
            'query_plan': get_query_execution_plan(),
            'index_usage': get_index_usage_stats()
        }, namespace='database')
```
433
434
### Connection Pool Monitoring
435
436
```python
437
from aws_xray_sdk.core import xray_recorder
438
from sqlalchemy import create_engine
439
from sqlalchemy.pool import QueuePool
440
441
# Create engine with connection pool
442
engine = create_engine(
443
'postgresql://user:pass@localhost/db',
444
poolclass=QueuePool,
445
pool_size=10,
446
max_overflow=20
447
)
448
449
with xray_recorder.in_segment('connection-pool-demo') as segment:
450
# Add connection pool metadata
451
pool = engine.pool
452
453
segment.put_metadata('connection_pool', {
454
'pool_size': pool.size(),
455
'checked_in_connections': pool.checkedin(),
456
'checked_out_connections': pool.checkedout(),
457
'overflow_connections': pool.overflow(),
458
'invalid_connections': pool.invalid()
459
}, namespace='database')
460
461
# Use connection
462
with engine.connect() as conn:
463
result = conn.execute('SELECT COUNT(*) FROM users')
464
count = result.scalar()
465
```
466
467
### Transaction Tracing
468
469
```python
from aws_xray_sdk.core import patch, xray_recorder
import psycopg2

patch(['psycopg2'])

with xray_recorder.in_segment('transaction-operations') as segment:
    conn = psycopg2.connect('postgresql://user:pass@localhost/db')

    try:
        with xray_recorder.in_subsegment('database-transaction') as subsegment:
            subsegment.put_annotation('transaction_type', 'user_registration')

            cursor = conn.cursor()

            # Transaction operations are grouped under one subsegment
            cursor.execute('INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id',
                           ('John Doe', 'john@example.com'))
            user_id = cursor.fetchone()[0]

            cursor.execute('INSERT INTO user_profiles (user_id, bio) VALUES (%s, %s)',
                           (user_id, 'Software developer'))

            cursor.execute('INSERT INTO user_preferences (user_id, notifications) VALUES (%s, %s)',
                           (user_id, True))

            # Commit transaction
            conn.commit()

            subsegment.put_annotation('transaction_status', 'committed')
            subsegment.put_metadata('transaction_details', {
                'user_id': user_id,
                'operations_count': 3,
                'transaction_time_ms': get_transaction_time()
            })

    except Exception:
        conn.rollback()
        subsegment.put_annotation('transaction_status', 'rolled_back')
        subsegment.add_fault_flag()
        raise
    finally:
        conn.close()
```
513
514
### Database Error Handling
515
516
```python
from aws_xray_sdk.core import patch, xray_recorder
import psycopg2
from psycopg2 import IntegrityError, OperationalError

patch(['psycopg2'])

with xray_recorder.in_segment('database-error-handling') as segment:
    conn = None
    try:
        conn = psycopg2.connect('postgresql://user:pass@localhost/db')
        cursor = conn.cursor()

        # Operation that might fail
        cursor.execute('INSERT INTO users (email) VALUES (%s)', ('duplicate@example.com',))
        conn.commit()

    except IntegrityError as e:
        # Database constraint violation
        segment.add_error_flag()  # Client error
        xray_recorder.put_annotation('error_type', 'integrity_constraint')
        xray_recorder.put_metadata('database_error', {
            'error_code': e.pgcode,
            'error_message': str(e),
            'constraint_type': 'unique_violation'
        })
        raise

    except OperationalError as e:
        # Database connection or operational error
        segment.add_fault_flag()  # Service fault
        xray_recorder.put_annotation('error_type', 'operational_error')
        xray_recorder.put_metadata('database_error', {
            'error_code': e.pgcode,
            'error_message': str(e),
            'connection_status': 'failed'
        })
        raise

    finally:
        if conn is not None:
            conn.close()
```
558
559
## Performance Optimization
560
561
### Conditional SQL Streaming
562
563
```python
import os

from aws_xray_sdk.core import xray_recorder

# Configure SQL streaming based on environment
if os.getenv('ENVIRONMENT') == 'production':
    xray_recorder.configure(stream_sql=False)  # Disable in prod for performance
else:
    xray_recorder.configure(stream_sql=True)   # Enable in dev for debugging

# Or conditionally based on sampling
with xray_recorder.in_segment('database-operations') as segment:
    if segment.sampled:
        # Add detailed SQL metadata only for sampled traces
        segment.put_metadata('sql_queries', collect_sql_queries())

    # Execute database operations
    perform_database_operations()
```
581
582
### Batch Operation Optimization
583
584
```python
585
from aws_xray_sdk.core import xray_recorder
586
587
with xray_recorder.in_segment('optimized-batch-operations') as segment:
588
with xray_recorder.in_subsegment('batch-insert') as subsegment:
589
# Batch operations are more efficient and create fewer subsegments
590
batch_size = 1000
591
total_records = 10000
592
593
subsegment.put_annotation('batch_size', str(batch_size))
594
subsegment.put_annotation('total_records', str(total_records))
595
596
for i in range(0, total_records, batch_size):
597
batch = records[i:i + batch_size]
598
599
# Each batch creates one database subsegment instead of 1000
600
cursor.executemany(
601
'INSERT INTO users (name, email) VALUES (%s, %s)',
602
[(r['name'], r['email']) for r in batch]
603
)
604
605
conn.commit()
606
```
607
608
## Best Practices
609
610
### Security and Sensitive Data
611
612
```python
613
# SQL queries are automatically sanitized by X-Ray SDK
614
# But you can add additional protection
615
616
from aws_xray_sdk.core import xray_recorder
617
618
def sanitize_sql_params(params):
619
"""Remove sensitive data from SQL parameters."""
620
if isinstance(params, (list, tuple)):
621
return ['***REDACTED***' if is_sensitive(p) else p for p in params]
622
elif isinstance(params, dict):
623
return {k: '***REDACTED***' if is_sensitive(v) else v for k, v in params.items()}
624
return params
625
626
def is_sensitive(value):
627
"""Check if value contains sensitive information."""
628
if isinstance(value, str):
629
return any(keyword in value.lower() for keyword in ['password', 'secret', 'token'])
630
return False
631
632
# Use in custom metadata
633
with xray_recorder.in_subsegment('secure-query') as subsegment:
634
subsegment.put_metadata('sanitized_params', sanitize_sql_params(query_params))
635
```
636
637
### Query Performance Monitoring
638
639
```python
import time

from aws_xray_sdk.core import xray_recorder

class DatabasePerformanceMonitor:
    """Wraps query callables and annotates the active trace with timing info."""

    def __init__(self, slow_query_threshold=1.0):
        self.slow_query_threshold = slow_query_threshold

    def monitor_query(self, query_func, *args, **kwargs):
        start_time = time.time()

        try:
            result = query_func(*args, **kwargs)
            execution_time = time.time() - start_time

            # Add performance annotations
            if execution_time > self.slow_query_threshold:
                xray_recorder.put_annotation('slow_query', 'true')
                xray_recorder.put_annotation('execution_time', f'{execution_time:.3f}s')

            return result

        except Exception:
            execution_time = time.time() - start_time
            xray_recorder.put_annotation('query_failed', 'true')
            xray_recorder.put_annotation('failure_time', f'{execution_time:.3f}s')
            raise

# Usage
monitor = DatabasePerformanceMonitor(slow_query_threshold=0.5)

with xray_recorder.in_segment('monitored-queries') as segment:
    result = monitor.monitor_query(execute_complex_query, query_params)
```