
# Streaming and Distributed Execution

Components for distributed execution across multiple processes or containers, with real-time event streaming and coordination. These classes enable scalable, distributed Ansible automation workflows.
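The three stages are selected with the `streamer` argument to `ansible_runner.run()` and are connected by file-like objects passed as the `_output` and `_input` keyword arguments. As a minimal sketch of a complete round trip on one machine (in-memory buffers stand in for the network transport a real deployment would use):

```python
import io

import ansible_runner

job, results = io.BytesIO(), io.BytesIO()

# Stage 1: serialize the job instead of running it
ansible_runner.run(private_data_dir='/shared/data', playbook='site.yml',
                   streamer='transmit', _output=job)
job.seek(0)

# Stage 2: execute the transmitted job, streaming results out
ansible_runner.run(private_data_dir='/shared/data', streamer='worker',
                   _input=job, _output=results)
results.seek(0)

# Stage 3: rebuild events and artifacts from the result stream
r = ansible_runner.run(private_data_dir='/shared/data', streamer='process',
                       _input=results)
print(r.status)
```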

## Capabilities

### Transmitter Class

Handles streaming data transmission for distributed execution scenarios. Coordinates with Worker and Processor components to enable real-time event streaming.

```python { .api }
class Transmitter:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )
```

The Transmitter class is used internally by ansible-runner when `streamer='transmit'` is specified. It serializes the execution request and writes it to the `_output` stream for consumption by remote workers or processors.

Usage example:

```python
import io

import ansible_runner

# Use transmitter mode for distributed execution; the job is
# written to the _output stream rather than executed locally
outgoing_buffer = io.BytesIO()

result = ansible_runner.run(
    private_data_dir='/shared/data',
    playbook='site.yml',
    inventory='hosts',
    streamer='transmit',
    _output=outgoing_buffer
)
```

### Worker Class

Worker process component for distributed execution. Handles the actual execution of Ansible operations in distributed scenarios.

```python { .api }
class Worker:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )
```

The Worker class executes Ansible operations as part of a distributed system. It receives work from a Transmitter (via the `_input` stream) and sends results to a Processor (via the `_output` stream).

Usage example:

```python
import io

import ansible_runner

# Use worker mode for distributed execution; the playbook and
# inventory arrive in the transmitted payload on _input, and
# results are streamed to _output
incoming_buffer = io.BytesIO()  # filled by a Transmitter
outgoing_buffer = io.BytesIO()  # consumed by a Processor

result = ansible_runner.run(
    private_data_dir='/shared/data',
    streamer='worker',
    _input=incoming_buffer,
    _output=outgoing_buffer
)
```

### Processor Class

Processes streaming data from distributed execution. Collects and processes results from Worker instances.

```python { .api }
class Processor:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )
```

The Processor class collects and processes execution results from distributed workers. It reads a worker's result stream from `_input` and aggregates events, artifacts, and status information under the private data directory.

Usage example:

```python
import io

import ansible_runner

# Use processor mode for distributed execution; events and artifacts
# are rebuilt under private_data_dir from the _input stream
incoming_buffer = io.BytesIO()  # filled by a Worker

result = ansible_runner.run(
    private_data_dir='/shared/data',
    streamer='process',
    _input=incoming_buffer
)
```
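Because events are reconstructed on the processing side, callbacks such as `event_handler` and `status_handler` belong with the `streamer='process'` call. A sketch, where the buffer stands in for a worker's output stream:

```python
import io

import ansible_runner

result_stream = io.BytesIO()  # assume a Worker filled this earlier
result_stream.seek(0)

def on_status(status_data, runner_config=None):
    # called as the run moves through its lifecycle states
    print(f"status: {status_data.get('status')}")

def on_event(event):
    print(f"event: {event.get('event')}")
    return True  # returning False would drop the event from disk

result = ansible_runner.run(
    private_data_dir='/shared/data',
    streamer='process',
    _input=result_stream,
    status_handler=on_status,
    event_handler=on_event
)
```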

## Distributed Execution Patterns

### Basic Streaming Workflow

```python
import io

import ansible_runner

def run_transmitter(outgoing):
    """Serialize the job and write it to the outgoing stream"""
    return ansible_runner.run(
        private_data_dir='/shared/ansible',
        playbook='distributed.yml',
        inventory='large_inventory',
        streamer='transmit',
        _output=outgoing
    )

def run_worker(incoming, outgoing):
    """Execute the transmitted job and stream results onward"""
    return ansible_runner.run(
        private_data_dir='/shared/ansible',
        streamer='worker',
        _input=incoming,
        _output=outgoing
    )

def run_processor(incoming):
    """Rebuild events and artifacts from the result stream"""
    return ansible_runner.run(
        private_data_dir='/shared/ansible',
        streamer='process',
        _input=incoming
    )

# Coordinate the three stages; in-memory buffers connect them here,
# so the stages run one after another (see below for a concurrent
# variant using OS pipes)
job_stream = io.BytesIO()
result_stream = io.BytesIO()

run_transmitter(job_stream)
job_stream.seek(0)

run_worker(job_stream, result_stream)
result_stream.seek(0)

result = run_processor(result_stream)
print(f"Workflow finished with status: {result.status}")
```
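The in-memory buffers above force the stages to run sequentially. A sketch of a concurrent variant, assuming pipe-backed file objects can stand in for the buffers (which the stream-based design is intended to allow):

```python
import os
import threading

import ansible_runner

def piped_stage(run_kwargs, close_after=None):
    """Run one stage, then close its output so the next stage sees EOF"""
    try:
        ansible_runner.run(**run_kwargs)
    finally:
        if close_after is not None:
            close_after.close()

# transmit -> worker pipe, and worker -> process pipe
job_r, job_w = os.pipe()
res_r, res_w = os.pipe()
job_in, job_out = os.fdopen(job_r, 'rb'), os.fdopen(job_w, 'wb')
res_in, res_out = os.fdopen(res_r, 'rb'), os.fdopen(res_w, 'wb')

stages = [
    threading.Thread(target=piped_stage, args=(
        dict(private_data_dir='/shared/ansible', playbook='distributed.yml',
             inventory='large_inventory', streamer='transmit',
             _output=job_out),
        job_out)),
    threading.Thread(target=piped_stage, args=(
        dict(private_data_dir='/shared/ansible', streamer='worker',
             _input=job_in, _output=res_out),
        res_out)),
    threading.Thread(target=piped_stage, args=(
        dict(private_data_dir='/shared/ansible', streamer='process',
             _input=res_in),
        None)),
]

# All three stages run at once, streaming through the pipes
for t in stages:
    t.start()
for t in stages:
    t.join()
```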

### Event Streaming with Custom Handlers

```python
import json
import queue

import ansible_runner

class StreamingEventHandler:
    def __init__(self):
        self.event_queue = queue.Queue()
        self.processed_events = []

    def handle_event(self, event):
        """Handle streaming events"""
        self.event_queue.put(event)
        self.processed_events.append(event)

        # Real-time event processing
        if event['event'] == 'runner_on_failed':
            self.handle_failure(event)
        elif event['event'] == 'playbook_on_stats':
            self.handle_completion(event)

        return True

    def handle_failure(self, event):
        """Handle task failures in real-time"""
        host = event['event_data']['host']
        task = event['event_data']['task']
        print(f"FAILURE: Task '{task}' failed on host '{host}'")

    def handle_completion(self, event):
        """Handle playbook completion"""
        stats = event['event_data']
        print(f"Playbook completed with stats: {json.dumps(stats, indent=2)}")

    def get_events(self):
        """Get all queued events"""
        events = []
        while not self.event_queue.empty():
            events.append(self.event_queue.get())
        return events

# Use with a local run; in a distributed setup, pass the handler to
# the run that uses streamer='process', since events are generated on
# the processing side rather than during transmission
handler = StreamingEventHandler()

result = ansible_runner.run(
    private_data_dir='/project',
    playbook='streaming.yml',
    inventory='hosts',
    event_handler=handler.handle_event
)

# Process collected events
all_events = handler.get_events()
print(f"Processed {len(all_events)} streaming events")
```

### Distributed Execution with Process Isolation

```python
import os
import subprocess
import tempfile

def setup_distributed_environment():
    """Set up a shared environment for distributed execution"""
    shared_dir = tempfile.mkdtemp(prefix='ansible-distributed-')

    # Create directory structure
    os.makedirs(f"{shared_dir}/project", exist_ok=True)
    os.makedirs(f"{shared_dir}/inventory", exist_ok=True)
    os.makedirs(f"{shared_dir}/artifacts", exist_ok=True)

    return shared_dir

# Setup distributed execution
shared_dir = setup_distributed_environment()

# Run each stage as a separate, isolated OS process; the
# ansible-runner CLI connects the stages over stdin/stdout pipes
transmit = subprocess.Popen(
    ['ansible-runner', 'transmit', shared_dir, '-p', 'site.yml'],
    stdout=subprocess.PIPE
)
worker = subprocess.Popen(
    ['ansible-runner', 'worker'],
    stdin=transmit.stdout,
    stdout=subprocess.PIPE
)
processor = subprocess.Popen(
    ['ansible-runner', 'process', shared_dir],
    stdin=worker.stdout
)

# Allow upstream stages to receive SIGPIPE if a downstream stage exits
transmit.stdout.close()
worker.stdout.close()

# Wait for all components to complete
for proc in (transmit, worker, processor):
    proc.wait()

print("Distributed execution completed")
```
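The same pipeline can span machines by inserting a transport between the stages; for example, replacing the worker `Popen` above with one that runs the worker command on a remote host over ssh (`worker-host` is a hypothetical host name):

```python
worker = subprocess.Popen(
    ['ssh', 'worker-host', 'ansible-runner', 'worker'],
    stdin=transmit.stdout,
    stdout=subprocess.PIPE
)
```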

### Real-time Monitoring

```python
import json
import os
import threading
import time
from collections import defaultdict

import ansible_runner

class DistributedMonitor:
    def __init__(self, private_data_dir):
        self.private_data_dir = private_data_dir
        self.stats = defaultdict(int)
        self.active_hosts = set()
        self.failed_hosts = set()
        self.monitoring = True

    def start_monitoring(self):
        """Start real-time monitoring thread"""
        monitor_thread = threading.Thread(target=self._monitor_events)
        monitor_thread.daemon = True
        monitor_thread.start()
        return monitor_thread

    def _monitor_events(self):
        """Monitor execution events in real-time"""
        events_dir = f"{self.private_data_dir}/artifacts/job_events"
        processed_files = set()

        while self.monitoring:
            try:
                if os.path.exists(events_dir):
                    event_files = os.listdir(events_dir)
                    new_files = set(event_files) - processed_files

                    for filename in new_files:
                        if filename.endswith('.json'):
                            filepath = os.path.join(events_dir, filename)
                            try:
                                with open(filepath, 'r') as f:
                                    event = json.load(f)
                                self._process_event(event)
                                processed_files.add(filename)
                            except (json.JSONDecodeError, IOError):
                                # File may still be being written
                                pass

                time.sleep(0.5)
            except Exception as e:
                print(f"Monitoring error: {e}")

    def _process_event(self, event):
        """Process individual events for monitoring"""
        event_type = event.get('event')
        self.stats[event_type] += 1

        if 'event_data' in event:
            host = event['event_data'].get('host')
            if host:
                self.active_hosts.add(host)

                if event_type == 'runner_on_failed':
                    self.failed_hosts.add(host)

    def get_status_summary(self):
        """Get current execution status summary"""
        return {
            'total_events': sum(self.stats.values()),
            'active_hosts': len(self.active_hosts),
            'failed_hosts': len(self.failed_hosts),
            'event_breakdown': dict(self.stats)
        }

    def stop_monitoring(self):
        """Stop monitoring"""
        self.monitoring = False

# Usage: watch the side of the pipeline that writes job_events (a plain
# run here; in a distributed setup, the streamer='process' stage writes
# the artifacts being watched)
monitor = DistributedMonitor('/shared/ansible')
monitor_thread = monitor.start_monitoring()

# run_async returns immediately with the execution thread and runner
thread, runner = ansible_runner.run_async(
    private_data_dir='/shared/ansible',
    playbook='large-deployment.yml',
    inventory='production'
)

# Monitor progress while the run is in flight
while thread.is_alive():
    summary = monitor.get_status_summary()
    print(f"Progress: {summary['total_events']} events, "
          f"{summary['active_hosts']} hosts, "
          f"{summary['failed_hosts']} failures")
    time.sleep(5)

# Stop monitoring
monitor.stop_monitoring()
monitor_thread.join(timeout=1)

print(f"Execution completed: {runner.status}")
final_summary = monitor.get_status_summary()
print(f"Final stats: {json.dumps(final_summary, indent=2)}")
```
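Polling `artifacts/job_events` works across process boundaries, which is what makes it suitable for watching a `streamer='process'` stage from the outside; within a single process, an `event_handler` callback (as in the earlier example) delivers the same events without the filesystem round trip.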

## Advanced Configuration

### Custom Streaming Configuration

```python
import io
import tempfile

import ansible_runner

def create_streaming_config(mode, shared_dir, **kwargs):
    """Create configuration for streaming components"""
    base_config = {
        'private_data_dir': shared_dir,
        'process_isolation': True,
        'process_isolation_executable': 'podman',
        'container_image': 'quay.io/ansible/ansible-runner:latest',
        'container_volume_mounts': [f"{shared_dir}:{shared_dir}:Z"],
        'envvars': {
            'ANSIBLE_HOST_KEY_CHECKING': 'False',
            'ANSIBLE_STREAMING_MODE': mode
        }
    }

    base_config.update(kwargs)
    return base_config

# Setup
shared_dir = tempfile.mkdtemp(prefix='ansible-streaming-')

# In-memory streams connecting the three stages
job_stream = io.BytesIO()
result_stream = io.BytesIO()

# Configure different components
transmit_config = create_streaming_config(
    'transmit',
    shared_dir,
    playbook='orchestration.yml',
    inventory='clusters',
    streamer='transmit',
    _output=job_stream
)

worker_config = create_streaming_config(
    'worker',
    shared_dir,
    streamer='worker',
    _input=job_stream,
    _output=result_stream
)

process_config = create_streaming_config(
    'process',
    shared_dir,
    streamer='process',
    _input=result_stream
)

# Execute the stages in order, rewinding each stream between stages
transmit_result = ansible_runner.run(**transmit_config)
job_stream.seek(0)

worker_result = ansible_runner.run(**worker_config)
result_stream.seek(0)

process_result = ansible_runner.run(**process_config)
```
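Because each stage mounts the same `shared_dir` into its container, the artifacts written by the process stage remain visible on the host after all three runs complete; if the stages ran on separate machines, each would generally need only its own copy of the directory plus the connecting streams.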