or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

hooks.mdindex.mdnotifications.md
tile.json

hooks.mddocs/

Hooks Integration

Comprehensive integration with PagerDuty's REST API and Events API through specialized hook classes. These hooks provide secure connection management, automatic client configuration, and full access to PagerDuty's platform capabilities.

Capabilities

REST API Integration

The PagerdutyHook provides access to PagerDuty's full REST API v2 for comprehensive platform management including services, incidents, users, teams, escalation policies, schedules, and analytics.

class PagerdutyHook:
    """
    Hook for interacting with PagerDuty REST API v2.
    
    Takes both PagerDuty API token directly and connection that has PagerDuty API token.
    If both supplied, PagerDuty API token will be used.
    """
    
    conn_name_attr = "pagerduty_conn_id"
    default_conn_name = "pagerduty_default"
    conn_type = "pagerduty"
    hook_name = "Pagerduty"
    
    def __init__(self, token: str = "", pagerduty_conn_id: str | None = None):
        """
        Initialize PagerdutyHook.
        
        Args:
            token: PagerDuty API token (account or personal access token)
            pagerduty_conn_id: Airflow connection ID containing PagerDuty credentials
            
        Raises:
            AirflowException: If no valid token or connection ID provided
        """
    
    def client(self) -> pagerduty.RestApiV2Client:
        """
        Return PagerDuty REST API v2 client for making API calls.
        
        Returns:
            Configured RestApiV2Client instance
        """
    
    def test_connection(self):
        """
        Test the PagerDuty connection validity.
        
        Returns:
            Tuple of (success: bool, message: str)
        """
    
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom field behaviour for Airflow UI."""
    
    @classmethod  
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """Return connection widgets to add to connection form."""

Usage Example

from airflow.providers.pagerduty.hooks.pagerduty import PagerdutyHook

# Using connection ID
hook = PagerdutyHook(pagerduty_conn_id="my_pagerduty_connection")

# Get REST API client
client = hook.client()

# List all services
services = client.list_all("services")

# Get a specific service
service = client.rget("/services/SERVICE_ID")

# Create an incident
incident_data = {
    "incident": {
        "type": "incident",
        "title": "Server down",
        "service": {
            "id": "SERVICE_ID",
            "type": "service_reference"
        }
    }
}
response = client.rpost("/incidents", data=incident_data)

Events API Integration

The PagerdutyEventsHook specializes in incident and alert lifecycle management through PagerDuty's Events API v2, supporting trigger, acknowledge, and resolve actions with comprehensive event metadata and change tracking.

class PagerdutyEventsHook:
    """
    Hook for interacting with PagerDuty Events API v2.
    
    Takes both an Events API token and a connection with the Events API token
    (integration key) as the password. If both supplied, the token will be used.
    """
    
    conn_name_attr = "pagerduty_events_conn_id"
    default_conn_name = "pagerduty_events_default"
    conn_type = "pagerduty_events"
    hook_name = "Pagerduty Events"
    
    def __init__(self, integration_key: str | None = None, pagerduty_events_conn_id: str | None = None):
        """
        Initialize PagerdutyEventsHook.
        
        Args:
            integration_key: PagerDuty Events API integration key
            pagerduty_events_conn_id: Airflow connection ID containing integration key
            
        Raises:
            AirflowException: If no valid integration key or connection ID provided
        """
    
    def send_event(
        self,
        summary: str,
        severity: str,
        source: str = "airflow",
        action: str = "trigger",
        dedup_key: str | None = None,
        custom_details: Any | None = None,
        group: str | None = None,
        component: str | None = None,
        class_type: str | None = None,
        images: list[Any] | None = None,
        links: list[Any] | None = None,
    ) -> str:
        """
        Create event for service integration.
        
        Args:
            summary: Summary for the event
            severity: Event severity ('info', 'warning', 'error', 'critical')
            source: System identifier having the problem (default: 'airflow')
            action: Event action ('trigger', 'acknowledge', 'resolve')
            dedup_key: Alert identifier for acknowledge/resolve actions
            custom_details: Free-form event details (dict or string)
            group: Cluster or grouping of sources
            component: Affected system component
            class_type: Event class/type
            images: List of image attachments with 'src', 'href', 'alt' keys
            links: List of link attachments with 'href', 'text' keys
            
        Returns:
            PagerDuty Events API v2 response
            
        Raises:
            ValueError: If action is invalid or dedup_key missing for acknowledge/resolve
        """
    
    def create_change_event(
        self,
        summary: str,
        source: str = "airflow",
        custom_details: Any | None = None,
        timestamp: datetime | None = None,
        links: list[Any] | None = None,
    ) -> str:
        """
        Create change event for service integration.
        
        Args:
            summary: Summary for the change event
            source: System identifier generating the change (default: 'airflow')
            custom_details: Free-form change details (dict or string)
            timestamp: When the change was detected/generated
            links: List of link attachments with 'href', 'text' keys
            
        Returns:
            PagerDuty Change Events API v2 response
        """
    
    def test_connection(self):
        """
        Test the PagerDuty Events connection validity.
        
        Returns:
            Tuple of (success: bool, message: str)
        """
    
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom field behaviour for Airflow UI."""
    
    @staticmethod
    def prepare_event_data(
        summary,
        severity,
        source,
        custom_details,
        component,
        group,
        class_type,
        action,
        dedup_key,
        images,
        links,
        action_key_name: str = "action",
    ) -> dict:
        """
        Prepare event data for send_event API calls.
        
        Returns:
            Formatted event data dictionary
        """

Usage Examples

Triggering Alerts
from airflow.providers.pagerduty.hooks.pagerduty_events import PagerdutyEventsHook

hook = PagerdutyEventsHook(pagerduty_events_conn_id="pagerduty_events_default")

# Trigger a critical alert
response = hook.send_event(
    summary="Database connection failure",
    severity="critical",
    source="web-app-db",
    component="database",
    group="production",
    custom_details={
        "error_code": "CONNECTION_TIMEOUT",
        "database_host": "prod-db-01.internal",
        "connection_count": 0
    },
    links=[{
        "href": "https://monitoring.company.com/db-status",
        "text": "Database Status Dashboard"
    }]
)
Managing Alert Lifecycle
# Acknowledge an existing alert
hook.send_event(
    summary="Database issue acknowledged - investigating",
    severity="error",
    source="web-app-db",
    action="acknowledge",
    dedup_key="db-connection-failure-prod-db-01"
)

# Resolve the alert
hook.send_event(
    summary="Database connection restored",
    severity="info",
    source="web-app-db", 
    action="resolve",
    dedup_key="db-connection-failure-prod-db-01",
    custom_details={"resolution": "Restarted database service"}
)
Change Event Tracking
from datetime import datetime

# Track a deployment change
hook.create_change_event(
    summary="Application deployment v2.3.1",
    source="ci-cd-pipeline",
    custom_details={
        "version": "v2.3.1",
        "environment": "production",
        "deployment_type": "rolling_update",
        "services_affected": ["api", "worker", "scheduler"]
    },
    timestamp=datetime.utcnow(),
    links=[{
        "href": "https://github.com/company/app/releases/tag/v2.3.1",
        "text": "Release Notes"
    }]
)

Connection Configuration

REST API Connection (pagerduty)

For PagerdutyHook usage:

  • Connection Type: pagerduty
  • Password: PagerDuty API token (account or personal access token)
  • Extra: Optional routing_key for Events API integration

Events API Connection (pagerduty_events)

For PagerdutyEventsHook usage:

  • Connection Type: pagerduty_events
  • Password: PagerDuty integration key (from service integration)

Both connection types include built-in connection testing and custom UI field configurations for simplified setup in Airflow's web interface.