Flask-AppBuilder (FAB) security integration component within Apache Airflow core, providing authentication, authorization, and security management features
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Role-based access control with granular permission management. Supports creating roles, managing permissions, and implementing fine-grained access control for resources and actions in the Airflow security system.
Create, update, and manage user roles with comprehensive role lifecycle operations.
def add_role(self, name: str) -> Role | None:
"""
Create a new role.
Parameters:
- name: Unique name for the role
Returns:
Role object if created successfully, None if role already exists
"""
def update_role(self, role_id: int, name: str) -> Role | None:
"""
Update an existing role's name.
Parameters:
- role_id: ID of the role to update
- name: New name for the role
Returns:
Updated Role object if successful, None otherwise
"""
def find_role(self, name: str) -> Role | None:
"""
Find role by name.
Parameters:
- name: Role name to search for
Returns:
Role object if found, None otherwise
"""
def get_all_roles(self) -> list[Role]:
"""
Get all roles in the system.
Returns:
List of all Role objects
"""
def get_public_role(self) -> Role | None:
"""
Get the public/anonymous role.
Returns:
Public Role object if configured, None otherwise
"""Create and manage permissions linking actions to resources for fine-grained access control.
def create_permission(self, action_name: str, resource_name: str) -> Permission | None:
"""
Create a permission linking an action to a resource.
Parameters:
- action_name: Name of the action (e.g., 'can_read', 'can_edit')
- resource_name: Name of the resource (e.g., 'Users', 'DAGs')
Returns:
Permission object if created successfully, None otherwise
"""
def get_permission(self, action_name: str, resource_name: str) -> Permission | None:
"""
Get existing permission by action and resource names.
Parameters:
- action_name: Name of the action
- resource_name: Name of the resource
Returns:
Permission object if found, None otherwise
"""
def delete_permission(self, action_name: str, resource_name: str) -> None:
"""
Delete a permission linking an action to a resource.
Parameters:
- action_name: Name of the action
- resource_name: Name of the resource
"""Assign and remove permissions from roles to control access.
def add_permission_to_role(self, role: Role, permission: Permission) -> None:
"""
Add a permission to a role.
Parameters:
- role: Role object to add permission to
- permission: Permission object to add
"""
def remove_permission_from_role(self, role: Role, permission: Permission) -> None:
"""
Remove a permission from a role.
Parameters:
- role: Role object to remove permission from
- permission: Permission object to remove
"""Manage action definitions that define what operations can be performed.
def create_action(self, name: str) -> Action | None:
"""
Create a new action.
Parameters:
- name: Action name (e.g., 'can_read', 'can_create', 'can_delete')
Returns:
Action object if created successfully, existing Action if already exists
"""
def get_action(self, name: str) -> Action | None:
"""
Get an existing action by name.
Parameters:
- name: Action name to retrieve
Returns:
Action object if found, None otherwise
"""
def delete_action(self, name: str) -> bool:
"""
Delete an action.
Parameters:
- name: Name of action to delete
Returns:
True if deletion successful, False otherwise
"""Manage resource definitions that represent objects or areas being protected.
def create_resource(self, name: str) -> Resource:
"""
Create a new resource.
Parameters:
- name: Resource name (e.g., 'Users', 'DAGs', 'Connections')
Returns:
Resource object (existing if already exists, new if created)
"""
def get_resource(self, name: str) -> Resource | None:
"""
Get an existing resource by name.
Parameters:
- name: Resource name to retrieve
Returns:
Resource object if found, None otherwise
"""
def get_all_resources(self) -> list[Resource]:
"""
Get all resources in the system.
Returns:
List of all Resource objects
"""
def delete_resource(self, name: str) -> bool:
"""
Delete a resource.
Parameters:
- name: Name of resource to delete
Returns:
True if deletion successful, False otherwise
"""
def get_resource_permissions(self, resource: Resource) -> list[Permission]:
"""
Get all permissions associated with a resource.
Parameters:
- resource: Resource object to get permissions for
Returns:
List of Permission objects associated with the resource
"""Query and validate permissions for authorization checks.
def permission_exists_in_one_or_more_roles(
self,
resource_name: str,
action_name: str,
role_ids: list[int]
) -> bool:
"""
Check if a permission exists in any of the specified roles.
Parameters:
- resource_name: Name of the resource
- action_name: Name of the action
- role_ids: List of role IDs to check
Returns:
True if permission exists in at least one role, False otherwise
"""
def filter_roles_by_perm_with_action(
self,
action_name: str,
role_ids: list[int]
) -> list[Permission]:
"""
Find permissions with specific action in the given roles.
Parameters:
- action_name: Name of the action to filter by
- role_ids: List of role IDs to search in
Returns:
List of Permission objects matching the criteria
"""
def perms_include_action(self, perms: list[Permission], action_name: str) -> bool:
"""
Check if a list of permissions includes a specific action.
Parameters:
- perms: List of Permission objects
- action_name: Action name to check for
Returns:
True if action is found in permissions, False otherwise
"""Map external role keys to internal roles for integration with external systems.
def get_roles_from_keys(self, role_keys: list[str]) -> set[Role]:
"""
Get roles from external role keys using AUTH_ROLES_MAPPING.
Parameters:
- role_keys: List of external role identifiers (LDAP groups, OAuth roles, etc.)
Returns:
Set of Role objects mapped from the keys
"""from airflow.www.fab_security.sqla.manager import SecurityManager
# Create roles
admin_role = security_manager.add_role("DataAdmin")
viewer_role = security_manager.add_role("DataViewer")
# Create actions and resources
read_action = security_manager.create_action("can_read")
edit_action = security_manager.create_action("can_edit")
dag_resource = security_manager.create_resource("DAGs")
# Create permissions
read_dags_perm = security_manager.create_permission("can_read", "DAGs")
edit_dags_perm = security_manager.create_permission("can_edit", "DAGs")
# Assign permissions to roles
security_manager.add_permission_to_role(admin_role, read_dags_perm)
security_manager.add_permission_to_role(admin_role, edit_dags_perm)
security_manager.add_permission_to_role(viewer_role, read_dags_perm)# Check if user has specific permissions
user = security_manager.find_user(username="john_doe")
role_ids = [role.id for role in user.roles]
has_read_permission = security_manager.permission_exists_in_one_or_more_roles(
"DAGs", "can_read", role_ids
)
if has_read_permission:
print("User can read DAGs")# Find and update role
role = security_manager.find_role("DataAdmin")
if role:
# Add new permission
delete_perm = security_manager.create_permission("can_delete", "DAGs")
security_manager.add_permission_to_role(role, delete_perm)
# Remove permission
edit_perm = security_manager.get_permission("can_edit", "DAGs")
security_manager.remove_permission_from_role(role, edit_perm)# Map LDAP groups to internal roles
ldap_groups = ["cn=airflow-admins,ou=groups,dc=company,dc=com"]
mapped_roles = security_manager.get_roles_from_keys(ldap_groups)
for role in mapped_roles:
print(f"Mapped to role: {role.name}")# Get all resources and their permissions
all_resources = security_manager.get_all_resources()
for resource in all_resources:
permissions = security_manager.get_resource_permissions(resource)
print(f"Resource {resource.name} has {len(permissions)} permissions")
# Clean up unused actions
unused_action = security_manager.get_action("obsolete_action")
if unused_action:
success = security_manager.delete_action("obsolete_action")
if success:
print("Cleaned up obsolete action")The system supports built-in roles configured via FAB_ROLES:
Role and permission operations include comprehensive error handling:
NoneInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-fab-security