Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Deploy and manage Docker Swarm services for distributed containerized workloads. The DockerSwarmOperator extends DockerOperator functionality to provide orchestration capabilities for multi-container applications with service discovery, load balancing, and scaling features.
Execute commands as Docker Swarm services with distributed orchestration capabilities.
class DockerSwarmOperator(DockerOperator):
def __init__(
self,
*,
image: str,
enable_logging: bool = True,
configs: list | None = None,
secrets: list | None = None,
mode: dict | None = None,
networks: list | None = None,
endpoint_spec: dict | None = None,
**kwargs
) -> None

Additional Parameters (beyond DockerOperator):

- image: Docker image for the Swarm service
- enable_logging: Enable service logging and log streaming
- configs: List of Docker configs to attach to the service
- secrets: List of Docker secrets to attach to the service
- mode: Service mode configuration (replicated, global, etc.)
- networks: List of networks to attach the service to
- endpoint_spec: Service endpoint specification for port publishing

def execute(self, context: Context) -> None:
"""Execute the Docker Swarm service."""
def on_kill(self) -> None:
"""Handle task cancellation by removing Swarm service."""

@staticmethod
def format_args(args: list[str] | str | None) -> list[str] | None:
"""Format service arguments for Swarm deployment."""

from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator
# Simple Swarm service
basic_service = DockerSwarmOperator(
task_id='swarm_hello',
image='alpine:latest',
command=['echo', 'Hello from Docker Swarm!']
)

# Multi-replica service
replicated_service = DockerSwarmOperator(
task_id='data_processing_service',
image='myapp:latest',
command=['python', '/app/worker.py'],
mode={
'Replicated': {
'Replicas': 3
}
},
environment={
'WORKER_TYPE': 'processor',
'CONCURRENCY': '4'
}
)

# Service using Docker secrets and configs
secure_service = DockerSwarmOperator(
task_id='secure_web_service',
image='nginx:alpine',
configs=[
{
'ConfigID': 'nginx_config',
'ConfigName': 'nginx.conf',
'File': {
'Name': '/etc/nginx/nginx.conf',
'UID': '0',
'GID': '0',
'Mode': 0o644
}
}
],
secrets=[
{
'SecretID': 'ssl_cert',
'SecretName': 'server.crt',
'File': {
'Name': '/etc/ssl/certs/server.crt',
'UID': '0',
'GID': '0',
'Mode': 0o600
}
},
{
'SecretID': 'ssl_key',
'SecretName': 'server.key',
'File': {
'Name': '/etc/ssl/private/server.key',
'UID': '0',
'GID': '0',
'Mode': 0o600
}
}
]
)

# Service with overlay network configuration
networked_service = DockerSwarmOperator(
task_id='microservice',
image='myapp:v1.2.0',
command=['./start-server.sh'],
networks=[
{
'Target': 'backend_network',
'Aliases': ['api-service']
},
{
'Target': 'monitoring_network',
'Aliases': ['app-metrics']
}
],
endpoint_spec={
'Ports': [
{
'Protocol': 'tcp',
'TargetPort': 8080,
'PublishedPort': 80,
'PublishMode': 'ingress'
}
]
}
)

# Global service (one task per node)
monitoring_agent = DockerSwarmOperator(
task_id='node_monitoring',
image='monitoring/agent:latest',
command=['./monitor.sh'],
mode={
'Global': {}
},
mounts=[
{
'Type': 'bind',
'Source': '/var/run/docker.sock',
'Target': '/var/run/docker.sock',
'ReadOnly': True
},
{
'Type': 'bind',
'Source': '/proc',
'Target': '/host/proc',
'ReadOnly': True
}
],
privileged=True
)

# Service with CPU and memory limits
constrained_service = DockerSwarmOperator(
task_id='batch_processor',
image='processor:latest',
command=['python', '/app/batch_process.py'],
mode={
'Replicated': {
'Replicas': 2
}
},
mem_limit='1g',
cpus=1.5,
environment={
'MAX_WORKERS': '8',
'BATCH_SIZE': '1000'
}
)

# Service with custom health check
web_service = DockerSwarmOperator(
task_id='web_application',
image='webapp:latest',
endpoint_spec={
'Ports': [
{
'Protocol': 'tcp',
'TargetPort': 3000,
'PublishedPort': 3000
}
]
},
# Health check configured via Docker image or service update
mode={
'Replicated': {
'Replicas': 2
}
},
labels={
'service.type': 'web',
'monitoring.enabled': 'true'
}
)

# Service with rolling update configuration
updating_service = DockerSwarmOperator(
task_id='rolling_update_service',
image='myapp:v2.0.0',
command=['./start.sh'],
mode={
'Replicated': {
'Replicas': 4
}
},
# Update configuration handled by Swarm
labels={
'update.strategy': 'rolling',
'update.parallelism': '2'
}
)

The DockerSwarmOperator handles the complete service lifecycle:
enable_logging=True

Services progress through these states:
The operator handles various failure scenarios:
Failed services are automatically cleaned up, and detailed error information is provided in task logs.
To use DockerSwarmOperator, you need:
Initialize Docker Swarm:
docker swarm init

Join additional nodes:
docker swarm join --token <token> <manager-ip>:2377

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-docker