0
# Authentication and JWT Handling
1
2
Enterprise-grade security framework with JWT token management, Java keystore integration, and configurable authentication providers. The authentication system supports both token validation and generation capabilities with policy decision point integration for comprehensive authorization workflows.
3
4
## Capabilities
5
6
### Authentication Configuration
7
8
Comprehensive configuration management for authentication and authorization systems including public key paths, Java keystore settings, and policy decision point server integration.
9
10
```python { .api }
11
class AuthConfig:
12
"""
13
Configurations for authentication and authorization.
14
"""
15
16
def __init__(self) -> None:
17
"""Initialize with auth.properties"""
18
...
19
20
def public_key_path(self) -> str:
21
"""Returns path of the public key"""
22
...
23
24
def jks_path(self) -> str:
25
"""Returns path of the keystore"""
26
...
27
28
def jks_password(self) -> str:
29
"""Returns keystore password"""
30
...
31
32
def jks_key_alias(self) -> str:
33
"""Returns key alias in keystore"""
34
...
35
36
def pdp_host_url(self) -> str:
37
"""Returns host URL for policy decision point server"""
38
...
39
40
def is_authorization_enabled(self) -> bool:
41
"""Returns whether authorization is enabled"""
42
...
43
```
44
45
### JWT Token Utilities
46
47
Comprehensive JWT token management utilities for parsing, validation, and generation with Java keystore integration for enterprise security requirements.
48
49
```python { .api }
50
class JsonWebTokenUtil:
51
"""
52
Utility for parsing JWT for authentication.
53
"""
54
55
def __init__(self) -> None:
56
"""Constructor"""
57
...
58
59
def parse_token(self, token: str):
60
"""
61
Parses JWT token using public key from auth.properties.
62
63
Parameters:
64
- token: str - JWT token string to parse
65
66
Returns:
67
Parsed token payload
68
"""
69
...
70
71
def create_token(self):
72
"""
73
Convenience method to generate valid token with hardcoded subject.
74
75
Returns:
76
Generated JWT token string
77
"""
78
...
79
80
def validate_token(self, token: str) -> None:
81
"""
82
Validates token and raises AissembleSecurityException on failure.
83
84
Parameters:
85
- token: str - JWT token to validate
86
87
Raises:
88
AissembleSecurityException - If token validation fails
89
"""
90
...
91
92
def get_sign_key(self) -> str:
93
"""
94
Retrieves signing key from Java keystore.
95
96
Returns:
97
str - Signing key for token generation
98
"""
99
...
100
```
101
102
### Security Exception Handling
103
104
Custom exception class for handling aiSSEMBLE security-related errors with specific error contexts and appropriate error handling patterns.
105
106
```python { .api }
107
class AissembleSecurityException(Exception):
108
"""
109
Custom exception for aiSSEMBLE security errors.
110
"""
111
pass
112
```
113
114
## Usage Examples
115
116
### Basic JWT Token Validation
117
118
```python
119
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
120
from aissembleauth.auth_config import AuthConfig
121
122
# Initialize JWT utility
123
jwt_util = JsonWebTokenUtil()
124
125
def authenticate_request(authorization_header: str) -> dict:
126
"""Authenticate incoming request using JWT token"""
127
128
if not authorization_header or not authorization_header.startswith("Bearer "):
129
raise AissembleSecurityException("Missing or invalid authorization header")
130
131
# Extract token from Bearer header
132
token = authorization_header[7:] # Remove "Bearer " prefix
133
134
try:
135
# Validate token
136
jwt_util.validate_token(token)
137
138
# Parse token for user information
139
parsed_token = jwt_util.parse_token(token)
140
141
print(f"Authentication successful for user: {parsed_token.get('sub', 'unknown')}")
142
return parsed_token
143
144
except AissembleSecurityException as e:
145
print(f"Authentication failed: {e}")
146
raise
147
except Exception as e:
148
print(f"Unexpected authentication error: {e}")
149
raise AissembleSecurityException(f"Token validation failed: {e}")
150
151
# Usage example
152
try:
153
auth_header = "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
154
user_info = authenticate_request(auth_header)
155
print(f"Authenticated user: {user_info}")
156
except AissembleSecurityException as e:
157
print(f"Access denied: {e}")
158
```
159
160
### JWT Token Generation and Management
161
162
```python
163
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
164
from aissembleauth.auth_config import AuthConfig
165
from datetime import datetime, timedelta
166
import jwt
167
168
class TokenManager:
169
"""Comprehensive token management for authentication workflows"""
170
171
def __init__(self):
172
self.jwt_util = JsonWebTokenUtil()
173
self.auth_config = AuthConfig()
174
175
def generate_user_token(self, user_id: str, roles: list, expires_in_hours: int = 24) -> str:
176
"""Generate JWT token for user with roles and expiration"""
177
try:
178
# Get signing key from keystore
179
signing_key = self.jwt_util.get_sign_key()
180
181
# Create token payload
182
now = datetime.utcnow()
183
payload = {
184
"sub": user_id,
185
"roles": roles,
186
"iat": now,
187
"exp": now + timedelta(hours=expires_in_hours),
188
"iss": "aissemble-auth-service",
189
"aud": "aissemble-platform"
190
}
191
192
# Generate token using RS256 algorithm
193
token = jwt.encode(payload, signing_key, algorithm="RS256")
194
195
print(f"Generated token for user {user_id} with roles: {roles}")
196
return token
197
198
except Exception as e:
199
raise AissembleSecurityException(f"Token generation failed: {e}")
200
201
def refresh_token(self, existing_token: str) -> str:
202
"""Refresh existing valid token with new expiration"""
203
try:
204
# Validate existing token
205
self.jwt_util.validate_token(existing_token)
206
207
# Parse existing token
208
parsed = self.jwt_util.parse_token(existing_token)
209
210
# Generate new token with same claims but new expiration
211
return self.generate_user_token(
212
user_id=parsed.get("sub"),
213
roles=parsed.get("roles", []),
214
expires_in_hours=24
215
)
216
217
except Exception as e:
218
raise AissembleSecurityException(f"Token refresh failed: {e}")
219
220
def extract_user_context(self, token: str) -> dict:
221
"""Extract user context from valid token"""
222
try:
223
self.jwt_util.validate_token(token)
224
parsed = self.jwt_util.parse_token(token)
225
226
return {
227
"user_id": parsed.get("sub"),
228
"roles": parsed.get("roles", []),
229
"issued_at": parsed.get("iat"),
230
"expires_at": parsed.get("exp"),
231
"issuer": parsed.get("iss")
232
}
233
234
except Exception as e:
235
raise AissembleSecurityException(f"Context extraction failed: {e}")
236
237
def is_token_expired(self, token: str) -> bool:
238
"""Check if token is expired without full validation"""
239
try:
240
parsed = jwt.decode(token, options={"verify_signature": False})
241
exp_timestamp = parsed.get("exp")
242
243
if exp_timestamp:
244
return datetime.utcnow() > datetime.fromtimestamp(exp_timestamp)
245
return True
246
247
except Exception:
248
return True
249
250
# Usage example
251
token_manager = TokenManager()
252
253
# Generate token for user
254
user_token = token_manager.generate_user_token(
255
user_id="john.doe@company.com",
256
roles=["data_scientist", "ml_engineer"],
257
expires_in_hours=8
258
)
259
260
# Extract user context
261
user_context = token_manager.extract_user_context(user_token)
262
print(f"User context: {user_context}")
263
264
# Check token expiration
265
is_expired = token_manager.is_token_expired(user_token)
266
print(f"Token expired: {is_expired}")
267
268
# Refresh token if needed
269
if not is_expired:
270
refreshed_token = token_manager.refresh_token(user_token)
271
print("Token refreshed successfully")
272
```
273
274
### Role-Based Access Control Integration
275
276
```python
277
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
278
from aissembleauth.auth_config import AuthConfig
279
from functools import wraps
280
from typing import List
281
282
class RoleBasedAccessControl:
283
"""Role-based access control using JWT tokens"""
284
285
def __init__(self):
286
self.jwt_util = JsonWebTokenUtil()
287
self.auth_config = AuthConfig()
288
289
def require_roles(self, required_roles: List[str]):
290
"""Decorator for enforcing role-based access control"""
291
def decorator(func):
292
@wraps(func)
293
def wrapper(*args, **kwargs):
294
# Extract token from request context (simplified)
295
token = kwargs.get('auth_token') or args[0] if args else None
296
297
if not token:
298
raise AissembleSecurityException("Authentication token required")
299
300
try:
301
# Validate and parse token
302
self.jwt_util.validate_token(token)
303
parsed_token = self.jwt_util.parse_token(token)
304
305
# Extract user roles
306
user_roles = parsed_token.get("roles", [])
307
308
# Check if user has required roles
309
if not any(role in user_roles for role in required_roles):
310
raise AissembleSecurityException(
311
f"Access denied. Required roles: {required_roles}, User roles: {user_roles}"
312
)
313
314
print(f"Access granted for user: {parsed_token.get('sub')}")
315
return func(*args, **kwargs)
316
317
except AissembleSecurityException:
318
raise
319
except Exception as e:
320
raise AissembleSecurityException(f"Authorization failed: {e}")
321
322
return wrapper
323
return decorator
324
325
def check_permissions(self, token: str, resource: str, action: str) -> bool:
326
"""Check if user has permissions for specific resource and action"""
327
try:
328
self.jwt_util.validate_token(token)
329
parsed_token = self.jwt_util.parse_token(token)
330
331
user_roles = parsed_token.get("roles", [])
332
333
# Define role-based permissions (in practice, this would be configurable)
334
permissions = {
335
"admin": {"*": ["*"]}, # Admin has all permissions
336
"data_scientist": {
337
"datasets": ["read", "analyze"],
338
"models": ["read", "train", "evaluate"]
339
},
340
"ml_engineer": {
341
"models": ["read", "deploy", "monitor"],
342
"infrastructure": ["read", "configure"]
343
},
344
"viewer": {
345
"*": ["read"]
346
}
347
}
348
349
# Check permissions
350
for role in user_roles:
351
role_perms = permissions.get(role, {})
352
353
# Check wildcard permissions
354
if "*" in role_perms and ("*" in role_perms["*"] or action in role_perms["*"]):
355
return True
356
357
# Check specific resource permissions
358
if resource in role_perms and (action in role_perms[resource] or "*" in role_perms[resource]):
359
return True
360
361
return False
362
363
except Exception as e:
364
print(f"Permission check failed: {e}")
365
return False
366
367
# Usage examples
368
rbac = RoleBasedAccessControl()
369
370
# Decorator usage
371
@rbac.require_roles(["data_scientist", "ml_engineer"])
372
def train_model(auth_token: str, model_config: dict):
373
"""Function that requires data scientist or ML engineer role"""
374
print("Training model with configuration:", model_config)
375
return {"status": "training_started"}
376
377
@rbac.require_roles(["admin"])
378
def delete_dataset(auth_token: str, dataset_id: str):
379
"""Function that requires admin role"""
380
print(f"Deleting dataset: {dataset_id}")
381
return {"status": "deleted"}
382
383
# Direct permission checking
384
def access_resource(token: str, resource: str, action: str):
385
"""Check access to specific resource and action"""
386
if rbac.check_permissions(token, resource, action):
387
print(f"Access granted to {action} on {resource}")
388
return True
389
else:
390
print(f"Access denied to {action} on {resource}")
391
return False
392
393
# Example usage
394
user_token = "eyJhbGciOiJSUzI1NiJ9..." # Valid JWT token
395
396
try:
397
# Use decorated functions
398
result = train_model(auth_token=user_token, model_config={"algorithm": "RandomForest"})
399
print("Training result:", result)
400
401
# Check specific permissions
402
can_read_datasets = access_resource(user_token, "datasets", "read")
403
can_deploy_models = access_resource(user_token, "models", "deploy")
404
405
except AissembleSecurityException as e:
406
print(f"Security error: {e}")
407
```
408
409
### Configuration-Driven Security Setup
410
411
```python
412
from aissembleauth.auth_config import AuthConfig
413
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
414
import os
415
416
class SecurityConfigurationManager:
417
"""Manage security configuration across different environments"""
418
419
def __init__(self):
420
self.auth_config = AuthConfig()
421
self.jwt_util = JsonWebTokenUtil()
422
423
def validate_configuration(self) -> dict:
424
"""Validate authentication configuration"""
425
config_status = {
426
"public_key_accessible": False,
427
"keystore_accessible": False,
428
"authorization_enabled": False,
429
"pdp_configured": False,
430
"configuration_valid": False
431
}
432
433
try:
434
# Check public key
435
public_key_path = self.auth_config.public_key_path()
436
if public_key_path and os.path.exists(public_key_path):
437
config_status["public_key_accessible"] = True
438
439
# Check keystore
440
jks_path = self.auth_config.jks_path()
441
if jks_path and os.path.exists(jks_path):
442
config_status["keystore_accessible"] = True
443
444
# Check authorization
445
config_status["authorization_enabled"] = self.auth_config.is_authorization_enabled()
446
447
# Check PDP configuration
448
pdp_url = self.auth_config.pdp_host_url()
449
if pdp_url:
450
config_status["pdp_configured"] = True
451
452
# Overall status
453
config_status["configuration_valid"] = (
454
config_status["public_key_accessible"] and
455
config_status["keystore_accessible"]
456
)
457
458
except Exception as e:
459
print(f"Configuration validation error: {e}")
460
461
return config_status
462
463
def get_security_summary(self) -> dict:
464
"""Get comprehensive security configuration summary"""
465
return {
466
"public_key_path": self.auth_config.public_key_path(),
467
"keystore_path": self.auth_config.jks_path(),
468
"keystore_alias": self.auth_config.jks_key_alias(),
469
"authorization_enabled": self.auth_config.is_authorization_enabled(),
470
"pdp_host": self.auth_config.pdp_host_url(),
471
"configuration_status": self.validate_configuration()
472
}
473
474
def test_jwt_operations(self) -> dict:
475
"""Test JWT token operations"""
476
test_results = {
477
"token_generation": False,
478
"token_validation": False,
479
"token_parsing": False,
480
"signing_key_access": False
481
}
482
483
try:
484
# Test signing key access
485
signing_key = self.jwt_util.get_sign_key()
486
if signing_key:
487
test_results["signing_key_access"] = True
488
489
# Test token generation
490
test_token = self.jwt_util.create_token()
491
if test_token:
492
test_results["token_generation"] = True
493
494
# Test token validation
495
try:
496
self.jwt_util.validate_token(test_token)
497
test_results["token_validation"] = True
498
except AissembleSecurityException:
499
pass
500
501
# Test token parsing
502
try:
503
parsed = self.jwt_util.parse_token(test_token)
504
if parsed:
505
test_results["token_parsing"] = True
506
except Exception:
507
pass
508
509
except Exception as e:
510
print(f"JWT operations test error: {e}")
511
512
return test_results
513
514
def initialize_security_system(self) -> bool:
515
"""Initialize and validate entire security system"""
516
print("Initializing aiSSEMBLE security system...")
517
518
# Validate configuration
519
config_status = self.validate_configuration()
520
if not config_status["configuration_valid"]:
521
print("Security configuration validation failed")
522
return False
523
524
# Test JWT operations
525
jwt_status = self.test_jwt_operations()
526
if not all(jwt_status.values()):
527
print("JWT operations test failed")
528
return False
529
530
print("Security system initialized successfully")
531
return True
532
533
# Usage example
534
security_manager = SecurityConfigurationManager()
535
536
# Get security summary
537
summary = security_manager.get_security_summary()
538
print("Security Configuration Summary:")
539
for key, value in summary.items():
540
print(f" {key}: {value}")
541
542
# Initialize security system
543
if security_manager.initialize_security_system():
544
print("Ready to handle authentication requests")
545
else:
546
print("Security system initialization failed")
547
548
# Validate configuration in different environments
549
config_validation = security_manager.validate_configuration()
550
print(f"Configuration validation: {config_validation}")
551
552
# Test JWT operations
553
jwt_tests = security_manager.test_jwt_operations()
554
print(f"JWT operations test: {jwt_tests}")
555
```
556
557
### Policy Decision Point Integration
558
559
```python
560
from aissembleauth.auth_config import AuthConfig
561
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
562
import requests
563
import json
564
565
class PolicyDecisionPointClient:
566
"""Client for integrating with Policy Decision Point (PDP) server"""
567
568
def __init__(self):
569
self.auth_config = AuthConfig()
570
self.jwt_util = JsonWebTokenUtil()
571
self.pdp_url = self.auth_config.pdp_host_url()
572
573
def authorize_action(self, token: str, resource: str, action: str, context: dict = None) -> bool:
574
"""Check authorization through PDP server"""
575
if not self.auth_config.is_authorization_enabled():
576
print("Authorization is disabled, allowing access")
577
return True
578
579
try:
580
# Validate JWT token first
581
self.jwt_util.validate_token(token)
582
user_info = self.jwt_util.parse_token(token)
583
584
# Prepare authorization request
585
auth_request = {
586
"subject": user_info.get("sub"),
587
"resource": resource,
588
"action": action,
589
"context": context or {},
590
"roles": user_info.get("roles", [])
591
}
592
593
# Send request to PDP
594
response = requests.post(
595
f"{self.pdp_url}/authorize",
596
json=auth_request,
597
headers={"Content-Type": "application/json"},
598
timeout=5
599
)
600
601
if response.status_code == 200:
602
decision = response.json()
603
return decision.get("permit", False)
604
else:
605
print(f"PDP error: {response.status_code}")
606
return False
607
608
except AissembleSecurityException:
609
print("Token validation failed")
610
return False
611
except Exception as e:
612
print(f"Authorization check failed: {e}")
613
return False
614
615
def get_user_permissions(self, token: str) -> list:
616
"""Get all permissions for authenticated user"""
617
try:
618
self.jwt_util.validate_token(token)
619
user_info = self.jwt_util.parse_token(token)
620
621
# Request user permissions from PDP
622
permission_request = {
623
"subject": user_info.get("sub"),
624
"roles": user_info.get("roles", [])
625
}
626
627
response = requests.post(
628
f"{self.pdp_url}/permissions",
629
json=permission_request,
630
headers={"Content-Type": "application/json"},
631
timeout=5
632
)
633
634
if response.status_code == 200:
635
return response.json().get("permissions", [])
636
else:
637
return []
638
639
except Exception as e:
640
print(f"Permission retrieval failed: {e}")
641
return []
642
643
class SecureAPIEndpoint:
644
"""Example of secure API endpoint with PDP integration"""
645
646
def __init__(self):
647
self.pdp_client = PolicyDecisionPointClient()
648
649
def secure_endpoint(self, token: str, resource_id: str, action: str, **kwargs):
650
"""Generic secure endpoint with PDP authorization"""
651
652
# Check authorization through PDP
653
if not self.pdp_client.authorize_action(
654
token=token,
655
resource=f"api/resource/{resource_id}",
656
action=action,
657
context={
658
"ip_address": kwargs.get("client_ip"),
659
"user_agent": kwargs.get("user_agent"),
660
"timestamp": kwargs.get("timestamp")
661
}
662
):
663
raise AissembleSecurityException(f"Access denied for {action} on resource {resource_id}")
664
665
# Proceed with authorized action
666
return f"Successfully executed {action} on resource {resource_id}"
667
668
# Usage example
669
pdp_client = PolicyDecisionPointClient()
670
secure_api = SecureAPIEndpoint()
671
672
# Example authorization check
673
user_token = "eyJhbGciOiJSUzI1NiJ9..." # Valid JWT token
674
675
try:
676
# Check specific authorization
677
can_access = pdp_client.authorize_action(
678
token=user_token,
679
resource="ml_models/fraud_detection",
680
action="deploy",
681
context={"environment": "production", "approval_id": "DEPLOY-001"}
682
)
683
684
if can_access:
685
print("User authorized to deploy model")
686
else:
687
print("User not authorized to deploy model")
688
689
# Get user permissions
690
permissions = pdp_client.get_user_permissions(user_token)
691
print(f"User permissions: {permissions}")
692
693
# Use secure endpoint
694
result = secure_api.secure_endpoint(
695
token=user_token,
696
resource_id="dataset_001",
697
action="read",
698
client_ip="192.168.1.100"
699
)
700
print(f"API result: {result}")
701
702
except AissembleSecurityException as e:
703
print(f"Security error: {e}")
704
```
705
706
## Best Practices
707
708
### Token Management
709
- Use appropriate token expiration times based on security requirements
710
- Implement token refresh mechanisms for long-running sessions
711
- Store tokens securely on client side (HttpOnly cookies, secure storage)
712
- Implement proper token revocation mechanisms
713
714
### Security Configuration
715
- Use strong cryptographic keys for token signing
716
- Regularly rotate signing keys and certificates
717
- Implement proper keystore security and access controls
718
- Use environment-specific configuration management
719
720
### Authorization Patterns
721
- Implement role-based access control consistently
722
- Use Policy Decision Points for complex authorization logic
723
- Audit and log all authentication and authorization events
724
- Implement proper error handling without information leakage
725
726
### Production Considerations
727
- Use HTTPS for all authentication endpoints
728
- Implement rate limiting for authentication attempts
729
- Monitor for suspicious authentication patterns
730
- Regular security audits and penetration testing