# Exceptions

Specialized exception classes for handling OpenAI-specific error conditions and batch processing failures within Airflow workflows.
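
Both exception classes live in the provider's `exceptions` module and are imported with the same path used throughout the examples below:

```python
# Import path for the provider's exception classes
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout
```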

## Capabilities

### Batch Job Exception

Exception raised when OpenAI Batch API operations fail after processing has begun.

```python { .api }
class OpenAIBatchJobException(AirflowException):
    """
    Raise when OpenAI Batch Job fails to start AFTER processing the request.

    This exception is raised when:
    - Batch processing fails during execution
    - Batch is cancelled unexpectedly
    - Batch expires before completion
    - Batch encounters an unexpected terminal status

    Inherits from AirflowException for proper integration with Airflow's
    error handling and retry mechanisms.
    """
```
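
Because the class subclasses `AirflowException`, letting it propagate simply fails the task, so Airflow's standard retry settings apply. A minimal sketch, assuming the batch task lets the exception propagate on a terminal failure (the `retries` and `retry_delay` values are illustrative):

```python
from datetime import timedelta

from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator

# If the batch ends in a failed, cancelled, or expired state, the resulting
# OpenAIBatchJobException fails the task and Airflow's retry settings take over.
batch_task = OpenAITriggerBatchOperator(
    task_id='batch_with_retries',
    file_id='file-abc123',             # illustrative file ID
    endpoint='/v1/chat/completions',
    conn_id='openai_default',
    retries=2,                         # standard BaseOperator retry arguments
    retry_delay=timedelta(minutes=5),
)
```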

### Batch Timeout Exception

Exception raised when OpenAI Batch API operations exceed specified timeout limits.

```python { .api }
class OpenAIBatchTimeout(AirflowException):
    """
    Raise when OpenAI Batch Job times out.

    This exception is raised when:
    - Batch processing exceeds the specified timeout duration
    - Polling operations reach their time limit
    - Long-running batch operations don't complete within expected timeframes

    Inherits from AirflowException for proper integration with Airflow's
    error handling and retry mechanisms.
    """
```
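
A minimal sketch of the polling path that raises this exception; the batch ID is a placeholder and the 5-minute timeout is deliberately short for illustration:

```python
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.exceptions import OpenAIBatchTimeout

hook = OpenAIHook(conn_id='openai_default')

try:
    # Poll an existing batch every 10 seconds, giving up after 5 minutes
    hook.wait_for_batch(batch_id='batch_abc123', wait_seconds=10, timeout=300)
except OpenAIBatchTimeout:
    # The batch did not reach a terminal state in time; cancel it before failing
    hook.cancel_batch(batch_id='batch_abc123')
    raise
```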

## Usage Examples

### Basic Exception Handling

```python
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

def process_batch_with_error_handling(**context):
    """Process a batch with comprehensive error handling."""
    hook = OpenAIHook(conn_id='openai_default')

    try:
        # Create and monitor batch
        batch = hook.create_batch(
            file_id=context['params']['file_id'],
            endpoint="/v1/chat/completions"
        )

        # Wait for completion with timeout
        hook.wait_for_batch(
            batch_id=batch.id,
            wait_seconds=10,
            timeout=3600  # 1 hour timeout
        )

        return batch.id

    except OpenAIBatchTimeout as e:
        print(f"Batch processing timed out: {e}")
        # Log timeout details
        context['task_instance'].log.error(f"Batch timeout after 1 hour: {e}")
        # Optionally cancel the batch
        try:
            hook.cancel_batch(batch.id)
            print(f"Cancelled batch {batch.id} due to timeout")
        except Exception as cancel_error:
            print(f"Failed to cancel batch: {cancel_error}")
        raise

    except OpenAIBatchJobException as e:
        print(f"Batch processing failed: {e}")
        # Log failure details
        context['task_instance'].log.error(f"Batch job failure: {e}")
        # Get final batch status for debugging
        try:
            final_batch = hook.get_batch(batch.id)
            print(f"Final batch status: {final_batch.status}")
        except Exception as status_error:
            print(f"Could not retrieve final batch status: {status_error}")
        raise

    except Exception as e:
        print(f"Unexpected error during batch processing: {e}")
        context['task_instance'].log.error(f"Unexpected batch error: {e}")
        raise
```

### Operator Exception Handling

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

dag = DAG(
    'batch_with_exception_handling',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False
)

def handle_batch_failure(**context):
    """Handle batch failure scenarios."""
    # Get the failed task's state from the current DAG run
    failed_task_instance = context['dag_run'].get_task_instance('batch_processing_task')

    if failed_task_instance and failed_task_instance.state == 'failed':
        print("Batch processing task failed, implementing recovery logic")

        # Could implement retry logic, notification, or cleanup here
        # For example, notify stakeholders or trigger alternative processing

    # Return a value to indicate handling was successful
    return "failure_handled"

# Main batch processing task
batch_task = OpenAITriggerBatchOperator(
    task_id='batch_processing_task',
    file_id='file-abc123',
    endpoint='/v1/chat/completions',
    conn_id='openai_default',
    timeout=1800,  # 30 minutes
    dag=dag
)

# Failure handling task
failure_handler = PythonOperator(
    task_id='handle_failure',
    python_callable=handle_batch_failure,
    trigger_rule='one_failed',  # Run when the upstream task fails
    dag=dag
)

batch_task >> failure_handler
```

### Custom Exception Handling with Retries

```python
import time

from airflow.operators.python import PythonOperator
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

class RetryableBatchOperator(PythonOperator):
    """Custom operator with intelligent retry logic for batch operations."""

    def __init__(
        self,
        file_id: str,
        endpoint: str,
        conn_id: str = 'openai_default',
        max_batch_retries: int = 3,
        retry_backoff: int = 60,
        **kwargs
    ):
        self.file_id = file_id
        self.endpoint = endpoint
        self.conn_id = conn_id
        self.max_batch_retries = max_batch_retries
        self.retry_backoff = retry_backoff
        super().__init__(python_callable=self._execute_with_retry, **kwargs)

    def _execute_with_retry(self, **context):
        """Execute batch with custom retry logic."""
        hook = OpenAIHook(conn_id=self.conn_id)

        for attempt in range(self.max_batch_retries + 1):
            try:
                # Create batch
                batch = hook.create_batch(
                    file_id=self.file_id,
                    endpoint=self.endpoint
                )

                # Wait for completion
                hook.wait_for_batch(batch.id, timeout=3600)

                return batch.id

            except OpenAIBatchTimeout:
                if attempt < self.max_batch_retries:
                    delay = self.retry_backoff * (attempt + 1)  # backoff grows with each attempt
                    print(f"Batch timeout on attempt {attempt + 1}, retrying in {delay} seconds")
                    # Cancel the timed-out batch
                    try:
                        hook.cancel_batch(batch.id)
                    except Exception:
                        pass  # Best-effort cancellation

                    time.sleep(delay)
                    continue
                else:
                    print(f"Batch failed after {self.max_batch_retries + 1} attempts due to timeout")
                    raise

            except OpenAIBatchJobException as e:
                if attempt < self.max_batch_retries:
                    # Check if this is a retryable failure
                    if "rate limit" in str(e).lower() or "server error" in str(e).lower():
                        print(f"Retryable batch failure on attempt {attempt + 1}: {e}")
                        time.sleep(self.retry_backoff * (attempt + 1))
                        continue
                    else:
                        print(f"Non-retryable batch failure: {e}")
                        raise
                else:
                    print(f"Batch failed after {self.max_batch_retries + 1} attempts: {e}")
                    raise

            except Exception as e:
                print(f"Unexpected error on attempt {attempt + 1}: {e}")
                if attempt >= self.max_batch_retries:
                    raise
                time.sleep(self.retry_backoff)

# Usage
retryable_batch = RetryableBatchOperator(
    task_id='retryable_batch_processing',
    file_id='file-xyz789',
    endpoint='/v1/chat/completions',
    conn_id='openai_default',
    max_batch_retries=2,
    retry_backoff=30,
    dag=dag  # assumes a `dag` object defined as in the previous example
)
```

### Trigger Exception Handling

```python
import time

from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

class SafeBatchTrigger(OpenAIBatchTrigger):
    """Enhanced batch trigger with comprehensive exception handling."""

    async def run(self):
        """Monitor batch with enhanced error handling."""
        from airflow.triggers.base import TriggerEvent
        from airflow.providers.openai.hooks.openai import OpenAIHook

        hook = OpenAIHook(conn_id=self.conn_id)

        try:
            async for event in super().run():
                # Intercept error events and add additional context
                if event.payload.get('status') == 'error':
                    error_msg = event.payload.get('message', 'Unknown error')

                    # Enhance error message with debugging information
                    try:
                        batch = hook.get_batch(self.batch_id)
                        enhanced_msg = f"{error_msg} (Final batch status: {batch.status})"

                        # Create enhanced event
                        yield TriggerEvent({
                            **event.payload,
                            'message': enhanced_msg,
                            'batch_status': batch.status,
                            'debug_info': {
                                'created_at': getattr(batch, 'created_at', None),
                                'request_counts': getattr(batch, 'request_counts', None),
                                'metadata': getattr(batch, 'metadata', None)
                            }
                        })
                    except Exception as debug_error:
                        # If we can't get batch details, yield the original payload
                        yield TriggerEvent({
                            **event.payload,
                            'debug_error': str(debug_error)
                        })
                else:
                    # Pass through non-error events
                    yield event

        except Exception as e:
            # Handle trigger-level exceptions
            yield TriggerEvent({
                'status': 'error',
                'message': f'Trigger exception: {str(e)}',
                'batch_id': self.batch_id,
                'exception_type': type(e).__name__
            })

# Usage with the enhanced trigger
def use_safe_trigger(**context):
    """Use the enhanced trigger with better error handling."""
    safe_trigger = SafeBatchTrigger(
        conn_id='openai_default',
        batch_id=context['params']['batch_id'],
        poll_interval=60,
        end_time=time.time() + 7200
    )

    # Note: deferral is normally initiated from inside a deferrable operator
    # via self.defer(); using the task instance here is illustrative only.
    context['task_instance'].defer(
        trigger=safe_trigger,
        method_name='handle_safe_completion'
    )

def handle_safe_completion(**context):
    """Handle completion with enhanced error information."""
    event = context['event']

    if event['status'] == 'error':
        error_msg = event['message']
        debug_info = event.get('debug_info', {})

        print(f"Enhanced error information: {error_msg}")
        if debug_info:
            print(f"Debug details: {debug_info}")

        # Decide whether to retry or fail based on error details
        if 'timeout' in error_msg.lower():
            raise OpenAIBatchTimeout(error_msg)
        else:
            raise OpenAIBatchJobException(error_msg)

    return event['batch_id']
```

### Exception Monitoring and Alerting

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator

def create_exception_monitoring_dag():
    """Create a DAG with comprehensive exception monitoring."""

    dag = DAG(
        'openai_with_monitoring',
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
        catchup=False
    )

    def monitored_batch_processing(**context):
        """Batch processing with comprehensive monitoring."""
        from airflow.providers.openai.hooks.openai import OpenAIHook
        from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

        hook = OpenAIHook(conn_id='openai_default')

        try:
            batch = hook.create_batch(
                file_id=context['params']['file_id'],
                endpoint='/v1/chat/completions'
            )

            # Store batch ID for monitoring
            context['task_instance'].xcom_push(key='batch_id', value=batch.id)
            context['task_instance'].xcom_push(key='batch_status', value='started')

            hook.wait_for_batch(batch.id, timeout=1800)

            context['task_instance'].xcom_push(key='batch_status', value='completed')
            return batch.id

        except OpenAIBatchTimeout as e:
            context['task_instance'].xcom_push(key='batch_status', value='timeout')
            context['task_instance'].xcom_push(key='error_message', value=str(e))
            raise

        except OpenAIBatchJobException as e:
            context['task_instance'].xcom_push(key='batch_status', value='failed')
            context['task_instance'].xcom_push(key='error_message', value=str(e))
            raise

    # Main processing task
    batch_task = PythonOperator(
        task_id='monitored_batch_processing',
        python_callable=monitored_batch_processing,
        params={'file_id': 'file-monitoring-test'},
        dag=dag
    )

    # Alert on timeout
    timeout_alert = EmailOperator(
        task_id='timeout_alert',
        to=['data-team@company.com'],
        subject='OpenAI Batch Timeout Alert',
        html_content="""
        <h3>OpenAI Batch Processing Timeout</h3>
        <p>Batch ID: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_id') }}</p>
        <p>Error: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='error_message') }}</p>
        <p>DAG: {{ dag.dag_id }}</p>
        <p>Execution Date: {{ ds }}</p>
        """,
        trigger_rule='one_failed',
        dag=dag
    )

    # Alert on failure
    failure_alert = EmailOperator(
        task_id='failure_alert',
        to=['data-team@company.com'],
        subject='OpenAI Batch Processing Failure',
        html_content="""
        <h3>OpenAI Batch Processing Failed</h3>
        <p>Batch ID: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_id') }}</p>
        <p>Status: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_status') }}</p>
        <p>Error: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='error_message') }}</p>
        <p>DAG: {{ dag.dag_id }}</p>
        <p>Execution Date: {{ ds }}</p>
        """,
        trigger_rule='one_failed',
        dag=dag
    )

    batch_task >> [timeout_alert, failure_alert]

    return dag

# Create the monitoring DAG
monitoring_dag = create_exception_monitoring_dag()
```