Apache Airflow provider package for integrating with Zendesk API through hooks and connection types
npx @tessl/cli install tessl/pypi-apache-airflow-providers-zendesk@4.10.0

Apache Airflow provider package for integrating with the Zendesk customer service platform. This provider enables Airflow DAGs to interact with Zendesk's API through a standardized hook interface, allowing automated customer service workflows and data synchronization.
pip install apache-airflow-providers-zendesk

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook

Version compatibility imports:
from airflow.providers.zendesk.version_compat import BaseHook, BaseOperator

Provider info:
from airflow.providers.zendesk.get_provider_info import get_provider_info

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
@task
def manage_zendesk_tickets():
    """Look up open incident tickets, print the first one, and return the count.

    Returns:
        int: Number of tickets matched by the search.
    """
    # Initialize hook with connection ID
    hook = ZendeskHook(zendesk_conn_id='zendesk_default')

    # Search for tickets; the result is materialized into a list because it
    # is both indexed and counted below.
    tickets = list(hook.search_tickets(status='open', type='incident'))

    # Get specific ticket
    if tickets:
        ticket = hook.get_ticket(tickets[0].id)
        print(f"Ticket {ticket.id}: {ticket.subject}")

    return len(tickets)
# DAG definition
with DAG(
'zendesk_workflow',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
manage_zendesk_tickets()Configure Zendesk connection in Airflow:
- Connection Type: zendesk
- Host: your Zendesk domain (e.g. yourcompany.zendesk.com)

Main hook class for interacting with the Zendesk API through the zenpy client library.
class ZendeskHook(BaseHook):
"""
Interact with Zendesk. This hook uses the Zendesk conn_id.
:param zendesk_conn_id: The Airflow connection used for Zendesk credentials.
"""
conn_name_attr: str = "zendesk_conn_id"
default_conn_name: str = "zendesk_default"
conn_type: str = "zendesk"
hook_name: str = "Zendesk"
def __init__(self, zendesk_conn_id: str = default_conn_name) -> None:
"""Initialize ZendeskHook with connection ID."""
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""
Return UI field configuration for Airflow connection form.
Returns:
dict: Configuration hiding schema, port, extra fields and relabeling host/login
"""
def get_conn(self) -> Zenpy:
"""
Get the underlying Zenpy client.
Returns:
zenpy.Zenpy: Configured Zenpy client instance
"""
def get_ticket(self, ticket_id: int) -> Ticket:
"""
Retrieve ticket by ID.
Args:
ticket_id: The ID of the ticket to retrieve
Returns:
Ticket: Zendesk ticket object
"""
def search_tickets(self, **kwargs) -> SearchResultGenerator:
"""
Search tickets with optional parameters.
Args:
**kwargs: Search fields for zenpy search method (status, type, priority, etc.)
Returns:
SearchResultGenerator: Generator of matching Ticket objects
"""
def create_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> TicketAudit | JobStatus:
"""
Create single ticket or multiple tickets.
Args:
tickets: Single Ticket or list of Ticket objects to create
**kwargs: Additional fields for zenpy create method
Returns:
TicketAudit: Information about ticket created (single ticket)
JobStatus: Job status object (bulk request)
"""
def update_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> TicketAudit | JobStatus:
"""
Update single ticket or multiple tickets.
Args:
tickets: Updated Ticket or list of Ticket objects
**kwargs: Additional fields for zenpy update method
Returns:
TicketAudit: Information about ticket updated (single ticket)
JobStatus: Job status object (bulk request)
"""
def delete_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> None:
"""
Delete single ticket or multiple tickets.
Args:
tickets: Ticket or list of Ticket objects to delete
**kwargs: Additional fields for zenpy delete method
Returns:
None: Returns nothing on success, raises APIException on failure
"""
def get(self, *args, **kwargs):
"""
Make custom GET request using zenpy client's users API.
Note: This is an alias to the underlying zenpy client's users._get method.
Args:
*args: Positional arguments passed to zenpy client
**kwargs: Keyword arguments passed to zenpy client
Returns:
API response from zenpy client
"""Compatibility functions and constants for different Airflow versions.
def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    """
    Get Airflow version as tuple.

    Returns:
        tuple[int, int, int]: Major, minor, micro version numbers
    """
# Version check constants
AIRFLOW_V_3_0_PLUS: bool # Whether running Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool # Whether running Airflow 3.1+
# Version-dependent imports
BaseHook: type # From airflow.sdk or airflow.hooks.base
BaseOperator: type # From airflow.sdk or airflow.models

Import from version_compat module:
from airflow.providers.zendesk.version_compat import (
get_base_airflow_version_tuple,
AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_1_PLUS,
BaseHook,
BaseOperator
)

Function to retrieve provider metadata for Airflow discovery.
def get_provider_info() -> dict:
"""
Get provider package information.
Returns:
dict: Provider metadata including package name, integrations, hooks, and connection types
"""Types from the zenpy library that are commonly used with this provider:
# From zenpy library
class Zenpy:
    """Main zenpy client for Zendesk API."""
class Ticket:
    """Zendesk ticket object containing ticket data and methods."""

    id: int
    subject: str
    description: str
    status: str
    priority: str
    # ... additional ticket fields
class TicketAudit:
    """Information about ticket creation/update operations."""
class JobStatus:
    """Status information for bulk operations."""
class SearchResultGenerator:
    """Generator yielding search results from Zendesk API."""
# Union types for method parameters
TicketInput = Ticket | list[Ticket]
TicketResult = TicketAudit | JobStatus

from airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from zenpy.lib.api_objects import Ticket
hook = ZendeskHook()

# Create a new ticket
new_ticket = Ticket(
    subject="Integration Test Ticket",
    description="This is a test ticket created via Airflow",
    priority="normal",
)
audit = hook.create_tickets(new_ticket)
print(f"Created ticket ID: {audit.ticket.id}")

# Update the ticket. Work on the ticket returned in the creation audit: it
# carries the ID assigned by Zendesk, which an update needs, whereas
# `new_ticket` was built locally without one.
created_ticket = audit.ticket
created_ticket.status = "solved"
update_audit = hook.update_tickets(created_ticket)
print(f"Updated ticket status: {update_audit.ticket.status}")

# Create multiple tickets
# Build three tickets (numbered 1-3) to submit together.
tickets = [
Ticket(subject=f"Batch Ticket {i}", description=f"Description {i}")
for i in range(1, 4)
]
# Passing a list makes this a bulk request, so the result is a job status
# object rather than a per-ticket audit.
job_status = hook.create_tickets(tickets)
print(f"Bulk create job ID: {job_status.id}")
# Search and filter tickets; list() consumes the search result generator so
# the matches can be counted.
open_tickets = list(hook.search_tickets(status="open", type="incident"))
print(f"Found {len(open_tickets)} open incident tickets")
# Access underlying zenpy client for advanced operations
zenpy_client = hook.get_conn()
# Use zenpy client directly
users = zenpy_client.users.me()
print(f"Current user: {users.name}")
# Note: The get method is an alias to zenpy client's users._get method
# For other API endpoints, use the zenpy client directly
response = zenpy_client.organizations()
organizations = [org.to_dict() for org in response]

The provider uses zenpy's exception handling. Common exceptions include:
from zenpy.lib.exception import APIException
# Errors reported by the Zendesk API surface as zenpy's APIException.
try:
    ticket = hook.get_ticket(999999)  # Non-existent ticket
except APIException as e:
    print(f"API Error: {e}")