# OpenAI Hook

The OpenAIHook provides a comprehensive interface to OpenAI's API services, handling authentication, connection management, and the OpenAI operations exposed by this provider. It is the foundational component for all OpenAI interactions within Airflow workflows.

## Capabilities

### Connection Management

Handles OpenAI API authentication and connection setup using Airflow's connection management system.

```python { .api }
class OpenAIHook(BaseHook):
    """
    Use OpenAI SDK to interact with OpenAI APIs.

    Args:
        conn_id (str): OpenAI connection id, defaults to 'openai_default'
    """

    conn_name_attr = "conn_id"
    default_conn_name = "openai_default"
    conn_type = "openai"
    hook_name = "OpenAI"

    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: ...

    def get_conn(self) -> OpenAI:
        """Return an OpenAI connection object."""

    @cached_property
    def conn(self) -> OpenAI:
        """Return a cached OpenAI connection object."""

    def test_connection(self) -> tuple[bool, str]:
        """Test the OpenAI connection."""

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom field behaviour for connection UI."""
```
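
Once a connection is configured, the hook's cached client can also be used for SDK calls the hook does not wrap. A minimal sketch, assuming an Airflow connection named `openai_default` that holds a valid OpenAI API key:

```python
from airflow.providers.openai.hooks.openai import OpenAIHook

# Assumes an Airflow connection 'openai_default' configured with an OpenAI API key
hook = OpenAIHook(conn_id="openai_default")

# `conn` returns a cached openai.OpenAI client, so plain SDK calls are available too
client = hook.conn
models = client.models.list()
print([m.id for m in models.data])
```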

### Chat Completions

Generate conversational responses and text completions using OpenAI's chat models.

```python { .api }
def create_chat_completion(
    self,
    messages: list[
        ChatCompletionSystemMessageParam
        | ChatCompletionUserMessageParam
        | ChatCompletionAssistantMessageParam
        | ChatCompletionToolMessageParam
        | ChatCompletionFunctionMessageParam
    ],
    model: str = "gpt-3.5-turbo",
    **kwargs: Any,
) -> list[ChatCompletionMessage]:
    """
    Create a model response for the given chat conversation.

    Args:
        messages: A list of messages comprising the conversation so far
        model: ID of the model to use
        **kwargs: Additional parameters for the completion request

    Returns:
        List of chat completion choices
    """
```

### Assistant Management

Create and manage OpenAI assistants for more complex conversational workflows.

```python { .api }
def create_assistant(self, model: str = "gpt-3.5-turbo", **kwargs: Any) -> Assistant:
    """
    Create an OpenAI assistant using the given model.

    Args:
        model: The OpenAI model for the assistant to use
        **kwargs: Additional assistant configuration parameters

    Returns:
        Assistant object
    """

def get_assistant(self, assistant_id: str) -> Assistant:
    """
    Get an OpenAI assistant.

    Args:
        assistant_id: The ID of the assistant to retrieve

    Returns:
        Assistant object
    """

def get_assistants(self, **kwargs: Any) -> list[Assistant]:
    """
    Get a list of Assistant objects.

    Args:
        **kwargs: Query parameters for filtering assistants

    Returns:
        List of Assistant objects
    """

def modify_assistant(self, assistant_id: str, **kwargs: Any) -> Assistant:
    """
    Modify an existing Assistant object.

    Args:
        assistant_id: The ID of the assistant to be modified
        **kwargs: Parameters to update

    Returns:
        Updated Assistant object
    """

def delete_assistant(self, assistant_id: str) -> AssistantDeleted:
    """
    Delete an OpenAI Assistant for a given ID.

    Args:
        assistant_id: The ID of the assistant to delete

    Returns:
        AssistantDeleted confirmation object
    """
```
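
A minimal sketch of the assistant lifecycle using these methods, assuming an `OpenAIHook` instance `hook` (see Usage Examples below); the `name` and `instructions` values are illustrative:

```python
# Create an assistant, fetch and rename it, then delete it
assistant = hook.create_assistant(
    model="gpt-3.5-turbo",
    name="airflow-helper",
    instructions="Answer questions about Airflow DAG runs.",
)
fetched = hook.get_assistant(assistant_id=assistant.id)

hook.modify_assistant(assistant_id=assistant.id, name="airflow-helper-v2")
hook.delete_assistant(assistant_id=assistant.id)
```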

### Thread Management

Manage conversation threads for assistant interactions.

```python { .api }
def create_thread(self, **kwargs: Any) -> Thread:
    """
    Create an OpenAI thread.

    Args:
        **kwargs: Thread configuration parameters

    Returns:
        Thread object
    """

def modify_thread(self, thread_id: str, metadata: dict[str, Any]) -> Thread:
    """
    Modify an existing Thread object.

    Args:
        thread_id: The ID of the thread to modify
        metadata: Set of up to 16 key-value pairs that can be attached to the object

    Returns:
        Updated Thread object
    """

def delete_thread(self, thread_id: str) -> ThreadDeleted:
    """
    Delete an OpenAI thread for a given thread_id.

    Args:
        thread_id: The ID of the thread to delete

    Returns:
        ThreadDeleted confirmation object
    """
```
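
A short sketch of the thread lifecycle, assuming an `OpenAIHook` instance `hook` (see Usage Examples below); the metadata keys are illustrative:

```python
# Create a thread, attach metadata, then delete it
thread = hook.create_thread()
thread = hook.modify_thread(thread_id=thread.id, metadata={"dag_id": "openai_demo"})
hook.delete_thread(thread_id=thread.id)
```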

### Message Management

Handle messages within conversation threads.

```python { .api }
def create_message(
    self,
    thread_id: str,
    role: Literal["user", "assistant"],
    content: str,
    **kwargs: Any,
) -> Message:
    """
    Create a message for a given Thread.

    Args:
        thread_id: The ID of the thread to create a message for
        role: The role of the entity that is creating the message ('user' or 'assistant')
        content: The content of the message
        **kwargs: Additional message parameters

    Returns:
        Message object
    """

def get_messages(self, thread_id: str, **kwargs: Any) -> list[Message]:
    """
    Return a list of messages for a given Thread.

    Args:
        thread_id: The ID of the thread the messages belong to
        **kwargs: Query parameters for filtering messages

    Returns:
        List of Message objects
    """

def modify_message(self, thread_id: str, message_id: str, **kwargs: Any) -> Message:
    """
    Modify an existing message for a given Thread.

    Args:
        thread_id: The ID of the thread to which this message belongs
        message_id: The ID of the message to modify
        **kwargs: Parameters to update

    Returns:
        Updated Message object
    """
```
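
A sketch of adding and listing messages on a thread, assuming an `OpenAIHook` instance `hook` (see Usage Examples below); the message content is illustrative:

```python
# Add a user message to a thread and list the thread's messages
thread = hook.create_thread()
hook.create_message(
    thread_id=thread.id,
    role="user",
    content="Summarize yesterday's DAG failures.",
)
for message in hook.get_messages(thread_id=thread.id):
    print(message.id, message.role)
```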

### Run Management

Execute and monitor assistant runs within threads.

```python { .api }
def create_run(self, thread_id: str, assistant_id: str, **kwargs: Any) -> Run:
    """
    Create a run for a given thread and assistant.

    Args:
        thread_id: The ID of the thread to run
        assistant_id: The ID of the assistant to use to execute this run
        **kwargs: Additional run parameters

    Returns:
        Run object
    """

def create_run_and_poll(self, thread_id: str, assistant_id: str, **kwargs: Any) -> Run:
    """
    Create a run for a given thread and assistant, then poll until completion.

    Args:
        thread_id: The ID of the thread to run
        assistant_id: The ID of the assistant to use to execute this run
        **kwargs: Additional run parameters

    Returns:
        Completed Run object
    """

def get_run(self, thread_id: str, run_id: str) -> Run:
    """
    Retrieve a run for a given thread and run ID.

    Args:
        thread_id: The ID of the thread that was run
        run_id: The ID of the run to retrieve

    Returns:
        Run object
    """

def get_runs(self, thread_id: str, **kwargs: Any) -> list[Run]:
    """
    Return a list of runs belonging to a thread.

    Args:
        thread_id: The ID of the thread the runs belong to
        **kwargs: Query parameters for filtering runs

    Returns:
        List of Run objects
    """

def modify_run(self, thread_id: str, run_id: str, **kwargs: Any) -> Run:
    """
    Modify a run on a given thread.

    Args:
        thread_id: The ID of the thread that was run
        run_id: The ID of the run to modify
        **kwargs: Parameters to update

    Returns:
        Updated Run object
    """
```
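
A sketch of executing an assistant run, assuming an `OpenAIHook` instance `hook` together with the `assistant` and `thread` created in the sketches above (any existing assistant and thread IDs work as well):

```python
# Run the assistant against the thread and poll until the run finishes
run = hook.create_run_and_poll(thread_id=thread.id, assistant_id=assistant.id)

if run.status == "completed":
    # The assistant's reply is now part of the thread's messages
    for message in hook.get_messages(thread_id=thread.id):
        print(message.role, message.content)
```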

### Embeddings

Generate vector embeddings from text using OpenAI's embedding models.

```python { .api }
def create_embeddings(
    self,
    text: str | list[str] | list[int] | list[list[int]],
    model: str = "text-embedding-ada-002",
    **kwargs: Any,
) -> list[float]:
    """
    Generate embeddings for the given text using the given model.

    Args:
        text: The text to generate embeddings for (string, list of strings, tokens, or token lists)
        model: The model to use for generating embeddings
        **kwargs: Additional embedding parameters

    Returns:
        List of embedding values (floats)
    """
```

### File Operations

Upload, retrieve, and manage files for use with OpenAI services.

```python { .api }
def upload_file(self, file: str, purpose: Literal["fine-tune", "assistants", "batch"]) -> FileObject:
    """
    Upload a file that can be used across various endpoints.

    Args:
        file: The file path to be uploaded
        purpose: The intended purpose of the uploaded file

    Returns:
        FileObject with upload details
    """

def get_file(self, file_id: str) -> FileObject:
    """
    Return information about a specific file.

    Args:
        file_id: The ID of the file to use for this request

    Returns:
        FileObject with file details
    """

def get_files(self) -> list[FileObject]:
    """
    Return a list of files that belong to the user's organization.

    Returns:
        List of FileObject instances
    """

def delete_file(self, file_id: str) -> FileDeleted:
    """
    Delete a file.

    Args:
        file_id: The ID of the file to be deleted

    Returns:
        FileDeleted confirmation object
    """
```
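
A sketch of the file lifecycle, assuming an `OpenAIHook` instance `hook` (see Usage Examples below); the file path is illustrative:

```python
# Upload a local file for use with assistants, inspect it, then delete it
file_obj = hook.upload_file(file="/tmp/airflow_notes.txt", purpose="assistants")

info = hook.get_file(file_id=file_obj.id)
print(info.filename, info.purpose)

hook.delete_file(file_id=file_obj.id)
```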

### Vector Store Operations

Manage vector stores for semantic search and retrieval operations.

```python { .api }
def create_vector_store(self, **kwargs: Any) -> VectorStore:
    """
    Create a vector store.

    Args:
        **kwargs: Vector store configuration parameters

    Returns:
        VectorStore object
    """

def get_vector_stores(self, **kwargs: Any) -> list[VectorStore]:
    """
    Return a list of vector stores.

    Args:
        **kwargs: Query parameters for filtering

    Returns:
        List of VectorStore objects
    """

def get_vector_store(self, vector_store_id: str) -> VectorStore:
    """
    Retrieve a vector store.

    Args:
        vector_store_id: The ID of the vector store to retrieve

    Returns:
        VectorStore object
    """

def modify_vector_store(self, vector_store_id: str, **kwargs: Any) -> VectorStore:
    """
    Modify a vector store.

    Args:
        vector_store_id: The ID of the vector store to modify
        **kwargs: Parameters to update

    Returns:
        Updated VectorStore object
    """

def delete_vector_store(self, vector_store_id: str) -> VectorStoreDeleted:
    """
    Delete a vector store.

    Args:
        vector_store_id: The ID of the vector store to delete

    Returns:
        VectorStoreDeleted confirmation object
    """

def upload_files_to_vector_store(
    self, vector_store_id: str, files: list[BinaryIO]
) -> VectorStoreFileBatch:
    """
    Upload files to a vector store and poll until completion.

    Args:
        vector_store_id: The ID of the vector store the files are to be uploaded to
        files: A list of binary files to upload

    Returns:
        VectorStoreFileBatch object with batch details
    """

def get_vector_store_files(self, vector_store_id: str) -> list[VectorStoreFile]:
    """
    Return a list of vector store files.

    Args:
        vector_store_id: The ID of the vector store

    Returns:
        List of VectorStoreFile objects
    """

def delete_vector_store_file(self, vector_store_id: str, file_id: str) -> VectorStoreFileDeleted:
    """
    Delete a vector store file.

    Args:
        vector_store_id: The ID of the vector store that the file belongs to
        file_id: The ID of the file to delete

    Returns:
        VectorStoreFileDeleted confirmation object
    """
```
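
A sketch of building a vector store from a local file, assuming an `OpenAIHook` instance `hook` (see Usage Examples below); the store name and file path are illustrative:

```python
# Create a vector store, upload a file into it, then inspect and clean up
vector_store = hook.create_vector_store(name="airflow-docs")

with open("/tmp/dag_guide.md", "rb") as f:
    batch = hook.upload_files_to_vector_store(
        vector_store_id=vector_store.id,
        files=[f],
    )
print(batch.status, batch.file_counts)

for vs_file in hook.get_vector_store_files(vector_store_id=vector_store.id):
    print(vs_file.id, vs_file.status)

hook.delete_vector_store(vector_store_id=vector_store.id)
```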

### Batch Processing

Handle batch operations for large-scale processing with proper monitoring and timeout handling.

```python { .api }
def create_batch(
    self,
    file_id: str,
    endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
    metadata: dict[str, str] | None = None,
    completion_window: Literal["24h"] = "24h",
) -> Batch:
    """
    Create a batch for a given model and files.

    Args:
        file_id: The ID of the file to be used for this batch
        endpoint: The endpoint to use for this batch
        metadata: A set of key-value pairs that can be attached to an object
        completion_window: The time window for the batch to complete

    Returns:
        Batch object
    """

def get_batch(self, batch_id: str) -> Batch:
    """
    Get the status of a batch.

    Args:
        batch_id: The ID of the batch to get the status of

    Returns:
        Batch object with current status
    """

def wait_for_batch(self, batch_id: str, wait_seconds: float = 3, timeout: float = 3600) -> None:
    """
    Poll a batch until it finishes.

    Args:
        batch_id: The ID of the batch to wait for
        wait_seconds: Number of seconds between checks
        timeout: Maximum number of seconds to wait for the batch to be ready

    Raises:
        OpenAIBatchTimeout: If the batch doesn't complete within the timeout
        OpenAIBatchJobException: If the batch fails or is cancelled
    """

def cancel_batch(self, batch_id: str) -> Batch:
    """
    Cancel a batch.

    Args:
        batch_id: The ID of the batch to cancel

    Returns:
        Cancelled Batch object
    """
```

## Usage Examples

### Basic Hook Usage

```python
from airflow.providers.openai.hooks.openai import OpenAIHook

# Initialize hook with connection
hook = OpenAIHook(conn_id='openai_default')

# Test connection
success, message = hook.test_connection()
if success:
    print(f"Connection successful: {message}")
else:
    print(f"Connection failed: {message}")
```

### Chat Completion Example

```python
# Create a chat completion
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is Apache Airflow?"},
]

response = hook.create_chat_completion(
    messages=messages,
    model="gpt-3.5-turbo",
    max_tokens=150,
    temperature=0.7,
)

for choice in response:
    print(choice.message.content)
```

### Embedding Generation Example

```python
# Generate embeddings for text
texts = [
    "Apache Airflow is a platform for workflow orchestration",
    "OpenAI provides AI models and services",
    "Data pipelines help process information",
]

embeddings = hook.create_embeddings(
    text=texts,
    model="text-embedding-ada-002",
)

print(f"Generated {len(embeddings)} embedding dimensions")
```

### Batch Processing Example

```python
# Upload a batch file
file_obj = hook.upload_file(
    file="/path/to/batch_requests.jsonl",
    purpose="batch",
)

# Create and monitor batch
batch = hook.create_batch(
    file_id=file_obj.id,
    endpoint="/v1/chat/completions",
)

# Wait for completion
hook.wait_for_batch(batch.id, wait_seconds=10, timeout=7200)

# Get final batch status
final_batch = hook.get_batch(batch.id)
print(f"Batch completed with status: {final_batch.status}")
```

## Types

### BatchStatus Enum

Enum for representing the status values of OpenAI batch operations.

```python { .api }
from enum import Enum

class BatchStatus(str, Enum):
    """Enum for the status of a batch."""

    VALIDATING = "validating"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    EXPIRED = "expired"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"

    def __str__(self) -> str:
        """Return string representation of the status."""

    @classmethod
    def is_in_progress(cls, status: str) -> bool:
        """
        Check if the batch status indicates the batch is still in progress.

        Args:
            status: The batch status string to check

        Returns:
            True if status is validating, in_progress, or finalizing
        """
```
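
A short sketch of using `BatchStatus` when inspecting a batch manually, assuming an `OpenAIHook` instance `hook` as in the usage examples above; in practice `wait_for_batch` already wraps this polling with timeout and failure handling, and the import path shown here is assumed to be the hook module:

```python
import time

from airflow.providers.openai.hooks.openai import BatchStatus

# Illustrative manual polling loop; prefer hook.wait_for_batch in real tasks
batch = hook.get_batch(batch_id="batch_123")  # hypothetical batch ID
while BatchStatus.is_in_progress(batch.status):
    time.sleep(5)
    batch = hook.get_batch(batch_id=batch.id)

if batch.status == BatchStatus.COMPLETED:
    print("Batch finished successfully")
```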

### Connection Property

The OpenAIHook provides a cached property for efficient connection reuse.

```python { .api }
@cached_property
def conn(self) -> OpenAI:
    """
    Return a cached OpenAI connection object.

    This property provides efficient access to the OpenAI client by caching
    the connection after first access. Subsequent calls return the same
    connection instance without re-authentication.

    Returns:
        OpenAI client instance configured with connection settings
    """
```
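
Because `conn` is a `cached_property`, repeated access on the same hook instance reuses one client. A brief illustration:

```python
from airflow.providers.openai.hooks.openai import OpenAIHook

hook = OpenAIHook(conn_id="openai_default")

# The first access builds the client; subsequent accesses return the same object
assert hook.conn is hook.conn
```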