# Security and Permissions

Pony ORM includes an optional security framework for implementing row-level security and user-based access control in database operations. The functions documented here provide a foundation for building secure applications with fine-grained access control.
3
4
## Capabilities
5
6
### User Context Management
7
8
Functions for managing the current user context in security-aware operations.
9
10
```python { .api }
11
def set_current_user(user):
    """Set the current user for the permission system.

    Args:
        user: User object or identifier to set as the current user.

    Sets the user context for subsequent permission checks and
    security-filtered queries. Must be called within db_session.

    Usage:
        set_current_user(user_instance)
        set_current_user(user_id)
    """
24
25
def get_current_user():
    """Get the current user used for permission checks.

    Returns:
        Current user object or identifier, or None if not set.

    Usage:
        current_user = get_current_user()
        if current_user is not None:
            ...  # User-specific operations
    """
36
```
37
38
### Permission Checking Functions
39
40
Functions for checking user permissions and implementing access control logic.
41
42
```python { .api }
43
def has_perm(entity, permission):
    """Check if the current user has a specific permission on an entity.

    Args:
        entity: Entity instance or class to check permission for.
        permission: Permission name string.

    Returns:
        bool: True if the user has the permission, False otherwise.

    NOTE(review): the upstream pony.orm implementation may take the user
    explicitly (``has_perm(user, perm, x)``) -- confirm this signature
    against the installed Pony version before relying on it.

    Usage:
        if has_perm(document, 'read'):
            ...  # User can read this document

        if has_perm(User, 'create'):
            ...  # User can create new users
    """
60
61
def perm(permission_name):
    """Define permission requirements on entities (decorator/filter).

    Args:
        permission_name: Name of the required permission.

    Returns:
        Permission specification for an entity or query.

    NOTE(review): both usages below should be verified against the
    installed Pony version; upstream ``perm()`` appears to be intended
    for declaring access rules inside a ``set_perms_for(...)`` block.

    Usage:
        # As entity decorator
        @perm('admin')
        class AdminOnlyEntity(db.Entity):
            pass

        # In queries
        secure_docs = select(d for d in Document if perm('read'))
    """
79
```
80
81
### Group and Role Management
82
83
Functions for retrieving user groups and roles for permission evaluation.
84
85
```python { .api }
86
def get_user_groups():
    """Get the groups that the current user belongs to.

    Returns:
        List of group identifiers or objects.

    NOTE(review): upstream Pony may expect the user as an explicit
    argument (``get_user_groups(user)``) -- confirm.

    Usage:
        user_groups = get_user_groups()
        if 'admin' in user_groups:
            ...  # Admin operations
    """
97
98
def get_user_roles(obj=None):
    """Get user roles for a specific object or globally.

    Args:
        obj: Object to get roles for (optional).

    Returns:
        List of role identifiers for the object, or the global roles
        when ``obj`` is omitted.

    NOTE(review): upstream Pony may expect the user as an explicit
    argument (``get_user_roles(user, obj)``) -- confirm.

    Usage:
        global_roles = get_user_roles()
        document_roles = get_user_roles(document)
    """
111
112
def get_object_labels(obj):
    """Get the security labels for an object.

    Args:
        obj: Object to get security labels for.

    Returns:
        List of security label identifiers.

    Usage:
        labels = get_object_labels(document)
        if 'confidential' in labels:
            ...  # Handle confidential document
    """
126
```
127
128
### Custom Provider Registration
129
130
Decorator functions for registering custom permission providers.
131
132
```python { .api }
133
def user_groups_getter(func):
    """Register a custom user groups getter function.

    Args:
        func: Function that returns the user groups for the current user.

    Returns:
        Decorated function.

    NOTE(review): upstream Pony appears to expose this as a decorator
    factory (``@user_groups_getter(UserClass)``) -- confirm the exact
    form against the installed version.

    Usage:
        @user_groups_getter
        def get_my_user_groups():
            user = get_current_user()
            if user is None:
                return []
            return [g.name for g in user.groups]
    """
148
149
def user_roles_getter(func):
    """Register a custom user roles getter function.

    Args:
        func: Function that returns the user roles for objects.

    Returns:
        Decorated function.

    NOTE(review): upstream Pony appears to expose this as a decorator
    factory (``@user_roles_getter(UserClass, ObjClass)``) -- confirm the
    exact form against the installed version.

    Usage:
        @user_roles_getter
        def get_my_user_roles(obj=None):
            user = get_current_user()
            if obj:
                return user.get_roles_for(obj)
            return user.global_roles
    """
166
167
def obj_labels_getter(func):
    """Register a custom object labels getter function.

    Args:
        func: Function that returns the security labels for objects.

    Returns:
        Decorated function.

    NOTE(review): upstream Pony appears to expose this as a decorator
    factory (``@obj_labels_getter(ObjClass)``) -- confirm the exact form
    against the installed version.

    Usage:
        @obj_labels_getter
        def get_my_object_labels(obj):
            return obj.security_labels or []
    """
181
```
182
183
## Usage Examples
184
185
### Basic User Context and Permissions
186
187
```python
188
from pony.orm import *
189
190
db = Database()
191
192
class User(db.Entity):
    """Application user that owns documents and belongs to groups."""

    username = Required(str, unique=True)
    email = Required(str, unique=True)
    is_admin = Required(bool, default=False)
    groups = Set('Group')
    # Reverse side of Document.owner. Pony relationships are two-sided, so
    # generate_mapping() fails if only Document declares the link.
    documents = Set('Document')
197
198
class Group(db.Entity):
    """Named collection of users; the reverse side of ``User.groups``."""

    name = Required(str, unique=True)
    users = Set(User)
201
202
class Document(db.Entity):
    """Document owned by a single user; private unless ``is_public``."""

    title = Required(str)
    content = Required(str)
    owner = Required(User)
    is_public = Required(bool, default=False)
207
208
# create_db=True lets the SQLite provider create the file on first run;
# without it, bind() fails when the database file does not exist yet.
db.bind('sqlite', filename='security_example.db', create_db=True)
db.generate_mapping(create_tables=True)

# Create the test data used by the examples below
with db_session:
    admin_group = Group(name='admin')
    user_group = Group(name='user')

    admin_user = User(username='admin', email='admin@example.com',
                      is_admin=True, groups=[admin_group])
    regular_user = User(username='user1', email='user1@example.com',
                        groups=[user_group])

    doc1 = Document(title='Public Doc', content='Public content',
                    owner=admin_user, is_public=True)
    doc2 = Document(title='Private Doc', content='Private content',
                    owner=admin_user, is_public=False)
226
227
# Permission-based document access
def can_read_document(doc):
    """Return True when the current user may read ``doc``.

    Access is granted when the document is public, when the current user
    owns it, or when the current user is an admin. With no current user
    set, access is always denied.

    Args:
        doc: Object exposing ``is_public`` and ``owner`` attributes.

    Returns:
        bool: Whether the current user can read the document.
    """
    current_user = get_current_user()
    if not current_user:
        return False

    # Public documents are readable by all
    if doc.is_public:
        return True

    # Own documents are always readable
    if doc.owner == current_user:
        return True

    # Admins can read everything
    return bool(current_user.is_admin)


# Example permission checking
with db_session:
    # Set current user context
    user = User.get(username='user1')
    set_current_user(user)

    # Check current user
    current = get_current_user()
    if current is not None:
        print(f"Current user: {current.username}")

    # Check access to documents
    for doc in Document.select():
        can_read = can_read_document(doc)
        print(f"Can read '{doc.title}': {can_read}")
262
```
263
264
### Custom Permission System Implementation
265
266
```python
267
# Custom permission providers
268
@user_groups_getter
def get_my_user_groups():
    """Return the names of the groups the current user belongs to.

    Returns:
        list: Group names, or an empty list when no user is set.
    """
    user = get_current_user()
    if not user:
        return []
    return [group.name for group in user.groups]
275
276
@user_roles_getter
def get_my_user_roles(obj=None):
    """Return the current user's roles, globally or for ``obj``.

    Args:
        obj: Optional object; when it has an ``owner`` equal to the
            current user, the ``'owner'`` role is added.

    Returns:
        list: Role names, or an empty list when no user is set.
    """
    user = get_current_user()
    if not user:
        return []

    # Global roles: every authenticated user is a 'user', admins are both.
    roles = ['admin', 'user'] if user.is_admin else ['user']

    # Object-specific roles
    if obj is not None and getattr(obj, 'owner', None) == user:
        roles.append('owner')

    return roles
295
296
@obj_labels_getter
def get_my_object_labels(obj):
    """Return the security labels derived from ``obj``'s flags.

    Args:
        obj: Object that may expose ``is_public`` and/or
            ``is_confidential`` attributes.

    Returns:
        list: ``'private'`` when ``is_public`` is present and false,
        ``'confidential'`` when ``is_confidential`` is present and true.
    """
    labels = []

    # A missing attribute contributes no label, hence the neutral defaults.
    if not getattr(obj, 'is_public', True):
        labels.append('private')

    if getattr(obj, 'is_confidential', False):
        labels.append('confidential')

    return labels
308
309
# Use custom permission system
with db_session:
    # Entities belong to the db_session that loaded them, so fetch fresh
    # instances here instead of reusing objects from an earlier session.
    regular_user = User.get(username='user1')
    doc1 = Document.get(title='Public Doc')
    doc2 = Document.get(title='Private Doc')

    set_current_user(regular_user)

    # Get user groups using custom provider
    user_groups = get_user_groups()
    print(f"User groups: {user_groups}")

    # Get user roles using custom provider
    global_roles = get_user_roles()
    print(f"Global roles: {global_roles}")

    # Get object-specific roles
    doc_roles = get_user_roles(doc1)
    print(f"Roles for doc1: {doc_roles}")

    # Get object security labels
    doc_labels = get_object_labels(doc2)
    print(f"Labels for doc2: {doc_labels}")
328
```
329
330
### Advanced Permission-Based Queries
331
332
```python
333
# Enhanced entity with permission support
334
class SecureDocument(db.Entity):
    """Document with ownership, visibility flags, and group-based access.

    NOTE: declare this entity before ``db.generate_mapping()`` is called.
    Pony relationships are two-sided, so ``User`` and ``Group`` each need a
    matching ``Set('SecureDocument')`` attribute for ``owner`` and
    ``allowed_groups``.
    """

    title = Required(str)
    content = Required(str)
    owner = Required(User)
    is_public = Required(bool, default=False)
    is_confidential = Required(bool, default=False)
    allowed_groups = Set(Group)
341
342
# Permission-aware query functions
343
def get_readable_documents():
    """Get the documents the current user can read.

    Returns:
        A query over ``SecureDocument`` restricted to readable rows, or an
        empty list when no current user is set. Both are iterable.
    """
    current_user = get_current_user()
    if not current_user:
        return []

    if current_user.is_admin:
        # Admins can read everything
        return SecureDocument.select()

    # Regular users can read:
    #   1. Public documents
    #   2. Their own documents
    #   3. Documents their groups have access to
    user_groups = set(g.name for g in current_user.groups)

    return select(
        d for d in SecureDocument
        if d.is_public
        or d.owner == current_user
        or exists(g for g in d.allowed_groups if g.name in user_groups)
    )
366
367
def get_writable_documents():
    """Get the documents the current user can modify.

    Returns:
        A query over ``SecureDocument`` (all rows for admins, owned rows
        otherwise), or an empty list when no current user is set.
    """
    current_user = get_current_user()
    if not current_user:
        return []

    if current_user.is_admin:
        return SecureDocument.select()

    # Users can only modify their own documents
    return select(d for d in SecureDocument if d.owner == current_user)
378
379
# Row-level security implementation
with db_session:
    # Load the users and group inside this session: entities from a
    # finished db_session cannot be attached to objects created here.
    admin_user = User.get(username='admin')
    regular_user = User.get(username='user1')
    user_group = Group.get(name='user')

    # Create test documents with different permissions
    public_doc = SecureDocument(title='Public Info', content='Public content',
                                owner=admin_user, is_public=True)

    private_doc = SecureDocument(title='Private Note', content='Private content',
                                 owner=admin_user, is_public=False)

    group_doc = SecureDocument(title='Group Document', content='Group content',
                               owner=admin_user, is_public=False,
                               allowed_groups=[user_group])

    # Test as regular user
    set_current_user(regular_user)

    readable = list(get_readable_documents())
    writable = list(get_writable_documents())

    print(f"User can read {len(readable)} documents")
    print(f"User can write {len(writable)} documents")

    # Test as admin
    set_current_user(admin_user)

    readable_admin = list(get_readable_documents())
    writable_admin = list(get_writable_documents())

    print(f"Admin can read {len(readable_admin)} documents")
    print(f"Admin can write {len(writable_admin)} documents")
409
```
410
411
### Integration with Web Framework Authentication
412
413
```python
414
from flask import Flask, session, request, jsonify, g
415
from functools import wraps
416
417
app = Flask(__name__)
418
419
def login_required(f):
    """Decorator that rejects requests without an authenticated session.

    Args:
        f: View function to protect.

    Returns:
        Wrapped view that responds with 401 and a JSON error body when
        ``'user_id'`` is absent from the session, else calls ``f``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            return jsonify({'error': 'Authentication required'}), 401
        return f(*args, **kwargs)

    return decorated_function
427
428
def permission_required(permission):
    """Decorator factory that requires a specific permission.

    Args:
        permission: Permission name passed to ``has_current_permission``.

    Returns:
        Decorator whose wrapped view responds with 403 and a JSON error
        body when the permission check fails, else calls the view.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not has_current_permission(permission):
                return jsonify({'error': 'Permission denied'}), 403
            return f(*args, **kwargs)

        return decorated_function

    return decorator
438
439
@app.before_request
def load_user():
    """Load the user context before each request.

    Looks up the user named by ``session['user_id']`` and, when found,
    registers it via ``set_current_user`` and stores it on ``g``.
    """
    # Always define the attribute so later checks need no hasattr().
    g.current_user = None

    if 'user_id' not in session:
        return

    with db_session:
        user = User.get(id=session['user_id'])
        if user:
            set_current_user(user)
            # NOTE: this entity is bound to the short db_session above.
            # Views that open their own db_session should re-load the
            # user by id rather than pass this object into new entities.
            g.current_user = user
448
449
def has_current_permission(permission):
    """Check whether the request's current user has ``permission``.

    Args:
        permission: ``'admin'`` (granted to admin users only) or
            ``'user'`` (granted to any loaded user). Any other value is
            denied.

    Returns:
        bool: True when the permission is granted.
    """
    user = getattr(g, 'current_user', None)
    if not user:
        return False

    # Custom permission logic
    if permission == 'admin':
        return bool(user.is_admin)

    return permission == 'user'
462
463
# Protected routes
464
@app.route('/documents')
@login_required
def list_documents():
    """List the documents the authenticated user is allowed to read."""
    with db_session:
        # Re-bind the user inside this session: the one registered by the
        # before_request hook belongs to a db_session that has ended.
        user = User.get(id=session['user_id'])
        if user is None:
            return jsonify({'error': 'Authentication required'}), 401
        set_current_user(user)

        docs = list(get_readable_documents())
        return jsonify([{'id': d.id, 'title': d.title} for d in docs])
470
471
@app.route('/documents', methods=['POST'])
@login_required
@permission_required('user')
def create_document():
    """Create a document owned by the authenticated user.

    Expects a JSON object with ``title`` and ``content`` and an optional
    ``is_public`` flag. Responds with 400 when the body is not a JSON
    object or a required field is missing.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'JSON object body required'}), 400

    missing = [key for key in ('title', 'content') if key not in data]
    if missing:
        return jsonify({'error': f"Missing fields: {', '.join(missing)}"}), 400

    with db_session:
        # Load the owner inside this session; an entity from another
        # db_session cannot be assigned to a new object.
        owner = User.get(id=session['user_id'])
        if owner is None:
            return jsonify({'error': 'Authentication required'}), 401

        doc = SecureDocument(
            title=data['title'],
            content=data['content'],
            owner=owner,
            is_public=data.get('is_public', False),
        )
        # Flush so the database assigns doc.id before it is serialised.
        flush()
        return jsonify({'id': doc.id, 'title': doc.title})
485
486
@app.route('/admin/documents')
@login_required
@permission_required('admin')
def admin_list_documents():
    """List every document with its owner and visibility (admins only)."""
    with db_session:
        # Admins see all documents. The response is built inside the
        # session because d.owner.username follows a relationship.
        docs = list(SecureDocument.select())
        return jsonify([
            {
                'id': d.id,
                'title': d.title,
                'owner': d.owner.username,
                'is_public': d.is_public,
            }
            for d in docs
        ])
499
```
500
501
### Audit Logging with Security Context
502
503
```python
504
from datetime import datetime


class AuditLog(db.Entity):
    """One audit-trail record of a security-sensitive action.

    NOTE: declare this entity before ``db.generate_mapping()`` is called.
    Pony relationships are two-sided, so ``User`` needs a matching
    ``Set('AuditLog')`` attribute for ``user``.
    """

    # Callable default: evaluated per record, not once at import time.
    timestamp = Required(datetime, default=datetime.now)
    user = Optional(User)
    action = Required(str)
    entity_type = Required(str)
    entity_id = Optional(int)
    details = Optional(str)
511
512
def audit_action(action, entity=None, details=None):
    """Log a security-sensitive action as an ``AuditLog`` record.

    Args:
        action: Name of the action being recorded.
        entity: Optional object the action applies to. Its class name is
            stored as ``entity_type`` and its ``id`` (when present) as
            ``entity_id``. Without an entity the type is ``'System'``.
        details: Optional free-form description.
    """
    AuditLog(
        user=get_current_user(),
        action=action,
        entity_type=entity.__class__.__name__ if entity else 'System',
        # Objects without an id, or no entity at all, store None.
        entity_id=getattr(entity, 'id', None) if entity else None,
        details=details,
    )
523
524
from datetime import datetime, timedelta

# Usage in secure operations
with db_session:
    # Load the user inside this session rather than reusing an entity
    # from an earlier db_session.
    regular_user = User.get(username='user1')
    set_current_user(regular_user)

    # Audit document access
    doc = SecureDocument.get(id=1)
    if doc is None:
        audit_action('document_access_denied', details='Document not found')
    elif can_read_document(doc):
        audit_action('document_read', doc)
        content = doc.content
    else:
        audit_action('document_access_denied', doc, 'Insufficient permissions')

    # Audit document creation
    new_doc = SecureDocument(title='New Doc', content='Content', owner=regular_user)
    audit_action('document_created', new_doc)

    # View audit trail. The cutoff is computed once in Python so the
    # query only compares against a plain datetime parameter.
    cutoff = datetime.now() - timedelta(hours=24)
    recent_audits = select(a for a in AuditLog if a.timestamp >= cutoff)

    for audit in recent_audits:
        user_name = audit.user.username if audit.user else 'System'
        print(f"{audit.timestamp}: {user_name} - {audit.action} on {audit.entity_type}")
547
```