Consistent Ansible Python API and CLI with container and process isolation runtime capabilities
Components for distributed execution across multiple processes or containers, with real-time event streaming and coordination. The Transmitter, Worker, and Processor classes enable scalable, distributed Ansible automation workflows.
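The three roles are designed to be chained: a Transmitter packages the job and writes it to a stream, a Worker consumes that stream and executes the job, and a Processor consumes the worker's output and rebuilds events and artifacts locally. Below is a minimal single-process sketch of that pipeline, assuming the _input/_output stream keyword arguments that the streaming entry points accept; the same flow is available across machines via the ansible-runner transmit, worker, and process CLI subcommands.

import io
import ansible_runner

# Stage 1: package the job into an in-memory stream
outgoing = io.BytesIO()
ansible_runner.run(
    private_data_dir='/shared/data',
    playbook='site.yml',
    streamer='transmit',
    _output=outgoing
)

# Stage 2: the worker consumes the stream, runs the job, and
# writes results to a second stream
outgoing.seek(0)
incoming = io.BytesIO()
ansible_runner.run(
    private_data_dir='/shared/data',
    streamer='worker',
    _input=outgoing,
    _output=incoming
)

# Stage 3: the processor rebuilds events and artifacts locally
incoming.seek(0)
result = ansible_runner.run(
    private_data_dir='/shared/data',
    streamer='process',
    _input=incoming
)
print(result.status)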
Handles streaming data transmission for distributed execution scenarios. Coordinates with Worker and Processor components to enable real-time event streaming.
class Transmitter:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )

The Transmitter class is used internally by ansible-runner when streamer='transmit' is specified. It handles the transmission of execution data to remote workers or processors.
Usage example:

import ansible_runner

# Use transmitter mode for distributed execution
result = ansible_runner.run(
    private_data_dir='/shared/data',
    playbook='site.yml',
    inventory='hosts',
    streamer='transmit'
)
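To make the transmitted payload available to a worker on another machine, the stream can be captured to shared storage. A sketch, again assuming the _output keyword accepted by the streaming modes; the path is hypothetical:

import ansible_runner

# Capture the transmitted job payload to shared storage so a remote
# worker can consume it later (hypothetical path)
with open('/shared/data/job.stream', 'wb') as stream:
    ansible_runner.run(
        private_data_dir='/shared/data',
        playbook='site.yml',
        inventory='hosts',
        streamer='transmit',
        _output=stream
    )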
Worker process component for distributed execution. Handles the actual execution of Ansible operations in distributed scenarios.

class Worker:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )

The Worker class executes Ansible operations as part of a distributed system. It receives work from a Transmitter and sends results to a Processor.
Usage example:

import ansible_runner

# Use worker mode for distributed execution
result = ansible_runner.run(
    private_data_dir='/shared/data',
    playbook='site.yml',
    inventory='hosts',
    streamer='worker'
)

Processes streaming data from distributed execution. Collects and processes results from Worker instances.
class Processor:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )

The Processor class collects and processes execution results from distributed workers. It aggregates events, artifacts, and status information.
Usage example:

import ansible_runner

# Use processor mode for distributed execution
result = ansible_runner.run(
    private_data_dir='/shared/data',
    playbook='site.yml',
    inventory='hosts',
    streamer='process'
)

Coordinating all three components from a single script:

import ansible_runner
import threading

def run_transmitter():
    """Run transmitter to coordinate execution"""
    return ansible_runner.run(
        private_data_dir='/shared/ansible',
        playbook='distributed.yml',
        inventory='large_inventory',
        streamer='transmit',
        process_isolation=True
    )

def run_worker():
    """Run worker to execute tasks"""
    return ansible_runner.run(
        private_data_dir='/shared/ansible',
        playbook='distributed.yml',
        inventory='large_inventory',
        streamer='worker',
        process_isolation=True
    )

def run_processor():
    """Run processor to collect results"""
    return ansible_runner.run(
        private_data_dir='/shared/ansible',
        playbook='distributed.yml',
        inventory='large_inventory',
        streamer='process',
        process_isolation=True
    )

# Coordinate distributed execution
transmitter_thread = threading.Thread(target=run_transmitter)
worker_threads = [threading.Thread(target=run_worker) for _ in range(3)]
processor_thread = threading.Thread(target=run_processor)

# Start all components
transmitter_thread.start()
for worker in worker_threads:
    worker.start()
processor_thread.start()

# Wait for completion
transmitter_thread.join()
for worker in worker_threads:
    worker.join()
processor_thread.join()
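Each run() call above blocks its thread until the component finishes. When the coordinating script itself must stay responsive, ansible_runner.run_async offers a built-in non-blocking alternative that returns the execution thread and the Runner object immediately; a brief sketch:

import ansible_runner

# Non-blocking execution: returns (thread, runner) immediately
thread, runner = ansible_runner.run_async(
    private_data_dir='/shared/ansible',
    playbook='distributed.yml',
    inventory='large_inventory'
)

# ...do other coordination work here...

thread.join()
print(f"Finished with status: {runner.status}")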
Handling streamed events in real time with a custom event handler:

import ansible_runner
import json
import queue
class StreamingEventHandler:
    def __init__(self):
        self.event_queue = queue.Queue()
        self.processed_events = []

    def handle_event(self, event):
        """Handle streaming events"""
        self.event_queue.put(event)
        self.processed_events.append(event)
        # Real-time event processing
        if event.get('event') == 'runner_on_failed':
            self.handle_failure(event)
        elif event.get('event') == 'playbook_on_stats':
            self.handle_completion(event)
        # Returning True keeps the event in runner's artifact store
        return True

    def handle_failure(self, event):
        """Handle task failures in real-time"""
        host = event['event_data']['host']
        task = event['event_data']['task']
        print(f"FAILURE: Task '{task}' failed on host '{host}'")

    def handle_completion(self, event):
        """Handle playbook completion"""
        stats = event['event_data']
        print(f"Playbook completed with stats: {json.dumps(stats, indent=2)}")

    def get_events(self):
        """Get all queued events"""
        events = []
        while not self.event_queue.empty():
            events.append(self.event_queue.get())
        return events

# Attach the handler; in a transmit/worker/process pipeline the event
# stream arrives on the processing side, so the handler belongs there
# (or on a plain local run, as here)
handler = StreamingEventHandler()
result = ansible_runner.run(
    private_data_dir='/project',
    playbook='streaming.yml',
    inventory='hosts',
    event_handler=handler.handle_event
)

# Process collected events
all_events = handler.get_events()
print(f"Processed {len(all_events)} streaming events")
Running each component as a separate operating-system process:

import subprocess
import sys
import os
import tempfile
def setup_distributed_environment():
    """Setup shared environment for distributed execution"""
    shared_dir = tempfile.mkdtemp(prefix='ansible-distributed-')
    # Create directory structure
    os.makedirs(f"{shared_dir}/project", exist_ok=True)
    os.makedirs(f"{shared_dir}/inventory", exist_ok=True)
    os.makedirs(f"{shared_dir}/artifacts", exist_ok=True)
    return shared_dir

def run_distributed_component(component_type, shared_dir, **kwargs):
    """Run a distributed component in isolation"""
    import ansible_runner
    return ansible_runner.run(
        private_data_dir=shared_dir,
        streamer=component_type,
        process_isolation=True,
        process_isolation_executable='podman',
        container_image='quay.io/ansible/ansible-runner:latest',
        container_volume_mounts=[f"{shared_dir}:{shared_dir}:Z"],
        **kwargs
    )

# Setup distributed execution
shared_dir = setup_distributed_environment()

# Run components in separate processes
components = ['transmit', 'worker', 'worker', 'process']
processes = []
for component in components:
    # The child program is kept flush-left: it runs at module level
    # under -c, where leading indentation would be a syntax error
    proc = subprocess.Popen([
        sys.executable, '-c', f'''
import ansible_runner
result = ansible_runner.run(
    private_data_dir="{shared_dir}",
    playbook="site.yml",
    inventory="hosts",
    streamer="{component}",
    process_isolation=True
)
print(f"Component {component} finished with status: {{result.status}}")
'''
    ])
    processes.append(proc)

# Wait for all components to complete
for proc in processes:
    proc.wait()
print("Distributed execution completed")import threading
import time
import json
from collections import defaultdict
class DistributedMonitor:
    def __init__(self, private_data_dir):
        self.private_data_dir = private_data_dir
        self.stats = defaultdict(int)
        self.active_hosts = set()
        self.failed_hosts = set()
        self.monitoring = True

    def start_monitoring(self):
        """Start real-time monitoring thread"""
        monitor_thread = threading.Thread(target=self._monitor_events)
        monitor_thread.daemon = True
        monitor_thread.start()
        return monitor_thread

    def _monitor_events(self):
        """Monitor execution events in real-time"""
        # Note: by default runner nests events under a per-job ident,
        # i.e. artifacts/<ident>/job_events; adjust this path (or pass
        # ident= to run) so it matches your layout
        events_dir = f"{self.private_data_dir}/artifacts/job_events"
        processed_files = set()
        while self.monitoring:
            try:
                if os.path.exists(events_dir):
                    event_files = os.listdir(events_dir)
                    new_files = set(event_files) - processed_files
                    for filename in new_files:
                        if filename.endswith('.json'):
                            filepath = os.path.join(events_dir, filename)
                            try:
                                with open(filepath, 'r') as f:
                                    event = json.load(f)
                                self._process_event(event)
                                processed_files.add(filename)
                            except (json.JSONDecodeError, IOError):
                                # File may still be being written
                                pass
            except Exception as e:
                print(f"Monitoring error: {e}")
            # Sleep outside the try block so errors cannot cause a busy loop
            time.sleep(0.5)

    def _process_event(self, event):
        """Process individual events for monitoring"""
        event_type = event.get('event')
        self.stats[event_type] += 1
        if 'event_data' in event:
            host = event['event_data'].get('host')
            if host:
                self.active_hosts.add(host)
                if event_type == 'runner_on_failed':
                    self.failed_hosts.add(host)

    def get_status_summary(self):
        """Get current execution status summary"""
        return {
            'total_events': sum(self.stats.values()),
            'active_hosts': len(self.active_hosts),
            'failed_hosts': len(self.failed_hosts),
            'event_breakdown': dict(self.stats)
        }

    def stop_monitoring(self):
        """Stop monitoring"""
        self.monitoring = False
# Usage with distributed execution
monitor = DistributedMonitor('/shared/ansible')
monitor_thread = monitor.start_monitoring()

# Start execution without blocking: run() only returns after completion,
# so the non-blocking run_async variant is needed to observe progress
# while the job is still running (streamer='transmit' is omitted here,
# since the transmit stage only packages the job and emits no task events)
thread, runner = ansible_runner.run_async(
    private_data_dir='/shared/ansible',
    playbook='large-deployment.yml',
    inventory='production',
    process_isolation=True
)

# Monitor progress
while thread.is_alive():
    summary = monitor.get_status_summary()
    print(f"Progress: {summary['total_events']} events, "
          f"{summary['active_hosts']} hosts, "
          f"{summary['failed_hosts']} failures")
    time.sleep(5)
thread.join()

# Stop monitoring
monitor.stop_monitoring()
monitor_thread.join(timeout=1)

print(f"Execution completed: {runner.status}")
final_summary = monitor.get_status_summary()
print(f"Final stats: {json.dumps(final_summary, indent=2)}")

Building reusable configurations for the streaming components:

import ansible_runner
import tempfile
import os
def create_streaming_config(mode, shared_dir, **kwargs):
    """Create configuration for streaming components"""
    base_config = {
        'private_data_dir': shared_dir,
        'process_isolation': True,
        'process_isolation_executable': 'podman',
        'container_image': 'quay.io/ansible/ansible-runner:latest',
        'container_volume_mounts': [f"{shared_dir}:{shared_dir}:Z"],
        # envvars are injected into the job's environment;
        # ANSIBLE_STREAMING_MODE is a custom marker variable for this
        # example, not an Ansible built-in
        'envvars': {
            'ANSIBLE_HOST_KEY_CHECKING': 'False',
            'ANSIBLE_STREAMING_MODE': mode
        }
    }
    base_config.update(kwargs)
    return base_config
# Setup
shared_dir = tempfile.mkdtemp(prefix='ansible-streaming-')
# Configure different components
transmit_config = create_streaming_config(
    'transmit',
    shared_dir,
    playbook='orchestration.yml',
    inventory='clusters',
    streamer='transmit'
)

worker_config = create_streaming_config(
    'worker',
    shared_dir,
    playbook='orchestration.yml',
    inventory='clusters',
    streamer='worker'
)

process_config = create_streaming_config(
    'process',
    shared_dir,
    playbook='orchestration.yml',
    inventory='clusters',
    streamer='process'
)

# Execute with custom configurations
transmit_result = ansible_runner.run(**transmit_config)
worker_result = ansible_runner.run(**worker_config)
process_result = ansible_runner.run(**process_config)

Install with Tessl CLI
npx tessl i tessl/pypi-ansible-runner