Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-docker@4.4.0

Docker integration provider for Apache Airflow that enables containerized task execution and orchestration. This provider allows you to run tasks in Docker containers, manage Docker Swarm services, and integrate Docker daemon operations into your Airflow workflows, with comprehensive configuration options for networking, volumes, security, and resource management.
pip install apache-airflow-providers-docker

from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator
from airflow.providers.docker.hooks.docker import DockerHook
from airflow.providers.docker.decorators.docker import docker_task

Exception handling:
from airflow.providers.docker.exceptions import (
DockerContainerFailedException,
DockerContainerFailedSkipException
)

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime
# Define DAG
# Define the example DAG that runs once per day.
# `schedule` replaces `schedule_interval`, which is deprecated since
# Airflow 2.4 and removed in Airflow 3.
dag = DAG(
    'docker_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
)
# Basic container execution
# Minimal example: launch a short-lived Python container that prints a
# greeting and exits.
run_container = DockerOperator(
    dag=dag,
    task_id='run_python_container',
    image='python:3.9-slim',
    command=['python', '-c', 'print("Hello from Docker!")'],
)
# Container with volume mounting and environment variables
# Run a data-processing script with a bind-mounted host directory and
# production environment variables.
# DockerOperator's `mounts` parameter is typed list[Mount]: docker-py
# requires docker.types.Mount instances, so a plain dict here would fail
# at container-creation time.
process_data = DockerOperator(
    task_id='process_data',
    image='python:3.9',
    command=['python', '/app/process.py'],
    mounts=[
        Mount(source='/host/data', target='/app/data', type='bind'),
    ],
    environment={
        'ENV': 'production',
        'LOG_LEVEL': 'info',
    },
    dag=dag,
)
# Using the docker_task decorator
@docker_task(image='python:3.9')
def containerized_function():
    """Build a status payload inside the container and return it as JSON."""
    import json

    payload = dict(message="Processing complete", status="success")
    return json.dumps(payload)
containerized_task = containerized_function()

The provider follows Apache Airflow's standard architecture patterns:
The provider supports both single-container execution (DockerOperator) and distributed service orchestration (DockerSwarmOperator), with comprehensive configuration options for production deployments including TLS security, resource constraints, networking, and volume management.
Execute commands and run tasks inside Docker containers with full control over container configuration, resource allocation, networking, and volume mounting. Supports both simple command execution and complex containerized workflows.
# Operator that executes a command inside a Docker container as an
# Airflow task. Interface stub: bodies are elided with `...`; only the
# primary parameters are listed — the real operator accepts many more
# keyword arguments via **kwargs.
class DockerOperator(BaseOperator):
def __init__(
self,
*,
image: str,
command: str | list[str] | None = None,
environment: dict | None = None,
mounts: list[Mount] | None = None,
**kwargs
) -> None: ...
def execute(self, context: Context) -> list[str] | str | None: ...

Deploy and manage Docker Swarm services for distributed containerized workloads. Provides orchestration capabilities for multi-container applications with service discovery, load balancing, and scaling features.
# Operator that deploys a Docker Swarm service instead of a single
# container. Inherits all DockerOperator configuration; adds
# swarm-specific configs, secrets, and networks.
class DockerSwarmOperator(DockerOperator):
def __init__(
self,
*,
image: str,
configs: list | None = None,
secrets: list | None = None,
networks: list | None = None,
**kwargs
) -> None: ...
def execute(self, context: Context) -> None: ...

Low-level Docker API client for direct Docker daemon interactions, connection management, and custom Docker operations not covered by the operators.
# Hook wrapping a docker-py APIClient; connection details are resolved
# from the Airflow connection identified by `docker_conn_id`.
class DockerHook(BaseHook):
def __init__(
self,
docker_conn_id: str | None = "docker_default",
base_url: str | list[str] | None = None,
version: str | None = None,
**kwargs
) -> None: ...
# Returns an APIClient connected to the configured Docker daemon.
def get_conn(self) -> APIClient: ...
def api_client(self) -> APIClient: ...

Transform Python functions into containerized tasks using the @docker_task decorator. Provides seamless integration of containerized execution with Python function workflows.
# Decorator factory that turns a Python function into an Airflow task
# executed inside a Docker container. `serializer` selects how the
# function's inputs/outputs are marshalled across the container boundary;
# `python_command` is the interpreter invoked inside the image.
def docker_task(
image: str,
python_command: str = "python",
serializer: Literal["pickle", "dill", "cloudpickle"] = "pickle",
multiple_outputs: bool | None = None,
**kwargs
) -> TaskDecorator: ...

Custom exception classes for Docker container execution failures, providing detailed error information and logs for debugging containerized tasks.
# Raised when a container run fails; `logs` carries the captured
# container output for debugging.
class DockerContainerFailedException(AirflowException):
def __init__(
self,
message: str | None = None,
logs: list[str | bytes] | None = None
) -> None: ...
# Skip-variant of the failure exception: derives from AirflowSkipException,
# so Airflow marks the task as skipped rather than failed.
# NOTE(review): presumably raised when the container exit code matches a
# configured skip code — verify against the provider source.
class DockerContainerFailedSkipException(AirflowSkipException):
def __init__(
self,
message: str | None = None,
logs: list[str | bytes] | None = None
) -> None: ...
) -> None: ...

# Docker mount configuration
# Re-exported docker-py helper types used in the operator signatures
# above (mounts, device requests, ulimits, log-driver configuration).
Mount = docker.types.Mount
# Docker device requests for GPU access
DeviceRequest = docker.types.DeviceRequest
# Docker ulimit configuration
Ulimit = docker.types.Ulimit
# Docker log configuration
LogConfig = docker.types.LogConfig
# Airflow task execution context passed to execute(); modelled here as a
# plain dict keyed by string.
Context = dict[str, Any]