CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-docker

Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docker-operations.mddocs/

Docker Container Operations

Execute commands and run tasks inside Docker containers with comprehensive configuration options for production workflows. The DockerOperator provides full control over container lifecycle, resource allocation, networking, security, and data persistence.

Capabilities

DockerOperator

Execute commands inside Docker containers with complete control over container configuration.

class DockerOperator(BaseOperator):
    def __init__(
        self,
        *,
        image: str,
        api_version: str | None = None,
        command: str | list[str] | None = None,
        container_name: str | None = None,
        cpus: float = 1.0,
        docker_url: str | list[str] | None = None,
        environment: dict | None = None,
        private_environment: dict | None = None,
        env_file: str | None = None,
        force_pull: bool = False,
        mem_limit: float | str | None = None,
        host_tmp_dir: str | None = None,
        network_mode: str | None = None,
        tls_ca_cert: str | None = None,
        tls_client_cert: str | None = None,
        tls_client_key: str | None = None,
        tls_verify: bool = True,
        tls_hostname: str | bool | None = None,
        tls_ssl_version: str | None = None,
        mount_tmp_dir: bool = True,
        tmp_dir: str = "/tmp/airflow",
        user: str | int | None = None,
        mounts: list[Mount] | None = None,
        entrypoint: str | list[str] | None = None,
        working_dir: str | None = None,
        xcom_all: bool = False,
        docker_conn_id: str | None = None,
        dns: list[str] | None = None,
        dns_search: list[str] | None = None,
        auto_remove: Literal["never", "success", "force"] = "never",
        shm_size: int | None = None,
        tty: bool = False,
        hostname: str | None = None,
        privileged: bool = False,
        cap_add: Iterable[str] | None = None,
        extra_hosts: dict[str, str] | None = None,
        retrieve_output: bool = False,
        retrieve_output_path: str | None = None,
        timeout: int = 60,
        device_requests: list[DeviceRequest] | None = None,
        log_opts_max_size: str | None = None,
        log_opts_max_file: str | None = None,
        ipc_mode: str | None = None,
        skip_on_exit_code: int | Container[int] | None = None,
        port_bindings: dict | None = None,
        ulimits: list[Ulimit] | None = None,
        labels: dict[str, str] | list[str] | None = None,
        **kwargs
    ) -> None

Parameters:

  • image: Docker image from which to create the container (templated)
  • api_version: Remote API version, set to "auto" to automatically detect server version
  • command: Command to run in the container (templated)
  • container_name: Name of the container (templated)
  • cpus: Number of CPUs to assign (multiplied by 1024)
  • docker_url: URL(s) of Docker daemon host, defaults to DOCKER_HOST env var or unix://var/run/docker.sock
  • environment: Environment variables dictionary (templated)
  • private_environment: Private environment variables (not templated, hidden from UI)
  • env_file: Relative path to .env file with environment variables (templated)
  • force_pull: Pull the Docker image on every run
  • mem_limit: Maximum memory limit (float bytes or string like "128m", "1g")
  • host_tmp_dir: Host temporary directory location for mounting
  • network_mode: Network mode ("bridge", "none", "container:<name>", "host", "<network-name>")
  • tls_ca_cert: Path to PEM-encoded CA certificate for TLS
  • tls_client_cert: Path to PEM-encoded client certificate for TLS
  • tls_client_key: Path to PEM-encoded client key for TLS
  • tls_verify: Verify certificate validity
  • tls_hostname: Hostname to match against server certificate
  • tls_ssl_version: SSL version for Docker daemon communication
  • mount_tmp_dir: Whether to bind-mount temporary directory from host
  • tmp_dir: Container mount point for temporary directory
  • user: Default user inside the container
  • mounts: List of docker.types.Mount instances for volume mounting (templated)
  • entrypoint: Override container ENTRYPOINT
  • working_dir: Working directory in container
  • xcom_all: Push all stdout lines or just the last line to XCom
  • docker_conn_id: Docker connection ID for authentication
  • dns: Custom DNS servers list
  • dns_search: Custom DNS search domains list
  • auto_remove: Container removal policy ("never", "success", "force")
  • shm_size: Size of /dev/shm in bytes
  • tty: Allocate pseudo-TTY
  • hostname: Container hostname
  • privileged: Give extended privileges to container
  • cap_add: Container capabilities to add
  • extra_hosts: Additional hostname to IP address mappings
  • retrieve_output: Retrieve output file before shutdown
  • retrieve_output_path: Path for retrieving output file
  • timeout: API timeout in seconds
  • device_requests: GPU/device requests list
  • log_opts_max_size: Maximum log size before rolling
  • log_opts_max_file: Maximum number of log files
  • ipc_mode: IPC mode for container
  • skip_on_exit_code: Exit codes to treat as skipped task
  • port_bindings: Port bindings dictionary
  • ulimits: List of ulimit configurations
  • labels: Container labels dictionary or list

Execution Methods

def execute(self, context: Context) -> list[str] | str | None:
    """Execute the Docker container and return output."""

def hook(self) -> DockerHook:
    """Get DockerHook instance for this operator."""

def cli(self) -> APIClient:
    """Get Docker API client."""

def on_kill(self) -> None:
    """Handle task cancellation by stopping container."""

Utility Methods

@staticmethod
def format_command(command: list[str] | str | None) -> list[str] | str | None:
    """Format command for execution."""

@staticmethod  
def unpack_environment_variables(env_str: str) -> dict:
    """Parse environment variable string into dictionary."""

Usage Examples

Basic Container Execution

from airflow.providers.docker.operators.docker import DockerOperator

# Simple command execution
basic_task = DockerOperator(
    task_id='hello_world',
    image='alpine:latest',
    command=['echo', 'Hello World from Docker!']
)

Volume Mounting and Environment Variables

from docker.types import Mount

# Data processing with mounted volumes
data_processor = DockerOperator(
    task_id='process_data',
    image='python:3.9',
    command=['python', '/app/process.py'],
    mounts=[
        Mount(
            source='/host/input',
            target='/app/input',
            type='bind',
            read_only=True
        ),
        Mount(
            source='/host/output', 
            target='/app/output',
            type='bind'
        )
    ],
    environment={
        'INPUT_PATH': '/app/input',
        'OUTPUT_PATH': '/app/output',
        'LOG_LEVEL': 'info'
    },
    working_dir='/app'
)

Resource Constraints and Security

# Container with resource limits and security settings
secure_task = DockerOperator(
    task_id='secure_processing',
    image='myapp:latest',
    command=['./process.sh'],
    mem_limit='2g',
    cpus=2.0,
    user='1000:1000',
    privileged=False,
    cap_add=['NET_ADMIN'],
    ulimits=[
        Ulimit(name='nofile', soft=65536, hard=65536)
    ],
    shm_size=268435456  # 256MB
)

Network Configuration

# Custom networking with DNS and port bindings
networked_service = DockerOperator(
    task_id='web_service',
    image='nginx:alpine',
    network_mode='bridge',
    port_bindings={'80/tcp': 8080},
    dns=['8.8.8.8', '8.8.4.4'],
    dns_search=['example.com'],
    extra_hosts={'database': '192.168.1.100'},
    hostname='web-container'
)

TLS and Authentication

# Secure Docker daemon connection
secure_docker = DockerOperator(
    task_id='secure_docker',
    image='ubuntu:20.04',
    command=['apt', 'update'],
    docker_conn_id='secure_docker_conn',
    tls_verify=True,
    tls_ca_cert='/path/to/ca.pem',
    tls_client_cert='/path/to/cert.pem',
    tls_client_key='/path/to/key.pem',
    tls_hostname='docker.example.com'
)

GPU and Device Access

from docker.types import DeviceRequest

# GPU-enabled container
gpu_task = DockerOperator(
    task_id='gpu_computation',
    image='tensorflow/tensorflow:latest-gpu',
    command=['python', '-c', 'import tensorflow as tf; print(tf.config.list_physical_devices("GPU"))'],
    device_requests=[
        DeviceRequest(count=1, capabilities=[['gpu']])
    ]
)

Error Handling with Skip Conditions

# Skip task on specific exit codes
conditional_task = DockerOperator(
    task_id='conditional_process',
    image='myapp:latest',
    command=['./check_and_process.sh'],
    skip_on_exit_code=[2, 3],  # Skip if exit code is 2 or 3
    auto_remove='success'  # Remove container on successful completion
)

Template Fields

The following fields support Jinja templating:

  • image
  • command
  • environment
  • env_file
  • container_name
  • mounts

Template Extensions

Files with these extensions are treated as templates:

  • .sh - Shell scripts
  • .bash - Bash scripts
  • .env - Environment files

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-docker

docs

docker-api.md

docker-decorators.md

docker-operations.md

docker-swarm.md

error-handling.md

index.md

tile.json