
tessl/pypi-apache-airflow-fab-security

Flask-AppBuilder (FAB) security integration component within Apache Airflow core, providing authentication, authorization, and security management features


Security Management

Core security management functionality providing authentication, authorization, session management, and security configuration. This module serves as the foundation for all security operations in the Airflow FAB integration.

Capabilities

Authentication Methods

Authenticate users using various backends including database, LDAP, OAuth, OpenID, and remote user authentication.

def auth_user_db(self, username: str, password: str) -> User | None:
    """
    Authenticate user using database credentials.
    
    Parameters:
    - username: Username or email address
    - password: Plain text password to verify
    
    Returns:
    User object if authenticated successfully, None otherwise
    """

def auth_user_ldap(self, username: str, password: str) -> User | None:
    """
    Authenticate user using LDAP/Active Directory.
    
    Parameters:
    - username: LDAP username
    - password: LDAP password
    
    Returns:
    User object if authenticated successfully, None otherwise
    """

def auth_user_oauth(self, userinfo: dict) -> User | None:
    """
    Authenticate user using OAuth provider information.
    
    Parameters:
    - userinfo: Dictionary containing user information from OAuth provider
                Keys should match User model columns
    
    Returns:
    User object if authenticated successfully, None otherwise
    """

def auth_user_oid(self, email: str) -> User | None:
    """
    Authenticate user using OpenID.
    
    Parameters:
    - email: User's email address from OpenID provider
    
    Returns:
    User object if authenticated successfully, None otherwise
    """

def auth_user_remote_user(self, username: str) -> User | None:
    """
    Authenticate user using remote user authentication (e.g., from web server).
    
    Parameters:
    - username: Username from remote authentication system
    
    Returns:
    User object if authenticated successfully, None otherwise
    """

Password Management

Password reset and authentication statistics management for database authentication.

def reset_password(self, userid: int, password: str) -> bool:
    """
    Reset user password for database authentication.
    
    Parameters:
    - userid: User ID to reset password for
    - password: New plain text password (will be hashed)
    
    Returns:
    True if password reset successfully, False otherwise
    """

def update_user_auth_stat(self, user: User, success: bool = True) -> None:
    """
    Update user authentication statistics.
    
    Parameters:
    - user: User object to update statistics for
    - success: Whether authentication attempt was successful
    """

Session Management

Manage user sessions and JWT token handling for authentication state.

def load_user(self, user_id: str) -> User:
    """
    Load user by ID for session management.
    
    Parameters:
    - user_id: String representation of user ID
    
    Returns:
    User object
    """

def load_user_jwt(self, _jwt_header: dict, jwt_data: dict) -> User:
    """
    Load user from JWT token data.
    
    Parameters:
    - _jwt_header: JWT header (unused)
    - jwt_data: JWT payload data containing user identity
    
    Returns:
    User object
    """

@staticmethod
def before_request() -> None:
    """
    Hook that runs before each request to set up user context.
    Sets flask g.user to current_user for request processing.
    """

OAuth Integration

OAuth provider integration with support for multiple providers and token management.

def oauth_user_info_getter(self, f: callable) -> callable:
    """
    Decorator to set OAuth user info getter function.
    
    Parameters:
    - f: Function that takes (sm, provider, response) and returns user info dict
    
    Returns:
    Decorated function
    """

def get_oauth_token_key_name(self, provider: str) -> str:
    """
    Get token key name for OAuth provider.
    
    Parameters:
    - provider: OAuth provider name
    
    Returns:
    Token key name (defaults to 'oauth_token')
    """

def get_oauth_token_secret_name(self, provider: str) -> str:
    """
    Get token secret name for OAuth provider.
    
    Parameters:
    - provider: OAuth provider name
    
    Returns:
    Token secret name (defaults to 'oauth_token_secret')
    """

def set_oauth_session(self, provider: str, oauth_response: dict) -> None:
    """
    Set OAuth session data for authenticated user.
    
    Parameters:
    - provider: OAuth provider name
    - oauth_response: Response from OAuth provider containing tokens
    """

def get_oauth_user_info(self, provider: str, resp: dict) -> dict:
    """
    Extract user information from OAuth provider response.
    
    Parameters:
    - provider: OAuth provider name ('github', 'google', 'azure', etc.)
    - resp: OAuth provider response
    
    Returns:
    Dictionary with user information (username, email, first_name, last_name, etc.)
    """
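
A getter with the (sm, provider, response) shape that oauth_user_info_getter expects, as an added example that is not Airflow's or Flask-AppBuilder's own code. It admits only an allowlisted provider and a verified email in an allowed domain, and builds the result from named fields only. Assumptions: the claims are already present in `response`, the claim names (`email_verified`, `given_name`, `family_name`), the `ALLOWED_*` policy values, and the use of an empty dict to signal rejection.

```python
from __future__ import annotations

# Assumed policy values for illustration; not taken from Airflow or FAB.
ALLOWED_PROVIDERS = {"google"}
ALLOWED_EMAIL_DOMAINS = {"example.com"}
# The user-info keys the page lists for the returned dictionary.
USERINFO_KEYS = ("username", "email", "first_name", "last_name")


def strict_oauth_user_info(sm, provider: str, response: dict) -> dict:
    """Hypothetical getter with the (sm, provider, response) signature.

    Returns {} to reject the attempt (assumed convention: no identity keys).
    """
    if provider not in ALLOWED_PROVIDERS:
        return {}
    email = str(response.get("email", "")).strip().lower()
    local, sep, domain = email.rpartition("@")
    # Exact domain match: a suffix or substring test would accept look-alikes.
    if not sep or not local or domain not in ALLOWED_EMAIL_DOMAINS:
        return {}
    # 'email_verified' is an assumed claim name. Require the boolean True,
    # because a string such as "false" is truthy.
    if response.get("email_verified") is not True:
        return {}
    candidate = {
        "username": email,
        "email": email,
        "first_name": response.get("given_name", ""),
        "last_name": response.get("family_name", ""),
    }
    # Built from named fields only, so extra keys in the response
    # (such as 'role' in the sample below) never reach the caller.
    return {key: str(candidate[key]) for key in USERINFO_KEYS}


# Registration would use the decorator documented above, for example:
#   @security_manager.oauth_user_info_getter
#   def get_info(sm, provider, response):
#       return strict_oauth_user_info(sm, provider, response)

# Constructed provider responses (sample data, not real accounts).
GOOD = {"email": "John@Example.com", "email_verified": True,
        "given_name": "John", "family_name": "Doe", "role": "Admin"}
LOOKALIKE_DOMAIN = {"email": "john@example.com.other.test", "email_verified": True}
UNVERIFIED = {"email": "john@example.com", "email_verified": "false"}
```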

LDAP Integration

LDAP authentication with support for TLS, search filters, and user attribute mapping.

@staticmethod
def ldap_extract(ldap_dict: dict[str, list[bytes]], field_name: str, fallback: str) -> str:
    """
    Extract single value from LDAP attribute dictionary.
    
    Parameters:
    - ldap_dict: LDAP attributes dictionary
    - field_name: Attribute name to extract
    - fallback: Default value if attribute is empty
    
    Returns:
    Decoded string value or fallback
    """

@staticmethod
def ldap_extract_list(ldap_dict: dict[str, list[bytes]], field_name: str) -> list[str]:
    """
    Extract list of values from LDAP attribute dictionary.
    
    Parameters:
    - ldap_dict: LDAP attributes dictionary  
    - field_name: Attribute name to extract
    
    Returns:
    List of decoded string values
    """
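
The attribute dictionary these helpers read maps each attribute name to a list of byte strings. The sketch below is an added example written for this page, not Flask-AppBuilder's implementation: it follows the documented behaviour (decoded value or fallback, list of decoded values) and also covers attributes that are missing, empty, or not valid UTF-8. The function names `extract_one` and `extract_list` and the sample entry are constructed.

```python
from __future__ import annotations


def extract_one(ldap_dict: dict[str, list[bytes]], field_name: str, fallback: str) -> str:
    """First value of an attribute as text; fallback if absent, empty or binary."""
    values = ldap_dict.get(field_name) or []
    if not values or not values[0]:
        return fallback
    try:
        return values[0].decode("utf-8")
    except UnicodeDecodeError:
        # Binary attributes are not text; do not guess at a decoding.
        return fallback


def extract_list(ldap_dict: dict[str, list[bytes]], field_name: str) -> list[str]:
    """All values of an attribute that decode as UTF-8, in directory order."""
    decoded = []
    for raw in ldap_dict.get(field_name) or []:
        try:
            decoded.append(raw.decode("utf-8"))
        except UnicodeDecodeError:
            continue  # skip undecodable values instead of failing the login
    return decoded


# Constructed LDAP entry (sample data): values arrive as lists of bytes.
ENTRY = {
    "mail": [b"john@example.com"],
    "givenName": [b""],  # present but empty
    "memberOf": [
        b"cn=airflow-admins,ou=groups,dc=example,dc=com",
        b"cn=staff,ou=groups,dc=example,dc=com",
    ],
    "objectGUID": [b"\xff\xfe\x00\x01"],  # binary, not valid UTF-8
}
```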

Security Configuration

Access to security configuration properties and settings.

@property
def auth_type(self) -> int:
    """Get configured authentication type."""

@property
def auth_role_admin(self) -> str:
    """Get administrator role name."""

@property
def auth_role_public(self) -> str:
    """Get public/anonymous role name."""

@property
def auth_user_registration(self) -> bool:
    """Check if user self-registration is enabled."""

@property
def auth_username_ci(self) -> bool:
    """Check if username matching is case-insensitive."""

@property
def current_user(self) -> User | None:
    """Get current authenticated user."""

Usage Examples

Basic Authentication

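from airflow.www.fab_security.sqla.manager import SecurityManager

# security_manager is the SecurityManager instance of the running
# application (in Flask-AppBuilder it is exposed as appbuilder.sm).

# Database authentication
user = security_manager.auth_user_db("john_doe", "password123")
if user:
    print(f"Authenticated user: {user.get_full_name()}")

    # Update authentication statistics (only when a User was returned;
    # auth_user_db returns None on failure)
    security_manager.update_user_auth_stat(user, success=True)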

OAuth Authentication

# OAuth user info from provider
oauth_userinfo = {
    'username': 'john_doe',
    'email': 'john@example.com',
    'first_name': 'John',
    'last_name': 'Doe'
}

user = security_manager.auth_user_oauth(oauth_userinfo)
if user:
    print(f"OAuth authenticated: {user.username}")

Password Reset

# Reset user password
user = security_manager.find_user(username="john_doe")
if user:
    security_manager.reset_password(user.id, "new_secure_password")

Error Handling

Authentication methods return None on failure and log appropriate error messages. Password operations may raise exceptions for invalid user IDs or database errors.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-fab-security
