# Streaming and Distributed Execution

Components for distributed execution across multiple processes or containers, with real-time event streaming and coordination. The Transmitter, Worker, and Processor classes split a job into three stages that communicate over file-like `_input`/`_output` byte streams (stdin and stdout by default), enabling scalable, distributed Ansible automation workflows.
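
The three stages chain together through those streams. A minimal single-process sketch (paths are illustrative), holding the stream in memory rather than carrying it over a pipe or socket as a real deployment would:

```python
import io

import ansible_runner

# Stage 1 -- transmit: serialize the job into a byte stream
outgoing = io.BytesIO()
ansible_runner.run(
    private_data_dir='/shared/data',
    playbook='site.yml',
    streamer='transmit',
    _output=outgoing,
)
outgoing.seek(0)

# Stage 2 -- worker: execute the transmitted job, streaming results
results = io.BytesIO()
ansible_runner.run(
    private_data_dir='/tmp/worker',  # scratch space for execution
    streamer='worker',
    _input=outgoing,
    _output=results,
)
results.seek(0)

# Stage 3 -- process: rebuild events and artifacts locally
processed = ansible_runner.run(
    private_data_dir='/shared/data',
    streamer='process',
    _input=results,
)
print(processed.status)
```

The equivalent CLI pipeline is `ansible-runner transmit ./data -p site.yml | ansible-runner worker | ansible-runner process ./data`.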

## Capabilities

### Transmitter Class

Handles streaming data transmission for distributed execution scenarios. Coordinates with Worker and Processor components to enable real-time event streaming.

```python { .api }
class Transmitter:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )
```

The Transmitter class is used internally by ansible-runner when `streamer='transmit'` is specified. It serializes the execution data (project files, inventory, and settings) and writes it to a file-like `_output` stream for consumption by remote workers or processors; if `_output` is omitted, the payload is written to stdout, which is how the CLI chains stages together.

Usage example:

```python
import io

import ansible_runner

# Transmit mode: serialize the job into a buffer for a remote worker
outgoing = io.BytesIO()
result = ansible_runner.run(
    private_data_dir='/shared/data',
    playbook='site.yml',
    inventory='hosts',
    streamer='transmit',
    _output=outgoing,
)
outgoing.seek(0)  # rewind before handing the stream to a worker
```

### Worker Class

Worker process component for distributed execution. Handles the actual execution of Ansible operations in distributed scenarios.

```python { .api }
class Worker:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )
```

The Worker class executes Ansible operations as part of a distributed system. It reads a job payload from a Transmitter on `_input` and streams results to a Processor on `_output` (stdin and stdout by default).

Usage example (continuing from the transmit example above):

```python
import io

import ansible_runner

# Worker mode: execute the transmitted job; results stream to `results`
results = io.BytesIO()
result = ansible_runner.run(
    private_data_dir='/tmp/worker',  # scratch directory for execution
    streamer='worker',
    _input=outgoing,   # stream produced by the transmitter, rewound
    _output=results,
)
results.seek(0)  # rewind before handing the stream to a processor
```

### Processor Class

Processes streaming data from distributed execution. Collects and processes results from Worker instances.

```python { .api }
class Processor:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )
```

The Processor class collects and processes execution results streamed by a distributed worker on `_input` (stdin by default). It aggregates events, artifacts, and status information into the local `private_data_dir`.

Usage example (continuing from the worker example above):

```python
import ansible_runner

# Process mode: consume the worker's result stream and rebuild events
# and artifacts under private_data_dir. The playbook and inventory are
# not needed here; the job definition travels in the stream.
result = ansible_runner.run(
    private_data_dir='/shared/data',
    streamer='process',
    _input=results,  # stream produced by the worker, rewound
)
print(result.status)
```

## Distributed Execution Patterns

### Basic Streaming Workflow

Each stage blocks until its part of the job completes, so to run all three concurrently each gets its own thread, connected here by OS pipes; the same wiring works across processes or hosts with sockets. Note that one transmit stream feeds exactly one worker (see the fan-out sketch after this example for scaling out).

```python
import os
import threading

import ansible_runner

# Pipes connecting transmitter -> worker -> processor
t2w_read, t2w_write = os.pipe()
w2p_read, w2p_write = os.pipe()

def run_transmitter():
    """Serialize the job and push it to the worker"""
    with os.fdopen(t2w_write, 'wb') as out:
        ansible_runner.run(
            private_data_dir='/shared/ansible',
            playbook='distributed.yml',
            inventory='large_inventory',
            streamer='transmit',
            _output=out,
        )

def run_worker():
    """Execute the transmitted job and stream results onward"""
    os.makedirs('/tmp/worker', exist_ok=True)
    with os.fdopen(t2w_read, 'rb') as inp, os.fdopen(w2p_write, 'wb') as out:
        ansible_runner.run(
            private_data_dir='/tmp/worker',
            streamer='worker',
            _input=inp,
            _output=out,
        )

def run_processor():
    """Collect results and rebuild artifacts locally"""
    with os.fdopen(w2p_read, 'rb') as inp:
        ansible_runner.run(
            private_data_dir='/shared/ansible',
            streamer='process',
            _input=inp,
        )

# Start all components
threads = [
    threading.Thread(target=run_transmitter),
    threading.Thread(target=run_worker),
    threading.Thread(target=run_processor),
]
for thread in threads:
    thread.start()

# Wait for completion
for thread in threads:
    thread.join()
```
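
Because one transmit payload feeds exactly one worker, scaling out means building one pipeline per partition of the inventory. A sketch using the `limit` parameter to partition hosts (the batch patterns and paths are illustrative):

```python
import io

import ansible_runner

# One serialized job per host batch; each buffer then feeds its own
# worker/processor pipeline like the one above
batches = ['web*', 'db*', 'cache*']  # illustrative host patterns
streams = []
for limit in batches:
    buf = io.BytesIO()
    ansible_runner.run(
        private_data_dir='/shared/ansible',
        playbook='distributed.yml',
        inventory='large_inventory',
        streamer='transmit',
        limit=limit,
        _output=buf,
    )
    buf.seek(0)
    streams.append(buf)
```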

### Event Streaming with Custom Handlers

```python
import ansible_runner
import json
import queue

class StreamingEventHandler:
    def __init__(self):
        self.event_queue = queue.Queue()
        self.processed_events = []

    def handle_event(self, event):
        """Handle streaming events"""
        self.event_queue.put(event)
        self.processed_events.append(event)

        # Real-time event processing
        if event['event'] == 'runner_on_failed':
            self.handle_failure(event)
        elif event['event'] == 'playbook_on_stats':
            self.handle_completion(event)

        # Returning True keeps the event in the artifact data
        return True

    def handle_failure(self, event):
        """Handle task failures in real-time"""
        host = event['event_data']['host']
        task = event['event_data']['task']
        print(f"FAILURE: Task '{task}' failed on host '{host}'")

    def handle_completion(self, event):
        """Handle playbook completion"""
        stats = event['event_data']
        print(f"Playbook completed with stats: {json.dumps(stats, indent=2)}")

    def get_events(self):
        """Get all queued events"""
        events = []
        while not self.event_queue.empty():
            events.append(self.event_queue.get())
        return events

# Event handlers fire where events are processed, so in a
# transmit/worker/process pipeline attach the handler to the 'process'
# stage; 'transmit' itself emits no runner events. Shown here with a
# plain local run for brevity.
handler = StreamingEventHandler()

result = ansible_runner.run(
    private_data_dir='/project',
    playbook='streaming.yml',
    inventory='hosts',
    event_handler=handler.handle_event,
)

# Process collected events
all_events = handler.get_events()
print(f"Processed {len(all_events)} streaming events")
```
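
Alongside `event_handler`, `run()` accepts a `status_handler` callback that fires on lifecycle transitions rather than per event. A minimal sketch, assuming the two-argument signature (a status dict plus a `runner_config` keyword) used by current ansible-runner releases:

```python
import ansible_runner

def on_status(status_data, runner_config=None):
    # status_data carries a 'status' key that moves through values such
    # as 'starting' and 'running' before the final 'successful',
    # 'failed', or 'canceled'
    print(f"status changed: {status_data.get('status')}")

result = ansible_runner.run(
    private_data_dir='/project',
    playbook='streaming.yml',
    inventory='hosts',
    status_handler=on_status,
)
```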

### Distributed Execution with Process Isolation

```python
import os
import subprocess
import tempfile

def setup_distributed_environment():
    """Setup shared environment for distributed execution"""
    shared_dir = tempfile.mkdtemp(prefix='ansible-distributed-')

    # Create directory structure
    os.makedirs(f"{shared_dir}/project", exist_ok=True)
    os.makedirs(f"{shared_dir}/inventory", exist_ok=True)
    os.makedirs(f"{shared_dir}/artifacts", exist_ok=True)

    return shared_dir

def run_distributed_component(component_type, shared_dir, **kwargs):
    """Run a single pipeline stage inside a container"""
    import ansible_runner

    # Streams default to stdin/stdout, so stages built this way are
    # chained together by whatever supervises the processes
    return ansible_runner.run(
        private_data_dir=shared_dir,
        streamer=component_type,
        process_isolation=True,
        process_isolation_executable='podman',
        container_image='quay.io/ansible/ansible-runner:latest',
        container_volume_mounts=[f"{shared_dir}:{shared_dir}:Z"],
        **kwargs
    )

# Setup distributed execution
shared_dir = setup_distributed_environment()

# Run the pipeline as separate OS processes connected by pipes,
# mirroring the CLI form:
#   ansible-runner transmit DIR -p site.yml \
#       | ansible-runner worker \
#       | ansible-runner process DIR
transmit = subprocess.Popen(
    ['ansible-runner', 'transmit', shared_dir, '-p', 'site.yml'],
    stdout=subprocess.PIPE,
)
worker = subprocess.Popen(
    ['ansible-runner', 'worker'],  # --private-data-dir can pin scratch space
    stdin=transmit.stdout,
    stdout=subprocess.PIPE,
)
processor = subprocess.Popen(
    ['ansible-runner', 'process', shared_dir],
    stdin=worker.stdout,
)

# Drop our references to the pipe write ends so EOF propagates
transmit.stdout.close()
worker.stdout.close()

# Wait for all components to complete
for proc in (transmit, worker, processor):
    proc.wait()

print("Distributed execution completed")
```
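
After the pipeline finishes, the processor's private data directory contains the standard artifact layout, where the `status` and `rc` files record the job outcome. A quick check over whatever job idents were produced:

```python
import os

# Each run gets a subdirectory under artifacts/ named by its job
# ident; 'status' holds e.g. 'successful' and 'rc' the exit code
for ident in os.listdir(f"{shared_dir}/artifacts"):
    status_file = os.path.join(shared_dir, 'artifacts', ident, 'status')
    if os.path.exists(status_file):
        with open(status_file) as f:
            print(f"{ident}: {f.read().strip()}")
```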

### Real-time Monitoring

```python
import json
import os
import threading
import time
from collections import defaultdict

import ansible_runner

class DistributedMonitor:
    def __init__(self, private_data_dir, ident):
        self.private_data_dir = private_data_dir
        self.ident = ident
        self.stats = defaultdict(int)
        self.active_hosts = set()
        self.failed_hosts = set()
        self.monitoring = True

    def start_monitoring(self):
        """Start real-time monitoring thread"""
        monitor_thread = threading.Thread(target=self._monitor_events)
        monitor_thread.daemon = True
        monitor_thread.start()
        return monitor_thread

    def _monitor_events(self):
        """Monitor execution events in real-time"""
        # Runner writes one JSON file per event under
        # artifacts/<ident>/job_events/
        events_dir = os.path.join(
            self.private_data_dir, 'artifacts', self.ident, 'job_events')
        processed_files = set()

        while self.monitoring:
            try:
                if os.path.exists(events_dir):
                    new_files = set(os.listdir(events_dir)) - processed_files

                    for filename in new_files:
                        if filename.endswith('.json'):
                            filepath = os.path.join(events_dir, filename)
                            try:
                                with open(filepath, 'r') as f:
                                    event = json.load(f)
                                self._process_event(event)
                                processed_files.add(filename)
                            except (json.JSONDecodeError, IOError):
                                # File may still be being written
                                pass
            except Exception as e:
                print(f"Monitoring error: {e}")
            time.sleep(0.5)

    def _process_event(self, event):
        """Process individual events for monitoring"""
        event_type = event.get('event')
        self.stats[event_type] += 1

        if 'event_data' in event:
            host = event['event_data'].get('host')
            if host:
                self.active_hosts.add(host)

                if event_type == 'runner_on_failed':
                    self.failed_hosts.add(host)

    def get_status_summary(self):
        """Get current execution status summary"""
        return {
            'total_events': sum(self.stats.values()),
            'active_hosts': len(self.active_hosts),
            'failed_hosts': len(self.failed_hosts),
            'event_breakdown': dict(self.stats)
        }

    def stop_monitoring(self):
        """Stop monitoring"""
        self.monitoring = False

# Usage: run the job asynchronously so it can be observed while it
# executes. In a streaming pipeline, point the monitor at the 'process'
# stage's private_data_dir, where the artifacts land.
monitor = DistributedMonitor('/shared/ansible', ident='deploy-1')
monitor_thread = monitor.start_monitoring()

thread, runner = ansible_runner.run_async(
    private_data_dir='/shared/ansible',
    playbook='large-deployment.yml',
    inventory='production',
    ident='deploy-1',
)

# Monitor progress
while thread.is_alive():
    summary = monitor.get_status_summary()
    print(f"Progress: {summary['total_events']} events, "
          f"{summary['active_hosts']} hosts, "
          f"{summary['failed_hosts']} failures")
    time.sleep(5)

# Stop monitoring
monitor.stop_monitoring()
monitor_thread.join(timeout=1)

print(f"Execution completed: {runner.status}")
final_summary = monitor.get_status_summary()
print(f"Final stats: {json.dumps(final_summary, indent=2)}")
```
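
Raw artifact tailing like this is only needed while the job is in flight. Once it completes, the Runner object returned by `run()`/`run_async()` exposes the parsed events and the final recap directly; continuing the example above:

```python
# Iterate the parsed job events and read the final play recap from the
# Runner object instead of re-reading the artifact files
for event in runner.events:
    if event.get('event') == 'runner_on_failed':
        host = event.get('event_data', {}).get('host')
        print(f"failed on {host}")

print(runner.stats)  # per-host ok/changed/failures counts
```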

## Advanced Configuration

### Custom Streaming Configuration

```python
import io
import tempfile

import ansible_runner

def create_streaming_config(mode, shared_dir, **kwargs):
    """Create configuration for streaming components"""
    base_config = {
        'private_data_dir': shared_dir,
        'streamer': mode,
        'process_isolation': True,
        'process_isolation_executable': 'podman',
        'container_image': 'quay.io/ansible/ansible-runner:latest',
        'container_volume_mounts': [f"{shared_dir}:{shared_dir}:Z"],
        'envvars': {
            'ANSIBLE_HOST_KEY_CHECKING': 'False',
            # Illustrative application-level variable; not consumed by
            # Ansible or ansible-runner themselves
            'ANSIBLE_STREAMING_MODE': mode,
        }
    }

    base_config.update(kwargs)
    return base_config

# Setup
shared_dir = tempfile.mkdtemp(prefix='ansible-streaming-')

# Configure the three components; only the transmitter needs the
# playbook and inventory, since the worker and processor take the job
# definition from the stream
transmit_config = create_streaming_config(
    'transmit',
    shared_dir,
    playbook='orchestration.yml',
    inventory='clusters',
)

worker_config = create_streaming_config('worker', shared_dir)

process_config = create_streaming_config('process', shared_dir)

# Execute the pipeline, chaining the stages through in-memory buffers
# (between hosts the stream would travel over a pipe or socket instead)
outgoing = io.BytesIO()
transmit_result = ansible_runner.run(_output=outgoing, **transmit_config)
outgoing.seek(0)

incoming = io.BytesIO()
worker_result = ansible_runner.run(_input=outgoing, _output=incoming,
                                   **worker_config)
incoming.seek(0)

process_result = ansible_runner.run(_input=incoming, **process_config)
```
```