CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ansible-runner

Consistent Ansible Python API and CLI with container and process isolation runtime capabilities

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

utilities-cleanup.mddocs/

Utilities and Cleanup

Helper functions for artifact management, cleanup operations, output handling, and system integration. These utilities provide essential support functions for managing ansible-runner operations and maintaining system resources.

Capabilities

Cleanup Functions

Functions for managing temporary files, artifacts, and system resources.

def cleanup_folder(folder: str) -> bool:
    """
    Delete folder and return whether a change happened.
    
    Args:
        folder (str): Path to folder to delete
        
    Returns:
        bool: True if folder was deleted, False if folder didn't exist
    """

def register_for_cleanup(folder: str) -> None:
    """
    Register a folder path for cleanup when execution finishes.
    The folder need not exist at the time when this is called.
    
    Args:
        folder (str): Path to folder to register for cleanup
    """

Usage examples:

from ansible_runner.utils import cleanup_folder, register_for_cleanup

# Clean up specific folder
success = cleanup_folder('/tmp/ansible-artifacts-12345')
if success:
    print("Folder cleaned up successfully")

# Register folder for automatic cleanup on exit
register_for_cleanup('/tmp/ansible-temp-67890')

Bunch Class

A utility class for collecting variables together in a dynamic object.

class Bunch:
    """
    Collect a bunch of variables together in an object.
    This is a slight modification of Alex Martelli's and Doug Hudgeon's Bunch pattern.
    """
    
    def __init__(self, **kwargs):
        """Initialize with keyword arguments as attributes"""
    
    def update(self, **kwargs):
        """Update the object with new keyword arguments"""
    
    def get(self, key):
        """Get attribute value by key name"""

Usage examples:

from ansible_runner.utils import Bunch

# Create configuration object
config = Bunch(
    host='localhost',
    port=22,
    user='ansible',
    timeout=30
)

# Access attributes
print(f"Connecting to {config.host}:{config.port}")

# Update configuration
config.update(timeout=60, retries=3)

# Get attribute with default
timeout = config.get('timeout') or 30

Helper Functions

Various utility functions for working with Ansible data and artifacts.

def get_plugin_dir() -> str:
    """Get the path to the ansible-runner plugin directory"""

def get_callback_dir() -> str:
    """Get the path to the callback plugin directory"""

def is_dir_owner(directory: str) -> bool:
    """
    Check if current user is the owner of directory.
    
    Args:
        directory (str): Path to directory to check
        
    Returns:
        bool: True if current user owns the directory
    """

def isplaybook(obj) -> bool:
    """
    Inspect object and return if it is a playbook.
    
    Args:
        obj: The object to inspect
        
    Returns:
        bool: True if the object is a list (playbook format)
    """

def isinventory(obj) -> bool:
    """
    Inspect object and return if it is an inventory.
    
    Args:
        obj: The object to inspect
        
    Returns:
        bool: True if the object is an inventory dict or string
    """

Usage examples:

from ansible_runner.utils import (
    get_plugin_dir, get_callback_dir, is_dir_owner,
    isplaybook, isinventory
)

# Get plugin directories
plugin_dir = get_plugin_dir()
callback_dir = get_callback_dir()

# Check directory ownership
if is_dir_owner('/path/to/ansible/project'):
    print("You own this directory")

# Validate data types
playbook_data = [{'hosts': 'all', 'tasks': []}]
inventory_data = {'all': {'hosts': ['server1', 'server2']}}

if isplaybook(playbook_data):
    print("Valid playbook format")

if isinventory(inventory_data):
    print("Valid inventory format")

Artifact Management

Functions for handling execution artifacts and output data.

def dump_artifact(obj, path: str, filename: str = None):
    """
    Dump artifact object to file.
    
    Args:
        obj: Object to serialize and save
        path (str): Directory path to save artifact
        filename (str, optional): Filename for the artifact
    """

def ensure_str(obj):
    """
    Ensure object is converted to string format.
    
    Args:
        obj: Object to convert to string
        
    Returns:
        str: String representation of object
    """

def collect_new_events(artifact_dir: str, old_events=None):
    """
    Collect new events from artifact directory.
    
    Args:
        artifact_dir (str): Path to artifact directory
        old_events: Previously collected events to filter out
        
    Returns:
        list: New events since last collection
    """

def cleanup_artifact_dir(artifact_dir: str, remove_dir: bool = False):
    """
    Clean up artifact directory contents.
    
    Args:
        artifact_dir (str): Path to artifact directory
        remove_dir (bool): Whether to remove the directory itself
    """

Usage examples:

from ansible_runner.utils import (
    dump_artifact, ensure_str, collect_new_events, cleanup_artifact_dir
)

# Save execution results
results = {'status': 'successful', 'rc': 0}
dump_artifact(results, '/tmp/artifacts', 'execution_results.json')

# Ensure string conversion
host_data = {'hostname': 'server1', 'ip': '192.168.1.10'}
host_str = ensure_str(host_data)

# Collect new events
artifact_dir = '/tmp/ansible-artifacts'
new_events = collect_new_events(artifact_dir)
print(f"Found {len(new_events)} new events")

# Clean up artifacts
cleanup_artifact_dir(artifact_dir, remove_dir=True)

Output and Logging

Functions for handling output, logging, and display operations.

def display(msg: str, log_only: bool = False) -> None:
    """
    Display message to output.
    
    Args:
        msg (str): Message to display
        log_only (bool): Only log the message, don't display
    """

def debug(msg: str) -> None:
    """
    Debug logging function.
    
    Args:
        msg (str): Debug message to log
    """

def set_logfile(filename: str) -> None:
    """
    Set log file for output.
    
    Args:
        filename (str): Path to log file
    """

def set_debug(value: str) -> None:
    """
    Set debug level.
    
    Args:
        value (str): Debug level ('enable' or 'disable')
    """

def configure() -> None:
    """Configure logging system"""

Usage examples:

from ansible_runner import output

# Configure logging
output.configure()
output.set_logfile('/var/log/ansible-runner.log')
output.set_debug('enable')

# Display messages
output.display("Starting playbook execution")
output.debug("Debug information for troubleshooting")

# Log-only messages
output.display("Internal status update", log_only=True)

Cleanup Operations

Advanced cleanup operations for managing ansible-runner resources.

def add_cleanup_args(command) -> None:
    """
    Add cleanup arguments to argument parser.
    
    Args:
        command: ArgumentParser instance to add arguments to
    """

def run_cleanup(**kwargs):
    """
    Run cleanup operations based on provided arguments.
    
    Supported arguments:
    - file_pattern (str): File glob pattern for directories to remove
    - exclude_strings (list): Keywords to avoid when deleting
    - remove_images (list): Container images to remove
    - grace_period (int): Time in minutes before considering files for deletion
    - process_isolation_executable (str): Container runtime to use
    """

Usage examples:

import argparse
from ansible_runner.cleanup import add_cleanup_args, run_cleanup

# Setup argument parser with cleanup options
parser = argparse.ArgumentParser()
add_cleanup_args(parser)

# Run cleanup with specific parameters
run_cleanup(
    file_pattern='/tmp/.ansible-runner-*',
    exclude_strings=['important', 'keep'],
    grace_period=60,
    remove_images=['old-ansible-image:latest'],
    process_isolation_executable='podman'
)

Common Patterns

Comprehensive Cleanup Workflow

import tempfile
import shutil
from ansible_runner.utils import register_for_cleanup, cleanup_folder
from ansible_runner.cleanup import run_cleanup

class AnsibleWorkspace:
    def __init__(self, base_dir=None):
        self.base_dir = base_dir or tempfile.mkdtemp(prefix='ansible-workspace-')
        self.temp_dirs = []
        register_for_cleanup(self.base_dir)
    
    def create_temp_dir(self, prefix='temp-'):
        """Create temporary directory within workspace"""
        temp_dir = tempfile.mkdtemp(prefix=prefix, dir=self.base_dir)
        self.temp_dirs.append(temp_dir)
        return temp_dir
    
    def cleanup_temp_dirs(self):
        """Clean up all temporary directories"""
        cleaned = 0
        for temp_dir in self.temp_dirs:
            if cleanup_folder(temp_dir):
                cleaned += 1
        self.temp_dirs.clear()
        return cleaned
    
    def cleanup_workspace(self):
        """Clean up entire workspace"""
        return cleanup_folder(self.base_dir)

# Usage
workspace = AnsibleWorkspace()
temp_dir1 = workspace.create_temp_dir('playbook-')
temp_dir2 = workspace.create_temp_dir('inventory-')

# Use workspace for ansible operations
# ... ansible-runner operations ...

# Clean up when done
cleaned_temps = workspace.cleanup_temp_dirs()
print(f"Cleaned up {cleaned_temps} temporary directories")

workspace.cleanup_workspace()

Event Processing Pipeline

from ansible_runner.utils import collect_new_events, ensure_str, dump_artifact
import json
import time

class EventProcessor:
    def __init__(self, artifact_dir):
        self.artifact_dir = artifact_dir
        self.processed_events = []
        self.event_stats = {}
    
    def process_events(self):
        """Process new events from artifact directory"""
        new_events = collect_new_events(self.artifact_dir, self.processed_events)
        
        for event in new_events:
            self._process_single_event(event)
            self.processed_events.append(event)
        
        return len(new_events)
    
    def _process_single_event(self, event):
        """Process individual event"""
        event_type = event.get('event', 'unknown')
        self.event_stats[event_type] = self.event_stats.get(event_type, 0) + 1
        
        # Convert event to string for logging
        event_str = ensure_str(event)
        
        # Handle specific event types
        if event_type == 'runner_on_failed':
            self._handle_failure(event)
        elif event_type == 'playbook_on_stats':
            self._handle_completion(event)
    
    def _handle_failure(self, event):
        """Handle task failure events"""
        host = event.get('event_data', {}).get('host', 'unknown')
        task = event.get('event_data', {}).get('task', 'unknown')
        print(f"Task failure on {host}: {task}")
    
    def _handle_completion(self, event):
        """Handle playbook completion"""
        stats = event.get('event_data', {})
        print(f"Playbook completed with stats: {json.dumps(stats, indent=2)}")
    
    def save_summary(self, output_path):
        """Save event processing summary"""
        summary = {
            'total_events': len(self.processed_events),
            'event_breakdown': self.event_stats,
            'timestamp': time.time()
        }
        dump_artifact(summary, output_path, 'event_summary.json')

# Usage
processor = EventProcessor('/tmp/ansible-artifacts')

# Process events periodically
while True:
    new_count = processor.process_events()
    if new_count > 0:
        print(f"Processed {new_count} new events")
    
    # Check for completion condition
    if 'playbook_on_stats' in processor.event_stats:
        break
    
    time.sleep(1)

# Save final summary
processor.save_summary('/tmp/reports')

Resource Management

import os
import psutil
from ansible_runner.utils import Bunch, cleanup_folder
from ansible_runner.defaults import GRACE_PERIOD_DEFAULT

class ResourceManager:
    def __init__(self):
        self.config = Bunch(
            max_memory_mb=1024,
            max_cpu_percent=80,
            cleanup_grace_period=GRACE_PERIOD_DEFAULT,
            temp_dirs=[]
        )
        self.active_processes = []
    
    def check_resource_usage(self):
        """Check current system resource usage"""
        memory_mb = psutil.virtual_memory().used // (1024 * 1024)
        cpu_percent = psutil.cpu_percent(interval=1)
        
        return Bunch(
            memory_mb=memory_mb,
            cpu_percent=cpu_percent,
            memory_ok=memory_mb < self.config.max_memory_mb,
            cpu_ok=cpu_percent < self.config.max_cpu_percent
        )
    
    def create_managed_temp_dir(self, prefix='ansible-'):
        """Create and register temporary directory"""
        temp_dir = tempfile.mkdtemp(prefix=prefix)
        self.config.temp_dirs.append(temp_dir)
        return temp_dir
    
    def cleanup_temp_dirs(self, force=False):
        """Clean up managed temporary directories"""
        cleaned = []
        
        for temp_dir in self.config.temp_dirs[:]:
            # Check grace period unless forced
            if not force:
                stat = os.stat(temp_dir)
                age_minutes = (time.time() - stat.st_mtime) / 60
                if age_minutes < self.config.cleanup_grace_period:
                    continue
            
            if cleanup_folder(temp_dir):
                cleaned.append(temp_dir)
                self.config.temp_dirs.remove(temp_dir)
        
        return cleaned
    
    def get_status(self):
        """Get current resource manager status"""
        resources = self.check_resource_usage()
        return Bunch(
            resources=resources,
            temp_dirs_count=len(self.config.temp_dirs),
            active_processes=len(self.active_processes)
        )

# Usage
manager = ResourceManager()

# Check resources before starting
status = manager.get_status()
if not status.resources.memory_ok:
    print("Warning: High memory usage")

# Create managed temporary directory
temp_dir = manager.create_managed_temp_dir('playbook-')

# ... use temp_dir for ansible operations ...

# Clean up when appropriate
cleaned = manager.cleanup_temp_dirs()
print(f"Cleaned up {len(cleaned)} temporary directories")

Install with Tessl CLI

npx tessl i tessl/pypi-ansible-runner

docs

command-execution.md

configuration-runner.md

index.md

playbook-execution.md

plugin-role-management.md

streaming-distributed.md

utilities-cleanup.md

tile.json