Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.
—
Enterprise-grade security framework with JWT token management, Java keystore integration, and configurable authentication providers. The authentication system supports both token validation and generation capabilities with policy decision point integration for comprehensive authorization workflows.
Comprehensive configuration management for authentication and authorization systems including public key paths, Java keystore settings, and policy decision point server integration.
class AuthConfig:
    """
    Authentication and authorization settings.

    API stub: every accessor body is elided (``...``); only the
    signatures and return annotations are specified here.
    """

    def __init__(self) -> None:
        """Initialize the configuration with auth.properties."""
        ...

    def public_key_path(self) -> str:
        """Return the path of the public key."""
        ...

    def jks_path(self) -> str:
        """Return the path of the keystore."""
        ...

    def jks_password(self) -> str:
        """Return the keystore password."""
        ...

    def jks_key_alias(self) -> str:
        """Return the alias of the key inside the keystore."""
        ...

    def pdp_host_url(self) -> str:
        """Return the host URL of the policy decision point server."""
        ...

    def is_authorization_enabled(self) -> bool:
        """Return whether authorization is enabled."""
        ...


# Comprehensive JWT token management utilities for parsing, validation, and generation with Java keystore integration for enterprise security requirements.
class JsonWebTokenUtil:
    """
    Helper for parsing JSON Web Tokens during authentication.

    API stub: method bodies are elided (``...``).
    """

    def __init__(self) -> None:
        """Constructor."""
        ...

    def parse_token(self, token: str):
        """
        Parse a JWT using the public key from auth.properties.

        Parameters:
        - token: str - JWT token string to parse

        Returns:
        Parsed token payload
        """
        ...

    def create_token(self):
        """
        Convenience method that generates a valid token with a hardcoded subject.

        Returns:
        Generated JWT token string
        """
        ...

    def validate_token(self, token: str) -> None:
        """
        Validate a token, raising AissembleSecurityException on failure.

        Parameters:
        - token: str - JWT token to validate

        Raises:
        AissembleSecurityException - If token validation fails
        """
        ...

    def get_sign_key(self) -> str:
        """
        Retrieve the signing key from the Java keystore.

        Returns:
        str - Signing key for token generation
        """
        ...


# Custom exception class for handling aiSSEMBLE security-related errors with specific error contexts and appropriate error handling patterns.
class AissembleSecurityException(Exception):
    """Exception type raised for aiSSEMBLE security errors."""


from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
from aissembleauth.auth_config import AuthConfig
# Initialize JWT utility
jwt_util = JsonWebTokenUtil()


def authenticate_request(authorization_header: str) -> dict:
    """Authenticate an incoming request from its ``Authorization`` header.

    Parameters:
    - authorization_header: str - header value, expected as ``"Bearer <token>"``

    Returns:
    Parsed token payload as returned by ``jwt_util.parse_token``

    Raises:
    AissembleSecurityException - if the header is missing, is not a Bearer
    header, carries an empty token, or the token fails validation/parsing
    """
    bearer_prefix = "Bearer "
    if not authorization_header or not authorization_header.startswith(bearer_prefix):
        raise AissembleSecurityException("Missing or invalid authorization header")

    # Slice by the prefix length instead of a magic number, and reject a
    # header that carries no token at all before calling the validator.
    token = authorization_header[len(bearer_prefix):].strip()
    if not token:
        raise AissembleSecurityException("Missing or invalid authorization header")

    try:
        # Validate first, then parse the claims for user information.
        jwt_util.validate_token(token)
        parsed_token = jwt_util.parse_token(token)
        print(f"Authentication successful for user: {parsed_token.get('sub', 'unknown')}")
        return parsed_token
    except AissembleSecurityException as e:
        print(f"Authentication failed: {e}")
        raise
    except Exception as e:
        print(f"Unexpected authentication error: {e}")
        # Chain the cause so the original traceback is not lost.
        raise AissembleSecurityException(f"Token validation failed: {e}") from e
# Usage example
auth_header = "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
try:
    user_info = authenticate_request(auth_header)
except AissembleSecurityException as e:
    # Any authentication failure surfaces as a denied request.
    print(f"Access denied: {e}")
else:
    print(f"Authenticated user: {user_info}")
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
from aissembleauth.auth_config import AuthConfig
from datetime import datetime, timedelta
import jwt
class TokenManager:
    """Comprehensive token management for authentication workflows"""

    def __init__(self):
        self.jwt_util = JsonWebTokenUtil()
        self.auth_config = AuthConfig()

    def generate_user_token(self, user_id: str, roles: list, expires_in_hours: int = 24) -> str:
        """Generate an RS256-signed JWT for a user.

        Parameters:
        - user_id: str - value stored in the ``sub`` claim
        - roles: list - value stored in the ``roles`` claim
        - expires_in_hours: int - lifetime of the token in hours (default 24)

        Returns:
        str - encoded token

        Raises:
        AissembleSecurityException - if the signing key cannot be read or
        encoding fails (the original error is chained as the cause)
        """
        # Function-scope import: the file only imports datetime/timedelta.
        from datetime import timezone

        try:
            # Get signing key from keystore
            signing_key = self.jwt_util.get_sign_key()
            # Timezone-aware "now" so iat/exp are unambiguous UTC instants
            # (datetime.utcnow() is naive and deprecated).
            now = datetime.now(timezone.utc)
            payload = {
                "sub": user_id,
                "roles": roles,
                "iat": now,
                "exp": now + timedelta(hours=expires_in_hours),
                "iss": "aissemble-auth-service",
                "aud": "aissemble-platform",
            }
            # Generate token using RS256 algorithm
            token = jwt.encode(payload, signing_key, algorithm="RS256")
            print(f"Generated token for user {user_id} with roles: {roles}")
            return token
        except Exception as e:
            raise AissembleSecurityException(f"Token generation failed: {e}") from e

    def refresh_token(self, existing_token: str, expires_in_hours: int = 24) -> str:
        """Issue a new token carrying the ``sub`` and ``roles`` of a valid one.

        Parameters:
        - existing_token: str - token to refresh; it must still validate
        - expires_in_hours: int - lifetime of the new token (default 24,
          the value that was previously hard-coded)

        Returns:
        str - newly generated token

        Raises:
        AissembleSecurityException - if validation, parsing or generation fails
        """
        try:
            # Only a token that still validates may be refreshed.
            self.jwt_util.validate_token(existing_token)
            parsed = self.jwt_util.parse_token(existing_token)
            return self.generate_user_token(
                user_id=parsed.get("sub"),
                roles=parsed.get("roles", []),
                expires_in_hours=expires_in_hours,
            )
        except Exception as e:
            raise AissembleSecurityException(f"Token refresh failed: {e}") from e

    def extract_user_context(self, token: str) -> dict:
        """Return the user-related claims of a valid token.

        Returns:
        dict with keys ``user_id``, ``roles``, ``issued_at``, ``expires_at``
        and ``issuer`` taken from the ``sub``, ``roles``, ``iat``, ``exp``
        and ``iss`` claims.

        Raises:
        AissembleSecurityException - if the token fails validation or parsing
        """
        try:
            self.jwt_util.validate_token(token)
            parsed = self.jwt_util.parse_token(token)
            return {
                "user_id": parsed.get("sub"),
                "roles": parsed.get("roles", []),
                "issued_at": parsed.get("iat"),
                "expires_at": parsed.get("exp"),
                "issuer": parsed.get("iss"),
            }
        except Exception as e:
            raise AissembleSecurityException(f"Context extraction failed: {e}") from e

    def is_token_expired(self, token: str) -> bool:
        """Check expiry from the ``exp`` claim without verifying the signature.

        Returns:
        bool - True when the token is expired, has no ``exp`` claim, or
        cannot be decoded at all; False otherwise. Never raises.
        """
        # Function-scope import: the file only imports datetime/timedelta.
        from datetime import timezone

        try:
            parsed = jwt.decode(token, options={"verify_signature": False})
            exp_timestamp = parsed.get("exp")
            if exp_timestamp:
                # Compare two UTC-aware instants. The previous code compared
                # naive UTC "now" with a naive *local-time* conversion of exp,
                # which is off by the UTC offset on non-UTC hosts.
                expires_at = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc)
                return datetime.now(timezone.utc) > expires_at
            return True
        except Exception:
            # Anything undecodable is treated as expired (fail closed).
            return True
# Usage example
token_manager = TokenManager()

# Issue an eight-hour token for a user holding two roles
user_token = token_manager.generate_user_token(
    user_id="john.doe@company.com",
    roles=["data_scientist", "ml_engineer"],
    expires_in_hours=8,
)

# Read the user-related claims back out of the token
user_context = token_manager.extract_user_context(user_token)
print(f"User context: {user_context}")

# Report whether the token has already expired
is_expired = token_manager.is_token_expired(user_token)
print(f"Token expired: {is_expired}")

# Only a token that is still live gets refreshed
if not is_expired:
    refreshed_token = token_manager.refresh_token(user_token)
    print("Token refreshed successfully")
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
from aissembleauth.auth_config import AuthConfig
from functools import wraps
from typing import List
class RoleBasedAccessControl:
    """Role-based access control using JWT tokens"""

    def __init__(self):
        self.jwt_util = JsonWebTokenUtil()
        self.auth_config = AuthConfig()

    def require_roles(self, required_roles: List[str]):
        """Build a decorator that enforces role-based access control.

        The wrapped function must receive the JWT either as the keyword
        argument ``auth_token`` or as its first positional argument. The
        call is allowed when the token validates and its ``roles`` claim
        contains at least one of ``required_roles``.

        Parameters:
        - required_roles: List[str] - roles of which the caller needs any one

        Raises (from the wrapped call):
        AissembleSecurityException - if no token is supplied, the token is
        invalid, the caller has none of the required roles, or any other
        error occurs while authorizing or running the wrapped function
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Extract token from request context (simplified).
                # The parentheses matter: without them the expression parses
                # as `(kwargs.get(...) or args[0]) if args else None`, which
                # discards a keyword-only `auth_token` whenever there are no
                # positional arguments.
                token = kwargs.get("auth_token") or (args[0] if args else None)
                if not token:
                    raise AissembleSecurityException("Authentication token required")
                try:
                    # Validate and parse token
                    self.jwt_util.validate_token(token)
                    parsed_token = self.jwt_util.parse_token(token)
                    user_roles = parsed_token.get("roles", [])
                    # Any single matching role is sufficient.
                    if not any(role in user_roles for role in required_roles):
                        raise AissembleSecurityException(
                            f"Access denied. Required roles: {required_roles}, User roles: {user_roles}"
                        )
                    print(f"Access granted for user: {parsed_token.get('sub')}")
                    return func(*args, **kwargs)
                except AissembleSecurityException:
                    raise
                except Exception as e:
                    raise AissembleSecurityException(f"Authorization failed: {e}") from e
            return wrapper
        return decorator

    def check_permissions(self, token: str, resource: str, action: str) -> bool:
        """Check whether the token's roles permit ``action`` on ``resource``.

        Parameters:
        - token: str - JWT whose ``roles`` claim is evaluated
        - resource: str - resource name, e.g. ``"datasets"``
        - action: str - action name, e.g. ``"read"``

        Returns:
        bool - True if any role grants the action (directly or through a
        ``"*"`` wildcard); False otherwise, including when the token cannot
        be validated or parsed. Never raises.
        """
        try:
            self.jwt_util.validate_token(token)
            parsed_token = self.jwt_util.parse_token(token)
            user_roles = parsed_token.get("roles", [])
            # Define role-based permissions (in practice, this would be configurable)
            permissions = {
                "admin": {"*": ["*"]},  # Admin has all permissions
                "data_scientist": {
                    "datasets": ["read", "analyze"],
                    "models": ["read", "train", "evaluate"],
                },
                "ml_engineer": {
                    "models": ["read", "deploy", "monitor"],
                    "infrastructure": ["read", "configure"],
                },
                "viewer": {
                    "*": ["read"],
                },
            }
            for role in user_roles:
                role_perms = permissions.get(role, {})
                # Wildcard resource: applies to every resource.
                wildcard_actions = role_perms.get("*", ())
                if "*" in wildcard_actions or action in wildcard_actions:
                    return True
                # Resource-specific grant.
                resource_actions = role_perms.get(resource, ())
                if action in resource_actions or "*" in resource_actions:
                    return True
            return False
        except Exception as e:
            # Fail closed: any error means no permission.
            print(f"Permission check failed: {e}")
            return False
# Usage examples
rbac = RoleBasedAccessControl()


# Decorator usage
@rbac.require_roles(["data_scientist", "ml_engineer"])
def train_model(auth_token: str, model_config: dict):
    """Start model training; guarded by the data scientist / ML engineer roles."""
    print("Training model with configuration:", model_config)
    outcome = {"status": "training_started"}
    return outcome
@rbac.require_roles(["admin"])
def delete_dataset(auth_token: str, dataset_id: str):
    """Delete a dataset; guarded by the admin role."""
    print(f"Deleting dataset: {dataset_id}")
    outcome = {"status": "deleted"}
    return outcome
# Direct permission checking
def access_resource(token: str, resource: str, action: str):
    """Report and return whether ``token`` may perform ``action`` on ``resource``."""
    # Guard clause: handle the denied case first.
    if not rbac.check_permissions(token, resource, action):
        print(f"Access denied to {action} on {resource}")
        return False
    print(f"Access granted to {action} on {resource}")
    return True
# Example usage
user_token = "eyJhbGciOiJSUzI1NiJ9..."  # Valid JWT token

try:
    # Call a role-protected function, passing the token by keyword
    result = train_model(
        auth_token=user_token,
        model_config={"algorithm": "RandomForest"},
    )
    print("Training result:", result)

    # Query individual resource/action permissions
    can_read_datasets = access_resource(user_token, "datasets", "read")
    can_deploy_models = access_resource(user_token, "models", "deploy")
except AissembleSecurityException as e:
    print(f"Security error: {e}")
from aissembleauth.auth_config import AuthConfig
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
import os
class SecurityConfigurationManager:
    """Manage security configuration across different environments"""

    def __init__(self):
        self.auth_config = AuthConfig()
        self.jwt_util = JsonWebTokenUtil()

    def validate_configuration(self) -> dict:
        """Probe the authentication configuration and report what is usable.

        Returns:
        dict of flags: ``public_key_accessible``, ``keystore_accessible``,
        ``authorization_enabled``, ``pdp_configured`` and
        ``configuration_valid`` (public key and keystore both present).
        Errors are printed, and the flags gathered so far are returned.
        """
        status = dict.fromkeys(
            (
                "public_key_accessible",
                "keystore_accessible",
                "authorization_enabled",
                "pdp_configured",
                "configuration_valid",
            ),
            False,
        )
        cfg = self.auth_config
        try:
            # Public key file must exist on disk
            key_path = cfg.public_key_path()
            status["public_key_accessible"] = bool(key_path and os.path.exists(key_path))

            # Keystore file must exist on disk
            keystore_path = cfg.jks_path()
            status["keystore_accessible"] = bool(keystore_path and os.path.exists(keystore_path))

            # Authorization switch is reported as configured
            status["authorization_enabled"] = cfg.is_authorization_enabled()

            # PDP counts as configured when a URL is set
            status["pdp_configured"] = bool(cfg.pdp_host_url())

            # Overall validity depends only on the two key files
            status["configuration_valid"] = (
                status["public_key_accessible"] and status["keystore_accessible"]
            )
        except Exception as e:
            print(f"Configuration validation error: {e}")
        return status

    def get_security_summary(self) -> dict:
        """Return the configured paths, alias, flags and the validation result."""
        cfg = self.auth_config
        summary = {}
        summary["public_key_path"] = cfg.public_key_path()
        summary["keystore_path"] = cfg.jks_path()
        summary["keystore_alias"] = cfg.jks_key_alias()
        summary["authorization_enabled"] = cfg.is_authorization_enabled()
        summary["pdp_host"] = cfg.pdp_host_url()
        summary["configuration_status"] = self.validate_configuration()
        return summary

    def test_jwt_operations(self) -> dict:
        """Exercise key access, token creation, validation and parsing.

        Returns:
        dict of flags: ``token_generation``, ``token_validation``,
        ``token_parsing`` and ``signing_key_access``. Validation and parsing
        are attempted only when a token could be generated.
        """
        outcome = {
            "token_generation": False,
            "token_validation": False,
            "token_parsing": False,
            "signing_key_access": False,
        }
        util = self.jwt_util
        try:
            # Signing key must be retrievable and non-empty
            outcome["signing_key_access"] = bool(util.get_sign_key())

            # Without a generated token there is nothing further to check
            sample = util.create_token()
            if not sample:
                return outcome
            outcome["token_generation"] = True

            # A security exception simply leaves the validation flag False
            try:
                util.validate_token(sample)
            except AissembleSecurityException:
                pass
            else:
                outcome["token_validation"] = True

            # Any parsing error leaves the parsing flag False
            try:
                if util.parse_token(sample):
                    outcome["token_parsing"] = True
            except Exception:
                pass
        except Exception as e:
            print(f"JWT operations test error: {e}")
        return outcome

    def initialize_security_system(self) -> bool:
        """Run configuration validation, then the JWT self-test.

        Returns:
        bool - True only when the configuration is valid and every JWT
        check passed; progress and failures are printed.
        """
        print("Initializing aiSSEMBLE security system...")

        # Step 1: configuration
        if not self.validate_configuration()["configuration_valid"]:
            print("Security configuration validation failed")
            return False

        # Step 2: JWT round trip
        checks = self.test_jwt_operations()
        if not all(checks.values()):
            print("JWT operations test failed")
            return False

        print("Security system initialized successfully")
        return True
# Usage example
security_manager = SecurityConfigurationManager()

# Print every entry of the security summary
summary = security_manager.get_security_summary()
print("Security Configuration Summary:")
for key in summary:
    value = summary[key]
    print(f" {key}: {value}")

# Bring the security system up and report the outcome
system_ready = security_manager.initialize_security_system()
print(
    "Ready to handle authentication requests"
    if system_ready
    else "Security system initialization failed"
)

# Validate configuration in different environments
config_validation = security_manager.validate_configuration()
print(f"Configuration validation: {config_validation}")

# Run the JWT self-test
jwt_tests = security_manager.test_jwt_operations()
print(f"JWT operations test: {jwt_tests}")
from aissembleauth.auth_config import AuthConfig
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
import requests
import json
class PolicyDecisionPointClient:
    """Client for integrating with Policy Decision Point (PDP) server"""

    def __init__(self):
        self.auth_config = AuthConfig()
        self.jwt_util = JsonWebTokenUtil()
        self.pdp_url = self.auth_config.pdp_host_url()

    def _post_json(self, endpoint, payload):
        """POST ``payload`` as JSON to ``<pdp_url>/<endpoint>`` with a 5 s timeout."""
        return requests.post(
            f"{self.pdp_url}/{endpoint}",
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=5,
        )

    def authorize_action(self, token: str, resource: str, action: str, context: dict = None) -> bool:
        """Ask the PDP server whether the token's subject may perform an action.

        Parameters:
        - token: str - JWT identifying the caller
        - resource: str - resource being accessed
        - action: str - action being attempted
        - context: dict - optional extra attributes sent with the request

        Returns:
        bool - True when authorization is disabled or the PDP answers 200
        with a truthy ``permit``; False on a non-200 reply, an invalid
        token, or any other error. Never raises.
        """
        if not self.auth_config.is_authorization_enabled():
            print("Authorization is disabled, allowing access")
            return True

        try:
            # The token must validate before the PDP is consulted
            self.jwt_util.validate_token(token)
            claims = self.jwt_util.parse_token(token)

            payload = {
                "subject": claims.get("sub"),
                "resource": resource,
                "action": action,
                "context": context or {},
                "roles": claims.get("roles", []),
            }
            reply = self._post_json("authorize", payload)

            if reply.status_code != 200:
                print(f"PDP error: {reply.status_code}")
                return False
            return reply.json().get("permit", False)
        except AissembleSecurityException:
            print("Token validation failed")
            return False
        except Exception as e:
            print(f"Authorization check failed: {e}")
            return False

    def get_user_permissions(self, token: str) -> list:
        """Fetch the permissions the PDP server reports for the token's subject.

        Returns:
        list - the ``permissions`` entry of a 200 reply; an empty list on a
        non-200 reply or on any error (which is printed). Never raises.
        """
        try:
            self.jwt_util.validate_token(token)
            claims = self.jwt_util.parse_token(token)

            payload = {
                "subject": claims.get("sub"),
                "roles": claims.get("roles", []),
            }
            reply = self._post_json("permissions", payload)
            return reply.json().get("permissions", []) if reply.status_code == 200 else []
        except Exception as e:
            print(f"Permission retrieval failed: {e}")
            return []
class SecureAPIEndpoint:
    """Example of secure API endpoint with PDP integration"""

    def __init__(self):
        self.pdp_client = PolicyDecisionPointClient()

    def secure_endpoint(self, token: str, resource_id: str, action: str, **kwargs):
        """Run ``action`` on a resource after a PDP authorization check.

        Parameters:
        - token: str - JWT of the caller
        - resource_id: str - identifier appended to ``api/resource/``
        - action: str - action being requested
        - **kwargs - ``client_ip``, ``user_agent`` and ``timestamp`` are
          forwarded as request context (missing ones become None)

        Returns:
        str - confirmation message

        Raises:
        AissembleSecurityException - if the PDP client denies the action
        """
        # Request context handed to the PDP alongside the decision inputs
        request_context = {
            "ip_address": kwargs.get("client_ip"),
            "user_agent": kwargs.get("user_agent"),
            "timestamp": kwargs.get("timestamp"),
        }
        permitted = self.pdp_client.authorize_action(
            token=token,
            resource=f"api/resource/{resource_id}",
            action=action,
            context=request_context,
        )
        if not permitted:
            raise AissembleSecurityException(f"Access denied for {action} on resource {resource_id}")

        # Proceed with authorized action
        return f"Successfully executed {action} on resource {resource_id}"
# Usage example
pdp_client = PolicyDecisionPointClient()
secure_api = SecureAPIEndpoint()

# Example authorization check
user_token = "eyJhbGciOiJSUzI1NiJ9..."  # Valid JWT token

try:
    # Ask the PDP about one specific resource/action pair
    can_access = pdp_client.authorize_action(
        token=user_token,
        resource="ml_models/fraud_detection",
        action="deploy",
        context={"environment": "production", "approval_id": "DEPLOY-001"},
    )
    print(
        "User authorized to deploy model"
        if can_access
        else "User not authorized to deploy model"
    )

    # List everything the user is allowed to do
    permissions = pdp_client.get_user_permissions(user_token)
    print(f"User permissions: {permissions}")

    # Go through the PDP-guarded endpoint
    result = secure_api.secure_endpoint(
        token=user_token,
        resource_id="dataset_001",
        action="read",
        client_ip="192.168.1.100",
    )
    print(f"API result: {result}")
except AissembleSecurityException as e:
    print(f"Security error: {e}")

# Install with Tessl CLI
# npx tessl i tessl/pypi-aissemble-foundation-core-python