Programmatically author, schedule and monitor data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Plugin system, provider packages, operator links, notifications, and custom component development. Airflow's extensibility system allows custom operators, hooks, sensors, and integrations.
Framework for extending Airflow with additional functionality.
class BaseOperatorLink:
    """Base class for operator links rendered in the Airflow web UI.

    Subclasses set ``name`` and implement :meth:`get_link` to surface an
    extra button on the task detail page.
    """

    # Display name of the link button; subclasses must override.
    name: Optional[str] = None  # was `str = None` — implicit Optional (PEP 484)

    def get_link(
        self,
        operator: "BaseOperator",
        dttm: "datetime",
        ti_key: Optional["TaskInstanceKey"] = None,
    ) -> str:
        """Get link URL for operator instance.

        Args:
            operator: Operator instance the link is attached to.
            dttm: Execution datetime.
            ti_key: Task instance key, when available.

        Returns:
            Link URL.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        # Declared return type is ``str``; silently returning None from the
        # base class would violate the contract, so fail loudly instead.
        raise NotImplementedError
class ProviderInfo:
"""Information about a provider package."""
provider_name: str
package_name: str
version: str
description: str
connection_types: List[str]
extra_links: List[str]Send notifications for DAG and task events.
class BaseNotifier:
    """Base class for notification backends.

    Subclasses implement :meth:`notify` to deliver a message for DAG or
    task lifecycle events.
    """

    def __init__(self, **kwargs):
        """Initialize notifier with backend-specific configuration.

        Args:
            **kwargs: Backend options; interpreted by subclasses.
        """

    def notify(self, context: "Context") -> None:
        """Send notification.

        Args:
            context: Task execution context.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        # Abstract hook: the base class has no delivery channel.
        raise NotImplementedError
class EmailNotifier(BaseNotifier):
    """Email notification backend."""

    def __init__(
        self,
        to: List[str],
        subject: Optional[str] = None,  # was `str = None` — implicit Optional (PEP 484)
        html_content: Optional[str] = None,
        **kwargs,
    ):
        """Initialize email notifier.

        Args:
            to: Recipient email addresses.
            subject: Email subject template; backend default when None.
            html_content: HTML email content.
            **kwargs: Forwarded to :class:`BaseNotifier`.
        """
class SlackNotifier(BaseNotifier):
"""Slack notification backend."""
def __init__(
self,
slack_conn_id: str,
channel: str = None,
username: str = None,
**kwargs
):
"""
Initialize Slack notifier.
Args:
slack_conn_id: Slack connection ID
channel: Slack channel
username: Bot username
"""Develop custom Airflow plugins.
class AirflowPlugin:
    """Base class for Airflow plugins.

    Subclasses declare components by overriding the class-level lists
    below; the plugin manager collects them at load time.
    """

    # Unique plugin name; subclasses must override.
    name: Optional[str] = None  # was `str = None` — implicit Optional (PEP 484)
    # NOTE: these defaults are shared class-level lists; subclasses should
    # assign fresh lists rather than mutate them in place.
    operators: List[type] = []
    sensors: List[type] = []
    hooks: List[type] = []
    executors: List[type] = []
    macros: List[Any] = []
    admin_views: List[Any] = []
    flask_blueprints: List[Any] = []
    menu_links: List[Any] = []
    appbuilder_views: List[Any] = []
    appbuilder_menu_items: List[Any] = []
    # Forward references quoted: in this file the imports appear after the
    # definitions, so unquoted names would not resolve at class-creation time.
    global_operator_extra_links: List["BaseOperatorLink"] = []
    operator_extra_links: List["BaseOperatorLink"] = []
# Example plugin
class CustomPlugin(AirflowPlugin):
name = "custom_plugin"
operators = [CustomOperator]
hooks = [CustomHook]
macros = [custom_macro_function]Develop custom task operators.
class CustomOperator(BaseOperator):
"""Example custom operator implementation."""
template_fields = ['param1', 'param2']
template_ext = ['.sql', '.txt']
ui_color = '#ffcccc'
def __init__(
self,
param1: str,
param2: Optional[str] = None,
**kwargs
):
"""
Initialize custom operator.
Args:
param1: Required parameter
param2: Optional parameter
"""
super().__init__(**kwargs)
self.param1 = param1
self.param2 = param2
def execute(self, context: Context) -> Any:
"""
Execute custom logic.
Args:
context: Task execution context
Returns:
Task result
"""
# Custom implementation
return f"Executed with {self.param1}, {self.param2}"Event listeners for monitoring workflow execution.
def on_dag_run_running(
    dag_run: "DagRun",  # quoted: DagRun is imported after the definitions in this file
    msg: str,
) -> None:
    """Listener hook called when a DAG run starts.

    Args:
        dag_run: DAG run instance.
        msg: Event message.
    """
def on_dag_run_success(
    dag_run: "DagRun",  # quoted: DagRun is imported after the definitions in this file
    msg: str,
) -> None:
    """Listener hook called when a DAG run succeeds.

    Args:
        dag_run: DAG run instance.
        msg: Event message.
    """
def on_dag_run_failed(
    dag_run: "DagRun",  # quoted: DagRun is imported after the definitions in this file
    msg: str,
) -> None:
    """Listener hook called when a DAG run fails.

    Args:
        dag_run: DAG run instance.
        msg: Event message.
    """
def on_task_instance_running(
    previous_state: str,
    task_instance: "TaskInstance",
    # NOTE(review): Session is not imported anywhere in this file —
    # presumably sqlalchemy.orm.Session; confirm against the package.
    session: "Session",
) -> None:
    """Listener hook called when a task instance starts running.

    Args:
        previous_state: Previous task state.
        task_instance: Task instance.
        session: Database session.
    """
def on_task_instance_success(
    previous_state: str,
    task_instance: "TaskInstance",
    # NOTE(review): Session is not imported anywhere in this file —
    # presumably sqlalchemy.orm.Session; confirm against the package.
    session: "Session",
) -> None:
    """Listener hook called when a task instance succeeds.

    Args:
        previous_state: Previous task state.
        task_instance: Task instance.
        session: Database session.
    """
def on_task_instance_failed(
    previous_state: str,
    task_instance: "TaskInstance",
    # NOTE(review): Session is not imported anywhere in this file —
    # presumably sqlalchemy.orm.Session; confirm against the package.
    session: "Session",
) -> None:
    """Listener hook called when a task instance fails.

    Args:
        previous_state: Previous task state.
        task_instance: Task instance.
        session: Database session.
    """

from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.models.dagrun import DagRun
from airflow.utils.context import Context
# Type aliases used throughout the extensions API.
PluginComponent = type
NotificationChannel = str
EventListener = Callable

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow