Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.
—
Quality: Pending — not yet reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
Custom exception classes for Docker container execution failures, providing detailed error information and logs for debugging containerized tasks. The provider includes specialized exceptions for different failure scenarios with comprehensive logging support.
Raised when a Docker container returns an error during execution.
class DockerContainerFailedException(AirflowException):
    """
    Exception raised when a Docker container fails during execution.

    Provides access to container logs and failure details for debugging.

    Attributes:
        logs: list[str | bytes] | None — container logs from the failed execution
    """

    def __init__(
        self,
        message: str | None = None,
        logs: list[str | bytes] | None = None,
    ) -> None:
        """
        Initialize container failure exception.

        Args:
            message: Error description
            logs: Container log output from failed execution
        """
        super().__init__(message)
        # Keep the raw log lines (str or bytes) so handlers can inspect
        # the container output when diagnosing the failure.
        self.logs = logs
logs: list[str | bytes] | None — container logs from the failed execution.

Raised when a Docker container returns an error and the task should be skipped.
class DockerContainerFailedSkipException(AirflowSkipException):
    """
    Exception raised when a Docker container fails and task should be skipped.

    Used with skip_on_exit_code parameter to treat specific exit codes as
    skipped tasks.

    Attributes:
        logs: list[str | bytes] | None — container logs from the failed execution
    """

    def __init__(
        self,
        message: str | None = None,
        logs: list[str | bytes] | None = None,
    ) -> None:
        """
        Initialize container skip exception.

        Args:
            message: Skip reason description
            logs: Container log output from failed execution
        """
        super().__init__(message)
        # Preserve the container output so callers can still inspect why
        # the task was skipped.
        self.logs = logs
logs: list[str | bytes] | None — container logs from the failed execution.

from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import (
DockerContainerFailedException,
DockerContainerFailedSkipException
)
def handle_docker_task():
    """Run a Docker task, reporting container failures and skips."""
    try:
        task = DockerOperator(
            task_id='risky_operation',
            image='ubuntu:20.04',
            command=['./risky_script.sh']
        )
        return task.execute({})
    except DockerContainerFailedException as failure:
        print(f"Container failed: {failure}")
        if failure.logs:
            print("Container logs:")
            for entry in failure.logs:
                text = entry.decode() if isinstance(entry, bytes) else entry
                print(text)
        raise
    except DockerContainerFailedSkipException as skipped:
        # Airflow will mark the task as skipped.
        print(f"Container skipped: {skipped}")
        raise

# Configure task to skip on specific exit codes
# Treat exit codes 1-3 as "skip this task" instead of "fail this task".
conditional_task = DockerOperator(
    task_id='conditional_processing',
    image='myapp:latest',
    command=['./check_and_process.sh'],
    skip_on_exit_code=[1, 2, 3],  # Skip if exit code is 1, 2, or 3
    auto_remove='success'
)
# The operator will raise DockerContainerFailedSkipException
# for these exit codes instead of DockerContainerFailedException

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import DockerContainerFailedException
def error_callback(context):
    """on_failure_callback that forwards Docker container failures to monitoring."""
    ti = context['task_instance']
    err = context.get('exception')
    if not isinstance(err, DockerContainerFailedException):
        return
    print(f"Docker container failed in task {ti.task_id}")
    if err.logs:
        decoded = [
            entry.decode() if isinstance(entry, bytes) else entry
            for entry in err.logs
        ]
        # Send logs to monitoring system
        send_to_monitoring({
            'task_id': ti.task_id,
            'error': str(err),
            'logs': decoded
        })
# NOTE(review): this snippet also needs `from datetime import datetime`
# alongside the imports above — confirm before running.
dag = DAG(
    'docker_with_error_handling',
    start_date=datetime(2024, 1, 1),
    on_failure_callback=error_callback
)

risky_task = DockerOperator(
    task_id='data_processing',
    image='data-processor:v1.0',
    command=['python', '/app/process.py'],
    dag=dag
)

import logging
def analyze_container_failure(logs: list[str | bytes] | None):
"""Analyze container logs for failure patterns."""
if not logs:
return "No logs available"
error_patterns = []
for log_line in logs:
line = log_line.decode() if isinstance(log_line, bytes) else log_line
if 'ERROR' in line:
error_patterns.append(f"Error found: {line.strip()}")
elif 'FATAL' in line:
error_patterns.append(f"Fatal error: {line.strip()}")
elif 'Exception' in line:
error_patterns.append(f"Exception: {line.strip()}")
return error_patterns
# Use in exception handler around an operator's execute() call.
try:
    docker_task.execute({})
except DockerContainerFailedException as e:
    error_analysis = analyze_container_failure(e.logs)
    logging.error(f"Container analysis: {error_analysis}")

from airflow.utils.decorators import apply_defaults
from airflow.providers.docker.operators.docker import DockerOperator
class RetryableDockerOperator(DockerOperator):
    """Docker operator that retries when container logs show a transient failure."""

    # NOTE(review): @apply_defaults has been a deprecated no-op since
    # Airflow 2.1; kept only for compatibility with the original example.
    @apply_defaults
    def __init__(self, max_analysis_retries=2, **kwargs):
        """
        Args:
            max_analysis_retries: extra attempts allowed when the failure
                looks transient (see should_retry)
        """
        super().__init__(**kwargs)
        self.max_analysis_retries = max_analysis_retries
        self.retry_count = 0

    def execute(self, context):
        """Run the container, retrying retryable failures up to the limit."""
        # Reset per run: without this, a reused operator instance keeps the
        # count from a previous execute() and silently loses its retry budget.
        self.retry_count = 0
        while self.retry_count <= self.max_analysis_retries:
            try:
                return super().execute(context)
            except DockerContainerFailedException as e:
                self.retry_count += 1
                # Analyze failure: only log patterns we know to be transient
                # are worth another attempt.
                if e.logs and self.should_retry(e.logs):
                    if self.retry_count <= self.max_analysis_retries:
                        self.log.warning(
                            "Retrying container execution (attempt %s)",
                            self.retry_count,
                        )
                        continue
                # Re-raise if not retryable or max retries exceeded
                raise

    def should_retry(self, logs: list[str | bytes]) -> bool:
        """Return True when the logs match a known transient-failure pattern."""
        retryable_patterns = (
            'connection timeout',
            'network unreachable',
            'temporary failure',
            'resource temporarily unavailable',
        )
        log_text = ' '.join(
            log.decode() if isinstance(log, bytes) else log for log in logs
        ).lower()
        return any(pattern in log_text for pattern in retryable_patterns)
# Usage
resilient_task = RetryableDockerOperator(
    task_id='resilient_processing',
    image='processor:latest',
    command=['python', '/app/process.py'],
    max_analysis_retries=3
)

import json
from airflow.providers.docker.exceptions import DockerContainerFailedException
def send_docker_metrics(task_id: str, exception: "DockerContainerFailedException"):
    """Send Docker failure metrics to a monitoring system.

    Args:
        task_id: Airflow task id of the failed task
        exception: the container failure, including any captured logs
    """
    # Local import keeps this snippet self-contained.
    from datetime import datetime, timezone

    metrics = {
        'task_id': task_id,
        # datetime.utcnow() is deprecated (and was never imported here);
        # use an aware UTC timestamp instead.
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'error_message': str(exception),
        'log_count': len(exception.logs) if exception.logs else 0
    }
    # Classify log lines by severity so dashboards can group failures.
    if exception.logs:
        error_types = []
        for log in exception.logs:
            line = log.decode() if isinstance(log, bytes) else log
            if 'ERROR' in line:
                error_types.append('error')
            elif 'WARNING' in line:
                error_types.append('warning')
        metrics['error_types'] = list(set(error_types))
    # Send to monitoring (example)
    # monitoring_client.send_metrics('docker_failures', metrics)
    print(f"Docker failure metrics: {json.dumps(metrics, indent=2)}")
# In task failure callback
def docker_failure_callback(context):
exception = context.get('exception')
if isinstance(exception, DockerContainerFailedException):
send_docker_metrics(context['task_instance'].task_id, exception)def stream_container_logs(container_id: str, hook: DockerHook):
"""Stream container logs with error handling."""
try:
client = hook.get_conn()
# Stream logs in real-time
for log_line in client.logs(container_id, stream=True, follow=True):
decoded_line = log_line.decode('utf-8').strip()
print(f"Container log: {decoded_line}")
# Check for error patterns
if 'FATAL' in decoded_line:
logging.error(f"Fatal error detected: {decoded_line}")
elif 'ERROR' in decoded_line:
logging.warning(f"Error detected: {decoded_line}")
except Exception as e:
logging.error(f"Failed to stream logs: {e}")tty=True to see container logsauto_remove='never' for debuggingskip_on_exit_code for expected failure scenariosnetwork_mode='host' for network troubleshootingInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-docker