# Execution Engine

Ansible Core's execution engine orchestrates automation across multiple hosts. It manages task queues, runs tasks in parallel, selects the execution strategy, dispatches callbacks, aggregates results, and handles errors.

## Capabilities

### Task Queue Management

Central orchestration system that manages task execution across multiple hosts, with parallel processing, callback coordination, and result collection.

```python { .api }
class TaskQueueManager:
    """
    Central task queue manager orchestrating playbook execution.

    Coordinates task execution across hosts, manages callbacks, and
    handles result collection with configurable parallelism and strategies.

    Attributes:
    - _inventory: Inventory manager
    - _variable_manager: Variable manager
    - _loader: DataLoader instance
    - _passwords: Authentication passwords
    - _stdout_callback: Primary output callback
    - _run_additional_callbacks: Whether to run additional callbacks
    - _run_tree: Whether to use run tree
    - _forks: Number of parallel processes
    """

    def __init__(self, inventory, variable_manager, loader, passwords=None,
                 stdout_callback=None, run_additional_callbacks=True, run_tree=False, forks=None):
        """
        Initialize task queue manager.

        Parameters:
        - inventory: InventoryManager instance
        - variable_manager: VariableManager instance
        - loader: DataLoader instance
        - passwords: Dictionary of passwords
        - stdout_callback: Primary output callback name
        - run_additional_callbacks: Enable additional callbacks
        - run_tree: Enable run tree output
        - forks: Number of parallel forks
        """

    def run(self, play):
        """
        Execute play with task queue management.

        Parameters:
        - play: Play object to execute

        Returns:
        int: Exit code (0 for success)
        """

    def cleanup(self):
        """Clean up task queue manager resources"""

    def send_callback(self, method_name, *args, **kwargs):
        """
        Send callback to all registered callbacks.

        Parameters:
        - method_name: Callback method name
        - args: Method arguments
        - kwargs: Method keyword arguments
        """

    def load_callbacks(self):
        """Load and initialize callback plugins"""

    def set_default_callbacks(self):
        """Set default callback configuration"""
```
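
The name-based dispatch that `send_callback` describes can be sketched without Ansible installed. This is a minimal illustration, not the library's implementation: `RecordingCallback`, `SilentCallback`, and `dispatch_callback` are hypothetical names, and the sketch assumes a callback that lacks the requested method is simply skipped.

```python
class RecordingCallback:
    """Hypothetical stand-in for a callback plugin; not an Ansible class."""

    def __init__(self):
        self.events = []

    def v2_runner_on_ok(self, result):
        # Record the event so the caller can inspect what was delivered
        self.events.append(("ok", result))


class SilentCallback:
    """Hypothetical plugin that implements no hooks at all."""


def dispatch_callback(callbacks, method_name, *args, **kwargs):
    """Call method_name on every callback that defines it; return how many ran."""
    delivered = 0
    for callback in callbacks:
        method = getattr(callback, method_name, None)
        if callable(method):
            method(*args, **kwargs)
            delivered += 1
    return delivered


recorder = RecordingCallback()
delivered = dispatch_callback(
    [recorder, SilentCallback()], "v2_runner_on_ok", {"changed": False}
)
```

Looking the method up by name lets a plugin implement only the hooks it cares about, which is the design the `method_name` parameter suggests.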

### Playbook Execution

High-level playbook executor coordinating multiple plays with shared context, variable management, and comprehensive result tracking.

```python { .api }
class PlaybookExecutor:
    """
    High-level playbook executor managing multiple plays.

    Coordinates execution of all plays in a playbook with shared
    variable context and comprehensive result tracking.

    Attributes:
    - _playbooks: List of playbooks to execute
    - _inventory: Inventory manager
    - _variable_manager: Variable manager
    - _loader: DataLoader instance
    - _passwords: Authentication passwords
    """

    def __init__(self, playbooks, inventory, variable_manager, loader, passwords):
        """
        Initialize playbook executor.

        Parameters:
        - playbooks: List of playbook file paths
        - inventory: InventoryManager instance
        - variable_manager: VariableManager instance
        - loader: DataLoader instance
        - passwords: Dictionary of passwords
        """

    def run(self):
        """
        Execute all playbooks.

        Returns:
        int: Overall exit code
        """

    def _get_serialized_batches(self, play):
        """
        Get serialized host batches for play.

        Parameters:
        - play: Play object

        Returns:
        list: List of host batches
        """
```
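
To make the idea of serialized host batches concrete, here is a small standalone sketch. It is not the code behind `_get_serialized_batches`: `serialize_batches` is a hypothetical helper, and it assumes `serial` is a list of positive integers whose last entry repeats until every host is assigned (percentage values are left out).

```python
def serialize_batches(hosts, serial=()):
    """Split hosts into batches sized by serial; the last size repeats."""
    remaining = list(hosts)
    if not serial:
        # No serial setting: every host runs in a single batch
        return [remaining] if remaining else []
    if any(size <= 0 for size in serial):
        raise ValueError("batch sizes must be positive integers")

    batches = []
    step = 0
    while remaining:
        # Reuse the final batch size once the serial list is exhausted
        size = serial[min(step, len(serial) - 1)]
        batches.append(remaining[:size])
        remaining = remaining[size:]
        step += 1
    return batches


hosts = [f"web{i}" for i in range(1, 8)]
batches = serialize_batches(hosts, serial=[1, 2])
```

With seven hosts and `serial=[1, 2]`, the first batch holds one host and each later batch holds two, which is the usual canary-then-rollout pattern.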

### Task Execution

Individual task executor handling module dispatch, connection management, result processing, and error handling for single task execution.

```python { .api }
class TaskExecutor:
    """
    Execute individual tasks on specific hosts.

    Handles module execution, connection management, variable
    resolution, and result processing for single tasks.

    Attributes:
    - _host: Target host
    - _task: Task to execute
    - _job_vars: Task variables
    - _play_context: Play execution context
    - _new_stdin: Input stream
    - _loader: DataLoader instance
    - _shared_loader_obj: Shared plugin loader
    """

    def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj):
        """
        Initialize task executor.

        Parameters:
        - host: Target host object
        - task: Task object to execute
        - job_vars: Task execution variables
        - play_context: Play execution context
        - new_stdin: Input stream
        - loader: DataLoader instance
        - shared_loader_obj: Shared plugin loader
        """

    def run(self):
        """
        Execute task on target host.

        Returns:
        dict: Task execution result
        """

    def _get_connection(self):
        """
        Get connection to target host.

        Returns:
        Connection: Connection plugin instance
        """

    def _get_action_handler(self, connection, templar):
        """
        Get action handler for task.

        Parameters:
        - connection: Host connection
        - templar: Template engine

        Returns:
        ActionBase: Action plugin instance
        """

    def _execute_action(self, action, tmp, task_vars):
        """
        Execute action plugin.

        Parameters:
        - action: Action plugin instance
        - tmp: Temporary directory
        - task_vars: Task variables

        Returns:
        dict: Action result
        """
```

### Result Processing

Result collection and processing system managing task results, callback notifications, and statistics aggregation across all hosts.

```python { .api }
class TaskResult:
    """
    Container for task execution results.

    Encapsulates all information about task execution including
    host, task, result data, and execution metadata.

    Attributes:
    - _host: Host that executed the task
    - _task: Task that was executed
    - _return_data: Task execution result data
    - task_name: Name of executed task
    """

    def __init__(self, host, task, return_data, task_fields=None):
        """
        Initialize task result.

        Parameters:
        - host: Host object
        - task: Task object
        - return_data: Task result data
        - task_fields: Additional task fields
        """

    def is_changed(self):
        """
        Check if task made changes.

        Returns:
        bool: True if task changed something
        """

    def is_skipped(self):
        """
        Check if task was skipped.

        Returns:
        bool: True if task was skipped
        """

    def is_failed(self):
        """
        Check if task failed.

        Returns:
        bool: True if task failed
        """

    def is_unreachable(self):
        """
        Check if host was unreachable.

        Returns:
        bool: True if host unreachable
        """

    def needs_debugger(self):
        """
        Check if result needs debugger.

        Returns:
        bool: True if debugger needed
        """

    def clean_copy(self):
        """
        Create clean copy of result without sensitive data.

        Returns:
        TaskResult: Sanitized result copy
        """

class PlayStats:
    """
    Statistics tracking for playbook execution.

    Tracks execution statistics across all hosts including
    success, failure, change, and skip counts.
    """

    def __init__(self):
        """Initialize statistics tracking"""

    def increment(self, what, host):
        """
        Increment statistic counter.

        Parameters:
        - what: Statistic type ('ok', 'failures', 'changed', 'skipped', 'unreachable')
        - host: Host name
        """

    def summarize(self, host):
        """
        Get summary statistics for host.

        Parameters:
        - host: Host name

        Returns:
        dict: Host statistics
        """

    def custom(self, what, host, field):
        """
        Track custom statistic.

        Parameters:
        - what: Custom statistic name
        - host: Host name
        - field: Field value
        """
```
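
The relationship between result predicates and per-host counters can be sketched in plain Python. This is an illustration under stated assumptions, not Ansible's code: `classify` and `SimpleStats` are hypothetical, results are plain dicts with optional `unreachable`, `failed`, `skipped`, and `changed` keys, and each result lands in exactly one bucket (a simplification).

```python
from collections import Counter, defaultdict

STAT_KEYS = ("ok", "failures", "changed", "skipped", "unreachable")


def classify(result):
    """Map a result dict to one statistic name, most severe outcome first."""
    if result.get("unreachable"):
        return "unreachable"
    if result.get("failed"):
        return "failures"
    if result.get("skipped"):
        return "skipped"
    if result.get("changed"):
        return "changed"
    return "ok"


class SimpleStats:
    """Hypothetical per-host counter with increment/summarize methods."""

    def __init__(self):
        self._counts = defaultdict(Counter)

    def increment(self, what, host):
        self._counts[host][what] += 1

    def summarize(self, host):
        # Use .get so that asking about an unknown host creates no entry
        counts = self._counts.get(host, Counter())
        return {key: counts[key] for key in STAT_KEYS}


stats = SimpleStats()
for host, result in [
    ("web1", {}),
    ("web1", {"changed": True}),
    ("web1", {"failed": True, "changed": True}),
    ("db1", {"unreachable": True}),
]:
    stats.increment(classify(result), host)

summary = stats.summarize("web1")
```

Checking `unreachable` and `failed` before `changed` keeps a failed task that also reports a change from being counted as a success.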

### Strategy Plugins Integration

Integration with strategy plugins for customizable execution patterns including linear, free, debug, and custom strategies.

```python { .api }
class StrategyModule:
    """
    Base class for execution strategy plugins.

    Defines the interface for custom execution strategies that
    control how tasks are executed across hosts.
    """

    def __init__(self, tqm):
        """
        Initialize strategy.

        Parameters:
        - tqm: TaskQueueManager instance
        """

    def run(self, iterator, play_context, result=0):
        """
        Execute strategy for play.

        Parameters:
        - iterator: Play iterator
        - play_context: Play execution context
        - result: Current result code

        Returns:
        int: Final result code
        """

    def get_hosts_left(self, iterator):
        """
        Get hosts that still have tasks to execute.

        Parameters:
        - iterator: Play iterator

        Returns:
        list: Remaining host objects
        """

# Built-in strategy plugins
linear_strategy: StrategyModule  # Execute tasks linearly across hosts
free_strategy: StrategyModule    # Execute tasks as fast as possible
debug_strategy: StrategyModule   # Interactive debugging strategy
```
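
The difference between the linear and free strategies comes down to where the synchronization barrier sits. The sketch below models only that ordering; `linear_schedule` and `free_schedule` are hypothetical helpers and do not reflect how the real plugins queue work.

```python
def linear_schedule(hosts, tasks):
    """One step per task: every host finishes a step before the next starts."""
    return [[(task, host) for host in hosts] for task in tasks]


def free_schedule(hosts, tasks):
    """One independent queue per host: no barrier between hosts."""
    return {host: list(tasks) for host in hosts}


hosts = ["web1", "web2"]
tasks = ["install", "configure"]

linear = linear_schedule(hosts, tasks)
free = free_schedule(hosts, tasks)
```

In the linear model a slow host delays everyone at each step, while in the free model each host works through its own queue at its own pace.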

### Play Context Management

Execution context management providing configuration, connection parameters, and runtime settings for play execution.

```python { .api }
class PlayContext:
    """
    Execution context for plays providing connection and runtime configuration.

    Encapsulates all configuration needed for task execution including
    connection parameters, privilege escalation, and runtime options.

    Attributes:
    - check_mode: Whether in check mode
    - diff: Whether to show diffs
    - force_handlers: Whether to force handler execution
    - remote_addr: Target host address
    - remote_user: Remote username
    - port: Connection port
    - password: Connection password
    - private_key_file: SSH private key
    - timeout: Connection timeout
    - become: Whether to use privilege escalation
    - become_method: Privilege escalation method
    - become_user: Target user for escalation
    - become_pass: Become password
    - verbosity: Output verbosity level
    """

    def __init__(self, play=None, options=None, passwords=None, connection_lockfd=None):
        """
        Initialize play context.

        Parameters:
        - play: Play object
        - options: CLI options
        - passwords: Password dictionary
        - connection_lockfd: Connection lock file descriptor
        """

    def copy(self, host=None):
        """
        Create copy of play context.

        Parameters:
        - host: Host to customize context for

        Returns:
        PlayContext: Copied context
        """

    def set_attributes_from_plugin(self, plugin):
        """
        Set attributes from connection plugin.

        Parameters:
        - plugin: Connection plugin instance
        """

    def set_attributes_from_cli(self, options):
        """
        Set attributes from CLI options.

        Parameters:
        - options: CLI option namespace
        """

    def make_become_cmd(self, cmd, executable='/bin/sh'):
        """
        Create become command wrapper.

        Parameters:
        - cmd: Command to wrap
        - executable: Shell executable

        Returns:
        str: Wrapped command
        """
```
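
As a rough illustration of what a become command wrapper does, the sketch below wraps a command in `sudo` with shell-safe quoting. It is deliberately simplified and is not what `make_become_cmd` emits: `wrap_with_sudo` is a hypothetical helper, only the `sudo` method is covered, and real wrappers typically add extra flags and a success marker.

```python
import shlex


def wrap_with_sudo(cmd, become_user="root", executable="/bin/sh"):
    """Return cmd wrapped so it runs as become_user through the given shell."""
    return "sudo -u {user} {shell} -c {command}".format(
        user=shlex.quote(become_user),
        shell=shlex.quote(executable),
        # Quote the whole command so it reaches the shell as one argument
        command=shlex.quote(cmd),
    )


wrapped = wrap_with_sudo("echo hi")
```

Quoting each piece with `shlex.quote` matters because the wrapped string is later interpreted by a shell on the target host.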

### Worker Process Management

Worker process coordination for parallel task execution with inter-process communication and resource management.

```python { .api }
class WorkerProcess:
    """
    Worker process for parallel task execution.

    Handles individual task execution in separate processes
    with result communication back to main process.
    """

    def __init__(self, rslt_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj):
        """Initialize worker process"""

    def run(self):
        """Execute task in worker process"""

    def _hard_exit(self, exit_code):
        """Force process exit"""

def _become_prompt_regex(become_method):
    """
    Get regex for become prompts.

    Parameters:
    - become_method: Become method name

    Returns:
    str: Regex pattern for prompts
    """
```
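
The effect of a fork limit can be illustrated with a bounded worker pool. Note the substitution: this sketch uses threads from `concurrent.futures` rather than separate processes, purely to stay portable and short, so it shows the concurrency cap and result collection but not inter-process communication. `run_on_hosts` and `fake_task` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_task(host):
    """Hypothetical task body; a real worker would run a module on the host."""
    return {"host": host, "changed": False}


def run_on_hosts(hosts, task, forks=5):
    """Run task once per host with at most `forks` running concurrently."""
    with ThreadPoolExecutor(max_workers=forks) as pool:
        # pool.map yields results in input order, so zip pairs them correctly
        return dict(zip(hosts, pool.map(task, hosts)))


results = run_on_hosts(["web1", "web2", "db1"], fake_task, forks=2)
```

With `forks=2` and three hosts, the third host waits until one of the first two finishes, which mirrors how a fork limit throttles large inventories.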

## Execution Flow

### Playbook Execution Sequence

1. **Playbook Loading**: Parse YAML and create play objects
2. **Inventory Processing**: Load and process inventory sources
3. **Variable Resolution**: Gather and merge variables from all sources
4. **Play Iteration**: Execute each play in sequence
5. **Host Batching**: Group hosts according to serial settings
6. **Task Execution**: Execute tasks according to strategy
7. **Result Collection**: Gather and process task results
8. **Handler Notification**: Track and execute handlers
9. **Statistics Reporting**: Generate final execution statistics

### Task Execution Pipeline

1. **Task Preparation**: Resolve variables and templates
2. **Connection Establishment**: Connect to target host
3. **Action Plugin Loading**: Load appropriate action plugin
4. **Module Transfer**: Transfer and execute module code
5. **Result Processing**: Process and validate results
6. **Callback Notification**: Send results to callbacks
7. **Cleanup**: Clean up temporary resources

## Usage Examples

### Basic Execution Setup

```python
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.playbook import Playbook

# Initialize core components
loader = DataLoader()
inventory = InventoryManager(loader=loader, sources=['inventory'])
variable_manager = VariableManager(loader=loader, inventory=inventory)

# Set up passwords
passwords = {
    'conn_pass': None,
    'become_pass': None
}

# Execute single play
pb = Playbook.load('site.yml', variable_manager=variable_manager, loader=loader)
plays = pb.get_plays()

tqm = TaskQueueManager(
    inventory=inventory,
    variable_manager=variable_manager,
    loader=loader,
    passwords=passwords,
    stdout_callback='default'
)

result = 0
try:
    for play in plays:
        result = tqm.run(play)
        if result != 0:
            break
finally:
    tqm.cleanup()

print(f"Playbook execution result: {result}")
```
561
562
### Advanced Execution with Custom Callbacks
563
564
```python
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase

class CustomCallback(CallbackBase):
    """Custom callback for execution monitoring"""

    def __init__(self):
        super().__init__()
        self.host_stats = {}

    def v2_runner_on_ok(self, result):
        host = result._host.get_name()
        self.host_stats.setdefault(host, {'ok': 0, 'failed': 0})
        self.host_stats[host]['ok'] += 1
        print(f"✓ {host}: {result._task.get_name()}")

    def v2_runner_on_failed(self, result, ignore_errors=False):
        host = result._host.get_name()
        self.host_stats.setdefault(host, {'ok': 0, 'failed': 0})
        self.host_stats[host]['failed'] += 1
        print(f"✗ {host}: {result._task.get_name()} - {result._result.get('msg', 'Failed')}")

    def v2_playbook_on_stats(self, stats):
        print("\nExecution Summary:")
        for host, host_stats in self.host_stats.items():
            print(f"  {host}: {host_stats['ok']} ok, {host_stats['failed']} failed")

# Use custom callback
custom_callback = CustomCallback()

tqm = TaskQueueManager(
    inventory=inventory,
    variable_manager=variable_manager,
    loader=loader,
    passwords=passwords,
    stdout_callback=custom_callback
)
```

### Task-Level Execution

```python
from ansible.executor.task_executor import TaskExecutor
from ansible.playbook.task import Task
# PlayContext is defined in ansible.playbook.play_context
from ansible.playbook.play_context import PlayContext

# Create task
task_data = {
    'name': 'Test task',
    'debug': {'msg': 'Hello from task executor'}
}
task = Task.load(task_data, variable_manager=variable_manager, loader=loader)

# Set up execution context
play_context = PlayContext()
play_context.remote_user = 'ansible'

# Get target host
host = inventory.get_host('localhost')

# Execute task
task_executor = TaskExecutor(
    host=host,
    task=task,
    job_vars=variable_manager.get_vars(host=host),
    play_context=play_context,
    new_stdin=None,
    loader=loader,
    shared_loader_obj=None
)

result = task_executor.run()
print(f"Task result: {result}")
```

### Parallel Execution Control

```python
# Configure parallel execution
tqm = TaskQueueManager(
    inventory=inventory,
    variable_manager=variable_manager,
    loader=loader,
    passwords=passwords,
    forks=10,  # 10 parallel processes
    stdout_callback='minimal'
)

# Monitor execution with statistics
from ansible.executor import stats

play_stats = stats.PlayStats()

# Custom strategy with statistics
class MonitoredStrategy:
    def __init__(self, tqm):
        self.tqm = tqm
        self.stats = play_stats

    def execute_task(self, host, task):
        # Execute task and track statistics.
        # The execute_task call on the queue manager is a placeholder for
        # whatever dispatch mechanism the strategy actually uses.
        result = self.tqm.execute_task(host, task)

        # Check failure first so a failed task is never counted as changed
        if result.is_failed():
            self.stats.increment('failures', host.name)
        elif result.is_skipped():
            self.stats.increment('skipped', host.name)
        elif result.is_changed():
            self.stats.increment('changed', host.name)
        else:
            self.stats.increment('ok', host.name)

        return result
```

### Error Handling and Recovery

```python
import sys

from ansible.errors import AnsibleError, AnsibleConnectionFailure
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager

def robust_execution(playbook_path):
    """Execute playbook with comprehensive error handling"""

    try:
        # Initialize execution components
        loader = DataLoader()
        inventory = InventoryManager(loader=loader, sources=['inventory'])
        variable_manager = VariableManager(loader=loader, inventory=inventory)

        # Execute playbook
        pbex = PlaybookExecutor(
            playbooks=[playbook_path],
            inventory=inventory,
            variable_manager=variable_manager,
            loader=loader,
            passwords={}
        )

        # No tqm variable exists in this function; the executor is expected
        # to clean up its own task queue manager when run() returns.
        result = pbex.run()
        return result

    except AnsibleConnectionFailure as e:
        print(f"Connection failed: {e}")
        return 4  # HOST_UNREACHABLE

    except AnsibleError as e:
        print(f"Ansible error: {e}")
        return 1  # GENERIC_ERROR

    except Exception as e:
        print(f"Unexpected error: {e}")
        return 250  # UNKNOWN_ERROR

# Execute with error handling
exit_code = robust_execution('site.yml')
sys.exit(exit_code)
```
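
Exit codes returned by the engine can be turned into readable labels with a small helper. The sketch below assumes the values behave as bit flags (1 error, 2 failed hosts, 4 unreachable hosts, 8 play aborted) with 255 reserved for an unknown error; these mirror the `RUN_*` constants as I understand them in recent ansible-core releases and should be checked against the installed version. `describe_exit_code` is a hypothetical helper.

```python
# Assumed flag values; verify against TaskQueueManager in your ansible-core
RUN_FLAGS = {
    1: "error",
    2: "failed hosts",
    4: "unreachable hosts",
    8: "play aborted",
}
RUN_UNKNOWN_ERROR = 255
KNOWN_BITS = 0b1111


def describe_exit_code(code):
    """Return human-readable labels for an engine exit code."""
    if code == 0:
        return ["ok"]
    if code == RUN_UNKNOWN_ERROR:
        return ["unknown error"]
    if code & ~KNOWN_BITS:
        # Bits outside the known flags: do not guess at a meaning
        return [f"unrecognized code {code}"]
    return [label for bit, label in RUN_FLAGS.items() if code & bit]


labels = describe_exit_code(6)
```

Treating the code as a set of flags lets a single run report both failed and unreachable hosts at once.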