CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-openai

Provider package that enables OpenAI integration for Apache Airflow, including hooks, operators, and triggers for AI-powered data pipelines.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/exceptions.md

Exceptions

Specialized exception classes for handling OpenAI-specific error conditions and batch processing failures within Airflow workflows.

Capabilities

Batch Job Exception

Exception raised when OpenAI Batch API operations fail after processing has begun.

class OpenAIBatchJobException(AirflowException):
    """Raised when an OpenAI Batch job reaches a failed terminal state.

    Covers batches that fail during execution, are cancelled
    unexpectedly, expire before completing, or end in an unrecognized
    terminal status. Subclassing AirflowException lets Airflow's normal
    error-handling and retry machinery treat this like any other task
    failure.
    """

Batch Timeout Exception

Exception raised when OpenAI Batch API operations exceed specified timeout limits.

class OpenAIBatchTimeout(AirflowException):
    """Raised when waiting on an OpenAI Batch job exceeds its time limit.

    Signals that polling hit its deadline before the batch finished —
    e.g. processing ran past the configured timeout, or a long-running
    batch did not complete within the expected window. Subclassing
    AirflowException integrates this with Airflow's error handling and
    retry machinery.
    """

Usage Examples

Basic Exception Handling

from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

def process_batch_with_error_handling(**context):
    """Create an OpenAI batch, wait for completion, and handle failures.

    Returns the batch id on success. On timeout, the batch is cancelled
    (best effort) before re-raising; on a job failure, the final batch
    status is fetched for debugging before re-raising. All exceptions
    are re-raised so the task still fails in Airflow.
    """
    hook = OpenAIHook(conn_id='openai_default')
    # Initialized up front: if create_batch itself raises, the except
    # handlers below would otherwise hit a NameError on `batch`.
    batch = None

    try:
        # Create and monitor batch
        batch = hook.create_batch(
            file_id=context['params']['file_id'],
            endpoint="/v1/chat/completions"
        )

        # Wait for completion with timeout (poll every 10s, cap at 1h)
        hook.wait_for_batch(
            batch_id=batch.id,
            wait_seconds=10,
            timeout=3600  # 1 hour timeout
        )

        return batch.id

    except OpenAIBatchTimeout as e:
        print(f"Batch processing timed out: {e}")
        # Log timeout details
        context['task_instance'].log.error(f"Batch timeout after 1 hour: {e}")
        # Optionally cancel the batch (only if it was actually created)
        if batch is not None:
            try:
                hook.cancel_batch(batch.id)
                print(f"Cancelled batch {batch.id} due to timeout")
            except Exception as cancel_error:
                print(f"Failed to cancel batch: {cancel_error}")
        raise

    except OpenAIBatchJobException as e:
        print(f"Batch processing failed: {e}")
        # Log failure details
        context['task_instance'].log.error(f"Batch job failure: {e}")
        # Get final batch status for debugging (only if it was created)
        if batch is not None:
            try:
                final_batch = hook.get_batch(batch.id)
                print(f"Final batch status: {final_batch.status}")
            except Exception as status_error:
                print(f"Could not retrieve final batch status: {status_error}")
        raise

    except Exception as e:
        print(f"Unexpected error during batch processing: {e}")
        context['task_instance'].log.error(f"Unexpected batch error: {e}")
        raise

Operator Exception Handling

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

# Fix: `datetime` was referenced below without being imported anywhere
# in this example.
from datetime import datetime

dag = DAG(
    'batch_with_exception_handling',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # manual triggering only
    catchup=False
)

def handle_batch_failure(**context):
    """Recovery hook for when the batch processing task has failed.

    Looks up the batch task's TaskInstance on the current DagRun and,
    if it is in the 'failed' state, runs recovery logic and returns a
    marker value. Returns None when the task instance is missing or
    did not fail.
    """
    ti = context['dag_run'].get_task_instance('batch_processing_task')

    # Guard clause: nothing to do unless the task actually failed.
    if ti is None or ti.state != 'failed':
        return None

    print("Batch processing task failed, implementing recovery logic")

    # Could implement retry logic, notification, or cleanup here
    # For example, notify stakeholders or trigger alternative processing

    # Return a value to indicate handling was successful
    return "failure_handled"

# Primary task: triggers the OpenAI batch and waits for it to finish.
batch_task = OpenAITriggerBatchOperator(
    task_id='batch_processing_task',
    conn_id='openai_default',
    file_id='file-abc123',
    endpoint='/v1/chat/completions',
    timeout=1800,  # 30 minutes
    dag=dag,
)

# Recovery task: fires only when an upstream task has failed.
failure_handler = PythonOperator(
    task_id='handle_failure',
    python_callable=handle_batch_failure,
    trigger_rule='one_failed',
    dag=dag,
)

batch_task >> failure_handler

Custom Exception Handling with Retries

from airflow.utils.decorators import apply_defaults
from airflow.operators.python_operator import PythonOperator
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

class RetryableBatchOperator(PythonOperator):
    """Custom operator with intelligent retry logic for batch operations.

    Retries on timeouts and on failures that look transient (rate
    limits, server errors); non-retryable batch failures are raised
    immediately. Backoff grows linearly with the attempt number.
    """

    # NOTE(review): @apply_defaults is deprecated (a no-op on Airflow 2+);
    # kept here for compatibility with the original example — confirm
    # against the target Airflow version.
    @apply_defaults
    def __init__(
        self,
        file_id: str,
        endpoint: str,
        conn_id: str = 'openai_default',
        max_batch_retries: int = 3,
        retry_backoff: int = 60,
        **kwargs
    ):
        self.file_id = file_id
        self.endpoint = endpoint
        self.conn_id = conn_id
        self.max_batch_retries = max_batch_retries  # extra attempts beyond the first
        self.retry_backoff = retry_backoff  # base sleep (seconds) between attempts
        super().__init__(python_callable=self._execute_with_retry, **kwargs)

    def _execute_with_retry(self, **context):
        """Execute the batch with custom retry logic.

        Returns the batch id on success; re-raises the last exception
        once all attempts are exhausted.
        """
        import time  # hoisted: previously re-imported inside every handler

        hook = OpenAIHook(conn_id=self.conn_id)

        for attempt in range(self.max_batch_retries + 1):
            # Reset per attempt: if create_batch itself raises, the
            # timeout handler below must not reference a stale/unbound batch.
            batch = None
            try:
                # Create batch
                batch = hook.create_batch(
                    file_id=self.file_id,
                    endpoint=self.endpoint
                )

                # Wait for completion
                hook.wait_for_batch(batch.id, timeout=3600)

                return batch.id

            except OpenAIBatchTimeout as e:
                if attempt < self.max_batch_retries:
                    print(f"Batch timeout on attempt {attempt + 1}, retrying in {self.retry_backoff} seconds")
                    # Cancel the timed-out batch (best effort, only if created)
                    if batch is not None:
                        try:
                            hook.cancel_batch(batch.id)
                        except Exception:
                            pass  # Best effort cancellation

                    time.sleep(self.retry_backoff * (attempt + 1))  # linear backoff
                    continue
                else:
                    print(f"Batch failed after {self.max_batch_retries + 1} attempts due to timeout")
                    raise

            except OpenAIBatchJobException as e:
                if attempt < self.max_batch_retries:
                    # Check if this is a retryable failure
                    if "rate limit" in str(e).lower() or "server error" in str(e).lower():
                        print(f"Retryable batch failure on attempt {attempt + 1}: {e}")
                        time.sleep(self.retry_backoff * (attempt + 1))
                        continue
                    else:
                        print(f"Non-retryable batch failure: {e}")
                        raise
                else:
                    print(f"Batch failed after {self.max_batch_retries + 1} attempts: {e}")
                    raise

            except Exception as e:
                print(f"Unexpected error on attempt {attempt + 1}: {e}")
                if attempt >= self.max_batch_retries:
                    raise
                time.sleep(self.retry_backoff)

# Usage: up to 3 total attempts (1 initial + 2 retries), 30s base backoff.
retryable_batch = RetryableBatchOperator(
    task_id='retryable_batch_processing',
    conn_id='openai_default',
    file_id='file-xyz789',
    endpoint='/v1/chat/completions',
    max_batch_retries=2,
    retry_backoff=30,
    dag=dag,
)

Trigger Exception Handling

from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

class SafeBatchTrigger(OpenAIBatchTrigger):
    """Enhanced batch trigger with comprehensive exception handling."""

    async def run(self):
        """Monitor the batch, enriching error events with final details.

        Delegates to the parent trigger and intercepts events whose
        payload has status == 'error', attaching the batch's final
        status plus selected debug fields. Any exception escaping the
        parent trigger is converted into a terminal error event rather
        than killing the trigger silently.
        """
        from airflow.triggers.base import TriggerEvent
        # Fix: BatchStatus was imported here but never used.
        from airflow.providers.openai.hooks.openai import OpenAIHook

        # NOTE(review): the hook calls below are synchronous and will
        # block the triggerer's event loop while fetching batch details —
        # acceptable for a short lookup, but confirm for production use.
        hook = OpenAIHook(conn_id=self.conn_id)

        try:
            async for event in super().run():
                # Intercept error events and add additional context
                if event.payload.get('status') == 'error':
                    error_msg = event.payload.get('message', 'Unknown error')

                    # Enhance error message with debugging information
                    try:
                        batch = hook.get_batch(self.batch_id)
                        enhanced_msg = f"{error_msg} (Final batch status: {batch.status})"

                        # Create enhanced event
                        yield TriggerEvent({
                            **event.payload,
                            'message': enhanced_msg,
                            'batch_status': batch.status,
                            'debug_info': {
                                'created_at': getattr(batch, 'created_at', None),
                                'request_counts': getattr(batch, 'request_counts', None),
                                'metadata': getattr(batch, 'metadata', None)
                            }
                        })
                    except Exception as debug_error:
                        # If we can't get batch details, yield original event
                        # annotated with the lookup failure.
                        yield TriggerEvent({
                            **event.payload,
                            'debug_error': str(debug_error)
                        })
                else:
                    # Pass through non-error events
                    yield event

        except Exception as e:
            # Handle trigger-level exceptions by surfacing them as events
            yield TriggerEvent({
                'status': 'error',
                'message': f'Trigger exception: {str(e)}',
                'batch_id': self.batch_id,
                'exception_type': type(e).__name__
            })

# Usage with enhanced trigger
def use_safe_trigger(**context):
    """Defer the current task onto SafeBatchTrigger for batch monitoring."""
    import time

    deadline = time.time() + 7200  # give the batch up to two hours
    trigger = SafeBatchTrigger(
        conn_id='openai_default',
        batch_id=context['params']['batch_id'],
        poll_interval=60,
        end_time=deadline,
    )

    # Hand control to the triggerer; execution resumes in
    # handle_safe_completion when the trigger fires.
    context['task_instance'].defer(
        trigger=trigger,
        method_name='handle_safe_completion'
    )

def handle_safe_completion(**context):
    """Resume after the trigger fires: raise on error, else return batch id."""
    event = context['event']

    # Success path: nothing to inspect, just hand back the batch id.
    if event['status'] != 'error':
        return event['batch_id']

    error_msg = event['message']
    debug_info = event.get('debug_info', {})

    print(f"Enhanced error information: {error_msg}")
    if debug_info:
        print(f"Debug details: {debug_info}")

    # Timeouts map to the timeout exception; everything else is treated
    # as a batch job failure.
    if 'timeout' in error_msg.lower():
        raise OpenAIBatchTimeout(error_msg)
    raise OpenAIBatchJobException(error_msg)

Exception Monitoring and Alerting

from airflow.operators.email_operator import EmailOperator
from airflow.sensors.base import BaseSensorOperator

def create_exception_monitoring_dag():
    """Create a DAG with comprehensive exception monitoring.

    Wires a monitored batch task to two email alert tasks. NOTE: both
    alerts use trigger_rule='one_failed', so each fires on ANY upstream
    failure; the XCom 'batch_status' value ('timeout' vs 'failed') is
    what actually distinguishes the failure mode in the alert body.
    """

    dag = DAG(
        'openai_with_monitoring',
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
        catchup=False
    )

    def monitored_batch_processing(**context):
        """Run a batch, recording progress and errors to XCom."""
        from airflow.providers.openai.hooks.openai import OpenAIHook
        from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

        hook = OpenAIHook(conn_id='openai_default')
        ti = context['task_instance']  # hoisted: used for every XCom push

        try:
            batch = hook.create_batch(
                file_id=context['params']['file_id'],
                endpoint='/v1/chat/completions'
            )

            # Store batch ID for monitoring
            ti.xcom_push(key='batch_id', value=batch.id)
            ti.xcom_push(key='batch_status', value='started')

            hook.wait_for_batch(batch.id, timeout=1800)  # 30-minute limit

            ti.xcom_push(key='batch_status', value='completed')
            return batch.id

        except OpenAIBatchTimeout as e:
            ti.xcom_push(key='batch_status', value='timeout')
            ti.xcom_push(key='error_message', value=str(e))
            raise

        except OpenAIBatchJobException as e:
            ti.xcom_push(key='batch_status', value='failed')
            ti.xcom_push(key='error_message', value=str(e))
            raise

    # Main processing task
    batch_task = PythonOperator(
        task_id='monitored_batch_processing',
        python_callable=monitored_batch_processing,
        params={'file_id': 'file-monitoring-test'},
        dag=dag
    )

    # Alert on timeout (fires on any failure — see docstring note)
    timeout_alert = EmailOperator(
        task_id='timeout_alert',
        to=['data-team@company.com'],
        subject='OpenAI Batch Timeout Alert',
        html_content="""
        <h3>OpenAI Batch Processing Timeout</h3>
        <p>Batch ID: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_id') }}</p>
        <p>Error: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='error_message') }}</p>
        <p>DAG: {{ dag.dag_id }}</p>
        <p>Execution Date: {{ ds }}</p>
        """,
        trigger_rule='one_failed',
        dag=dag
    )

    # Alert on failure (fires on any failure — see docstring note)
    failure_alert = EmailOperator(
        task_id='failure_alert',
        to=['data-team@company.com'],
        subject='OpenAI Batch Processing Failure',
        html_content="""
        <h3>OpenAI Batch Processing Failed</h3>
        <p>Batch ID: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_id') }}</p>
        <p>Status: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_status') }}</p>
        <p>Error: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='error_message') }}</p>
        <p>DAG: {{ dag.dag_id }}</p>
        <p>Execution Date: {{ ds }}</p>
        """,
        trigger_rule='one_failed',
        dag=dag
    )

    batch_task >> [timeout_alert, failure_alert]

    return dag

# Create the monitoring DAG
# Module-level assignment so the Airflow scheduler can discover the DAG object.
monitoring_dag = create_exception_monitoring_dag()

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-openai

docs

exceptions.md

hooks.md

index.md

operators.md

triggers.md

version_compat.md

tile.json