Apache Airflow provider package for integrating with Zendesk API through hooks and connection types
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Apache Airflow provider package for integrating with Zendesk customer service platform. This provider enables Airflow DAGs to interact with Zendesk's API through a standardized hook interface, allowing automated customer service workflows and data synchronization.
pip install apache-airflow-providers-zendeskfrom airflow.providers.zendesk.hooks.zendesk import ZendeskHookVersion compatibility imports:
from airflow.providers.zendesk.version_compat import BaseHook, BaseOperatorProvider info:
from airflow.providers.zendesk.get_provider_info import get_provider_infofrom airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
@task
def manage_zendesk_tickets():
# Initialize hook with connection ID
hook = ZendeskHook(zendesk_conn_id='zendesk_default')
# Search for tickets
tickets = list(hook.search_tickets(status='open', type='incident'))
# Get specific ticket
if tickets:
ticket = hook.get_ticket(tickets[0].id)
print(f"Ticket {ticket.id}: {ticket.subject}")
return len(tickets)
# DAG definition
with DAG(
'zendesk_workflow',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
manage_zendesk_tickets()Configure Zendesk connection in Airflow:
zendeskyourcompany.zendesk.com)Main hook class for interacting with Zendesk API through the zenpy client library.
class ZendeskHook(BaseHook):
"""
Interact with Zendesk. This hook uses the Zendesk conn_id.
:param zendesk_conn_id: The Airflow connection used for Zendesk credentials.
"""
conn_name_attr: str = "zendesk_conn_id"
default_conn_name: str = "zendesk_default"
conn_type: str = "zendesk"
hook_name: str = "Zendesk"
def __init__(self, zendesk_conn_id: str = default_conn_name) -> None:
"""Initialize ZendeskHook with connection ID."""
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""
Return UI field configuration for Airflow connection form.
Returns:
dict: Configuration hiding schema, port, extra fields and relabeling host/login
"""
def get_conn(self) -> Zenpy:
"""
Get the underlying Zenpy client.
Returns:
zenpy.Zenpy: Configured Zenpy client instance
"""
def get_ticket(self, ticket_id: int) -> Ticket:
"""
Retrieve ticket by ID.
Args:
ticket_id: The ID of the ticket to retrieve
Returns:
Ticket: Zendesk ticket object
"""
def search_tickets(self, **kwargs) -> SearchResultGenerator:
"""
Search tickets with optional parameters.
Args:
**kwargs: Search fields for zenpy search method (status, type, priority, etc.)
Returns:
SearchResultGenerator: Generator of matching Ticket objects
"""
def create_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> TicketAudit | JobStatus:
"""
Create single ticket or multiple tickets.
Args:
tickets: Single Ticket or list of Ticket objects to create
**kwargs: Additional fields for zenpy create method
Returns:
TicketAudit: Information about ticket created (single ticket)
JobStatus: Job status object (bulk request)
"""
def update_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> TicketAudit | JobStatus:
"""
Update single ticket or multiple tickets.
Args:
tickets: Updated Ticket or list of Ticket objects
**kwargs: Additional fields for zenpy update method
Returns:
TicketAudit: Information about ticket updated (single ticket)
JobStatus: Job status object (bulk request)
"""
def delete_tickets(self, tickets: Ticket | list[Ticket], **kwargs) -> None:
"""
Delete single ticket or multiple tickets.
Args:
tickets: Ticket or list of Ticket objects to delete
**kwargs: Additional fields for zenpy delete method
Returns:
None: Returns nothing on success, raises APIException on failure
"""
def get(self, *args, **kwargs):
"""
Make custom GET request using zenpy client's users API.
Note: This is an alias to the underlying zenpy client's users._get method.
Args:
*args: Positional arguments passed to zenpy client
**kwargs: Keyword arguments passed to zenpy client
Returns:
API response from zenpy client
"""Compatibility functions and constants for different Airflow versions.
def get_base_airflow_version_tuple() -> tuple[int, int, int]:
"""
Get Airflow version as tuple.
Returns:
tuple[int, int, int]: Major, minor, micro version numbers
"""
# Version check constants
AIRFLOW_V_3_0_PLUS: bool # Whether running Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool # Whether running Airflow 3.1+
# Version-dependent imports
BaseHook: type # From airflow.sdk or airflow.hooks.base
BaseOperator: type # From airflow.sdk or airflow.modelsImport from version_compat module:
from airflow.providers.zendesk.version_compat import (
get_base_airflow_version_tuple,
AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_1_PLUS,
BaseHook,
BaseOperator
)Function to retrieve provider metadata for Airflow discovery.
def get_provider_info() -> dict:
"""
Get provider package information.
Returns:
dict: Provider metadata including package name, integrations, hooks, and connection types
"""Types from the zenpy library that are commonly used with this provider:
# From zenpy library
class Zenpy:
"""Main zenpy client for Zendesk API."""
class Ticket:
"""Zendesk ticket object containing ticket data and methods."""
id: int
subject: str
description: str
status: str
priority: str
# ... additional ticket fields
class TicketAudit:
"""Information about ticket creation/update operations."""
class JobStatus:
"""Status information for bulk operations."""
class SearchResultGenerator:
"""Generator yielding search results from Zendesk API."""
# Union types for method parameters
TicketInput = Ticket | list[Ticket]
TicketResult = TicketAudit | JobStatusfrom airflow.providers.zendesk.hooks.zendesk import ZendeskHook
from zenpy.lib.api_objects import Ticket
hook = ZendeskHook()
# Create a new ticket
new_ticket = Ticket(
subject="Integration Test Ticket",
description="This is a test ticket created via Airflow",
priority="normal"
)
audit = hook.create_tickets(new_ticket)
print(f"Created ticket ID: {audit.ticket.id}")
# Update the ticket
new_ticket.status = "solved"
update_audit = hook.update_tickets(new_ticket)
print(f"Updated ticket status: {update_audit.ticket.status}")# Create multiple tickets
tickets = [
Ticket(subject=f"Batch Ticket {i}", description=f"Description {i}")
for i in range(1, 4)
]
job_status = hook.create_tickets(tickets)
print(f"Bulk create job ID: {job_status.id}")
# Search and filter tickets
open_tickets = list(hook.search_tickets(status="open", type="incident"))
print(f"Found {len(open_tickets)} open incident tickets")# Access underlying zenpy client for advanced operations
zenpy_client = hook.get_conn()
# Use zenpy client directly
users = zenpy_client.users.me()
print(f"Current user: {users.name}")
# Note: The get method is an alias to zenpy client's users._get method
# For other API endpoints, use the zenpy client directly
response = zenpy_client.organizations()
organizations = [org.to_dict() for org in response]The provider uses zenpy's exception handling. Common exceptions include:
from zenpy.lib.exception import APIException
try:
ticket = hook.get_ticket(999999) # Non-existent ticket
except APIException as e:
print(f"API Error: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-zendesk