tessl/pypi-apache-airflow-providers-docker

Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.

Containerized Task Decorators

Transform Python functions into containerized tasks with the @docker_task decorator. The decorator runs a function inside an isolated Docker container, automatically serializing its arguments on the way in and its return value on the way out, so containerized execution fits naturally into Python function workflows.

Capabilities

docker_task Decorator

Convert Python functions into DockerOperator tasks with automatic serialization.

def docker_task(
    image: str,
    python_command: str = "python",
    serializer: Literal["pickle", "dill", "cloudpickle"] = "pickle",
    multiple_outputs: bool | None = None,
    **kwargs
) -> TaskDecorator:
    """
    Decorator that converts a Python function into a DockerOperator task.
    
    Args:
        image: Docker image to run the function in
        python_command: Python command to use in container (default: "python")
        serializer: Serialization method for function arguments and return values
        multiple_outputs: Whether the function returns multiple outputs
        **kwargs: All DockerOperator parameters are supported
        
    Returns:
        TaskDecorator function that creates _DockerDecoratedOperator instances
    """

Parameters:

  • image: Docker image containing Python runtime for function execution
  • python_command: Python executable command in container (e.g., "python", "python3", "/opt/python/bin/python")
  • serializer: Method for serializing function arguments and return values:
    • "pickle": Standard library pickle (default; typically the fastest)
    • "dill": Extended pickle with broader object support
    • "cloudpickle": Cloud-optimized pickle for distributed computing
  • multiple_outputs: Set to True if function returns multiple values as dictionary
  • **kwargs: All DockerOperator parameters (environment, mounts, resources, etc.)

Supported Serializers

# Available serialization options
Serializer = Literal["pickle", "dill", "cloudpickle"]

# Serializer modules (lazy-loaded)
_SERIALIZERS: dict[Serializer, Any] = {
    "pickle": pickle,
    "dill": dill,  # Requires: pip install dill
    "cloudpickle": cloudpickle  # Requires: pip install cloudpickle
}
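
As the comments above note, dill and cloudpickle are optional dependencies that are only imported when requested. A minimal sketch of that lazy lookup (`resolve_serializer` and its error messages are illustrative assumptions, not the provider's actual API):

```python
import importlib

# Illustrative sketch of lazy serializer lookup; resolve_serializer and its
# error messages are assumptions, not the provider's actual API.
def resolve_serializer(name: str):
    """Import the requested serializer module on first use."""
    if name not in ("pickle", "dill", "cloudpickle"):
        raise ValueError(f"unsupported serializer: {name!r}")
    try:
        return importlib.import_module(name)
    except ImportError as exc:
        raise ImportError(f"{name} is not installed; run `pip install {name}`") from exc

mod = resolve_serializer("pickle")
assert mod.loads(mod.dumps({"a": 1})) == {"a": 1}
```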

Usage Examples

Basic Function Containerization

from airflow.providers.docker.decorators.docker import docker_task

@docker_task(image='python:3.9-slim')
def hello_world():
    """Simple containerized function."""
    return "Hello from Docker container!"

# Use in DAG
hello_task = hello_world()

Function with Arguments

@docker_task(image='python:3.9')
def process_data(data_list: list, multiplier: int = 2):
    """Process data with arguments."""
    return [x * multiplier for x in data_list]

# Call with arguments
result = process_data([1, 2, 3, 4], multiplier=3)

Scientific Computing Function

@docker_task(
    image='python:3.9',
    serializer='cloudpickle'  # Better for scientific objects
)
def analyze_dataset():
    """Perform data analysis using scientific libraries."""
    import numpy as np
    import pandas as pd
    
    # Generate sample data
    data = np.random.randn(1000, 5)
    df = pd.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])
    
    # Perform analysis
    stats = {
        'mean': df.mean().to_dict(),
        'std': df.std().to_dict(),
        'correlation': df.corr().to_dict()
    }
    
    return stats

analysis_task = analyze_dataset()

Function with Custom Environment

@docker_task(
    image='python:3.9',
    environment={
        'API_KEY': '{{ var.value.api_key }}',
        'LOG_LEVEL': 'DEBUG',
        'WORKERS': '4'
    }
)  
def api_data_fetch(endpoint: str):
    """Fetch data from API with environment configuration."""
    import os
    import requests
    
    api_key = os.environ['API_KEY']
    log_level = os.environ.get('LOG_LEVEL', 'INFO')
    
    response = requests.get(
        endpoint,
        headers={'Authorization': f'Bearer {api_key}'}
    )
    
    return response.json()

fetch_task = api_data_fetch('https://api.example.com/data')

Function with Volume Mounts

from docker.types import Mount

@docker_task(
    image='python:3.9',
    mounts=[
        Mount(
            source='/host/data',
            target='/app/data', 
            type='bind',
            read_only=True
        ),
        Mount(
            source='/host/output',
            target='/app/output',
            type='bind'
        )
    ]
)
def file_processor():
    """Process files from mounted volumes."""
    import os
    import json
    
    # Read input files
    input_dir = '/app/data'
    output_dir = '/app/output'
    
    results = []
    for filename in os.listdir(input_dir):
        filepath = os.path.join(input_dir, filename)
        with open(filepath, 'r') as f:
            data = f.read()
            results.append({
                'file': filename,
                'size': len(data),
                'lines': len(data.splitlines())
            })
    
    # Write results
    output_file = os.path.join(output_dir, 'results.json')
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)
    
    return results

process_task = file_processor()

Function with Multiple Outputs

@docker_task(
    image='python:3.9',
    multiple_outputs=True
)
def data_pipeline():
    """Process data and return multiple outputs."""
    import random
    
    # Simulate data processing
    raw_data = [random.randint(1, 100) for _ in range(50)]
    
    return {
        'processed_data': [x * 2 for x in raw_data],
        'statistics': {
            'count': len(raw_data),
            'mean': sum(raw_data) / len(raw_data),
            'max': max(raw_data),
            'min': min(raw_data)
        },
        'metadata': {
            'processing_version': '1.0',
            'timestamp': '2024-01-01T00:00:00Z'
        }
    }

pipeline_task = data_pipeline()

# Access individual outputs
processed = pipeline_task['processed_data']
stats = pipeline_task['statistics'] 
meta = pipeline_task['metadata']
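
Conceptually, `multiple_outputs=True` fans the returned dict out so each top-level key can be pulled individually, as the indexing above shows. A rough, self-contained model of that behavior (the `xcom` dict stands in for Airflow's XCom store; `fan_out` is illustrative, not a provider function):

```python
# The xcom dict below is a stand-in for Airflow's XCom store; fan_out is
# an illustrative sketch, not a provider function.
def fan_out(task_id: str, result, xcom: dict) -> None:
    if not isinstance(result, dict):
        raise TypeError("multiple_outputs=True expects the function to return a dict")
    xcom[(task_id, "return_value")] = result   # the whole result stays available
    for key, value in result.items():          # each key gets its own entry
        xcom[(task_id, key)] = value

store = {}
fan_out("data_pipeline", {"statistics": {"count": 50}, "metadata": {"v": "1.0"}}, store)
assert store[("data_pipeline", "statistics")] == {"count": 50}
```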

GPU-Enabled Function

from docker.types import DeviceRequest

@docker_task(
    image='tensorflow/tensorflow:latest-gpu',
    device_requests=[
        DeviceRequest(count=1, capabilities=[['gpu']])
    ],
    serializer='cloudpickle'
)
def gpu_computation():
    """Perform GPU-accelerated computation."""
    import tensorflow as tf
    
    # Check GPU availability
    gpus = tf.config.list_physical_devices('GPU')
    print(f"Available GPUs: {len(gpus)}")
    
    # Simple GPU computation
    with tf.device('/GPU:0'):
        a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
        b = tf.constant([[2.0, 1.0], [1.0, 2.0]])
        result = tf.matmul(a, b)
    
    return result.numpy().tolist()

gpu_task = gpu_computation()

Function with Custom Python Environment

@docker_task(
    image='continuumio/miniconda3:latest',
    python_command='conda run -n myenv python'
)
def conda_analysis():
    """Run function in conda environment."""
    import sys
    import numpy as np
    import pandas as pd
    
    # Conda environment info
    env_info = {
        'python_version': sys.version,
        'numpy_version': np.__version__,
        'pandas_version': pd.__version__
    }
    
    return env_info

conda_task = conda_analysis()

Function with Dill Serialization

@docker_task(
    image='python:3.9',
    serializer='dill'  # Better support for complex objects
)
def complex_object_handler():
    """Handle complex Python objects with dill."""
    import functools
    
    # Create complex objects that pickle can't serialize
    def multiplier(factor):
        return lambda x: x * factor
    
    # Partial functions
    double = functools.partial(multiplier, 2)
    triple = functools.partial(multiplier, 3)
    
    # Nested functions
    def outer_func():
        local_var = 42
        def inner_func():
            return local_var * 2
        return inner_func
    
    nested = outer_func()
    
    return {
        'double_result': double(5),
        'triple_result': triple(5),
        'nested_result': nested()
    }

complex_task = complex_object_handler()

Advanced Configuration

Resource Management

@docker_task(
    image='python:3.9',
    mem_limit='2g',
    cpus=2.0,
    shm_size=268435456  # 256MB shared memory
)
def memory_intensive_task():
    """Function with resource constraints."""
    import numpy as np
    
    # Memory-intensive operation
    large_array = np.random.rand(10000, 10000)
    result = np.sum(large_array)
    
    return float(result)

resource_task = memory_intensive_task()

Network Configuration

@docker_task(
    image='python:3.9',
    network_mode='host',
    extra_hosts={'database': '192.168.1.100'}
)
def network_service():
    """Function with custom networking."""
    import socket
    import requests
    
    # Get container hostname
    hostname = socket.gethostname()
    
    # Make network request
    response = requests.get('http://database:5432/health')
    
    return {
        'hostname': hostname,
        'database_status': response.status_code
    }

network_task = network_service()

Decorator Internals

_DockerDecoratedOperator

Internal implementation class (not directly used):

class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
    """
    Internal class that combines DecoratedOperator and DockerOperator.
    
    Handles:
    - Function serialization and deserialization
    - Argument passing to containerized function
    - Return value extraction and XCom storage
    - Error handling and logging
    """
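
The round trip the operator performs can be modeled in a few lines: serialize the call arguments on the Airflow side, deserialize and execute on the "container" side, then serialize the return value back. This sketch runs everything in one process purely to illustrate the data flow; the real operator ships the serialized bytes into the container, and the names here are illustrative, not the provider's API:

```python
import pickle

# Simplified, single-process model of the operator's data flow; the real
# operator writes the serialized bytes into the container and reads the
# result back out.
def run_like_container(func, *args, **kwargs):
    # Host side: serialize the call arguments with the configured serializer.
    blob = pickle.dumps((args, kwargs))
    # "Container" side: deserialize and execute the function.
    fargs, fkwargs = pickle.loads(blob)
    result_blob = pickle.dumps(func(*fargs, **fkwargs))
    # Host side: deserialize the return value (this is what lands in XCom).
    return pickle.loads(result_blob)

assert run_like_container(sorted, [3, 1, 2]) == [1, 2, 3]
```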

Serialization Considerations

Pickle (Default)

  • Pros: Fast, built-in, handles most Python objects
  • Cons: Limited support for complex objects (lambdas, nested functions)
  • Use for: Simple data types, standard library objects

Dill

  • Pros: Extended object support, handles lambdas and nested functions
  • Cons: Slower than pickle, requires additional dependency
  • Use for: Complex functions, closures, partial functions

CloudPickle

  • Pros: Optimized for distributed computing, cloud environments
  • Cons: Additional dependency, may be slower for simple objects
  • Use for: Scientific computing, distributed workflows, cloud deployments
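
These trade-offs are easy to verify: standard pickle refuses lambdas and other locally defined callables, which is precisely the gap dill and cloudpickle close. A quick check using only the standard library:

```python
import pickle

def survives_pickle(obj) -> bool:
    """Return True if obj makes a full pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False

assert survives_pickle({"rows": [1, 2, 3]})   # plain data: fine
assert not survives_pickle(lambda x: x * 2)   # lambda: pickle refuses it
```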

Error Handling

Containerized task execution can fail at several points:

  • Serialization errors: Function arguments can't be serialized
  • Execution errors: Function fails inside container
  • Deserialization errors: Return value can't be deserialized
  • Container errors: Docker container fails to start or execute

All errors are propagated as Airflow task failures with detailed logging.
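
Because serialization errors only surface when the task runs, it can help to validate arguments up front. A hypothetical pre-flight helper (not part of the provider) for the default pickle serializer:

```python
import pickle

# Hypothetical pre-flight helper (not part of the provider): fail fast if
# the arguments you plan to pass would not survive the default serializer.
def assert_serializable(*args, **kwargs):
    try:
        pickle.dumps((args, kwargs))
    except Exception as exc:
        raise TypeError(f"arguments are not pickle-serializable: {exc}") from exc

assert_serializable([1, 2, 3], multiplier=3)  # plain data passes silently
```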

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-docker
