0
# Docker Swarm Orchestration
1
2
Deploy and manage Docker Swarm services for distributed containerized workloads. The DockerSwarmOperator extends DockerOperator functionality to provide orchestration capabilities for multi-container applications with service discovery, load balancing, and scaling features.
3
4
## Capabilities
5
6
### DockerSwarmOperator
7
8
Execute commands as Docker Swarm services with distributed orchestration capabilities.
9
10
```python { .api }
11
class DockerSwarmOperator(DockerOperator):
12
def __init__(
13
self,
14
*,
15
image: str,
16
enable_logging: bool = True,
17
configs: list | None = None,
18
secrets: list | None = None,
19
mode: dict | None = None,
20
networks: list | None = None,
21
endpoint_spec: dict | None = None,
22
**kwargs
23
) -> None
24
```
25
26
**Additional Parameters (beyond DockerOperator):**
27
28
- `image`: Docker image for the Swarm service
29
- `enable_logging`: Enable service logging and log streaming
30
- `configs`: List of Docker configs to attach to the service
31
- `secrets`: List of Docker secrets to attach to the service
32
- `mode`: Service mode configuration (replicated, global, etc.)
33
- `networks`: List of networks to attach the service to
34
- `endpoint_spec`: Service endpoint specification for port publishing
35
36
### Execution Methods
37
38
```python { .api }
39
def execute(self, context: Context) -> None:
40
"""Execute the Docker Swarm service."""
41
42
def on_kill(self) -> None:
43
"""Handle task cancellation by removing Swarm service."""
44
```
45
46
### Utility Methods
47
48
```python { .api }
49
@staticmethod
50
def format_args(args: list[str] | str | None) -> list[str] | None:
51
"""Format service arguments for Swarm deployment."""
52
```
53
54
## Usage Examples
55
56
### Basic Swarm Service
57
58
```python
59
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator
60
61
# Simple Swarm service
62
basic_service = DockerSwarmOperator(
63
task_id='swarm_hello',
64
image='alpine:latest',
65
command=['echo', 'Hello from Docker Swarm!']
66
)
67
```
68
69
### Replicated Service with Multiple Instances
70
71
```python
72
# Multi-replica service
73
replicated_service = DockerSwarmOperator(
74
task_id='data_processing_service',
75
image='myapp:latest',
76
command=['python', '/app/worker.py'],
77
mode={
78
'Replicated': {
79
'Replicas': 3
80
}
81
},
82
environment={
83
'WORKER_TYPE': 'processor',
84
'CONCURRENCY': '4'
85
}
86
)
87
```
88
89
### Service with Secrets and Configs
90
91
```python
92
# Service using Docker secrets and configs
93
secure_service = DockerSwarmOperator(
94
task_id='secure_web_service',
95
image='nginx:alpine',
96
configs=[
97
{
98
'ConfigID': 'nginx_config',
99
'ConfigName': 'nginx.conf',
100
'File': {
101
'Name': '/etc/nginx/nginx.conf',
102
'UID': '0',
103
'GID': '0',
104
'Mode': 0o644
105
}
106
}
107
],
108
secrets=[
109
{
110
'SecretID': 'ssl_cert',
111
'SecretName': 'server.crt',
112
'File': {
113
'Name': '/etc/ssl/certs/server.crt',
114
'UID': '0',
115
'GID': '0',
116
'Mode': 0o600
117
}
118
},
119
{
120
'SecretID': 'ssl_key',
121
'SecretName': 'server.key',
122
'File': {
123
'Name': '/etc/ssl/private/server.key',
124
'UID': '0',
125
'GID': '0',
126
'Mode': 0o600
127
}
128
}
129
]
130
)
131
```
132
133
### Service with Custom Networks
134
135
```python
136
# Service with overlay network configuration
137
networked_service = DockerSwarmOperator(
138
task_id='microservice',
139
image='myapp:v1.2.0',
140
command=['./start-server.sh'],
141
networks=[
142
{
143
'Target': 'backend_network',
144
'Aliases': ['api-service']
145
},
146
{
147
'Target': 'monitoring_network',
148
'Aliases': ['app-metrics']
149
}
150
],
151
endpoint_spec={
152
'Ports': [
153
{
154
'Protocol': 'tcp',
155
'TargetPort': 8080,
156
'PublishedPort': 80,
157
'PublishMode': 'ingress'
158
}
159
]
160
}
161
)
162
```
163
164
### Global Service Mode
165
166
```python
167
# Global service (one task per node)
168
monitoring_agent = DockerSwarmOperator(
169
task_id='node_monitoring',
170
image='monitoring/agent:latest',
171
command=['./monitor.sh'],
172
mode={
173
'Global': {}
174
},
175
mounts=[
176
{
177
'Type': 'bind',
178
'Source': '/var/run/docker.sock',
179
'Target': '/var/run/docker.sock',
180
'ReadOnly': True
181
},
182
{
183
'Type': 'bind',
184
'Source': '/proc',
185
'Target': '/host/proc',
186
'ReadOnly': True
187
}
188
],
189
privileged=True
190
)
191
```
192
193
### Service with Resource Constraints
194
195
```python
196
# Service with CPU and memory limits
197
constrained_service = DockerSwarmOperator(
198
task_id='batch_processor',
199
image='processor:latest',
200
command=['python', '/app/batch_process.py'],
201
mode={
202
'Replicated': {
203
'Replicas': 2
204
}
205
},
206
mem_limit='1g',
207
cpus=1.5,
208
environment={
209
'MAX_WORKERS': '8',
210
'BATCH_SIZE': '1000'
211
}
212
)
213
```
214
215
### Service with Health Checks
216
217
```python
218
# Service with custom health check
219
web_service = DockerSwarmOperator(
220
task_id='web_application',
221
image='webapp:latest',
222
endpoint_spec={
223
'Ports': [
224
{
225
'Protocol': 'tcp',
226
'TargetPort': 3000,
227
'PublishedPort': 3000
228
}
229
]
230
},
231
# Health check configured via Docker image or service update
232
mode={
233
'Replicated': {
234
'Replicas': 2
235
}
236
},
237
labels={
238
'service.type': 'web',
239
'monitoring.enabled': 'true'
240
}
241
)
242
```
243
244
### Service Update Strategy
245
246
```python
247
# Service with rolling update configuration
248
updating_service = DockerSwarmOperator(
249
task_id='rolling_update_service',
250
image='myapp:v2.0.0',
251
command=['./start.sh'],
252
mode={
253
'Replicated': {
254
'Replicas': 4
255
}
256
},
257
# Update configuration handled by Swarm
258
labels={
259
'update.strategy': 'rolling',
260
'update.parallelism': '2'
261
}
262
)
263
```
264
265
## Service Management
266
267
### Service Lifecycle
268
269
The DockerSwarmOperator handles the complete service lifecycle:
270
271
1. **Service Creation**: Creates a new Swarm service with specified configuration
272
2. **Task Monitoring**: Monitors service tasks for completion or failure
273
3. **Log Streaming**: Streams service logs when `enable_logging=True`
274
4. **Service Cleanup**: Removes service when task completes or is cancelled
275
276
### Service States
277
278
Services progress through these states:
279
- **NEW**: Service created but not yet running
280
- **RUNNING**: Service tasks are executing
281
- **COMPLETE**: All service tasks completed successfully
282
- **FAILED**: Service tasks failed or errored
283
284
### Error Handling
285
286
The operator handles various failure scenarios:
287
- Service creation failures
288
- Task execution failures
289
- Network connectivity issues
290
- Resource constraint violations
291
292
Failed services are automatically cleaned up, and detailed error information is provided in task logs.
293
294
## Docker Swarm Prerequisites
295
296
To use DockerSwarmOperator, you need:
297
298
1. **Docker Swarm Mode**: Docker daemon must be running in Swarm mode
299
2. **Swarm Manager Access**: Airflow worker must connect to a Swarm manager node
300
3. **Network Connectivity**: Proper network configuration for service communication
301
4. **Resource Availability**: Sufficient cluster resources for service requirements
302
303
Initialize Docker Swarm:
304
```bash
305
docker swarm init
306
```
307
308
Join additional nodes:
309
```bash
310
docker swarm join --token <token> <manager-ip>:2377
311
```