tessl/pypi-apache-airflow-providers-zendesk

Apache Airflow provider package for integrating with Zendesk API through hooks and connection types

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-zendesk@4.10.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-zendesk@4.10.0


Apache Airflow Providers Zendesk

Apache Airflow provider package for integrating with the Zendesk customer service platform. It lets Airflow DAGs call the Zendesk API through a single hook, ZendeskHook, which wraps the zenpy client, so that ticket workflows and data synchronization can be automated from tasks.

Package Information

  • Package Name: apache-airflow-providers-zendesk
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-providers-zendesk
  • Requirements: Apache Airflow >= 2.10.0, zenpy >= 2.0.40
  • Python Versions: 3.10, 3.11, 3.12, 3.13
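
To check an environment against the requirements listed above, the installed versions can be read with the standard library. This is a minimal sketch: the helper name `installed_version` is hypothetical and not part of the provider, and it only reports versions without enforcing the minimums.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version


def installed_version(distribution: str) -> str | None:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Distribution names taken from the requirements list above.
    for name in ("apache-airflow", "apache-airflow-providers-zendesk", "zenpy"):
        print(f"{name}: {installed_version(name) or 'not installed'}")
```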

Core Imports

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook

Version compatibility imports:

from airflow.providers.zendesk.version_compat import BaseHook, BaseOperator

Provider info:

from airflow.providers.zendesk.get_provider_info import get_provider_info

Basic Usage

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

@task
def manage_zendesk_tickets():
    # Initialize hook with connection ID
    hook = ZendeskHook(zendesk_conn_id='zendesk_default')
    
    # Search for tickets
    tickets = list(hook.search_tickets(status='open', type='incident'))
    
    # Get specific ticket
    if tickets:
        ticket = hook.get_ticket(tickets[0].id)
        print(f"Ticket {ticket.id}: {ticket.subject}")
    
    return len(tickets)

# DAG definition
with DAG(
    'zendesk_workflow',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # `schedule_interval` is deprecated since Airflow 2.4 and removed in 3.0
    catchup=False
) as dag:
    manage_zendesk_tickets()
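
Task logic like the example above can be exercised without a live Zendesk account by passing in a stand-in for the hook. The sketch below assumes only that the hook exposes `search_tickets(**kwargs)` as documented here; `count_open_incidents` and the fake tickets are hypothetical names used for illustration.

```python
from __future__ import annotations

from types import SimpleNamespace
from unittest.mock import MagicMock


def count_open_incidents(hook) -> int:
    """Count open incident tickets via any object that exposes search_tickets()."""
    return sum(1 for _ in hook.search_tickets(status="open", type="incident"))


# Stand-in for ZendeskHook: no Airflow connection or network access is needed.
fake_hook = MagicMock()
fake_hook.search_tickets.return_value = iter(
    [
        SimpleNamespace(id=101, subject="Printer on fire"),
        SimpleNamespace(id=102, subject="VPN down"),
    ]
)

open_count = count_open_incidents(fake_hook)

# Verify that the search was issued with the expected filters.
fake_hook.search_tickets.assert_called_once_with(status="open", type="incident")
print(open_count)
```

Keeping the hook as a parameter, rather than constructing it inside the function, is what makes this substitution possible in a unit test.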

Connection Configuration

Configure a Zendesk connection in Airflow with the following fields:

  • Connection Type: zendesk
  • Host: Your Zendesk domain (e.g., yourcompany.zendesk.com)
  • Login: Zendesk email address
  • Password: Zendesk password or API token
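
Besides the Airflow UI, a connection can be supplied through an `AIRFLOW_CONN_<CONN_ID>` environment variable, which Airflow accepts in JSON form. The sketch below builds such a variable for the fields listed above. The helper name `zendesk_connection_env` and the credential values are placeholders, not part of the provider.

```python
from __future__ import annotations

import json


def zendesk_connection_env(
    host: str,
    login: str,
    password: str,
    conn_id: str = "zendesk_default",
) -> tuple[str, str]:
    """Build the (name, value) pair of an AIRFLOW_CONN_* environment variable."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {"conn_type": "zendesk", "host": host, "login": login, "password": password}
    )
    return name, value


env_name, env_value = zendesk_connection_env(
    host="yourcompany.zendesk.com",
    login="agent@example.com",       # placeholder email
    password="replace-with-secret",  # placeholder; load real secrets from a secrets backend
)
print(f"export {env_name}='{env_value}'")
```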

Capabilities

Zendesk Hook

The main hook class for interacting with the Zendesk API through the zenpy client library.

class ZendeskHook(BaseHook):
    """
    Interact with Zendesk. This hook uses the Zendesk conn_id.
    
    :param zendesk_conn_id: The Airflow connection used for Zendesk credentials.
    """
    
    conn_name_attr: str = "zendesk_conn_id"
    default_conn_name: str = "zendesk_default" 
    conn_type: str = "zendesk"
    hook_name: str = "Zendesk"
    
    def __init__(self, zendesk_conn_id: str = default_conn_name) -> None:
        """Initialize ZendeskHook with connection ID."""
    
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """
        Return UI field configuration for Airflow connection form.
        
        Returns:
            dict: Configuration hiding schema, port, extra fields and relabeling host/login
        """
    
    def get_conn(self) -> Zenpy:
        """
        Get the underlying Zenpy client.
        
        Returns:
            zenpy.Zenpy: Configured Zenpy client instance
        """
    
    def get_ticket(self, ticket_id: int) -> Ticket:
        """
        Retrieve ticket by ID.
        
        Args:
            ticket_id: The ID of the ticket to retrieve
            
        Returns:
            Ticket: Zendesk ticket object
        """
    
    def search_tickets(self, **kwargs) -> SearchResultGenerator:
        """
        Search tickets with optional parameters.
        
        Args:
            **kwargs: Search fields for zenpy search method (status, type, priority, etc.)
            
        Returns:
            SearchResultGenerator: Generator of matching Ticket objects
        """
    
    def create_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> TicketAudit | JobStatus:
        """
        Create single ticket or multiple tickets.
        
        Args:
            tickets: Single Ticket or list of Ticket objects to create
            **kwargs: Additional fields for zenpy create method
            
        Returns:
            TicketAudit: Information about ticket created (single ticket)
            JobStatus: Job status object (bulk request)
        """
    
    def update_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> TicketAudit | JobStatus:
        """
        Update single ticket or multiple tickets.
        
        Args:
            tickets: Updated Ticket or list of Ticket objects
            **kwargs: Additional fields for zenpy update method
            
        Returns:
            TicketAudit: Information about ticket updated (single ticket)
            JobStatus: Job status object (bulk request)
        """
    
    def delete_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> None:
        """
        Delete single ticket or multiple tickets.
        
        Args:
            tickets: Ticket or list of Ticket objects to delete
            **kwargs: Additional fields for zenpy delete method
            
        Returns:
            None: Returns nothing on success, raises APIException on failure
        """
    
    def get(self, *args, **kwargs):
        """
        Make custom GET request using zenpy client's users API.
        
        Note: This is an alias to the underlying zenpy client's users._get method.
        
        Args:
            *args: Positional arguments passed to zenpy client
            **kwargs: Keyword arguments passed to zenpy client
            
        Returns:
            API response from zenpy client
        """

Version Compatibility Utilities

Compatibility functions and constants for different Airflow versions.

def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    """
    Get Airflow version as tuple.
    
    Returns:
        tuple[int, int, int]: Major, minor, micro version numbers
    """

# Version check constants  
AIRFLOW_V_3_0_PLUS: bool  # Whether running Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool  # Whether running Airflow 3.1+

# Version-dependent imports
BaseHook: type  # From airflow.sdk or airflow.hooks.base
BaseOperator: type  # From airflow.sdk or airflow.models

Import from version_compat module:

from airflow.providers.zendesk.version_compat import (
    get_base_airflow_version_tuple,
    AIRFLOW_V_3_0_PLUS,
    AIRFLOW_V_3_1_PLUS,
    BaseHook,
    BaseOperator
)
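
The version flags above come down to comparing a `(major, minor, micro)` tuple against a threshold. The following is a standalone sketch of that comparison, not the provider's implementation: `base_version_tuple` and `is_at_least` are hypothetical helpers that parse a version string with a regular expression and ignore any pre-release suffix.

```python
from __future__ import annotations

import re


def base_version_tuple(version: str) -> tuple[int, int, int]:
    """Extract (major, minor, micro) from a version string, ignoring suffixes."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    major, minor, micro = (int(part) for part in match.groups())
    return major, minor, micro


def is_at_least(version: str, minimum: tuple[int, int, int]) -> bool:
    """Return True if the base version is greater than or equal to the minimum."""
    return base_version_tuple(version) >= minimum


print(is_at_least("2.10.5", (3, 0, 0)))
print(is_at_least("3.1.0rc1", (3, 1, 0)))
```

Because only the base version is compared, a release candidate such as `3.1.0rc1` counts as 3.1.0 in this sketch.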

Provider Information

Function to retrieve provider metadata for Airflow discovery.

def get_provider_info() -> dict:
    """
    Get provider package information.
    
    Returns:
        dict: Provider metadata including package name, integrations, hooks, and connection types
    """

Types

Types from the zenpy library that are commonly used with this provider:

# From zenpy library
class Zenpy:
    """Main zenpy client for Zendesk API."""

class Ticket:
    """Zendesk ticket object containing ticket data and methods."""
    
    id: int
    subject: str
    description: str
    status: str
    priority: str
    # ... additional ticket fields

class TicketAudit:
    """Information about ticket creation/update operations."""

class JobStatus:
    """Status information for bulk operations."""

class SearchResultGenerator:
    """Generator yielding search results from Zendesk API."""

# Union types for method parameters
TicketInput = Ticket | list[Ticket]
TicketResult = TicketAudit | JobStatus

Usage Examples

Create and Update Tickets

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from zenpy.lib.api_objects import Ticket

hook = ZendeskHook()

# Create a new ticket
new_ticket = Ticket(
    subject="Integration Test Ticket",
    description="This is a test ticket created via Airflow",
    priority="normal"
)

audit = hook.create_tickets(new_ticket)
print(f"Created ticket ID: {audit.ticket.id}")

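# Update the ticket. Use the ticket returned by the API: unlike `new_ticket`,
# it carries the ID that Zendesk assigned on creation.
created_ticket = audit.ticket
created_ticket.status = "solved"
update_audit = hook.update_tickets(created_ticket)
print(f"Updated ticket status: {update_audit.ticket.status}")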

Bulk Operations

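from airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from zenpy.lib.api_objects import Ticket

hook = ZendeskHook()

# Create multiple tickets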
tickets = [
    Ticket(subject=f"Batch Ticket {i}", description=f"Description {i}")
    for i in range(1, 4)
]

job_status = hook.create_tickets(tickets)
print(f"Bulk create job ID: {job_status.id}")

# Search and filter tickets
open_tickets = list(hook.search_tickets(status="open", type="incident"))
print(f"Found {len(open_tickets)} open incident tickets")

Custom API Access

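from airflow.providers.zendesk.hooks.zendesk import ZendeskHook

hook = ZendeskHook()

# Access the underlying zenpy client for advanced operations
zenpy_client = hook.get_conn()

# Use the zenpy client directly
current_user = zenpy_client.users.me()
print(f"Current user: {current_user.name}")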

# Note: The get method is an alias to zenpy client's users._get method
# For other API endpoints, use the zenpy client directly
response = zenpy_client.organizations()
organizations = [org.to_dict() for org in response]

Error Handling

The provider uses zenpy's exception handling. Common exceptions include:

  • APIException: Raised when API operations fail
  • Connection errors: Raised when unable to connect to Zendesk
  • Authentication errors: Raised when credentials are invalid

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from zenpy.lib.exception import APIException

hook = ZendeskHook()

try:
    ticket = hook.get_ticket(999999)  # Non-existent ticket
except APIException as e:
    print(f"API Error: {e}")
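
Transient failures, such as connection errors, are often handled by retrying. The sketch below is a generic retry helper with exponential backoff; it is not part of the provider or of zenpy, and `call_with_retry` and `flaky` are hypothetical names. The demo uses the built-in `ConnectionError` so that it runs without Airflow or zenpy installed.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    retry_on: tuple[type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on the given exceptions with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")


# Demo: a function that fails twice before succeeding.
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


delays: list[float] = []
# Record the delays instead of actually sleeping.
result = call_with_retry(flaky, retry_on=(ConnectionError,), sleep=delays.append)
print(result, delays)
```

With the hook, a call might look like `call_with_retry(lambda: hook.get_ticket(123), retry_on=(APIException,))`. Whether an `APIException` is worth retrying depends on the failure: a missing ticket will not appear on a second attempt. Airflow's own task-level `retries` setting is an alternative that needs no extra code.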