
# Operators

Airflow operators provide task-level abstractions for OpenAI operations, integrating with DAG workflows and handling task lifecycle management, templating, and error handling.

## Capabilities

### Embedding Generation Operator

Generate OpenAI embeddings as part of an Airflow DAG task, with support for text templating and various input formats.

```python { .api }
class OpenAIEmbeddingOperator(BaseOperator):
    """
    Operator that accepts input text to generate OpenAI embeddings using the specified model.

    Args:
        conn_id (str): The OpenAI connection ID to use
        input_text (str | list[str] | list[int] | list[list[int]]): The text to generate embeddings for
        model (str): The OpenAI model to use for generating embeddings, defaults to "text-embedding-ada-002"
        embedding_kwargs (dict, optional): Additional keyword arguments for the create_embeddings method
        **kwargs: Additional BaseOperator arguments
    """

    template_fields: Sequence[str] = ("input_text",)

    def __init__(
        self,
        conn_id: str,
        input_text: str | list[str] | list[int] | list[list[int]],
        model: str = "text-embedding-ada-002",
        embedding_kwargs: dict | None = None,
        **kwargs: Any,
    ): ...

    @cached_property
    def hook(self) -> OpenAIHook:
        """Return an instance of the OpenAIHook."""

    def execute(self, context: Context) -> list[float]:
        """
        Execute the embedding generation task.

        Args:
            context: Airflow task context

        Returns:
            List of embedding values (floats)

        Raises:
            ValueError: If input_text is empty or invalid format
        """
```

### Batch Processing Operator

Trigger OpenAI Batch API operations with support for both synchronous and asynchronous (deferrable) execution modes.

```python { .api }
class OpenAITriggerBatchOperator(BaseOperator):
    """
    Operator that triggers an OpenAI Batch API endpoint and waits for the batch to complete.

    Args:
        file_id (str): The ID of the batch file to trigger
        endpoint (Literal): The OpenAI Batch API endpoint ("/v1/chat/completions", "/v1/embeddings", "/v1/completions")
        conn_id (str): The OpenAI connection ID, defaults to 'openai_default'
        deferrable (bool): Run operator in deferrable mode, defaults to system configuration setting
        wait_seconds (float): Number of seconds between checks when not deferrable, defaults to 3
        timeout (float): Time to wait for completion in seconds, defaults to 24 hours
        wait_for_completion (bool): Whether to wait for batch completion, defaults to True
        **kwargs: Additional BaseOperator arguments
    """

    template_fields: Sequence[str] = ("file_id",)
    batch_id: str | None = None  # Set during execution with the created batch ID

    def __init__(
        self,
        file_id: str,
        endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
        conn_id: str = OpenAIHook.default_conn_name,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        wait_seconds: float = 3,
        timeout: float = 24 * 60 * 60,
        wait_for_completion: bool = True,
        **kwargs: Any,
    ): ...

    @cached_property
    def hook(self) -> OpenAIHook:
        """Return an instance of the OpenAIHook."""

    def execute(self, context: Context) -> str | None:
        """
        Execute the batch operation.

        Args:
            context: Airflow task context

        Returns:
            Batch ID if successful, None if not waiting for completion
        """

    def execute_complete(self, context: Context, event: Any = None) -> str:
        """
        Callback for deferrable execution completion.

        Args:
            context: Airflow task context
            event: Event data from trigger

        Returns:
            Batch ID

        Raises:
            OpenAIBatchJobException: If batch processing failed
        """

    def on_kill(self) -> None:
        """Cancel the batch if task is cancelled."""
```

## Usage Examples

### Embedding Operator Example

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator

dag = DAG(
    'embedding_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False
)

# Simple text embedding
embedding_task = OpenAIEmbeddingOperator(
    task_id='generate_embeddings',
    conn_id='openai_default',
    input_text="This is sample text for embedding generation",
    model="text-embedding-ada-002",
    dag=dag
)

# Multiple texts with custom parameters
batch_embedding_task = OpenAIEmbeddingOperator(
    task_id='batch_embeddings',
    conn_id='openai_default',
    input_text=[
        "First document to embed",
        "Second document to embed",
        "Third document to embed"
    ],
    model="text-embedding-3-large",
    embedding_kwargs={
        "dimensions": 1024,
        "encoding_format": "float"
    },
    dag=dag
)
```
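
The embedding values returned by `execute` are pushed to XCom as the task's return value, so a downstream task can pull them by task ID. The sketch below shows one way to consume them; the `store_embeddings` callable and `store_task` are hypothetical placeholders, not part of the provider.

```python
from airflow.operators.python import PythonOperator

def store_embeddings(**context):
    # Pull the list of floats returned by the embedding task from XCom
    embeddings = context['task_instance'].xcom_pull(task_ids='generate_embeddings')
    # Placeholder handling; replace with real persistence logic
    print(f"Received an embedding with {len(embeddings)} dimensions")

store_task = PythonOperator(
    task_id='store_embeddings',
    python_callable=store_embeddings,
    dag=dag
)

embedding_task >> store_task
```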

### Templated Input Example

```python
# Using Airflow templating for dynamic input
templated_embedding_task = OpenAIEmbeddingOperator(
    task_id='templated_embeddings',
    conn_id='openai_default',
    input_text="{{ dag_run.conf.get('text_content', 'Default text') }}",
    model="text-embedding-ada-002",
    dag=dag
)
```
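
The `dag_run.conf` values referenced in the template are supplied when the DAG run is created, for example through a manual trigger with a JSON configuration or from another DAG. A minimal sketch using `TriggerDagRunOperator`, assuming an upstream DAG and its `dag` object:

```python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Trigger the embedding DAG and pass the text to embed via dag_run.conf
trigger_embeddings = TriggerDagRunOperator(
    task_id='trigger_embedding_dag',
    trigger_dag_id='embedding_example',
    conf={'text_content': 'Text supplied by the upstream pipeline'},
    dag=dag
)
```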

### Batch Processing Operator Example

```python
from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator

# Synchronous batch processing
sync_batch_task = OpenAITriggerBatchOperator(
    task_id='process_batch_sync',
    file_id="{{ task_instance.xcom_pull(task_ids='upload_batch_file') }}",
    endpoint="/v1/chat/completions",
    conn_id='openai_default',
    deferrable=False,
    wait_seconds=5,
    timeout=7200,  # 2 hours
    dag=dag
)

# Asynchronous (deferrable) batch processing
async_batch_task = OpenAITriggerBatchOperator(
    task_id='process_batch_async',
    file_id="file-abc123",
    endpoint="/v1/embeddings",
    conn_id='openai_default',
    deferrable=True,
    timeout=86400,  # 24 hours
    dag=dag
)

# Trigger batch without waiting
fire_and_forget_batch = OpenAITriggerBatchOperator(
    task_id='trigger_batch_only',
    file_id="file-def456",
    endpoint="/v1/completions",
    conn_id='openai_default',
    wait_for_completion=False,
    dag=dag
)
```
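
When the operator waits for completion, the batch ID it returns is also available to downstream tasks via XCom. A minimal sketch of a follow-up task that reads it; the `report_batch` callable and `report_task` are hypothetical placeholders.

```python
from airflow.operators.python import PythonOperator

def report_batch(**context):
    # The batch ID returned by the operator is stored as the task's XCom return value
    batch_id = context['task_instance'].xcom_pull(task_ids='process_batch_sync')
    print(f"Batch {batch_id} completed")

report_task = PythonOperator(
    task_id='report_batch',
    python_callable=report_batch,
    dag=dag
)

sync_batch_task >> report_task
```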

### Complete DAG Example

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.operators.openai import (
    OpenAIEmbeddingOperator,
    OpenAITriggerBatchOperator
)

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'openai_processing_pipeline',
    default_args=default_args,
    description='Process data using OpenAI services',
    schedule_interval=timedelta(days=1),
    catchup=False
)

def upload_batch_file(**context):
    """Upload a batch processing file."""
    hook = OpenAIHook(conn_id='openai_default')

    # Create batch file content
    batch_requests = []
    for i in range(10):
        request = {
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [{"role": "user", "content": f"Process item {i}"}],
                "max_tokens": 100
            }
        }
        batch_requests.append(request)

    # Write to temporary JSONL file
    import tempfile
    import json
    with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
        for request in batch_requests:
            f.write(json.dumps(request) + '\n')
        temp_file = f.name

    # Upload file
    file_obj = hook.upload_file(temp_file, purpose="batch")
    return file_obj.id

# Task to upload batch file
upload_task = PythonOperator(
    task_id='upload_batch_file',
    python_callable=upload_batch_file,
    dag=dag
)

# Generate embeddings for input data
embedding_task = OpenAIEmbeddingOperator(
    task_id='generate_embeddings',
    conn_id='openai_default',
    input_text="{{ dag_run.conf.get('input_texts', ['Default text']) }}",
    model="text-embedding-ada-002",
    dag=dag
)

# Process batch requests
batch_task = OpenAITriggerBatchOperator(
    task_id='process_chat_batch',
    file_id="{{ task_instance.xcom_pull(task_ids='upload_batch_file') }}",
    endpoint="/v1/chat/completions",
    conn_id='openai_default',
    deferrable=True,
    dag=dag
)

# Set task dependencies
upload_task >> batch_task
embedding_task
```
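
Using `deferrable=True` for `process_chat_batch` is a deliberate choice for long-running Batch API jobs: while the batch is pending, the task defers to the triggerer instead of occupying a worker slot with polling. Note that deferrable operators require the Airflow triggerer component to be running.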

### Error Handling Example

```python
from airflow.operators.python import PythonOperator
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

def handle_batch_with_retry(**context):
    """Handle batch processing with custom retry logic."""
    from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator

    try:
        operator = OpenAITriggerBatchOperator(
            task_id='batch_with_handling',
            file_id=context['params']['file_id'],
            endpoint="/v1/chat/completions",
            conn_id='openai_default',
            timeout=1800  # 30 minutes
        )
        result = operator.execute(context)
        return result

    except OpenAIBatchTimeout as e:
        print(f"Batch timed out: {e}")
        # Implement custom timeout handling
        raise

    except OpenAIBatchJobException as e:
        print(f"Batch job failed: {e}")
        # Implement custom failure handling
        raise

error_handling_task = PythonOperator(
    task_id='batch_with_error_handling',
    python_callable=handle_batch_with_retry,
    params={'file_id': 'file-123'},
    dag=dag
)
```
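
As an alternative to wrapping the operator in a `PythonOperator`, retry behaviour can be configured directly through the standard `BaseOperator` arguments, letting Airflow rerun the task when a batch fails or times out. A minimal sketch, assuming the imports and `dag` object from the examples above:

```python
from datetime import timedelta

# Rely on Airflow's built-in retry handling instead of custom try/except logic
batch_with_retries = OpenAITriggerBatchOperator(
    task_id='batch_with_retries',
    file_id="file-123",
    endpoint="/v1/chat/completions",
    conn_id='openai_default',
    timeout=1800,                       # operator-level wait for batch completion (30 minutes)
    retries=2,                          # rerun the task up to twice on failure
    retry_delay=timedelta(minutes=10),  # pause between retry attempts
    dag=dag
)
```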