Asynchronous parallel SSH client library that enables developers to execute SSH commands across many servers simultaneously with minimal system load on the client host.
—
Comprehensive result objects providing access to command output, exit codes, and execution metadata for both parallel and single-host operations. Enables flexible processing of command results with generators, buffering, and error handling.
Primary result object containing all information about command execution including output streams, exit codes, and error conditions.
class HostOutput:
    """
    Container for SSH command execution results.

    One instance is produced per target host by ``run_command``.
    ``stdout``/``stderr`` are generators, so each can be consumed once;
    ``exit_code`` stays ``None`` until the remote command has finished.

    Properties:
    - host (str): Hostname or IP address
    - alias (str): Host alias if configured
    - channel (object): SSH channel object used for execution
    - stdin (object): Stdin stream object
    - stdout (generator): Generator yielding stdout lines
    - stderr (generator): Generator yielding stderr lines
    - exit_code (int or None): Command exit code (None until command completes)
    - client (object): Reference to SSH client that created this output
    - exception (Exception or None): Exception if command execution failed
"""Process command output using generators that yield lines as they become available, enabling real-time output processing and memory-efficient handling of large outputs.
from pssh.clients import ParallelSSHClient

# Basic output processing
hosts = ['server1.example.com', 'server2.example.com']
client = ParallelSSHClient(hosts)
output = client.run_command('find /var/log -name "*.log" -type f')

# Walk the per-host results.
for host_output in output:
    print(f"\n=== Results from {host_output.host} ===")
    # stdout is a generator - consume it line by line.
    for line in host_output.stdout:
        print(f"Found: {line}")
    # Drain stderr into a list so it can be checked for emptiness first.
    error_lines = list(host_output.stderr)
    if error_lines:
        print("Errors:")
        for error in error_lines:
            print(f"ERROR: {error}")
print(f"Exit code: {host_output.exit_code}")Monitor command completion and handle exit codes for success/failure detection.
# Wait for completion before checking exit codes
output = client.run_command('test -f /etc/passwd && echo "exists" || echo "missing"')
client.join()  # Wait for all commands to complete

successful_hosts, failed_hosts = [], []
for host_output in output:
    if host_output.exit_code != 0:
        # Failure (or still-unknown exit code): record it and show stderr.
        failed_hosts.append((host_output.host, host_output.exit_code))
        for line in host_output.stderr:
            print(f"[{host_output.host}] ERROR: {line}")
    else:
        successful_hosts.append(host_output.host)
        for line in host_output.stdout:
            print(f"[{host_output.host}] {line}")
print(f"Successful: {len(successful_hosts)}, Failed: {len(failed_hosts)}")Handle exceptions that occur during command execution, including connection errors, timeouts, and command failures.
from pssh.exceptions import Timeout, ConnectionError, AuthenticationError

# Suggested remedy per known failure class, checked in order.
remedies = [
    (Timeout, " -> Command timed out, consider increasing timeout"),
    (ConnectionError, " -> Connection failed, host may be unreachable"),
    (AuthenticationError, " -> Authentication failed, check credentials"),
]

output = client.run_command('long_running_command', read_timeout=30)
for host_output in output:
    if host_output.exception:
        print(f"Host {host_output.host} had an exception:")
        print(f" Exception type: {type(host_output.exception).__name__}")
        print(f" Exception message: {host_output.exception}")
        # Handle specific exception types
        for exc_type, remedy in remedies:
            if isinstance(host_output.exception, exc_type):
                print(remedy)
                break
else:
# Process normal output
for line in host_output.stdout:
print(f"[{host_output.host}] {line}")

Process output as it becomes available without waiting for command completion, useful for long-running commands and real-time monitoring.
import time
from threading import Thread
def process_output_realtime(host_output):
    """Echo a host's stdout to the console as lines arrive.

    Each line is prefixed with a wall-clock timestamp and the host name.
    Errors raised while reading the stream are reported, not propagated.
    """
    host = host_output.host
    try:
        for line in host_output.stdout:
            print(f"[{time.strftime('%H:%M:%S')}] [{host}] {line}")
    except Exception as e:
        print(f"[{host}] Output processing error: {e}")
# Kick off a long-running command on every host.
output = client.run_command('tail -f /var/log/system.log')

# One daemon thread per host drains that host's stdout.
threads = []
for host_output in output:
    worker = Thread(target=process_output_realtime, args=(host_output,), daemon=True)
    worker.start()
    threads.append(worker)

# Observe the streams for a minute.
time.sleep(60)
# Commands are still running - you could join() to wait for completion
# or handle them as needed for your use case
print("Stopping real-time monitoring...")Collect and buffer output for post-processing, analysis, or storage.
def collect_all_output(host_output):
    """Drain a HostOutput into a plain dict for later analysis.

    Returns a dict with keys: host, alias, stdout_lines, stderr_lines,
    exit_code, exception. A read error on stdout takes precedence over
    one on stderr; an exception recorded on the HostOutput itself
    overrides both.
    """
    collected = {
        'host': host_output.host,
        'alias': host_output.alias,
        'stdout_lines': [],
        'stderr_lines': [],
        'exit_code': None,
        'exception': None
    }
    # Drain both streams, remembering only the first read error.
    # list.extend appends items incrementally, so lines read before a
    # mid-stream failure are retained.
    for key, stream in (('stdout_lines', host_output.stdout),
                        ('stderr_lines', host_output.stderr)):
        try:
            collected[key].extend(stream)
        except Exception as err:
            if not collected['exception']:
                collected['exception'] = err
    # exit_code is read after the streams are consumed.
    collected['exit_code'] = host_output.exit_code
    if host_output.exception:
        collected['exception'] = host_output.exception
    return collected
# Execute command and collect all results
output = client.run_command('ps aux | head -20')
client.join()

results = [collect_all_output(host_output) for host_output in output]

# Summarize what each host produced.
for result in results:
    print(f"\n=== {result['host']} ===")
    print(f"Exit code: {result['exit_code']}")
    print(f"Stdout lines: {len(result['stdout_lines'])}")
    print(f"Stderr lines: {len(result['stderr_lines'])}")
    if result['exception']:
        print(f"Exception: {result['exception']}")
    # Show first few lines of output
for line in result['stdout_lines'][:5]:
print(f" {line}")

Filter and process output streams for specific patterns, log levels, or data extraction.
import re
def filter_and_process_output(output, filters=None):
    """Filter each host's stdout by regex rules and extract matches.

    Supported keys in `filters`:
      'exclude_patterns': drop any line matching one of these
      'include_patterns': if the key is present, keep only lines
                          matching at least one of them
      'extract_patterns': {name: regex} -- findall() results gathered
                          from every line, even filtered-out ones

    Returns {host: {'filtered_lines': [...],
                    'matched_patterns': {name: [...]},
                    'line_count': int}}.
    """
    filters = filters or {}
    results = {}
    for host_output in output:
        summary = {
            'filtered_lines': [],
            'matched_patterns': {},
            'line_count': 0
        }
        results[host_output.host] = summary
        for line in host_output.stdout:
            summary['line_count'] += 1
            # A line survives when no exclude pattern hits it...
            keep = not any(re.search(p, line)
                           for p in filters.get('exclude_patterns', ()))
            # ...and, when include patterns are configured, one hits it.
            if keep and 'include_patterns' in filters:
                keep = any(re.search(p, line)
                           for p in filters['include_patterns'])
            if keep:
                summary['filtered_lines'].append(line)
            # Pattern extraction ignores the include/exclude verdict.
            for name, pattern in filters.get('extract_patterns', {}).items():
                found = re.findall(pattern, line)
                if found:
                    summary['matched_patterns'].setdefault(name, []).extend(found)
    return results
# Example: Filter log files for errors and extract timestamps
output = client.run_command('cat /var/log/application.log')
client.join()

filters = {
    'include_patterns': [r'ERROR', r'WARN', r'CRITICAL'],
    'exclude_patterns': [r'DEBUG'],
    'extract_patterns': {
        'timestamps': r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}',
        'error_codes': r'ERROR_(\d+)'
    }
}
results = filter_and_process_output(output, filters)

# Report the per-host analysis.
for host, data in results.items():
    print(f"\n=== Log Analysis for {host} ===")
    print(f"Total lines processed: {data['line_count']}")
    print(f"Filtered lines (errors/warnings): {len(data['filtered_lines'])}")
    matched = data['matched_patterns']
    if 'timestamps' in matched:
        timestamps = matched['timestamps']
        print(f"Error time range: {timestamps[0]} to {timestamps[-1]}")
    if 'error_codes' in matched:
        error_codes = set(matched['error_codes'])
        print(f"Error codes found: {sorted(error_codes)}")
    # Show filtered lines
for line in data['filtered_lines'][:10]: # First 10 filtered lines
print(f" {line}")

Save command output to files for later analysis or record keeping.
import json
import csv
from datetime import datetime
def export_output_to_files(output, base_path='/tmp/ssh_results'):
    """Export per-host command output to text, JSON and CSV files.

    For every HostOutput in `output`, writes an individual
    `<base_path>/<host>_<timestamp>.txt` transcript, then a combined
    JSON summary and a CSV summary (line counts only). Fully drains
    each host's stdout/stderr generators in the process.

    Args:
        output: iterable of HostOutput-like objects.
        base_path: directory for the exported files; created if missing.

    Returns:
        dict with 'json_file', 'csv_file' and 'individual_files' keys.
    """
    import os
    # Bug fix: open() previously raised FileNotFoundError when base_path
    # (including the default /tmp/ssh_results) did not already exist.
    os.makedirs(base_path, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    # Collect all data
    all_results = []
    for host_output in output:
        host_data = {
            'host': host_output.host,
            'alias': host_output.alias,
            'timestamp': timestamp,
            'exit_code': host_output.exit_code,
            'stdout_lines': list(host_output.stdout),
            'stderr_lines': list(host_output.stderr),
            'has_exception': host_output.exception is not None,
            'exception_str': str(host_output.exception) if host_output.exception else None
        }
        all_results.append(host_data)
        # Export individual host files
        host_file = f"{base_path}/{host_output.host}_{timestamp}.txt"
        with open(host_file, 'w') as f:
            f.write(f"Host: {host_data['host']}\n")
            f.write(f"Exit Code: {host_data['exit_code']}\n")
            f.write(f"Timestamp: {host_data['timestamp']}\n\n")
            f.write("=== STDOUT ===\n")
            for line in host_data['stdout_lines']:
                f.write(f"{line}\n")
            # Only emit the stderr/exception sections when present.
            if host_data['stderr_lines']:
                f.write("\n=== STDERR ===\n")
                for line in host_data['stderr_lines']:
                    f.write(f"{line}\n")
            if host_data['exception_str']:
                f.write(f"\n=== EXCEPTION ===\n{host_data['exception_str']}\n")
    # Export JSON summary
    json_file = f"{base_path}/summary_{timestamp}.json"
    with open(json_file, 'w') as f:
        json.dump(all_results, f, indent=2)
    # Export CSV summary
    csv_file = f"{base_path}/summary_{timestamp}.csv"
    with open(csv_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['host', 'alias', 'exit_code', 'stdout_lines', 'stderr_lines', 'has_exception'])
        for result in all_results:
            writer.writerow([
                result['host'],
                result['alias'],
                result['exit_code'],
                len(result['stdout_lines']),
                len(result['stderr_lines']),
                result['has_exception']
            ])
    return {
        'json_file': json_file,
        'csv_file': csv_file,
        'individual_files': [f"{base_path}/{r['host']}_{timestamp}.txt" for r in all_results]
    }
# Run the commands and persist everything they produced.
output = client.run_command('df -h && free -m && uptime')
client.join()
exported_files = export_output_to_files(output)
print("Results exported to:")
for file_type, files in exported_files.items():
if isinstance(files, list):
print(f" {file_type}: {len(files)} files")
else:
print(f" {file_type}: {files}")

Use the built-in host logger for automatic output logging without manual processing.
from pssh.utils import enable_host_logger

# Enable automatic host output logging
enable_host_logger()

# Execute commands - output will be automatically logged
output = client.run_command('echo "Hello from $(hostname)"')
client.join(consume_output=True)  # Consume output to trigger logging

# Output will appear like:
# [timestamp] pssh.host_logger [server1.example.com] Hello from server1
# [timestamp] pssh.host_logger [server2.example.com] Hello from server2

# For large outputs, process incrementally to avoid memory issues
def process_large_output_incrementally(host_output, chunk_size=1000):
    """Consume a host's stdout in fixed-size batches to bound memory use.

    Each full batch of `chunk_size` lines -- plus a final partial batch,
    if any -- is handed to process_chunk(). Returns the total number of
    lines read.
    """
    lines_processed = 0
    buffer = []
    for line in host_output.stdout:
        buffer.append(line)
        lines_processed += 1
        if len(buffer) == chunk_size:
            process_chunk(buffer)
            buffer = []
    # Flush the final, partially-filled batch.
    if buffer:
        process_chunk(buffer)
    return lines_processed
def process_chunk(lines):
    """Handle one batch of output lines (stub: replace with real logic)."""
    print(f"Processed chunk of {len(lines)} lines")

from concurrent.futures import ThreadPoolExecutor
def process_host_output_concurrent(output):
    """Count stdout lines and 'ERROR' lines per host, one worker per host.

    Args:
        output: sequence of HostOutput-like objects.

    Returns:
        list of {'host', 'line_count', 'error_count'} dicts, in the same
        order as the hosts in `output`.
    """
    def process_single_host(host_output):
        # Runs in a worker thread; touches only its own HostOutput.
        result = {
            'host': host_output.host,
            'line_count': 0,
            'error_count': 0
        }
        for line in host_output.stdout:
            result['line_count'] += 1
            if 'ERROR' in line:
                result['error_count'] += 1
        return result

    # Bug fix: ThreadPoolExecutor(max_workers=0) raises ValueError, so an
    # empty host list previously crashed instead of returning [].
    if not output:
        return []
    # Process all hosts concurrently
    with ThreadPoolExecutor(max_workers=len(output)) as executor:
        return list(executor.map(process_single_host, output))
# Fan out post-processing of large outputs across worker threads.
output = client.run_command('find /var -type f -name "*.log" | head -1000')
client.join()
results = process_host_output_concurrent(output)
for result in results:
print(f"Host {result['host']}: {result['line_count']} lines, {result['error_count']} errors")

Install with Tessl CLI
npx tessl i tessl/pypi-parallel-ssh