Flask-AppBuilder (FAB) security integration component within Apache Airflow core, providing authentication, authorization, and security management features
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Multiple authentication backend support including database, LDAP, OAuth, OpenID, and remote user authentication with configurable options and provider-specific implementations. Each backend provides comprehensive authentication capabilities tailored to different enterprise environments.
Username/password authentication using the local database with password hashing and validation.
def auth_user_db(self, username: str, password: str) -> User | None:
"""
Authenticate user using database credentials.
Parameters:
- username: Username or registered email address
- password: Plain text password to verify against hashed password
Returns:
User object if authentication successful, None otherwise
Features:
- Case-insensitive username matching (if configured)
- Email address login support
- Password hash verification using Werkzeug
- Brute force protection with fail counters
- Session ID rotation on successful login
"""

Enterprise LDAP and Active Directory authentication with extensive configuration options.
def auth_user_ldap(self, username: str, password: str) -> User | None:
"""
Authenticate user using LDAP/Active Directory.
Parameters:
- username: LDAP username
- password: LDAP password
Returns:
User object if authentication successful, None otherwise
Features:
- Direct bind and indirect bind (search first) modes
- TLS/SSL support with certificate validation
- User attribute mapping (name, email, groups)
- Group-based role mapping
- User auto-registration from LDAP attributes
- Role synchronization at login
"""

OAuth 2.0 authentication supporting multiple providers with comprehensive provider integrations.
def auth_user_oauth(self, userinfo: dict) -> User | None:
"""
Authenticate user using OAuth provider information.
Parameters:
- userinfo: Dictionary containing user information from OAuth provider
Keys should match User model columns (username, email, first_name, etc.)
Returns:
User object if authentication successful, None otherwise
Supported Providers:
- GitHub/GitHub Enterprise
- Google
- Microsoft Azure AD
- OpenShift
- Okta
- Keycloak
- Twitter
- LinkedIn
- Custom OAuth providers
"""
def get_oauth_user_info(self, provider: str, resp: dict) -> dict:
"""
Extract user information from OAuth provider response.
Parameters:
- provider: OAuth provider name ('github', 'google', 'azure', etc.)
- resp: OAuth provider response containing user data
Returns:
Dictionary with standardized user information:
- username: Unique username for the user
- email: User's email address
- first_name: User's first name (if available)
- last_name: User's last name (if available)
- role_keys: List of external role identifiers for role mapping
"""

OpenID authentication for identity provider integration.
def auth_user_oid(self, email: str) -> User | None:
"""
Authenticate user using OpenID.
Parameters:
- email: User's email address from OpenID provider
Returns:
User object if authentication successful, None otherwise
Features:
- Email-based user identification
- Integration with Flask-OpenID
- User account validation and activation checks
"""

Authentication for users pre-authenticated by a web server or proxy (e.g., Kerberos, SAML).
def auth_user_remote_user(self, username: str) -> User | None:
"""
Authenticate user using remote user authentication.
Parameters:
- username: Username from remote authentication system (e.g., REMOTE_USER header)
Returns:
User object if authentication successful, None otherwise
Features:
- Trusted authentication from upstream systems
- User auto-registration with default role assignment
- No password verification (trust upstream authentication)
"""

# GitHub OAuth configuration example
OAUTH_PROVIDERS = [{
'name': 'github',
'token_key': 'access_token',
'icon': 'fa-github',
'remote_app': {
'client_id': 'your_github_client_id',
'client_secret': 'your_github_client_secret',
'api_base_url': 'https://api.github.com',
'access_token_url': 'https://github.com/login/oauth/access_token',
'authorize_url': 'https://github.com/login/oauth/authorize',
'client_kwargs': {
'scope': 'user:email'
}
}
}]

# Google OAuth configuration example
OAUTH_PROVIDERS = [{
'name': 'google',
'icon': 'fa-google',
'token_key': 'access_token',
'remote_app': {
'client_id': 'your_google_client_id',
'client_secret': 'your_google_client_secret',
'server_metadata_url': 'https://accounts.google.com/.well-known/openid-configuration',
'client_kwargs': {
'scope': 'openid email profile'
}
}
}]

# Azure AD OAuth configuration example
OAUTH_PROVIDERS = [{
'name': 'azure',
'icon': 'fa-windows',
'token_key': 'access_token',
'remote_app': {
'client_id': 'your_azure_client_id',
'client_secret': 'your_azure_client_secret',
'server_metadata_url': 'https://login.microsoftonline.com/your_tenant_id/v2.0/.well-known/openid-configuration',
'client_kwargs': {
'scope': 'openid email profile'
}
}
}]

# Basic LDAP configuration
AUTH_TYPE = AUTH_LDAP
AUTH_LDAP_SERVER = "ldap://ldap.company.com"
AUTH_LDAP_SEARCH = "ou=people,dc=company,dc=com"
AUTH_LDAP_UID_FIELD = "uid"
AUTH_LDAP_FIRSTNAME_FIELD = "givenName"
AUTH_LDAP_LASTNAME_FIELD = "sn"
AUTH_LDAP_EMAIL_FIELD = "mail"

# Advanced LDAP with TLS and group mapping
AUTH_LDAP_USE_TLS = True
AUTH_LDAP_TLS_DEMAND = True
AUTH_LDAP_TLS_CACERTFILE = "/path/to/ca.crt"
AUTH_LDAP_BIND_USER = "cn=airflow,ou=service,dc=company,dc=com"
AUTH_LDAP_BIND_PASSWORD = "service_password"
AUTH_LDAP_SEARCH_FILTER = "(objectClass=person)"
AUTH_LDAP_GROUP_FIELD = "memberOf"
# Role mapping from LDAP groups
AUTH_ROLES_MAPPING = {
"cn=airflow-admins,ou=groups,dc=company,dc=com": ["Admin"],
"cn=airflow-users,ou=groups,dc=company,dc=com": ["User"],
}

def _rotate_session_id(self) -> None:
"""
Rotate session ID upon successful authentication when using database sessions.
Prevents session fixation attacks.
"""
def update_user_auth_stat(self, user: User, success: bool = True) -> None:
"""
Update user authentication statistics.
Parameters:
- user: User object to update
- success: Whether authentication was successful
Updates:
- login_count: Incremented on successful login
- last_login: Set to current timestamp on success
- fail_login_count: Incremented on failure, reset to 0 on success
"""

def _ldap_calculate_user_roles(self, user_attributes: dict[str, list[bytes]]) -> list[Role]:
"""
Calculate user roles from LDAP attributes using AUTH_ROLES_MAPPING.
Parameters:
- user_attributes: LDAP user attributes dictionary
Returns:
List of Role objects for the user
"""
def _oauth_calculate_user_roles(self, userinfo: dict) -> list[Role]:
"""
Calculate user roles from OAuth userinfo using role_keys and AUTH_ROLES_MAPPING.
Parameters:
- userinfo: OAuth user information dictionary
Returns:
List of Role objects for the user
"""
def get_roles_from_keys(self, role_keys: list[str]) -> set[Role]:
"""
Map external role keys to internal roles using AUTH_ROLES_MAPPING.
Parameters:
- role_keys: List of external role identifiers
Returns:
Set of Role objects mapped from the keys
"""

AUTH_DB = 0 # Database authentication
AUTH_OID = 1 # OpenID authentication
AUTH_LDAP = 2 # LDAP authentication
AUTH_REMOTE_USER = 3 # Remote user authentication
AUTH_OAUTH = 4 # OAuth authentication

# Default security configuration
AUTH_ROLE_ADMIN = "Admin"
AUTH_ROLE_PUBLIC = "Public"
AUTH_USER_REGISTRATION = False
AUTH_USER_REGISTRATION_ROLE = "Public"
AUTH_USERNAME_CI = True
AUTH_ROLES_SYNC_AT_LOGIN = False
AUTH_API_LOGIN_ALLOW_MULTIPLE_PROVIDERS = False

from airflow.www.fab_security.manager import AUTH_DB
# Configure database authentication
app.config['AUTH_TYPE'] = AUTH_DB
app.config['AUTH_ROLE_ADMIN'] = 'Admin'
app.config['AUTH_ROLE_PUBLIC'] = 'Viewer'
# Authenticate user
user = security_manager.auth_user_db('john_doe', 'password123')
if user:
print(f"Authenticated: {user.get_full_name()}")

from airflow.www.fab_security.manager import AUTH_LDAP
# Configure LDAP authentication
app.config.update({
'AUTH_TYPE': AUTH_LDAP,
'AUTH_LDAP_SERVER': 'ldaps://ldap.company.com:636',
'AUTH_LDAP_SEARCH': 'ou=employees,dc=company,dc=com',
'AUTH_LDAP_UID_FIELD': 'sAMAccountName',
'AUTH_LDAP_USE_TLS': False,  # ldaps:// is already TLS-wrapped; USE_TLS issues STARTTLS and is only for plain ldap:// servers
'AUTH_USER_REGISTRATION': True,
'AUTH_ROLES_SYNC_AT_LOGIN': True,
'AUTH_ROLES_MAPPING': {
'CN=AirflowAdmins,OU=Groups,DC=company,DC=com': ['Admin'],
'CN=AirflowUsers,OU=Groups,DC=company,DC=com': ['User']
}
})
# Authenticate LDAP user
user = security_manager.auth_user_ldap('john.doe', 'ldap_password')

from airflow.www.fab_security.manager import AUTH_OAUTH
# Configure OAuth with Google
app.config.update({
'AUTH_TYPE': AUTH_OAUTH,
'OAUTH_PROVIDERS': [{
'name': 'google',
'icon': 'fa-google',
'token_key': 'access_token',
'remote_app': {
'client_id': 'your_google_client_id',
'client_secret': 'your_google_client_secret',
'server_metadata_url': 'https://accounts.google.com/.well-known/openid-configuration',
'client_kwargs': {'scope': 'openid email profile'}
}
}]
})
# OAuth userinfo processing
oauth_userinfo = {
'username': 'google_123456789',
'email': 'john@company.com',
'first_name': 'John',
'last_name': 'Doe'
}
user = security_manager.auth_user_oauth(oauth_userinfo)

All authentication methods return None on failure and log appropriate messages.

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-fab-security