tessl/pypi-apache-airflow-providers-openai

Provider package that enables OpenAI integration for Apache Airflow, including hooks, operators, and triggers for AI-powered data pipelines.

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/apache-airflow-providers-openai@1.6.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-openai@1.6.0


Apache Airflow Providers OpenAI

Apache Airflow provider package that enables OpenAI integration for data pipelines and workflows. This package provides comprehensive hooks for connecting to OpenAI services, operators for executing OpenAI operations, triggers for monitoring batch jobs, and utilities for AI-powered workflow orchestration.

Package Information

  • Package Name: apache-airflow-providers-openai
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-providers-openai
  • Minimum Airflow Version: 2.10.0
  • Dependencies: openai[datalib]>=1.66.0

Core Imports

from airflow.providers.openai.hooks.openai import OpenAIHook, BatchStatus
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator, OpenAITriggerBatchOperator
from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
from airflow.providers.openai.version_compat import BaseHook, BaseOperator, AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS

Basic Usage

from datetime import datetime
from airflow import DAG
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator

# Create a simple DAG
dag = DAG(
    'openai_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,  # 'schedule_interval' is deprecated; removed in Airflow 3
    catchup=False
)

# Generate embeddings using operator
embedding_task = OpenAIEmbeddingOperator(
    task_id='generate_embeddings',
    conn_id='openai_default',
    input_text="Hello, this is a test for OpenAI embeddings",
    model="text-embedding-ada-002",
    dag=dag
)

# Use hook directly in a task
def chat_completion_task():
    hook = OpenAIHook(conn_id='openai_default')
    messages = [
        {"role": "user", "content": "What is Apache Airflow?"}
    ]
    response = hook.create_chat_completion(messages=messages, model="gpt-3.5-turbo")
    return response

from airflow.operators.python import PythonOperator  # 'python_operator' path is deprecated
chat_task = PythonOperator(
    task_id='chat_completion',
    python_callable=chat_completion_task,
    dag=dag
)

Architecture

The provider follows Airflow's standard architecture patterns:

  • Hooks: Low-level interfaces to OpenAI API services, handling authentication and connection management
  • Operators: Task-level abstractions that use hooks to perform specific OpenAI operations in DAGs
  • Triggers: Asynchronous monitoring components for long-running operations like batch processing
  • Exceptions: Specialized exception classes for OpenAI-specific error handling

Connection Configuration

Configure OpenAI connections in Airflow:

# Connection ID: openai_default
# Connection Type: OpenAI
# Password: your-openai-api-key
# Host: https://api.openai.com (optional, uses default if not specified)
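Connections can also be supplied without the UI, for example through an environment variable using Airflow's JSON connection format (supported since Airflow 2.3). The API key below is a placeholder:

```shell
# Define the openai_default connection via environment variable.
# Airflow resolves AIRFLOW_CONN_<CONN_ID> (uppercased) at runtime.
export AIRFLOW_CONN_OPENAI_DEFAULT='{"conn_type": "openai", "password": "your-openai-api-key"}'
```

Environment-variable connections are not listed in the Airflow UI, but hooks resolve them the same way as database-stored connections.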

Capabilities

OpenAI Hook Interface

Comprehensive hook providing direct access to OpenAI API functionality including chat completions, assistants, embeddings, file operations, vector stores, and batch processing.

class OpenAIHook(BaseHook):
    def __init__(self, conn_id: str = "openai_default", *args, **kwargs): ...
    def get_conn(self) -> OpenAI: ...
    def test_connection(self) -> tuple[bool, str]: ...
    def create_chat_completion(self, messages: list, model: str = "gpt-3.5-turbo", **kwargs) -> list: ...
    def create_embeddings(self, text: str | list, model: str = "text-embedding-ada-002", **kwargs) -> list[float]: ...

OpenAI Hook

Operators for Task Execution

Ready-to-use operators for common OpenAI operations in Airflow DAGs, including embedding generation and batch processing with full integration into Airflow's task lifecycle.

class OpenAIEmbeddingOperator(BaseOperator):
    def __init__(self, conn_id: str, input_text: str | list, model: str = "text-embedding-ada-002", **kwargs): ...
    def execute(self, context) -> list[float]: ...

class OpenAITriggerBatchOperator(BaseOperator):
    def __init__(self, file_id: str, endpoint: str, conn_id: str = "openai_default", **kwargs): ...
    def execute(self, context) -> str | None: ...

Operators

Triggers for Asynchronous Operations

Trigger for monitoring long-running OpenAI batch operations with proper async handling and timeout management.

class OpenAIBatchTrigger(BaseTrigger):
    def __init__(self, conn_id: str, batch_id: str, poll_interval: float, end_time: float): ...
    def serialize(self) -> tuple[str, dict]: ...
    async def run(self) -> AsyncIterator: ...

Triggers
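The polling loop such a trigger performs can be sketched in plain asyncio. This is an illustrative stand-alone model, not the provider's actual implementation; `fetch_status` is a hypothetical stand-in for the hook's batch-status lookup:

```python
import asyncio
import time

async def wait_for_batch(fetch_status, poll_interval: float, end_time: float) -> str:
    """Poll fetch_status() until the batch reaches a terminal state or end_time passes."""
    terminal = {"completed", "failed", "expired", "cancelled"}
    while time.time() < end_time:
        status = fetch_status()
        if status in terminal:
            return status
        # Yield control between polls instead of blocking the event loop
        await asyncio.sleep(poll_interval)
    raise TimeoutError("batch did not finish before end_time")

# Simulate a batch that progresses through three statuses
statuses = iter(["validating", "in_progress", "completed"])
result = asyncio.run(wait_for_batch(lambda: next(statuses), 0.01, time.time() + 5))
```

The real trigger yields a `TriggerEvent` back to the scheduler rather than returning a value, but the terminal-state check and sleep-between-polls structure is the same pattern.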

Exception Handling

Specialized exceptions for OpenAI-specific error conditions and batch processing failures.

class OpenAIBatchJobException(AirflowException): ...
class OpenAIBatchTimeout(AirflowException): ...

Exceptions

Version Compatibility

Utilities and constants for handling different Apache Airflow versions, providing compatible base classes and version detection capabilities.

def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...

AIRFLOW_V_3_0_PLUS: bool
AIRFLOW_V_3_1_PLUS: bool

# Compatible base classes
BaseHook: type
BaseOperator: type

Version Compatibility
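For illustration, a version gate such as `AIRFLOW_V_3_0_PLUS` can be computed by comparing a parsed version tuple against a threshold. This self-contained sketch approximates the idea; the function name, signature, and parsing details are assumptions, not the provider's code:

```python
import re

def airflow_version_tuple(version: str) -> tuple[int, int, int]:
    """Parse 'major.minor.patch', ignoring pre-release suffixes like 'rc1'."""
    parts = (version.split(".") + ["0", "0", "0"])[:3]
    nums = []
    for part in parts:
        m = re.match(r"\d+", part)  # keep only the leading digits of each component
        nums.append(int(m.group()) if m else 0)
    return (nums[0], nums[1], nums[2])

# The provider computes such booleans once at import time, e.g.:
AIRFLOW_V_3_0_PLUS = airflow_version_tuple("2.10.5") >= (3, 0, 0)
```

Gating on a tuple comparison keeps version checks cheap and avoids string-comparison pitfalls such as "2.10" sorting before "2.9".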

Types

from enum import Enum

class BatchStatus(str, Enum):
    VALIDATING = "validating"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    EXPIRED = "expired"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"
    
    @classmethod
    def is_in_progress(cls, status: str) -> bool: ...
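Because `BatchStatus` subclasses `str`, its members compare equal to the raw status strings the OpenAI API returns, so payloads can be checked without conversion. A self-contained demonstration (redefining an abbreviated enum here rather than importing the provider):

```python
from enum import Enum

class BatchStatus(str, Enum):
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

# str-enum members equal their raw string values directly
api_status = "completed"  # e.g. the 'status' field of a batch API response
done = api_status == BatchStatus.COMPLETED
```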