The PagerDuty provider plugs into Airflow's notification framework to send automatic alerts to PagerDuty based on workflow events. The notification system supports templating, context-aware messaging, and integration with DAG and task callbacks.
The PagerdutyNotifier builds on Airflow's BaseNotifier to send alerts to the PagerDuty Events API when workflow events occur. It supports full Jinja templating of alert content and automatic injection of the task and DAG execution context.
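
Before any alert can be delivered, the notifier needs an Events API integration (routing) key, supplied either through an Airflow connection or passed directly. A minimal sketch of both options; the connection ID and key values are placeholders, and storing the key in the connection's password field is an assumption about the conventional setup for PagerdutyEventsHook:

from airflow.providers.pagerduty.notifications.pagerduty import PagerdutyNotifier

# Option A: rely on an Airflow connection (default ID "pagerduty_events_default"),
# assumed here to hold the Events API integration/routing key in its password field.
conn_notifier = PagerdutyNotifier(
    summary="Example alert from {{ dag.dag_id }}",
    severity="warning",
    pagerduty_events_conn_id="pagerduty_events_default",
)

# Option B: bypass the connection and pass the routing key directly
# (the key shown is a placeholder).
direct_notifier = PagerdutyNotifier(
    summary="Example alert from {{ dag.dag_id }}",
    severity="warning",
    integration_key="YOUR_PAGERDUTY_ROUTING_KEY",
)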
class PagerdutyNotifier(BaseNotifier):
    """
    PagerDuty BaseNotifier for sending notifications via the PagerDuty Events API.

    All template fields support Jinja2 templating with the Airflow context.
    """

    template_fields = (
        "summary",
        "severity",
        "source",
        "action",
        "dedup_key",
        "custom_details",
        "group",
        "component",
        "class_type",
        "images",
        "links",
    )

    def __init__(
        self,
        *,
        summary: str,
        severity: str,
        source: str = "airflow",
        action: str = "trigger",
        dedup_key: str | None = None,
        custom_details: Any | None = None,
        group: str | None = None,
        component: str | None = None,
        class_type: str | None = None,
        images: list[Any] | None = None,
        links: list[Any] | None = None,
        pagerduty_events_conn_id: str | None = "pagerduty_events_default",
        integration_key: str | None = None,
    ):
        """
        Initialize PagerdutyNotifier.

        Args:
            summary: Alert summary (supports templating)
            severity: Alert severity ('info', 'warning', 'error', 'critical')
            source: System identifier (default: 'airflow', supports templating)
            action: Event action ('trigger', 'acknowledge', 'resolve')
            dedup_key: Alert deduplication key (supports templating)
            custom_details: Free-form alert details (supports templating)
            group: Source grouping (supports templating)
            component: System component (supports templating)
            class_type: Event class/type (supports templating)
            images: Image attachments list (supports templating)
            links: Link attachments list (supports templating)
            pagerduty_events_conn_id: Airflow connection ID for the PagerDuty Events API
            integration_key: Direct integration key (overrides the connection)
        """

    def notify(self, context):
        """
        Send the alert to the PagerDuty Events API v2.

        Args:
            context: Airflow task/DAG execution context used for templating
        """

    @property
    def hook(self) -> PagerdutyEventsHook:
        """
        PagerDuty Events Hook instance.

        Returns:
            Configured PagerdutyEventsHook for API interactions
        """
# Module-level alias for backward compatibility
send_pagerduty_notification = PagerdutyNotifier

from datetime import datetime

from airflow import DAG
from airflow.providers.pagerduty.notifications.pagerduty import PagerdutyNotifier
# Configure failure notification
failure_notifier = PagerdutyNotifier(
    summary="DAG {{ dag.dag_id }} failed on {{ ds }}",
    severity="error",
    source="airflow-{{ dag.dag_id }}",
    group="airflow-production",
    component="workflow-engine",
    custom_details={
        "dag_id": "{{ dag.dag_id }}",
        "run_id": "{{ dag_run.run_id }}",
        "execution_date": "{{ ds }}",
        "failed_task_count": "{{ dag_run.get_task_instances(state=['failed']) | length }}",
    },
    links=[{
        "href": "{{ var.value.airflow_base_url }}/dags/{{ dag.dag_id }}/grid",
        "text": "View DAG in Airflow",
    }],
    pagerduty_events_conn_id="pagerduty_production",
)
# Apply to DAG
dag = DAG(
    'critical_data_pipeline',
    start_date=datetime(2024, 1, 1),
    on_failure_callback=failure_notifier,
    catchup=False,
)
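
Callbacks can also be attached per task rather than per DAG run, for example by routing the same notifier through default_args. A minimal sketch reusing failure_notifier from above; the DAG ID is illustrative:

# Illustrative alternative: every task inherits the failure callback.
per_task_dag = DAG(
    'critical_data_pipeline_per_task',
    start_date=datetime(2024, 1, 1),
    default_args={"on_failure_callback": failure_notifier},
    catchup=False,
)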

from airflow.operators.python import PythonOperator

# Task success notification
success_notifier = PagerdutyNotifier(
    summary="Critical backup task completed successfully",
    severity="info",
    source="backup-system",
    action="resolve",
    dedup_key="backup-{{ ds_nodash }}",
    custom_details={
        "backup_size": "{{ ti.xcom_pull(key='backup_size') }}",
        "backup_location": "{{ ti.xcom_pull(key='backup_path') }}",
        "execution_time": "{{ ti.duration }}",
    },
)
# Task failure notification
failure_notifier = PagerdutyNotifier(
    summary="CRITICAL: Daily backup failed on {{ ds }}",
    severity="critical",
    source="backup-system",
    group="data-infrastructure",
    component="backup-service",
    dedup_key="backup-failure-{{ ds_nodash }}",
    custom_details={
        "task_id": "{{ task.task_id }}",
        "dag_id": "{{ dag.dag_id }}",
        "error_message": "{{ exception or 'Unknown error' }}",
        "retry_number": "{{ task_instance.try_number }}",
    },
)
backup_task = PythonOperator(
    task_id='daily_backup',
    python_callable=perform_backup,
    on_success_callback=success_notifier,
    on_failure_callback=failure_notifier,
    dag=dag,
)
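
The backup example assumes a perform_backup callable that pushes the XCom keys the success notifier pulls. A minimal hypothetical sketch; the backup logic is omitted and the values are placeholders:

def perform_backup(**context):
    # Placeholder values standing in for real backup output.
    backup_path = "/backups/daily.tar.gz"
    backup_size = "42 GB"
    ti = context["ti"]
    ti.xcom_push(key="backup_path", value=backup_path)
    ti.xcom_push(key="backup_size", value=backup_size)

Note that an action of 'resolve' only clears an open PagerDuty alert whose dedup_key matches a previously triggered event, so trigger/resolve pairs should share the same key.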

# SLA violation notification
sla_notifier = PagerdutyNotifier(
    summary="SLA VIOLATION: {{ dag.dag_id }} task {{ task.task_id }} missed SLA",
    severity="warning",
    source="airflow-sla-monitor",
    group="sla-violations",
    component="scheduler",
    custom_details={
        "dag_id": "{{ dag.dag_id }}",
        "task_id": "{{ task.task_id }}",
        "expected_duration": "{{ task.sla }}",
        "actual_duration": "{{ ti.duration }}",
        "sla_miss_time": "{{ ts }}",
    },
    dedup_key="sla-{{ dag.dag_id }}-{{ task.task_id }}-{{ ds_nodash }}",
)
dag = DAG(
    'time_sensitive_pipeline',
    sla_miss_callback=sla_notifier,
    # ... other DAG config
)
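
For the sla_miss_callback to fire, an SLA must be declared on at least one task in the DAG. A minimal sketch; run_aggregation is a hypothetical callable:

from datetime import timedelta

# The SLA window is declared per task; the DAG-level sla_miss_callback above
# fires when this task has not completed within one hour of its expected time.
slow_task = PythonOperator(
    task_id='hourly_aggregation',
    python_callable=run_aggregation,  # hypothetical callable
    sla=timedelta(hours=1),
    dag=dag,
)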

# Advanced notification with dynamic severity
dynamic_notifier = PagerdutyNotifier(
    summary="{% if ti.state == 'failed' %}FAILURE{% elif ti.duration > 3600 %}SLOW EXECUTION{% else %}INFO{% endif %}: {{ task.task_id }}",
    severity="{% if ti.state == 'failed' %}critical{% elif ti.duration > 3600 %}warning{% else %}info{% endif %}",
    source="data-pipeline-{{ dag.dag_id }}",
    custom_details={
        "execution_context": {
            "task_state": "{{ ti.state }}",
            "duration": "{{ ti.duration }}",
            "start_date": "{{ ti.start_date }}",
            "end_date": "{{ ti.end_date }}",
            "operator": "{{ task.__class__.__name__ }}",
            "pool": "{{ task.pool }}",
            "queue": "{{ task.queue }}",
        },
        "dag_context": {
            "dag_id": "{{ dag.dag_id }}",
            "is_paused": "{{ dag.is_paused }}",
            "tags": "{{ dag.tags }}",
            "owner": "{{ dag.owner }}",
        },
    },
    # A Python-level conditional such as
    #   images=[...] if "{{ task.task_id }}" in ["critical_task1", "critical_task2"] else None
    # never matches, because the literal Jinja string is compared before rendering.
    # Attach the images unconditionally here; see the parse-time factory below
    # for per-task conditions.
    images=[{
        "src": "https://monitoring.company.com/graphs/{{ dag.dag_id }}/{{ task.task_id }}.png",
        "alt": "Task execution metrics",
    }],
)
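
Because Jinja values are only rendered at execution time, a Python-level check against a template string never matches. Per-task conditions have to be decided at DAG-parse time, when the real task_id is available to Python; a sketch of one way to do that with a hypothetical factory function:

# Hypothetical factory: decide at parse time, per task, whether to attach images.
def make_task_notifier(task_id: str) -> PagerdutyNotifier:
    attach_images = task_id in ["critical_task1", "critical_task2"]
    return PagerdutyNotifier(
        summary=f"Task {task_id} event in {{{{ dag.dag_id }}}}",
        severity="warning",
        source="data-pipeline-{{ dag.dag_id }}",
        images=[{
            "src": f"https://monitoring.company.com/graphs/{{{{ dag.dag_id }}}}/{task_id}.png",
            "alt": "Task execution metrics",
        }] if attach_images else None,
    )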

from airflow.providers.pagerduty.notifications.pagerduty import send_pagerduty_notification

# Both approaches are equivalent
notifier1 = PagerdutyNotifier(summary="Alert", severity="error")
notifier2 = send_pagerduty_notification(summary="Alert", severity="error")

All template fields have access to the full Airflow context, including:

- dag, dag_run, ds, ts, and other date/time fields
- task, ti (task_instance), task_instance_key_str
- execution_date, run_id, logical_date
- var.value.variable_name
- ti.xcom_pull(key='key_name')
- conf.get('section', 'key')

This enables dynamic, context-aware alert content that provides rich operational information for effective incident response.
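
The var.value lookup used in the link templates resolves an Airflow Variable. A minimal sketch of providing the airflow_base_url value referenced above; the URL is a placeholder, and the same thing can be done with the `airflow variables set` CLI command:

from airflow.models import Variable

# Set the Variable consumed by "{{ var.value.airflow_base_url }}" in the link template.
Variable.set("airflow_base_url", "https://airflow.example.com")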