Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.
—
Quality: Pending — not yet reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
Transform Python functions into containerized tasks using the @docker_task decorator. This provides seamless integration of containerized execution with Python function workflows, enabling you to run functions in isolated Docker environments with automatic serialization and result handling.
Convert Python functions into DockerOperator tasks with automatic serialization.
def docker_task(
image: str,
python_command: str = "python",
serializer: Literal["pickle", "dill", "cloudpickle"] = "pickle",
multiple_outputs: bool | None = None,
**kwargs
) -> TaskDecorator:
"""
Decorator that converts a Python function into a DockerOperator task.
Args:
image: Docker image to run the function in
python_command: Python command to use in container (default: "python")
serializer: Serialization method for function arguments and return values
multiple_outputs: Whether the function returns multiple outputs
**kwargs: All DockerOperator parameters are supported
Returns:
TaskDecorator function that creates _DockerDecoratedOperator instances
"""Parameters:
image: Docker image containing Python runtime for function executionpython_command: Python executable command in container (e.g., "python", "python3", "/opt/python/bin/python")serializer: Method for serializing function arguments and return values:
"pickle": Standard Python pickle (default, fastest)"dill": Extended pickle with broader object support"cloudpickle": Cloud-optimized pickle for distributed computingmultiple_outputs: Set to True if function returns multiple values as dictionary**kwargs: All DockerOperator parameters (environment, mounts, resources, etc.)# Available serialization options
Serializer = Literal["pickle", "dill", "cloudpickle"]
# Serializer modules (lazy-loaded)
_SERIALIZERS: dict[Serializer, Any] = {
"pickle": pickle,
"dill": dill, # Requires: pip install dill
"cloudpickle": cloudpickle # Requires: pip install cloudpickle
}

from airflow.providers.docker.decorators.docker import docker_task
@docker_task(image='python:3.9-slim')
def hello_world():
    """Produce a fixed greeting from inside the container."""
    greeting = "Hello from Docker container!"
    return greeting
# Use in DAG
hello_task = hello_world()

@docker_task(image='python:3.9')
def process_data(data_list: list, multiplier: int = 2):
    """Scale every element of data_list by multiplier and return the new list."""
    return list(map(lambda value: value * multiplier, data_list))
# Call with arguments
result = process_data([1, 2, 3, 4], multiplier=3)

@docker_task(
image='python:3.9',
serializer='cloudpickle' # Better for scientific objects
)
def analyze_dataset():
    """Build a random 1000x5 DataFrame and return its summary statistics.

    Returns:
        dict with keys 'mean', 'std', and 'correlation', each a plain dict
        keyed by column name ('A'-'E') so the payload stays serializable.
    Note: uses np.random without a seed, so results differ on every run.
    """
    import numpy as np
    import pandas as pd
    # Generate sample data: 1000 rows of standard-normal values
    data = np.random.randn(1000, 5)
    df = pd.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])
    # Perform analysis; .to_dict() converts Series/DataFrames to plain dicts
    stats = {
        'mean': df.mean().to_dict(),
        'std': df.std().to_dict(),
        'correlation': df.corr().to_dict()
    }
    return stats
analysis_task = analyze_dataset()

@docker_task(
image='python:3.9',
environment={
'API_KEY': '{{ var.value.api_key }}',
'LOG_LEVEL': 'DEBUG',
'WORKERS': '4'
}
)
def api_data_fetch(endpoint: str):
    """Fetch JSON from *endpoint*, authenticating with the API_KEY env var.

    Args:
        endpoint: Full URL to GET.
    Returns:
        Parsed JSON body of the HTTP response.
    Raises:
        KeyError: if API_KEY is not present in the environment.
    """
    import os
    import requests
    # Fail fast with KeyError if the key is missing rather than sending an empty token
    api_key = os.environ['API_KEY']
    # NOTE(review): log_level is read but never used below — confirm intent
    log_level = os.environ.get('LOG_LEVEL', 'INFO')
    response = requests.get(
        endpoint,
        headers={'Authorization': f'Bearer {api_key}'}
    )
    # NOTE(review): no raise_for_status() — a non-2xx response still attempts .json()
    return response.json()
fetch_task = api_data_fetch('https://api.example.com/data')

from docker.types import Mount
@docker_task(
image='python:3.9',
mounts=[
Mount(
source='/host/data',
target='/app/data',
type='bind',
read_only=True
),
Mount(
source='/host/output',
target='/app/output',
type='bind'
)
]
)
def file_processor():
    """Summarize every file under /app/data and write the summary to /app/output.

    Scans the bind-mounted input directory, records each file's name,
    character count, and line count, writes the list as JSON to
    /app/output/results.json, and also returns it.
    """
    import os
    import json
    # Paths correspond to the bind mounts declared in the decorator:
    # /app/data is read-only input, /app/output is writable
    input_dir = '/app/data'
    output_dir = '/app/output'
    results = []
    for filename in os.listdir(input_dir):
        filepath = os.path.join(input_dir, filename)
        # NOTE(review): assumes every directory entry is a readable text file;
        # a subdirectory here would raise IsADirectoryError — confirm inputs
        with open(filepath, 'r') as f:
            data = f.read()
        results.append({
            'file': filename,
            # 'size' counts decoded characters, not bytes on disk
            'size': len(data),
            'lines': len(data.splitlines())
        })
    # Write results as pretty-printed JSON
    output_file = os.path.join(output_dir, 'results.json')
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)
    return results
process_task = file_processor()

@docker_task(
image='python:3.9',
multiple_outputs=True
)
def data_pipeline():
    """Generate random data and return several named outputs.

    With multiple_outputs=True on the decorator, each top-level key
    ('processed_data', 'statistics', 'metadata') becomes individually
    addressable downstream.
    Note: uses random without a seed, so values differ on every run.
    """
    import random
    # Simulate data processing: 50 random ints in [1, 100]
    raw_data = [random.randint(1, 100) for _ in range(50)]
    return {
        'processed_data': [x * 2 for x in raw_data],
        'statistics': {
            'count': len(raw_data),
            'mean': sum(raw_data) / len(raw_data),
            'max': max(raw_data),
            'min': min(raw_data)
        },
        'metadata': {
            'processing_version': '1.0',
            # fixed placeholder, not the actual run time
            'timestamp': '2024-01-01T00:00:00Z'
        }
    }
pipeline_task = data_pipeline()
# Access individual outputs
processed = pipeline_task['processed_data']
stats = pipeline_task['statistics']
meta = pipeline_task['metadata']

from docker.types import DeviceRequest
@docker_task(
image='tensorflow/tensorflow:latest-gpu',
device_requests=[
DeviceRequest(count=1, capabilities=[['gpu']])
],
serializer='cloudpickle'
)
def gpu_computation():
    """Run a small matrix multiplication pinned to the first GPU.

    Returns:
        The 2x2 product as a nested list of floats
        (here [[4.0, 5.0], [10.0, 11.0]]), converted from the tensor
        so the value is serializable.
    """
    import tensorflow as tf
    # Check GPU availability (informational only — no fallback if the list is empty)
    gpus = tf.config.list_physical_devices('GPU')
    print(f"Available GPUs: {len(gpus)}")
    # Simple GPU computation on device 0; raises if no GPU is visible
    with tf.device('/GPU:0'):
        a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
        b = tf.constant([[2.0, 1.0], [1.0, 2.0]])
        result = tf.matmul(a, b)
    return result.numpy().tolist()
gpu_task = gpu_computation()

@docker_task(
image='continuumio/miniconda3:latest',
python_command='conda run -n myenv python'
)
def conda_analysis():
    """Report interpreter and library versions from inside the conda env.

    Returns:
        dict with 'python_version', 'numpy_version', and 'pandas_version'
        as resolved by the interpreter selected via python_command
        ('conda run -n myenv python' in the decorator).
    """
    import sys
    import numpy as np
    import pandas as pd
    # Conda environment info gathered from the running interpreter
    env_info = {
        'python_version': sys.version,
        'numpy_version': np.__version__,
        'pandas_version': pd.__version__
    }
    return env_info
conda_task = conda_analysis()

@docker_task(
image='python:3.9',
serializer='dill' # Better support for complex objects
)
def complex_object_handler():
    """Build partials and closures that need dill to serialize.

    Returns:
        dict with 'double_result' (10), 'triple_result' (15), and
        'nested_result' (84).
    """
    import functools

    # A two-argument function so functools.partial can pre-bind the factor.
    # BUG FIX: the original wrapped a one-argument closure factory
    # (``multiplier(factor)``) in partial, so ``double(5)`` called
    # ``multiplier(2, 5)`` and raised TypeError.
    def scale(factor, value):
        return value * factor

    # Partial functions — objects the default pickle serializer handles poorly
    double = functools.partial(scale, 2)
    triple = functools.partial(scale, 3)

    # Nested functions closing over a local variable
    def outer_func():
        local_var = 42

        def inner_func():
            return local_var * 2
        return inner_func
    nested = outer_func()

    return {
        'double_result': double(5),   # 5 * 2 == 10
        'triple_result': triple(5),   # 5 * 3 == 15
        'nested_result': nested()     # 42 * 2 == 84
    }
complex_task = complex_object_handler()

@docker_task(
image='python:3.9',
mem_limit='2g',
cpus=2.0,
shm_size=268435456 # 256MB shared memory
)
def memory_intensive_task():
    """Sum a large random matrix under the container's resource limits.

    Allocates a 10000x10000 float64 array (~800 MB, within the 2g
    mem_limit set on the decorator) and returns the scalar sum as a
    plain float.
    Note: unseeded np.random, so the value differs on every run.
    """
    import numpy as np
    # Memory-intensive operation: single large allocation plus a reduction
    large_array = np.random.rand(10000, 10000)
    result = np.sum(large_array)
    return float(result)
resource_task = memory_intensive_task()

@docker_task(
image='python:3.9',
network_mode='host',
extra_hosts={'database': '192.168.1.100'}
)
def network_service():
    """Probe the 'database' host mapped via extra_hosts.

    Returns:
        dict with the container hostname and the HTTP status code of
        GET http://database:5432/health.
    NOTE(review): 5432 is the PostgreSQL wire-protocol port — a plain
    HTTP GET against it will likely fail; confirm the intended endpoint.
    """
    import socket
    import requests
    # Get container hostname (with network_mode='host' this is the host's name)
    hostname = socket.gethostname()
    # 'database' resolves to 192.168.1.100 via the extra_hosts mapping
    response = requests.get('http://database:5432/health')
    return {
        'hostname': hostname,
        'database_status': response.status_code
    }
network_task = network_service()

Internal implementation class (not directly used):
class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
"""
Internal class that combines DecoratedOperator and DockerOperator.
Handles:
- Function serialization and deserialization
- Argument passing to containerized function
- Return value extraction and XCom storage
- Error handling and logging
"""Containerized functions handle errors through:
All errors are propagated as Airflow task failures with detailed logging.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-docker