tessl/pypi-apache-airflow-providers-openai

Provider package that enables OpenAI integration for Apache Airflow, including hooks, operators, and triggers for AI-powered data pipelines.

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/apache-airflow-providers-openai@1.6.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-openai@1.6.0

# Apache Airflow Providers OpenAI

Apache Airflow provider package that enables OpenAI integration for data pipelines and workflows. This package provides comprehensive hooks for connecting to OpenAI services, operators for executing OpenAI operations, triggers for monitoring batch jobs, and utilities for AI-powered workflow orchestration.

## Package Information

- **Package Name**: apache-airflow-providers-openai
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-openai`
- **Minimum Airflow Version**: 2.10.0
- **Dependencies**: `openai[datalib]>=1.66.0`

## Core Imports

```python
from airflow.providers.openai.hooks.openai import OpenAIHook, BatchStatus
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator, OpenAITriggerBatchOperator
from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
from airflow.providers.openai.version_compat import BaseHook, BaseOperator, AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
```

## Basic Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator

# Create a simple DAG
dag = DAG(
    'openai_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
)

# Generate embeddings using the operator
embedding_task = OpenAIEmbeddingOperator(
    task_id='generate_embeddings',
    conn_id='openai_default',
    input_text="Hello, this is a test for OpenAI embeddings",
    model="text-embedding-ada-002",
    dag=dag,
)

# Use the hook directly in a Python task
def chat_completion_task():
    hook = OpenAIHook(conn_id='openai_default')
    messages = [
        {"role": "user", "content": "What is Apache Airflow?"}
    ]
    response = hook.create_chat_completion(messages=messages, model="gpt-3.5-turbo")
    return response

chat_task = PythonOperator(
    task_id='chat_completion',
    python_callable=chat_completion_task,
    dag=dag,
)
```

## Architecture

The provider follows Airflow's standard architecture patterns:

- **Hooks**: Low-level interfaces to OpenAI API services, handling authentication and connection management
- **Operators**: Task-level abstractions that use hooks to perform specific OpenAI operations in DAGs
- **Triggers**: Asynchronous monitoring components for long-running operations like batch processing
- **Exceptions**: Specialized exception classes for OpenAI-specific error handling

## Connection Configuration

Configure OpenAI connections in Airflow:

```python
# Connection ID: openai_default
# Connection Type: OpenAI
# Password: your-openai-api-key
# Host: https://api.openai.com (optional, uses default if not specified)
```
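Beyond the UI, Airflow can also resolve a connection from an `AIRFLOW_CONN_<CONN_ID>` environment variable holding a connection URI. A minimal sketch (the key shown is a placeholder, not a real credential):

```python
import os

# Airflow resolves the "openai_default" connection from this variable when
# it is set; the API key travels in the password field of the URI.
api_key = "sk-your-openai-api-key"  # placeholder
os.environ["AIRFLOW_CONN_OPENAI_DEFAULT"] = f"openai://:{api_key}@"

print(os.environ["AIRFLOW_CONN_OPENAI_DEFAULT"])
```

Environment-variable connections are convenient in containerized deployments, since no entry in the metadata database is required.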

## Capabilities

### OpenAI Hook Interface

Comprehensive hook providing direct access to OpenAI API functionality including chat completions, assistants, embeddings, file operations, vector stores, and batch processing.

```python { .api }
class OpenAIHook(BaseHook):
    def __init__(self, conn_id: str = "openai_default", *args, **kwargs): ...
    def get_conn(self) -> OpenAI: ...
    def test_connection(self) -> tuple[bool, str]: ...
    def create_chat_completion(self, messages: list, model: str = "gpt-3.5-turbo", **kwargs) -> list: ...
    def create_embeddings(self, text: str | list, model: str = "text-embedding-ada-002", **kwargs) -> list[float]: ...
```

[OpenAI Hook](./hooks.md)

### Operators for Task Execution

Ready-to-use operators for common OpenAI operations in Airflow DAGs, including embedding generation and batch processing with full integration into Airflow's task lifecycle.

```python { .api }
class OpenAIEmbeddingOperator(BaseOperator):
    def __init__(self, conn_id: str, input_text: str | list, model: str = "text-embedding-ada-002", **kwargs): ...
    def execute(self, context) -> list[float]: ...

class OpenAITriggerBatchOperator(BaseOperator):
    def __init__(self, file_id: str, endpoint: str, conn_id: str = "openai_default", **kwargs): ...
    def execute(self, context) -> str | None: ...
```

[Operators](./operators.md)

### Triggers for Asynchronous Operations

Trigger for monitoring long-running OpenAI batch operations with proper async handling and timeout management.

```python { .api }
class OpenAIBatchTrigger(BaseTrigger):
    def __init__(self, conn_id: str, batch_id: str, poll_interval: float, end_time: float): ...
    def serialize(self) -> tuple[str, dict]: ...
    async def run(self) -> AsyncIterator: ...
```

[Triggers](./triggers.md)
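Conceptually, the trigger's `run` loop polls the batch status until it leaves an in-progress state or `end_time` passes. A simplified, self-contained sketch of that pattern (`fake_fetch` is a stub standing in for the real API call, not part of the provider):

```python
import asyncio
import time

IN_PROGRESS = {"validating", "in_progress", "finalizing"}

async def wait_for_batch(fetch_status, poll_interval: float, end_time: float) -> str:
    """Poll until the batch reaches a terminal state or the deadline passes."""
    while time.time() < end_time:
        status = await fetch_status()
        if status not in IN_PROGRESS:
            return status  # terminal: completed, failed, expired, or cancelled
        await asyncio.sleep(poll_interval)
    raise TimeoutError("batch did not finish before end_time")

# Stub: the batch completes on the third poll.
_statuses = iter(["validating", "in_progress", "completed"])

async def fake_fetch():
    return next(_statuses)

print(asyncio.run(wait_for_batch(fake_fetch, poll_interval=0.01, end_time=time.time() + 5)))  # → completed
```

The real trigger yields a `TriggerEvent` back to the scheduler instead of returning, but the deadline and poll-interval mechanics are the same.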

### Exception Handling

Specialized exceptions for OpenAI-specific error conditions and batch processing failures.

```python { .api }
class OpenAIBatchJobException(AirflowException): ...
class OpenAIBatchTimeout(AirflowException): ...
```

[Exceptions](./exceptions.md)
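Both classes derive from `AirflowException`, so a single `except AirflowException` clause catches either failure mode. A stand-alone sketch of that hierarchy (the stub base class and the `check_batch` helper are illustrative only, since Airflow itself is not imported here):

```python
class AirflowException(Exception):
    """Stub standing in for airflow.exceptions.AirflowException."""

class OpenAIBatchJobException(AirflowException):
    """Raised when a batch job fails, is cancelled, or expires."""

class OpenAIBatchTimeout(AirflowException):
    """Raised when a batch job misses its deadline."""

def check_batch(status: str, timed_out: bool) -> None:
    # Hypothetical helper: turn a terminal batch state into the matching error.
    if timed_out:
        raise OpenAIBatchTimeout("Timed out waiting for the batch job")
    if status in ("failed", "cancelled", "expired"):
        raise OpenAIBatchJobException(f"Batch job ended in state {status!r}")

try:
    check_batch("failed", timed_out=False)
except AirflowException as exc:  # catches both subclasses
    print(type(exc).__name__)  # → OpenAIBatchJobException
```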

### Version Compatibility

Utilities and constants for handling different Apache Airflow versions, providing compatible base classes and version detection capabilities.

```python { .api }
def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...

AIRFLOW_V_3_0_PLUS: bool
AIRFLOW_V_3_1_PLUS: bool

# Compatible base classes
BaseHook: type
BaseOperator: type
```

[Version Compatibility](./version_compat.md)
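The boolean flags boil down to comparing a parsed `(major, minor, patch)` tuple against a threshold. A rough, stdlib-only sketch of the idea (the parsing below is illustrative, not the provider's exact implementation, and the version string is hard-coded rather than read from the installed Airflow):

```python
def parse_version_tuple(version: str) -> tuple[int, int, int]:
    # Keep the leading digits of the first three dot-separated components,
    # so pre-release suffixes like "3.0.0rc1" still parse.
    parts = []
    for piece in version.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits or 0))
    while len(parts) < 3:
        parts.append(0)
    return (parts[0], parts[1], parts[2])

AIRFLOW_V_3_0_PLUS = parse_version_tuple("2.10.5") >= (3, 0, 0)
print(parse_version_tuple("3.0.0rc1"))  # → (3, 0, 0)
```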

## Types

```python { .api }
from enum import Enum

class BatchStatus(str, Enum):
    VALIDATING = "validating"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    EXPIRED = "expired"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"

    @classmethod
    def is_in_progress(cls, status: str) -> bool: ...
```
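Because `BatchStatus` mixes in `str`, its members compare equal to the raw status strings returned by the OpenAI API. A self-contained replica for experimentation (the body of `is_in_progress` is a plausible reading of the signature above, not the provider's verbatim implementation):

```python
from enum import Enum

class BatchStatus(str, Enum):
    VALIDATING = "validating"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    EXPIRED = "expired"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"

    @classmethod
    def is_in_progress(cls, status: str) -> bool:
        # Assumed: states that precede a terminal outcome count as in progress.
        return status in (cls.VALIDATING, cls.IN_PROGRESS, cls.FINALIZING)

print(BatchStatus.COMPLETED == "completed")      # → True
print(BatchStatus.is_in_progress("validating"))  # → True
print(BatchStatus.is_in_progress("failed"))      # → False
```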