Pony Object-Relational Mapper for Python with Pythonic query syntax using generator expressions
—
Optional security framework for implementing row-level security and user-based access control in database operations. These functions provide a foundation for building secure applications with fine-grained access control.
Functions for managing the current user context in security-aware operations.
def set_current_user(user):
"""Set current user for permission system.
Args:
user: User object or identifier to set as current user
Sets the user context for subsequent permission checks and
security-filtered queries. Must be called within db_session.
Usage:
set_current_user(user_instance)
set_current_user(user_id)
"""
def get_current_user():
"""Get current user for permission checks.
Returns:
Current user object or identifier, or None if not set
Usage:
current_user = get_current_user()
if current_user:
# User-specific operations
"""Functions for checking user permissions and implementing access control logic.
def has_perm(entity, permission):
"""Check if current user has specific permission on entity.
Args:
entity: Entity instance or class to check permission for
permission: Permission name string
Returns:
bool: True if user has permission, False otherwise
Usage:
if has_perm(document, 'read'):
# User can read this document
if has_perm(User, 'create'):
# User can create new users
"""
def perm(permission_name):
"""Define permission requirements on entities (decorator/filter).
Args:
permission_name: Name of required permission
Returns:
Permission specification for entity or query
Usage:
# As entity decorator
@perm('admin')
class AdminOnlyEntity(db.Entity):
pass
# In queries
secure_docs = select(d for d in Document if perm('read'))
"""Functions for retrieving user groups and roles for permission evaluation.
def get_user_groups():
"""Get groups that current user belongs to.
Returns:
List of group identifiers or objects
Usage:
user_groups = get_user_groups()
if 'admin' in user_groups:
# Admin operations
"""
def get_user_roles(obj=None):
"""Get user roles for specific object or globally.
Args:
obj: Object to get roles for (optional)
Returns:
List of role identifiers for the object or globally
Usage:
global_roles = get_user_roles()
document_roles = get_user_roles(document)
"""
def get_object_labels(obj):
"""Get security labels for object.
Args:
obj: Object to get security labels for
Returns:
List of security label identifiers
Usage:
labels = get_object_labels(document)
if 'confidential' in labels:
# Handle confidential document
"""Decorator functions for registering custom permission providers.
def user_groups_getter(func):
"""Register custom user groups getter function.
Args:
func: Function that returns user groups for current user
Returns:
Decorated function
Usage:
@user_groups_getter
def get_my_user_groups():
user = get_current_user()
return [g.name for g in user.groups]
"""
def user_roles_getter(func):
"""Register custom user roles getter function.
Args:
func: Function that returns user roles for objects
Returns:
Decorated function
Usage:
@user_roles_getter
def get_my_user_roles(obj=None):
user = get_current_user()
if obj:
return user.get_roles_for(obj)
return user.global_roles
"""
def obj_labels_getter(func):
"""Register custom object labels getter function.
Args:
func: Function that returns security labels for objects
Returns:
Decorated function
Usage:
@obj_labels_getter
def get_my_object_labels(obj):
return obj.security_labels or []
"""from pony.orm import *
db = Database()
class User(db.Entity):
username = Required(str, unique=True)
email = Required(str, unique=True)
is_admin = Required(bool, default=False)
groups = Set('Group')
class Group(db.Entity):
name = Required(str, unique=True)
users = Set(User)
class Document(db.Entity):
title = Required(str)
content = Required(str)
owner = Required(User)
is_public = Required(bool, default=False)
db.bind('sqlite', filename='security_example.db')
db.generate_mapping(create_tables=True)
# Set up user context and check permissions
with db_session:
# Create test data
admin_group = Group(name='admin')
user_group = Group(name='user')
admin_user = User(username='admin', email='admin@example.com',
is_admin=True, groups=[admin_group])
regular_user = User(username='user1', email='user1@example.com',
groups=[user_group])
doc1 = Document(title='Public Doc', content='Public content',
owner=admin_user, is_public=True)
doc2 = Document(title='Private Doc', content='Private content',
owner=admin_user, is_public=False)
# Example permission checking
with db_session:
# Set current user context
user = User.get(username='user1')
set_current_user(user)
# Check current user
current = get_current_user()
print(f"Current user: {current.username}")
# Permission-based document access
def can_read_document(doc):
current_user = get_current_user()
if not current_user:
return False
# Public documents are readable by all
if doc.is_public:
return True
# Own documents are always readable
if doc.owner == current_user:
return True
# Admins can read everything
if current_user.is_admin:
return True
return False
# Check access to documents
all_docs = Document.select()
for doc in all_docs:
can_read = can_read_document(doc)
print(f"Can read '{doc.title}': {can_read}")# Custom permission providers
@user_groups_getter
def get_my_user_groups():
"""Custom implementation of user groups."""
user = get_current_user()
if not user:
return []
return [g.name for g in user.groups]
@user_roles_getter
def get_my_user_roles(obj=None):
"""Custom implementation of user roles."""
user = get_current_user()
if not user:
return []
roles = []
# Global roles
if user.is_admin:
roles.append('admin')
roles.append('user')
# Object-specific roles
if obj and hasattr(obj, 'owner') and obj.owner == user:
roles.append('owner')
return roles
@obj_labels_getter
def get_my_object_labels(obj):
"""Custom implementation of object security labels."""
labels = []
if hasattr(obj, 'is_public') and not obj.is_public:
labels.append('private')
if hasattr(obj, 'is_confidential') and obj.is_confidential:
labels.append('confidential')
return labels
# Use custom permission system
with db_session:
set_current_user(regular_user)
# Get user groups using custom provider
user_groups = get_user_groups()
print(f"User groups: {user_groups}")
# Get user roles using custom provider
global_roles = get_user_roles()
print(f"Global roles: {global_roles}")
# Get object-specific roles
doc_roles = get_user_roles(doc1)
print(f"Roles for doc1: {doc_roles}")
# Get object security labels
doc_labels = get_object_labels(doc2)
print(f"Labels for doc2: {doc_labels}")# Enhanced entity with permission support
class SecureDocument(db.Entity):
title = Required(str)
content = Required(str)
owner = Required(User)
is_public = Required(bool, default=False)
is_confidential = Required(bool, default=False)
allowed_groups = Set(Group)
# Permission-aware query functions
def get_readable_documents():
"""Get documents current user can read."""
current_user = get_current_user()
if not current_user:
return []
if current_user.is_admin:
# Admins can read everything
return SecureDocument.select()
# Regular users can read:
# 1. Public documents
# 2. Their own documents
# 3. Documents their groups have access to
user_groups = set(g.name for g in current_user.groups)
readable_docs = select(d for d in SecureDocument
if d.is_public
or d.owner == current_user
or exists(g for g in d.allowed_groups
if g.name in user_groups))
return readable_docs
def get_writable_documents():
"""Get documents current user can modify."""
current_user = get_current_user()
if not current_user:
return []
if current_user.is_admin:
return SecureDocument.select()
# Users can only modify their own documents
return select(d for d in SecureDocument if d.owner == current_user)
# Row-level security implementation
with db_session:
# Create test documents with different permissions
public_doc = SecureDocument(title='Public Info', content='Public content',
owner=admin_user, is_public=True)
private_doc = SecureDocument(title='Private Note', content='Private content',
owner=admin_user, is_public=False)
group_doc = SecureDocument(title='Group Document', content='Group content',
owner=admin_user, is_public=False,
allowed_groups=[user_group])
# Test as regular user
set_current_user(regular_user)
readable = list(get_readable_documents())
writable = list(get_writable_documents())
print(f"User can read {len(readable)} documents")
print(f"User can write {len(writable)} documents")
# Test as admin
set_current_user(admin_user)
readable_admin = list(get_readable_documents())
writable_admin = list(get_writable_documents())
print(f"Admin can read {len(readable_admin)} documents")
print(f"Admin can write {len(writable_admin)} documents")from flask import Flask, session, request, jsonify, g
from functools import wraps
app = Flask(__name__)
def login_required(f):
"""Decorator to require authentication."""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
return jsonify({'error': 'Authentication required'}), 401
return f(*args, **kwargs)
return decorated_function
def permission_required(permission):
"""Decorator to require specific permission."""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not has_current_permission(permission):
return jsonify({'error': 'Permission denied'}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
@app.before_request
def load_user():
"""Load user context before each request."""
if 'user_id' in session:
with db_session:
user = User.get(id=session['user_id'])
if user:
set_current_user(user)
g.current_user = user
def has_current_permission(permission):
"""Check if current user has permission."""
if not hasattr(g, 'current_user') or not g.current_user:
return False
# Custom permission logic
if permission == 'admin' and g.current_user.is_admin:
return True
if permission == 'user':
return True
return False
# Protected routes
@app.route('/documents')
@login_required
def list_documents():
with db_session:
docs = list(get_readable_documents())
return jsonify([{'id': d.id, 'title': d.title} for d in docs])
@app.route('/documents', methods=['POST'])
@login_required
@permission_required('user')
def create_document():
data = request.get_json()
with db_session:
doc = SecureDocument(
title=data['title'],
content=data['content'],
owner=g.current_user,
is_public=data.get('is_public', False)
)
return jsonify({'id': doc.id, 'title': doc.title})
@app.route('/admin/documents')
@login_required
@permission_required('admin')
def admin_list_documents():
with db_session:
# Admins see all documents
docs = list(SecureDocument.select())
return jsonify([{
'id': d.id,
'title': d.title,
'owner': d.owner.username,
'is_public': d.is_public
} for d in docs])class AuditLog(db.Entity):
timestamp = Required(datetime, default=datetime.now)
user = Optional(User)
action = Required(str)
entity_type = Required(str)
entity_id = Optional(int)
details = Optional(str)
def audit_action(action, entity=None, details=None):
"""Log security-sensitive actions."""
current_user = get_current_user()
AuditLog(
user=current_user,
action=action,
entity_type=entity.__class__.__name__ if entity else 'System',
entity_id=entity.id if entity and hasattr(entity, 'id') else None,
details=details
)
# Usage in secure operations
with db_session:
set_current_user(regular_user)
# Audit document access
doc = SecureDocument.get(id=1)
if can_read_document(doc):
audit_action('document_read', doc)
content = doc.content
else:
audit_action('document_access_denied', doc, 'Insufficient permissions')
# Audit document creation
new_doc = SecureDocument(title='New Doc', content='Content', owner=regular_user)
audit_action('document_created', new_doc)
# View audit trail
recent_audits = select(a for a in AuditLog
if a.timestamp >= datetime.now() - timedelta(hours=24))
for audit in recent_audits:
user_name = audit.user.username if audit.user else 'System'
print(f"{audit.timestamp}: {user_name} - {audit.action} on {audit.entity_type}")