Comprehensive integration with PagerDuty's REST API and Events API through specialized hook classes. These hooks provide secure connection management, automatic client configuration, and full access to PagerDuty's platform capabilities.
The PagerdutyHook provides access to PagerDuty's full REST API v2 for comprehensive platform management including services, incidents, users, teams, escalation policies, schedules, and analytics.
class PagerdutyHook:
"""
Hook for interacting with PagerDuty REST API v2.
Takes both PagerDuty API token directly and connection that has PagerDuty API token.
If both supplied, PagerDuty API token will be used.
"""
conn_name_attr = "pagerduty_conn_id"
default_conn_name = "pagerduty_default"
conn_type = "pagerduty"
hook_name = "Pagerduty"
def __init__(self, token: str = "", pagerduty_conn_id: str | None = None):
"""
Initialize PagerdutyHook.
Args:
token: PagerDuty API token (account or personal access token)
pagerduty_conn_id: Airflow connection ID containing PagerDuty credentials
Raises:
AirflowException: If no valid token or connection ID provided
"""
def client(self) -> pagerduty.RestApiV2Client:
"""
Return PagerDuty REST API v2 client for making API calls.
Returns:
Configured RestApiV2Client instance
"""
def test_connection(self):
"""
Test the PagerDuty connection validity.
Returns:
Tuple of (success: bool, message: str)
"""
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom field behaviour for Airflow UI."""
@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
"""Return connection widgets to add to connection form."""from airflow.providers.pagerduty.hooks.pagerduty import PagerdutyHook
# Using connection ID
hook = PagerdutyHook(pagerduty_conn_id="my_pagerduty_connection")
# Get REST API client
client = hook.client()
# List all services
services = client.list_all("services")
# Get a specific service
service = client.rget("/services/SERVICE_ID")
# Create an incident
incident_data = {
"incident": {
"type": "incident",
"title": "Server down",
"service": {
"id": "SERVICE_ID",
"type": "service_reference"
}
}
}
response = client.rpost("/incidents", data=incident_data)The PagerdutyEventsHook specializes in incident and alert lifecycle management through PagerDuty's Events API v2, supporting trigger, acknowledge, and resolve actions with comprehensive event metadata and change tracking.
class PagerdutyEventsHook:
"""
Hook for interacting with PagerDuty Events API v2.
Takes both an Events API token and a connection with the Events API token
(integration key) as the password. If both supplied, the token will be used.
"""
conn_name_attr = "pagerduty_events_conn_id"
default_conn_name = "pagerduty_events_default"
conn_type = "pagerduty_events"
hook_name = "Pagerduty Events"
def __init__(self, integration_key: str | None = None, pagerduty_events_conn_id: str | None = None):
"""
Initialize PagerdutyEventsHook.
Args:
integration_key: PagerDuty Events API integration key
pagerduty_events_conn_id: Airflow connection ID containing integration key
Raises:
AirflowException: If no valid integration key or connection ID provided
"""
def send_event(
self,
summary: str,
severity: str,
source: str = "airflow",
action: str = "trigger",
dedup_key: str | None = None,
custom_details: Any | None = None,
group: str | None = None,
component: str | None = None,
class_type: str | None = None,
images: list[Any] | None = None,
links: list[Any] | None = None,
) -> str:
"""
Create event for service integration.
Args:
summary: Summary for the event
severity: Event severity ('info', 'warning', 'error', 'critical')
source: System identifier having the problem (default: 'airflow')
action: Event action ('trigger', 'acknowledge', 'resolve')
dedup_key: Alert identifier for acknowledge/resolve actions
custom_details: Free-form event details (dict or string)
group: Cluster or grouping of sources
component: Affected system component
class_type: Event class/type
images: List of image attachments with 'src', 'href', 'alt' keys
links: List of link attachments with 'href', 'text' keys
Returns:
PagerDuty Events API v2 response
Raises:
ValueError: If action is invalid or dedup_key missing for acknowledge/resolve
"""
def create_change_event(
self,
summary: str,
source: str = "airflow",
custom_details: Any | None = None,
timestamp: datetime | None = None,
links: list[Any] | None = None,
) -> str:
"""
Create change event for service integration.
Args:
summary: Summary for the change event
source: System identifier generating the change (default: 'airflow')
custom_details: Free-form change details (dict or string)
timestamp: When the change was detected/generated
links: List of link attachments with 'href', 'text' keys
Returns:
PagerDuty Change Events API v2 response
"""
def test_connection(self):
"""
Test the PagerDuty Events connection validity.
Returns:
Tuple of (success: bool, message: str)
"""
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom field behaviour for Airflow UI."""
@staticmethod
def prepare_event_data(
summary,
severity,
source,
custom_details,
component,
group,
class_type,
action,
dedup_key,
images,
links,
action_key_name: str = "action",
) -> dict:
"""
Prepare event data for send_event API calls.
Returns:
Formatted event data dictionary
"""from airflow.providers.pagerduty.hooks.pagerduty_events import PagerdutyEventsHook
hook = PagerdutyEventsHook(pagerduty_events_conn_id="pagerduty_events_default")
# Trigger a critical alert
response = hook.send_event(
summary="Database connection failure",
severity="critical",
source="web-app-db",
component="database",
group="production",
custom_details={
"error_code": "CONNECTION_TIMEOUT",
"database_host": "prod-db-01.internal",
"connection_count": 0
},
links=[{
"href": "https://monitoring.company.com/db-status",
"text": "Database Status Dashboard"
}]
)# Acknowledge an existing alert
hook.send_event(
summary="Database issue acknowledged - investigating",
severity="error",
source="web-app-db",
action="acknowledge",
dedup_key="db-connection-failure-prod-db-01"
)
# Resolve the alert
hook.send_event(
summary="Database connection restored",
severity="info",
source="web-app-db",
action="resolve",
dedup_key="db-connection-failure-prod-db-01",
custom_details={"resolution": "Restarted database service"}
)from datetime import datetime
# Track a deployment change
hook.create_change_event(
summary="Application deployment v2.3.1",
source="ci-cd-pipeline",
custom_details={
"version": "v2.3.1",
"environment": "production",
"deployment_type": "rolling_update",
"services_affected": ["api", "worker", "scheduler"]
},
timestamp=datetime.utcnow(),
links=[{
"href": "https://github.com/company/app/releases/tag/v2.3.1",
"text": "Release Notes"
}]
)For PagerdutyHook usage:
For PagerdutyEventsHook usage:
Both connection types include built-in connection testing and custom UI field configurations for simplified setup in Airflow's web interface.