# Docker Container Operations

Execute commands and run tasks inside Docker containers, with configuration options suited to production workflows. The `DockerOperator` exposes control over container lifecycle, resource allocation, networking, security, and data persistence.

## Capabilities

### DockerOperator

Runs a command inside a Docker container, with the container configured through the keyword-only constructor arguments below.

```python { .api }
class DockerOperator(BaseOperator):
    def __init__(
        self,
        *,
        image: str,
        api_version: str | None = None,
        command: str | list[str] | None = None,
        container_name: str | None = None,
        cpus: float = 1.0,
        docker_url: str | list[str] | None = None,
        environment: dict | None = None,
        private_environment: dict | None = None,
        env_file: str | None = None,
        force_pull: bool = False,
        mem_limit: float | str | None = None,
        host_tmp_dir: str | None = None,
        network_mode: str | None = None,
        tls_ca_cert: str | None = None,
        tls_client_cert: str | None = None,
        tls_client_key: str | None = None,
        tls_verify: bool = True,
        tls_hostname: str | bool | None = None,
        tls_ssl_version: str | None = None,
        mount_tmp_dir: bool = True,
        tmp_dir: str = "/tmp/airflow",
        user: str | int | None = None,
        mounts: list[Mount] | None = None,
        entrypoint: str | list[str] | None = None,
        working_dir: str | None = None,
        xcom_all: bool = False,
        docker_conn_id: str | None = None,
        dns: list[str] | None = None,
        dns_search: list[str] | None = None,
        auto_remove: Literal["never", "success", "force"] = "never",
        shm_size: int | None = None,
        tty: bool = False,
        hostname: str | None = None,
        privileged: bool = False,
        cap_add: Iterable[str] | None = None,
        extra_hosts: dict[str, str] | None = None,
        retrieve_output: bool = False,
        retrieve_output_path: str | None = None,
        timeout: int = 60,
        device_requests: list[DeviceRequest] | None = None,
        log_opts_max_size: str | None = None,
        log_opts_max_file: str | None = None,
        ipc_mode: str | None = None,
        skip_on_exit_code: int | Container[int] | None = None,
        port_bindings: dict | None = None,
        ulimits: list[Ulimit] | None = None,
        labels: dict[str, str] | list[str] | None = None,
        **kwargs
    ) -> None
```

**Parameters:**

- `image`: Docker image from which to create the container (templated)
- `api_version`: Remote API version; set to "auto" to detect the server version automatically
- `command`: Command to run in the container (templated)
- `container_name`: Name of the container (templated)
- `cpus`: Number of CPUs to assign to the container (the value is multiplied by 1024)
- `docker_url`: URL(s) of the Docker daemon host; defaults to the DOCKER_HOST environment variable, or unix://var/run/docker.sock if it is unset
- `environment`: Environment variables dictionary (templated)
- `private_environment`: Private environment variables (not templated, hidden from the UI)
- `env_file`: Relative path to a .env file with environment variables (templated)
- `force_pull`: Pull the Docker image on every run
- `mem_limit`: Maximum memory the container can use (a number of bytes, or a string such as "128m" or "1g")
- `host_tmp_dir`: Location on the host where the temporary directory to mount is created
- `network_mode`: Network mode ("bridge", "none", "container:<name>", "host", "<network-name>")
- `tls_ca_cert`: Path to the PEM-encoded CA certificate for TLS
- `tls_client_cert`: Path to the PEM-encoded client certificate for TLS
- `tls_client_key`: Path to the PEM-encoded client key for TLS
- `tls_verify`: Verify the validity of the server certificate
- `tls_hostname`: Hostname to match against the server certificate
- `tls_ssl_version`: SSL version to use when communicating with the Docker daemon
- `mount_tmp_dir`: Whether to bind-mount a temporary directory from the host into the container
- `tmp_dir`: Mount point inside the container for the temporary directory
- `user`: Default user inside the container
- `mounts`: List of `docker.types.Mount` instances to mount into the container (templated)
- `entrypoint`: Overrides the ENTRYPOINT of the image
- `working_dir`: Working directory inside the container
- `xcom_all`: Push all stdout lines to XCom instead of only the last line
- `docker_conn_id`: Docker connection ID used for authentication
- `dns`: List of custom DNS servers
- `dns_search`: List of custom DNS search domains
- `auto_remove`: Container removal policy: "never" keeps the container, "success" removes it after a successful run, "force" always removes it
- `shm_size`: Size of /dev/shm in bytes
- `tty`: Allocate a pseudo-TTY
- `hostname`: Container hostname
- `privileged`: Give extended privileges to the container
- `cap_add`: Container capabilities to add
- `extra_hosts`: Additional hostname to IP address mappings
- `retrieve_output`: Retrieve an output file from the container before it shuts down
- `retrieve_output_path`: Path of the output file to retrieve
- `timeout`: Docker API timeout in seconds
- `device_requests`: List of GPU/device requests
- `log_opts_max_size`: Maximum size of a log file before it is rolled
- `log_opts_max_file`: Maximum number of log files to keep
- `ipc_mode`: IPC mode for the container
- `skip_on_exit_code`: Exit code (or codes) that leave the task in the skipped state instead of failing it
- `port_bindings`: Port bindings dictionary
- `ulimits`: List of ulimit configurations
- `labels`: Container labels, as a dictionary or a list
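Several resource parameters take human-friendly values that end up as raw numbers. The sketch below illustrates the arithmetic implied by the descriptions of `cpus` (multiplied by 1024) and `mem_limit` (strings such as "128m" or "1g"). The helper names `cpus_to_cpu_shares` and `mem_limit_to_bytes` are hypothetical, and the 1024-based unit table is an assumption about how such strings are usually interpreted; neither helper is part of the operator's API.

```python
from __future__ import annotations

# Assumed 1024-based multipliers for the single-letter suffixes.
_UNITS = {"b": 1, "k": 1024, "m": 1024**2, "g": 1024**3}


def mem_limit_to_bytes(mem_limit: float | str) -> int:
    """Convert a value such as 1e9, "128m" or "1g" to a byte count."""
    if isinstance(mem_limit, (int, float)):
        return int(mem_limit)
    text = mem_limit.strip().lower()
    if text and text[-1] in _UNITS:
        return int(float(text[:-1]) * _UNITS[text[-1]])
    return int(text)  # plain digits are taken as bytes


def cpus_to_cpu_shares(cpus: float) -> int:
    """Apply the 'multiplied by 1024' rule from the parameter list."""
    return round(cpus * 1024)


print(mem_limit_to_bytes("128m"))  # 134217728
print(cpus_to_cpu_shares(2.0))     # 2048
```

The same arithmetic helps when filling in `shm_size`, which is given directly in bytes: 256 MiB is `256 * 1024**2`.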

### Execution Methods

```python { .api }
def execute(self, context: Context) -> list[str] | str | None:
    """Execute the Docker container and return output."""

@cached_property
def hook(self) -> DockerHook:
    """Get DockerHook instance for this operator."""

@property
def cli(self) -> APIClient:
    """Get Docker API client."""

def on_kill(self) -> None:
    """Handle task cancellation by stopping container."""
```
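The return type of `execute` (`list[str] | str | None`) follows from the `xcom_all` parameter: either every stdout line or only the last one is returned. The function below is a minimal sketch of that selection, assuming the stdout lines have already been collected into a list. The name `select_return_value` is hypothetical, and the handling of empty output is an assumption rather than confirmed operator behavior.

```python
from __future__ import annotations


def select_return_value(log_lines: list[str], xcom_all: bool) -> list[str] | str | None:
    """Pick what a task would hand to XCom from the captured stdout lines."""
    if xcom_all:
        return log_lines  # every captured line, in order
    # Default: only the final line, or None when nothing was printed.
    return log_lines[-1] if log_lines else None


lines = ["loading", "processing", "rows=42"]
print(select_return_value(lines, xcom_all=False))  # rows=42
print(select_return_value(lines, xcom_all=True))   # ['loading', 'processing', 'rows=42']
```

With the default `xcom_all=False`, a container script can make its final printed line the value that downstream tasks consume.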

### Utility Methods

```python { .api }
@staticmethod
def format_command(command: list[str] | str | None) -> list[str] | str | None:
    """Format command for execution."""

@staticmethod
def unpack_environment_variables(env_str: str) -> dict:
    """Parse environment variable string into dictionary."""
```
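To make the two signatures concrete, here is a rough standard-library approximation of what such helpers could do. It assumes that `format_command` exists because a templated command may render to a string that looks like a Python list, and that `unpack_environment_variables` reads simple `KEY=VALUE` lines. The `_sketch` functions are hypothetical stand-ins; the real methods may handle quoting, escaping, and edge cases differently.

```python
from __future__ import annotations

import ast


def format_command_sketch(command: list[str] | str | None) -> list[str] | str | None:
    """Turn a string that looks like a list literal into a real list."""
    if isinstance(command, str) and command.strip().startswith("["):
        return ast.literal_eval(command)
    return command  # lists, plain strings and None pass through unchanged


def unpack_env_sketch(env_str: str) -> dict[str, str]:
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    result: dict[str, str] = {}
    for raw_line in env_str.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines without an '=' sign
            result[key.strip()] = value.strip().strip("'\"")
    return result


print(format_command_sketch("['echo', 'hi']"))        # ['echo', 'hi']
print(unpack_env_sketch("A=1\n# comment\nB='two'\n"))  # {'A': '1', 'B': 'two'}
```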

## Usage Examples

### Basic Container Execution

```python
from airflow.providers.docker.operators.docker import DockerOperator

# Simple command execution
basic_task = DockerOperator(
    task_id='hello_world',
    image='alpine:latest',
    command=['echo', 'Hello World from Docker!']
)
```

### Volume Mounting and Environment Variables

```python
from docker.types import Mount

# Data processing with mounted volumes
data_processor = DockerOperator(
    task_id='process_data',
    image='python:3.9',
    command=['python', '/app/process.py'],
    mounts=[
        Mount(
            source='/host/input',
            target='/app/input',
            type='bind',
            read_only=True
        ),
        Mount(
            source='/host/output',
            target='/app/output',
            type='bind'
        )
    ],
    environment={
        'INPUT_PATH': '/app/input',
        'OUTPUT_PATH': '/app/output',
        'LOG_LEVEL': 'info'
    },
    working_dir='/app'
)
```

### Resource Constraints and Security

```python
from docker.types import Ulimit

# Container with resource limits and security settings
secure_task = DockerOperator(
    task_id='secure_processing',
    image='myapp:latest',
    # Trailing space stops Airflow from loading the '.sh' value as a template file
    command='./process.sh ',
    mem_limit='2g',
    cpus=2.0,
    user='1000:1000',
    privileged=False,
    cap_add=['NET_ADMIN'],
    ulimits=[
        Ulimit(name='nofile', soft=65536, hard=65536)
    ],
    shm_size=268435456  # 256 MiB
)
```

### Network Configuration

```python
# Custom networking with DNS and port bindings
networked_service = DockerOperator(
    task_id='web_service',
    image='nginx:alpine',
    network_mode='bridge',
    port_bindings={'80/tcp': 8080},
    dns=['8.8.8.8', '8.8.4.4'],
    dns_search=['example.com'],
    extra_hosts={'database': '192.168.1.100'},
    hostname='web-container'
)
```

### TLS and Authentication

```python
# Secure Docker daemon connection
secure_docker = DockerOperator(
    task_id='secure_docker',
    image='ubuntu:20.04',
    command=['apt', 'update'],
    docker_conn_id='secure_docker_conn',
    tls_verify=True,
    tls_ca_cert='/path/to/ca.pem',
    tls_client_cert='/path/to/cert.pem',
    tls_client_key='/path/to/key.pem',
    tls_hostname='docker.example.com'
)
```

### GPU and Device Access

```python
from docker.types import DeviceRequest

# GPU-enabled container
gpu_task = DockerOperator(
    task_id='gpu_computation',
    image='tensorflow/tensorflow:latest-gpu',
    command=['python', '-c', 'import tensorflow as tf; print(tf.config.list_physical_devices("GPU"))'],
    device_requests=[
        DeviceRequest(count=1, capabilities=[['gpu']])
    ]
)
```

### Error Handling with Skip Conditions

```python
# Skip task on specific exit codes
conditional_task = DockerOperator(
    task_id='conditional_process',
    image='myapp:latest',
    # Trailing space stops Airflow from loading the '.sh' value as a template file
    command='./check_and_process.sh ',
    skip_on_exit_code=[2, 3],  # Skip if exit code is 2 or 3
    auto_remove='success'  # Remove container on successful completion
)
```
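The effect of `skip_on_exit_code` can be pictured as a three-way classification of the container's exit code. The sketch below is an illustration under stated assumptions, not the operator's implementation: the function name `classify_exit` and the string labels are hypothetical, and it assumes that exit code 0 always counts as success and that any non-zero code outside the skip set fails the task.

```python
from __future__ import annotations

from collections.abc import Container


def classify_exit(exit_code: int, skip_on_exit_code: int | Container[int] | None) -> str:
    """Map a container exit code to a task outcome label."""
    # Normalize the parameter: None, a single int, or any container of ints.
    if skip_on_exit_code is None:
        skip_codes: Container[int] = ()
    elif isinstance(skip_on_exit_code, int):
        skip_codes = (skip_on_exit_code,)
    else:
        skip_codes = skip_on_exit_code

    if exit_code == 0:
        return "success"
    if exit_code in skip_codes:
        return "skipped"
    return "failed"


for code in (0, 2, 1):
    print(code, classify_exit(code, [2, 3]))
# 0 success
# 2 skipped
# 1 failed
```

A script can therefore reserve specific exit codes for "nothing to do" and keep every other non-zero code as a genuine failure.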

## Template Fields

The following fields support Jinja templating:

- `image`
- `command`
- `environment`
- `env_file`
- `container_name`
- `mounts`

## Template Extensions

String values in templated fields that end with one of these extensions are treated as paths to template files:

- `.sh` - Shell scripts
- `.bash` - Bash scripts
- `.env` - Environment files
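The extension rule is a suffix check on string values, which has a practical consequence: a command such as `./process.sh` is taken for a template file reference, not a literal command. The sketch below mirrors that check using only the standard library. `TEMPLATE_EXT` copies the list above, the helper name `is_template_file_reference` is hypothetical, and the trailing-space workaround shown is the one commonly recommended for Airflow operators; treat both as assumptions to verify against your Airflow version.

```python
# Extensions copied from the list above.
TEMPLATE_EXT = (".sh", ".bash", ".env")


def is_template_file_reference(value: object) -> bool:
    """Return True when a value would be looked up as a template file."""
    return isinstance(value, str) and value.endswith(TEMPLATE_EXT)


print(is_template_file_reference("scripts/run.sh"))   # True
print(is_template_file_reference("echo hello"))       # False
print(is_template_file_reference("./process.sh "))    # False (trailing space)
```

If a value matches but no such template file exists, rendering is likely to fail before the container starts, so it is worth checking `command` and `env_file` values against this rule.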