
tessl/pypi-apache-airflow-providers-docker

Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.

  • Workspace: tessl
  • Visibility: Public
  • Describes: pkg:pypi/apache-airflow-providers-docker@4.4.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-docker@4.4.0


Apache Airflow Providers Docker

Docker integration provider for Apache Airflow that enables containerized task execution and orchestration. This provider allows you to run tasks in Docker containers, manage Docker Swarm services, and integrate Docker daemon operations into your Airflow workflows with comprehensive configuration options for networking, volumes, security, and resource management.

Package Information

  • Package Name: apache-airflow-providers-docker
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-providers-docker
  • Requires: Apache Airflow >=2.10.0, docker >=7.1.0

Core Imports

from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator
from airflow.providers.docker.hooks.docker import DockerHook
from airflow.providers.docker.decorators.docker import docker_task

Exception handling:

from airflow.providers.docker.exceptions import (
    DockerContainerFailedException,
    DockerContainerFailedSkipException
)

Basic Usage

from datetime import datetime

from airflow import DAG
from airflow.providers.docker.decorators.docker import docker_task
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount

# Define the DAG (Airflow 2.4+ uses `schedule`; `schedule_interval` is deprecated)
with DAG(
    'docker_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
) as dag:
    # Basic container execution
    run_container = DockerOperator(
        task_id='run_python_container',
        image='python:3.9-slim',
        command=['python', '-c', 'print("Hello from Docker!")'],
    )

    # Container with volume mounting and environment variables;
    # `mounts` takes docker.types.Mount objects, not plain dicts
    process_data = DockerOperator(
        task_id='process_data',
        image='python:3.9',
        command=['python', '/app/process.py'],
        mounts=[Mount(source='/host/data', target='/app/data', type='bind')],
        environment={'ENV': 'production', 'LOG_LEVEL': 'info'},
    )

    # Using the docker_task decorator inside the DAG context
    @docker_task(image='python:3.9')
    def containerized_function():
        import json
        result = {"message": "Processing complete", "status": "success"}
        return json.dumps(result)

    containerized_task = containerized_function()

Architecture

The provider follows Apache Airflow's standard architecture patterns:

  • Operators: High-level task definitions that extend BaseOperator for DAG integration
  • Hooks: Low-level API clients that handle Docker daemon connections and operations
  • Decorators: Python function wrappers that transform regular functions into containerized tasks
  • Connection Types: Airflow connection definitions for Docker daemon configuration
  • Exceptions: Custom error handling for container execution failures

The provider supports both single-container execution (DockerOperator) and distributed service orchestration (DockerSwarmOperator), with comprehensive configuration options for production deployments including TLS security, resource constraints, networking, and volume management.
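
As an illustration of the production options mentioned above, here is a minimal sketch of a TLS-secured container task. It assumes DockerOperator's TLS parameters (tls_ca_cert, tls_client_cert, tls_client_key); the daemon URL and certificate paths are placeholders for your own setup.

from airflow.providers.docker.operators.docker import DockerOperator

# Sketch: run a container against a remote, TLS-secured Docker daemon.
# Host and certificate paths below are hypothetical placeholders.
secure_task = DockerOperator(
    task_id='secure_container',
    image='python:3.9-slim',
    command=['python', '-c', 'print("secured")'],
    docker_url='tcp://docker-host.example.com:2376',
    tls_ca_cert='/etc/docker/certs/ca.pem',
    tls_client_cert='/etc/docker/certs/cert.pem',
    tls_client_key='/etc/docker/certs/key.pem',
)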

Capabilities

Docker Container Execution

Execute commands and run tasks inside Docker containers with full control over container configuration, resource allocation, networking, and volume mounting. Supports both simple command execution and complex containerized workflows.

class DockerOperator(BaseOperator):
    def __init__(
        self,
        *,
        image: str,
        command: str | list[str] | None = None,
        environment: dict | None = None,
        mounts: list[Mount] | None = None,
        **kwargs
    ) -> None: ...
    
    def execute(self, context: Context) -> list[str] | str | None: ...
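
A hedged sketch of a more fully configured container task, using commonly documented DockerOperator options (network_mode, mem_limit, cpus, auto_remove); treat the exact values as placeholders, and note that auto_remove takes a string mode in the 4.x provider line:

from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount

# Sketch: container with resource limits, explicit networking, and cleanup.
etl_task = DockerOperator(
    task_id='etl_in_container',
    image='python:3.9',
    command=['python', '/app/etl.py'],
    network_mode='bridge',         # attach to the default bridge network
    mem_limit='512m',              # cap container memory
    cpus=1.0,                      # cap CPU allocation
    auto_remove='success',         # remove the container after a clean exit
    mounts=[Mount(source='/host/etl', target='/app', type='bind')],
    do_xcom_push=True,             # push container log output to XCom
)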

Docker Container Operations (docs/docker-operations.md)

Docker Swarm Services

Deploy and manage Docker Swarm services for distributed containerized workloads. Provides orchestration capabilities for multi-container applications with service discovery, load balancing, and scaling features.

class DockerSwarmOperator(DockerOperator):
    def __init__(
        self,
        *,
        image: str,
        configs: list | None = None,
        secrets: list | None = None,
        networks: list | None = None,
        **kwargs
    ) -> None: ...
    
    def execute(self, context: Context) -> None: ...
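
A minimal sketch of a Swarm service task, assuming a running Swarm cluster and a pre-created overlay network named 'my_overlay' (both assumptions to adapt to your environment):

from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator

# Sketch: run a one-off command as a Swarm service on the cluster.
swarm_job = DockerSwarmOperator(
    task_id='swarm_batch_job',
    image='python:3.9-slim',
    command=['python', '-c', 'print("running on swarm")'],
    networks=['my_overlay'],   # hypothetical pre-created overlay network
    enable_logging=True,       # stream service logs into the task log
)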

Docker Swarm Orchestration (docs/docker-swarm.md)

Docker API Integration

Low-level Docker API client for direct Docker daemon interactions, connection management, and custom Docker operations not covered by the operators.

class DockerHook(BaseHook):
    def __init__(
        self,
        docker_conn_id: str | None = "docker_default",
        base_url: str | list[str] | None = None,
        version: str | None = None,
        **kwargs
    ) -> None: ...
    
    def get_conn(self) -> APIClient: ...
    @cached_property
    def api_client(self) -> APIClient: ...
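
A hedged sketch of direct daemon access through the hook. It bypasses the Airflow connection lookup by passing docker_conn_id=None with a local socket URL, and uses only standard docker-py APIClient calls (ping, version):

from airflow.providers.docker.hooks.docker import DockerHook

# Sketch: query the daemon directly for operations the operators don't cover.
hook = DockerHook(
    docker_conn_id=None,                    # skip Airflow connection lookup
    base_url='unix://var/run/docker.sock',  # local daemon socket
)
client = hook.api_client                    # low-level docker-py APIClient
client.ping()                               # raises if the daemon is unreachable
print(client.version()['ApiVersion'])       # report the daemon API version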

Docker API Client (docs/docker-api.md)

Task Decorators

Transform Python functions into containerized tasks using the @docker_task decorator. Provides seamless integration of containerized execution with Python function workflows.

def docker_task(
    image: str,
    python_command: str = "python",
    serializer: Literal["pickle", "dill", "cloudpickle"] = "pickle",
    multiple_outputs: bool | None = None,
    **kwargs
) -> TaskDecorator: ...
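
A hedged sketch of the decorator taking arguments and an alternative serializer; it assumes arguments and return values cross the container boundary via the chosen serializer, and that 'dill' is installed on both the worker and in the image (worth verifying for your setup):

from airflow.providers.docker.decorators.docker import docker_task

@docker_task(image='python:3.9', serializer='dill')
def transform(records: list[dict]) -> int:
    # Runs inside the container; only serializable data crosses the boundary.
    return len([r for r in records if r.get('valid')])

# Called inside a DAG definition, like the Basic Usage example above.
count = transform([{'valid': True}, {'valid': False}])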

Containerized Task Decorators (docs/docker-decorators.md)

Error Handling

Custom exception classes for Docker container execution failures, providing detailed error information and logs for debugging containerized tasks.

class DockerContainerFailedException(AirflowException):
    def __init__(
        self, 
        message: str | None = None, 
        logs: list[str | bytes] | None = None
    ) -> None: ...

class DockerContainerFailedSkipException(AirflowSkipException):
    def __init__(
        self, 
        message: str | None = None, 
        logs: list[str | bytes] | None = None
    ) -> None: ...
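
For instance, a failing container raises DockerContainerFailedException with the captured container logs attached. A hedged sketch of inspecting it (the manual execute() call is illustrative only; in practice the Airflow scheduler invokes it and handles the raise):

from airflow.providers.docker.exceptions import DockerContainerFailedException
from airflow.providers.docker.operators.docker import DockerOperator

failing = DockerOperator(
    task_id='might_fail',
    image='python:3.9-slim',
    command=['python', '-c', 'raise SystemExit(1)'],
)

try:
    failing.execute(context={})   # normally invoked by the Airflow scheduler
except DockerContainerFailedException as exc:
    print(exc.logs)               # container log lines captured at failure

The skip variant is raised instead when the container exits with a code listed in the operator's skip_on_exit_code parameter, so the task is marked skipped rather than failed.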

Error Management (docs/error-handling.md)

Types

# Docker mount configuration
Mount = docker.types.Mount

# Docker device requests for GPU access
DeviceRequest = docker.types.DeviceRequest

# Docker ulimit configuration
Ulimit = docker.types.Ulimit

# Docker log configuration
LogConfig = docker.types.LogConfig

# Airflow task execution context passed to execute(); behaves like a mapping
# (simplified here; the real type is airflow.utils.context.Context)
Context = dict[str, Any]
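
A hedged sketch of how these docker-py types are typically handed to the operator, including a GPU request via DeviceRequest; the device_requests and ulimits parameters and the 'gpu' capability string are assumptions to verify against your provider version and container runtime:

from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import DeviceRequest, Mount, Ulimit

gpu_task = DockerOperator(
    task_id='gpu_inference',
    image='python:3.9',
    command=['python', '/app/infer.py'],
    mounts=[Mount(source='/host/models', target='/app/models', type='bind')],
    device_requests=[
        DeviceRequest(count=-1, capabilities=[['gpu']]),  # request all GPUs
    ],
    ulimits=[Ulimit(name='nofile', soft=65536, hard=65536)],
)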