Quickly add security features to your Flask application.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Datastore classes and database abstraction layer supporting multiple ORMs (SQLAlchemy, MongoEngine, Peewee, Pony) for flexible database integration with Flask-Security.
Abstract base classes providing the foundation for Flask-Security's database abstraction layer.
class Datastore:
    """
    Abstract base datastore class defining the interface for all datastores.

    Signature listing only: method bodies are omitted in this reference.
    """

    def __init__(self, db):
        """
        Initialize datastore with database connection.

        Parameters:
        - db: Database connection or session object
        """

    def put(self, obj):
        """
        Save object to database.

        Parameters:
        - obj: Object to save

        Returns:
        Saved object
        """

    def delete(self, obj):
        """
        Delete object from database.

        Parameters:
        - obj: Object to delete

        Returns:
        Deleted object
        """

    def commit(self):
        """
        Commit current transaction.
        """
class UserDatastore(Datastore):
    """
    Core user management datastore with CRUD operations for users and roles.

    Signature listing only: method bodies are omitted in this reference.
    """

    def __init__(self, db, user_model, role_model=None):
        """
        Initialize user datastore.

        Parameters:
        - db: Database connection or session
        - user_model: User model class
        - role_model: Role model class (optional)
        """

    def get_user(self, identifier):
        """
        Get user by identifier (ID, email, username).

        Parameters:
        - identifier: User ID, email, or username

        Returns:
        User object if found, None otherwise
        """

    def find_user(self, **kwargs):
        """
        Find user by arbitrary attributes.

        Parameters:
        - kwargs: Attribute filters for user lookup

        Returns:
        User object if found, None otherwise
        """

    def create_user(self, **kwargs):
        """
        Create new user with specified attributes.

        Parameters:
        - kwargs: User attributes (email, password, etc.)

        Returns:
        Created user object
        """

    def delete_user(self, user):
        """
        Delete user from database.

        Parameters:
        - user: User object to delete

        Returns:
        True if deleted successfully, False otherwise
        """

    def activate_user(self, user):
        """
        Activate user account.

        Parameters:
        - user: User object to activate

        Returns:
        Activated user object
        """

    def deactivate_user(self, user):
        """
        Deactivate user account.

        Parameters:
        - user: User object to deactivate

        Returns:
        Deactivated user object
        """

    def create_role(self, **kwargs):
        """
        Create new role with specified attributes.

        Parameters:
        - kwargs: Role attributes (name, description, permissions)

        Returns:
        Created role object
        """

    def find_role(self, role):
        """
        Find role by name or role object.

        Parameters:
        - role: Role name (string) or role object

        Returns:
        Role object if found, None otherwise
        """

    def add_role_to_user(self, user, role):
        """
        Add role to user.

        Parameters:
        - user: User object
        - role: Role object or role name

        Returns:
        True if role added successfully, False if user already had role
        """

    def remove_role_from_user(self, user, role):
        """
        Remove role from user.

        Parameters:
        - user: User object
        - role: Role object or role name

        Returns:
        True if role removed successfully, False if user didn't have role
        """


# Datastores for SQLAlchemy ORM integration with various Flask-SQLAlchemy versions and configurations.
class SQLAlchemyUserDatastore(UserDatastore):
    """
    SQLAlchemy integration datastore for Flask-Security.

    Signature listing only: method bodies are omitted in this reference.
    """

    def __init__(self, db, user_model, role_model):
        """
        Initialize SQLAlchemy datastore.

        Parameters:
        - db: SQLAlchemy database instance
        - user_model: SQLAlchemy user model class
        - role_model: SQLAlchemy role model class
        """

    def get_user(self, identifier):
        """Get user by ID, email, or username using SQLAlchemy query."""

    def find_user(self, case_insensitive=False, **kwargs):
        """
        Find user with SQLAlchemy filters.

        Parameters:
        - case_insensitive: Whether to perform case-insensitive search
        - kwargs: Filter attributes

        Returns:
        User object if found, None otherwise
        """

    def toggle_active(self, user):
        """
        Toggle user active status.

        Parameters:
        - user: User object to toggle

        Returns:
        Updated user object
        """
class SQLAlchemySessionUserDatastore(SQLAlchemyUserDatastore):
    """
    SQLAlchemy datastore with explicit session management.

    Takes a session object where the parent class takes a database instance.
    """

    def __init__(self, session, user_model, role_model):
        """
        Initialize SQLAlchemy session datastore.

        Parameters:
        - session: SQLAlchemy session object
        - user_model: SQLAlchemy user model class
        - role_model: SQLAlchemy role model class
        """
class FSQLALiteUserDatastore(UserDatastore):
    """
    Lightweight SQLAlchemy datastore for Flask-SQLAlchemy-Lite.
    """

    def __init__(self, db, user_model, role_model):
        """
        Initialize Flask-SQLAlchemy-Lite datastore.

        Parameters:
        - db: Flask-SQLAlchemy-Lite database instance
        - user_model: User model class
        - role_model: Role model class
        """


# Datastores for other popular Python ORMs providing flexible database backend options.
class MongoEngineUserDatastore(UserDatastore):
    """
    MongoDB/MongoEngine integration datastore.

    Signature listing only: method bodies are omitted in this reference.
    """

    def __init__(self, db, user_model, role_model=None):
        """
        Initialize MongoEngine datastore.

        Parameters:
        - db: MongoEngine database connection
        - user_model: MongoEngine user document class
        - role_model: MongoEngine role document class (optional)
        """

    def get_user(self, identifier):
        """Get user from MongoDB using MongoEngine queries."""

    def find_user(self, case_insensitive=False, **kwargs):
        """
        Find user in MongoDB with case-insensitive option.

        Parameters:
        - case_insensitive: Enable case-insensitive search
        - kwargs: Query filters

        Returns:
        User document if found, None otherwise
        """
class PeeweeUserDatastore(UserDatastore):
    """
    Peewee ORM integration datastore.

    Signature listing only: method bodies are omitted in this reference.
    """

    def __init__(self, db, user_model, role_model=None, role_link=None):
        """
        Initialize Peewee datastore.

        Parameters:
        - db: Peewee database instance
        - user_model: Peewee user model class
        - role_model: Peewee role model class (optional)
        - role_link: Peewee many-to-many link model (optional)
        """

    def get_user(self, identifier):
        """Get user using Peewee ORM queries."""

    def find_user(self, **kwargs):
        """
        Find user with Peewee query filters.

        Parameters:
        - kwargs: Query filter attributes

        Returns:
        User model instance if found, None otherwise
        """
class PonyUserDatastore(UserDatastore):
    """
    Pony ORM integration datastore.

    Signature listing only: method bodies are omitted in this reference.
    """

    def __init__(self, db, user_model, role_model=None):
        """
        Initialize Pony ORM datastore.

        Parameters:
        - db: Pony database instance
        - user_model: Pony user entity class
        - role_model: Pony role entity class (optional)
        """

    def get_user(self, identifier):
        """Get user using Pony ORM queries."""

    def find_user(self, **kwargs):
        """
        Find user with Pony ORM query syntax.

        Parameters:
        - kwargs: Entity attribute filters

        Returns:
        User entity if found, None otherwise
        """


# Utility classes for database compatibility and data type management.
class AsaList:
    """
    Custom list type for database storage compatibility.

    Handles serialization/deserialization of list data for database storage.

    Signature listing only: method bodies are omitted in this reference.
    """
    # NOTE(review): the list-style methods below are not backed by any
    # implementation shown here; confirm them against the installed
    # Flask-Security's AsaList (presumably a column type) before relying on them.

    def __init__(self, data=None):
        """
        Initialize AsaList with optional data.

        Parameters:
        - data: Initial list data (optional)
        """

    def append(self, item):
        """
        Append item to list.

        Parameters:
        - item: Item to append
        """

    def remove(self, item):
        """
        Remove item from list.

        Parameters:
        - item: Item to remove
        """

    def __contains__(self, item):
        """Check if item is in list."""

    def __iter__(self):
        """Iterate over list items."""


from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, UserMixin, RoleMixin
app = Flask(__name__)
# Placeholder secrets for the example only; load real values from the
# environment or a secrets manager in any deployed application.
app.config['SECRET_KEY'] = 'super-secret'
# Salt used by Flask-Security when hashing passwords; set here so the
# example can hash passwords without relying on configuration made elsewhere.
app.config['SECURITY_PASSWORD_SALT'] = 'your-password-salt'
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/mydb'
db = SQLAlchemy(app)

# Define many-to-many relationship table linking users to roles
roles_users = db.Table(
    'roles_users',
    db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
    db.Column('role_id', db.Integer(), db.ForeignKey('role.id')),
)
# Define User model
class User(db.Model, UserMixin):
    """SQLAlchemy user model for the Flask-Security SQLAlchemy example."""

    __tablename__ = 'user'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), unique=True, nullable=False)
    username = db.Column(db.String(80), unique=True, nullable=True)
    password = db.Column(db.String(255), nullable=False)
    active = db.Column(db.Boolean(), default=True)
    confirmed_at = db.Column(db.DateTime())
    # Required by Flask-Security 4.0+: a unique per-user token used to
    # identify the user in sessions and auth tokens instead of the primary key.
    fs_uniquifier = db.Column(db.String(64), unique=True, nullable=False)

    # Two-factor authentication fields
    tf_totp_secret = db.Column(db.String(255))
    tf_primary_method = db.Column(db.String(20))

    # Unified signin fields
    us_phone_number = db.Column(db.String(20))
    us_totp_secrets = db.Column(db.Text)  # JSON field

    # Relationships: many-to-many to Role through roles_users; Role gains a
    # dynamic `users` backref.
    roles = db.relationship('Role', secondary=roles_users,
                            backref=db.backref('users', lazy='dynamic'))
# Define Role model
class Role(db.Model, RoleMixin):
    """SQLAlchemy role model: a uniquely named role with an optional description."""

    __tablename__ = 'role'

    id = db.Column(db.Integer(), primary_key=True)
    name = db.Column(db.String(80), unique=True, nullable=False)
    description = db.Column(db.String(255))
# Initialize Flask-Security with SQLAlchemy datastore
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore)

# Create tables (create_all is called inside an application context)
with app.app_context():
    db.create_all()


from flask import Flask
from flask_mongoengine import MongoEngine
from flask_security import Security, MongoEngineUserDatastore, UserMixin, RoleMixin
app = Flask(__name__)
# Placeholder secrets for the example only; load real values from the
# environment or a secrets manager in any deployed application.
app.config['SECRET_KEY'] = 'super-secret'
# Salt used by Flask-Security when hashing passwords; set here so the
# example can hash passwords without relying on configuration made elsewhere.
app.config['SECURITY_PASSWORD_SALT'] = 'your-password-salt'
app.config['MONGODB_DB'] = 'mydatabase'
app.config['MONGODB_HOST'] = 'localhost'
app.config['MONGODB_PORT'] = 27017
db = MongoEngine(app)
# Define Role document
class Role(db.Document, RoleMixin):
    """MongoEngine role document: a uniquely named role with an optional description."""

    name = db.StringField(max_length=80, unique=True, required=True)
    description = db.StringField(max_length=255)
# Define User document
class User(db.Document, UserMixin):
    """MongoEngine user document for the Flask-Security MongoEngine example."""

    email = db.EmailField(unique=True, required=True)
    username = db.StringField(max_length=80, unique=True)
    password = db.StringField(max_length=255, required=True)
    active = db.BooleanField(default=True)
    confirmed_at = db.DateTimeField()
    # Required by Flask-Security 4.0+: a unique per-user token used to
    # identify the user in sessions and auth tokens instead of the primary key.
    fs_uniquifier = db.StringField(max_length=64, unique=True)

    # Two-factor fields
    tf_totp_secret = db.StringField(max_length=255)
    tf_primary_method = db.StringField(max_length=20)

    # Role relationship: list of references to Role documents
    roles = db.ListField(db.ReferenceField(Role), default=[])
# Initialize Flask-Security with MongoEngine datastore
user_datastore = MongoEngineUserDatastore(db, User, Role)
security = Security(app, user_datastore)


from flask import Flask
from peewee import *
from flask_security import Security, PeeweeUserDatastore, UserMixin, RoleMixin
app = Flask(__name__)
# Placeholder secrets for the example only; load real values from the
# environment or a secrets manager in any deployed application.
app.config['SECRET_KEY'] = 'super-secret'
# Salt used by Flask-Security when hashing passwords; set here so the
# example can hash passwords without relying on configuration made elsewhere.
app.config['SECURITY_PASSWORD_SALT'] = 'your-password-salt'

# Initialize Peewee database
database = SqliteDatabase('myapp.db')
# Define base model
class BaseModel(Model):
    """Common base that binds every model in this example to `database`."""

    class Meta:
        database = database
# Define Role model
class Role(BaseModel, RoleMixin):
    """Peewee role model: a uniquely named role with an optional description."""

    name = CharField(unique=True, max_length=80)
    description = CharField(max_length=255, null=True)
# Define User model
class User(BaseModel, UserMixin):
    """Peewee user model for the Flask-Security Peewee example."""

    email = CharField(unique=True, max_length=255)
    username = CharField(unique=True, max_length=80, null=True)
    password = CharField(max_length=255)
    active = BooleanField(default=True)
    confirmed_at = DateTimeField(null=True)
    # Required by Flask-Security 4.0+: a unique per-user token used to
    # identify the user in sessions and auth tokens instead of the primary key.
    fs_uniquifier = CharField(unique=True, max_length=64)

    # Two-factor fields
    tf_totp_secret = CharField(max_length=255, null=True)
    tf_primary_method = CharField(max_length=20, null=True)
# Define many-to-many relationship
class UserRole(BaseModel):
    """Link model joining users to roles; each (user, role) pair is unique."""

    # NOTE(review): Flask-Security's own Peewee example presumably exposes
    # these links as `user.roles` / `role.users` and proxies the role's
    # name/description on the link model; confirm that role checks work with
    # the backref names used here.
    user = ForeignKeyField(User, backref='user_roles')
    role = ForeignKeyField(Role, backref='role_users')

    class Meta:
        indexes = (
            (('user', 'role'), True),  # Unique constraint
        )
# Initialize Flask-Security with Peewee datastore (UserRole is the link model)
user_datastore = PeeweeUserDatastore(database, User, Role, UserRole)
security = Security(app, user_datastore)

# Create tables; safe=True is passed so existing tables are left alone
with app.app_context():
    database.create_tables([User, Role, UserRole], safe=True)


from flask import Flask
from pony.orm import *
from flask_security import Security, PonyUserDatastore, UserMixin, RoleMixin
app = Flask(__name__)
# Placeholder secrets for the example only; load real values from the
# environment or a secrets manager in any deployed application.
app.config['SECRET_KEY'] = 'super-secret'
# Salt used by Flask-Security when hashing passwords; set here so the
# example can hash passwords without relying on configuration made elsewhere.
app.config['SECURITY_PASSWORD_SALT'] = 'your-password-salt'

# Initialize Pony database
db = Database()
# `datetime` is used as the attribute type of `confirmed_at` below; the
# snippet's own imports do not bring it into scope explicitly.
from datetime import datetime


# Define User entity
class User(db.Entity, UserMixin):
    """Pony user entity for the Flask-Security Pony example."""

    _table_ = 'user'

    id = PrimaryKey(int, auto=True)
    email = Required(str, unique=True, max_len=255)
    username = Optional(str, unique=True, max_len=80)
    password = Required(str, max_len=255)
    active = Required(bool, default=True)
    confirmed_at = Optional(datetime)
    # Required by Flask-Security 4.0+: a unique per-user token used to
    # identify the user in sessions and auth tokens instead of the primary key.
    fs_uniquifier = Required(str, unique=True, max_len=64)

    # Two-factor fields
    tf_totp_secret = Optional(str, max_len=255)
    tf_primary_method = Optional(str, max_len=20)

    # Role relationship (many-to-many; the other side is Role.users)
    roles = Set('Role')
# Define Role entity
class Role(db.Entity, RoleMixin):
    """Pony role entity: a uniquely named role with an optional description."""

    _table_ = 'role'

    id = PrimaryKey(int, auto=True)
    name = Required(str, unique=True, max_len=80)
    description = Optional(str, max_len=255)

    # User relationship (reverse side of User.roles)
    users = Set(User)
# Bind database and generate mapping (done here, after both entities above
# have been declared); create_tables=True is passed to generate_mapping.
db.bind('sqlite', 'myapp.db')
db.generate_mapping(create_tables=True)

# Initialize Flask-Security with Pony datastore
user_datastore = PonyUserDatastore(db, User, Role)
security = Security(app, user_datastore)


from flask import flash, redirect, render_template, request, url_for
from flask_security import (
    current_user,
    hash_password,
    roles_required,
    uia_email_mapper,
    uia_username_mapper,
)
@app.route('/admin/users')
@roles_required('admin')  # admin pages must not be reachable anonymously
def list_users():
    """List all users using datastore.

    Restricted to users holding the 'admin' role. Renders
    'admin_users.html' with every user under the name `users`.
    """
    users = user_datastore.user_model.query.all()  # SQLAlchemy example
    return render_template('admin_users.html', users=users)
@app.route('/admin/create-user', methods=['POST'])
@roles_required('admin')  # only administrators may create accounts
def create_user():
    """Create user via datastore.

    Reads `email` and `password` from the submitted form, creates an active
    user with a hashed password, assigns the 'user' role when it exists,
    commits, and redirects to the user list. Incomplete submissions and
    already-registered emails are rejected with a flash message instead of
    reaching the database.
    """
    email = request.form.get('email')
    password = request.form.get('password')

    # form.get() yields None for missing fields; never hash or store that.
    if not email or not password:
        flash('Email and password are required')
        return redirect(url_for('list_users'))

    # Report a duplicate up front rather than failing on the unique constraint.
    if user_datastore.find_user(email=email):
        flash(f'User {email} already exists')
        return redirect(url_for('list_users'))

    # Create user using datastore
    user = user_datastore.create_user(
        email=email,
        password=hash_password(password),
        active=True,
    )

    # Assign default role
    default_role = user_datastore.find_role('user')
    if default_role:
        user_datastore.add_role_to_user(user, default_role)

    user_datastore.commit()
    flash(f'User {email} created successfully')
    return redirect(url_for('list_users'))
# NOTE(review): this route and setup_roles below change state on GET
# requests; consider restricting them to POST with CSRF protection.
@app.route('/admin/toggle-user/<int:user_id>')
@roles_required('admin')  # only administrators may (de)activate accounts
def toggle_user_active(user_id):
    """Toggle user active status.

    Looks the user up by `user_id`, flips the active flag, commits and
    flashes the new state. An unknown id is reported via flash instead of
    being ignored silently. Always redirects to the user list.
    """
    user = user_datastore.get_user(user_id)
    if user:
        if hasattr(user_datastore, 'toggle_active'):
            # SQLAlchemy datastore has toggle_active method
            user_datastore.toggle_active(user)
        else:
            # Manual toggle for other datastores
            user.active = not user.active
            user_datastore.put(user)
        user_datastore.commit()
        flash(f'User {user.email} {"activated" if user.active else "deactivated"}')
    else:
        flash('User not found')
    return redirect(url_for('list_users'))


@app.route('/admin/setup-roles')
@roles_required('admin')  # only administrators may define roles
def setup_roles():
    """Initialize default roles and permissions.

    Creates the 'admin', 'moderator' and 'user' roles when they are missing,
    then commits and redirects to the `admin_dashboard` endpoint.
    """
    # Create roles if they don't exist
    roles_to_create = [
        ('admin', 'Administrator with full access'),
        ('moderator', 'Content moderator'),
        ('user', 'Regular user'),
    ]
    for role_name, description in roles_to_create:
        if not user_datastore.find_role(role_name):
            user_datastore.create_role(
                name=role_name,
                description=description,
            )
    user_datastore.commit()
    flash('Default roles created')
    # NOTE(review): 'admin_dashboard' is not defined in this snippet; it is
    # assumed to be registered elsewhere in the application.
    return redirect(url_for('admin_dashboard'))
# NOTE(review): this route changes state on a GET request; consider
# restricting it to POST with CSRF protection.
@app.route('/admin/assign-role/<int:user_id>/<role_name>')
@roles_required('admin')  # granting roles is a privileged operation
def assign_role(user_id, role_name):
    """Assign role to user.

    Flashes whether the role was newly assigned, was already held, or the
    user/role could not be found, then redirects to the user list. The
    datastore is committed only when a role was actually added.
    """
    user = user_datastore.get_user(user_id)
    role = user_datastore.find_role(role_name)
    if user and role:
        if user_datastore.add_role_to_user(user, role):
            user_datastore.commit()
            flash(f'Role {role_name} assigned to {user.email}')
        else:
            flash(f'{user.email} already has role {role_name}')
    else:
        flash('User or role not found')
    return redirect(url_for('list_users'))


def migrate_user_data():
    """Example migration function for updating user data.

    Sets `created_at` to the current UTC time on every user that lacks a
    value, saves each changed user, and commits once at the end. Uses the
    SQLAlchemy-style `query` attribute of the user model.
    """
    # Local import: `datetime` is not otherwise in scope for this function.
    from datetime import datetime

    # Add new field to existing users (example)
    # NOTE(review): assumes the user model declares a `created_at` column;
    # otherwise the assignment below is presumably not persisted.
    for user in user_datastore.user_model.query.all():
        if not hasattr(user, 'created_at') or user.created_at is None:
            user.created_at = datetime.utcnow()
            user_datastore.put(user)
    user_datastore.commit()
def cleanup_inactive_users():
    """Clean up old inactive user accounts.

    Dry run: prints the email of every inactive user whose `confirmed_at`
    is more than 365 days old. Deletion and commit stay disabled until the
    marked lines are uncommented.
    """
    from datetime import datetime, timedelta

    cutoff_date = datetime.utcnow() - timedelta(days=365)
    user_model = user_datastore.user_model

    # Find inactive users older than cutoff.
    # The query below relies on the SQLAlchemy-style `query` attribute of the
    # model, so test for that attribute rather than for a datastore method
    # that every datastore in this document provides.
    if hasattr(user_model, 'query'):
        inactive_users = user_model.query.filter(
            # `== False` builds a SQL expression here; `not`/`is` would not.
            user_model.active == False,  # noqa: E712
            user_model.confirmed_at < cutoff_date,
        ).all()
    else:
        # Manual query for other datastores
        inactive_users = []

    for user in inactive_users:
        print(f"Would delete inactive user: {user.email}")
        # user_datastore.delete_user(user)  # Uncomment to actually delete
    # user_datastore.commit()  # Uncomment when ready to commit changes


# SQLAlchemy configurations
# Connection URI, modification tracking switch and engine pool options,
# applied to the app config in a single update call.
app.config.update(
    SQLALCHEMY_DATABASE_URI='postgresql://user:pass@localhost/mydb',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    SQLALCHEMY_ENGINE_OPTIONS=dict(
        pool_size=10,
        pool_recycle=3600,
        pool_pre_ping=True,
    ),
)
# MongoEngine configurations: one settings mapping holding the database
# name, host/port and credentials.
mongodb_settings = dict(
    db='mydatabase',
    host='localhost',
    port=27017,
    username='myuser',
    password='mypassword',
    authentication_source='admin',
)
app.config['MONGODB_SETTINGS'] = mongodb_settings
# Peewee configurations (handled in code): plain mapping describing the
# database engine and connection parameters.
database_config = dict(
    engine='peewee.PostgresqlDatabase',
    name='mydatabase',
    user='myuser',
    password='mypassword',
    host='localhost',
    port=5432,
)

# Configure Flask-Security model field names
# Identity attributes users may sign in with. Since Flask-Security 4.0 each
# entry is a single-key dict naming the attribute and giving its mapper; the
# older plain list of attribute names is no longer accepted.
app.config['SECURITY_USER_IDENTITY_ATTRIBUTES'] = [
    {'email': {'mapper': uia_email_mapper, 'case_insensitive': True}},
    {'username': {'mapper': uia_username_mapper, 'case_insensitive': True}},
]
app.config['SECURITY_USERNAME_ENABLE'] = True
app.config['SECURITY_USERNAME_REQUIRED'] = False

# Password configuration (the salt is a placeholder; use a private random
# value in any deployed application)
app.config['SECURITY_PASSWORD_HASH'] = 'bcrypt'
app.config['SECURITY_PASSWORD_SALT'] = 'your-password-salt'

# Role configuration
# NOTE(review): confirm that SECURITY_ROLE_HIERARCHY is a setting the
# installed Flask-Security version actually reads; if not, this mapping has
# no effect and roles must be assigned explicitly.
app.config['SECURITY_ROLE_HIERARCHY'] = {
    'admin': ['moderator', 'user'],
    'moderator': ['user'],
}