A modern, enterprise-ready business intelligence web application for data exploration and visualization.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Enterprise-grade security system with role-based access control, row-level security, OAuth integration, and comprehensive permission management.
Core security management system built on Flask-AppBuilder.
class SupersetSecurityManager(SecurityManager):
"""Enhanced security manager for Superset with custom permissions."""
def __init__(self, appbuilder: AppBuilder):
"""
Initialize security manager.
Args:
appbuilder: Flask-AppBuilder instance
"""
def get_user_menu_access(self, menu_names: List[str]) -> Set[str]:
"""
Get menus accessible to current user.
Args:
menu_names: List of menu names to check
Returns:
Set of accessible menu names
"""
def can_access(self, permission_name: str, view_name: str) -> bool:
"""
Check if current user has permission.
Args:
permission_name: Permission to check (can_read, can_write, etc.)
view_name: View/resource name
Returns:
True if user has permission
"""
def get_rls_filters(self, table: BaseDatasource) -> List[SqlaQuery]:
"""
Get row-level security filters for user and table.
Args:
table: Target datasource
Returns:
List of SQL filter conditions
"""
def get_datasource_access_error_msg(self, datasource: BaseDatasource) -> str:
"""
Get error message for datasource access denial.
Args:
datasource: Inaccessible datasource
Returns:
Human-readable error message
"""
def raise_for_access(self, permission_name: str = None,
view_name: str = None, **kwargs) -> None:
"""
Raise exception if user lacks permission.
Args:
permission_name: Required permission
view_name: Target view/resource
Raises:
SupersetSecurityException: If access denied
"""
def get_user_roles(self) -> List[Role]:
"""Get roles for current user."""
def is_admin(self) -> bool:
"""Check if current user is admin."""
def is_alpha(self) -> bool:
"""Check if current user has alpha role."""
def is_gamma(self) -> bool:
"""Check if current user has gamma role."""Usage Examples:
from superset import security_manager
# Check permissions
if security_manager.can_access('can_read', 'Dashboard'):
# User can read dashboards
pass
# Get accessible menus
accessible_menus = security_manager.get_user_menu_access(['SQL Lab', 'Charts'])
# Raise for access (will throw exception if denied)
security_manager.raise_for_access('can_edit', 'Chart')Different authentication mechanisms supported by Superset.
# Authentication Type Constants
AUTH_OID: int = 0 # OpenID authentication
AUTH_DB: int = 1 # Database authentication
AUTH_LDAP: int = 2 # LDAP authentication
AUTH_REMOTE_USER: int = 3 # Remote user authentication
AUTH_OAUTH: int = 4 # OAuth authentication
class AuthDBView(AuthView):
"""Database authentication view."""
def login(self) -> Response:
"""
Handle database login.
Returns:
Login response or redirect
"""
class AuthOAuthView(AuthView):
"""OAuth authentication view."""
def oauth_authorize(self, provider: str) -> Response:
"""
Initiate OAuth authorization flow.
Args:
provider: OAuth provider name
Returns:
Redirect to OAuth provider
"""
def oauth_authorized(self, provider: str) -> Response:
"""
Handle OAuth callback.
Args:
provider: OAuth provider name
Returns:
Login completion response
"""
class AuthLDAPView(AuthView):
"""LDAP authentication view."""
def authenticate(self, username: str, password: str) -> bool:
"""
Authenticate against LDAP server.
Args:
username: LDAP username
password: User password
Returns:
True if authentication successful
"""Usage Examples:
# Database authentication configuration
AUTH_TYPE = AUTH_DB
AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = "Gamma"
# OAuth configuration
AUTH_TYPE = AUTH_OAUTH
OAUTH_PROVIDERS = [{
'name': 'google',
'token_key': 'access_token',
'icon': 'fa-google',
'remote_app': {
'client_id': 'your-client-id',
'client_secret': 'your-client-secret',
'server_metadata_url': 'https://accounts.google.com/.well-known/openid_configuration',
'client_kwargs': {'scope': 'openid email profile'}
}
}]Granular permission system for controlling access to resources.
# Core Permission Classes
class Permission:
"""Permission for accessing resources."""
name: str # Permission name (can_read, can_write, etc.)
view_menu: ViewMenu # Target resource/view
def __init__(self, name: str, view_menu: ViewMenu):
"""
Initialize permission.
Args:
name: Permission type
view_menu: Target resource
"""
class ViewMenu:
"""Resource/view that can be protected."""
name: str # Resource name (Dashboard, Chart, etc.)
def __init__(self, name: str):
"""
Initialize view menu.
Args:
name: Resource name
"""
class Role:
"""Group of permissions assigned to users."""
name: str # Role name
permissions: List[Permission] # Assigned permissions
def __init__(self, name: str):
"""
Initialize role.
Args:
name: Role name
"""
# Permission Constants
PERMISSION_CAN_READ = "can_read"
PERMISSION_CAN_WRITE = "can_write"
PERMISSION_CAN_DELETE = "can_delete"
PERMISSION_CAN_LIST = "can_list"
PERMISSION_CAN_SHOW = "can_show"
PERMISSION_CAN_ADD = "can_add"
PERMISSION_CAN_EDIT = "can_edit"
# Resource Constants
RESOURCE_DASHBOARD = "Dashboard"
RESOURCE_CHART = "Chart"
RESOURCE_DATASET = "Dataset"
RESOURCE_DATABASE = "Database"
RESOURCE_SQL_LAB = "SQL Lab"Predefined roles with different permission levels.
class DefaultRoles:
"""Default roles in Superset."""
ADMIN = "Admin" # Full system access
ALPHA = "Alpha" # Power user access
GAMMA = "Gamma" # Standard user access
PUBLIC = "Public" # Public/guest access
SQL = "sql_lab" # SQL Lab access
GRANTER = "granter" # Can grant permissions
def create_admin_role() -> Role:
"""
Create admin role with all permissions.
Returns:
Admin role with full access
"""
def create_alpha_role() -> Role:
"""
Create alpha role with power user permissions.
Returns:
Alpha role for power users
"""
def create_gamma_role() -> Role:
"""
Create gamma role with standard user permissions.
Returns:
Gamma role for standard users
"""
def create_public_role() -> Role:
"""
Create public role for guest access.
Returns:
Public role with limited access
"""Usage Examples:
# Check user roles
from superset import security_manager
user = security_manager.get_user()
if security_manager.is_admin():
# User has admin privileges
pass
# Get user permissions
user_permissions = []
for role in user.roles:
user_permissions.extend(role.permissions)Fine-grained data access control at the row level.
class RowLevelSecurityFilter(Model):
"""Row-level security filter definition."""
id: int
filter_type: str # SQL or Regular
tables: List[SqlaTable] # Target tables
roles: List[Role] # Applicable roles
group_key: str # Grouping key for filters
clause: str # SQL WHERE clause
def __init__(self, filter_type: str = "Regular", clause: str = None,
**kwargs):
"""
Initialize RLS filter.
Args:
filter_type: Type of filter (SQL or Regular)
clause: SQL WHERE clause for filtering
"""
def get_rls_filters(datasource: BaseDatasource,
user: User = None) -> List[str]:
"""
Get applicable RLS filters for datasource and user.
Args:
datasource: Target datasource
user: User to get filters for (current user if None)
Returns:
List of SQL WHERE clauses to apply
"""
def apply_rls_filters(query: str, filters: List[str]) -> str:
"""
Apply RLS filters to SQL query.
Args:
query: Original SQL query
filters: List of filter clauses
Returns:
Modified query with filters applied
"""Usage Examples:
# Create RLS filter
rls_filter = RowLevelSecurityFilter(
filter_type="Regular",
clause="department = '{{ current_user_id() }}'"
)
rls_filter.tables.append(sales_table)
rls_filter.roles.append(gamma_role)
# Apply RLS in query execution
filters = get_rls_filters(datasource, current_user)
filtered_query = apply_rls_filters(original_query, filters)User creation, role assignment, and profile management.
class UserManager:
"""User management utilities."""
def create_user(self, username: str, email: str, first_name: str,
last_name: str, password: str, role: Role) -> User:
"""
Create new user.
Args:
username: Login username
email: Email address
first_name: First name
last_name: Last name
password: Initial password
role: Primary role
Returns:
Created user instance
"""
def update_user_roles(self, user: User, roles: List[Role]) -> None:
"""
Update user's role assignments.
Args:
user: Target user
roles: New role list
"""
def deactivate_user(self, user: User) -> None:
"""
Deactivate user account.
Args:
user: User to deactivate
"""
def reset_password(self, user: User, new_password: str) -> None:
"""
Reset user password.
Args:
user: Target user
new_password: New password
"""
def get_current_user() -> User:
"""Get currently authenticated user."""
def get_user_by_username(username: str) -> User:
"""Get user by username."""
def get_user_roles(user: User) -> List[Role]:
"""Get roles assigned to user."""JWT-based API authentication and authorization.
class JWTManager:
"""JWT token management for API access."""
def generate_token(self, user: User, expires_delta: timedelta = None) -> str:
"""
Generate JWT access token.
Args:
user: User to generate token for
expires_delta: Token expiration time
Returns:
JWT access token
"""
def verify_token(self, token: str) -> User:
"""
Verify and decode JWT token.
Args:
token: JWT token to verify
Returns:
User associated with token
Raises:
TokenError: If token invalid or expired
"""
def refresh_token(self, refresh_token: str) -> str:
"""
Generate new access token from refresh token.
Args:
refresh_token: Valid refresh token
Returns:
New access token
"""
def create_guest_token(resources: List[Dict[str, Any]],
user_data: Dict[str, Any] = None) -> str:
"""
Create guest access token for embedded dashboards.
Args:
resources: List of accessible resources
user_data: Guest user attributes
Returns:
Guest access token
"""Usage Examples:
# Generate API token
from superset.utils import jwt
user = get_current_user()
token = jwt.generate_token(user)
# Use token in API calls
headers = {'Authorization': f'Bearer {token}'}
response = requests.get('/api/v1/dashboard/', headers=headers)
# Create guest token for embedded dashboard
guest_token = create_guest_token([
{'type': 'dashboard', 'id': 123}
], {'username': 'guest', 'first_name': 'Guest'})# Core security settings
SECRET_KEY: str = "your-secret-key"
WTF_CSRF_ENABLED: bool = True
WTF_CSRF_TIME_LIMIT: int = 3600
# Session security
SESSION_COOKIE_SECURE: bool = True
SESSION_COOKIE_HTTPONLY: bool = True
SESSION_COOKIE_SAMESITE: str = "Lax"
PERMANENT_SESSION_LIFETIME: timedelta = timedelta(days=31)
# Password policies
AUTH_PASSWORD_COMPLEXITY_ENABLED: bool = True
AUTH_PASSWORD_MIN_LENGTH: int = 8
AUTH_PASSWORD_COMPLEXITY_VALIDATOR: Callable = None
# Login attempt limits
AUTH_RATE_LIMITED: bool = True
AUTH_RATE_LIMIT: str = "5 per 40 second"Install with Tessl CLI
npx tessl i tessl/pypi-apache-superset@1.0.1