CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-superset

A modern, enterprise-ready business intelligence web application for data exploration and visualization.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

security-auth.mddocs/

Security and Authentication

Enterprise-grade security system with role-based access control, row-level security, OAuth integration, and comprehensive permission management.

Capabilities

Security Manager

Core security management system built on Flask-AppBuilder.

class SupersetSecurityManager(SecurityManager):
    """Enhanced security manager for Superset with custom permissions."""
    
    def __init__(self, appbuilder: AppBuilder):
        """
        Initialize security manager.
        
        Args:
            appbuilder: Flask-AppBuilder instance
        """
    
    def get_user_menu_access(self, menu_names: List[str]) -> Set[str]:
        """
        Get menus accessible to current user.
        
        Args:
            menu_names: List of menu names to check
            
        Returns:
            Set of accessible menu names
        """
    
    def can_access(self, permission_name: str, view_name: str) -> bool:
        """
        Check if current user has permission.
        
        Args:
            permission_name: Permission to check (can_read, can_write, etc.)
            view_name: View/resource name
            
        Returns:
            True if user has permission
        """
    
    def get_rls_filters(self, table: BaseDatasource) -> List[SqlaQuery]:
        """
        Get row-level security filters for user and table.
        
        Args:
            table: Target datasource
            
        Returns:
            List of SQL filter conditions
        """
    
    def get_datasource_access_error_msg(self, datasource: BaseDatasource) -> str:
        """
        Get error message for datasource access denial.
        
        Args:
            datasource: Inaccessible datasource
            
        Returns:
            Human-readable error message
        """
    
    def raise_for_access(self, permission_name: str = None, 
                        view_name: str = None, **kwargs) -> None:
        """
        Raise exception if user lacks permission.
        
        Args:
            permission_name: Required permission
            view_name: Target view/resource
            
        Raises:
            SupersetSecurityException: If access denied
        """
    
    def get_user_roles(self) -> List[Role]:
        """Get roles for current user."""
    
    def is_admin(self) -> bool:
        """Check if current user is admin."""
    
    def is_alpha(self) -> bool:
        """Check if current user has alpha role."""
    
    def is_gamma(self) -> bool:  
        """Check if current user has gamma role."""

Usage Examples:

from superset import security_manager

# Check permissions
if security_manager.can_access('can_read', 'Dashboard'):
    # User can read dashboards
    pass

# Get accessible menus
accessible_menus = security_manager.get_user_menu_access(['SQL Lab', 'Charts'])

# Raise for access (will throw exception if denied)
security_manager.raise_for_access('can_edit', 'Chart')

Authentication Types

Different authentication mechanisms supported by Superset.

# Authentication Type Constants
AUTH_OID: int = 0          # OpenID authentication
AUTH_DB: int = 1           # Database authentication  
AUTH_LDAP: int = 2         # LDAP authentication
AUTH_REMOTE_USER: int = 3  # Remote user authentication
AUTH_OAUTH: int = 4        # OAuth authentication

class AuthDBView(AuthView):
    """Database authentication view."""
    
    def login(self) -> Response:
        """
        Handle database login.
        
        Returns:
            Login response or redirect
        """

class AuthOAuthView(AuthView):
    """OAuth authentication view."""
    
    def oauth_authorize(self, provider: str) -> Response:
        """
        Initiate OAuth authorization flow.
        
        Args:
            provider: OAuth provider name
            
        Returns:
            Redirect to OAuth provider
        """
    
    def oauth_authorized(self, provider: str) -> Response:
        """
        Handle OAuth callback.
        
        Args:
            provider: OAuth provider name
            
        Returns:
            Login completion response
        """

class AuthLDAPView(AuthView):
    """LDAP authentication view."""
    
    def authenticate(self, username: str, password: str) -> bool:
        """
        Authenticate against LDAP server.
        
        Args:
            username: LDAP username
            password: User password
            
        Returns:
            True if authentication successful
        """

Usage Examples:

# Database authentication configuration
AUTH_TYPE = AUTH_DB
AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = "Gamma"

# OAuth configuration
AUTH_TYPE = AUTH_OAUTH
OAUTH_PROVIDERS = [{
    'name': 'google',
    'token_key': 'access_token',
    'icon': 'fa-google',
    'remote_app': {
        'client_id': 'your-client-id',
        'client_secret': 'your-client-secret',
        'server_metadata_url': 'https://accounts.google.com/.well-known/openid_configuration',
        'client_kwargs': {'scope': 'openid email profile'}
    }
}]

Permission System

Granular permission system for controlling access to resources.

# Core Permission Classes
class Permission:
    """Permission for accessing resources."""
    
    name: str              # Permission name (can_read, can_write, etc.)
    view_menu: ViewMenu    # Target resource/view
    
    def __init__(self, name: str, view_menu: ViewMenu):
        """
        Initialize permission.
        
        Args:
            name: Permission type
            view_menu: Target resource
        """

class ViewMenu:
    """Resource/view that can be protected."""
    
    name: str  # Resource name (Dashboard, Chart, etc.)
    
    def __init__(self, name: str):
        """
        Initialize view menu.
        
        Args:
            name: Resource name
        """

class Role:
    """Group of permissions assigned to users."""
    
    name: str                      # Role name
    permissions: List[Permission]  # Assigned permissions
    
    def __init__(self, name: str):
        """
        Initialize role.
        
        Args:
            name: Role name
        """

# Permission Constants
PERMISSION_CAN_READ = "can_read"
PERMISSION_CAN_WRITE = "can_write" 
PERMISSION_CAN_DELETE = "can_delete"
PERMISSION_CAN_LIST = "can_list"
PERMISSION_CAN_SHOW = "can_show"
PERMISSION_CAN_ADD = "can_add"
PERMISSION_CAN_EDIT = "can_edit"

# Resource Constants
RESOURCE_DASHBOARD = "Dashboard"
RESOURCE_CHART = "Chart" 
RESOURCE_DATASET = "Dataset"
RESOURCE_DATABASE = "Database"
RESOURCE_SQL_LAB = "SQL Lab"

Built-in Roles

Predefined roles with different permission levels.

class DefaultRoles:
    """Default roles in Superset."""
    
    ADMIN = "Admin"           # Full system access
    ALPHA = "Alpha"           # Power user access
    GAMMA = "Gamma"           # Standard user access
    PUBLIC = "Public"         # Public/guest access
    SQL = "sql_lab"          # SQL Lab access
    GRANTER = "granter"      # Can grant permissions

def create_admin_role() -> Role:
    """
    Create admin role with all permissions.
    
    Returns:
        Admin role with full access
    """

def create_alpha_role() -> Role:
    """
    Create alpha role with power user permissions.
    
    Returns:
        Alpha role for power users
    """

def create_gamma_role() -> Role:
    """
    Create gamma role with standard user permissions.
    
    Returns:
        Gamma role for standard users
    """

def create_public_role() -> Role:
    """
    Create public role for guest access.
    
    Returns:
        Public role with limited access
    """

Usage Examples:

# Check user roles
from superset import security_manager

user = security_manager.get_user()
if security_manager.is_admin():
    # User has admin privileges
    pass

# Get user permissions  
user_permissions = []
for role in user.roles:
    user_permissions.extend(role.permissions)

Row-Level Security

Fine-grained data access control at the row level.

class RowLevelSecurityFilter(Model):
    """Row-level security filter definition."""
    
    id: int
    filter_type: str           # SQL or Regular
    tables: List[SqlaTable]    # Target tables
    roles: List[Role]          # Applicable roles  
    group_key: str            # Grouping key for filters
    clause: str               # SQL WHERE clause
    
    def __init__(self, filter_type: str = "Regular", clause: str = None,
                 **kwargs):
        """
        Initialize RLS filter.
        
        Args:
            filter_type: Type of filter (SQL or Regular)
            clause: SQL WHERE clause for filtering
        """

def get_rls_filters(datasource: BaseDatasource, 
                   user: User = None) -> List[str]:
    """
    Get applicable RLS filters for datasource and user.
    
    Args:
        datasource: Target datasource
        user: User to get filters for (current user if None)
        
    Returns:
        List of SQL WHERE clauses to apply
    """

def apply_rls_filters(query: str, filters: List[str]) -> str:
    """
    Apply RLS filters to SQL query.
    
    Args:
        query: Original SQL query
        filters: List of filter clauses
        
    Returns:
        Modified query with filters applied
    """

Usage Examples:

# Create RLS filter
rls_filter = RowLevelSecurityFilter(
    filter_type="Regular",
    clause="department = '{{ current_user_id() }}'"
)
rls_filter.tables.append(sales_table)
rls_filter.roles.append(gamma_role)

# Apply RLS in query execution
filters = get_rls_filters(datasource, current_user)
filtered_query = apply_rls_filters(original_query, filters)

User Management

User creation, role assignment, and profile management.

class UserManager:
    """User management utilities."""
    
    def create_user(self, username: str, email: str, first_name: str,
                   last_name: str, password: str, role: Role) -> User:
        """
        Create new user.
        
        Args:
            username: Login username
            email: Email address
            first_name: First name
            last_name: Last name  
            password: Initial password
            role: Primary role
            
        Returns:
            Created user instance
        """
    
    def update_user_roles(self, user: User, roles: List[Role]) -> None:
        """
        Update user's role assignments.
        
        Args:
            user: Target user
            roles: New role list
        """
    
    def deactivate_user(self, user: User) -> None:
        """
        Deactivate user account.
        
        Args:
            user: User to deactivate
        """
    
    def reset_password(self, user: User, new_password: str) -> None:
        """
        Reset user password.
        
        Args:
            user: Target user
            new_password: New password
        """

def get_current_user() -> User:
    """Get currently authenticated user."""

def get_user_by_username(username: str) -> User:
    """Get user by username."""

def get_user_roles(user: User) -> List[Role]:
    """Get roles assigned to user."""

API Security

JWT-based API authentication and authorization.

class JWTManager:
    """JWT token management for API access."""
    
    def generate_token(self, user: User, expires_delta: timedelta = None) -> str:
        """
        Generate JWT access token.
        
        Args:
            user: User to generate token for
            expires_delta: Token expiration time
            
        Returns:
            JWT access token
        """
    
    def verify_token(self, token: str) -> User:
        """
        Verify and decode JWT token.
        
        Args:
            token: JWT token to verify
            
        Returns:
            User associated with token
            
        Raises:
            TokenError: If token invalid or expired
        """
    
    def refresh_token(self, refresh_token: str) -> str:
        """
        Generate new access token from refresh token.
        
        Args:
            refresh_token: Valid refresh token
            
        Returns:
            New access token
        """

def create_guest_token(resources: List[Dict[str, Any]], 
                      user_data: Dict[str, Any] = None) -> str:
    """
    Create guest access token for embedded dashboards.
    
    Args:
        resources: List of accessible resources
        user_data: Guest user attributes
        
    Returns:
        Guest access token
    """

Usage Examples:

# Generate API token
from superset.utils import jwt

user = get_current_user()
token = jwt.generate_token(user)

# Use token in API calls
headers = {'Authorization': f'Bearer {token}'}
response = requests.get('/api/v1/dashboard/', headers=headers)

# Create guest token for embedded dashboard
guest_token = create_guest_token([
    {'type': 'dashboard', 'id': 123}
], {'username': 'guest', 'first_name': 'Guest'})

Security Configuration

Security Settings

# Core security settings
SECRET_KEY: str = "your-secret-key"
WTF_CSRF_ENABLED: bool = True
WTF_CSRF_TIME_LIMIT: int = 3600

# Session security
SESSION_COOKIE_SECURE: bool = True
SESSION_COOKIE_HTTPONLY: bool = True
SESSION_COOKIE_SAMESITE: str = "Lax"
PERMANENT_SESSION_LIFETIME: timedelta = timedelta(days=31)

# Password policies
AUTH_PASSWORD_COMPLEXITY_ENABLED: bool = True
AUTH_PASSWORD_MIN_LENGTH: int = 8
AUTH_PASSWORD_COMPLEXITY_VALIDATOR: Callable = None

# Login attempt limits
AUTH_RATE_LIMITED: bool = True
AUTH_RATE_LIMIT: str = "5 per 40 second"

Install with Tessl CLI

npx tessl i tessl/pypi-apache-superset@1.0.1

docs

cli-commands.md

configuration.md

index.md

models-database.md

rest-api.md

security-auth.md

visualization.md

tile.json

VALIDATION_REPORT.md