CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aissemble-foundation-core-python

Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.

Pending
Overview
Eval results
Files

auth.mddocs/

Authentication and JWT Handling

Enterprise-grade security framework with JWT token management, Java keystore integration, and configurable authentication providers. The authentication system supports both token validation and generation capabilities with policy decision point integration for comprehensive authorization workflows.

Capabilities

Authentication Configuration

Comprehensive configuration management for authentication and authorization systems including public key paths, Java keystore settings, and policy decision point server integration.

class AuthConfig:
    """
    Configurations for authentication and authorization.
    """
    
    def __init__(self) -> None:
        """Initialize with auth.properties"""
        ...
    
    def public_key_path(self) -> str:
        """Returns path of the public key"""
        ...
    
    def jks_path(self) -> str:
        """Returns path of the keystore"""
        ...
    
    def jks_password(self) -> str:
        """Returns keystore password"""
        ...
    
    def jks_key_alias(self) -> str:
        """Returns key alias in keystore"""
        ...
    
    def pdp_host_url(self) -> str:
        """Returns host URL for policy decision point server"""
        ...
    
    def is_authorization_enabled(self) -> bool:
        """Returns whether authorization is enabled"""
        ...

JWT Token Utilities

Comprehensive JWT token management utilities for parsing, validation, and generation with Java keystore integration for enterprise security requirements.

class JsonWebTokenUtil:
    """
    Utility for parsing JWT for authentication.
    """
    
    def __init__(self) -> None:
        """Constructor"""
        ...
    
    def parse_token(self, token: str):
        """
        Parses JWT token using public key from auth.properties.
        
        Parameters:
        - token: str - JWT token string to parse
        
        Returns:
        Parsed token payload
        """
        ...
    
    def create_token(self):
        """
        Convenience method to generate valid token with hardcoded subject.
        
        Returns:
        Generated JWT token string
        """
        ...
    
    def validate_token(self, token: str) -> None:
        """
        Validates token and raises AissembleSecurityException on failure.
        
        Parameters:
        - token: str - JWT token to validate
        
        Raises:
        AissembleSecurityException - If token validation fails
        """
        ...
    
    def get_sign_key(self) -> str:
        """
        Retrieves signing key from Java keystore.
        
        Returns:
        str - Signing key for token generation
        """
        ...

Security Exception Handling

Custom exception class for handling aiSSEMBLE security-related errors with specific error contexts and appropriate error handling patterns.

class AissembleSecurityException(Exception):
    """
    Custom exception for aiSSEMBLE security errors.
    """
    pass

Usage Examples

Basic JWT Token Validation

from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
from aissembleauth.auth_config import AuthConfig

# Initialize JWT utility
jwt_util = JsonWebTokenUtil()

def authenticate_request(authorization_header: str) -> dict:
    """Authenticate incoming request using JWT token"""
    
    if not authorization_header or not authorization_header.startswith("Bearer "):
        raise AissembleSecurityException("Missing or invalid authorization header")
    
    # Extract token from Bearer header
    token = authorization_header[7:]  # Remove "Bearer " prefix
    
    try:
        # Validate token
        jwt_util.validate_token(token)
        
        # Parse token for user information
        parsed_token = jwt_util.parse_token(token)
        
        print(f"Authentication successful for user: {parsed_token.get('sub', 'unknown')}")
        return parsed_token
        
    except AissembleSecurityException as e:
        print(f"Authentication failed: {e}")
        raise
    except Exception as e:
        print(f"Unexpected authentication error: {e}")
        raise AissembleSecurityException(f"Token validation failed: {e}")

# Usage example
try:
    auth_header = "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
    user_info = authenticate_request(auth_header)
    print(f"Authenticated user: {user_info}")
except AissembleSecurityException as e:
    print(f"Access denied: {e}")

JWT Token Generation and Management

from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
from aissembleauth.auth_config import AuthConfig
from datetime import datetime, timedelta
import jwt

class TokenManager:
    """Comprehensive token management for authentication workflows"""
    
    def __init__(self):
        self.jwt_util = JsonWebTokenUtil()
        self.auth_config = AuthConfig()
    
    def generate_user_token(self, user_id: str, roles: list, expires_in_hours: int = 24) -> str:
        """Generate JWT token for user with roles and expiration"""
        try:
            # Get signing key from keystore
            signing_key = self.jwt_util.get_sign_key()
            
            # Create token payload
            now = datetime.utcnow()
            payload = {
                "sub": user_id,
                "roles": roles,
                "iat": now,
                "exp": now + timedelta(hours=expires_in_hours),
                "iss": "aissemble-auth-service",
                "aud": "aissemble-platform"
            }
            
            # Generate token using RS256 algorithm
            token = jwt.encode(payload, signing_key, algorithm="RS256")
            
            print(f"Generated token for user {user_id} with roles: {roles}")
            return token
            
        except Exception as e:
            raise AissembleSecurityException(f"Token generation failed: {e}")
    
    def refresh_token(self, existing_token: str) -> str:
        """Refresh existing valid token with new expiration"""
        try:
            # Validate existing token
            self.jwt_util.validate_token(existing_token)
            
            # Parse existing token
            parsed = self.jwt_util.parse_token(existing_token)
            
            # Generate new token with same claims but new expiration
            return self.generate_user_token(
                user_id=parsed.get("sub"),
                roles=parsed.get("roles", []),
                expires_in_hours=24
            )
            
        except Exception as e:
            raise AissembleSecurityException(f"Token refresh failed: {e}")
    
    def extract_user_context(self, token: str) -> dict:
        """Extract user context from valid token"""
        try:
            self.jwt_util.validate_token(token)
            parsed = self.jwt_util.parse_token(token)
            
            return {
                "user_id": parsed.get("sub"),
                "roles": parsed.get("roles", []),
                "issued_at": parsed.get("iat"),
                "expires_at": parsed.get("exp"),
                "issuer": parsed.get("iss")
            }
            
        except Exception as e:
            raise AissembleSecurityException(f"Context extraction failed: {e}")
    
    def is_token_expired(self, token: str) -> bool:
        """Check if token is expired without full validation"""
        try:
            parsed = jwt.decode(token, options={"verify_signature": False})
            exp_timestamp = parsed.get("exp")
            
            if exp_timestamp:
                return datetime.utcnow() > datetime.fromtimestamp(exp_timestamp)
            return True
            
        except Exception:
            return True

# Usage example
token_manager = TokenManager()

# Generate token for user
user_token = token_manager.generate_user_token(
    user_id="john.doe@company.com",
    roles=["data_scientist", "ml_engineer"],
    expires_in_hours=8
)

# Extract user context
user_context = token_manager.extract_user_context(user_token)
print(f"User context: {user_context}")

# Check token expiration
is_expired = token_manager.is_token_expired(user_token)
print(f"Token expired: {is_expired}")

# Refresh token if needed
if not is_expired:
    refreshed_token = token_manager.refresh_token(user_token)
    print("Token refreshed successfully")

Role-Based Access Control Integration

from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
from aissembleauth.auth_config import AuthConfig
from functools import wraps
from typing import List

class RoleBasedAccessControl:
    """Role-based access control using JWT tokens"""
    
    def __init__(self):
        self.jwt_util = JsonWebTokenUtil()
        self.auth_config = AuthConfig()
    
    def require_roles(self, required_roles: List[str]):
        """Decorator for enforcing role-based access control"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Extract token from request context (simplified)
                token = kwargs.get('auth_token') or args[0] if args else None
                
                if not token:
                    raise AissembleSecurityException("Authentication token required")
                
                try:
                    # Validate and parse token
                    self.jwt_util.validate_token(token)
                    parsed_token = self.jwt_util.parse_token(token)
                    
                    # Extract user roles
                    user_roles = parsed_token.get("roles", [])
                    
                    # Check if user has required roles
                    if not any(role in user_roles for role in required_roles):
                        raise AissembleSecurityException(
                            f"Access denied. Required roles: {required_roles}, User roles: {user_roles}"
                        )
                    
                    print(f"Access granted for user: {parsed_token.get('sub')}")
                    return func(*args, **kwargs)
                    
                except AissembleSecurityException:
                    raise
                except Exception as e:
                    raise AissembleSecurityException(f"Authorization failed: {e}")
                    
            return wrapper
        return decorator
    
    def check_permissions(self, token: str, resource: str, action: str) -> bool:
        """Check if user has permissions for specific resource and action"""
        try:
            self.jwt_util.validate_token(token)
            parsed_token = self.jwt_util.parse_token(token)
            
            user_roles = parsed_token.get("roles", [])
            
            # Define role-based permissions (in practice, this would be configurable)
            permissions = {
                "admin": {"*": ["*"]},  # Admin has all permissions
                "data_scientist": {
                    "datasets": ["read", "analyze"],
                    "models": ["read", "train", "evaluate"]
                },
                "ml_engineer": {
                    "models": ["read", "deploy", "monitor"],
                    "infrastructure": ["read", "configure"]
                },
                "viewer": {
                    "*": ["read"]
                }
            }
            
            # Check permissions
            for role in user_roles:
                role_perms = permissions.get(role, {})
                
                # Check wildcard permissions
                if "*" in role_perms and ("*" in role_perms["*"] or action in role_perms["*"]):
                    return True
                
                # Check specific resource permissions
                if resource in role_perms and (action in role_perms[resource] or "*" in role_perms[resource]):
                    return True
            
            return False
            
        except Exception as e:
            print(f"Permission check failed: {e}")
            return False

# Usage examples
rbac = RoleBasedAccessControl()

# Decorator usage
@rbac.require_roles(["data_scientist", "ml_engineer"])
def train_model(auth_token: str, model_config: dict):
    """Function that requires data scientist or ML engineer role"""
    print("Training model with configuration:", model_config)
    return {"status": "training_started"}

@rbac.require_roles(["admin"])
def delete_dataset(auth_token: str, dataset_id: str):
    """Function that requires admin role"""
    print(f"Deleting dataset: {dataset_id}")
    return {"status": "deleted"}

# Direct permission checking
def access_resource(token: str, resource: str, action: str):
    """Check access to specific resource and action"""
    if rbac.check_permissions(token, resource, action):
        print(f"Access granted to {action} on {resource}")
        return True
    else:
        print(f"Access denied to {action} on {resource}")
        return False

# Example usage
user_token = "eyJhbGciOiJSUzI1NiJ9..."  # Valid JWT token

try:
    # Use decorated functions
    result = train_model(auth_token=user_token, model_config={"algorithm": "RandomForest"})
    print("Training result:", result)
    
    # Check specific permissions
    can_read_datasets = access_resource(user_token, "datasets", "read")
    can_deploy_models = access_resource(user_token, "models", "deploy")
    
except AissembleSecurityException as e:
    print(f"Security error: {e}")

Configuration-Driven Security Setup

from aissembleauth.auth_config import AuthConfig
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
import os

class SecurityConfigurationManager:
    """Manage security configuration across different environments"""
    
    def __init__(self):
        self.auth_config = AuthConfig()
        self.jwt_util = JsonWebTokenUtil()
    
    def validate_configuration(self) -> dict:
        """Validate authentication configuration"""
        config_status = {
            "public_key_accessible": False,
            "keystore_accessible": False,
            "authorization_enabled": False,
            "pdp_configured": False,
            "configuration_valid": False
        }
        
        try:
            # Check public key
            public_key_path = self.auth_config.public_key_path()
            if public_key_path and os.path.exists(public_key_path):
                config_status["public_key_accessible"] = True
            
            # Check keystore
            jks_path = self.auth_config.jks_path()
            if jks_path and os.path.exists(jks_path):
                config_status["keystore_accessible"] = True
            
            # Check authorization
            config_status["authorization_enabled"] = self.auth_config.is_authorization_enabled()
            
            # Check PDP configuration
            pdp_url = self.auth_config.pdp_host_url()
            if pdp_url:
                config_status["pdp_configured"] = True
            
            # Overall status
            config_status["configuration_valid"] = (
                config_status["public_key_accessible"] and
                config_status["keystore_accessible"]
            )
            
        except Exception as e:
            print(f"Configuration validation error: {e}")
        
        return config_status
    
    def get_security_summary(self) -> dict:
        """Get comprehensive security configuration summary"""
        return {
            "public_key_path": self.auth_config.public_key_path(),
            "keystore_path": self.auth_config.jks_path(),
            "keystore_alias": self.auth_config.jks_key_alias(),
            "authorization_enabled": self.auth_config.is_authorization_enabled(),
            "pdp_host": self.auth_config.pdp_host_url(),
            "configuration_status": self.validate_configuration()
        }
    
    def test_jwt_operations(self) -> dict:
        """Test JWT token operations"""
        test_results = {
            "token_generation": False,
            "token_validation": False,
            "token_parsing": False,
            "signing_key_access": False
        }
        
        try:
            # Test signing key access
            signing_key = self.jwt_util.get_sign_key()
            if signing_key:
                test_results["signing_key_access"] = True
            
            # Test token generation
            test_token = self.jwt_util.create_token()
            if test_token:
                test_results["token_generation"] = True
                
                # Test token validation
                try:
                    self.jwt_util.validate_token(test_token)
                    test_results["token_validation"] = True
                except AissembleSecurityException:
                    pass
                
                # Test token parsing
                try:
                    parsed = self.jwt_util.parse_token(test_token)
                    if parsed:
                        test_results["token_parsing"] = True
                except Exception:
                    pass
                    
        except Exception as e:
            print(f"JWT operations test error: {e}")
        
        return test_results
    
    def initialize_security_system(self) -> bool:
        """Initialize and validate entire security system"""
        print("Initializing aiSSEMBLE security system...")
        
        # Validate configuration
        config_status = self.validate_configuration()
        if not config_status["configuration_valid"]:
            print("Security configuration validation failed")
            return False
        
        # Test JWT operations
        jwt_status = self.test_jwt_operations()
        if not all(jwt_status.values()):
            print("JWT operations test failed")
            return False
        
        print("Security system initialized successfully")
        return True

# Usage example
security_manager = SecurityConfigurationManager()

# Get security summary
summary = security_manager.get_security_summary()
print("Security Configuration Summary:")
for key, value in summary.items():
    print(f"  {key}: {value}")

# Initialize security system
if security_manager.initialize_security_system():
    print("Ready to handle authentication requests")
else:
    print("Security system initialization failed")

# Validate configuration in different environments
config_validation = security_manager.validate_configuration()
print(f"Configuration validation: {config_validation}")

# Test JWT operations
jwt_tests = security_manager.test_jwt_operations()
print(f"JWT operations test: {jwt_tests}")

Policy Decision Point Integration

from aissembleauth.auth_config import AuthConfig
from aissembleauth.json_web_token_util import JsonWebTokenUtil, AissembleSecurityException
import requests
import json

class PolicyDecisionPointClient:
    """Client for integrating with Policy Decision Point (PDP) server"""
    
    def __init__(self):
        self.auth_config = AuthConfig()
        self.jwt_util = JsonWebTokenUtil()
        self.pdp_url = self.auth_config.pdp_host_url()
    
    def authorize_action(self, token: str, resource: str, action: str, context: dict = None) -> bool:
        """Check authorization through PDP server"""
        if not self.auth_config.is_authorization_enabled():
            print("Authorization is disabled, allowing access")
            return True
        
        try:
            # Validate JWT token first
            self.jwt_util.validate_token(token)
            user_info = self.jwt_util.parse_token(token)
            
            # Prepare authorization request
            auth_request = {
                "subject": user_info.get("sub"),
                "resource": resource,
                "action": action,
                "context": context or {},
                "roles": user_info.get("roles", [])
            }
            
            # Send request to PDP
            response = requests.post(
                f"{self.pdp_url}/authorize",
                json=auth_request,
                headers={"Content-Type": "application/json"},
                timeout=5
            )
            
            if response.status_code == 200:
                decision = response.json()
                return decision.get("permit", False)
            else:
                print(f"PDP error: {response.status_code}")
                return False
                
        except AissembleSecurityException:
            print("Token validation failed")
            return False
        except Exception as e:
            print(f"Authorization check failed: {e}")
            return False
    
    def get_user_permissions(self, token: str) -> list:
        """Get all permissions for authenticated user"""
        try:
            self.jwt_util.validate_token(token)
            user_info = self.jwt_util.parse_token(token)
            
            # Request user permissions from PDP
            permission_request = {
                "subject": user_info.get("sub"),
                "roles": user_info.get("roles", [])
            }
            
            response = requests.post(
                f"{self.pdp_url}/permissions",
                json=permission_request,
                headers={"Content-Type": "application/json"},
                timeout=5
            )
            
            if response.status_code == 200:
                return response.json().get("permissions", [])
            else:
                return []
                
        except Exception as e:
            print(f"Permission retrieval failed: {e}")
            return []

class SecureAPIEndpoint:
    """Example of secure API endpoint with PDP integration"""
    
    def __init__(self):
        self.pdp_client = PolicyDecisionPointClient()
    
    def secure_endpoint(self, token: str, resource_id: str, action: str, **kwargs):
        """Generic secure endpoint with PDP authorization"""
        
        # Check authorization through PDP
        if not self.pdp_client.authorize_action(
            token=token,
            resource=f"api/resource/{resource_id}",
            action=action,
            context={
                "ip_address": kwargs.get("client_ip"),
                "user_agent": kwargs.get("user_agent"),
                "timestamp": kwargs.get("timestamp")
            }
        ):
            raise AissembleSecurityException(f"Access denied for {action} on resource {resource_id}")
        
        # Proceed with authorized action
        return f"Successfully executed {action} on resource {resource_id}"

# Usage example
pdp_client = PolicyDecisionPointClient()
secure_api = SecureAPIEndpoint()

# Example authorization check
user_token = "eyJhbGciOiJSUzI1NiJ9..."  # Valid JWT token

try:
    # Check specific authorization
    can_access = pdp_client.authorize_action(
        token=user_token,
        resource="ml_models/fraud_detection",
        action="deploy",
        context={"environment": "production", "approval_id": "DEPLOY-001"}
    )
    
    if can_access:
        print("User authorized to deploy model")
    else:
        print("User not authorized to deploy model")
    
    # Get user permissions
    permissions = pdp_client.get_user_permissions(user_token)
    print(f"User permissions: {permissions}")
    
    # Use secure endpoint
    result = secure_api.secure_endpoint(
        token=user_token,
        resource_id="dataset_001",
        action="read",
        client_ip="192.168.1.100"
    )
    print(f"API result: {result}")
    
except AissembleSecurityException as e:
    print(f"Security error: {e}")

Best Practices

Token Management

  • Use appropriate token expiration times based on security requirements
  • Implement token refresh mechanisms for long-running sessions
  • Store tokens securely on client side (HttpOnly cookies, secure storage)
  • Implement proper token revocation mechanisms

Security Configuration

  • Use strong cryptographic keys for token signing
  • Regularly rotate signing keys and certificates
  • Implement proper keystore security and access controls
  • Use environment-specific configuration management

Authorization Patterns

  • Implement role-based access control consistently
  • Use Policy Decision Points for complex authorization logic
  • Audit and log all authentication and authorization events
  • Implement proper error handling without information leakage

Production Considerations

  • Use HTTPS for all authentication endpoints
  • Implement rate limiting for authentication attempts
  • Monitor for suspicious authentication patterns
  • Regular security audits and penetration testing

Install with Tessl CLI

npx tessl i tessl/pypi-aissemble-foundation-core-python

docs

auth.md

bom.md

config.md

filestore.md

index.md

inference.md

metadata.md

policy.md

tile.json