# Error Management

Custom exception classes for Docker container execution failures. Each exception carries the container's log output alongside an error message, so failed containerized tasks can be debugged directly from the raised exception; the provider raises a different exception depending on whether the failure should fail the task or skip it.

## Capabilities

### DockerContainerFailedException

Raised when a Docker container returns an error during execution.

```python { .api }
class DockerContainerFailedException(AirflowException):
    """
    Exception raised when a Docker container fails during execution.

    Provides access to container logs and failure details for debugging.
    """

    def __init__(
        self,
        message: str | None = None,
        logs: list[str | bytes] | None = None
    ) -> None:
        """
        Initialize container failure exception.

        Args:
            message: Error description
            logs: Container log output from failed execution
        """
```

**Attributes:**
- `logs: list[str | bytes] | None` - Container logs from the failed execution

### DockerContainerFailedSkipException

Raised when a Docker container returns an error and the task should be skipped.

```python { .api }
class DockerContainerFailedSkipException(AirflowSkipException):
    """
    Exception raised when a Docker container fails and task should be skipped.

    Used with skip_on_exit_code parameter to treat specific exit codes as skipped tasks.
    """

    def __init__(
        self,
        message: str | None = None,
        logs: list[str | bytes] | None = None
    ) -> None:
        """
        Initialize container skip exception.

        Args:
            message: Skip reason description
            logs: Container log output from failed execution
        """
```

**Attributes:**
- `logs: list[str | bytes] | None` - Container logs from the failed execution
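
For orientation, the sketch below shows how container-running code might translate an exit code into one of these exceptions. It is illustrative only: the helper name, its arguments, and the `skip_codes` set are assumptions rather than the provider's actual implementation, though the exception constructors match the API above.

```python
from airflow.providers.docker.exceptions import (
    DockerContainerFailedException,
    DockerContainerFailedSkipException,
)


def check_container_result(exit_code: int, logs: list[bytes], skip_codes: set[int]) -> None:
    """Illustrative helper: map a non-zero exit code to the matching exception."""
    if exit_code == 0:
        return

    message = f"Docker container returned exit code {exit_code}."
    if exit_code in skip_codes:
        # Exit codes configured as "skip" become a skip rather than a failure
        raise DockerContainerFailedSkipException(message, logs=logs)
    raise DockerContainerFailedException(message, logs=logs)
```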


## Usage Examples

### Basic Exception Handling

```python
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import (
    DockerContainerFailedException,
    DockerContainerFailedSkipException,
)


def handle_docker_task():
    """Handle Docker task with error management."""
    try:
        task = DockerOperator(
            task_id='risky_operation',
            image='ubuntu:20.04',
            command=['./risky_script.sh']
        )
        result = task.execute({})
        return result

    except DockerContainerFailedException as e:
        print(f"Container failed: {e}")
        if e.logs:
            print("Container logs:")
            for log_line in e.logs:
                print(log_line.decode() if isinstance(log_line, bytes) else log_line)
        raise

    except DockerContainerFailedSkipException as e:
        print(f"Container skipped: {e}")
        # Task will be marked as skipped
        raise
```

### Skip on Specific Exit Codes

```python
# Configure task to skip on specific exit codes
conditional_task = DockerOperator(
    task_id='conditional_processing',
    image='myapp:latest',
    command=['./check_and_process.sh'],
    skip_on_exit_code=[1, 2, 3],  # Skip if exit code is 1, 2, or 3
    auto_remove='success'
)

# The operator will raise DockerContainerFailedSkipException
# for these exit codes instead of DockerContainerFailedException
```

### Custom Error Handling in Tasks

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import DockerContainerFailedException


def error_callback(context):
    """Custom error callback for Docker task failures."""
    task_instance = context['task_instance']
    exception = context.get('exception')

    if isinstance(exception, DockerContainerFailedException):
        print(f"Docker container failed in task {task_instance.task_id}")

        if exception.logs:
            # Send logs to monitoring system (send_to_monitoring is a placeholder)
            send_to_monitoring({
                'task_id': task_instance.task_id,
                'error': str(exception),
                'logs': [log.decode() if isinstance(log, bytes) else log
                         for log in exception.logs]
            })


dag = DAG(
    'docker_with_error_handling',
    start_date=datetime(2024, 1, 1),
    # Propagate the callback to every task so it fires on task failure
    default_args={'on_failure_callback': error_callback}
)

risky_task = DockerOperator(
    task_id='data_processing',
    image='data-processor:v1.0',
    command=['python', '/app/process.py'],
    dag=dag
)
```

### Logging Container Output

```python
import logging


def analyze_container_failure(logs: list[str | bytes] | None) -> list[str]:
    """Analyze container logs for failure patterns."""
    if not logs:
        return ["No logs available"]

    error_patterns = []
    for log_line in logs:
        line = log_line.decode() if isinstance(log_line, bytes) else log_line

        if 'ERROR' in line:
            error_patterns.append(f"Error found: {line.strip()}")
        elif 'FATAL' in line:
            error_patterns.append(f"Fatal error: {line.strip()}")
        elif 'Exception' in line:
            error_patterns.append(f"Exception: {line.strip()}")

    return error_patterns


# Use in an exception handler (docker_task is a DockerOperator defined elsewhere)
try:
    docker_task.execute({})
except DockerContainerFailedException as e:
    error_analysis = analyze_container_failure(e.logs)
    logging.error(f"Container analysis: {error_analysis}")
```

### Retry Logic with Error Analysis

```python
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import DockerContainerFailedException


class RetryableDockerOperator(DockerOperator):
    """Docker operator with intelligent retry logic."""

    def __init__(self, max_analysis_retries=2, **kwargs):
        super().__init__(**kwargs)
        self.max_analysis_retries = max_analysis_retries
        self.retry_count = 0

    def execute(self, context):
        while self.retry_count <= self.max_analysis_retries:
            try:
                return super().execute(context)

            except DockerContainerFailedException as e:
                self.retry_count += 1

                # Analyze failure and retry only if it looks transient
                if e.logs and self.should_retry(e.logs):
                    if self.retry_count <= self.max_analysis_retries:
                        self.log.warning(
                            f"Retrying container execution (attempt {self.retry_count})"
                        )
                        continue

                # Re-raise if not retryable or max retries exceeded
                raise

    def should_retry(self, logs: list[str | bytes]) -> bool:
        """Determine if failure is retryable based on logs."""
        retryable_patterns = [
            'connection timeout',
            'network unreachable',
            'temporary failure',
            'resource temporarily unavailable'
        ]

        log_text = ' '.join([
            log.decode() if isinstance(log, bytes) else log
            for log in logs
        ]).lower()

        return any(pattern in log_text for pattern in retryable_patterns)


# Usage
resilient_task = RetryableDockerOperator(
    task_id='resilient_processing',
    image='processor:latest',
    command=['python', '/app/process.py'],
    max_analysis_retries=3
)
```

### Integration with Monitoring

```python
import json
from datetime import datetime

from airflow.providers.docker.exceptions import DockerContainerFailedException


def send_docker_metrics(task_id: str, exception: DockerContainerFailedException):
    """Send Docker failure metrics to monitoring system."""

    metrics = {
        'task_id': task_id,
        'timestamp': datetime.utcnow().isoformat(),
        'error_message': str(exception),
        'log_count': len(exception.logs) if exception.logs else 0
    }

    # Extract error types from logs
    if exception.logs:
        error_types = []
        for log in exception.logs:
            line = log.decode() if isinstance(log, bytes) else log
            if 'ERROR' in line:
                error_types.append('error')
            elif 'WARNING' in line:
                error_types.append('warning')

        metrics['error_types'] = list(set(error_types))

    # Send to monitoring (example)
    # monitoring_client.send_metrics('docker_failures', metrics)
    print(f"Docker failure metrics: {json.dumps(metrics, indent=2)}")


# In task failure callback
def docker_failure_callback(context):
    exception = context.get('exception')
    if isinstance(exception, DockerContainerFailedException):
        send_docker_metrics(context['task_instance'].task_id, exception)
```

### Log Streaming with Error Handling

```python
import logging

from airflow.providers.docker.hooks.docker import DockerHook


def stream_container_logs(container_id: str, hook: DockerHook):
    """Stream container logs with error handling."""
    try:
        client = hook.get_conn()

        # Stream logs in real-time
        for log_line in client.logs(container_id, stream=True, follow=True):
            decoded_line = log_line.decode('utf-8').strip()
            print(f"Container log: {decoded_line}")

            # Check for error patterns
            if 'FATAL' in decoded_line:
                logging.error(f"Fatal error detected: {decoded_line}")
            elif 'ERROR' in decoded_line:
                logging.warning(f"Error detected: {decoded_line}")

    except Exception as e:
        logging.error(f"Failed to stream logs: {e}")
```

## Error Categories

### Container Execution Errors

- **Exit code failures**: Non-zero exit codes from container processes (see the sketch after this list)
- **Signal termination**: Containers killed by signals (SIGTERM, SIGKILL)
- **Resource exhaustion**: Out of memory, disk space, or CPU limits
- **Permission errors**: Insufficient privileges for container operations
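
When you need to tell these cases apart after the fact, the container's raw exit status can be read with the Docker SDK. A minimal sketch, assuming the container was not auto-removed and using a placeholder container name:

```python
import docker

client = docker.from_env()
container = client.containers.get("my-task-container")  # placeholder name

# wait() returns the exit status once the container has stopped
exit_code = container.wait(timeout=5)["StatusCode"]

if exit_code == 0:
    print("Container finished successfully")
elif exit_code == 137:
    # 128 + SIGKILL(9): often the OOM killer or a forced stop
    print("Container was killed, possibly out of memory")
elif exit_code > 128:
    print(f"Container terminated by signal {exit_code - 128}")
else:
    print(f"Container process failed with exit code {exit_code}")
```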

### Docker Daemon Errors

- **Connection failures**: Cannot connect to the Docker daemon (handling shown in the sketch below)
- **Image pull failures**: Unable to pull specified Docker images
- **Network errors**: Container networking configuration issues
- **Volume mount errors**: Failed to mount volumes or bind mounts
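
Daemon-level problems like these generally surface as exceptions from the Docker SDK (`docker.errors.*`) rather than as `DockerContainerFailedException`. A minimal sketch of handling them separately, assuming `task` is a `DockerOperator` defined elsewhere:

```python
import docker.errors

from airflow.providers.docker.exceptions import DockerContainerFailedException

try:
    task.execute({})  # `task` is a DockerOperator defined elsewhere
except DockerContainerFailedException as e:
    # The container ran but its process failed; logs are attached to the exception
    print(f"Container failed: {e}")
except docker.errors.ImageNotFound as e:
    # The requested image does not exist or could not be pulled
    print(f"Image pull failure: {e}")
except docker.errors.APIError as e:
    # Other daemon-side errors: networking, volumes, permissions, ...
    print(f"Docker daemon error: {e}")
```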

### Configuration Errors

- **Invalid parameters**: Malformed operator configuration
- **Missing dependencies**: Required Docker images or volumes not available
- **Authentication failures**: Docker registry authentication issues (see the sketch below)
- **TLS/Security errors**: Certificate or security configuration problems
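
For registry authentication specifically, `DockerOperator` can take credentials from an Airflow connection of type Docker via `docker_conn_id`. The connection ID and image name below are placeholders; this is a sketch, not a complete configuration:

```python
from airflow.providers.docker.operators.docker import DockerOperator

# "my_registry" would be an Airflow Docker connection holding the registry
# URL, username, and password; the image name is a placeholder.
authenticated_task = DockerOperator(
    task_id="pull_from_private_registry",
    image="registry.example.com/processor:latest",
    command=["python", "/app/process.py"],
    docker_conn_id="my_registry",
)
```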

## Best Practices

### Error Handling Strategy

1. **Specific Exception Handling**: Catch specific Docker exceptions rather than generic ones
2. **Log Analysis**: Analyze container logs to understand failure root causes
3. **Retry Logic**: Implement intelligent retry for transient failures
4. **Monitoring Integration**: Send failure metrics to monitoring systems
5. **Graceful Degradation**: Provide fallback behavior for non-critical failures (see the sketch after this list)
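
One way to combine these points is to fail soft when a container failure is non-critical. The wrapper below is a sketch: the function name, `fallback_result` parameter, and calling convention are assumptions, but the exceptions are the ones documented above.

```python
from airflow.exceptions import AirflowSkipException
from airflow.providers.docker.exceptions import DockerContainerFailedException


def run_with_fallback(docker_task, context, fallback_result=None):
    """Illustrative wrapper: degrade gracefully on non-critical container failures."""
    try:
        return docker_task.execute(context)
    except DockerContainerFailedException as e:
        # Keep the container output for later analysis
        for line in (e.logs or []):
            print(line.decode() if isinstance(line, bytes) else line)

        if fallback_result is not None:
            # Graceful degradation: return a fallback instead of failing the task
            return fallback_result

        # Otherwise mark the task as skipped rather than failed
        raise AirflowSkipException(f"Non-critical container failure: {e}")
```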

### Debugging Tips

1. **Enable Verbose Logging**: Set `tty=True` to see container logs
2. **Preserve Containers**: Use `auto_remove='never'` for debugging
3. **Check Exit Codes**: Use `skip_on_exit_code` for expected failure scenarios
4. **Volume Inspection**: Mount debug volumes to inspect container state
5. **Network Debugging**: Use `network_mode='host'` for network troubleshooting (a combined configuration sketch follows this list)
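
A configuration sketch that pulls these tips together. The image, command, mount path, and exit code are placeholders, and the string form of `auto_remove` assumes a recent version of the Docker provider:

```python
from docker.types import Mount

from airflow.providers.docker.operators.docker import DockerOperator

debug_task = DockerOperator(
    task_id="debuggable_processing",
    image="processor:latest",             # placeholder image
    command=["./run_with_checks.sh"],     # placeholder command
    tty=True,                             # allocate a pseudo-terminal for readable output
    auto_remove="never",                  # keep the container around for inspection
    skip_on_exit_code=[99],               # treat an expected "nothing to do" code as a skip
    network_mode="host",                  # simplify network troubleshooting
    mounts=[Mount(source="/tmp/debug", target="/debug", type="bind")],
)
```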