CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow

Programmatically author, schedule and monitor data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

extensions.mddocs/

Extensions and Providers

Plugin system, provider packages, operator links, notifications, and custom component development. Airflow's extensibility system allows custom operators, hooks, sensors, and integrations.

Capabilities

Provider System

Framework for extending Airflow with additional functionality.

class BaseOperatorLink:
    """Base class for operator links in the web UI."""
    
    name: str = None
    
    def get_link(
        self,
        operator: BaseOperator,
        dttm: datetime,
        ti_key: TaskInstanceKey = None
    ) -> str:
        """
        Get link URL for operator instance.
        
        Args:
            operator: Operator instance
            dttm: Execution datetime
            ti_key: Task instance key
            
        Returns:
            Link URL
        """

class ProviderInfo:
    """Information about a provider package."""
    
    provider_name: str
    package_name: str
    version: str
    description: str
    connection_types: List[str]
    extra_links: List[str]

Notification System

Send notifications for DAG and task events.

class BaseNotifier:
    """Base class for notification backends."""
    
    def __init__(self, **kwargs):
        """Initialize notifier with configuration."""
    
    def notify(self, context: Context) -> None:
        """
        Send notification.
        
        Args:
            context: Task execution context
        """

class EmailNotifier(BaseNotifier):
    """Email notification backend."""
    
    def __init__(
        self,
        to: List[str],
        subject: str = None,
        html_content: str = None,
        **kwargs
    ):
        """
        Initialize email notifier.
        
        Args:
            to: Recipient email addresses
            subject: Email subject template
            html_content: HTML email content
        """

class SlackNotifier(BaseNotifier):
    """Slack notification backend."""
    
    def __init__(
        self,
        slack_conn_id: str,
        channel: str = None,
        username: str = None,
        **kwargs
    ):
        """
        Initialize Slack notifier.
        
        Args:
            slack_conn_id: Slack connection ID
            channel: Slack channel
            username: Bot username
        """

Plugin Framework

Develop custom Airflow plugins.

class AirflowPlugin:
    """Base class for Airflow plugins."""
    
    name: str = None
    operators: List[type] = []
    sensors: List[type] = []
    hooks: List[type] = []
    executors: List[type] = []
    macros: List[Any] = []
    admin_views: List[Any] = []
    flask_blueprints: List[Any] = []
    menu_links: List[Any] = []
    appbuilder_views: List[Any] = []
    appbuilder_menu_items: List[Any] = []
    global_operator_extra_links: List[BaseOperatorLink] = []
    operator_extra_links: List[BaseOperatorLink] = []

# Example plugin
class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    operators = [CustomOperator]
    hooks = [CustomHook]
    macros = [custom_macro_function]

Custom Operators

Develop custom task operators.

class CustomOperator(BaseOperator):
    """Example custom operator implementation."""
    
    template_fields = ['param1', 'param2']
    template_ext = ['.sql', '.txt']
    ui_color = '#ffcccc'
    
    def __init__(
        self,
        param1: str,
        param2: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize custom operator.
        
        Args:
            param1: Required parameter
            param2: Optional parameter
        """
        super().__init__(**kwargs)
        self.param1 = param1
        self.param2 = param2
    
    def execute(self, context: Context) -> Any:
        """
        Execute custom logic.
        
        Args:
            context: Task execution context
            
        Returns:
            Task result
        """
        # Custom implementation
        return f"Executed with {self.param1}, {self.param2}"

Listener Framework

Event listeners for monitoring workflow execution.

def on_dag_run_running(
    dag_run: DagRun,
    msg: str
) -> None:
    """
    Called when DAG run starts.
    
    Args:
        dag_run: DAG run instance
        msg: Event message
    """

def on_dag_run_success(
    dag_run: DagRun,
    msg: str
) -> None:
    """
    Called when DAG run succeeds.
    
    Args:
        dag_run: DAG run instance
        msg: Event message
    """

def on_dag_run_failed(
    dag_run: DagRun,
    msg: str
) -> None:
    """
    Called when DAG run fails.
    
    Args:
        dag_run: DAG run instance
        msg: Event message
    """

def on_task_instance_running(
    previous_state: str,
    task_instance: TaskInstance,
    session: Session
) -> None:
    """
    Called when task instance starts running.
    
    Args:
        previous_state: Previous task state
        task_instance: Task instance
        session: Database session
    """

def on_task_instance_success(
    previous_state: str,
    task_instance: TaskInstance,
    session: Session
) -> None:
    """
    Called when task instance succeeds.
    
    Args:
        previous_state: Previous task state
        task_instance: Task instance
        session: Database session
    """

def on_task_instance_failed(
    previous_state: str,
    task_instance: TaskInstance,
    session: Session
) -> None:
    """
    Called when task instance fails.
    
    Args:
        previous_state: Previous task state
        task_instance: Task instance
        session: Database session
    """

Types

from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.models.dagrun import DagRun
from airflow.utils.context import Context

PluginComponent = type
NotificationChannel = str
EventListener = Callable

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow

docs

assets-scheduling.md

cli-utilities.md

configuration.md

dag-management.md

database-models.md

exceptions.md

executors.md

extensions.md

index.md

task-operators.md

xcom.md

tile.json