tessl/pypi-apache-airflow-providers-docker

Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.

docs/error-handling.md

Error Management

Custom exception classes for Docker container execution failures. Each exception carries the error message and the container's log output, so the different failure scenarios of containerized tasks can be debugged directly from the raised exception.

Capabilities

DockerContainerFailedException

Raised when a Docker container returns an error during execution.

class DockerContainerFailedException(AirflowException):
    """
    Exception raised when a Docker container fails during execution.
    
    Provides access to container logs and failure details for debugging.
    """
    
    def __init__(
        self, 
        message: str | None = None, 
        logs: list[str | bytes] | None = None
    ) -> None:
        """
        Initialize container failure exception.
        
        Args:
            message: Error description
            logs: Container log output from failed execution
        """

Attributes:

  • logs: list[str | bytes] | None - Container logs from the failed execution

DockerContainerFailedSkipException

Raised when a Docker container returns an error and the task should be skipped.

class DockerContainerFailedSkipException(AirflowSkipException):
    """
    Exception raised when a Docker container fails and task should be skipped.
    
    Used with skip_on_exit_code parameter to treat specific exit codes as skipped tasks.
    """
    
    def __init__(
        self, 
        message: str | None = None, 
        logs: list[str | bytes] | None = None
    ) -> None:
        """
        Initialize container skip exception.
        
        Args:
            message: Skip reason description
            logs: Container log output from failed execution
        """

Attributes:

  • logs: list[str | bytes] | None - Container logs from the failed execution

Usage Examples

Basic Exception Handling

from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import (
    DockerContainerFailedException,
    DockerContainerFailedSkipException
)

def handle_docker_task():
    """Handle Docker task with error management."""
    try:
        task = DockerOperator(
            task_id='risky_operation',
            image='ubuntu:20.04',
            command=['./risky_script.sh']
        )
        result = task.execute({})
        return result
        
    except DockerContainerFailedException as e:
        print(f"Container failed: {e}")
        if e.logs:
            print("Container logs:")
            for log_line in e.logs:
                print(log_line.decode() if isinstance(log_line, bytes) else log_line)
        raise
        
    except DockerContainerFailedSkipException as e:
        print(f"Container skipped: {e}")
        # Task will be marked as skipped
        raise

Skip on Specific Exit Codes

# Configure task to skip on specific exit codes
conditional_task = DockerOperator(
    task_id='conditional_processing',
    image='myapp:latest',
    command=['./check_and_process.sh'],
    skip_on_exit_code=[1, 2, 3],  # Skip if exit code is 1, 2, or 3
    auto_remove='success'
)

# The operator will raise DockerContainerFailedSkipException
# for these exit codes instead of DockerContainerFailedException

Custom Error Handling in Tasks

from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import DockerContainerFailedException

def error_callback(context):
    """Custom error callback for Docker task failures."""
    task_instance = context['task_instance']
    exception = context.get('exception')
    
    if isinstance(exception, DockerContainerFailedException):
        print(f"Docker container failed in task {task_instance.task_id}")
        
        if exception.logs:
            # Forward logs to a monitoring system
            # (send_to_monitoring is a placeholder for your own client)
            send_to_monitoring({
                'task_id': task_instance.task_id,
                'error': str(exception),
                'logs': [log.decode() if isinstance(log, bytes) else log 
                        for log in exception.logs]
            })

dag = DAG(
    'docker_with_error_handling',
    start_date=datetime(2024, 1, 1),
    # Task-level failures invoke the callback set via default_args;
    # a DAG-level on_failure_callback fires only when the DAG run fails
    default_args={'on_failure_callback': error_callback}
)

risky_task = DockerOperator(
    task_id='data_processing',
    image='data-processor:v1.0',
    command=['python', '/app/process.py'],
    dag=dag
)

Logging Container Output

import logging

def analyze_container_failure(logs: list[str | bytes] | None) -> list[str]:
    """Analyze container logs for failure patterns."""
    if not logs:
        return ["No logs available"]
    
    error_patterns = []
    for log_line in logs:
        line = log_line.decode() if isinstance(log_line, bytes) else log_line
        
        if 'ERROR' in line:
            error_patterns.append(f"Error found: {line.strip()}")
        elif 'FATAL' in line:
            error_patterns.append(f"Fatal error: {line.strip()}")
        elif 'Exception' in line:
            error_patterns.append(f"Exception: {line.strip()}")
    
    return error_patterns

# Use in an exception handler (docker_task is a DockerOperator defined elsewhere)
try:
    docker_task.execute({})
except DockerContainerFailedException as e:
    error_analysis = analyze_container_failure(e.logs)
    logging.error(f"Container analysis: {error_analysis}")

Retry Logic with Error Analysis

from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.exceptions import DockerContainerFailedException

class RetryableDockerOperator(DockerOperator):
    """Docker operator with intelligent retry logic."""

    # Note: the legacy @apply_defaults decorator was removed in Airflow 2;
    # plain __init__ overrides are sufficient.
    def __init__(self, max_analysis_retries=2, **kwargs):
        super().__init__(**kwargs)
        self.max_analysis_retries = max_analysis_retries
        self.retry_count = 0
    
    def execute(self, context):
        while self.retry_count <= self.max_analysis_retries:
            try:
                return super().execute(context)
                
            except DockerContainerFailedException as e:
                self.retry_count += 1
                
                # Analyze failure
                if e.logs and self.should_retry(e.logs):
                    if self.retry_count <= self.max_analysis_retries:
                        self.log.warning(
                            f"Retrying container execution (attempt {self.retry_count})"
                        )
                        continue
                
                # Re-raise if not retryable or max retries exceeded
                raise
    
    def should_retry(self, logs: list[str | bytes]) -> bool:
        """Determine if failure is retryable based on logs."""
        retryable_patterns = [
            'connection timeout',
            'network unreachable', 
            'temporary failure',
            'resource temporarily unavailable'
        ]
        
        log_text = ' '.join([
            log.decode() if isinstance(log, bytes) else log 
            for log in logs
        ]).lower()
        
        return any(pattern in log_text for pattern in retryable_patterns)

# Usage
resilient_task = RetryableDockerOperator(
    task_id='resilient_processing',
    image='processor:latest',
    command=['python', '/app/process.py'],
    max_analysis_retries=3
)

Integration with Monitoring

import json
from datetime import datetime, timezone

from airflow.providers.docker.exceptions import DockerContainerFailedException

def send_docker_metrics(task_id: str, exception: DockerContainerFailedException):
    """Send Docker failure metrics to monitoring system."""

    metrics = {
        'task_id': task_id,
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'error_message': str(exception),
        'log_count': len(exception.logs) if exception.logs else 0
    }
    
    # Extract error types from logs
    if exception.logs:
        error_types = []
        for log in exception.logs:
            line = log.decode() if isinstance(log, bytes) else log
            if 'ERROR' in line:
                error_types.append('error')
            elif 'WARNING' in line:
                error_types.append('warning')
                
        metrics['error_types'] = list(set(error_types))
    
    # Send to monitoring (example)
    # monitoring_client.send_metrics('docker_failures', metrics)
    print(f"Docker failure metrics: {json.dumps(metrics, indent=2)}")

# In task failure callback
def docker_failure_callback(context):
    exception = context.get('exception')
    if isinstance(exception, DockerContainerFailedException):
        send_docker_metrics(context['task_instance'].task_id, exception)

Log Streaming with Error Handling

import logging

from airflow.providers.docker.hooks.docker import DockerHook

def stream_container_logs(container_id: str, hook: DockerHook):
    """Stream container logs with error handling."""
    try:
        client = hook.get_conn()
        
        # Stream logs in real-time
        for log_line in client.logs(container_id, stream=True, follow=True):
            decoded_line = log_line.decode('utf-8').strip()
            print(f"Container log: {decoded_line}")
            
            # Check for error patterns
            if 'FATAL' in decoded_line:
                logging.error(f"Fatal error detected: {decoded_line}")
            elif 'ERROR' in decoded_line:
                logging.warning(f"Error detected: {decoded_line}")
                
    except Exception as e:
        logging.error(f"Failed to stream logs: {e}")

Error Categories

Container Execution Errors

  • Exit code failures: Non-zero exit codes from container processes
  • Signal termination: Containers killed by signals (SIGTERM, SIGKILL)
  • Resource exhaustion: Out of memory, disk space, or CPU limits
  • Permission errors: Insufficient privileges for container operations
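Exit codes and signal terminations are related by a common Docker/shell convention: a container killed by signal N usually reports exit status 128 + N (so 137 means SIGKILL, often an OOM kill). A small sketch, assuming that convention holds for your runtime:

```python
import signal

def describe_exit_code(code: int) -> str:
    """Map a container exit status to a human-readable cause.

    Uses the common 128 + signum convention for signal deaths;
    some runtimes report codes differently, so treat this as a hint.
    """
    if code == 0:
        return "success"
    if code > 128:
        try:
            return f"killed by signal {signal.Signals(code - 128).name}"
        except ValueError:
            return f"unknown signal ({code - 128})"
    return f"process exited with code {code}"
```

For example, `describe_exit_code(137)` reports a SIGKILL, a frequent symptom of the resource-exhaustion category above.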

Docker Daemon Errors

  • Connection failures: Cannot connect to Docker daemon
  • Image pull failures: Unable to pull specified Docker images
  • Network errors: Container networking configuration issues
  • Volume mount errors: Failed to mount volumes or bind mounts

Configuration Errors

  • Invalid parameters: Malformed operator configuration
  • Missing dependencies: Required Docker images or volumes not available
  • Authentication failures: Docker registry authentication issues
  • TLS/Security errors: Certificate or security configuration problems

Best Practices

Error Handling Strategy

  1. Specific Exception Handling: Catch specific Docker exceptions rather than generic ones
  2. Log Analysis: Analyze container logs to understand failure root causes
  3. Retry Logic: Implement intelligent retry for transient failures
  4. Monitoring Integration: Send failure metrics to monitoring systems
  5. Graceful Degradation: Provide fallback behavior for non-critical failures

Debugging Tips

  1. Allocate a TTY: Set tty=True so tools that expect a terminal produce complete, unbuffered output in the container logs
  2. Preserve Containers: Use auto_remove='never' for debugging
  3. Check Exit Codes: Use skip_on_exit_code for expected failure scenarios
  4. Volume Inspection: Mount debug volumes to inspect container state
  5. Network Debugging: Use network_mode='host' for network troubleshooting
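The debugging tips above can be combined into a single operator configuration. A sketch with illustrative parameter values; availability of auto_remove='never' and host networking depends on your provider version and host OS:

```python
from airflow.providers.docker.operators.docker import DockerOperator

debug_task = DockerOperator(
    task_id='debug_processing',
    image='myapp:latest',
    command=['python', '/app/process.py'],
    tty=True,                 # allocate a pseudo-terminal for complete output
    auto_remove='never',      # keep the container around for post-mortem inspection
    skip_on_exit_code=[99],   # treat an expected exit code as a skip, not a failure
    mounts=[],                # add debug volumes here to inspect container state
    network_mode='host',      # simplify network troubleshooting (Linux hosts only)
)
```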

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-docker
