tessl/pypi-apache-airflow-providers-zendesk

Apache Airflow provider package for integrating with Zendesk API through hooks and connection types

Apache Airflow Providers Zendesk

Apache Airflow provider package for integrating with the Zendesk customer service platform. It lets Airflow DAGs call Zendesk's API through a standardized hook interface, which supports automated customer service workflows and data synchronization.

Package Information

  • Package Name: apache-airflow-providers-zendesk
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-providers-zendesk
  • Requirements: Apache Airflow >= 2.10.0, zenpy >= 2.0.40
  • Python Versions: 3.10, 3.11, 3.12, 3.13

Core Imports

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook

Version compatibility imports:

from airflow.providers.zendesk.version_compat import BaseHook, BaseOperator

Provider info:

from airflow.providers.zendesk.get_provider_info import get_provider_info

Basic Usage

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

@task
def manage_zendesk_tickets():
    # Initialize hook with connection ID
    hook = ZendeskHook(zendesk_conn_id='zendesk_default')
    
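    # Search for open incidents. The hook already passes type="ticket" to
    # zenpy's search, so the incident filter goes in ticket_type, not type.
    tickets = list(hook.search_tickets(status='open', ticket_type='incident'))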
    
    # Get specific ticket
    if tickets:
        ticket = hook.get_ticket(tickets[0].id)
        print(f"Ticket {ticket.id}: {ticket.subject}")
    
    return len(tickets)

# DAG definition
with DAG(
    'zendesk_workflow',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # schedule_interval is deprecated in Airflow 2.x and removed in 3.0
    catchup=False
) as dag:
    manage_zendesk_tickets()
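
Calling `list()` on the search result, as in the task above, walks every match. When only a handful of tickets is needed, the result can be consumed lazily instead. The following is a minimal sketch, assuming the object returned by `search_tickets` behaves like an ordinary Python iterable; `take` and `fake_search` are hypothetical names, and `fake_search` is only a stand-in so the snippet runs without a Zendesk account:

```python
from itertools import islice
from types import SimpleNamespace


def take(results, limit):
    """Return at most `limit` items from any iterable without exhausting it."""
    return list(islice(results, limit))


def fake_search():
    """Hypothetical stand-in for hook.search_tickets(status="open")."""
    for ticket_id in range(1, 1001):
        yield SimpleNamespace(id=ticket_id, subject=f"Ticket {ticket_id}")


first_five = take(fake_search(), 5)
print([t.id for t in first_five])
```

In a real task, `take(hook.search_tickets(status="open"), 5)` would follow the same pattern.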

Connection Configuration

Configure a Zendesk connection in Airflow:

  • Connection Type: zendesk
  • Host: Your Zendesk domain (e.g., yourcompany.zendesk.com)
  • Login: Zendesk email address
  • Password: Zendesk password or API token
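
The same fields can also be supplied without the UI through Airflow's `AIRFLOW_CONN_<CONN_ID>` environment variable, which accepts a JSON-serialized connection. The sketch below builds that value with the standard library only; the host, email, and secret are placeholders, not real credentials:

```python
import json
import os

# Placeholder values: substitute your own domain and credentials.
zendesk_conn = {
    "conn_type": "zendesk",
    "host": "yourcompany.zendesk.com",
    "login": "agent@example.com",
    "password": "replace-with-secret",
}

# Airflow resolves the connection ID "zendesk_default" from this variable name.
env_var_name = "AIRFLOW_CONN_ZENDESK_DEFAULT"
os.environ[env_var_name] = json.dumps(zendesk_conn)

print(f"{env_var_name} set for host {zendesk_conn['host']}")
```

In practice the variable would be set in the scheduler and worker environment (or a secrets backend) rather than from Python; the snippet only shows the expected shape of the value.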

Capabilities

Zendesk Hook

Main hook class for interacting with the Zendesk API through the zenpy client library.

class ZendeskHook(BaseHook):
    """
    Interact with Zendesk. This hook uses the Zendesk conn_id.
    
    :param zendesk_conn_id: The Airflow connection used for Zendesk credentials.
    """
    
    conn_name_attr: str = "zendesk_conn_id"
    default_conn_name: str = "zendesk_default" 
    conn_type: str = "zendesk"
    hook_name: str = "Zendesk"
    
    def __init__(self, zendesk_conn_id: str = default_conn_name) -> None:
        """Initialize ZendeskHook with connection ID."""
    
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """
        Return UI field configuration for Airflow connection form.
        
        Returns:
            dict: Configuration hiding schema, port, extra fields and relabeling host/login
        """
    
    def get_conn(self) -> Zenpy:
        """
        Get the underlying Zenpy client.
        
        Returns:
            zenpy.Zenpy: Configured Zenpy client instance
        """
    
    def get_ticket(self, ticket_id: int) -> Ticket:
        """
        Retrieve ticket by ID.
        
        Args:
            ticket_id: The ID of the ticket to retrieve
            
        Returns:
            Ticket: Zendesk ticket object
        """
    
    def search_tickets(self, **kwargs) -> SearchResultGenerator:
        """
        Search tickets with optional parameters.
        
        Args:
            **kwargs: Search fields for zenpy search method (status, type, priority, etc.)
            
        Returns:
            SearchResultGenerator: Generator of matching Ticket objects
        """
    
    def create_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> TicketAudit | JobStatus:
        """
        Create single ticket or multiple tickets.
        
        Args:
            tickets: Single Ticket or list of Ticket objects to create
            **kwargs: Additional fields for zenpy create method
            
        Returns:
            TicketAudit: Information about ticket created (single ticket)
            JobStatus: Job status object (bulk request)
        """
    
    def update_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> TicketAudit | JobStatus:
        """
        Update single ticket or multiple tickets.
        
        Args:
            tickets: Updated Ticket or list of Ticket objects
            **kwargs: Additional fields for zenpy update method
            
        Returns:
            TicketAudit: Information about ticket updated (single ticket)
            JobStatus: Job status object (bulk request)
        """
    
    def delete_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> None:
        """
        Delete single ticket or multiple tickets.
        
        Args:
            tickets: Ticket or list of Ticket objects to delete
            **kwargs: Additional fields for zenpy delete method
            
        Returns:
            None: Returns nothing on success, raises APIException on failure
        """
    
    def get(self, *args, **kwargs):
        """
        Make custom GET request using zenpy client's users API.
        
        Note: This is an alias to the underlying zenpy client's users._get method.
        
        Args:
            *args: Positional arguments passed to zenpy client
            **kwargs: Keyword arguments passed to zenpy client
            
        Returns:
            API response from zenpy client
        """

Version Compatibility Utilities

Compatibility functions and constants for different Airflow versions.

def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    """
    Get Airflow version as tuple.
    
    Returns:
        tuple[int, int, int]: Major, minor, micro version numbers
    """

# Version check constants  
AIRFLOW_V_3_0_PLUS: bool  # Whether running Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool  # Whether running Airflow 3.1+

# Version-dependent imports
BaseHook: type  # From airflow.sdk or airflow.hooks.base
BaseOperator: type  # From airflow.sdk or airflow.models

Import from version_compat module:

from airflow.providers.zendesk.version_compat import (
    get_base_airflow_version_tuple,
    AIRFLOW_V_3_0_PLUS,
    AIRFLOW_V_3_1_PLUS,
    BaseHook,
    BaseOperator
)
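
The boolean flags work because Python compares tuples lexicographically, so a `(major, minor, micro)` tuple can be checked directly against a threshold. The sketch below illustrates that idea with the standard library only; `parse_version_tuple` and `is_at_least` are hypothetical helpers written for this example, not the provider's implementation, and any pre-release suffix is simply ignored:

```python
import re


def parse_version_tuple(version: str) -> tuple[int, int, int]:
    """Hypothetical helper: extract (major, minor, micro) from a version string."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    major, minor, micro = (int(part) for part in match.groups())
    return major, minor, micro


def is_at_least(version: str, threshold: tuple[int, int, int]) -> bool:
    """Lexicographic tuple comparison, e.g. (3, 1, 0) >= (3, 0, 0)."""
    return parse_version_tuple(version) >= threshold


for candidate in ("2.10.5", "3.0.0", "3.1.2"):
    print(candidate, is_at_least(candidate, (3, 0, 0)), is_at_least(candidate, (3, 1, 0)))
```

With the real module, the equivalent check is simply `if AIRFLOW_V_3_0_PLUS: ...`.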

Provider Information

Function to retrieve provider metadata for Airflow discovery.

def get_provider_info() -> dict:
    """
    Get provider package information.
    
    Returns:
        dict: Provider metadata including package name, integrations, hooks, and connection types
    """

Types

Types from the zenpy library that are commonly used with this provider:

# From zenpy library
class Zenpy:
    """Main zenpy client for Zendesk API."""

class Ticket:
    """Zendesk ticket object containing ticket data and methods."""
    
    id: int
    subject: str
    description: str
    status: str
    priority: str
    # ... additional ticket fields

class TicketAudit:
    """Information about ticket creation/update operations."""

class JobStatus:
    """Status information for bulk operations."""

class SearchResultGenerator:
    """Generator yielding search results from Zendesk API."""

# Union types for method parameters
TicketInput = Ticket | list[Ticket]
TicketResult = TicketAudit | JobStatus

Usage Examples

Create and Update Tickets

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from zenpy.lib.api_objects import Ticket

hook = ZendeskHook()

# Create a new ticket
new_ticket = Ticket(
    subject="Integration Test Ticket",
    description="This is a test ticket created via Airflow",
    priority="normal"
)

audit = hook.create_tickets(new_ticket)
print(f"Created ticket ID: {audit.ticket.id}")

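# Update the ticket. Use the ticket returned by the API: it carries the
# server-assigned ID, which the locally built new_ticket does not have.
created_ticket = audit.ticket
created_ticket.status = "solved"
update_audit = hook.update_tickets(created_ticket)
print(f"Updated ticket status: {update_audit.ticket.status}")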

Bulk Operations

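from airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from zenpy.lib.api_objects import Ticket

hook = ZendeskHook()

# Create multiple tickets
tickets = [
    Ticket(subject=f"Batch Ticket {i}", description=f"Description {i}")
    for i in range(1, 4)
]

# Passing a list triggers a bulk request, which returns a JobStatus
job_status = hook.create_tickets(tickets)
print(f"Bulk create job ID: {job_status.id}")

# Search and filter tickets. The hook already passes type="ticket" to
# zenpy's search, so the incident filter goes in ticket_type, not type.
open_tickets = list(hook.search_tickets(status="open", ticket_type="incident"))
print(f"Found {len(open_tickets)} open incident tickets")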
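
Because `create_tickets` and `update_tickets` return a `TicketAudit` for a single ticket and a `JobStatus` for a list, downstream code often needs to branch on the result. The following is a minimal duck-typed sketch, assuming (as the examples in this section do) that a `TicketAudit` exposes `.ticket` and a `JobStatus` exposes `.id`; `summarize_result` is a hypothetical helper, and the `SimpleNamespace` objects merely stand in for zenpy objects:

```python
from types import SimpleNamespace


def summarize_result(result) -> str:
    """Describe the value returned by create_tickets/update_tickets."""
    ticket = getattr(result, "ticket", None)
    if ticket is not None:
        # Single-ticket call: the audit wraps the affected ticket.
        return f"ticket {ticket.id}"
    # Bulk call: only a job handle is available; the work may still be running.
    return f"bulk job {result.id}"


single = SimpleNamespace(ticket=SimpleNamespace(id=42))
bulk = SimpleNamespace(id="job-abc")

print(summarize_result(single))
print(summarize_result(bulk))
```

Checking with `isinstance` against the zenpy classes would be stricter; the attribute check is used here only to keep the sketch free of third-party imports.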

Custom API Access

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook

hook = ZendeskHook()

# Access the underlying zenpy client for advanced operations
zenpy_client = hook.get_conn()

# Use the zenpy client directly
current_user = zenpy_client.users.me()
print(f"Current user: {current_user.name}")

# Note: hook.get is an alias for the zenpy client's users._get method.
# For other API endpoints, use the zenpy client directly.
organizations = [org.to_dict() for org in zenpy_client.organizations()]

Error Handling

The provider uses zenpy's exception handling. Common exceptions include:

  • APIException: Raised when API operations fail
  • Connection errors: Raised when unable to connect to Zendesk
  • Authentication errors: Raised when credentials are invalid

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from zenpy.lib.exception import APIException

hook = ZendeskHook()

try:
    ticket = hook.get_ticket(999999)  # Non-existent ticket
except APIException as e:
    print(f"API Error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-zendesk
