# Operators

Airflow operators provide task-level abstractions for OpenAI operations. They integrate with DAG workflows and provide task lifecycle management, templating, and error handling.

## Capabilities

### Embedding Generation Operator

Generate OpenAI embeddings as part of an Airflow DAG task, with support for text templating and various input formats.
```python { .api }
class OpenAIEmbeddingOperator(BaseOperator):
    """
    Operator that accepts input text to generate OpenAI embeddings using the specified model.

    Args:
        conn_id (str): The OpenAI connection ID to use
        input_text (str | list[str] | list[int] | list[list[int]]): The text to generate embeddings for
        model (str): The OpenAI model to use for generating embeddings, defaults to "text-embedding-ada-002"
        embedding_kwargs (dict, optional): Additional keyword arguments for the create_embeddings method
        **kwargs: Additional BaseOperator arguments
    """

    template_fields: Sequence[str] = ("input_text",)

    def __init__(
        self,
        conn_id: str,
        input_text: str | list[str] | list[int] | list[list[int]],
        model: str = "text-embedding-ada-002",
        embedding_kwargs: dict | None = None,
        **kwargs: Any,
    ): ...

    @cached_property
    def hook(self) -> OpenAIHook:
        """Return an instance of the OpenAIHook."""

    def execute(self, context: Context) -> list[float]:
        """
        Execute the embedding generation task.

        Args:
            context: Airflow task context

        Returns:
            List of embedding values (floats)

        Raises:
            ValueError: If input_text is empty or invalid format
        """
```
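Because `execute` returns the embedding vector, Airflow pushes it to XCom, and downstream tasks can pull it by task ID. A minimal sketch, assuming a preceding `OpenAIEmbeddingOperator` with `task_id='generate_embeddings'`; the downstream callable and task ID are illustrative:

```python
from airflow.operators.python import PythonOperator


def consume_embedding(**context):
    # Pull the list[float] returned by the upstream OpenAIEmbeddingOperator.
    vector = context["ti"].xcom_pull(task_ids="generate_embeddings")
    print(f"Received embedding with {len(vector)} dimensions")


consume_task = PythonOperator(
    task_id="consume_embedding",
    python_callable=consume_embedding,
)
```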
### Batch Processing Operator

Trigger OpenAI Batch API operations with support for both synchronous and asynchronous (deferrable) execution modes.
```python { .api }
class OpenAITriggerBatchOperator(BaseOperator):
    """
    Operator that triggers an OpenAI Batch API endpoint and waits for the batch to complete.

    Args:
        file_id (str): The ID of the batch file to trigger
        endpoint (Literal): The OpenAI Batch API endpoint ("/v1/chat/completions", "/v1/embeddings", "/v1/completions")
        conn_id (str): The OpenAI connection ID, defaults to 'openai_default'
        deferrable (bool): Run operator in deferrable mode, defaults to system configuration setting
        wait_seconds (float): Number of seconds between checks when not deferrable, defaults to 3
        timeout (float): Time to wait for completion in seconds, defaults to 24 hours
        wait_for_completion (bool): Whether to wait for batch completion, defaults to True
        **kwargs: Additional BaseOperator arguments
    """

    template_fields: Sequence[str] = ("file_id",)
    batch_id: str | None = None  # Set during execution with the created batch ID

    def __init__(
        self,
        file_id: str,
        endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
        conn_id: str = OpenAIHook.default_conn_name,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        wait_seconds: float = 3,
        timeout: float = 24 * 60 * 60,
        wait_for_completion: bool = True,
        **kwargs: Any,
    ): ...

    @cached_property
    def hook(self) -> OpenAIHook:
        """Return an instance of the OpenAIHook."""

    def execute(self, context: Context) -> str | None:
        """
        Execute the batch operation.

        Args:
            context: Airflow task context

        Returns:
            Batch ID if successful, None if not waiting for completion
        """

    def execute_complete(self, context: Context, event: Any = None) -> str:
        """
        Callback for deferrable execution completion.

        Args:
            context: Airflow task context
            event: Event data from trigger

        Returns:
            Batch ID

        Raises:
            OpenAIBatchJobException: If batch processing failed
        """

    def on_kill(self) -> None:
        """Cancel the batch if task is cancelled."""
```
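When `wait_for_completion=True`, the returned batch ID is pushed to XCom, so a downstream task can fetch the finished batch and download its output file. A minimal sketch, assuming the hook exposes a `conn` property returning the underlying OpenAI client, as in recent provider versions; task IDs are illustrative:

```python
from airflow.operators.python import PythonOperator
from airflow.providers.openai.hooks.openai import OpenAIHook


def fetch_batch_output(**context):
    # Pull the batch ID returned by the upstream OpenAITriggerBatchOperator.
    batch_id = context["ti"].xcom_pull(task_ids="process_chat_batch")
    client = OpenAIHook(conn_id="openai_default").conn  # underlying OpenAI client
    batch = client.batches.retrieve(batch_id)
    # Each line of the output file is a JSON response for one batched request.
    output = client.files.content(batch.output_file_id)
    return output.text


fetch_results = PythonOperator(
    task_id="fetch_batch_output",
    python_callable=fetch_batch_output,
)
```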
## Usage Examples

### Embedding Operator Example
```python
from datetime import datetime
from airflow import DAG
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator

dag = DAG(
    'embedding_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False
)

# Simple text embedding
embedding_task = OpenAIEmbeddingOperator(
    task_id='generate_embeddings',
    conn_id='openai_default',
    input_text="This is sample text for embedding generation",
    model="text-embedding-ada-002",
    dag=dag
)

# Multiple texts with custom parameters
batch_embedding_task = OpenAIEmbeddingOperator(
    task_id='batch_embeddings',
    conn_id='openai_default',
    input_text=[
        "First document to embed",
        "Second document to embed",
        "Third document to embed"
    ],
    model="text-embedding-3-large",
    embedding_kwargs={
        "dimensions": 1024,
        "encoding_format": "float"
    },
    dag=dag
)
```
### Templated Input Example
```python
# Using Airflow templating for dynamic input
templated_embedding_task = OpenAIEmbeddingOperator(
    task_id='templated_embeddings',
    conn_id='openai_default',
    input_text="{{ dag_run.conf.get('text_content', 'Default text') }}",
    model="text-embedding-ada-002",
    dag=dag
)
```
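Templated fields render as strings by default, so a templated list of texts would arrive as its string representation. If the DAG sets `render_template_as_native_obj=True`, Jinja renders native Python objects instead. A minimal sketch of that variation; the DAG ID, task ID, and conf key are illustrative:

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator

native_dag = DAG(
    'embedding_native_templating',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
    render_template_as_native_obj=True,  # template output keeps its Python type
)

# With native rendering, the templated value below is passed as a list[str].
native_embedding_task = OpenAIEmbeddingOperator(
    task_id='native_templated_embeddings',
    conn_id='openai_default',
    input_text="{{ dag_run.conf.get('input_texts', ['Default text']) }}",
    model="text-embedding-ada-002",
    dag=native_dag,
)
```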
### Batch Processing Operator Example
```python
from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator

# Synchronous batch processing
sync_batch_task = OpenAITriggerBatchOperator(
    task_id='process_batch_sync',
    file_id="{{ task_instance.xcom_pull(task_ids='upload_batch_file') }}",
    endpoint="/v1/chat/completions",
    conn_id='openai_default',
    deferrable=False,
    wait_seconds=5,
    timeout=7200,  # 2 hours
    dag=dag
)

# Asynchronous (deferrable) batch processing
async_batch_task = OpenAITriggerBatchOperator(
    task_id='process_batch_async',
    file_id="file-abc123",
    endpoint="/v1/embeddings",
    conn_id='openai_default',
    deferrable=True,
    timeout=86400,  # 24 hours
    dag=dag
)

# Trigger batch without waiting
fire_and_forget_batch = OpenAITriggerBatchOperator(
    task_id='trigger_batch_only',
    file_id="file-def456",
    endpoint="/v1/completions",
    conn_id='openai_default',
    wait_for_completion=False,
    dag=dag
)
```
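A batch triggered without waiting can be checked later from another task once its ID is known (for example via `dag_run.conf` or an upstream XCom). A minimal sketch, assuming the hook exposes `get_batch()` as in recent provider versions; the task ID and conf key are illustrative:

```python
from airflow.operators.python import PythonOperator
from airflow.providers.openai.hooks.openai import OpenAIHook


def check_batch_status(**context):
    # Look up the batch ID passed in the DAG run configuration (illustrative key).
    batch_id = context["dag_run"].conf["batch_id"]
    batch = OpenAIHook(conn_id="openai_default").get_batch(batch_id)
    print(f"Batch {batch_id} status: {batch.status}")
    return batch.status


check_batch_task = PythonOperator(
    task_id='check_batch_status',
    python_callable=check_batch_status,
    dag=dag
)
```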
### Complete DAG Example
```python
import json
import tempfile
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.operators.openai import (
    OpenAIEmbeddingOperator,
    OpenAITriggerBatchOperator
)

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'openai_processing_pipeline',
    default_args=default_args,
    description='Process data using OpenAI services',
    schedule_interval=timedelta(days=1),
    catchup=False,
    render_template_as_native_obj=True  # so the templated list below renders as a Python list
)

def upload_batch_file(**context):
    """Upload a batch processing file."""
    hook = OpenAIHook(conn_id='openai_default')

    # Create batch file content
    batch_requests = []
    for i in range(10):
        request = {
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [{"role": "user", "content": f"Process item {i}"}],
                "max_tokens": 100
            }
        }
        batch_requests.append(request)

    # Write the requests to a temporary JSONL file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
        for request in batch_requests:
            f.write(json.dumps(request) + '\n')
        temp_file = f.name

    # Upload the file for batch processing
    file_obj = hook.upload_file(temp_file, purpose="batch")
    return file_obj.id

# Task to upload the batch file
upload_task = PythonOperator(
    task_id='upload_batch_file',
    python_callable=upload_batch_file,
    dag=dag
)

# Generate embeddings for input data
embedding_task = OpenAIEmbeddingOperator(
    task_id='generate_embeddings',
    conn_id='openai_default',
    input_text="{{ dag_run.conf.get('input_texts', ['Default text']) }}",
    model="text-embedding-ada-002",
    dag=dag
)

# Process batch requests
batch_task = OpenAITriggerBatchOperator(
    task_id='process_chat_batch',
    file_id="{{ task_instance.xcom_pull(task_ids='upload_batch_file') }}",
    endpoint="/v1/chat/completions",
    conn_id='openai_default',
    deferrable=True,
    dag=dag
)

# Set task dependencies; embedding_task has no upstream tasks and runs independently
upload_task >> batch_task
```
308
309
### Error Handling Example
310
311
```python
from airflow.operators.python import PythonOperator
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator

def handle_batch_with_retry(**context):
    """Handle batch processing with custom retry logic."""
    try:
        operator = OpenAITriggerBatchOperator(
            task_id='batch_with_handling',
            file_id=context['params']['file_id'],
            endpoint="/v1/chat/completions",
            conn_id='openai_default',
            timeout=1800  # 30 minutes
        )
        result = operator.execute(context)
        return result

    except OpenAIBatchTimeout as e:
        print(f"Batch timed out: {e}")
        # Implement custom timeout handling
        raise

    except OpenAIBatchJobException as e:
        print(f"Batch job failed: {e}")
        # Implement custom failure handling
        raise

error_handling_task = PythonOperator(
    task_id='batch_with_error_handling',
    python_callable=handle_batch_with_retry,
    params={'file_id': 'file-123'},
    dag=dag
)
```
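For transient failures, the operator's own Airflow retry settings are often sufficient and avoid wrapping it in a `PythonOperator`. A minimal sketch using standard `BaseOperator` arguments; the retry values are illustrative:

```python
from datetime import timedelta

# Let Airflow re-run the batch task itself on failure instead of handling errors manually.
retry_batch_task = OpenAITriggerBatchOperator(
    task_id='batch_with_retries',
    file_id='file-123',
    endpoint="/v1/chat/completions",
    conn_id='openai_default',
    retries=3,                          # number of automatic re-runs on failure
    retry_delay=timedelta(minutes=10),  # back off between attempts
    dag=dag
)
```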