Flask-AppBuilder (FAB) security integration component within Apache Airflow core, providing authentication, authorization, and security management features
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core security management functionality providing authentication, authorization, session management, and security configuration. This module serves as the foundation for all security operations in the Airflow FAB integration.
Authenticate users using various backends including database, LDAP, OAuth, OpenID, and remote user authentication.
def auth_user_db(self, username: str, password: str) -> User | None:
"""
Authenticate user using database credentials.
Parameters:
- username: Username or email address
- password: Plain text password to verify
Returns:
User object if authenticated successfully, None otherwise
"""
def auth_user_ldap(self, username: str, password: str) -> User | None:
"""
Authenticate user using LDAP/Active Directory.
Parameters:
- username: LDAP username
- password: LDAP password
Returns:
User object if authenticated successfully, None otherwise
"""
def auth_user_oauth(self, userinfo: dict) -> User | None:
"""
Authenticate user using OAuth provider information.
Parameters:
- userinfo: Dictionary containing user information from OAuth provider
Keys should match User model columns
Returns:
User object if authenticated successfully, None otherwise
"""
def auth_user_oid(self, email: str) -> User | None:
"""
Authenticate user using OpenID.
Parameters:
- email: User's email address from OpenID provider
Returns:
User object if authenticated successfully, None otherwise
"""
def auth_user_remote_user(self, username: str) -> User | None:
"""
Authenticate user using remote user authentication (e.g., from web server).
Parameters:
- username: Username from remote authentication system
Returns:
User object if authenticated successfully, None otherwise
"""Password reset and authentication statistics management for database authentication.
def reset_password(self, userid: int, password: str) -> bool:
"""
Reset user password for database authentication.
Parameters:
- userid: User ID to reset password for
- password: New plain text password (will be hashed)
Returns:
True if password reset successfully, False otherwise
"""
def update_user_auth_stat(self, user: User, success: bool = True) -> None:
"""
Update user authentication statistics.
Parameters:
- user: User object to update statistics for
- success: Whether authentication attempt was successful
"""Manage user sessions and JWT token handling for authentication state.
def load_user(self, user_id: str) -> User:
"""
Load user by ID for session management.
Parameters:
- user_id: String representation of user ID
Returns:
User object
"""
def load_user_jwt(self, _jwt_header: dict, jwt_data: dict) -> User:
"""
Load user from JWT token data.
Parameters:
- _jwt_header: JWT header (unused)
- jwt_data: JWT payload data containing user identity
Returns:
User object
"""
@staticmethod
def before_request() -> None:
"""
Hook that runs before each request to set up user context.
Sets flask g.user to current_user for request processing.
"""OAuth provider integration with support for multiple providers and token management.
def oauth_user_info_getter(self, f: callable) -> callable:
"""
Decorator to set OAuth user info getter function.
Parameters:
- f: Function that takes (sm, provider, response) and returns user info dict
Returns:
Decorated function
"""
def get_oauth_token_key_name(self, provider: str) -> str:
"""
Get token key name for OAuth provider.
Parameters:
- provider: OAuth provider name
Returns:
Token key name (defaults to 'oauth_token')
"""
def get_oauth_token_secret_name(self, provider: str) -> str:
"""
Get token secret name for OAuth provider.
Parameters:
- provider: OAuth provider name
Returns:
Token secret name (defaults to 'oauth_token_secret')
"""
def set_oauth_session(self, provider: str, oauth_response: dict) -> None:
"""
Set OAuth session data for authenticated user.
Parameters:
- provider: OAuth provider name
- oauth_response: Response from OAuth provider containing tokens
"""
def get_oauth_user_info(self, provider: str, resp: dict) -> dict:
"""
Extract user information from OAuth provider response.
Parameters:
- provider: OAuth provider name ('github', 'google', 'azure', etc.)
- resp: OAuth provider response
Returns:
Dictionary with user information (username, email, first_name, last_name, etc.)
"""LDAP authentication with support for TLS, search filters, and user attribute mapping.
@staticmethod
def ldap_extract(ldap_dict: dict[str, list[bytes]], field_name: str, fallback: str) -> str:
"""
Extract single value from LDAP attribute dictionary.
Parameters:
- ldap_dict: LDAP attributes dictionary
- field_name: Attribute name to extract
- fallback: Default value if attribute is empty
Returns:
Decoded string value or fallback
"""
@staticmethod
def ldap_extract_list(ldap_dict: dict[str, list[bytes]], field_name: str) -> list[str]:
"""
Extract list of values from LDAP attribute dictionary.
Parameters:
- ldap_dict: LDAP attributes dictionary
- field_name: Attribute name to extract
Returns:
List of decoded string values
"""Access to security configuration properties and settings.
@property
def auth_type(self) -> int:
"""Get configured authentication type."""
@property
def auth_role_admin(self) -> str:
"""Get administrator role name."""
@property
def auth_role_public(self) -> str:
"""Get public/anonymous role name."""
@property
def auth_user_registration(self) -> bool:
"""Check if user self-registration is enabled."""
@property
def auth_username_ci(self) -> bool:
"""Check if username matching is case-insensitive."""
@property
def current_user(self) -> User | None:
"""Get current authenticated user."""from airflow.www.fab_security.sqla.manager import SecurityManager
# Database authentication
user = security_manager.auth_user_db("john_doe", "password123")
if user:
print(f"Authenticated user: {user.get_full_name()}")
# Update authentication statistics
security_manager.update_user_auth_stat(user, success=True)# OAuth user info from provider
oauth_userinfo = {
'username': 'john_doe',
'email': 'john@example.com',
'first_name': 'John',
'last_name': 'Doe'
}
user = security_manager.auth_user_oauth(oauth_userinfo)
if user:
print(f"OAuth authenticated: {user.username}")# Reset user password
user = security_manager.find_user(username="john_doe")
if user:
security_manager.reset_password(user.id, "new_secure_password")Authentication methods return None on failure and log appropriate error messages. Password operations may raise exceptions for invalid user IDs or database errors.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-fab-security