Provider package that enables OpenAI integration for Apache Airflow, including hooks, operators, and triggers for AI-powered data pipelines.
—
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Specialized exception classes for handling OpenAI-specific error conditions and batch processing failures within Airflow workflows.
Exception raised when OpenAI Batch API operations fail after processing has begun.
class OpenAIBatchJobException(AirflowException):
    """Raise when an OpenAI Batch Job fails to start AFTER processing the request.

    Raised when batch processing fails during execution, the batch is
    cancelled unexpectedly, the batch expires before completion, or the
    batch reaches an unexpected terminal status.

    Inherits from AirflowException so it integrates with Airflow's
    error handling and retry mechanisms.
    """


# Exception raised when OpenAI Batch API operations exceed specified timeout limits.
class OpenAIBatchTimeout(AirflowException):
    """Raise when an OpenAI Batch Job times out.

    Raised when batch processing exceeds the specified timeout duration,
    polling operations reach their time limit, or a long-running batch
    does not complete within the expected timeframe.

    Inherits from AirflowException so it integrates with Airflow's
    error handling and retry mechanisms.
    """


from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
def process_batch_with_error_handling(**context):
    """Create an OpenAI batch, wait for completion, and handle failures.

    Pulls ``file_id`` from the task params, submits a batch against the
    chat-completions endpoint, and polls for up to one hour.

    Returns:
        The batch id on success.

    Raises:
        OpenAIBatchTimeout: re-raised after a best-effort cancellation when
            the batch does not complete within the timeout.
        OpenAIBatchJobException: re-raised after logging the final batch
            status when the batch terminates unsuccessfully.
    """
    hook = OpenAIHook(conn_id='openai_default')
    # Guard: if create_batch itself raises, `batch` would otherwise be
    # unbound inside the except handlers below (NameError on batch.id).
    batch = None
    try:
        # Create and monitor batch
        batch = hook.create_batch(
            file_id=context['params']['file_id'],
            endpoint="/v1/chat/completions"
        )
        # Wait for completion with timeout
        hook.wait_for_batch(
            batch_id=batch.id,
            wait_seconds=10,
            timeout=3600  # 1 hour timeout
        )
        return batch.id
    except OpenAIBatchTimeout as e:
        print(f"Batch processing timed out: {e}")
        # Log timeout details
        context['task_instance'].log.error(f"Batch timeout after 1 hour: {e}")
        # Optionally cancel the batch (best effort; only if it was created)
        if batch is not None:
            try:
                hook.cancel_batch(batch.id)
                print(f"Cancelled batch {batch.id} due to timeout")
            except Exception as cancel_error:
                print(f"Failed to cancel batch: {cancel_error}")
        raise
    except OpenAIBatchJobException as e:
        print(f"Batch processing failed: {e}")
        # Log failure details
        context['task_instance'].log.error(f"Batch job failure: {e}")
        # Get final batch status for debugging (only if it was created)
        if batch is not None:
            try:
                final_batch = hook.get_batch(batch.id)
                print(f"Final batch status: {final_batch.status}")
            except Exception as status_error:
                print(f"Could not retrieve final batch status: {status_error}")
        raise
    except Exception as e:
        print(f"Unexpected error during batch processing: {e}")
        context['task_instance'].log.error(f"Unexpected batch error: {e}")
        raise


from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
# `datetime` was referenced without an import in the original snippet; the
# local import below keeps this script-level example self-contained.
from datetime import datetime

dag = DAG(
    'batch_with_exception_handling',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # manual trigger only
    catchup=False
)
def handle_batch_failure(**context):
    """Recovery hook that runs when the batch processing task has failed.

    Looks up the 'batch_processing_task' instance on the current DAG run.
    When that task is in the 'failed' state, performs the recovery logic
    and returns a marker string; otherwise returns None implicitly.
    """
    task_instance = context['dag_run'].get_task_instance('batch_processing_task')
    batch_failed = bool(task_instance) and task_instance.state == 'failed'
    if not batch_failed:
        return None
    print("Batch processing task failed, implementing recovery logic")
    # Could implement retry logic, notification, or cleanup here
    # For example, notify stakeholders or trigger alternative processing
    # Return a value to indicate handling was successful
    return "failure_handled"
# Main batch processing task
batch_task = OpenAITriggerBatchOperator(
    dag=dag,
    task_id='batch_processing_task',
    conn_id='openai_default',
    file_id='file-abc123',
    endpoint='/v1/chat/completions',
    # 30-minute ceiling on batch completion
    timeout=1800,
)
# Failure handling task
failure_handler = PythonOperator(
    dag=dag,
    task_id='handle_failure',
    python_callable=handle_batch_failure,
    # 'one_failed' makes this task run as soon as an upstream task fails
    trigger_rule='one_failed',
)

# Run the recovery task downstream of the batch task.
batch_task >> failure_handler

from airflow.utils.decorators import apply_defaults
from airflow.operators.python_operator import PythonOperator
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
class RetryableBatchOperator(PythonOperator):
    """Custom operator with intelligent retry logic for OpenAI batch operations.

    Wraps OpenAIHook batch creation and polling in a retry loop that
    distinguishes timeouts, retryable job failures (rate limits / server
    errors), and non-retryable failures.

    :param file_id: id of the uploaded input file for the batch
    :param endpoint: OpenAI API endpoint the batch targets
    :param conn_id: Airflow connection id for OpenAI
    :param max_batch_retries: number of retries after the first attempt
    :param retry_backoff: base delay in seconds between attempts
    """

    @apply_defaults
    def __init__(
        self,
        file_id: str,
        endpoint: str,
        conn_id: str = 'openai_default',
        max_batch_retries: int = 3,
        retry_backoff: int = 60,
        **kwargs
    ):
        self.file_id = file_id
        self.endpoint = endpoint
        self.conn_id = conn_id
        self.max_batch_retries = max_batch_retries
        self.retry_backoff = retry_backoff
        super().__init__(python_callable=self._execute_with_retry, **kwargs)

    def _execute_with_retry(self, **context):
        """Execute batch with custom retry logic.

        Returns the batch id on success; re-raises the last error once all
        attempts are exhausted or a non-retryable failure is encountered.
        """
        import time  # hoisted: the original re-imported time inside each handler

        hook = OpenAIHook(conn_id=self.conn_id)
        for attempt in range(self.max_batch_retries + 1):
            try:
                # Create batch
                batch = hook.create_batch(
                    file_id=self.file_id,
                    endpoint=self.endpoint
                )
                # Wait for completion
                hook.wait_for_batch(batch.id, timeout=3600)
                return batch.id
            except OpenAIBatchTimeout:
                if attempt < self.max_batch_retries:
                    print(f"Batch timeout on attempt {attempt + 1}, retrying in {self.retry_backoff} seconds")
                    # Cancel the timed-out batch
                    try:
                        hook.cancel_batch(batch.id)
                    except Exception:
                        pass  # Best effort cancellation
                    # Linear backoff: delay grows by retry_backoff each attempt.
                    # (The original comment mislabelled this as exponential.)
                    time.sleep(self.retry_backoff * (attempt + 1))
                    continue
                print(f"Batch failed after {self.max_batch_retries + 1} attempts due to timeout")
                raise
            except OpenAIBatchJobException as e:
                if attempt < self.max_batch_retries:
                    # Check if this is a retryable failure
                    if "rate limit" in str(e).lower() or "server error" in str(e).lower():
                        print(f"Retryable batch failure on attempt {attempt + 1}: {e}")
                        time.sleep(self.retry_backoff * (attempt + 1))
                        continue
                    print(f"Non-retryable batch failure: {e}")
                    raise
                print(f"Batch failed after {self.max_batch_retries + 1} attempts: {e}")
                raise
            except Exception as e:
                print(f"Unexpected error on attempt {attempt + 1}: {e}")
                if attempt >= self.max_batch_retries:
                    raise
                # Constant (unscaled) backoff for unexpected errors, as in the original.
                time.sleep(self.retry_backoff)
# Usage
retryable_batch = RetryableBatchOperator(
    dag=dag,
    task_id='retryable_batch_processing',
    conn_id='openai_default',
    file_id='file-xyz789',
    endpoint='/v1/chat/completions',
    # Two extra attempts with a 30-second base delay between them
    max_batch_retries=2,
    retry_backoff=30,
)

from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
class SafeBatchTrigger(OpenAIBatchTrigger):
    """Enhanced batch trigger with comprehensive exception handling."""

    async def run(self):
        """Monitor the batch, enriching error events with debug context.

        Delegates to the parent trigger and intercepts its events: error
        events are augmented with the batch's final status and debugging
        metadata (when retrievable); all other events pass through
        unchanged. Exceptions escaping the parent trigger are converted
        into a terminal error TriggerEvent instead of propagating.
        """
        from airflow.triggers.base import TriggerEvent
        # NOTE: the original also imported BatchStatus here but never used it.
        from airflow.providers.openai.hooks.openai import OpenAIHook

        hook = OpenAIHook(conn_id=self.conn_id)
        try:
            async for event in super().run():
                # Intercept error events and add additional context
                if event.payload.get('status') == 'error':
                    error_msg = event.payload.get('message', 'Unknown error')
                    # Enhance error message with debugging information
                    try:
                        batch = hook.get_batch(self.batch_id)
                        enhanced_msg = f"{error_msg} (Final batch status: {batch.status})"
                        # Create enhanced event
                        yield TriggerEvent({
                            **event.payload,
                            'message': enhanced_msg,
                            'batch_status': batch.status,
                            'debug_info': {
                                'created_at': getattr(batch, 'created_at', None),
                                'request_counts': getattr(batch, 'request_counts', None),
                                'metadata': getattr(batch, 'metadata', None)
                            }
                        })
                    except Exception as debug_error:
                        # If we can't get batch details, yield original event
                        yield TriggerEvent({
                            **event.payload,
                            'debug_error': str(debug_error)
                        })
                else:
                    # Pass through non-error events
                    yield event
        except Exception as e:
            # Handle trigger-level exceptions
            yield TriggerEvent({
                'status': 'error',
                'message': f'Trigger exception: {str(e)}',
                'batch_id': self.batch_id,
                'exception_type': type(e).__name__
            })
# Usage with enhanced trigger
def use_safe_trigger(**context):
    """Defer the current task onto a SafeBatchTrigger with a 2-hour deadline."""
    import time

    deadline = time.time() + 7200  # two hours from now
    trigger = SafeBatchTrigger(
        conn_id='openai_default',
        batch_id=context['params']['batch_id'],
        poll_interval=60,
        end_time=deadline
    )
    context['task_instance'].defer(
        trigger=trigger,
        method_name='handle_safe_completion'
    )
def handle_safe_completion(**context):
    """Resume callback: inspect the trigger event and fail or return.

    Returns the batch id for non-error events. For error events, prints
    the enhanced error details, then raises OpenAIBatchTimeout when the
    message mentions a timeout, otherwise OpenAIBatchJobException.
    """
    event = context['event']
    if event['status'] != 'error':
        return event['batch_id']
    error_msg = event['message']
    debug_info = event.get('debug_info', {})
    print(f"Enhanced error information: {error_msg}")
    if debug_info:
        print(f"Debug details: {debug_info}")
    # Decide whether to retry or fail based on error details
    if 'timeout' in error_msg.lower():
        raise OpenAIBatchTimeout(error_msg)
    raise OpenAIBatchJobException(error_msg)


from airflow.operators.email_operator import EmailOperator
from airflow.sensors.base import BaseSensorOperator
def create_exception_monitoring_dag():
    """Build and return a DAG with comprehensive exception monitoring.

    Wires one monitored batch-processing task to two email alerts that
    fire when it fails, surfacing batch id / status / error via XCom.
    """
    dag = DAG(
        'openai_with_monitoring',
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
        catchup=False
    )

    def monitored_batch_processing(**context):
        """Batch processing that records progress and errors via XCom."""
        from airflow.providers.openai.hooks.openai import OpenAIHook
        from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

        hook = OpenAIHook(conn_id='openai_default')
        ti = context['task_instance']
        try:
            batch = hook.create_batch(
                file_id=context['params']['file_id'],
                endpoint='/v1/chat/completions'
            )
            # Store batch ID for monitoring
            ti.xcom_push(key='batch_id', value=batch.id)
            ti.xcom_push(key='batch_status', value='started')
            hook.wait_for_batch(batch.id, timeout=1800)
            ti.xcom_push(key='batch_status', value='completed')
            return batch.id
        except OpenAIBatchTimeout as e:
            ti.xcom_push(key='batch_status', value='timeout')
            ti.xcom_push(key='error_message', value=str(e))
            raise
        except OpenAIBatchJobException as e:
            ti.xcom_push(key='batch_status', value='failed')
            ti.xcom_push(key='error_message', value=str(e))
            raise

    # Main processing task
    batch_task = PythonOperator(
        task_id='monitored_batch_processing',
        python_callable=monitored_batch_processing,
        params={'file_id': 'file-monitoring-test'},
        dag=dag
    )

    # Alert on timeout
    # NOTE(review): both alerts use trigger_rule='one_failed', so any failure
    # of the batch task fires BOTH emails, not only the matching one.
    timeout_alert = EmailOperator(
        task_id='timeout_alert',
        to=['data-team@company.com'],
        subject='OpenAI Batch Timeout Alert',
        html_content="""
        <h3>OpenAI Batch Processing Timeout</h3>
        <p>Batch ID: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_id') }}</p>
        <p>Error: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='error_message') }}</p>
        <p>DAG: {{ dag.dag_id }}</p>
        <p>Execution Date: {{ ds }}</p>
        """,
        trigger_rule='one_failed',
        dag=dag
    )

    # Alert on failure
    failure_alert = EmailOperator(
        task_id='failure_alert',
        to=['data-team@company.com'],
        subject='OpenAI Batch Processing Failure',
        html_content="""
        <h3>OpenAI Batch Processing Failed</h3>
        <p>Batch ID: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_id') }}</p>
        <p>Status: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_status') }}</p>
        <p>Error: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='error_message') }}</p>
        <p>DAG: {{ dag.dag_id }}</p>
        <p>Execution Date: {{ ds }}</p>
        """,
        trigger_rule='one_failed',
        dag=dag
    )

    batch_task >> [timeout_alert, failure_alert]
    return dag


# Create the monitoring DAG
monitoring_dag = create_exception_monitoring_dag()

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-openai