Provider package that enables OpenAI integration for Apache Airflow, including hooks, operators, and triggers for AI-powered data pipelines.
—
Quality: Pending — adherence to best practices has not yet been assessed.
Impact: Pending — no eval scenarios have been run.
The OpenAIHook provides a comprehensive interface to OpenAI's API services, handling authentication, connection management, and all OpenAI operations. It serves as the foundational component for all OpenAI interactions within Airflow workflows.
Handles OpenAI API authentication and connection setup using Airflow's connection management system.
class OpenAIHook(BaseHook):
    """
    Use OpenAI SDK to interact with OpenAI APIs.

    Args:
        conn_id (str): OpenAI connection id, defaults to 'openai_default'
    """

    # Airflow connection-form metadata: the attribute holding the connection
    # id, the default connection name, the connection type key, and the label
    # shown in the Airflow UI.
    conn_name_attr = "conn_id"
    default_conn_name = "openai_default"
    conn_type = "openai"
    hook_name = "OpenAI"

    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: ...
def get_conn(self) -> OpenAI:
    """Build and return an OpenAI client from the Airflow connection."""

@cached_property
def conn(self) -> OpenAI:
    """Cached OpenAI client; created on first access and reused thereafter."""

def test_connection(self) -> tuple[bool, str]:
    """Verify the OpenAI connection; return a (success, message) pair."""
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
    """Return custom field behaviour for connection UI."""

# Generate conversational responses and text completions using OpenAI's chat models.
def create_chat_completion(
    self,
    messages: list[
        ChatCompletionSystemMessageParam
        | ChatCompletionUserMessageParam
        | ChatCompletionAssistantMessageParam
        | ChatCompletionToolMessageParam
        | ChatCompletionFunctionMessageParam
    ],
    model: str = "gpt-3.5-turbo",
    **kwargs: Any,
) -> list[ChatCompletionMessage]:
    """
    Create a model response for the given chat conversation.

    Args:
        messages: A list of messages comprising the conversation so far
        model: ID of the model to use
        **kwargs: Additional parameters for the completion request

    Returns:
        List of chat completion choices
    """

# Create and manage OpenAI assistants for more complex conversational workflows.
def create_assistant(self, model: str = "gpt-3.5-turbo", **kwargs: Any) -> Assistant:
    """
    Create an OpenAI assistant backed by the given model.

    Args:
        model: The OpenAI model the assistant should use
        **kwargs: Additional assistant configuration parameters

    Returns:
        The newly created Assistant object
    """

def get_assistant(self, assistant_id: str) -> Assistant:
    """
    Retrieve a single OpenAI assistant by ID.

    Args:
        assistant_id: The ID of the assistant to retrieve

    Returns:
        The matching Assistant object
    """
def get_assistants(self, **kwargs: Any) -> list[Assistant]:
    """
    List the assistants visible to the current credentials.

    Args:
        **kwargs: Query parameters for filtering assistants

    Returns:
        A list of Assistant objects
    """

def modify_assistant(self, assistant_id: str, **kwargs: Any) -> Assistant:
    """
    Update an existing assistant in place.

    Args:
        assistant_id: The ID of the assistant to be modified
        **kwargs: Parameters to update

    Returns:
        The updated Assistant object
    """
def delete_assistant(self, assistant_id: str) -> AssistantDeleted:
    """
    Delete an OpenAI Assistant for a given ID.

    Args:
        assistant_id: The ID of the assistant to delete

    Returns:
        AssistantDeleted confirmation object
    """

# Manage conversation threads for assistant interactions.
def create_thread(self, **kwargs: Any) -> Thread:
    """
    Start a new OpenAI conversation thread.

    Args:
        **kwargs: Thread configuration parameters

    Returns:
        The created Thread object
    """

def modify_thread(self, thread_id: str, metadata: dict[str, Any]) -> Thread:
    """
    Update the metadata attached to an existing thread.

    Args:
        thread_id: The ID of the thread to modify
        metadata: Up to 16 key-value pairs that can be attached to the object

    Returns:
        The updated Thread object
    """
def delete_thread(self, thread_id: str) -> ThreadDeleted:
    """
    Delete an OpenAI thread for a given thread_id.

    Args:
        thread_id: The ID of the thread to delete

    Returns:
        ThreadDeleted confirmation object
    """

# Handle messages within conversation threads.
def create_message(
    self,
    thread_id: str,
    role: Literal["user", "assistant"],
    content: str,
    **kwargs: Any,
) -> Message:
    """
    Append a message to a thread.

    Args:
        thread_id: The ID of the thread to create a message for
        role: Who authored the message, either 'user' or 'assistant'
        content: The text content of the message
        **kwargs: Additional message parameters

    Returns:
        The created Message object
    """

def get_messages(self, thread_id: str, **kwargs: Any) -> list[Message]:
    """
    List the messages belonging to a thread.

    Args:
        thread_id: The ID of the thread the messages belong to
        **kwargs: Query parameters for filtering messages

    Returns:
        A list of Message objects
    """
# Fix: `message_id` was the only parameter in the hook missing its `: str`
# annotation; added for consistency with every sibling signature.
def modify_message(self, thread_id: str, message_id: str, **kwargs: Any) -> Message:
    """
    Modify an existing message for a given Thread.

    Args:
        thread_id: The ID of the thread to which this message belongs
        message_id: The ID of the message to modify
        **kwargs: Parameters to update

    Returns:
        Updated Message object
    """

# Execute and monitor assistant runs within threads.
def create_run(self, thread_id: str, assistant_id: str, **kwargs: Any) -> Run:
    """
    Kick off a run of an assistant against a thread.

    Args:
        thread_id: The ID of the thread to run
        assistant_id: The ID of the assistant that executes this run
        **kwargs: Additional run parameters

    Returns:
        The created Run object
    """

def create_run_and_poll(self, thread_id: str, assistant_id: str, **kwargs: Any) -> Run:
    """
    Kick off a run and block, polling, until it reaches a terminal state.

    Args:
        thread_id: The ID of the thread to run
        assistant_id: The ID of the assistant that executes this run
        **kwargs: Additional run parameters

    Returns:
        The completed Run object
    """
def get_run(self, thread_id: str, run_id: str) -> Run:
    """
    Fetch a single run belonging to a thread.

    Args:
        thread_id: The ID of the thread that was run
        run_id: The ID of the run to retrieve

    Returns:
        The matching Run object
    """

def get_runs(self, thread_id: str, **kwargs: Any) -> list[Run]:
    """
    List the runs belonging to a thread.

    Args:
        thread_id: The ID of the thread the runs belong to
        **kwargs: Query parameters for filtering runs

    Returns:
        A list of Run objects
    """
def modify_run(self, thread_id: str, run_id: str, **kwargs: Any) -> Run:
    """
    Modify a run on a given thread.

    Args:
        thread_id: The ID of the thread that was run
        run_id: The ID of the run to modify
        **kwargs: Parameters to update

    Returns:
        Updated Run object
    """

# Generate vector embeddings from text using OpenAI's embedding models.
def create_embeddings(
    self,
    text: str | list[str] | list[int] | list[list[int]],
    model: str = "text-embedding-ada-002",
    **kwargs: Any,
) -> list[float]:
    """
    Generate embeddings for the given text using the given model.

    Args:
        text: The text to generate embeddings for (string, list of strings,
            tokens, or token lists)
        model: The model to use for generating embeddings
        **kwargs: Additional embedding parameters

    Returns:
        List of embedding values (floats)
    """

# Upload, retrieve, and manage files for use with OpenAI services.
def upload_file(self, file: str, purpose: Literal["fine-tune", "assistants", "batch"]) -> FileObject:
    """
    Upload a local file for use across the OpenAI endpoints.

    Args:
        file: Path of the file to upload
        purpose: The intended purpose of the uploaded file

    Returns:
        A FileObject describing the uploaded file
    """

def get_file(self, file_id: str) -> FileObject:
    """
    Fetch the metadata of a single uploaded file.

    Args:
        file_id: The ID of the file to look up

    Returns:
        A FileObject with the file details
    """

def get_files(self) -> list[FileObject]:
    """
    List the files belonging to the user's organization.

    Returns:
        A list of FileObject instances
    """
def delete_file(self, file_id: str) -> FileDeleted:
    """
    Delete a file.

    Args:
        file_id: The ID of the file to be deleted

    Returns:
        FileDeleted confirmation object
    """

# Manage vector stores for semantic search and retrieval operations.
def create_vector_store(self, **kwargs: Any) -> VectorStore:
    """
    Create a new vector store.

    Args:
        **kwargs: Vector store configuration parameters

    Returns:
        The created VectorStore object
    """

def get_vector_stores(self, **kwargs: Any) -> list[VectorStore]:
    """
    List the available vector stores.

    Args:
        **kwargs: Query parameters for filtering

    Returns:
        A list of VectorStore objects
    """

def get_vector_store(self, vector_store_id: str) -> VectorStore:
    """
    Fetch a single vector store by ID.

    Args:
        vector_store_id: The ID of the vector store to retrieve

    Returns:
        The matching VectorStore object
    """
def modify_vector_store(self, vector_store_id: str, **kwargs: Any) -> VectorStore:
    """
    Update an existing vector store.

    Args:
        vector_store_id: The ID of the vector store to modify
        **kwargs: Parameters to update

    Returns:
        The updated VectorStore object
    """

def delete_vector_store(self, vector_store_id: str) -> VectorStoreDeleted:
    """
    Remove a vector store.

    Args:
        vector_store_id: The ID of the vector store to delete

    Returns:
        VectorStoreDeleted confirmation object
    """
def upload_files_to_vector_store(
    self, vector_store_id: str, files: list[BinaryIO]
) -> VectorStoreFileBatch:
    """
    Upload a batch of files to a vector store and poll until ingestion finishes.

    Args:
        vector_store_id: The ID of the vector store receiving the files
        files: Binary file handles to upload

    Returns:
        A VectorStoreFileBatch describing the upload batch
    """

def get_vector_store_files(self, vector_store_id: str) -> list[VectorStoreFile]:
    """
    List the files attached to a vector store.

    Args:
        vector_store_id: The ID of the vector store

    Returns:
        A list of VectorStoreFile objects
    """
def delete_vector_store_file(self, vector_store_id: str, file_id: str) -> VectorStoreFileDeleted:
    """
    Delete a vector store file.

    Args:
        vector_store_id: The ID of the vector store that the file belongs to
        file_id: The ID of the file to delete

    Returns:
        VectorStoreFileDeleted confirmation object
    """

# Handle batch operations for large-scale processing with proper monitoring and timeout handling.
def create_batch(
    self,
    file_id: str,
    endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
    metadata: dict[str, str] | None = None,
    completion_window: Literal["24h"] = "24h",
) -> Batch:
    """
    Submit a batch job built from an uploaded request file.

    Args:
        file_id: The ID of the file to be used for this batch
        endpoint: The API endpoint the batched requests target
        metadata: Optional key-value pairs attached to the batch
        completion_window: The time window for the batch to complete

    Returns:
        The created Batch object
    """

def get_batch(self, batch_id: str) -> Batch:
    """
    Fetch the current state of a batch.

    Args:
        batch_id: The ID of the batch to inspect

    Returns:
        The Batch object with its current status
    """
def wait_for_batch(self, batch_id: str, wait_seconds: float = 3, timeout: float = 3600) -> None:
    """
    Block until a batch reaches a terminal state, polling at a fixed interval.

    Args:
        batch_id: Id of the Batch to wait for
        wait_seconds: Seconds to sleep between status checks
        timeout: Maximum number of seconds to wait for the batch

    Raises:
        OpenAIBatchTimeout: If the batch does not finish within the timeout
        OpenAIBatchJobException: If the batch fails or is cancelled
    """
def cancel_batch(self, batch_id: str) -> Batch:
    """
    Cancel a batch.

    Args:
        batch_id: The ID of the batch to cancel

    Returns:
        Cancelled Batch object
    """

# Usage example:
from airflow.providers.openai.hooks.openai import OpenAIHook
# Initialize hook with connection
hook = OpenAIHook(conn_id='openai_default')

# Test connection
success, message = hook.test_connection()
if success:
    print(f"Connection successful: {message}")
else:
    print(f"Connection failed: {message}")

# Create a chat completion
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is Apache Airflow?"},
]
response = hook.create_chat_completion(
    messages=messages,
    model="gpt-3.5-turbo",
    max_tokens=150,
    temperature=0.7,
)
for choice in response:
    print(choice.message.content)

# Generate embeddings for text
texts = [
    "Apache Airflow is a platform for workflow orchestration",
    "OpenAI provides AI models and services",
    "Data pipelines help process information",
]
embeddings = hook.create_embeddings(
    text=texts,
    model="text-embedding-ada-002",
)
print(f"Generated {len(embeddings)} embedding dimensions")

# Upload a batch file
file_obj = hook.upload_file(
    file="/path/to/batch_requests.jsonl",
    purpose="batch",
)

# Create and monitor batch
batch = hook.create_batch(
    file_id=file_obj.id,
    endpoint="/v1/chat/completions",
)

# Wait for completion
hook.wait_for_batch(batch.id, wait_seconds=10, timeout=7200)

# Get final batch status
final_batch = hook.get_batch(batch.id)
print(f"Batch completed with status: {final_batch.status}")

# Enum for representing the status values of OpenAI batch operations.
from enum import Enum


class BatchStatus(str, Enum):
    """Enum for the lifecycle status values of an OpenAI batch."""

    VALIDATING = "validating"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    EXPIRED = "expired"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"

    def __str__(self) -> str:
        """Return the raw status string (e.g. 'completed')."""
        return self.value

    @classmethod
    def is_in_progress(cls, status: str) -> bool:
        """
        Check if the batch status indicates the batch is still in progress.

        Args:
            status: The batch status string to check

        Returns:
            True if status is validating, in_progress, or finalizing
        """
        # str-mixin enum members compare equal to their plain-string values,
        # so a raw status string works directly in the membership test.
        return status in (cls.VALIDATING, cls.IN_PROGRESS, cls.FINALIZING)

# The OpenAIHook provides a cached property for efficient connection reuse.
@cached_property
def conn(self) -> OpenAI:
    """
    Return a cached OpenAI connection object.

    This property caches the OpenAI client after first access; subsequent
    calls return the same client instance without re-authentication.

    Returns:
        OpenAI client instance configured with connection settings
    """

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-apache-airflow-providers-openai