# Containerized Task Decorators

Transform Python functions into containerized tasks with the @docker_task decorator. It integrates containerized execution into Python function workflows, running each decorated function in an isolated Docker environment with automatic serialization of arguments and return values.

## Capabilities

### docker_task Decorator

Convert Python functions into DockerOperator tasks with automatic serialization.

```python { .api }
def docker_task(
    image: str,
    python_command: str = "python",
    serializer: Literal["pickle", "dill", "cloudpickle"] = "pickle",
    multiple_outputs: bool | None = None,
    **kwargs
) -> TaskDecorator:
    """
    Decorator that converts a Python function into a DockerOperator task.

    Args:
        image: Docker image to run the function in
        python_command: Python command to use in the container (default: "python")
        serializer: Serialization method for function arguments and return values
        multiple_outputs: Whether the function returns multiple outputs
        **kwargs: All DockerOperator parameters are supported

    Returns:
        TaskDecorator function that creates _DockerDecoratedOperator instances
    """
```

**Parameters:**

- `image`: Docker image containing a Python runtime for function execution
- `python_command`: Python executable command in the container (e.g., "python", "python3", "/opt/python/bin/python")
- `serializer`: Method for serializing function arguments and return values:
  - `"pickle"`: Standard Python pickle (default, fastest)
  - `"dill"`: Extended pickle with broader object support
  - `"cloudpickle"`: Extended pickle widely used in distributed computing
- `multiple_outputs`: Set to True if the function returns multiple values as a dictionary
- `**kwargs`: All DockerOperator parameters (environment, mounts, resources, etc.), as shown in the sketch below
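
A minimal sketch of the `**kwargs` pass-through. The parameter values are illustrative, and the exact set of accepted DockerOperator parameters depends on the installed Docker provider version:

```python
from airflow.providers.docker.decorators.docker import docker_task

@docker_task(
    image='python:3.9-slim',
    docker_url='unix://var/run/docker.sock',  # illustrative: default Docker socket
    force_pull=False,                         # forwarded to DockerOperator
    working_dir='/app',                       # forwarded to DockerOperator
    environment={'LOG_LEVEL': 'INFO'},        # forwarded to DockerOperator
)
def report_workdir():
    """Runs in the container with the forwarded DockerOperator settings."""
    import os
    return os.getcwd()
```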

### Supported Serializers

```python { .api }
# Available serialization options
Serializer = Literal["pickle", "dill", "cloudpickle"]

# Serializer modules (lazy-loaded)
_SERIALIZERS: dict[Serializer, Any] = {
    "pickle": pickle,
    "dill": dill,               # Requires: pip install dill
    "cloudpickle": cloudpickle  # Requires: pip install cloudpickle
}
```

## Usage Examples

### Basic Function Containerization

```python
from airflow.providers.docker.decorators.docker import docker_task

@docker_task(image='python:3.9-slim')
def hello_world():
    """Simple containerized function."""
    return "Hello from Docker container!"

# Use in DAG
hello_task = hello_world()
```
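
Calling the decorated function inside a DAG definition is what creates the task. A minimal sketch, assuming a recent Airflow 2.x TaskFlow API (the DAG id and schedule below are illustrative):

```python
from datetime import datetime

from airflow.decorators import dag

@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def docker_decorator_example():
    # Invoking the decorated function registers a DockerOperator-backed task
    hello_world()

docker_decorator_example()
```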

### Function with Arguments

```python
@docker_task(image='python:3.9')
def process_data(data_list: list, multiplier: int = 2):
    """Process data with arguments."""
    return [x * multiplier for x in data_list]

# Call with arguments
result = process_data([1, 2, 3, 4], multiplier=3)
```
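
Because arguments and return values travel through XCom, the output of one containerized task can feed the next. A brief sketch, assuming the calls happen inside a DAG definition (the function names are illustrative):

```python
@docker_task(image='python:3.9-slim')
def generate_numbers():
    """Produce input data inside one container."""
    return [1, 2, 3, 4]

@docker_task(image='python:3.9-slim')
def double_numbers(numbers: list):
    """Consume the upstream result inside another container."""
    return [n * 2 for n in numbers]

# Passing one task's output to another creates the dependency automatically
doubled = double_numbers(generate_numbers())
```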

### Scientific Computing Function

```python
@docker_task(
    image='python:3.9',       # the image must provide numpy and pandas (e.g., a custom image)
    serializer='cloudpickle'  # better for scientific objects
)
def analyze_dataset():
    """Perform data analysis using scientific libraries."""
    import numpy as np
    import pandas as pd

    # Generate sample data
    data = np.random.randn(1000, 5)
    df = pd.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])

    # Perform analysis
    stats = {
        'mean': df.mean().to_dict(),
        'std': df.std().to_dict(),
        'correlation': df.corr().to_dict()
    }

    return stats

analysis_task = analyze_dataset()
```

### Function with Custom Environment

```python
@docker_task(
    image='python:3.9',  # the image must provide the requests library
    environment={
        'API_KEY': '{{ var.value.api_key }}',
        'LOG_LEVEL': 'DEBUG',
        'WORKERS': '4'
    }
)
def api_data_fetch(endpoint: str):
    """Fetch data from an API using environment configuration."""
    import os
    import requests

    api_key = os.environ['API_KEY']
    log_level = os.environ.get('LOG_LEVEL', 'INFO')

    response = requests.get(
        endpoint,
        headers={'Authorization': f'Bearer {api_key}'}
    )

    return response.json()

fetch_task = api_data_fetch('https://api.example.com/data')
```

### Function with Volume Mounts

```python
from docker.types import Mount

@docker_task(
    image='python:3.9',
    mounts=[
        Mount(
            source='/host/data',
            target='/app/data',
            type='bind',
            read_only=True
        ),
        Mount(
            source='/host/output',
            target='/app/output',
            type='bind'
        )
    ]
)
def file_processor():
    """Process files from mounted volumes."""
    import os
    import json

    # Read input files
    input_dir = '/app/data'
    output_dir = '/app/output'

    results = []
    for filename in os.listdir(input_dir):
        filepath = os.path.join(input_dir, filename)
        with open(filepath, 'r') as f:
            data = f.read()
        results.append({
            'file': filename,
            'size': len(data),
            'lines': len(data.splitlines())
        })

    # Write results
    output_file = os.path.join(output_dir, 'results.json')
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)

    return results

process_task = file_processor()
```

### Function with Multiple Outputs

```python
@docker_task(
    image='python:3.9',
    multiple_outputs=True
)
def data_pipeline():
    """Process data and return multiple outputs."""
    import random

    # Simulate data processing
    raw_data = [random.randint(1, 100) for _ in range(50)]

    return {
        'processed_data': [x * 2 for x in raw_data],
        'statistics': {
            'count': len(raw_data),
            'mean': sum(raw_data) / len(raw_data),
            'max': max(raw_data),
            'min': min(raw_data)
        },
        'metadata': {
            'processing_version': '1.0',
            'timestamp': '2024-01-01T00:00:00Z'
        }
    }

pipeline_task = data_pipeline()

# Access individual outputs
processed = pipeline_task['processed_data']
stats = pipeline_task['statistics']
meta = pipeline_task['metadata']
```

### GPU-Enabled Function

```python
from docker.types import DeviceRequest

@docker_task(
    image='tensorflow/tensorflow:latest-gpu',
    device_requests=[
        DeviceRequest(count=1, capabilities=[['gpu']])
    ],
    serializer='cloudpickle'
)
def gpu_computation():
    """Perform GPU-accelerated computation."""
    import tensorflow as tf

    # Check GPU availability
    gpus = tf.config.list_physical_devices('GPU')
    print(f"Available GPUs: {len(gpus)}")

    # Simple GPU computation
    with tf.device('/GPU:0'):
        a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
        b = tf.constant([[2.0, 1.0], [1.0, 2.0]])
        result = tf.matmul(a, b)

    return result.numpy().tolist()

gpu_task = gpu_computation()
```

### Function with Custom Python Environment

```python
@docker_task(
    image='continuumio/miniconda3:latest',      # assumes the image contains a conda env "myenv" with numpy and pandas
    python_command='conda run -n myenv python'
)
def conda_analysis():
    """Run function in a conda environment."""
    import sys
    import numpy as np
    import pandas as pd

    # Conda environment info
    env_info = {
        'python_version': sys.version,
        'numpy_version': np.__version__,
        'pandas_version': pd.__version__
    }

    return env_info

conda_task = conda_analysis()
```

### Function with Dill Serialization

```python
@docker_task(
    image='python:3.9',
    serializer='dill'  # better support for complex objects
)
def complex_object_handler():
    """Handle complex Python objects with dill."""
    import functools

    # Lambdas and closures, which the standard pickle cannot serialize
    def multiplier(factor):
        return lambda x: x * factor

    double = multiplier(2)

    # Partial functions
    def multiply(factor, x):
        return factor * x

    triple = functools.partial(multiply, 3)

    # Nested functions
    def outer_func():
        local_var = 42
        def inner_func():
            return local_var * 2
        return inner_func

    nested = outer_func()

    return {
        'double_result': double(5),   # 10
        'triple_result': triple(5),   # 15
        'nested_result': nested()     # 84
    }

complex_task = complex_object_handler()
```

## Advanced Configuration

### Resource Management

```python
@docker_task(
    image='python:3.9',    # the image must provide numpy
    mem_limit='2g',
    cpus=2.0,
    shm_size=268435456     # 256MB shared memory
)
def memory_intensive_task():
    """Function with resource constraints."""
    import numpy as np

    # Memory-intensive operation
    large_array = np.random.rand(10000, 10000)
    result = np.sum(large_array)

    return float(result)

resource_task = memory_intensive_task()
```

### Network Configuration

```python
@docker_task(
    image='python:3.9',    # the image must provide the requests library
    network_mode='host',
    extra_hosts={'database': '192.168.1.100'}
)
def network_service():
    """Function with custom networking."""
    import socket
    import requests

    # Get container hostname
    hostname = socket.gethostname()

    # Make a network request to the aliased host (illustrative health endpoint)
    response = requests.get('http://database:8080/health')

    return {
        'hostname': hostname,
        'database_status': response.status_code
    }

network_task = network_service()
```

## Decorator Internals

### _DockerDecoratedOperator

Internal implementation class (not used directly):

```python { .api }
class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
    """
    Internal class that combines DecoratedOperator and DockerOperator.

    Handles:
    - Function serialization and deserialization
    - Argument passing to the containerized function
    - Return value extraction and XCom storage
    - Error handling and logging
    """
```
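
The sketch below is not the provider's implementation, but it illustrates the general pattern the decorated operator relies on: serialize the callable and its arguments, execute them with a separate Python interpreter (standing in here for the container), then deserialize the result. It assumes cloudpickle is installed, since cloudpickle serializes functions by value and so the child process does not need to import the caller's module:

```python
# Conceptual sketch only -- not the provider's actual implementation.
import subprocess
import sys
import tempfile
import textwrap

import cloudpickle  # pip install cloudpickle


def run_isolated(func, *args, **kwargs):
    """Serialize a callable, run it in a separate interpreter, return its result."""
    with tempfile.TemporaryDirectory() as tmp:
        with open(f"{tmp}/payload.pkl", "wb") as f:
            cloudpickle.dump((func, args, kwargs), f)

        # In the real operator, this step runs `python_command` inside the Docker container.
        script = textwrap.dedent(f"""
            import cloudpickle
            with open("{tmp}/payload.pkl", "rb") as f:
                func, args, kwargs = cloudpickle.load(f)
            result = func(*args, **kwargs)
            with open("{tmp}/result.pkl", "wb") as f:
                cloudpickle.dump(result, f)
        """)
        subprocess.run([sys.executable, "-c", script], check=True)

        with open(f"{tmp}/result.pkl", "rb") as f:
            return cloudpickle.load(f)


print(run_isolated(lambda x: x * 2, 21))  # 42
```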

## Serialization Considerations

### Pickle (Default)
- **Pros**: Fast, built-in, handles most Python objects
- **Cons**: Limited support for complex objects (lambdas, nested functions)
- **Use for**: Simple data types, standard library objects

### Dill
- **Pros**: Extended object support, handles lambdas and nested functions
- **Cons**: Slower than pickle, requires an additional dependency
- **Use for**: Complex functions, closures, partial functions

### CloudPickle
- **Pros**: Optimized for distributed computing and cloud environments
- **Cons**: Additional dependency, may be slower for simple objects
- **Use for**: Scientific computing, distributed workflows, cloud deployments
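
To see the difference concretely, the snippet below (assuming dill and cloudpickle are installed) shows a lambda that the standard pickle rejects but the other serializers accept:

```python
import pickle

import cloudpickle  # pip install cloudpickle
import dill         # pip install dill

square = lambda x: x * x

try:
    pickle.dumps(square)
except pickle.PicklingError as exc:
    print(f"pickle failed: {exc}")  # lambdas cannot be pickled by reference

# Both dill and cloudpickle serialize the lambda by value
restored = cloudpickle.loads(cloudpickle.dumps(square))
print(restored(4))                        # 16
print(dill.loads(dill.dumps(square))(5))  # 25
```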

## Error Handling

Errors in containerized functions fall into four categories:

- **Serialization errors**: function arguments cannot be serialized
- **Execution errors**: the function fails inside the container
- **Deserialization errors**: the return value cannot be deserialized
- **Container errors**: the Docker container fails to start or execute

All errors are propagated as Airflow task failures with detailed logging.
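
A common source of serialization errors is passing live resources (open files, connections, clients) as task arguments. A brief sketch of the pitfall and the usual fix, passing plain picklable values instead (the paths below are illustrative):

```python
@docker_task(image='python:3.9-slim')
def count_lines(path: str):
    """Receive a plain string path rather than an open file handle."""
    with open(path) as f:
        return sum(1 for _ in f)

# Works: the argument is a plain, picklable string
line_count = count_lines('/app/data/input.txt')

# Fails at serialization time: open file handles cannot be pickled
# handle = open('/tmp/input.txt')
# broken = count_lines(handle)
```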