# OpenAI Hook

The OpenAIHook provides a comprehensive interface to OpenAI's API services, handling authentication, connection management, and the full range of supported OpenAI operations. It is the foundational component for all OpenAI interactions within Airflow workflows.

## Capabilities

### Connection Management

Handles OpenAI API authentication and connection setup using Airflow's connection management system.

```python { .api }
class OpenAIHook(BaseHook):
    """
    Use OpenAI SDK to interact with OpenAI APIs.

    Args:
        conn_id (str): OpenAI connection id, defaults to 'openai_default'
    """

    conn_name_attr = "conn_id"
    default_conn_name = "openai_default"
    conn_type = "openai"
    hook_name = "OpenAI"

    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: ...

    def get_conn(self) -> OpenAI:
        """Return an OpenAI connection object."""

    @cached_property
    def conn(self) -> OpenAI:
        """Return a cached OpenAI connection object."""

    def test_connection(self) -> tuple[bool, str]:
        """Test the OpenAI connection."""

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom field behaviour for connection UI."""
```

### Chat Completions

Generate conversational responses and text completions using OpenAI's chat models.

```python { .api }
def create_chat_completion(
    self,
    messages: list[
        ChatCompletionSystemMessageParam
        | ChatCompletionUserMessageParam
        | ChatCompletionAssistantMessageParam
        | ChatCompletionToolMessageParam
        | ChatCompletionFunctionMessageParam
    ],
    model: str = "gpt-3.5-turbo",
    **kwargs: Any,
) -> list[ChatCompletionMessage]:
    """
    Create a model response for the given chat conversation.

    Args:
        messages: A list of messages comprising the conversation so far
        model: ID of the model to use
        **kwargs: Additional parameters for the completion request

    Returns:
        List of chat completion choices
    """
```

### Assistant Management

Create and manage OpenAI assistants for more complex conversational workflows.

```python { .api }
def create_assistant(self, model: str = "gpt-3.5-turbo", **kwargs: Any) -> Assistant:
    """
    Create an OpenAI assistant using the given model.

    Args:
        model: The OpenAI model for the assistant to use
        **kwargs: Additional assistant configuration parameters

    Returns:
        Assistant object
    """

def get_assistant(self, assistant_id: str) -> Assistant:
    """
    Get an OpenAI assistant.

    Args:
        assistant_id: The ID of the assistant to retrieve

    Returns:
        Assistant object
    """

def get_assistants(self, **kwargs: Any) -> list[Assistant]:
    """
    Get a list of Assistant objects.

    Args:
        **kwargs: Query parameters for filtering assistants

    Returns:
        List of Assistant objects
    """

def modify_assistant(self, assistant_id: str, **kwargs: Any) -> Assistant:
    """
    Modify an existing Assistant object.

    Args:
        assistant_id: The ID of the assistant to be modified
        **kwargs: Parameters to update

    Returns:
        Updated Assistant object
    """

def delete_assistant(self, assistant_id: str) -> AssistantDeleted:
    """
    Delete an OpenAI Assistant for a given ID.

    Args:
        assistant_id: The ID of the assistant to delete

    Returns:
        AssistantDeleted confirmation object
    """
```

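As a quick sketch of how these calls compose (the helper name, the `hook` variable, and the parameter values are illustrative, not part of the provider API):

```python
from typing import Any


def replace_assistant(hook: Any, name: str, instructions: str) -> Any:
    """Create a fresh assistant, removing any stale one with the same name.

    Sketch only: assumes `hook` is a configured OpenAIHook instance.
    """
    # Delete any existing assistant that shares the name
    for assistant in hook.get_assistants():
        if assistant.name == name:
            hook.delete_assistant(assistant_id=assistant.id)

    # Create the replacement; extra kwargs are passed through to the API
    return hook.create_assistant(
        model="gpt-3.5-turbo",
        name=name,
        instructions=instructions,
    )
```
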
### Thread Management

Manage conversation threads for assistant interactions.

```python { .api }
def create_thread(self, **kwargs: Any) -> Thread:
    """
    Create an OpenAI thread.

    Args:
        **kwargs: Thread configuration parameters

    Returns:
        Thread object
    """

def modify_thread(self, thread_id: str, metadata: dict[str, Any]) -> Thread:
    """
    Modify an existing Thread object.

    Args:
        thread_id: The ID of the thread to modify
        metadata: A set of up to 16 key-value pairs that can be attached to the object

    Returns:
        Updated Thread object
    """

def delete_thread(self, thread_id: str) -> ThreadDeleted:
    """
    Delete an OpenAI thread for a given thread_id.

    Args:
        thread_id: The ID of the thread to delete

    Returns:
        ThreadDeleted confirmation object
    """
```

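For instance, a thread can be tagged with run context via its metadata (a hypothetical helper; the metadata keys are illustrative):

```python
from typing import Any


def tag_thread(hook: Any, run_date: str) -> Any:
    """Create a thread and tag it with DAG-run context. Sketch only."""
    thread = hook.create_thread()
    # metadata accepts up to 16 string key-value pairs
    return hook.modify_thread(
        thread_id=thread.id,
        metadata={"run_date": run_date, "source": "airflow"},
    )
```
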
### Message Management

Handle messages within conversation threads.

```python { .api }
def create_message(
    self,
    thread_id: str,
    role: Literal["user", "assistant"],
    content: str,
    **kwargs: Any,
) -> Message:
    """
    Create a message for a given Thread.

    Args:
        thread_id: The ID of the thread to create a message for
        role: The role of the entity that is creating the message ('user' or 'assistant')
        content: The content of the message
        **kwargs: Additional message parameters

    Returns:
        Message object
    """

def get_messages(self, thread_id: str, **kwargs: Any) -> list[Message]:
    """
    Return a list of messages for a given Thread.

    Args:
        thread_id: The ID of the thread the messages belong to
        **kwargs: Query parameters for filtering messages

    Returns:
        List of Message objects
    """

def modify_message(self, thread_id: str, message_id: str, **kwargs: Any) -> Message:
    """
    Modify an existing message for a given Thread.

    Args:
        thread_id: The ID of the thread to which this message belongs
        message_id: The ID of the message to modify
        **kwargs: Parameters to update

    Returns:
        Updated Message object
    """
```

### Run Management

Execute and monitor assistant runs within threads.

```python { .api }
def create_run(self, thread_id: str, assistant_id: str, **kwargs: Any) -> Run:
    """
    Create a run for a given thread and assistant.

    Args:
        thread_id: The ID of the thread to run
        assistant_id: The ID of the assistant to use to execute this run
        **kwargs: Additional run parameters

    Returns:
        Run object
    """

def create_run_and_poll(self, thread_id: str, assistant_id: str, **kwargs: Any) -> Run:
    """
    Create a run for a given thread and assistant, then poll until it completes.

    Args:
        thread_id: The ID of the thread to run
        assistant_id: The ID of the assistant to use to execute this run
        **kwargs: Additional run parameters

    Returns:
        Completed Run object
    """

def get_run(self, thread_id: str, run_id: str) -> Run:
    """
    Retrieve a run for a given thread and run ID.

    Args:
        thread_id: The ID of the thread that was run
        run_id: The ID of the run to retrieve

    Returns:
        Run object
    """

def get_runs(self, thread_id: str, **kwargs: Any) -> list[Run]:
    """
    Return a list of runs belonging to a thread.

    Args:
        thread_id: The ID of the thread the runs belong to
        **kwargs: Query parameters for filtering runs

    Returns:
        List of Run objects
    """

def modify_run(self, thread_id: str, run_id: str, **kwargs: Any) -> Run:
    """
    Modify a run on a given thread.

    Args:
        thread_id: The ID of the thread that was run
        run_id: The ID of the run to modify
        **kwargs: Parameters to update

    Returns:
        Updated Run object
    """
```

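Putting thread, message, and run management together, an end-to-end question-and-answer round trip might look like the following sketch (helper name and message-content shape are illustrative; `hook` is assumed to be a configured OpenAIHook):

```python
from typing import Any


def ask_assistant(hook: Any, assistant_id: str, question: str) -> str:
    """Thread -> message -> polled run -> reply, as one round trip. Sketch only."""
    thread = hook.create_thread()
    hook.create_message(thread_id=thread.id, role="user", content=question)

    # create_run_and_poll blocks until the run reaches a terminal state
    run = hook.create_run_and_poll(thread_id=thread.id, assistant_id=assistant_id)
    if run.status != "completed":
        raise RuntimeError(f"Run finished with status {run.status}")

    # Newest message first is the API default ordering
    messages = hook.get_messages(thread_id=thread.id)
    return messages[0].content[0].text.value
```
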
### Embeddings

Generate vector embeddings from text using OpenAI's embedding models.

```python { .api }
def create_embeddings(
    self,
    text: str | list[str] | list[int] | list[list[int]],
    model: str = "text-embedding-ada-002",
    **kwargs: Any,
) -> list[float]:
    """
    Generate embeddings for the given text using the given model.

    Args:
        text: The text to generate embeddings for (string, list of strings, tokens, or token lists)
        model: The model to use for generating embeddings
        **kwargs: Additional embedding parameters

    Returns:
        A single embedding vector as a list of floats
    """
```

### File Operations

Upload, retrieve, and manage files for use with OpenAI services.

```python { .api }
def upload_file(self, file: str, purpose: Literal["fine-tune", "assistants", "batch"]) -> FileObject:
    """
    Upload a file that can be used across various endpoints.

    Args:
        file: The file path to be uploaded
        purpose: The intended purpose of the uploaded file

    Returns:
        FileObject with upload details
    """

def get_file(self, file_id: str) -> FileObject:
    """
    Return information about a specific file.

    Args:
        file_id: The ID of the file to use for this request

    Returns:
        FileObject with file details
    """

def get_files(self) -> list[FileObject]:
    """
    Return a list of files that belong to the user's organization.

    Returns:
        List of FileObject instances
    """

def delete_file(self, file_id: str) -> FileDeleted:
    """
    Delete a file.

    Args:
        file_id: The ID of the file to be deleted

    Returns:
        FileDeleted confirmation object
    """
```

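As an idempotent-upload sketch (the helper is hypothetical; it assumes `FileObject` exposes a `filename` attribute, which the OpenAI SDK's file objects do):

```python
from typing import Any


def upload_once(hook: Any, path: str, purpose: str = "assistants") -> Any:
    """Upload a local file unless one with the same name already exists. Sketch only."""
    filename = path.rsplit("/", 1)[-1]
    # Reuse an existing file rather than uploading a duplicate
    for existing in hook.get_files():
        if existing.filename == filename:
            return existing
    return hook.upload_file(file=path, purpose=purpose)
```
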
### Vector Store Operations

Manage vector stores for semantic search and retrieval operations.

```python { .api }
def create_vector_store(self, **kwargs: Any) -> VectorStore:
    """
    Create a vector store.

    Args:
        **kwargs: Vector store configuration parameters

    Returns:
        VectorStore object
    """

def get_vector_stores(self, **kwargs: Any) -> list[VectorStore]:
    """
    Return a list of vector stores.

    Args:
        **kwargs: Query parameters for filtering

    Returns:
        List of VectorStore objects
    """

def get_vector_store(self, vector_store_id: str) -> VectorStore:
    """
    Retrieve a vector store.

    Args:
        vector_store_id: The ID of the vector store to retrieve

    Returns:
        VectorStore object
    """

def modify_vector_store(self, vector_store_id: str, **kwargs: Any) -> VectorStore:
    """
    Modify a vector store.

    Args:
        vector_store_id: The ID of the vector store to modify
        **kwargs: Parameters to update

    Returns:
        Updated VectorStore object
    """

def delete_vector_store(self, vector_store_id: str) -> VectorStoreDeleted:
    """
    Delete a vector store.

    Args:
        vector_store_id: The ID of the vector store to delete

    Returns:
        VectorStoreDeleted confirmation object
    """

def upload_files_to_vector_store(
    self, vector_store_id: str, files: list[BinaryIO]
) -> VectorStoreFileBatch:
    """
    Upload files to a vector store and poll until completion.

    Args:
        vector_store_id: The ID of the vector store the files are to be uploaded to
        files: A list of binary files to upload

    Returns:
        VectorStoreFileBatch object with batch details
    """

def get_vector_store_files(self, vector_store_id: str) -> list[VectorStoreFile]:
    """
    Return a list of vector store files.

    Args:
        vector_store_id: The ID of the vector store

    Returns:
        List of VectorStoreFile objects
    """

def delete_vector_store_file(self, vector_store_id: str, file_id: str) -> VectorStoreFileDeleted:
    """
    Delete a vector store file.

    Args:
        vector_store_id: The ID of the vector store that the file belongs to
        file_id: The ID of the file to delete

    Returns:
        VectorStoreFileDeleted confirmation object
    """
```

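A sketch of building a store from local files. Note that `upload_files_to_vector_store` takes open binary file handles, not paths; the `file_counts` attribute shape is an assumption based on the OpenAI SDK's `VectorStoreFileBatch`:

```python
from typing import Any


def build_vector_store(hook: Any, name: str, paths: list[str]) -> Any:
    """Create a vector store and load local files into it. Sketch only."""
    store = hook.create_vector_store(name=name)

    # Open every file in binary mode; the hook polls the batch until it finishes
    handles = [open(p, "rb") for p in paths]
    try:
        batch = hook.upload_files_to_vector_store(
            vector_store_id=store.id, files=handles
        )
    finally:
        for handle in handles:
            handle.close()

    print(f"Uploaded {batch.file_counts.completed} of {batch.file_counts.total} files")
    return store
```
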
### Batch Processing

Handle batch operations for large-scale processing with proper monitoring and timeout handling.

```python { .api }
def create_batch(
    self,
    file_id: str,
    endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
    metadata: dict[str, str] | None = None,
    completion_window: Literal["24h"] = "24h",
) -> Batch:
    """
    Create a batch for a given model and files.

    Args:
        file_id: The ID of the file to be used for this batch
        endpoint: The endpoint to use for this batch
        metadata: A set of key-value pairs that can be attached to an object
        completion_window: The time window for the batch to complete

    Returns:
        Batch object
    """

def get_batch(self, batch_id: str) -> Batch:
    """
    Get the status of a batch.

    Args:
        batch_id: The ID of the batch to get the status of

    Returns:
        Batch object with current status
    """

def wait_for_batch(self, batch_id: str, wait_seconds: float = 3, timeout: float = 3600) -> None:
    """
    Poll a batch until it finishes.

    Args:
        batch_id: The ID of the batch to wait for
        wait_seconds: Number of seconds between status checks
        timeout: Maximum number of seconds to wait for the batch to finish

    Raises:
        OpenAIBatchTimeout: If the batch doesn't complete within the timeout
        OpenAIBatchJobException: If the batch fails or is cancelled
    """

def cancel_batch(self, batch_id: str) -> Batch:
    """
    Cancel a batch.

    Args:
        batch_id: The ID of the batch to cancel

    Returns:
        Cancelled Batch object
    """
```

## Usage Examples

### Basic Hook Usage

```python
from airflow.providers.openai.hooks.openai import OpenAIHook

# Initialize hook with connection
hook = OpenAIHook(conn_id='openai_default')

# Test connection
success, message = hook.test_connection()
if success:
    print(f"Connection successful: {message}")
else:
    print(f"Connection failed: {message}")
```

### Chat Completion Example

```python
# Create a chat completion
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is Apache Airflow?"},
]

response = hook.create_chat_completion(
    messages=messages,
    model="gpt-3.5-turbo",
    max_tokens=150,
    temperature=0.7,
)

for choice in response:
    print(choice.message.content)
```

### Embedding Generation Example

```python
# Generate an embedding for a piece of text
embeddings = hook.create_embeddings(
    text="Apache Airflow is a platform for workflow orchestration",
    model="text-embedding-ada-002",
)

# The hook returns a single embedding vector (a list of floats)
print(f"Embedding has {len(embeddings)} dimensions")
```

### Batch Processing Example

```python
# Upload a batch file
file_obj = hook.upload_file(
    file="/path/to/batch_requests.jsonl",
    purpose="batch",
)

# Create and monitor batch
batch = hook.create_batch(
    file_id=file_obj.id,
    endpoint="/v1/chat/completions",
)

# Wait for completion
hook.wait_for_batch(batch.id, wait_seconds=10, timeout=7200)

# Get final batch status
final_batch = hook.get_batch(batch.id)
print(f"Batch completed with status: {final_batch.status}")
```

## Types

### BatchStatus Enum

Enum for representing the status values of OpenAI batch operations.

```python { .api }
from enum import Enum

class BatchStatus(str, Enum):
    """Enum for the status of a batch."""

    VALIDATING = "validating"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    EXPIRED = "expired"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"

    def __str__(self) -> str:
        """Return string representation of the status."""

    @classmethod
    def is_in_progress(cls, status: str) -> bool:
        """
        Check if the batch status indicates the batch is still in progress.

        Args:
            status: The batch status string to check

        Returns:
            True if status is validating, in_progress, or finalizing
        """
```

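A manual polling loop, sketching what `wait_for_batch` does internally (the helper is hypothetical; the state set mirrors the `is_in_progress` docstring above):

```python
import time
from typing import Any

# The states that BatchStatus.is_in_progress treats as still running
IN_PROGRESS_STATES = {"validating", "in_progress", "finalizing"}


def poll_batch(hook: Any, batch_id: str, wait_seconds: float = 3.0) -> Any:
    """Poll until the batch leaves the in-progress states. Sketch only."""
    while True:
        batch = hook.get_batch(batch_id)
        if str(batch.status) not in IN_PROGRESS_STATES:
            return batch
        time.sleep(wait_seconds)
```

Unlike `wait_for_batch`, this sketch returns the terminal `Batch` rather than raising on failure, so the caller must inspect `batch.status` itself.
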
### Connection Property

The OpenAIHook provides a cached property for efficient connection reuse.

```python { .api }
@cached_property
def conn(self) -> OpenAI:
    """
    Return a cached OpenAI connection object.

    This property provides efficient access to the OpenAI client by caching
    the connection after first access. Subsequent calls return the same
    connection instance without re-authentication.

    Returns:
        OpenAI client instance configured with connection settings
    """
```