CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pony

Pony Object-Relational Mapper for Python with Pythonic query syntax using generator expressions

Pending
Overview
Eval results
Files

security-permissions.mddocs/

Security and Permissions

Optional security framework for implementing row-level security and user-based access control in database operations. These functions provide a foundation for building secure applications with fine-grained access control.

Capabilities

User Context Management

Functions for managing the current user context in security-aware operations.

def set_current_user(user):
    """Set current user for permission system.
    
    Args:
        user: User object or identifier to set as current user
        
    Sets the user context for subsequent permission checks and
    security-filtered queries. Must be called within db_session.
    
    Usage:
        set_current_user(user_instance)
        set_current_user(user_id)
    """

def get_current_user():
    """Get current user for permission checks.
    
    Returns:
        Current user object or identifier, or None if not set
        
    Usage:
        current_user = get_current_user()
        if current_user:
            # User-specific operations
    """

Permission Checking Functions

Functions for checking user permissions and implementing access control logic.

def has_perm(entity, permission):
    """Check if current user has specific permission on entity.
    
    Args:
        entity: Entity instance or class to check permission for
        permission: Permission name string
        
    Returns:
        bool: True if user has permission, False otherwise
        
    Usage:
        if has_perm(document, 'read'):
            # User can read this document
        
        if has_perm(User, 'create'):
            # User can create new users
    """

def perm(permission_name):
    """Define permission requirements on entities (decorator/filter).
    
    Args:
        permission_name: Name of required permission
        
    Returns:
        Permission specification for entity or query
        
    Usage:
        # As entity decorator
        @perm('admin')
        class AdminOnlyEntity(db.Entity):
            pass
            
        # In queries  
        secure_docs = select(d for d in Document if perm('read'))
    """

Group and Role Management

Functions for retrieving user groups and roles for permission evaluation.

def get_user_groups():
    """Get groups that current user belongs to.
    
    Returns:
        List of group identifiers or objects
        
    Usage:
        user_groups = get_user_groups()
        if 'admin' in user_groups:
            # Admin operations
    """

def get_user_roles(obj=None):
    """Get user roles for specific object or globally.
    
    Args:
        obj: Object to get roles for (optional)
        
    Returns:
        List of role identifiers for the object or globally
        
    Usage:
        global_roles = get_user_roles()
        document_roles = get_user_roles(document)
    """

def get_object_labels(obj):
    """Get security labels for object.
    
    Args:
        obj: Object to get security labels for
        
    Returns:
        List of security label identifiers
        
    Usage:
        labels = get_object_labels(document)
        if 'confidential' in labels:
            # Handle confidential document
    """

Custom Provider Registration

Decorator functions for registering custom permission providers.

def user_groups_getter(func):
    """Register custom user groups getter function.
    
    Args:
        func: Function that returns user groups for current user
        
    Returns:
        Decorated function
        
    Usage:
        @user_groups_getter
        def get_my_user_groups():
            user = get_current_user()
            return [g.name for g in user.groups]
    """

def user_roles_getter(func):
    """Register custom user roles getter function.
    
    Args:
        func: Function that returns user roles for objects
        
    Returns:
        Decorated function
        
    Usage:
        @user_roles_getter
        def get_my_user_roles(obj=None):
            user = get_current_user()
            if obj:
                return user.get_roles_for(obj)
            return user.global_roles
    """

def obj_labels_getter(func):
    """Register custom object labels getter function.
    
    Args:
        func: Function that returns security labels for objects
        
    Returns:
        Decorated function
        
    Usage:
        @obj_labels_getter
        def get_my_object_labels(obj):
            return obj.security_labels or []
    """

Usage Examples

Basic User Context and Permissions

from pony.orm import *

db = Database()

class User(db.Entity):
    username = Required(str, unique=True)
    email = Required(str, unique=True)
    is_admin = Required(bool, default=False)
    groups = Set('Group')

class Group(db.Entity):
    name = Required(str, unique=True)
    users = Set(User)

class Document(db.Entity):
    title = Required(str)
    content = Required(str)
    owner = Required(User)
    is_public = Required(bool, default=False)

db.bind('sqlite', filename='security_example.db')
db.generate_mapping(create_tables=True)

# Set up user context and check permissions
with db_session:
    # Create test data
    admin_group = Group(name='admin')
    user_group = Group(name='user')
    
    admin_user = User(username='admin', email='admin@example.com', 
                     is_admin=True, groups=[admin_group])
    regular_user = User(username='user1', email='user1@example.com',
                       groups=[user_group])
    
    doc1 = Document(title='Public Doc', content='Public content',
                   owner=admin_user, is_public=True)
    doc2 = Document(title='Private Doc', content='Private content',
                   owner=admin_user, is_public=False)

# Example permission checking
with db_session:
    # Set current user context
    user = User.get(username='user1')
    set_current_user(user)
    
    # Check current user
    current = get_current_user()
    print(f"Current user: {current.username}")
    
    # Permission-based document access
    def can_read_document(doc):
        current_user = get_current_user()
        if not current_user:
            return False
            
        # Public documents are readable by all
        if doc.is_public:
            return True
            
        # Own documents are always readable
        if doc.owner == current_user:
            return True
            
        # Admins can read everything
        if current_user.is_admin:
            return True
            
        return False
    
    # Check access to documents
    all_docs = Document.select()
    for doc in all_docs:
        can_read = can_read_document(doc)
        print(f"Can read '{doc.title}': {can_read}")

Custom Permission System Implementation

# Custom permission providers
@user_groups_getter
def get_my_user_groups():
    """Custom implementation of user groups."""
    user = get_current_user()
    if not user:
        return []
    return [g.name for g in user.groups]

@user_roles_getter  
def get_my_user_roles(obj=None):
    """Custom implementation of user roles."""
    user = get_current_user()
    if not user:
        return []
    
    roles = []
    
    # Global roles
    if user.is_admin:
        roles.append('admin')
    roles.append('user')
    
    # Object-specific roles
    if obj and hasattr(obj, 'owner') and obj.owner == user:
        roles.append('owner')
        
    return roles

@obj_labels_getter
def get_my_object_labels(obj):
    """Custom implementation of object security labels."""
    labels = []
    
    if hasattr(obj, 'is_public') and not obj.is_public:
        labels.append('private')
        
    if hasattr(obj, 'is_confidential') and obj.is_confidential:
        labels.append('confidential')
        
    return labels

# Use custom permission system
with db_session:
    set_current_user(regular_user)
    
    # Get user groups using custom provider
    user_groups = get_user_groups()
    print(f"User groups: {user_groups}")
    
    # Get user roles using custom provider
    global_roles = get_user_roles()
    print(f"Global roles: {global_roles}")
    
    # Get object-specific roles
    doc_roles = get_user_roles(doc1)
    print(f"Roles for doc1: {doc_roles}")
    
    # Get object security labels
    doc_labels = get_object_labels(doc2)
    print(f"Labels for doc2: {doc_labels}")

Advanced Permission-Based Queries

# Enhanced entity with permission support
class SecureDocument(db.Entity):
    title = Required(str)
    content = Required(str)
    owner = Required(User)
    is_public = Required(bool, default=False)
    is_confidential = Required(bool, default=False)
    allowed_groups = Set(Group)

# Permission-aware query functions
def get_readable_documents():
    """Get documents current user can read."""
    current_user = get_current_user()
    if not current_user:
        return []
    
    if current_user.is_admin:
        # Admins can read everything
        return SecureDocument.select()
    
    # Regular users can read:
    # 1. Public documents
    # 2. Their own documents  
    # 3. Documents their groups have access to
    user_groups = set(g.name for g in current_user.groups)
    
    readable_docs = select(d for d in SecureDocument 
                          if d.is_public 
                          or d.owner == current_user
                          or exists(g for g in d.allowed_groups 
                                   if g.name in user_groups))
    
    return readable_docs

def get_writable_documents():
    """Get documents current user can modify."""
    current_user = get_current_user()
    if not current_user:
        return []
    
    if current_user.is_admin:
        return SecureDocument.select()
    
    # Users can only modify their own documents
    return select(d for d in SecureDocument if d.owner == current_user)

# Row-level security implementation
with db_session:
    # Create test documents with different permissions
    public_doc = SecureDocument(title='Public Info', content='Public content',
                               owner=admin_user, is_public=True)
    
    private_doc = SecureDocument(title='Private Note', content='Private content',
                                owner=admin_user, is_public=False)
    
    group_doc = SecureDocument(title='Group Document', content='Group content',
                              owner=admin_user, is_public=False,
                              allowed_groups=[user_group])
    
    # Test as regular user
    set_current_user(regular_user)
    
    readable = list(get_readable_documents())
    writable = list(get_writable_documents())
    
    print(f"User can read {len(readable)} documents")
    print(f"User can write {len(writable)} documents")
    
    # Test as admin
    set_current_user(admin_user)
    
    readable_admin = list(get_readable_documents())
    writable_admin = list(get_writable_documents())
    
    print(f"Admin can read {len(readable_admin)} documents")
    print(f"Admin can write {len(writable_admin)} documents")

Integration with Web Framework Authentication

from flask import Flask, session, request, jsonify, g
from functools import wraps

app = Flask(__name__)

def login_required(f):
    """Decorator to require authentication."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            return jsonify({'error': 'Authentication required'}), 401
        return f(*args, **kwargs)
    return decorated_function

def permission_required(permission):
    """Decorator to require specific permission."""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not has_current_permission(permission):
                return jsonify({'error': 'Permission denied'}), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator

@app.before_request
def load_user():
    """Load user context before each request."""
    if 'user_id' in session:
        with db_session:
            user = User.get(id=session['user_id'])
            if user:
                set_current_user(user)
                g.current_user = user

def has_current_permission(permission):
    """Check if current user has permission."""
    if not hasattr(g, 'current_user') or not g.current_user:
        return False
        
    # Custom permission logic
    if permission == 'admin' and g.current_user.is_admin:
        return True
        
    if permission == 'user':
        return True
        
    return False

# Protected routes
@app.route('/documents')
@login_required
def list_documents():
    with db_session:
        docs = list(get_readable_documents())
        return jsonify([{'id': d.id, 'title': d.title} for d in docs])

@app.route('/documents', methods=['POST'])
@login_required
@permission_required('user')
def create_document():
    data = request.get_json()
    
    with db_session:
        doc = SecureDocument(
            title=data['title'],
            content=data['content'],
            owner=g.current_user,
            is_public=data.get('is_public', False)
        )
        return jsonify({'id': doc.id, 'title': doc.title})

@app.route('/admin/documents')
@login_required
@permission_required('admin')
def admin_list_documents():
    with db_session:
        # Admins see all documents
        docs = list(SecureDocument.select())
        return jsonify([{
            'id': d.id, 
            'title': d.title, 
            'owner': d.owner.username,
            'is_public': d.is_public
        } for d in docs])

Audit Logging with Security Context

class AuditLog(db.Entity):
    timestamp = Required(datetime, default=datetime.now)
    user = Optional(User)
    action = Required(str)
    entity_type = Required(str)
    entity_id = Optional(int)
    details = Optional(str)

def audit_action(action, entity=None, details=None):
    """Log security-sensitive actions."""
    current_user = get_current_user()
    
    AuditLog(
        user=current_user,
        action=action,
        entity_type=entity.__class__.__name__ if entity else 'System',
        entity_id=entity.id if entity and hasattr(entity, 'id') else None,
        details=details
    )

# Usage in secure operations
with db_session:
    set_current_user(regular_user)
    
    # Audit document access
    doc = SecureDocument.get(id=1)
    if can_read_document(doc):
        audit_action('document_read', doc)
        content = doc.content
    else:
        audit_action('document_access_denied', doc, 'Insufficient permissions')
    
    # Audit document creation
    new_doc = SecureDocument(title='New Doc', content='Content', owner=regular_user)
    audit_action('document_created', new_doc)
    
    # View audit trail
    recent_audits = select(a for a in AuditLog 
                          if a.timestamp >= datetime.now() - timedelta(hours=24))
    
    for audit in recent_audits:
        user_name = audit.user.username if audit.user else 'System'
        print(f"{audit.timestamp}: {user_name} - {audit.action} on {audit.entity_type}")

Install with Tessl CLI

npx tessl i tessl/pypi-pony

docs

aggregations-helpers.md

attributes-relationships.md

data-types.md

database-entities.md

debugging-utilities.md

exception-handling.md

framework-integrations.md

index.md

query-operations.md

security-permissions.md

session-management.md

tile.json