CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-openai

Provider package that enables OpenAI integration for Apache Airflow, including hooks, operators, and triggers for AI-powered data pipelines.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/hooks.md

OpenAI Hook

The OpenAIHook provides a comprehensive interface to OpenAI's API services, handling authentication, connection management, and all OpenAI operations. It serves as the foundational component for all OpenAI interactions within Airflow workflows.

Capabilities

Connection Management

Handles OpenAI API authentication and connection setup using Airflow's connection management system.

class OpenAIHook(BaseHook):
    """
    Use OpenAI SDK to interact with OpenAI APIs.
    
    Args:
        conn_id (str): OpenAI connection id, defaults to 'openai_default'
    """
    
    conn_name_attr = "conn_id"
    default_conn_name = "openai_default"
    conn_type = "openai"
    hook_name = "OpenAI"
    
    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: ...
    
    def get_conn(self) -> OpenAI:
        """Return an OpenAI connection object."""
        
    @cached_property
    def conn(self) -> OpenAI:
        """Return a cached OpenAI connection object."""
        
    def test_connection(self) -> tuple[bool, str]:
        """Test the OpenAI connection."""
        
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom field behaviour for connection UI."""

Chat Completions

Generate conversational responses and text completions using OpenAI's chat models.

def create_chat_completion(
    self,
    messages: list[ChatCompletionSystemMessageParam | ChatCompletionUserMessageParam | ChatCompletionAssistantMessageParam | ChatCompletionToolMessageParam | ChatCompletionFunctionMessageParam],
    model: str = "gpt-3.5-turbo",
    **kwargs: Any,
) -> list[ChatCompletionMessage]:
    """
    Create a model response for the given chat conversation.
    
    Args:
        messages: A list of messages comprising the conversation so far
        model: ID of the model to use
        **kwargs: Additional parameters for the completion request
        
    Returns:
        List of chat completion choices
    """

Assistant Management

Create and manage OpenAI assistants for more complex conversational workflows.

def create_assistant(self, model: str = "gpt-3.5-turbo", **kwargs: Any) -> Assistant:
    """
    Create an OpenAI assistant using the given model.
    
    Args:
        model: The OpenAI model for the assistant to use
        **kwargs: Additional assistant configuration parameters
        
    Returns:
        Assistant object
    """

def get_assistant(self, assistant_id: str) -> Assistant:
    """
    Get an OpenAI assistant.
    
    Args:
        assistant_id: The ID of the assistant to retrieve
        
    Returns:
        Assistant object
    """

def get_assistants(self, **kwargs: Any) -> list[Assistant]:
    """
    Get a list of Assistant objects.
    
    Args:
        **kwargs: Query parameters for filtering assistants
        
    Returns:
        List of Assistant objects
    """

def modify_assistant(self, assistant_id: str, **kwargs: Any) -> Assistant:
    """
    Modify an existing Assistant object.
    
    Args:
        assistant_id: The ID of the assistant to be modified
        **kwargs: Parameters to update
        
    Returns:
        Updated Assistant object
    """

def delete_assistant(self, assistant_id: str) -> AssistantDeleted:
    """
    Delete an OpenAI Assistant for a given ID.
    
    Args:
        assistant_id: The ID of the assistant to delete
        
    Returns:
        AssistantDeleted confirmation object
    """

Thread Management

Manage conversation threads for assistant interactions.

def create_thread(self, **kwargs: Any) -> Thread:
    """
    Create an OpenAI thread.
    
    Args:
        **kwargs: Thread configuration parameters
        
    Returns:
        Thread object
    """

def modify_thread(self, thread_id: str, metadata: dict[str, Any]) -> Thread:
    """
    Modify an existing Thread object.
    
    Args:
        thread_id: The ID of the thread to modify
        metadata: A set of up to 16 key-value pairs that can be attached to the object
        
    Returns:
        Updated Thread object
    """

def delete_thread(self, thread_id: str) -> ThreadDeleted:
    """
    Delete an OpenAI thread for a given thread_id.
    
    Args:
        thread_id: The ID of the thread to delete
        
    Returns:
        ThreadDeleted confirmation object
    """

Message Management

Handle messages within conversation threads.

def create_message(
    self, 
    thread_id: str, 
    role: Literal["user", "assistant"], 
    content: str, 
    **kwargs: Any
) -> Message:
    """
    Create a message for a given Thread.
    
    Args:
        thread_id: The ID of the thread to create a message for
        role: The role of the entity that is creating the message ('user' or 'assistant')
        content: The content of the message
        **kwargs: Additional message parameters
        
    Returns:
        Message object
    """

def get_messages(self, thread_id: str, **kwargs: Any) -> list[Message]:
    """
    Return a list of messages for a given Thread.
    
    Args:
        thread_id: The ID of the thread the messages belong to
        **kwargs: Query parameters for filtering messages
        
    Returns:
        List of Message objects
    """

def modify_message(self, thread_id: str, message_id: str, **kwargs: Any) -> Message:
    """
    Modify an existing message for a given Thread.
    
    Args:
        thread_id: The ID of the thread to which this message belongs
        message_id: The ID of the message to modify
        **kwargs: Parameters to update
        
    Returns:
        Updated Message object
    """

Run Management

Execute and monitor assistant runs within threads.

def create_run(self, thread_id: str, assistant_id: str, **kwargs: Any) -> Run:
    """
    Create a run for a given thread and assistant.
    
    Args:
        thread_id: The ID of the thread to run
        assistant_id: The ID of the assistant to use to execute this run
        **kwargs: Additional run parameters
        
    Returns:
        Run object
    """

def create_run_and_poll(self, thread_id: str, assistant_id: str, **kwargs: Any) -> Run:
    """
    Create a run for a given thread and assistant, then poll until completion.
    
    Args:
        thread_id: The ID of the thread to run
        assistant_id: The ID of the assistant to use to execute this run
        **kwargs: Additional run parameters
        
    Returns:
        Completed Run object
    """

def get_run(self, thread_id: str, run_id: str) -> Run:
    """
    Retrieve a run for a given thread and run.
    
    Args:
        thread_id: The ID of the thread that was run
        run_id: The ID of the run to retrieve
        
    Returns:
        Run object
    """

def get_runs(self, thread_id: str, **kwargs: Any) -> list[Run]:
    """
    Return a list of runs belonging to a thread.
    
    Args:
        thread_id: The ID of the thread the run belongs to
        **kwargs: Query parameters for filtering runs
        
    Returns:
        List of Run objects
    """

def modify_run(self, thread_id: str, run_id: str, **kwargs: Any) -> Run:
    """
    Modify a run on a given thread.
    
    Args:
        thread_id: The ID of the thread that was run
        run_id: The ID of the run to modify
        **kwargs: Parameters to update
        
    Returns:
        Updated Run object
    """

Embeddings

Generate vector embeddings from text using OpenAI's embedding models.

def create_embeddings(
    self,
    text: str | list[str] | list[int] | list[list[int]],
    model: str = "text-embedding-ada-002",
    **kwargs: Any,
) -> list[float]:
    """
    Generate embeddings for the given text using the given model.
    
    Args:
        text: The text to generate embeddings for (string, list of strings, tokens, or token lists)
        model: The model to use for generating embeddings
        **kwargs: Additional embedding parameters
        
    Returns:
        List of embedding values (floats)
    """

File Operations

Upload, retrieve, and manage files for use with OpenAI services.

def upload_file(self, file: str, purpose: Literal["fine-tune", "assistants", "batch"]) -> FileObject:
    """
    Upload a file that can be used across various endpoints.
    
    Args:
        file: The file path to be uploaded
        purpose: The intended purpose of the uploaded file
        
    Returns:
        FileObject with upload details
    """

def get_file(self, file_id: str) -> FileObject:
    """
    Return information about a specific file.
    
    Args:
        file_id: The ID of the file to use for this request
        
    Returns:
        FileObject with file details
    """

def get_files(self) -> list[FileObject]:
    """
    Return a list of files that belong to the user's organization.
    
    Returns:
        List of FileObject instances
    """

def delete_file(self, file_id: str) -> FileDeleted:
    """
    Delete a file.
    
    Args:
        file_id: The ID of the file to be deleted
        
    Returns:
        FileDeleted confirmation object
    """

Vector Store Operations

Manage vector stores for semantic search and retrieval operations.

def create_vector_store(self, **kwargs: Any) -> VectorStore:
    """
    Create a vector store.
    
    Args:
        **kwargs: Vector store configuration parameters
        
    Returns:
        VectorStore object
    """

def get_vector_stores(self, **kwargs: Any) -> list[VectorStore]:
    """
    Return a list of vector stores.
    
    Args:
        **kwargs: Query parameters for filtering
        
    Returns:
        List of VectorStore objects
    """

def get_vector_store(self, vector_store_id: str) -> VectorStore:
    """
    Retrieve a vector store.
    
    Args:
        vector_store_id: The ID of the vector store to retrieve
        
    Returns:
        VectorStore object
    """

def modify_vector_store(self, vector_store_id: str, **kwargs: Any) -> VectorStore:
    """
    Modify a vector store.
    
    Args:
        vector_store_id: The ID of the vector store to modify
        **kwargs: Parameters to update
        
    Returns:
        Updated VectorStore object
    """

def delete_vector_store(self, vector_store_id: str) -> VectorStoreDeleted:
    """
    Delete a vector store.
    
    Args:
        vector_store_id: The ID of the vector store to delete
        
    Returns:
        VectorStoreDeleted confirmation object
    """

def upload_files_to_vector_store(
    self, vector_store_id: str, files: list[BinaryIO]
) -> VectorStoreFileBatch:
    """
    Upload files to a vector store and poll until completion.
    
    Args:
        vector_store_id: The ID of the vector store the files are to be uploaded to
        files: A list of binary files to upload
        
    Returns:
        VectorStoreFileBatch object with batch details
    """

def get_vector_store_files(self, vector_store_id: str) -> list[VectorStoreFile]:
    """
    Return a list of vector store files.
    
    Args:
        vector_store_id: The ID of the vector store
        
    Returns:
        List of VectorStoreFile objects
    """

def delete_vector_store_file(self, vector_store_id: str, file_id: str) -> VectorStoreFileDeleted:
    """
    Delete a vector store file.
    
    Args:
        vector_store_id: The ID of the vector store that the file belongs to
        file_id: The ID of the file to delete
        
    Returns:
        VectorStoreFileDeleted confirmation object
    """

Batch Processing

Handle batch operations for large-scale processing with proper monitoring and timeout handling.

def create_batch(
    self,
    file_id: str,
    endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
    metadata: dict[str, str] | None = None,
    completion_window: Literal["24h"] = "24h",
) -> Batch:
    """
    Create a batch for a given model and files.
    
    Args:
        file_id: The ID of the file to be used for this batch
        endpoint: The endpoint to use for this batch
        metadata: A set of key-value pairs that can be attached to an object
        completion_window: The time window for the batch to complete
        
    Returns:
        Batch object
    """

def get_batch(self, batch_id: str) -> Batch:
    """
    Get the status of a batch.
    
    Args:
        batch_id: The ID of the batch to get the status of
        
    Returns:
        Batch object with current status
    """

def wait_for_batch(self, batch_id: str, wait_seconds: float = 3, timeout: float = 3600) -> None:
    """
    Poll a batch to check if it finishes.
    
    Args:
        batch_id: The ID of the batch to wait for
        wait_seconds: Number of seconds between status checks
        timeout: Maximum number of seconds to wait for the batch to complete
        
    Raises:
        OpenAIBatchTimeout: If batch doesn't complete within timeout
        OpenAIBatchJobException: If batch fails or is cancelled
    """

def cancel_batch(self, batch_id: str) -> Batch:
    """
    Cancel a batch.
    
    Args:
        batch_id: The ID of the batch to cancel
        
    Returns:
        Cancelled Batch object
    """

Usage Examples

Basic Hook Usage

from airflow.providers.openai.hooks.openai import OpenAIHook

# Initialize hook with connection
hook = OpenAIHook(conn_id='openai_default')

# Test connection
success, message = hook.test_connection()
if success:
    print(f"Connection successful: {message}")
else:
    print(f"Connection failed: {message}")

Chat Completion Example

# Create a chat completion
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is Apache Airflow?"}
]

response = hook.create_chat_completion(
    messages=messages,
    model="gpt-3.5-turbo",
    max_tokens=150,
    temperature=0.7
)

for choice in response:
    print(choice.message.content)

Embedding Generation Example

# Generate embeddings for text
texts = [
    "Apache Airflow is a platform for workflow orchestration",
    "OpenAI provides AI models and services",
    "Data pipelines help process information"
]

# Note: create_embeddings is documented to return a single embedding vector
# (list[float]), not one vector per input — confirm against the hook implementation
# before passing multiple texts.
embeddings = hook.create_embeddings(
    text=texts,
    model="text-embedding-ada-002"
)

print(f"Generated an embedding vector with {len(embeddings)} dimensions")

Batch Processing Example

# Upload a batch file
file_obj = hook.upload_file(
    file="/path/to/batch_requests.jsonl",
    purpose="batch"
)

# Create and monitor batch
batch = hook.create_batch(
    file_id=file_obj.id,
    endpoint="/v1/chat/completions"
)

# Wait for completion
hook.wait_for_batch(batch.id, wait_seconds=10, timeout=7200)

# Get final batch status
final_batch = hook.get_batch(batch.id)
print(f"Batch completed with status: {final_batch.status}")

Types

BatchStatus Enum

Enum for representing the status values of OpenAI batch operations.

from enum import Enum

class BatchStatus(str, Enum):
    """Enum for the status of a batch."""
    
    VALIDATING = "validating"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    EXPIRED = "expired"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"
    
    def __str__(self) -> str:
        """Return string representation of the status."""
        
    @classmethod
    def is_in_progress(cls, status: str) -> bool:
        """
        Check if the batch status indicates the batch is still in progress.
        
        Args:
            status: The batch status string to check
            
        Returns:
            True if status is validating, in_progress, or finalizing
        """

Connection Property

The OpenAIHook provides a cached property for efficient connection reuse.

@cached_property
def conn(self) -> OpenAI:
    """
    Return a cached OpenAI connection object.
    
    This property provides efficient access to the OpenAI client by caching
    the connection after first access. Subsequent calls return the same
    connection instance without re-authentication.
    
    Returns:
        OpenAI client instance configured with connection settings
    """

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-openai

docs

exceptions.md

hooks.md

index.md

operators.md

triggers.md

version_compat.md

tile.json