Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Execute commands and run tasks inside Docker containers with comprehensive configuration options for production workflows. The DockerOperator provides full control over container lifecycle, resource allocation, networking, security, and data persistence.
Execute commands inside Docker containers with complete control over container configuration.
class DockerOperator(BaseOperator):
def __init__(
self,
*,
image: str,
api_version: str | None = None,
command: str | list[str] | None = None,
container_name: str | None = None,
cpus: float = 1.0,
docker_url: str | list[str] | None = None,
environment: dict | None = None,
private_environment: dict | None = None,
env_file: str | None = None,
force_pull: bool = False,
mem_limit: float | str | None = None,
host_tmp_dir: str | None = None,
network_mode: str | None = None,
tls_ca_cert: str | None = None,
tls_client_cert: str | None = None,
tls_client_key: str | None = None,
tls_verify: bool = True,
tls_hostname: str | bool | None = None,
tls_ssl_version: str | None = None,
mount_tmp_dir: bool = True,
tmp_dir: str = "/tmp/airflow",
user: str | int | None = None,
mounts: list[Mount] | None = None,
entrypoint: str | list[str] | None = None,
working_dir: str | None = None,
xcom_all: bool = False,
docker_conn_id: str | None = None,
dns: list[str] | None = None,
dns_search: list[str] | None = None,
auto_remove: Literal["never", "success", "force"] = "never",
shm_size: int | None = None,
tty: bool = False,
hostname: str | None = None,
privileged: bool = False,
cap_add: Iterable[str] | None = None,
extra_hosts: dict[str, str] | None = None,
retrieve_output: bool = False,
retrieve_output_path: str | None = None,
timeout: int = 60,
device_requests: list[DeviceRequest] | None = None,
log_opts_max_size: str | None = None,
log_opts_max_file: str | None = None,
ipc_mode: str | None = None,
skip_on_exit_code: int | Container[int] | None = None,
port_bindings: dict | None = None,
ulimits: list[Ulimit] | None = None,
labels: dict[str, str] | list[str] | None = None,
**kwargs
) -> None:

Parameters:
image: Docker image from which to create the container (templated)
api_version: Remote API version, set to "auto" to automatically detect server version
command: Command to run in the container (templated)
container_name: Name of the container (templated)
cpus: Number of CPUs to assign (multiplied by 1024)
docker_url: URL(s) of Docker daemon host, defaults to DOCKER_HOST env var or unix://var/run/docker.sock
environment: Environment variables dictionary (templated)
private_environment: Private environment variables (not templated, hidden from UI)
env_file: Relative path to .env file with environment variables (templated)
force_pull: Pull the Docker image on every run
mem_limit: Maximum memory limit (float bytes or string like "128m", "1g")
host_tmp_dir: Host temporary directory location for mounting
network_mode: Network mode ("bridge", "none", "container:<name>", "host", "<network-name>")
tls_ca_cert: Path to PEM-encoded CA certificate for TLS
tls_client_cert: Path to PEM-encoded client certificate for TLS
tls_client_key: Path to PEM-encoded client key for TLS
tls_verify: Verify certificate validity
tls_hostname: Hostname to match against server certificate
tls_ssl_version: SSL version for Docker daemon communication
mount_tmp_dir: Whether to bind-mount temporary directory from host
tmp_dir: Container mount point for temporary directory
user: Default user inside the container
mounts: List of docker.types.Mount instances for volume mounting (templated)
entrypoint: Override container ENTRYPOINT
working_dir: Working directory in container
xcom_all: Push all stdout lines or just the last line to XCom
docker_conn_id: Docker connection ID for authentication
dns: Custom DNS servers list
dns_search: Custom DNS search domains list
auto_remove: Container removal policy ("never", "success", "force")
shm_size: Size of /dev/shm in bytes
tty: Allocate pseudo-TTY
hostname: Container hostname
privileged: Give extended privileges to container
cap_add: Container capabilities to add
extra_hosts: Additional hostname to IP address mappings
retrieve_output: Retrieve output file before shutdown
retrieve_output_path: Path for retrieving output file
timeout: API timeout in seconds
device_requests: GPU/device requests list
log_opts_max_size: Maximum log size before rolling
log_opts_max_file: Maximum number of log files
ipc_mode: IPC mode for container
skip_on_exit_code: Exit codes to treat as skipped task
port_bindings: Port bindings dictionary
ulimits: List of ulimit configurations
labels: Container labels dictionary or list

def execute(self, context: Context) -> list[str] | str | None:
"""Execute the Docker container and return output."""
def hook(self) -> DockerHook:
"""Get DockerHook instance for this operator."""
def cli(self) -> APIClient:
"""Get Docker API client."""
def on_kill(self) -> None:
"""Handle task cancellation by stopping container."""@staticmethod
def format_command(command: list[str] | str | None) -> list[str] | str | None:
"""Format command for execution."""
@staticmethod
def unpack_environment_variables(env_str: str) -> dict:
"""Parse environment variable string into dictionary."""from airflow.providers.docker.operators.docker import DockerOperator
# Simple command execution
basic_task = DockerOperator(
task_id='hello_world',
image='alpine:latest',
command=['echo', 'Hello World from Docker!']
)

from docker.types import Mount
# Data processing with mounted volumes
data_processor = DockerOperator(
task_id='process_data',
image='python:3.9',
command=['python', '/app/process.py'],
mounts=[
Mount(
source='/host/input',
target='/app/input',
type='bind',
read_only=True
),
Mount(
source='/host/output',
target='/app/output',
type='bind'
)
],
environment={
'INPUT_PATH': '/app/input',
'OUTPUT_PATH': '/app/output',
'LOG_LEVEL': 'info'
},
working_dir='/app'
)

# NOTE(review): the snippet below uses Ulimit — the original import was
# presumably dropped during extraction.
from docker.types import Ulimit

# Container with resource limits and security settings
secure_task = DockerOperator(
task_id='secure_processing',
image='myapp:latest',
command=['./process.sh'],
mem_limit='2g',
cpus=2.0,
user='1000:1000',
privileged=False,
cap_add=['NET_ADMIN'],
ulimits=[
Ulimit(name='nofile', soft=65536, hard=65536)
],
shm_size=268435456 # 256MB
)

# Custom networking with DNS and port bindings
networked_service = DockerOperator(
task_id='web_service',
image='nginx:alpine',
network_mode='bridge',
port_bindings={'80/tcp': 8080},
dns=['8.8.8.8', '8.8.4.4'],
dns_search=['example.com'],
extra_hosts={'database': '192.168.1.100'},
hostname='web-container'
)

# Secure Docker daemon connection
secure_docker = DockerOperator(
task_id='secure_docker',
image='ubuntu:20.04',
command=['apt', 'update'],
docker_conn_id='secure_docker_conn',
tls_verify=True,
tls_ca_cert='/path/to/ca.pem',
tls_client_cert='/path/to/cert.pem',
tls_client_key='/path/to/key.pem',
tls_hostname='docker.example.com'
)

from docker.types import DeviceRequest
# GPU-enabled container
gpu_task = DockerOperator(
task_id='gpu_computation',
image='tensorflow/tensorflow:latest-gpu',
command=['python', '-c', 'import tensorflow as tf; print(tf.config.list_physical_devices("GPU"))'],
device_requests=[
DeviceRequest(count=1, capabilities=[['gpu']])
]
)

# Skip task on specific exit codes
conditional_task = DockerOperator(
task_id='conditional_process',
image='myapp:latest',
command=['./check_and_process.sh'],
skip_on_exit_code=[2, 3], # Skip if exit code is 2 or 3
auto_remove='success' # Remove container on successful completion
)

The following fields support Jinja templating:
image, command, environment, env_file, container_name, mounts

Files with these extensions are treated as templates:
.sh - Shell scripts
.bash - Bash scripts
.env - Environment files

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-docker