# Database Integration

Database-specific tracing for SQL and NoSQL databases. Captures query information, connection details, and performance metrics while respecting security best practices. Supports major Python database libraries and ORMs.
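
All of the integrations below follow the same pattern: patch or wrap the database library before issuing queries, then run those queries while a segment is open (one you begin yourself, or one opened by web-framework middleware). A minimal sketch of that workflow, assuming the recorder is already configured and an X-Ray daemon is reachable, using the bundled sqlite3 driver:

```python
from aws_xray_sdk.core import patch, xray_recorder
import sqlite3

# Patch the driver before issuing queries
patch(['sqlite3'])

with xray_recorder.in_segment('quick-start'):
    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor()
    cursor.execute('SELECT 1')  # recorded as a SQL subsegment
    conn.close()
```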

## Capabilities

5

6

### SQLAlchemy Integration

7

8

Comprehensive integration with SQLAlchemy Core and ORM for automatic query tracing.

9

10

#### SQLAlchemy Core Integration

11

12

```python { .api }

13

from aws_xray_sdk.ext.sqlalchemy.query import XRaySessionMaker

14

15

# Create X-Ray enabled session maker

16

Session = XRaySessionMaker(bind=engine)

17

session = Session()

18

```

19

20

#### Flask-SQLAlchemy Integration

21

22

```python { .api }

23

from aws_xray_sdk.ext.flask_sqlalchemy.query import XRayFlaskSqlAlchemy

24

25

# Replace Flask-SQLAlchemy with X-Ray enabled version

26

db = XRayFlaskSqlAlchemy(app)

27

```
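
Query subsegments are only recorded while a segment is open. In a Flask application the X-Ray Flask middleware opens one per request, so pair `XRayFlaskSqlAlchemy` with `XRayMiddleware` (a complete example appears under Database-Specific Usage below):

```python
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.ext.flask.middleware import XRayMiddleware

# Opens a segment for each incoming request; database subsegments attach to it
XRayMiddleware(app, xray_recorder)
```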

### Direct Database Driver Integration

Automatic tracing for popular Python database drivers through patching.

```python { .api }
# Patch database drivers
from aws_xray_sdk.core import patch

# SQL databases
patch(['sqlite3'])   # SQLite
patch(['mysql'])     # mysql-connector-python
patch(['pymysql'])   # PyMySQL
patch(['psycopg2'])  # PostgreSQL psycopg2
patch(['pg8000'])    # PostgreSQL pg8000

# NoSQL databases
patch(['pymongo'])   # MongoDB
patch(['pynamodb'])  # DynamoDB via PynamoDB
```
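
If you would rather not list drivers individually, `patch_all()` patches every supported library that is installed:

```python
from aws_xray_sdk.core import patch_all

# Patch all supported, installed libraries (database drivers included)
patch_all()
```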

### SQL Query Streaming

Control SQL query capture and streaming for performance optimization.

```python { .api }
# Configure SQL streaming
xray_recorder.configure(stream_sql=True)

# Or in Django settings
XRAY_RECORDER = {
    'STREAM_SQL': True,  # Enable SQL query streaming
}
```

## Database-Specific Usage

### SQLite Integration

```python
from aws_xray_sdk.core import patch, xray_recorder
import sqlite3

# Patch SQLite for automatic tracing
patch(['sqlite3'])

with xray_recorder.in_segment('sqlite-operations') as segment:
    # Database operations are automatically traced
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    # Each query creates a subsegment with SQL metadata
    cursor.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)')

    # INSERT operations
    cursor.execute('INSERT INTO users (name) VALUES (?)', ('John Doe',))

    # SELECT operations with query details
    cursor.execute('SELECT * FROM users WHERE name LIKE ?', ('%John%',))
    results = cursor.fetchall()

    # Batch operations
    users_data = [('Alice',), ('Bob',), ('Charlie',)]
    cursor.executemany('INSERT INTO users (name) VALUES (?)', users_data)

    conn.commit()
    conn.close()
```

### MySQL Integration

```python
from aws_xray_sdk.core import patch, xray_recorder
from datetime import datetime, timedelta
import pymysql

# Patch PyMySQL for automatic tracing
patch(['pymysql'])

with xray_recorder.in_segment('mysql-operations') as segment:
    # Connection and queries are automatically traced
    connection = pymysql.connect(
        host='localhost',
        user='user',
        password='password',
        database='testdb',
        charset='utf8mb4'
    )

    try:
        with connection.cursor() as cursor:
            # Each query is traced with execution details
            cursor.execute('SELECT * FROM users WHERE active = %s', (True,))
            users = cursor.fetchall()

            # Complex queries with joins
            cursor.execute('''
                SELECT u.id, u.name, p.email
                FROM users u
                JOIN profiles p ON u.id = p.user_id
                WHERE u.created_at > %s
            ''', (datetime.now() - timedelta(days=30),))

            recent_users = cursor.fetchall()

    finally:
        connection.close()
```

### PostgreSQL Integration

```python
from aws_xray_sdk.core import patch, xray_recorder
from datetime import datetime
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values

# Patch psycopg2 for automatic tracing
patch(['psycopg2'])

with xray_recorder.in_segment('postgresql-operations') as segment:
    # Connection is traced with connection metadata
    conn = psycopg2.connect(
        host='localhost',
        database='mydb',
        user='user',
        password='password'
    )

    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            # Queries are traced with PostgreSQL-specific metadata
            cursor.execute('''
                SELECT u.*, COUNT(o.id) as order_count
                FROM users u
                LEFT JOIN orders o ON u.id = o.user_id
                WHERE u.status = %s
                GROUP BY u.id
                ORDER BY order_count DESC
                LIMIT %s
            ''', ('active', 50))

            top_users = cursor.fetchall()

            # Batch operations (execute_values expands the VALUES %s placeholder)
            execute_values(cursor, '''
                INSERT INTO user_activity (user_id, activity_type, timestamp)
                VALUES %s
            ''', [
                (user['id'], 'login', datetime.now())
                for user in top_users
            ])

        conn.commit()

    except Exception as e:
        conn.rollback()
        raise
    finally:
        conn.close()
```

### MongoDB Integration

```python
from aws_xray_sdk.core import patch, xray_recorder
from datetime import datetime, timedelta
import pymongo

# Patch pymongo for automatic tracing
patch(['pymongo'])

with xray_recorder.in_segment('mongodb-operations') as segment:
    # MongoDB operations are automatically traced
    client = pymongo.MongoClient('mongodb://localhost:27017/')
    db = client.myapp

    # Collection operations with query details
    users_collection = db.users

    # Insert operations
    user_doc = {
        'name': 'John Doe',
        'email': 'john@example.com',
        'created_at': datetime.now()
    }
    result = users_collection.insert_one(user_doc)

    # Query operations with MongoDB-specific metadata
    active_users = users_collection.find(
        {'status': 'active'},
        {'name': 1, 'email': 1, 'last_login': 1}
    ).limit(100)

    # Aggregation pipelines are traced with pipeline details
    pipeline = [
        {'$match': {'status': 'active'}},
        {'$group': {
            '_id': '$department',
            'user_count': {'$sum': 1},
            'avg_age': {'$avg': '$age'}
        }},
        {'$sort': {'user_count': -1}}
    ]

    department_stats = list(users_collection.aggregate(pipeline))

    # Update operations
    users_collection.update_many(
        {'last_login': {'$lt': datetime.now() - timedelta(days=90)}},
        {'$set': {'status': 'inactive'}}
    )

    client.close()
```

### SQLAlchemy Core Integration

```python
from aws_xray_sdk.core import patch, xray_recorder
from aws_xray_sdk.ext.sqlalchemy.query import XRaySessionMaker
from sqlalchemy import create_engine, text

# Patch SQLAlchemy Core
patch(['sqlalchemy_core'])

# Create engine and X-Ray enabled session
engine = create_engine('postgresql://user:pass@localhost/db')
Session = XRaySessionMaker(bind=engine)

with xray_recorder.in_segment('sqlalchemy-operations') as segment:
    session = Session()

    try:
        # Raw SQL queries are traced
        result = session.execute(text('SELECT * FROM users WHERE active = :active'),
                                 {'active': True})
        users = result.fetchall()

        # ORM queries are traced with relationship loading info
        from myapp.models import User, Order  # application models

        # Query with joins - traces JOIN details
        users_with_orders = session.query(User).join(Order).filter(
            Order.status == 'completed'
        ).all()

        # Complex queries with subqueries
        subquery = session.query(Order.user_id).filter(
            Order.total > 1000
        ).subquery()

        high_value_users = session.query(User).filter(
            User.id.in_(subquery)
        ).all()

        session.commit()

    except Exception as e:
        session.rollback()
        raise
    finally:
        session.close()
```

### Flask-SQLAlchemy Integration

```python
from flask import Flask
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.ext.flask.middleware import XRayMiddleware
from aws_xray_sdk.ext.flask_sqlalchemy.query import XRayFlaskSqlAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/db'

# Configure X-Ray
xray_recorder.configure(service='Flask App with DB')
XRayMiddleware(app, xray_recorder)

# Use X-Ray enabled Flask-SQLAlchemy
db = XRayFlaskSqlAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    orders = db.relationship('Order', backref='user', lazy=True)

class Order(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    total = db.Column(db.Float, nullable=False)
    status = db.Column(db.String(20), nullable=False)

@app.route('/users/<int:user_id>')
def get_user_with_orders(user_id):
    # Database queries are automatically traced
    user = User.query.get_or_404(user_id)

    # Relationship loading is traced
    orders = user.orders

    return {
        'user': {'id': user.id, 'username': user.username},
        'orders': [{'id': o.id, 'total': o.total} for o in orders]
    }

@app.route('/users')
def list_users():
    # Complex queries with filters and joins are traced
    users = db.session.query(User).join(Order).filter(
        Order.status == 'completed'
    ).distinct().all()

    return {'users': [{'id': u.id, 'username': u.username} for u in users]}
```

### PynamoDB Integration (DynamoDB)

```python
from aws_xray_sdk.core import patch, xray_recorder
from datetime import datetime
from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute, UTCDateTimeAttribute

# Patch PynamoDB for automatic DynamoDB tracing
patch(['pynamodb'])

class User(Model):
    class Meta:
        table_name = 'users'
        region = 'us-east-1'

    user_id = UnicodeAttribute(hash_key=True)
    username = UnicodeAttribute()
    email = UnicodeAttribute()
    created_at = UTCDateTimeAttribute()
    login_count = NumberAttribute(default=0)

with xray_recorder.in_segment('dynamodb-operations') as segment:
    # Model operations are automatically traced with DynamoDB metadata

    # Create item
    user = User(
        user_id='123',
        username='john_doe',
        email='john@example.com',
        created_at=datetime.now()
    )
    user.save()  # Traced as DynamoDB PutItem

    # Get item
    retrieved_user = User.get('123')  # Traced as DynamoDB GetItem

    # Query operations
    users = list(User.query('123'))  # Traced as DynamoDB Query

    # Scan operations
    all_users = list(User.scan())  # Traced as DynamoDB Scan

    # Update operations (save() re-puts the item; Model.update() maps to DynamoDB UpdateItem)
    retrieved_user.login_count += 1
    retrieved_user.save()

    # Batch operations
    with User.batch_write() as batch:
        for i in range(100):
            batch.save(User(
                user_id=str(i),
                username=f'user_{i}',
                email=f'user{i}@example.com',
                created_at=datetime.now()
            ))
    # Batch operations are traced with batch efficiency metrics
```

## Advanced Database Features

### Custom SQL Metadata

```python
import time

from aws_xray_sdk.core import xray_recorder

with xray_recorder.in_segment('custom-sql-metadata') as segment:
    with xray_recorder.in_subsegment('complex-query') as subsegment:
        # Add custom database annotations
        subsegment.put_annotation('query_type', 'analytical')
        subsegment.put_annotation('table_count', '3')
        subsegment.put_annotation('expected_rows', '10000')

        # Add query performance metadata
        start_time = time.time()

        # Execute complex query (application-specific helper)
        result = execute_complex_analytical_query()

        execution_time = time.time() - start_time

        # Add performance metadata
        subsegment.put_metadata('query_performance', {
            'execution_time_ms': execution_time * 1000,
            'rows_returned': len(result),
            'rows_per_second': len(result) / execution_time if execution_time > 0 else 0,
            'query_plan': get_query_execution_plan(),  # application-specific helper
            'index_usage': get_index_usage_stats()     # application-specific helper
        }, namespace='database')
```

### Connection Pool Monitoring

```python
from aws_xray_sdk.core import xray_recorder
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Create engine with connection pool
engine = create_engine(
    'postgresql://user:pass@localhost/db',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20
)

with xray_recorder.in_segment('connection-pool-demo') as segment:
    # Add connection pool metadata
    pool = engine.pool

    segment.put_metadata('connection_pool', {
        'pool_size': pool.size(),
        'checked_in_connections': pool.checkedin(),
        'checked_out_connections': pool.checkedout(),
        'overflow_connections': pool.overflow()
    }, namespace='database')

    # Use connection
    with engine.connect() as conn:
        result = conn.execute(text('SELECT COUNT(*) FROM users'))
        count = result.scalar()
```

### Transaction Tracing

```python
from aws_xray_sdk.core import patch, xray_recorder
import psycopg2

patch(['psycopg2'])

with xray_recorder.in_segment('transaction-operations') as segment:
    conn = psycopg2.connect('postgresql://user:pass@localhost/db')

    with xray_recorder.in_subsegment('database-transaction') as subsegment:
        subsegment.put_annotation('transaction_type', 'user_registration')

        try:
            cursor = conn.cursor()

            # Transaction operations are grouped under one subsegment
            cursor.execute('INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id',
                           ('John Doe', 'john@example.com'))
            user_id = cursor.fetchone()[0]

            cursor.execute('INSERT INTO user_profiles (user_id, bio) VALUES (%s, %s)',
                           (user_id, 'Software developer'))

            cursor.execute('INSERT INTO user_preferences (user_id, notifications) VALUES (%s, %s)',
                           (user_id, True))

            # Commit transaction
            conn.commit()

            subsegment.put_annotation('transaction_status', 'committed')
            subsegment.put_metadata('transaction_details', {
                'user_id': user_id,
                'operations_count': 3,
                'transaction_time_ms': get_transaction_time()  # application-specific helper
            })

        except Exception:
            conn.rollback()
            subsegment.put_annotation('transaction_status', 'rolled_back')
            subsegment.add_fault_flag()
            raise
        finally:
            conn.close()
```

### Database Error Handling

```python
from aws_xray_sdk.core import patch, xray_recorder
import psycopg2
from psycopg2 import IntegrityError, OperationalError

patch(['psycopg2'])

with xray_recorder.in_segment('database-error-handling') as segment:
    try:
        conn = psycopg2.connect('postgresql://user:pass@localhost/db')
        cursor = conn.cursor()

        # Operation that might fail
        cursor.execute('INSERT INTO users (email) VALUES (%s)', ('duplicate@example.com',))
        conn.commit()

    except IntegrityError as e:
        # Database constraint violation
        segment.add_error_flag()  # Client error
        xray_recorder.put_annotation('error_type', 'integrity_constraint')
        xray_recorder.put_metadata('database_error', {
            'error_code': e.pgcode,
            'error_message': str(e),
            'constraint_type': 'unique_violation'
        })
        raise

    except OperationalError as e:
        # Database connection or operational error
        segment.add_fault_flag()  # Service fault
        xray_recorder.put_annotation('error_type', 'operational_error')
        xray_recorder.put_metadata('database_error', {
            'error_code': e.pgcode,
            'error_message': str(e),
            'connection_status': 'failed'
        })
        raise

    finally:
        if 'conn' in locals():
            conn.close()
```

## Performance Optimization

### Conditional SQL Streaming

```python
import os

from aws_xray_sdk.core import xray_recorder

# Configure SQL streaming based on environment
if os.getenv('ENVIRONMENT') == 'production':
    xray_recorder.configure(stream_sql=False)  # Disable in prod for performance
else:
    xray_recorder.configure(stream_sql=True)   # Enable in dev for debugging

# Or conditionally based on sampling
with xray_recorder.in_segment('database-operations') as segment:
    if segment.sampled:
        # Add detailed SQL metadata only for sampled traces
        segment.put_metadata('sql_queries', collect_sql_queries())  # application-specific helper

    # Execute database operations (application-specific helper)
    perform_database_operations()
```

### Batch Operation Optimization

```python
from aws_xray_sdk.core import xray_recorder

# Assumes `records`, `cursor`, and `conn` were created earlier
with xray_recorder.in_segment('optimized-batch-operations') as segment:
    with xray_recorder.in_subsegment('batch-insert') as subsegment:
        # Batch operations are more efficient and create fewer subsegments
        batch_size = 1000
        total_records = 10000

        subsegment.put_annotation('batch_size', str(batch_size))
        subsegment.put_annotation('total_records', str(total_records))

        for i in range(0, total_records, batch_size):
            batch = records[i:i + batch_size]

            # Each batch creates one database subsegment instead of 1000
            cursor.executemany(
                'INSERT INTO users (name, email) VALUES (%s, %s)',
                [(r['name'], r['email']) for r in batch]
            )

        conn.commit()
```

## Best Practices

### Security and Sensitive Data

```python
# SQL queries are automatically sanitized by the X-Ray SDK,
# but you can add additional protection for custom metadata.

from aws_xray_sdk.core import xray_recorder

def sanitize_sql_params(params):
    """Remove sensitive data from SQL parameters."""
    if isinstance(params, (list, tuple)):
        return ['***REDACTED***' if is_sensitive(p) else p for p in params]
    elif isinstance(params, dict):
        return {k: '***REDACTED***' if is_sensitive(v) else v for k, v in params.items()}
    return params

def is_sensitive(value):
    """Check if value contains sensitive information."""
    if isinstance(value, str):
        return any(keyword in value.lower() for keyword in ['password', 'secret', 'token'])
    return False

# Use in custom metadata (`query_params` comes from your application code)
with xray_recorder.in_subsegment('secure-query') as subsegment:
    subsegment.put_metadata('sanitized_params', sanitize_sql_params(query_params))
```

### Query Performance Monitoring

```python
import time

from aws_xray_sdk.core import xray_recorder

class DatabasePerformanceMonitor:
    def __init__(self, slow_query_threshold=1.0):
        self.slow_query_threshold = slow_query_threshold

    def monitor_query(self, query_func, *args, **kwargs):
        start_time = time.time()

        try:
            result = query_func(*args, **kwargs)
            execution_time = time.time() - start_time

            # Add performance annotations
            if execution_time > self.slow_query_threshold:
                xray_recorder.put_annotation('slow_query', 'true')
                xray_recorder.put_annotation('execution_time', f'{execution_time:.3f}s')

            return result

        except Exception as e:
            execution_time = time.time() - start_time
            xray_recorder.put_annotation('query_failed', 'true')
            xray_recorder.put_annotation('failure_time', f'{execution_time:.3f}s')
            raise

# Usage (`execute_complex_query` and `query_params` come from your application code)
monitor = DatabasePerformanceMonitor(slow_query_threshold=0.5)

with xray_recorder.in_segment('monitored-queries') as segment:
    result = monitor.monitor_query(execute_complex_query, query_params)
```