Backport provider package for JIRA integration with Apache Airflow 1.10.x, providing operators, sensors, and hooks for interacting with JIRA systems.
npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-jira@2020.10.0

A backport provider package for Apache Airflow 1.10.x that enables workflow orchestration systems to integrate with JIRA issue tracking and project management. Provides comprehensive operators for creating and updating tickets, sensors for monitoring ticket status and state changes, and hooks for direct API interactions with JIRA instances.

pip install apache-airflow-backport-providers-jira

Requirements: apache-airflow~=1.10, JIRA>1.0.7

from airflow.providers.jira.hooks.jira import JiraHook
from airflow.providers.jira.operators.jira import JiraOperator
from airflow.providers.jira.sensors.jira import JiraSensor, JiraTicketSensor

from airflow import DAG
from airflow.providers.jira.hooks.jira import JiraHook
from airflow.providers.jira.operators.jira import JiraOperator
from airflow.providers.jira.sensors.jira import JiraTicketSensor
from datetime import datetime, timedelta

# Create a DAG
dag = DAG(
    'jira_integration_example',
    default_args={
        'owner': 'data_team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Example JIRA integration workflow',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
)

# Create a JIRA issue using JiraOperator
create_ticket = JiraOperator(
    task_id='create_jira_ticket',
    jira_conn_id='jira_default',
    jira_method='create_issue',
    jira_method_args={
        'fields': {
            'project': {'key': 'TEST'},
            'summary': 'Daily workflow completed',
            'description': 'Automated ticket from Airflow workflow',
            'issuetype': {'name': 'Task'},
        }
    },
    dag=dag,
)

# Monitor ticket status using JiraTicketSensor
wait_for_approval = JiraTicketSensor(
    task_id='wait_for_ticket_approval',
    jira_conn_id='jira_default',
    ticket_id='TEST-123',
    field='status',
    expected_value='In Progress',
    poke_interval=300,  # Check every 5 minutes
    timeout=3600,  # Time out after 1 hour
    dag=dag,
)

# Set task dependencies
create_ticket >> wait_for_approval

The JIRA provider follows Apache Airflow's standard architecture pattern with three main component types: hooks (JiraHook), operators (JiraOperator), and sensors (JiraSensor, JiraTicketSensor).
All components use Airflow's connection management system for secure credential handling. JiraOperator.jira_method_args and JiraTicketSensor.ticket_id additionally support templating for dynamic parameter substitution.
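
The layering of these components can be sketched in plain Python, with no Airflow or JIRA dependency: a sensor delegates to an operator, which looks a method up by name on a client object that a hook would normally provide. Every class here (`FakeClient`, `MiniOperator`, `MiniSensor`) is a hypothetical stand-in for illustration, not the provider's implementation.

```python
from typing import Any, Callable, Dict, Optional


class FakeClient:
    """Stand-in for the JIRA client a hook would return (hypothetical)."""

    def __init__(self, statuses: Dict[str, str]) -> None:
        self._statuses = statuses

    def issue(self, id: str) -> Dict[str, str]:
        # A real client returns an Issue object; a dict keeps the sketch small.
        return {"key": id, "status": self._statuses[id]}


class MiniOperator:
    """Calls a client method by name, then optionally post-processes the result."""

    def __init__(
        self,
        client: FakeClient,
        method_name: str,
        method_args: Optional[dict] = None,
        result_processor: Optional[Callable[[Dict, Any], Any]] = None,
    ) -> None:
        self.client = client
        self.method_name = method_name
        self.method_args = method_args or {}
        self.result_processor = result_processor

    def execute(self, context: Dict) -> Any:
        # Resolve the method from its name and call it with keyword arguments.
        result = getattr(self.client, self.method_name)(**self.method_args)
        if self.result_processor is not None:
            return self.result_processor(context, result)
        return result


class MiniSensor:
    """Delegates each poke to an internal operator and reports its truthiness."""

    def __init__(self, operator: MiniOperator) -> None:
        self.operator = operator

    def poke(self, context: Dict) -> bool:
        return bool(self.operator.execute(context))


def is_resolved(context: Dict, issue: Dict[str, str]) -> bool:
    return issue["status"] == "Resolved"


client = FakeClient({"PROJ-1": "Resolved", "PROJ-2": "Open"})
resolved = MiniSensor(MiniOperator(client, "issue", {"id": "PROJ-1"}, is_resolved)).poke({})
pending = MiniSensor(MiniOperator(client, "issue", {"id": "PROJ-2"}, is_resolved)).poke({})
print(resolved, pending)
```

The point of the sketch is the direction of delegation: the sensor owns no JIRA logic of its own, so any method the client exposes can be polled by swapping the method name and the result processor.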
Establishes and manages connections to JIRA instances with authentication, SSL verification, and proxy support.
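
A minimal sketch of how the connection extra options that the hook is documented to respect (verify, validate, get_server_info) might be interpreted. It assumes the extras arrive as a JSON string in which the string "false" (in any case) switches an option off; the helper name `parse_jira_extras` and the all-enabled defaults are assumptions made for this example, not part of the provider.

```python
import json
from typing import Any, Dict, Optional


def parse_jira_extras(extra: Optional[str]) -> Dict[str, Any]:
    """Hypothetical helper: turn a connection's extra JSON into client flags."""
    # Assumed defaults for this sketch: every option enabled unless switched off.
    flags: Dict[str, Any] = {"verify": True, "validate": True, "get_server_info": True}
    if not extra:
        return flags
    options = json.loads(extra)
    for key in flags:
        value = options.get(key)
        # Extras are often stored as strings, so "false"/"False" is mapped to a bool.
        if isinstance(value, str) and value.lower() == "false":
            flags[key] = False
        elif isinstance(value, bool):
            flags[key] = value
    return flags


print(parse_jira_extras('{"verify": "False", "get_server_info": "false"}'))
```

Normalizing the strings in one place avoids passing the truthy string "false" to a client that expects a boolean.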

class JiraHook(BaseHook):
    def __init__(self, jira_conn_id: str = 'jira_default', proxies: Optional[Any] = None) -> None:
        """
        Initialize JIRA hook for API interactions.
        Automatically calls get_conn() to establish connection.

        Parameters:
        - jira_conn_id: Connection ID reference for JIRA instance (default: 'jira_default')
        - proxies: Optional proxy configuration for JIRA client

        Attributes:
        - jira_conn_id: Stored connection ID reference
        - proxies: Stored proxy configuration
        - client: JIRA client instance (initialized to None, set by get_conn())

        Note: Constructor automatically calls get_conn() to initialize the client connection.
        """

    def get_conn(self) -> JIRA:
        """
        Get or create authenticated JIRA client connection.
        Called automatically during initialization and whenever client is None.

        Connection Configuration:
        - Supports basic authentication via connection login/password
        - Respects connection extra options: verify, validate, get_server_info
        - Handles proxy configuration if provided during initialization
        - Raises AirflowException if no jira_conn_id provided

        Returns:
        JIRA client instance ready for API operations

        Raises:
        AirflowException: If connection fails, no jira_conn_id provided, or authentication error occurs
        JIRAError: Re-raised as AirflowException for JIRA-specific connection errors
        """

Executes any JIRA Python SDK method through a generic operator interface, enabling comprehensive JIRA API access within Airflow workflows.

class JiraOperator(BaseOperator):
    template_fields = ("jira_method_args",)

    @apply_defaults
    def __init__(
        self,
        *,
        jira_method: str,
        jira_conn_id: str = 'jira_default',
        jira_method_args: Optional[dict] = None,
        result_processor: Optional[Callable] = None,
        get_jira_resource_method: Optional[Callable] = None,
        **kwargs,
    ) -> None:
        """
        Generic operator for JIRA API operations using JIRA Python SDK.
        Reference: http://jira.readthedocs.io

        Parameters:
        - jira_method: Method name from JIRA Python SDK to be called
        - jira_conn_id: Connection ID reference for JIRA instance (default: 'jira_default')
        - jira_method_args: Required method parameters for the jira_method (templated, default: None)
        - result_processor: Optional function to further process the response from JIRA
        - get_jira_resource_method: Optional function or operator to get jira resource on which the provided jira_method will be executed

        Attributes:
        - jira_conn_id: Stored connection ID reference
        - method_name: Internal storage of jira_method parameter
        - jira_method_args: Stored method arguments (supports templating)
        - result_processor: Stored result processing function
        - get_jira_resource_method: Stored resource method for advanced usage
        """

    def execute(self, context: Dict) -> Any:
        """
        Execute the configured JIRA method on the appropriate resource.

        Execution Logic:
        - If get_jira_resource_method is provided, executes jira_method on that resource
        - If get_jira_resource_method is a JiraOperator, executes it to get the resource
        - Otherwise executes jira_method on the top-level JIRA client
        - Applies result_processor to the result if provided

        Parameters:
        - context: Airflow task execution context

        Returns:
        Result from JIRA method execution, optionally processed by result_processor

        Raises:
        AirflowException: If JIRA operation fails, method execution error occurs, or general error
        JIRAError: JIRA-specific errors are caught and re-raised as AirflowException

        Note: Current JIRA Python SDK (1.0.7) has issues with pickling JIRA responses for XCom.
        """

Provides sensor capabilities for monitoring JIRA ticket states and field changes with customizable polling and timeout behavior.

class JiraSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(
        self,
        *,
        method_name: str,
        jira_conn_id: str = 'jira_default',
        method_params: Optional[dict] = None,
        result_processor: Optional[Callable] = None,
        **kwargs,
    ) -> None:
        """
        Generic sensor for monitoring JIRA state changes using jira-python-sdk methods.
        Uses JiraOperator internally for JIRA API interactions.

        Parameters:
        - method_name: Method name from jira-python-sdk to be executed for monitoring
        - jira_conn_id: Connection ID reference for JIRA instance (default: 'jira_default')
        - method_params: Parameters for the method method_name (default: None)
        - result_processor: Function that returns boolean and acts as sensor response (default: None)

        Attributes:
        - jira_conn_id: Stored connection ID reference
        - result_processor: Initially set to None, then conditionally assigned from parameter
        - method_name: Stored method name for JIRA API calls
        - method_params: Stored method parameters
        - jira_operator: Internal JiraOperator instance configured with sensor parameters

        Initialization Logic:
        - Sets result_processor to None first, then conditionally assigns if parameter provided
        - Creates internal JiraOperator with task_id, connection, method, and parameters
        """

    def poke(self, context: Dict) -> Any:
        """
        Execute sensor check by delegating to internal JiraOperator.

        Parameters:
        - context: Airflow task execution context

        Returns:
        Result from internal jira_operator.execute() call, typically boolean for sensor evaluation
        """

Specialized sensor for monitoring specific fields in JIRA tickets with built-in field comparison logic for common data types.

class JiraTicketSensor(JiraSensor):
    template_fields = ("ticket_id",)

    @apply_defaults
    def __init__(
        self,
        *,
        jira_conn_id: str = 'jira_default',
        ticket_id: Optional[str] = None,
        field: Optional[str] = None,
        expected_value: Optional[str] = None,
        field_checker_func: Optional[Callable] = None,
        **kwargs,
    ) -> None:
        """
        Sensor that monitors a JIRA ticket for a given field change, evaluated by a checker function.

        Parameters:
        - jira_conn_id: Connection ID reference for JIRA instance (default: 'jira_default')
        - ticket_id: ID of the ticket to be monitored (templated, default: None)
        - field: Field of the ticket to be monitored (default: None)
        - expected_value: Expected value of the field (default: None)
        - field_checker_func: Function that returns boolean and acts as sensor response (default: None, uses issue_field_checker)

        Attributes:
        - jira_conn_id: Stored connection ID reference
        - ticket_id: Stored ticket ID for monitoring
        - field: Stored field name to monitor
        - expected_value: Stored expected field value

        Initialization Logic:
        - Sets instance attributes before calling parent constructor
        - If field_checker_func is None, defaults to self.issue_field_checker
        - Calls parent constructor with connection ID and result_processor
        """

    def poke(self, context: Dict) -> Any:
        """
        Check if ticket field matches expected value by dynamically configuring the internal operator.

        Dynamic Configuration:
        - Sets jira_operator.method_name to "issue"
        - Sets jira_operator.jira_method_args to {'id': ticket_id, 'fields': field}
        - Delegates to parent JiraSensor.poke() for execution

        Parameters:
        - context: Airflow task execution context

        Returns:
        Boolean indicating if field condition is met (from result_processor evaluation)
        """

    def issue_field_checker(self, issue: Issue) -> Optional[bool]:
        """
        Built-in field checker with type-specific comparison logic and error handling.

        Field Type Handling:
        - List fields: checks if expected_value is in the list
        - String fields: case-insensitive string comparison
        - Resource fields: case-insensitive comparison with resource.name attribute
        - Other types: logs warning about unimplemented checker

        Parameters:
        - issue: JIRA Issue object to check

        Returns:
        Boolean result of field comparison, None if comparison cannot be performed

        Error Handling:
        - Catches JIRAError and logs as error
        - Catches general exceptions and logs with details
        - Returns None for any error conditions

        Logging:
        - Logs success when field matches expected value
        - Logs info when field doesn't match expected value yet
        """

# Core Python typing imports
from typing import Any, Callable, Dict, Optional
# JIRA Python SDK imports
from jira import JIRA
from jira.resources import Issue, Resource
from jira.exceptions import JIRAError
# Airflow core imports
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
# Provider-specific imports
from airflow.providers.jira.hooks.jira import JiraHook
from airflow.providers.jira.operators.jira import JiraOperator
# Note: JIRAError is defined in jira.exceptions; it can also be imported from
# airflow.providers.jira.operators.jira, which imports it itself.

The hook and operator catch JIRA-specific exceptions (JIRAError) and re-raise them as AirflowException for consistent error handling. JiraTicketSensor's built-in issue_field_checker instead logs the error and returns None.

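
The exception conversion described here can be sketched without either library installed. `FakeJiraError` and `FakeAirflowException` are hypothetical stand-ins for JIRAError and AirflowException, and `call_jira` is an illustrative wrapper, not a function the provider exposes.

```python
from typing import Any, Callable


class FakeJiraError(Exception):
    """Stand-in for the JIRA SDK's error type (hypothetical)."""


class FakeAirflowException(Exception):
    """Stand-in for Airflow's exception type (hypothetical)."""


def call_jira(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a JIRA call and surface SDK errors as the workflow's own exception."""
    try:
        return func(*args, **kwargs)
    except FakeJiraError as exc:
        # Chain the original error so the SDK details stay in the traceback.
        raise FakeAirflowException(f"Failed to execute JIRA operation: {exc}") from exc


def failing_lookup(issue_id: str) -> None:
    raise FakeJiraError(f"issue {issue_id} does not exist")


try:
    call_jira(failing_lookup, "PROJ-404")
except FakeAirflowException as err:
    message = str(err)
    cause = err.__cause__
    print(message)
```

With this pattern, task code only needs to handle one exception type, while the original error remains reachable through `__cause__`.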
JIRA connections are configured through Airflow's connection management system:
# Connection configuration supports:
# - Basic authentication (username/password)
# - SSL verification control
# - Server info validation
# - Proxy configuration
# - Custom JIRA client options via connection extras

The following fields support Airflow templating for dynamic values:

- JiraOperator.jira_method_args: All method arguments can use templated values
- JiraTicketSensor.ticket_id: Ticket ID can be dynamically determined from context

create_issue = JiraOperator(
    task_id='create_issue',
    jira_method='create_issue',
    jira_method_args={
        'fields': {
            'project': {'key': 'PROJ'},
            'summary': 'Issue from Airflow',
            'description': 'Automated issue creation',
            'issuetype': {'name': 'Bug'},
            'priority': {'name': 'High'},
        }
    }
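)

# Update an existing issue: fetch the issue resource first, then call its
# update() method with the new field values.
update_issue = JiraOperator(
    task_id='update_issue',
    jira_method='update',
    jira_method_args={
        'summary': 'Updated summary',
        'description': 'Updated description',
    },
    # Called with the task context as keyword arguments; must return the resource.
    get_jira_resource_method=lambda **context: JiraHook(jira_conn_id='jira_default').client.issue('PROJ-123'),
)

wait_for_resolved = JiraTicketSensor(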
    task_id='wait_resolved',
    ticket_id='PROJ-123',
    field='status',
    expected_value='Resolved',
    poke_interval=60,
    timeout=1800,
)

# The result processor receives the task context and the raw JIRA result.
def custom_checker(context, jira_result):
    issue = jira_result
    return (issue.fields.assignee is not None and
            issue.fields.priority.name == 'High')

wait_assigned_high_priority = JiraSensor(
    task_id='wait_assigned_high',
    method_name='issue',
    method_params={'id': 'PROJ-123'},
    result_processor=custom_checker,
)
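
For contrast with the custom checker above, the type-specific comparison described for issue_field_checker can be approximated in plain Python. This is a sketch of the documented rules only: `NamedResource` and `field_matches` are hypothetical names, and the real method also handles logging and errors, which are omitted here.

```python
from typing import Any, Optional


class NamedResource:
    """Stand-in for a JIRA resource exposing a `name` attribute (hypothetical)."""

    def __init__(self, name: str) -> None:
        self.name = name


def field_matches(field_value: Any, expected: str) -> Optional[bool]:
    """Compare a field value with the expected string, by value type."""
    if isinstance(field_value, list):
        # List fields: membership test.
        return expected in field_value
    if isinstance(field_value, str):
        # String fields: case-insensitive equality.
        return field_value.lower() == expected.lower()
    if isinstance(field_value, NamedResource):
        # Resource fields: case-insensitive equality on the name attribute.
        return field_value.name.lower() == expected.lower()
    # Any other type: no comparison is defined, so no verdict is given.
    return None


print(field_matches(NamedResource("Resolved"), "resolved"))
```

Returning None rather than False for unsupported types keeps "no match yet" distinguishable from "cannot compare", although a sensor treats both as not satisfied.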