Provider package that enables OpenAI integration for Apache Airflow, including hooks, operators, and triggers for AI-powered data pipelines.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-openai@1.6.0
# Apache Airflow Providers OpenAI

Apache Airflow provider package that enables OpenAI integration for data pipelines and workflows. This package provides comprehensive hooks for connecting to OpenAI services, operators for executing OpenAI operations, triggers for monitoring batch jobs, and utilities for AI-powered workflow orchestration.

## Package Information

- **Package Name**: apache-airflow-providers-openai
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-openai`
- **Minimum Airflow Version**: 2.10.0
- **Dependencies**: `openai[datalib]>=1.66.0`

## Core Imports

```python
from airflow.providers.openai.hooks.openai import OpenAIHook, BatchStatus
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator, OpenAITriggerBatchOperator
from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
from airflow.providers.openai.version_compat import BaseHook, BaseOperator, AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
```

## Basic Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator

# Create a simple DAG
dag = DAG(
    'openai_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
)

# Generate embeddings using the operator
embedding_task = OpenAIEmbeddingOperator(
    task_id='generate_embeddings',
    conn_id='openai_default',
    input_text="Hello, this is a test for OpenAI embeddings",
    model="text-embedding-ada-002",
    dag=dag,
)

# Use the hook directly in a task
def chat_completion_task():
    hook = OpenAIHook(conn_id='openai_default')
    messages = [
        {"role": "user", "content": "What is Apache Airflow?"}
    ]
    response = hook.create_chat_completion(messages=messages, model="gpt-3.5-turbo")
    return response

chat_task = PythonOperator(
    task_id='chat_completion',
    python_callable=chat_completion_task,
    dag=dag,
)
```

## Architecture

The provider follows Airflow's standard architecture patterns:

- **Hooks**: Low-level interfaces to OpenAI API services, handling authentication and connection management
- **Operators**: Task-level abstractions that use hooks to perform specific OpenAI operations in DAGs
- **Triggers**: Asynchronous monitoring components for long-running operations like batch processing
- **Exceptions**: Specialized exception classes for OpenAI-specific error handling

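The division of labor between hooks, triggers, and exceptions can be illustrated with a plain-Python polling loop. This is a sketch only: `wait_for_batch`, `BatchJobError`, and `BatchTimeout` are hypothetical stand-ins for the trigger's monitoring behavior and the provider's exception classes, not provider code.

```python
import time

# Hypothetical stand-ins for OpenAIBatchJobException / OpenAIBatchTimeout
class BatchJobError(Exception): ...
class BatchTimeout(Exception): ...

# Terminal batch states, per the OpenAI batch lifecycle
TERMINAL_STATES = {"completed", "failed", "expired", "cancelled"}

def wait_for_batch(get_status, deadline, poll_interval=0.0, clock=time.monotonic):
    """Poll until the batch completes, ends in a failure state, or the deadline passes."""
    while True:
        status = get_status()
        if status == "completed":
            return status
        if status in TERMINAL_STATES:
            raise BatchJobError(f"batch ended in state {status!r}")
        if clock() > deadline:
            raise BatchTimeout("batch did not finish before the deadline")
        time.sleep(poll_interval)

# Simulated status progression, as a hook's batch lookups might report it
statuses = iter(["validating", "in_progress", "finalizing", "completed"])
result = wait_for_batch(lambda: next(statuses), deadline=time.monotonic() + 60)
```

In the real provider, the operator defers to `OpenAIBatchTrigger`, which performs this kind of loop asynchronously in the triggerer process.
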
## Connection Configuration

Configure an OpenAI connection in Airflow:

```python
# Connection ID: openai_default
# Connection Type: OpenAI
# Password: your-openai-api-key
# Host: https://api.openai.com (optional, uses default if not specified)
```

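Besides the UI, Airflow can also resolve connections from `AIRFLOW_CONN_<CONN_ID>` environment variables; the JSON-serialized form is sketched below. The field values are placeholders, and the key names follow Airflow's generic connection JSON schema.

```python
import json
import os

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment
# variables; a JSON-serialized value is one supported format.
conn = {
    "conn_type": "openai",
    "password": "your-openai-api-key",  # the API key goes in the password field
    "host": "https://api.openai.com",   # optional; omit to use the default endpoint
}
os.environ["AIRFLOW_CONN_OPENAI_DEFAULT"] = json.dumps(conn)
```
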
## Capabilities

### OpenAI Hook Interface

Comprehensive hook providing direct access to OpenAI API functionality, including chat completions, assistants, embeddings, file operations, vector stores, and batch processing.

```python { .api }
class OpenAIHook(BaseHook):
    def __init__(self, conn_id: str = "openai_default", *args, **kwargs): ...
    def get_conn(self) -> OpenAI: ...
    def test_connection(self) -> tuple[bool, str]: ...
    def create_chat_completion(self, messages: list, model: str = "gpt-3.5-turbo", **kwargs) -> list: ...
    def create_embeddings(self, text: str | list, model: str = "text-embedding-ada-002", **kwargs) -> list[float]: ...
```

[OpenAI Hook](./hooks.md)

### Operators for Task Execution

Ready-to-use operators for common OpenAI operations in Airflow DAGs, including embedding generation and batch processing, with full integration into Airflow's task lifecycle.

```python { .api }
class OpenAIEmbeddingOperator(BaseOperator):
    def __init__(self, conn_id: str, input_text: str | list, model: str = "text-embedding-ada-002", **kwargs): ...
    def execute(self, context) -> list[float]: ...

class OpenAITriggerBatchOperator(BaseOperator):
    def __init__(self, file_id: str, endpoint: str, conn_id: str = "openai_default", **kwargs): ...
    def execute(self, context) -> str | None: ...
```

[Operators](./operators.md)

### Triggers for Asynchronous Operations

Trigger for monitoring long-running OpenAI batch operations, with proper async handling and timeout management.

```python { .api }
class OpenAIBatchTrigger(BaseTrigger):
    def __init__(self, conn_id: str, batch_id: str, poll_interval: float, end_time: float): ...
    def serialize(self) -> tuple[str, dict]: ...
    async def run(self) -> AsyncIterator: ...
```

[Triggers](./triggers.md)

### Exception Handling

Specialized exceptions for OpenAI-specific error conditions and batch-processing failures.

```python { .api }
class OpenAIBatchJobException(AirflowException): ...
class OpenAIBatchTimeout(AirflowException): ...
```

[Exceptions](./exceptions.md)

### Version Compatibility

Utilities and constants for handling different Apache Airflow versions, providing compatible base classes and version detection capabilities.

```python { .api }
def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...

AIRFLOW_V_3_0_PLUS: bool
AIRFLOW_V_3_1_PLUS: bool

# Compatible base classes
BaseHook: type
BaseOperator: type
```

[Version Compatibility](./version_compat.md)

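For illustration, a version check along these lines can be built by parsing an Airflow version string into a comparable tuple. `base_version_tuple` below is a hypothetical sketch of what `get_base_airflow_version_tuple()` conceptually does; the parsing details are an assumption, not the provider's actual code.

```python
import re

def base_version_tuple(version: str) -> tuple[int, int, int]:
    """Parse the leading numeric parts of a version string, ignoring
    pre-release suffixes such as '3.0.0b1' or '3.1.0.dev0'."""
    parts = []
    for piece in version.split(".")[:3]:
        m = re.match(r"\d+", piece)
        parts.append(int(m.group()) if m else 0)
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)

# Illustrative flag computed from a pinned sample version string
AIRFLOW_V_3_0_PLUS = base_version_tuple("3.0.2") >= (3, 0, 0)
```
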
## Types

```python { .api }
from enum import Enum

class BatchStatus(str, Enum):
    VALIDATING = "validating"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    EXPIRED = "expired"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"

    @classmethod
    def is_in_progress(cls, status: str) -> bool: ...
```