Add SQLAlchemy support to your Flask application.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Optional development features — query recording, modification tracking, and Flask shell integration — that provide enhanced debugging capabilities.
Tools for recording and analyzing database queries during development and testing.
def get_recorded_queries() -> list[_QueryInfo]:
    """
    Get the list of recorded query information for the current session.

    Only available when the SQLALCHEMY_RECORD_QUERIES config option is
    enabled. Returns an empty list if recording is disabled or no
    queries have been recorded.

    Returns:
        List of _QueryInfo objects with query details and timing.
    """
@dataclasses.dataclass
class _QueryInfo:
    """
    Information about an executed database query.

    Contains timing, statement, and location information for debugging
    slow queries and understanding application database usage patterns.
    """

    statement: str | None  # SQL statement with parameter placeholders
    parameters: Any  # Parameters sent with the SQL statement
    start_time: float  # Query start timestamp
    end_time: float  # Query completion timestamp
    location: str  # Code location where the query was executed

    @property
    def duration(self) -> float:
"""Query execution duration in seconds."""Signals for tracking model changes during database sessions.
from flask.signals import Namespace
_signals: Namespace
models_committed: Signal
"""
Blinker signal sent after session commit with model changes.
The sender is the Flask application. The receiver gets a 'changes'
argument with list of (instance, operation) tuples where operation
is 'insert', 'update', or 'delete'.
"""
before_models_committed: Signal
"""
Blinker signal sent before session commit with model changes.
Works exactly like models_committed but fires before the commit
takes place, allowing for last-minute modifications or validations.
"""Functions for adding database context to Flask shell sessions.
def add_models_to_shell() -> dict[str, Any]:
    """
    Add the db instance and all model classes to the Flask shell context.

    Automatically registered as a shell context processor when
    add_models_to_shell=True is passed at SQLAlchemy initialization.

    Returns:
        Dictionary mapping model class names to classes, plus a 'db' key.
    """

# Enable query recording in development
app.config['SQLALCHEMY_RECORD_QUERIES'] = True
db = SQLAlchemy(app)


@app.after_request
def log_queries(response):
    """Log statement, duration, and location of every recorded query in debug mode."""
    if app.debug:
        for query in get_recorded_queries():
            app.logger.debug(
                f"Query: {query.statement} "
                f"Duration: {query.duration:.3f}s "
                f"Location: {query.location}"
            )
    return response


@app.route('/debug/queries')
def debug_queries():
    """Debug endpoint to analyze recent queries."""
    # Only expose query internals when debugging; 404 otherwise.
    if not app.debug:
        abort(404)
    queries = get_recorded_queries()
    # Queries slower than 100 ms are reported as "slow"
    slow_queries = [q for q in queries if q.duration > 0.1]
    # Aggregate query statistics
    total_time = sum(q.duration for q in queries)
    avg_time = total_time / len(queries) if queries else 0
    return {
        'total_queries': len(queries),
        'total_time': total_time,
        'average_time': avg_time,
        'slow_queries': len(slow_queries),
        'queries': [
            {
                'statement': q.statement,
                'duration': q.duration,
                'location': q.location,
                'parameters': str(q.parameters)
            }
            for q in queries
        ]
    }

# Enable modification tracking
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)


@models_committed.connect
def log_model_changes(sender, changes):
    """Log all model changes after commit."""
    for instance, operation in changes:
        app.logger.info(
            f"{operation.title()}: {instance.__class__.__name__} "
            f"ID={getattr(instance, 'id', 'unknown')}"
        )


@before_models_committed.connect
def validate_changes(sender, changes):
    """Perform validation before committing changes."""
    for instance, operation in changes:
        # Refuse to delete any instance flagged as protected.
        if operation == 'delete' and hasattr(instance, 'protected'):
            if instance.protected:
                raise ValueError(f"Cannot delete protected {instance}")


class AuditMixin:
    """Mixin to add audit fields to models."""

    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    created_by = db.Column(db.String(100))
    updated_by = db.Column(db.String(100))


class User(AuditMixin, db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)


@before_models_committed.connect
def set_audit_fields(sender, changes):
    """Automatically set audit fields on model changes."""
    current_user = get_current_user()  # Your auth function
    for instance, operation in changes:
        if isinstance(instance, AuditMixin):
            if operation == 'insert':
                instance.created_by = current_user.username
            elif operation == 'update':
                instance.updated_by = current_user.username

# Automatic shell integration (enabled by default)
db = SQLAlchemy(app, add_models_to_shell=True)

# Manual shell integration
def make_shell_context():
    """Add custom items to shell context."""
    # Start from the db instance and model classes, then layer on extras.
    extras = {
        'datetime': datetime,
        'timedelta': timedelta,
    }
    return {**add_models_to_shell(), **extras}
app.shell_context_processor(make_shell_context)

Using the enhanced shell:
$ flask shell
>>> db
<SQLAlchemy postgresql://...>
>>> User
<class 'app.models.User'>
>>> users = User.query.all()
>>> len(users)
42

class QueryAnalysisMiddleware:
"""WSGI middleware to analyze database queries."""
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
def new_start_response(status, headers):
# Log query stats after request
queries = get_recorded_queries()
if queries:
total_time = sum(q.duration for q in queries)
print(f"Request used {len(queries)} queries in {total_time:.3f}s")
return start_response(status, headers)
return self.app(environ, new_start_response)
# Apply middleware in development
if app.debug:
app.wsgi_app = QueryAnalysisMiddleware(app.wsgi_app)import cProfile
import pstats
from io import StringIO
@app.route('/profile/<path:endpoint>')
def profile_endpoint(endpoint):
"""Profile database queries for a specific endpoint."""
if not app.debug:
abort(404)
# Clear previous queries
g._sqlalchemy_queries = []
# Profile the endpoint
pr = cProfile.Profile()
pr.enable()
# Make request to endpoint
with app.test_client() as client:
response = client.get(f'/{endpoint}')
pr.disable()
# Get query information
queries = get_recorded_queries()
# Get profiling stats
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats()
return {
'endpoint': endpoint,
'status_code': response.status_code,
'query_count': len(queries),
'total_query_time': sum(q.duration for q in queries),
'queries': [
{
'statement': q.statement,
'duration': q.duration,
'location': q.location
}
for q in queries
],
'profile_stats': s.getvalue()
}def test_user_list_queries(client, app):
"""Test that user list endpoint doesn't have N+1 queries."""
with app.app_context():
# Clear any existing queries
g._sqlalchemy_queries = []
# Make request
response = client.get('/users')
# Check query count
queries = get_recorded_queries()
assert len(queries) <= 2 # Should be 1-2 queries max
# Check for N+1 patterns
select_queries = [q for q in queries if 'SELECT' in q.statement]
assert len(select_queries) <= 1, "Possible N+1 query detected"# Enable query recording (disabled by default)
SQLALCHEMY_RECORD_QUERIES = True

# Only record in development
SQLALCHEMY_RECORD_QUERIES = app.debug

# Enable modification tracking (disabled by default)
SQLALCHEMY_TRACK_MODIFICATIONS = True

# Can be expensive in production, consider carefully
SQLALCHEMY_TRACK_MODIFICATIONS = app.debug

# Shell integration (enabled by default)
db = SQLAlchemy(app, add_models_to_shell=True)

# Disable shell integration
db = SQLAlchemy(app, add_models_to_shell=False)

Install with Tessl CLI
npx tessl i tessl/pypi-flask-sqlalchemy