Provider package that enables OpenAI integration for Apache Airflow, including hooks, operators, and triggers for AI-powered data pipelines.
```shell
npx @tessl/cli install tessl/pypi-apache-airflow-providers-openai@1.6.0
```

Apache Airflow provider package that enables OpenAI integration for data pipelines and workflows. This package provides hooks for connecting to OpenAI services, operators for executing OpenAI operations, triggers for monitoring batch jobs, and utilities for AI-powered workflow orchestration.
Install with pip:

```shell
pip install apache-airflow-providers-openai
```

The package depends on `openai[datalib]>=1.66.0`.

Core imports:

```python
from airflow.providers.openai.hooks.openai import OpenAIHook, BatchStatus
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator, OpenAITriggerBatchOperator
from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
from airflow.providers.openai.version_compat import BaseHook, BaseOperator, AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
```

A minimal example DAG:

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator
# Create a simple DAG
dag = DAG(
    'openai_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,  # schedule_interval is deprecated since Airflow 2.4 and removed in 3.0
    catchup=False,
)

# Generate embeddings using operator
embedding_task = OpenAIEmbeddingOperator(
    task_id='generate_embeddings',
    conn_id='openai_default',
    input_text="Hello, this is a test for OpenAI embeddings",
    model="text-embedding-ada-002",
    dag=dag,
)

# Use hook directly in a task
def chat_completion_task():
    hook = OpenAIHook(conn_id='openai_default')
    messages = [
        {"role": "user", "content": "What is Apache Airflow?"}
    ]
    # create_chat_completion returns the response choices; return something
    # XCom-serializable rather than the raw response objects
    choices = hook.create_chat_completion(messages=messages, model="gpt-3.5-turbo")
    return choices[0].message.content

from airflow.operators.python import PythonOperator

chat_task = PythonOperator(
    task_id='chat_completion',
    python_callable=chat_completion_task,
    dag=dag,
)
```

The provider follows Airflow's standard architecture patterns.
Configure OpenAI connections in Airflow:
```text
Connection ID: openai_default
Connection Type: OpenAI
Password: your-openai-api-key
Host: https://api.openai.com (optional, uses the default if not specified)
```
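Alternatively, the connection can be supplied through Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable convention; a minimal sketch, assuming the API key is exported as `OPENAI_API_KEY`:

```python
import json
import os

# Airflow resolves AIRFLOW_CONN_<CONN_ID> environment variables into connections;
# the JSON connection representation below is supported by Airflow 2.3+.
os.environ["AIRFLOW_CONN_OPENAI_DEFAULT"] = json.dumps(
    {
        "conn_type": "openai",
        "password": os.environ["OPENAI_API_KEY"],  # the hook reads the API key from the password field
    }
)
```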
Comprehensive hook providing direct access to OpenAI API functionality, including chat completions, assistants, embeddings, file operations, vector stores, and batch processing.

```python
class OpenAIHook(BaseHook):
    def __init__(self, conn_id: str = "openai_default", *args, **kwargs): ...
    def get_conn(self) -> OpenAI: ...
    def test_connection(self) -> tuple[bool, str]: ...
    def create_chat_completion(self, messages: list, model: str = "gpt-3.5-turbo", **kwargs) -> list: ...
    def create_embeddings(self, text: str | list, model: str = "text-embedding-ada-002", **kwargs) -> list[float]: ...
```
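The hook can be used directly from any Python task; a small sketch generating one embedding per input string with the defaults shown above (the helper name `embed_texts` is illustrative):

```python
from airflow.providers.openai.hooks.openai import OpenAIHook

def embed_texts(texts: list[str]) -> list[list[float]]:
    """Illustrative helper: returns one embedding vector per input string."""
    hook = OpenAIHook(conn_id="openai_default")
    return [
        hook.create_embeddings(text, model="text-embedding-ada-002")
        for text in texts
    ]
```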
Ready-to-use operators for common OpenAI operations in Airflow DAGs, including embedding generation and batch processing with full integration into Airflow's task lifecycle.

```python
class OpenAIEmbeddingOperator(BaseOperator):
    def __init__(self, conn_id: str, input_text: str | list, model: str = "text-embedding-ada-002", **kwargs): ...
    def execute(self, context) -> list[float]: ...

class OpenAITriggerBatchOperator(BaseOperator):
    def __init__(self, file_id: str, endpoint: str, conn_id: str = "openai_default", **kwargs): ...
    def execute(self, context) -> str | None: ...
```
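For batch workloads, `OpenAITriggerBatchOperator` submits a previously uploaded request file and waits for the batch. A sketch, assuming a `deferrable` keyword as in other deferrable Airflow operators, an upstream `upload_batch_file` task that pushes the file id via XCom, and the OpenAI Batch API endpoint value shown:

```python
from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator

trigger_batch = OpenAITriggerBatchOperator(
    task_id="trigger_batch",
    conn_id="openai_default",
    # Hypothetical upstream task that uploaded the JSONL request file
    file_id="{{ ti.xcom_pull(task_ids='upload_batch_file') }}",
    endpoint="/v1/chat/completions",
    deferrable=True,  # assumption: wait in the triggerer via OpenAIBatchTrigger
    dag=dag,  # the example DAG from the quick start above
)
```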
Trigger for monitoring long-running OpenAI batch operations with proper async handling and timeout management.

```python
class OpenAIBatchTrigger(BaseTrigger):
    def __init__(self, conn_id: str, batch_id: str, poll_interval: float, end_time: float): ...
    def serialize(self) -> tuple[str, dict]: ...
    async def run(self) -> AsyncIterator: ...
```
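A custom deferrable operator could hand a long-running batch off to this trigger; a minimal sketch, assuming the constructor arguments shown above and that the fired `TriggerEvent` payload carries the final batch status:

```python
import time

from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
from airflow.providers.openai.version_compat import BaseOperator

class WaitForBatchOperator(BaseOperator):
    """Hypothetical operator that defers until an OpenAI batch finishes."""

    def __init__(self, batch_id: str, timeout_hours: float = 24, **kwargs):
        super().__init__(**kwargs)
        self.batch_id = batch_id
        self.timeout_hours = timeout_hours

    def execute(self, context):
        # Defer to the triggerer instead of blocking a worker slot
        self.defer(
            trigger=OpenAIBatchTrigger(
                conn_id="openai_default",
                batch_id=self.batch_id,
                poll_interval=60,
                end_time=time.time() + self.timeout_hours * 60 * 60,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event):
        # The event payload shape is an assumption; inspect it for your version
        self.log.info("Batch %s finished with event %s", self.batch_id, event)
```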
Specialized exceptions for OpenAI-specific error conditions and batch processing failures.

```python
class OpenAIBatchJobException(AirflowException): ...
class OpenAIBatchTimeout(AirflowException): ...
```
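Catching these lets task code distinguish a batch that failed from one that merely outlived its polling window. A brief sketch, assuming the hook exposes a `wait_for_batch` polling helper (verify the name and keywords against your installed version):

```python
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
from airflow.providers.openai.hooks.openai import OpenAIHook

def await_batch(batch_id: str) -> None:
    hook = OpenAIHook(conn_id="openai_default")
    try:
        # Assumed helper: polls until the batch leaves an in-progress state
        hook.wait_for_batch(batch_id, wait_seconds=10, timeout=3600)
    except OpenAIBatchTimeout:
        # The batch did not finish within the window; it may still complete later
        raise
    except OpenAIBatchJobException:
        # The batch failed, was cancelled, or expired
        raise
```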
Utilities and constants for handling different Apache Airflow versions, providing compatible base classes and version detection capabilities.

```python
def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...

AIRFLOW_V_3_0_PLUS: bool
AIRFLOW_V_3_1_PLUS: bool

# Compatible base classes
BaseHook: type
BaseOperator: type
```
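Code that must run on both major Airflow lines can branch on these flags; a minimal sketch:

```python
from airflow.providers.openai.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
    # Airflow 3 moved several core operators into the standard provider
    from airflow.providers.standard.operators.python import PythonOperator
else:
    from airflow.operators.python import PythonOperator
```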
Enumeration of OpenAI batch job states, with a helper for checking whether a batch is still running:

```python
from enum import Enum

class BatchStatus(str, Enum):
    VALIDATING = "validating"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    EXPIRED = "expired"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"

    @classmethod
    def is_in_progress(cls, status: str) -> bool: ...
```
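The enum pairs naturally with manual polling through the raw client from `get_conn()`; a sketch using the OpenAI SDK's `batches.retrieve` (the helper name `poll_batch` is illustrative):

```python
import time

from airflow.providers.openai.hooks.openai import BatchStatus, OpenAIHook

def poll_batch(batch_id: str, poll_interval: float = 30.0) -> str:
    """Illustrative helper: blocks until the batch reaches a terminal state."""
    client = OpenAIHook(conn_id="openai_default").get_conn()
    while True:
        batch = client.batches.retrieve(batch_id)
        # is_in_progress reports whether the batch is still in a non-terminal state
        if not BatchStatus.is_in_progress(batch.status):
            return batch.status
        time.sleep(poll_interval)
```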