# Execution Engine

Ansible Core's execution engine orchestrates tasks across multiple hosts. It manages task queues, parallel execution, and strategy selection, processes callbacks, and collects and aggregates results with error handling.

## Capabilities

### Task Queue Management

`TaskQueueManager` is the central orchestrator: it runs a play's tasks across multiple hosts in parallel, coordinates callbacks, and collects the results.

```python { .api }
class TaskQueueManager:
    """
    Central task queue manager orchestrating playbook execution.

    Coordinates task execution across hosts, manages callbacks, and
    handles result collection with configurable parallelism and strategies.

    Attributes:
    - _inventory: Inventory manager
    - _variable_manager: Variable manager
    - _loader: DataLoader instance
    - _passwords: Authentication passwords
    - _stdout_callback: Primary output callback
    - _run_additional_callbacks: Whether to run additional callbacks
    - _run_tree: Whether to use run tree
    - _forks: Number of parallel processes
    """

    def __init__(self, inventory, variable_manager, loader, passwords=None,
                 stdout_callback=None, run_additional_callbacks=True, run_tree=False, forks=None):
        """
        Initialize task queue manager.

        Parameters:
        - inventory: InventoryManager instance
        - variable_manager: VariableManager instance
        - loader: DataLoader instance
        - passwords: Dictionary of passwords
        - stdout_callback: Primary output callback name
        - run_additional_callbacks: Enable additional callbacks
        - run_tree: Enable run tree output
        - forks: Number of parallel forks
        """

    def run(self, play):
        """
        Execute play with task queue management.

        Parameters:
        - play: Play object to execute

        Returns:
        int: Exit code (0 for success)
        """

    def cleanup(self):
        """Clean up task queue manager resources"""

    def send_callback(self, method_name, *args, **kwargs):
        """
        Send callback to all registered callbacks.

        Parameters:
        - method_name: Callback method name
        - args: Method arguments
        - kwargs: Method keyword arguments
        """

    def load_callbacks(self):
        """Load and initialize callback plugins"""

    def set_default_callbacks(self):
        """Set default callback configuration"""
```
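The fan-out behind `send_callback` can be pictured with a small stand-alone model. This is a minimal sketch, not Ansible's implementation: `CallbackDispatcher`, `Recorder`, and `Silent` are hypothetical names, and the assumption is simply that a hook is invoked by name on every registered callback that defines it.

```python
class CallbackDispatcher:
    """Toy stand-in for the callback fan-out performed by send_callback()."""

    def __init__(self):
        self._callbacks = []

    def register(self, callback):
        self._callbacks.append(callback)

    def send_callback(self, method_name, *args, **kwargs):
        """Call method_name on every registered callback that defines it."""
        delivered = 0
        for callback in self._callbacks:
            method = getattr(callback, method_name, None)
            if callable(method):
                method(*args, **kwargs)
                delivered += 1
        return delivered


class Recorder:
    """Hypothetical callback that records the events it receives."""

    def __init__(self):
        self.events = []

    def v2_runner_on_ok(self, result):
        self.events.append(("ok", result))


class Silent:
    """Hypothetical callback that implements no hooks at all."""


dispatcher = CallbackDispatcher()
recorder = Recorder()
dispatcher.register(recorder)
dispatcher.register(Silent())

# Only Recorder defines v2_runner_on_ok, so Silent is skipped.
delivered = dispatcher.send_callback("v2_runner_on_ok", {"host": "web1"})
```

Looking hooks up by name keeps the dispatcher unaware of which events a given callback cares about; a callback opts in by defining the method.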

### Playbook Execution

`PlaybookExecutor` runs the plays of one or more playbooks in order. The plays share the same inventory, variable manager, and loader, and the executor tracks the overall result.

```python { .api }
class PlaybookExecutor:
    """
    High-level playbook executor managing multiple plays.

    Coordinates execution of all plays in a playbook with shared
    variable context and comprehensive result tracking.

    Attributes:
    - _playbooks: List of playbooks to execute
    - _inventory: Inventory manager
    - _variable_manager: Variable manager
    - _loader: DataLoader instance
    - _passwords: Authentication passwords
    """

    def __init__(self, playbooks, inventory, variable_manager, loader, passwords):
        """
        Initialize playbook executor.

        Parameters:
        - playbooks: List of playbook file paths
        - inventory: InventoryManager instance
        - variable_manager: VariableManager instance
        - loader: DataLoader instance
        - passwords: Dictionary of passwords
        """

    def run(self):
        """
        Execute all playbooks.

        Returns:
        int: Overall exit code
        """

    def _get_serialized_batches(self, play):
        """
        Get serialized host batches for play.

        Parameters:
        - play: Play object

        Returns:
        list: List of host batches
        """
```
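Host batching is easier to follow with a concrete split. The function below is a simplified sketch of the idea behind `_get_serialized_batches`, not the method itself: `serialized_batches` is a hypothetical helper that assumes `serial` has already been reduced to a list of integers (percentages are left out), that the last size repeats until the hosts run out, and that a size of zero or less means "all remaining hosts".

```python
def serialized_batches(hosts, serial):
    """Split hosts into ordered batches.

    serial is a list of batch sizes; the last size repeats until every host
    is assigned. An empty list, or a size <= 0, puts all remaining hosts in
    one batch.
    """
    remaining = list(hosts)
    if not serial:
        return [remaining] if remaining else []

    batches = []
    index = 0
    while remaining:
        # Reuse the final entry once the list of sizes is exhausted
        size = serial[min(index, len(serial) - 1)]
        if size <= 0:
            size = len(remaining)
        batches.append(remaining[:size])
        remaining = remaining[size:]
        index += 1
    return batches


hosts = ["web1", "web2", "web3", "web4", "web5", "web6"]
batches = serialized_batches(hosts, [1, 2])
```

With `serial` set to `[1, 2]`, the first batch is a single canary host and every later batch holds up to two hosts.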

### Task Execution

`TaskExecutor` runs a single task on a single host: it resolves variables, obtains a connection, dispatches the action plugin, and processes the result and any errors.

```python { .api }
class TaskExecutor:
    """
    Execute individual tasks on specific hosts.

    Handles module execution, connection management, variable
    resolution, and result processing for single tasks.

    Attributes:
    - _host: Target host
    - _task: Task to execute
    - _job_vars: Task variables
    - _play_context: Play execution context
    - _new_stdin: Input stream
    - _loader: DataLoader instance
    - _shared_loader_obj: Shared plugin loader
    """

    def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj):
        """
        Initialize task executor.

        Parameters:
        - host: Target host object
        - task: Task object to execute
        - job_vars: Task execution variables
        - play_context: Play execution context
        - new_stdin: Input stream
        - loader: DataLoader instance
        - shared_loader_obj: Shared plugin loader
        """

    def run(self):
        """
        Execute task on target host.

        Returns:
        dict: Task execution result
        """

    def _get_connection(self):
        """
        Get connection to target host.

        Returns:
        Connection: Connection plugin instance
        """

    def _get_action_handler(self, connection, templar):
        """
        Get action handler for task.

        Parameters:
        - connection: Host connection
        - templar: Template engine

        Returns:
        ActionBase: Action plugin instance
        """

    def _execute_action(self, action, tmp, task_vars):
        """
        Execute action plugin.

        Parameters:
        - action: Action plugin instance
        - tmp: Temporary directory
        - task_vars: Task variables

        Returns:
        dict: Action result
        """
```

### Result Processing

Task results are collected per host, reported to callbacks, and aggregated into statistics. `TaskResult` holds the outcome of one task on one host; `PlayStats` keeps per-host counters for the whole run.

```python { .api }
class TaskResult:
    """
    Container for task execution results.

    Encapsulates all information about task execution including
    host, task, result data, and execution metadata.

    Attributes:
    - _host: Host that executed the task
    - _task: Task that was executed
    - _return_data: Task execution result data
    - task_name: Name of executed task
    """

    def __init__(self, host, task, return_data, task_fields=None):
        """
        Initialize task result.

        Parameters:
        - host: Host object
        - task: Task object
        - return_data: Task result data
        - task_fields: Additional task fields
        """

    def is_changed(self):
        """
        Check if task made changes.

        Returns:
        bool: True if task changed something
        """

    def is_skipped(self):
        """
        Check if task was skipped.

        Returns:
        bool: True if task was skipped
        """

    def is_failed(self):
        """
        Check if task failed.

        Returns:
        bool: True if task failed
        """

    def is_unreachable(self):
        """
        Check if host was unreachable.

        Returns:
        bool: True if host unreachable
        """

    def needs_debugger(self):
        """
        Check if result needs debugger.

        Returns:
        bool: True if debugger needed
        """

    def clean_copy(self):
        """
        Create clean copy of result without sensitive data.

        Returns:
        TaskResult: Sanitized result copy
        """

class PlayStats:
    """
    Statistics tracking for playbook execution.

    Tracks execution statistics across all hosts including
    success, failure, change, and skip counts.
    """

    def __init__(self):
        """Initialize statistics tracking"""

    def increment(self, what, host):
        """
        Increment statistic counter.

        Parameters:
        - what: Statistic type ('ok', 'failures', 'changed', 'skipped', 'unreachable')
        - host: Host name
        """

    def summarize(self, host):
        """
        Get summary statistics for host.

        Parameters:
        - host: Host name

        Returns:
        dict: Host statistics
        """

    def custom(self, what, host, field):
        """
        Track custom statistic.

        Parameters:
        - what: Custom statistic name
        - host: Host name
        - field: Field value
        """
```
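The `increment` and `summarize` pair amounts to a set of per-host counters. The sketch below models that interface in plain Python so the bookkeeping can be tried without a running playbook. `HostCounters` is a hypothetical class written for illustration; it assumes the five statistic types listed in the `increment` docstring and that a host with no recorded events summarizes to zeros.

```python
from collections import defaultdict


class HostCounters:
    """Per-host counters modelled on the increment/summarize interface."""

    KINDS = ("ok", "failures", "changed", "skipped", "unreachable")

    def __init__(self):
        # One counter table per statistic type, keyed by host name
        self._counts = {kind: defaultdict(int) for kind in self.KINDS}

    def increment(self, what, host):
        if what not in self._counts:
            raise ValueError(f"unknown statistic: {what}")
        self._counts[what][host] += 1

    def summarize(self, host):
        # .get() avoids creating entries for hosts that were never counted
        return {kind: self._counts[kind].get(host, 0) for kind in self.KINDS}


counters = HostCounters()
counters.increment("ok", "web1")
counters.increment("ok", "web1")
counters.increment("changed", "web1")
counters.increment("failures", "db1")

summary = counters.summarize("web1")
```

Here `summary` reports two `ok` events and one `changed` event for `web1`, with the remaining counters at zero.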

### Strategy Plugins Integration

Strategy plugins control how tasks are scheduled across hosts. The built-in strategies are linear, free, and debug; custom strategies implement the same interface.

```python { .api }
class StrategyModule:
    """
    Base class for execution strategy plugins.

    Defines the interface for custom execution strategies that
    control how tasks are executed across hosts.
    """

    def __init__(self, tqm):
        """
        Initialize strategy.

        Parameters:
        - tqm: TaskQueueManager instance
        """

    def run(self, iterator, play_context, result=0):
        """
        Execute strategy for play.

        Parameters:
        - iterator: Play iterator
        - play_context: Play execution context
        - result: Current result code

        Returns:
        int: Final result code
        """

    def get_hosts_left(self, iterator):
        """
        Get hosts that still have tasks to execute.

        Parameters:
        - iterator: Play iterator

        Returns:
        list: Remaining host objects
        """

# Built-in strategy plugins
linear_strategy: StrategyModule  # Execute tasks linearly across hosts
free_strategy: StrategyModule  # Execute tasks as fast as possible
debug_strategy: StrategyModule  # Interactive debugging strategy
```
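The difference between the linear and free strategies shows up in the order in which work completes. The simulation below is a rough sketch under stated assumptions, not the plugins' code: `linear_order` and `free_order` are hypothetical helpers, the task durations are made-up numbers, and forks are assumed to be unlimited.

```python
import heapq


def linear_order(hosts, tasks):
    """Lockstep: every host finishes a task before any host starts the next."""
    return [(task, host) for task in tasks for host in hosts]


def free_order(durations):
    """Each host advances independently.

    durations maps host -> list of per-task run times (hypothetical numbers).
    Returns (finish_time, host, task_index) tuples in completion order.
    """
    # Heap entries: (time the host finishes its current task, host, task index)
    heap = [(times[0], host, 0) for host, times in durations.items() if times]
    heapq.heapify(heap)

    completed = []
    while heap:
        finished_at, host, index = heapq.heappop(heap)
        completed.append((finished_at, host, index))
        next_index = index + 1
        if next_index < len(durations[host]):
            next_finish = finished_at + durations[host][next_index]
            heapq.heappush(heap, (next_finish, host, next_index))
    return completed


lockstep = linear_order(["fast", "slow"], ["install", "configure"])
independent = free_order({"fast": [1, 1], "slow": [5, 1]})
```

In `lockstep`, the `fast` host cannot start `configure` until `slow` has finished `install`. In `independent`, `fast` completes both of its tasks before `slow` completes its first.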

### Play Context Management

`PlayContext` carries the configuration a play needs at run time: connection parameters, privilege escalation settings, and runtime options such as check mode and verbosity.

```python { .api }
class PlayContext:
    """
    Execution context for plays providing connection and runtime configuration.

    Encapsulates all configuration needed for task execution including
    connection parameters, privilege escalation, and runtime options.

    Attributes:
    - check_mode: Whether in check mode
    - diff: Whether to show diffs
    - force_handlers: Whether to force handler execution
    - remote_addr: Target host address
    - remote_user: Remote username
    - port: Connection port
    - password: Connection password
    - private_key_file: SSH private key
    - timeout: Connection timeout
    - become: Whether to use privilege escalation
    - become_method: Privilege escalation method
    - become_user: Target user for escalation
    - become_pass: Become password
    - verbosity: Output verbosity level
    """

    def __init__(self, play=None, options=None, passwords=None, connection_lockfd=None):
        """
        Initialize play context.

        Parameters:
        - play: Play object
        - options: CLI options
        - passwords: Password dictionary
        - connection_lockfd: Connection lock file descriptor
        """

    def copy(self, host=None):
        """
        Create copy of play context.

        Parameters:
        - host: Host to customize context for

        Returns:
        PlayContext: Copied context
        """

    def set_attributes_from_plugin(self, plugin):
        """
        Set attributes from connection plugin.

        Parameters:
        - plugin: Connection plugin instance
        """

    def set_attributes_from_cli(self, options):
        """
        Set attributes from CLI options.

        Parameters:
        - options: CLI option namespace
        """

    def make_become_cmd(self, cmd, executable='/bin/sh'):
        """
        Create become command wrapper.

        Parameters:
        - cmd: Command to wrap
        - executable: Shell executable

        Returns:
        str: Wrapped command
        """
```
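To make the idea of a become wrapper concrete, the sketch below builds a `sudo` command line around an arbitrary command. It illustrates the general technique only and is not what `make_become_cmd` produces: `wrap_with_sudo` is a hypothetical helper, the fixed `BECOME-SUCCESS` marker is an assumption (a real implementation would likely use a per-run value), and only the `sudo` method is covered.

```python
import shlex


def wrap_with_sudo(cmd, become_user="root", executable="/bin/sh",
                   success_key="BECOME-SUCCESS"):
    """Wrap cmd so it runs as become_user through sudo (simplified sketch)."""
    # Echo a marker first so the caller can tell that escalation succeeded
    inner = f"echo {success_key}; {cmd}"
    return " ".join([
        "sudo", "-H", "-S", "-n",
        "-u", shlex.quote(become_user),
        executable, "-c", shlex.quote(inner),
    ])


wrapped = wrap_with_sudo("whoami")
```

Quoting the inner command with `shlex.quote` keeps it a single argument to the shell, so characters such as `;` are interpreted by the escalated shell rather than by the outer one.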

### Worker Process Management

Worker processes run tasks in parallel. Each worker executes one task in a separate process and sends the result back to the main process.

```python { .api }
class WorkerProcess:
    """
    Worker process for parallel task execution.

    Handles individual task execution in separate processes
    with result communication back to main process.
    """

    def __init__(self, rslt_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj):
        """Initialize worker process"""

    def run(self):
        """Execute task in worker process"""

    def _hard_exit(self, exit_code):
        """Force process exit"""

def _become_prompt_regex(become_method):
    """
    Get regex for become prompts.

    Parameters:
    - become_method: Become method name

    Returns:
    str: Regex pattern for prompts
    """
```
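The worker-and-result-queue pattern can be sketched in a few lines. To keep the example portable it uses threads and `queue.Queue` as a stand-in for separate processes, so it shows the communication pattern rather than real process isolation. `worker`, `run_on_hosts`, and `ping` are hypothetical names, and `forks` here only caps how many workers run at once.

```python
import queue
import threading


def worker(host, task, results):
    """Run one task for one host and report the outcome on the shared queue."""
    try:
        results.put((host, {"failed": False, "value": task(host)}))
    except Exception as exc:
        # Report the failure instead of letting the worker die silently
        results.put((host, {"failed": True, "msg": str(exc)}))


def run_on_hosts(hosts, task, forks=5):
    """Run task on each host with at most `forks` workers in flight at once."""
    results = queue.Queue()
    collected = {}
    pending = list(hosts)
    while pending:
        batch, pending = pending[:forks], pending[forks:]
        threads = [threading.Thread(target=worker, args=(h, task, results))
                   for h in batch]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # All workers in the batch have finished, so the queue can be drained
        while not results.empty():
            host, outcome = results.get()
            collected[host] = outcome
    return collected


def ping(host):
    """Hypothetical task: succeeds everywhere except on db1."""
    if host == "db1":
        raise RuntimeError("unreachable")
    return "pong"


outcomes = run_on_hosts(["web1", "web2", "db1"], ping, forks=2)
```

The main loop never touches a worker's internals; it only reads `(host, outcome)` pairs from the queue, which is what lets the same collection logic serve any number of workers.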

## Execution Flow

### Playbook Execution Sequence

1. **Playbook Loading**: Parse YAML and create play objects
2. **Inventory Processing**: Load and process inventory sources
3. **Variable Resolution**: Gather and merge variables from all sources
4. **Play Iteration**: Execute each play in sequence
5. **Host Batching**: Group hosts according to serial settings
6. **Task Execution**: Execute tasks according to strategy
7. **Result Collection**: Gather and process task results
8. **Handler Notification**: Track and execute handlers
9. **Statistics Reporting**: Generate final execution statistics
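Of the steps above, handler notification is the least obvious, so here is a small model of it. This is a sketch under simplifying assumptions rather than the engine's code: `run_with_handlers` is a hypothetical function, tasks are reduced to `(name, changed, notify)` tuples, and handlers run once at the end of the play in the order they are defined.

```python
def run_with_handlers(tasks, handlers):
    """Run tasks, then run each notified handler once, in definition order.

    tasks: list of (name, changed, notify) tuples; notify lists handler names.
    handlers: handler names in the order the play defines them.
    """
    log = []
    notified = set()
    for name, changed, notify in tasks:
        log.append(f"task: {name}")
        # Only a task that reports a change notifies its handlers
        if changed:
            notified.update(notify)

    for handler in handlers:
        if handler in notified:
            log.append(f"handler: {handler}")
    return log


log = run_with_handlers(
    tasks=[
        ("write nginx.conf", True, ["restart nginx"]),
        ("write site.conf", True, ["restart nginx"]),
        ("write motd", False, ["reload shell"]),
    ],
    handlers=["reload shell", "restart nginx"],
)
```

Two tasks notify `restart nginx`, yet it runs once; `reload shell` never runs because the task that would notify it reported no change.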

### Task Execution Pipeline

1. **Task Preparation**: Resolve variables and templates
2. **Connection Establishment**: Connect to target host
3. **Action Plugin Loading**: Load appropriate action plugin
4. **Module Transfer**: Transfer and execute module code
5. **Result Processing**: Process and validate results
6. **Callback Notification**: Send results to callbacks
7. **Cleanup**: Clean up temporary resources

## Usage Examples

### Basic Execution Setup

```python
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.playbook import Playbook

# Initialize core components
loader = DataLoader()
inventory = InventoryManager(loader=loader, sources=['inventory'])
variable_manager = VariableManager(loader=loader, inventory=inventory)

# Set up passwords
passwords = {
    'conn_pass': None,
    'become_pass': None
}

# Execute single play
pb = Playbook.load('site.yml', variable_manager=variable_manager, loader=loader)
plays = pb.get_plays()

tqm = TaskQueueManager(
    inventory=inventory,
    variable_manager=variable_manager,
    loader=loader,
    passwords=passwords,
    stdout_callback='default'
)

result = 0
try:
    for play in plays:
        result = tqm.run(play)
        if result != 0:
            break
finally:
    tqm.cleanup()

print(f"Playbook execution result: {result}")
```

### Advanced Execution with Custom Callbacks

```python
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase

class CustomCallback(CallbackBase):
    """Custom callback for execution monitoring"""

    def __init__(self):
        super().__init__()
        self.host_stats = {}

    def v2_runner_on_ok(self, result):
        host = result._host.get_name()
        self.host_stats.setdefault(host, {'ok': 0, 'failed': 0})
        self.host_stats[host]['ok'] += 1
        print(f"✓ {host}: {result._task.get_name()}")

    def v2_runner_on_failed(self, result, ignore_errors=False):
        host = result._host.get_name()
        self.host_stats.setdefault(host, {'ok': 0, 'failed': 0})
        self.host_stats[host]['failed'] += 1
        print(f"✗ {host}: {result._task.get_name()} - {result._result.get('msg', 'Failed')}")

    def v2_playbook_on_stats(self, stats):
        print("\nExecution Summary:")
        for host, host_stats in self.host_stats.items():
            print(f"  {host}: {host_stats['ok']} ok, {host_stats['failed']} failed")

# Use custom callback
custom_callback = CustomCallback()

# inventory, variable_manager, loader, and passwords come from the basic setup
tqm = TaskQueueManager(
    inventory=inventory,
    variable_manager=variable_manager,
    loader=loader,
    passwords=passwords,
    stdout_callback=custom_callback
)
```

### Task-Level Execution

```python
from ansible.executor.task_executor import TaskExecutor
from ansible.playbook.task import Task
# PlayContext lives in ansible.playbook.play_context, not ansible.executor
from ansible.playbook.play_context import PlayContext

# Create task
task_data = {
    'name': 'Test task',
    'debug': {'msg': 'Hello from task executor'}
}
task = Task.load(task_data, variable_manager=variable_manager, loader=loader)

# Set up execution context
play_context = PlayContext()
play_context.remote_user = 'ansible'

# Get target host
host = inventory.get_host('localhost')

# Execute task
task_executor = TaskExecutor(
    host=host,
    task=task,
    job_vars=variable_manager.get_vars(host=host),
    play_context=play_context,
    new_stdin=None,
    loader=loader,
    shared_loader_obj=None
)

result = task_executor.run()
print(f"Task result: {result}")
```

### Parallel Execution Control

```python
# Configure parallel execution
tqm = TaskQueueManager(
    inventory=inventory,
    variable_manager=variable_manager,
    loader=loader,
    passwords=passwords,
    forks=10,  # 10 parallel processes
    stdout_callback='minimal'
)

# Monitor execution with statistics
from ansible.executor import stats

play_stats = stats.PlayStats()

# Custom strategy with statistics
class MonitoredStrategy:
    def __init__(self, tqm):
        self.tqm = tqm
        self.stats = play_stats

    def execute_task(self, host, task):
        # Execute task and track statistics.
        # NOTE: tqm.execute_task() is a placeholder for the strategy's dispatch
        # step; the TaskQueueManager API listed in this document does not define it.
        result = self.tqm.execute_task(host, task)

        # Check failure first so a failed task is never counted as changed
        if result.is_failed():
            self.stats.increment('failures', host.name)
        elif result.is_skipped():
            self.stats.increment('skipped', host.name)
        elif result.is_changed():
            self.stats.increment('changed', host.name)
        else:
            self.stats.increment('ok', host.name)

        return result
```

### Error Handling and Recovery

```python
import sys

from ansible.errors import AnsibleError, AnsibleConnectionFailure
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager

def robust_execution(playbook_path):
    """Execute playbook with comprehensive error handling"""

    try:
        # Initialize execution components
        loader = DataLoader()
        inventory = InventoryManager(loader=loader, sources=['inventory'])
        variable_manager = VariableManager(loader=loader, inventory=inventory)

        # Execute playbook
        pbex = PlaybookExecutor(
            playbooks=[playbook_path],
            inventory=inventory,
            variable_manager=variable_manager,
            loader=loader,
            passwords={}
        )

        result = pbex.run()
        return result

    except AnsibleConnectionFailure as e:
        print(f"Connection failed: {e}")
        return 4  # HOST_UNREACHABLE

    except AnsibleError as e:
        print(f"Ansible error: {e}")
        return 1  # GENERIC_ERROR

    except Exception as e:
        print(f"Unexpected error: {e}")
        return 250  # UNKNOWN_ERROR

    # No explicit cleanup here: this function never creates a TaskQueueManager
    # of its own, so there is no local tqm to clean up.

# Execute with error handling
exit_code = robust_execution('site.yml')
sys.exit(exit_code)
```