CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ansible-runner

Consistent Ansible Python API and CLI with container and process isolation runtime capabilities

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

streaming-distributed.mddocs/

Streaming and Distributed Execution

Components for distributed execution across multiple processes or containers with real-time event streaming and coordination. These classes enable scalable, distributed Ansible automation workflows with event streaming capabilities.

Capabilities

Transmitter Class

Handles streaming data transmission for distributed execution scenarios. Coordinates with Worker and Processor components to enable real-time event streaming.

class Transmitter:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )

The Transmitter class is used internally by ansible-runner when streamer='transmit' is specified. It handles the transmission of execution data to remote workers or processors.

Usage example:

import ansible_runner

# Use transmitter mode for distributed execution
result = ansible_runner.run(
    private_data_dir='/shared/data',
    playbook='site.yml',
    inventory='hosts',
    streamer='transmit'
)

Worker Class

Worker process component for distributed execution. Handles the actual execution of Ansible operations in distributed scenarios.

class Worker:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )

The Worker class executes Ansible operations as part of a distributed system. It receives work from a Transmitter and sends results to a Processor.

Usage example:

import ansible_runner

# Use worker mode for distributed execution
result = ansible_runner.run(
    private_data_dir='/shared/data',
    playbook='site.yml',
    inventory='hosts',
    streamer='worker'
)

Processor Class

Processes streaming data from distributed execution. Collects and processes results from Worker instances.

class Processor:
    def __init__(
        self,
        private_data_dir: str,
        **kwargs
    )

The Processor class collects and processes execution results from distributed workers. It aggregates events, artifacts, and status information.

Usage example:

import ansible_runner

# Use processor mode for distributed execution
result = ansible_runner.run(
    private_data_dir='/shared/data',
    playbook='site.yml',
    inventory='hosts',
    streamer='process'
)

Distributed Execution Patterns

Basic Streaming Workflow

import ansible_runner
import threading
import time

def run_transmitter():
    """Run transmitter to coordinate execution"""
    return ansible_runner.run(
        private_data_dir='/shared/ansible',
        playbook='distributed.yml',
        inventory='large_inventory',
        streamer='transmit',
        process_isolation=True
    )

def run_worker():
    """Run worker to execute tasks"""
    return ansible_runner.run(
        private_data_dir='/shared/ansible',
        playbook='distributed.yml',
        inventory='large_inventory',
        streamer='worker',
        process_isolation=True
    )

def run_processor():
    """Run processor to collect results"""
    return ansible_runner.run(
        private_data_dir='/shared/ansible',
        playbook='distributed.yml',
        inventory='large_inventory',
        streamer='process',
        process_isolation=True
    )

# Coordinate distributed execution
transmitter_thread = threading.Thread(target=run_transmitter)
worker_threads = [threading.Thread(target=run_worker) for _ in range(3)]
processor_thread = threading.Thread(target=run_processor)

# Start all components
transmitter_thread.start()
for worker in worker_threads:
    worker.start()
processor_thread.start()

# Wait for completion
transmitter_thread.join()
for worker in worker_threads:
    worker.join()
processor_thread.join()

Event Streaming with Custom Handlers

import ansible_runner
import json
import queue

class StreamingEventHandler:
    def __init__(self):
        self.event_queue = queue.Queue()
        self.processed_events = []
    
    def handle_event(self, event):
        """Handle streaming events"""
        self.event_queue.put(event)
        self.processed_events.append(event)
        
        # Real-time event processing
        if event['event'] == 'runner_on_failed':
            self.handle_failure(event)
        elif event['event'] == 'playbook_on_stats':
            self.handle_completion(event)
        
        return True
    
    def handle_failure(self, event):
        """Handle task failures in real-time"""
        host = event['event_data']['host']
        task = event['event_data']['task']
        print(f"FAILURE: Task '{task}' failed on host '{host}'")
    
    def handle_completion(self, event):
        """Handle playbook completion"""
        stats = event['event_data']
        print(f"Playbook completed with stats: {json.dumps(stats, indent=2)}")
    
    def get_events(self):
        """Get all queued events"""
        events = []
        while not self.event_queue.empty():
            events.append(self.event_queue.get())
        return events

# Use with streaming execution
handler = StreamingEventHandler()

result = ansible_runner.run(
    private_data_dir='/project',
    playbook='streaming.yml',
    inventory='hosts',
    event_handler=handler.handle_event,
    streamer='transmit'
)

# Process collected events
all_events = handler.get_events()
print(f"Processed {len(all_events)} streaming events")

Distributed Execution with Process Isolation

import subprocess
import os
import tempfile

def setup_distributed_environment():
    """Setup shared environment for distributed execution"""
    shared_dir = tempfile.mkdtemp(prefix='ansible-distributed-')
    
    # Create directory structure
    os.makedirs(f"{shared_dir}/project", exist_ok=True)
    os.makedirs(f"{shared_dir}/inventory", exist_ok=True)
    os.makedirs(f"{shared_dir}/artifacts", exist_ok=True)
    
    return shared_dir

def run_distributed_component(component_type, shared_dir, **kwargs):
    """Run a distributed component in isolation"""
    import ansible_runner
    
    return ansible_runner.run(
        private_data_dir=shared_dir,
        streamer=component_type,
        process_isolation=True,
        process_isolation_executable='podman',
        container_image='quay.io/ansible/ansible-runner:latest',
        container_volume_mounts=[f"{shared_dir}:{shared_dir}:Z"],
        **kwargs
    )

# Setup distributed execution
shared_dir = setup_distributed_environment()

# Run components in separate processes
components = ['transmit', 'worker', 'worker', 'process']
processes = []

for component in components:
    proc = subprocess.Popen([
        'python', '-c', f'''
import ansible_runner
result = ansible_runner.run(
    private_data_dir="{shared_dir}",
    playbook="site.yml",
    inventory="hosts",
    streamer="{component}",
    process_isolation=True
)
print(f"Component {component} finished with status: {{result.status}}")
'''
    ])
    processes.append(proc)

# Wait for all components to complete
for proc in processes:
    proc.wait()

print("Distributed execution completed")

Real-time Monitoring

import threading
import time
import json
from collections import defaultdict

class DistributedMonitor:
    def __init__(self, private_data_dir):
        self.private_data_dir = private_data_dir
        self.stats = defaultdict(int)
        self.active_hosts = set()
        self.failed_hosts = set()
        self.monitoring = True
    
    def start_monitoring(self):
        """Start real-time monitoring thread"""
        monitor_thread = threading.Thread(target=self._monitor_events)
        monitor_thread.daemon = True
        monitor_thread.start()
        return monitor_thread
    
    def _monitor_events(self):
        """Monitor execution events in real-time"""
        events_dir = f"{self.private_data_dir}/artifacts/job_events"
        processed_files = set()
        
        while self.monitoring:
            try:
                if os.path.exists(events_dir):
                    event_files = os.listdir(events_dir)
                    new_files = set(event_files) - processed_files
                    
                    for filename in new_files:
                        if filename.endswith('.json'):
                            filepath = os.path.join(events_dir, filename)
                            try:
                                with open(filepath, 'r') as f:
                                    event = json.load(f)
                                    self._process_event(event)
                                processed_files.add(filename)
                            except (json.JSONDecodeError, IOError):
                                # File may still be being written
                                pass
                
                time.sleep(0.5)
            except Exception as e:
                print(f"Monitoring error: {e}")
    
    def _process_event(self, event):
        """Process individual events for monitoring"""
        event_type = event.get('event')
        self.stats[event_type] += 1
        
        if 'event_data' in event:
            host = event['event_data'].get('host')
            if host:
                self.active_hosts.add(host)
                
                if event_type == 'runner_on_failed':
                    self.failed_hosts.add(host)
    
    def get_status_summary(self):
        """Get current execution status summary"""
        return {
            'total_events': sum(self.stats.values()),
            'active_hosts': len(self.active_hosts),
            'failed_hosts': len(self.failed_hosts),
            'event_breakdown': dict(self.stats)
        }
    
    def stop_monitoring(self):
        """Stop monitoring"""
        self.monitoring = False

# Usage with distributed execution
monitor = DistributedMonitor('/shared/ansible')
monitor_thread = monitor.start_monitoring()

# Start distributed execution
result = ansible_runner.run(
    private_data_dir='/shared/ansible',
    playbook='large-deployment.yml',
    inventory='production',
    streamer='transmit',
    process_isolation=True
)

# Monitor progress
while result.status in ['pending', 'running']:
    summary = monitor.get_status_summary()
    print(f"Progress: {summary['total_events']} events, "
          f"{summary['active_hosts']} hosts, "
          f"{summary['failed_hosts']} failures")
    time.sleep(5)

# Stop monitoring
monitor.stop_monitoring()
monitor_thread.join(timeout=1)

print(f"Execution completed: {result.status}")
final_summary = monitor.get_status_summary()
print(f"Final stats: {json.dumps(final_summary, indent=2)}")

Advanced Configuration

Custom Streaming Configuration

import ansible_runner
import tempfile
import os

def create_streaming_config(mode, shared_dir, **kwargs):
    """Create configuration for streaming components"""
    base_config = {
        'private_data_dir': shared_dir,
        'process_isolation': True,
        'process_isolation_executable': 'podman',
        'container_image': 'quay.io/ansible/ansible-runner:latest',
        'container_volume_mounts': [f"{shared_dir}:{shared_dir}:Z"],
        'envvars': {
            'ANSIBLE_HOST_KEY_CHECKING': 'False',
            'ANSIBLE_STREAMING_MODE': mode
        }
    }
    
    base_config.update(kwargs)
    return base_config

# Setup
shared_dir = tempfile.mkdtemp(prefix='ansible-streaming-')

# Configure different components
transmit_config = create_streaming_config(
    'transmit',
    shared_dir,
    playbook='orchestration.yml',
    inventory='clusters',
    streamer='transmit'
)

worker_config = create_streaming_config(
    'worker',
    shared_dir,
    playbook='orchestration.yml',
    inventory='clusters',
    streamer='worker'
)

process_config = create_streaming_config(
    'process',
    shared_dir,
    playbook='orchestration.yml',
    inventory='clusters',
    streamer='process'
)

# Execute with custom configurations
transmit_result = ansible_runner.run(**transmit_config)
worker_result = ansible_runner.run(**worker_config)
process_result = ansible_runner.run(**process_config)

Install with Tessl CLI

npx tessl i tessl/pypi-ansible-runner

docs

command-execution.md

configuration-runner.md

index.md

playbook-execution.md

plugin-role-management.md

streaming-distributed.md

utilities-cleanup.md

tile.json