# Error Management

Custom exception classes for Docker container execution failures. Each exception carries the failed container's log output, providing detailed error information for debugging containerized tasks across different failure scenarios.

## Capabilities

### DockerContainerFailedException

Raised when a Docker container returns an error during execution.
```python { .api }
class DockerContainerFailedException(AirflowException):
    """
    Exception raised when a Docker container fails during execution.

    Provides access to container logs and failure details for debugging.
    """

    def __init__(
        self,
        message: str | None = None,
        logs: list[str | bytes] | None = None,
    ) -> None:
        """
        Initialize container failure exception.

        Args:
            message: Error description
            logs: Container log output from failed execution
        """
```
**Attributes:**

- `logs: list[str | bytes] | None` - Container logs from the failed execution

### DockerContainerFailedSkipException

Raised when a Docker container returns an error and the task should be skipped.
```python { .api }
class DockerContainerFailedSkipException(AirflowSkipException):
    """
    Exception raised when a Docker container fails and task should be skipped.

    Used with skip_on_exit_code parameter to treat specific exit codes as skipped tasks.
    """

    def __init__(
        self,
        message: str | None = None,
        logs: list[str | bytes] | None = None,
    ) -> None:
        """
        Initialize container skip exception.

        Args:
            message: Skip reason description
            logs: Container log output from failed execution
        """
```
**Attributes:**

- `logs: list[str | bytes] | None` - Container logs from the failed execution
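Both exceptions share the same constructor shape, so an operator can pick between them based on the container's exit code. The sketch below illustrates that dispatch; it uses local stand-in classes (and a hypothetical `raise_for_exit_code` helper) so it runs without an Airflow installation:

```python
# Stand-ins for the Airflow base classes, so this sketch is self-contained.
class AirflowException(Exception):
    pass


class AirflowSkipException(Exception):
    pass


class DockerContainerFailedException(AirflowException):
    def __init__(self, message=None, logs=None):
        super().__init__(message)
        self.logs = logs


class DockerContainerFailedSkipException(AirflowSkipException):
    def __init__(self, message=None, logs=None):
        super().__init__(message)
        self.logs = logs


def raise_for_exit_code(exit_code, logs, skip_on_exit_code=()):
    """Mimic the operator's decision: skip for configured codes, fail otherwise."""
    if exit_code == 0:
        return None
    if exit_code in skip_on_exit_code:
        raise DockerContainerFailedSkipException(
            f"Container exited with {exit_code}, skipping.", logs=logs
        )
    raise DockerContainerFailedException(
        f"Container exited with {exit_code}.", logs=logs
    )
```

Because `DockerContainerFailedSkipException` derives from `AirflowSkipException`, raising it marks the task as skipped rather than failed.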
## Usage Examples

### Basic Exception Handling
```python
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import (
    DockerContainerFailedException,
    DockerContainerFailedSkipException,
)


def handle_docker_task():
    """Handle Docker task with error management."""
    try:
        task = DockerOperator(
            task_id='risky_operation',
            image='ubuntu:20.04',
            command=['./risky_script.sh'],
        )
        return task.execute({})

    except DockerContainerFailedException as e:
        print(f"Container failed: {e}")
        if e.logs:
            print("Container logs:")
            for log_line in e.logs:
                print(log_line.decode() if isinstance(log_line, bytes) else log_line)
        raise

    except DockerContainerFailedSkipException as e:
        print(f"Container skipped: {e}")
        # Task will be marked as skipped
        raise
```
### Skip on Specific Exit Codes
```python
# Configure task to skip on specific exit codes
conditional_task = DockerOperator(
    task_id='conditional_processing',
    image='myapp:latest',
    command=['./check_and_process.sh'],
    skip_on_exit_code=[1, 2, 3],  # Skip if exit code is 1, 2, or 3
    auto_remove='success',
)

# The operator raises DockerContainerFailedSkipException for these
# exit codes instead of DockerContainerFailedException.
```
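`skip_on_exit_code` also accepts a single integer, so the check the operator performs is essentially a membership test after normalization. A sketch of that logic (the helper name is illustrative, not the operator's internal API):

```python
def should_skip(exit_code: int, skip_on_exit_code) -> bool:
    """Return True when the exit code is configured as a skip, not a failure."""
    if skip_on_exit_code is None:
        return False
    # Normalize a single int to a list so membership testing works uniformly.
    if isinstance(skip_on_exit_code, int):
        skip_on_exit_code = [skip_on_exit_code]
    return exit_code in skip_on_exit_code
```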
### Custom Error Handling in Tasks
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import DockerContainerFailedException


def error_callback(context):
    """Custom error callback for Docker task failures."""
    task_instance = context['task_instance']
    exception = context.get('exception')

    if isinstance(exception, DockerContainerFailedException):
        print(f"Docker container failed in task {task_instance.task_id}")

        if exception.logs:
            # send_to_monitoring is a placeholder for your monitoring client
            send_to_monitoring({
                'task_id': task_instance.task_id,
                'error': str(exception),
                'logs': [log.decode() if isinstance(log, bytes) else log
                         for log in exception.logs],
            })


dag = DAG(
    'docker_with_error_handling',
    start_date=datetime(2024, 1, 1),
    on_failure_callback=error_callback,
)

risky_task = DockerOperator(
    task_id='data_processing',
    image='data-processor:v1.0',
    command=['python', '/app/process.py'],
    dag=dag,
)
```
### Logging Container Output
```python
import logging


def analyze_container_failure(logs: list[str | bytes] | None) -> list[str]:
    """Analyze container logs for failure patterns."""
    if not logs:
        return ["No logs available"]

    error_patterns = []
    for log_line in logs:
        line = log_line.decode() if isinstance(log_line, bytes) else log_line

        # Check FATAL before ERROR so "FATAL ERROR" lines classify correctly
        if 'FATAL' in line:
            error_patterns.append(f"Fatal error: {line.strip()}")
        elif 'ERROR' in line:
            error_patterns.append(f"Error found: {line.strip()}")
        elif 'Exception' in line:
            error_patterns.append(f"Exception: {line.strip()}")

    return error_patterns


# Use in an exception handler (docker_task is a DockerOperator instance)
try:
    docker_task.execute({})
except DockerContainerFailedException as e:
    error_analysis = analyze_container_failure(e.logs)
    logging.error(f"Container analysis: {error_analysis}")
```
### Retry Logic with Error Analysis
```python
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import DockerContainerFailedException


class RetryableDockerOperator(DockerOperator):
    """Docker operator with intelligent retry logic."""

    def __init__(self, max_analysis_retries=2, **kwargs):
        super().__init__(**kwargs)
        self.max_analysis_retries = max_analysis_retries
        self.retry_count = 0

    def execute(self, context):
        while self.retry_count <= self.max_analysis_retries:
            try:
                return super().execute(context)

            except DockerContainerFailedException as e:
                self.retry_count += 1

                # Analyze failure and retry only transient errors
                if e.logs and self.should_retry(e.logs):
                    if self.retry_count <= self.max_analysis_retries:
                        self.log.warning(
                            f"Retrying container execution (attempt {self.retry_count})"
                        )
                        continue

                # Re-raise if not retryable or max retries exceeded
                raise

    def should_retry(self, logs: list[str | bytes]) -> bool:
        """Determine if failure is retryable based on logs."""
        retryable_patterns = [
            'connection timeout',
            'network unreachable',
            'temporary failure',
            'resource temporarily unavailable',
        ]

        log_text = ' '.join(
            log.decode() if isinstance(log, bytes) else log
            for log in logs
        ).lower()

        return any(pattern in log_text for pattern in retryable_patterns)


# Usage
resilient_task = RetryableDockerOperator(
    task_id='resilient_processing',
    image='processor:latest',
    command=['python', '/app/process.py'],
    max_analysis_retries=3,
)
```
### Integration with Monitoring
```python
import json
from datetime import datetime, timezone

from airflow.providers.docker.exceptions import DockerContainerFailedException


def send_docker_metrics(task_id: str, exception: DockerContainerFailedException):
    """Send Docker failure metrics to monitoring system."""
    metrics = {
        'task_id': task_id,
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'error_message': str(exception),
        'log_count': len(exception.logs) if exception.logs else 0,
    }

    # Extract error types from logs
    if exception.logs:
        error_types = []
        for log in exception.logs:
            line = log.decode() if isinstance(log, bytes) else log
            if 'ERROR' in line:
                error_types.append('error')
            elif 'WARNING' in line:
                error_types.append('warning')

        metrics['error_types'] = list(set(error_types))

    # Send to monitoring (example)
    # monitoring_client.send_metrics('docker_failures', metrics)
    print(f"Docker failure metrics: {json.dumps(metrics, indent=2)}")


# In task failure callback
def docker_failure_callback(context):
    exception = context.get('exception')
    if isinstance(exception, DockerContainerFailedException):
        send_docker_metrics(context['task_instance'].task_id, exception)
```
### Log Streaming with Error Handling
```python
import logging

from airflow.providers.docker.hooks.docker import DockerHook


def stream_container_logs(container_id: str, hook: DockerHook):
    """Stream container logs with error handling."""
    try:
        client = hook.get_conn()

        # Stream logs in real-time
        for log_line in client.logs(container_id, stream=True, follow=True):
            decoded_line = log_line.decode('utf-8').strip()
            print(f"Container log: {decoded_line}")

            # Check for error patterns
            if 'FATAL' in decoded_line:
                logging.error(f"Fatal error detected: {decoded_line}")
            elif 'ERROR' in decoded_line:
                logging.warning(f"Error detected: {decoded_line}")

    except Exception as e:
        logging.error(f"Failed to stream logs: {e}")
```
## Error Categories

### Container Execution Errors

- **Exit code failures**: Non-zero exit codes from container processes
- **Signal termination**: Containers killed by signals (SIGTERM, SIGKILL)
- **Resource exhaustion**: Out of memory, disk space, or CPU limits
- **Permission errors**: Insufficient privileges for container operations

### Docker Daemon Errors

- **Connection failures**: Cannot connect to Docker daemon
- **Image pull failures**: Unable to pull specified Docker images
- **Network errors**: Container networking configuration issues
- **Volume mount errors**: Failed to mount volumes or bind mounts

### Configuration Errors

- **Invalid parameters**: Malformed operator configuration
- **Missing dependencies**: Required Docker images or volumes not available
- **Authentication failures**: Docker registry authentication issues
- **TLS/Security errors**: Certificate or security configuration problems
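These categories can drive automated triage of failures. A rough log-based classifier (the pattern lists are assumptions; tune them for your images and daemon version):

```python
def categorize_failure(log_text: str) -> str:
    """Map a failure's log text onto one of the error categories above."""
    patterns = {
        'docker_daemon': [
            'cannot connect to the docker daemon', 'pull access denied',
            'network not found', 'invalid mount config',
        ],
        'configuration': [
            'unauthorized', 'certificate verify failed', 'no such image',
        ],
        'container_execution': [
            'oomkilled', 'sigkill', 'permission denied', 'non-zero exit',
        ],
    }
    text = log_text.lower()
    for category, needles in patterns.items():
        if any(needle in text for needle in needles):
            return category
    return 'unknown'
```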
## Best Practices

### Error Handling Strategy

1. **Specific Exception Handling**: Catch specific Docker exceptions rather than generic ones
2. **Log Analysis**: Analyze container logs to understand failure root causes
3. **Retry Logic**: Implement intelligent retry for transient failures
4. **Monitoring Integration**: Send failure metrics to monitoring systems
5. **Graceful Degradation**: Provide fallback behavior for non-critical failures
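Graceful degradation (point 5) can be as simple as a wrapper that returns a fallback value for non-critical work instead of propagating the failure. A minimal sketch, with the exception class stubbed locally so it runs standalone:

```python
class DockerContainerFailedException(Exception):
    """Local stub standing in for the provider's exception class."""


def run_with_fallback(run, fallback, critical=False):
    """Run a callable; on container failure return `fallback` unless critical."""
    try:
        return run()
    except DockerContainerFailedException:
        if critical:
            raise
        return fallback
```

Critical tasks keep fail-fast semantics; everything else degrades to a cached or default result.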
### Debugging Tips

1. **Enable Verbose Logging**: Set `tty=True` so the container process writes line-buffered output that streams into the task log as it is produced
2. **Preserve Containers**: Use `auto_remove='never'` for debugging, so the stopped container remains available for `docker logs` and `docker inspect`
3. **Check Exit Codes**: Use `skip_on_exit_code` for expected failure scenarios
4. **Volume Inspection**: Mount debug volumes to inspect container state
5. **Network Debugging**: Use `network_mode='host'` for network troubleshooting
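The first three tips combine naturally into a single debugging configuration. The helper below just assembles the keyword arguments (the helper name and values are suggestions for investigation, not the operator's defaults):

```python
def debug_operator_kwargs(expected_skip_codes=(99,)):
    """Suggested DockerOperator settings for debugging a failing container."""
    return {
        'tty': True,                  # stream output as it is produced
        'auto_remove': 'never',       # keep the stopped container for inspection
        'skip_on_exit_code': list(expected_skip_codes),
    }
```

While investigating, pass the dict through: `DockerOperator(task_id=..., image=..., **debug_operator_kwargs())`, then revert to production settings once the failure is understood.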