# Exceptions
Specialized exception classes for handling OpenAI-specific error conditions and batch processing failures within Airflow workflows.
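
Both classes are defined in `airflow.providers.openai.exceptions` and inherit from `AirflowException`, so they participate in Airflow's normal failure handling and can be caught individually or together:

```python
from airflow.exceptions import AirflowException
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

# Both provider exceptions are AirflowExceptions, so a single handler can catch either.
assert issubclass(OpenAIBatchJobException, AirflowException)
assert issubclass(OpenAIBatchTimeout, AirflowException)
```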
## Capabilities
### Batch Job Exception
Exception raised when OpenAI Batch API operations fail after processing has begun.

```python { .api }
class OpenAIBatchJobException(AirflowException):
    """
    Raise when OpenAI Batch Job fails to start AFTER processing the request.

    This exception is raised when:
    - Batch processing fails during execution
    - Batch is cancelled unexpectedly
    - Batch expires before completion
    - Batch encounters an unexpected terminal status

    Inherits from AirflowException for proper integration with Airflow's
    error handling and retry mechanisms.
    """
```
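
For reference, the sketch below shows the kind of terminal-status check that produces this exception. It is illustrative rather than provider code: it assumes that `OpenAIHook.get_batch` returns a batch object whose `status` compares equal to the `BatchStatus` values exposed by `airflow.providers.openai.hooks.openai`.

```python
from airflow.providers.openai.exceptions import OpenAIBatchJobException
from airflow.providers.openai.hooks.openai import BatchStatus, OpenAIHook


def raise_on_failed_batch(hook: OpenAIHook, batch_id: str) -> None:
    """Illustrative helper: raise OpenAIBatchJobException for a failed terminal state."""
    batch = hook.get_batch(batch_id)  # fetch the current batch object from the OpenAI API
    if batch.status == BatchStatus.FAILED:
        raise OpenAIBatchJobException(f"Batch {batch_id} failed during processing.")
    if batch.status in (BatchStatus.CANCELLED, BatchStatus.EXPIRED):
        raise OpenAIBatchJobException(f"Batch {batch_id} ended with terminal status {batch.status!r}.")
```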
### Batch Timeout Exception
Exception raised when OpenAI Batch API operations exceed specified timeout limits.

```python { .api }
class OpenAIBatchTimeout(AirflowException):
    """
    Raise when OpenAI Batch Job times out.

    This exception is raised when:
    - Batch processing exceeds the specified timeout duration
    - Polling operations reach their time limit
    - Long-running batch operations don't complete within expected timeframes

    Inherits from AirflowException for proper integration with Airflow's
    error handling and retry mechanisms.
    """
```
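
The sketch below illustrates the deadline-driven polling pattern behind this exception. It is a simplified loop, not the provider's `wait_for_batch` implementation, and reuses the assumed `BatchStatus` comparison from the example above.

```python
import time

from airflow.providers.openai.exceptions import OpenAIBatchTimeout
from airflow.providers.openai.hooks.openai import BatchStatus, OpenAIHook


def poll_batch_with_deadline(hook: OpenAIHook, batch_id: str, timeout: float, wait_seconds: float = 10.0) -> None:
    """Illustrative helper: poll a batch and raise OpenAIBatchTimeout once the deadline passes."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if hook.get_batch(batch_id).status == BatchStatus.COMPLETED:
            return
        time.sleep(wait_seconds)  # back off between status checks
    # A fuller loop would also surface failed/cancelled batches via OpenAIBatchJobException (see above).
    raise OpenAIBatchTimeout(f"Batch {batch_id} did not complete within {timeout} seconds.")
```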
## Usage Examples
### Basic Exception Handling

```python
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

def process_batch_with_error_handling(**context):
    """Process a batch with comprehensive error handling."""
    hook = OpenAIHook(conn_id='openai_default')

    try:
        # Create and monitor batch
        batch = hook.create_batch(
            file_id=context['params']['file_id'],
            endpoint="/v1/chat/completions"
        )

        # Wait for completion with timeout
        hook.wait_for_batch(
            batch_id=batch.id,
            wait_seconds=10,
            timeout=3600  # 1 hour timeout
        )

        return batch.id

    except OpenAIBatchTimeout as e:
        print(f"Batch processing timed out: {e}")
        # Log timeout details
        context['task_instance'].log.error(f"Batch timeout after 1 hour: {e}")
        # Optionally cancel the batch
        try:
            hook.cancel_batch(batch.id)
            print(f"Cancelled batch {batch.id} due to timeout")
        except Exception as cancel_error:
            print(f"Failed to cancel batch: {cancel_error}")
        raise

    except OpenAIBatchJobException as e:
        print(f"Batch processing failed: {e}")
        # Log failure details
        context['task_instance'].log.error(f"Batch job failure: {e}")
        # Get final batch status for debugging
        try:
            final_batch = hook.get_batch(batch.id)
            print(f"Final batch status: {final_batch.status}")
        except Exception as status_error:
            print(f"Could not retrieve final batch status: {status_error}")
        raise

    except Exception as e:
        print(f"Unexpected error during batch processing: {e}")
        context['task_instance'].log.error(f"Unexpected batch error: {e}")
        raise
```

### Operator Exception Handling

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.openai.operators.openai import OpenAITriggerBatchOperator
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

dag = DAG(
    'batch_with_exception_handling',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False
)

def handle_batch_failure(**context):
    """Handle batch failure scenarios."""
    # Get the state of the failed task from the current DAG run
    failed_task_instance = context['dag_run'].get_task_instance('batch_processing_task')

    if failed_task_instance and failed_task_instance.state == 'failed':
        print("Batch processing task failed, implementing recovery logic")

        # Could implement retry logic, notification, or cleanup here
        # For example, notify stakeholders or trigger alternative processing

    # Return a value to indicate handling was successful
    return "failure_handled"

# Main batch processing task
batch_task = OpenAITriggerBatchOperator(
    task_id='batch_processing_task',
    file_id='file-abc123',
    endpoint='/v1/chat/completions',
    conn_id='openai_default',
    timeout=1800,  # 30 minutes
    dag=dag
)

# Failure handling task
failure_handler = PythonOperator(
    task_id='handle_failure',
    python_callable=handle_batch_failure,
    trigger_rule='one_failed',  # Run when upstream task fails
    dag=dag
)

batch_task >> failure_handler
```

### Custom Exception Handling with Retries

```python
import time

from airflow.operators.python import PythonOperator
from airflow.providers.openai.hooks.openai import OpenAIHook
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

class RetryableBatchOperator(PythonOperator):
    """Custom operator with intelligent retry logic for batch operations."""

    def __init__(
        self,
        file_id: str,
        endpoint: str,
        conn_id: str = 'openai_default',
        max_batch_retries: int = 3,
        retry_backoff: int = 60,
        **kwargs
    ):
        self.file_id = file_id
        self.endpoint = endpoint
        self.conn_id = conn_id
        self.max_batch_retries = max_batch_retries
        self.retry_backoff = retry_backoff
        super().__init__(python_callable=self._execute_with_retry, **kwargs)

    def _execute_with_retry(self, **context):
        """Execute batch with custom retry logic."""
        hook = OpenAIHook(conn_id=self.conn_id)

        for attempt in range(self.max_batch_retries + 1):
            try:
                # Create batch
                batch = hook.create_batch(
                    file_id=self.file_id,
                    endpoint=self.endpoint
                )

                # Wait for completion
                hook.wait_for_batch(batch.id, timeout=3600)

                return batch.id

            except OpenAIBatchTimeout:
                if attempt < self.max_batch_retries:
                    print(f"Batch timeout on attempt {attempt + 1}, retrying in {self.retry_backoff * (attempt + 1)} seconds")
                    # Cancel the timed-out batch
                    try:
                        hook.cancel_batch(batch.id)
                    except Exception:
                        pass  # Best-effort cancellation

                    time.sleep(self.retry_backoff * (attempt + 1))  # Back off longer on each attempt
                    continue
                else:
                    print(f"Batch failed after {self.max_batch_retries + 1} attempts due to timeout")
                    raise

            except OpenAIBatchJobException as e:
                if attempt < self.max_batch_retries:
                    # Check if this is a retryable failure
                    if "rate limit" in str(e).lower() or "server error" in str(e).lower():
                        print(f"Retryable batch failure on attempt {attempt + 1}: {e}")
                        time.sleep(self.retry_backoff * (attempt + 1))
                        continue
                    else:
                        print(f"Non-retryable batch failure: {e}")
                        raise
                else:
                    print(f"Batch failed after {self.max_batch_retries + 1} attempts: {e}")
                    raise

            except Exception as e:
                print(f"Unexpected error on attempt {attempt + 1}: {e}")
                if attempt >= self.max_batch_retries:
                    raise
                time.sleep(self.retry_backoff)

# Usage
retryable_batch = RetryableBatchOperator(
    task_id='retryable_batch_processing',
    file_id='file-xyz789',
    endpoint='/v1/chat/completions',
    conn_id='openai_default',
    max_batch_retries=2,
    retry_backoff=30,
    dag=dag
)
```

### Trigger Exception Handling

```python
import time

from airflow.models.baseoperator import BaseOperator
from airflow.providers.openai.triggers.openai import OpenAIBatchTrigger
from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

class SafeBatchTrigger(OpenAIBatchTrigger):
    """Enhanced batch trigger with comprehensive exception handling."""

    async def run(self):
        """Monitor batch with enhanced error handling."""
        from airflow.triggers.base import TriggerEvent
        from airflow.providers.openai.hooks.openai import OpenAIHook

        hook = OpenAIHook(conn_id=self.conn_id)

        try:
            async for event in super().run():
                # Intercept error events and add additional context
                if event.payload.get('status') == 'error':
                    error_msg = event.payload.get('message', 'Unknown error')

                    # Enhance error message with debugging information
                    try:
                        batch = hook.get_batch(self.batch_id)
                        enhanced_msg = f"{error_msg} (Final batch status: {batch.status})"

                        # Create enhanced event
                        yield TriggerEvent({
                            **event.payload,
                            'message': enhanced_msg,
                            'batch_status': batch.status,
                            'debug_info': {
                                'created_at': getattr(batch, 'created_at', None),
                                'request_counts': getattr(batch, 'request_counts', None),
                                'metadata': getattr(batch, 'metadata', None)
                            }
                        })
                    except Exception as debug_error:
                        # If we can't get batch details, yield original event
                        yield TriggerEvent({
                            **event.payload,
                            'debug_error': str(debug_error)
                        })
                else:
                    # Pass through non-error events
                    yield event

        except Exception as e:
            # Handle trigger-level exceptions
            yield TriggerEvent({
                'status': 'error',
                'message': f'Trigger exception: {str(e)}',
                'batch_id': self.batch_id,
                'exception_type': type(e).__name__
            })

# Usage with the enhanced trigger: deferral happens inside an operator's execute()
# via self.defer(); the method named by method_name is called on the same operator
# with the TriggerEvent payload once the trigger fires.
class SafeBatchOperator(BaseOperator):
    """Defer on SafeBatchTrigger and handle completion with enhanced error information."""

    def __init__(self, batch_id: str, conn_id: str = 'openai_default', **kwargs):
        super().__init__(**kwargs)
        self.batch_id = batch_id
        self.conn_id = conn_id

    def execute(self, context):
        """Defer the task until the enhanced trigger fires."""
        self.defer(
            trigger=SafeBatchTrigger(
                conn_id=self.conn_id,
                batch_id=self.batch_id,
                poll_interval=60,
                end_time=time.time() + 7200
            ),
            method_name='handle_safe_completion'
        )

    def handle_safe_completion(self, context, event=None):
        """Handle completion with enhanced error information."""
        if event['status'] == 'error':
            error_msg = event['message']
            debug_info = event.get('debug_info', {})

            print(f"Enhanced error information: {error_msg}")
            if debug_info:
                print(f"Debug details: {debug_info}")

            # Decide whether to retry or fail based on error details
            if 'timeout' in error_msg.lower():
                raise OpenAIBatchTimeout(error_msg)
            else:
                raise OpenAIBatchJobException(error_msg)

        return event['batch_id']
```

### Exception Monitoring and Alerting

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator

def create_exception_monitoring_dag():
    """Create a DAG with comprehensive exception monitoring."""

    dag = DAG(
        'openai_with_monitoring',
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
        catchup=False
    )

    def monitored_batch_processing(**context):
        """Batch processing with comprehensive monitoring."""
        from airflow.providers.openai.hooks.openai import OpenAIHook
        from airflow.providers.openai.exceptions import OpenAIBatchJobException, OpenAIBatchTimeout

        hook = OpenAIHook(conn_id='openai_default')

        try:
            batch = hook.create_batch(
                file_id=context['params']['file_id'],
                endpoint='/v1/chat/completions'
            )

            # Store batch ID for monitoring
            context['task_instance'].xcom_push(key='batch_id', value=batch.id)
            context['task_instance'].xcom_push(key='batch_status', value='started')

            hook.wait_for_batch(batch.id, timeout=1800)

            context['task_instance'].xcom_push(key='batch_status', value='completed')
            return batch.id

        except OpenAIBatchTimeout as e:
            context['task_instance'].xcom_push(key='batch_status', value='timeout')
            context['task_instance'].xcom_push(key='error_message', value=str(e))
            raise

        except OpenAIBatchJobException as e:
            context['task_instance'].xcom_push(key='batch_status', value='failed')
            context['task_instance'].xcom_push(key='error_message', value=str(e))
            raise

    # Main processing task
    batch_task = PythonOperator(
        task_id='monitored_batch_processing',
        python_callable=monitored_batch_processing,
        params={'file_id': 'file-monitoring-test'},
        dag=dag
    )

    # Both alerts run on any upstream failure (trigger_rule='one_failed');
    # the pushed batch_status value distinguishes a timeout from other failures.

    # Alert on timeout
    timeout_alert = EmailOperator(
        task_id='timeout_alert',
        to=['data-team@company.com'],
        subject='OpenAI Batch Timeout Alert',
        html_content="""
        <h3>OpenAI Batch Processing Timeout</h3>
        <p>Batch ID: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_id') }}</p>
        <p>Error: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='error_message') }}</p>
        <p>DAG: {{ dag.dag_id }}</p>
        <p>Execution Date: {{ ds }}</p>
        """,
        trigger_rule='one_failed',
        dag=dag
    )

    # Alert on failure
    failure_alert = EmailOperator(
        task_id='failure_alert',
        to=['data-team@company.com'],
        subject='OpenAI Batch Processing Failure',
        html_content="""
        <h3>OpenAI Batch Processing Failed</h3>
        <p>Batch ID: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_id') }}</p>
        <p>Status: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='batch_status') }}</p>
        <p>Error: {{ task_instance.xcom_pull(task_ids='monitored_batch_processing', key='error_message') }}</p>
        <p>DAG: {{ dag.dag_id }}</p>
        <p>Execution Date: {{ ds }}</p>
        """,
        trigger_rule='one_failed',
        dag=dag
    )

    batch_task >> [timeout_alert, failure_alert]

    return dag

# Create the monitoring DAG
monitoring_dag = create_exception_monitoring_dag()
```