tessl/pypi-parallel-ssh

Asynchronous parallel SSH client library that enables developers to execute SSH commands across many servers simultaneously with minimal system load on the client host.

Parallel SSH Operations

Execute commands on many hosts simultaneously, with shared or per-host configuration, several authentication options, and one result object per host. The client is asynchronous, which keeps system load on the client host low even with large host lists.

Capabilities

ParallelSSHClient Initialization

Create a parallel SSH client for connecting to multiple hosts with shared or per-host configuration.

class ParallelSSHClient:
    def __init__(self, hosts, user=None, password=None, port=22, pkey=None,
                 num_retries=DEFAULT_RETRIES, timeout=None, pool_size=100,
                 allow_agent=True, host_config=None, retry_delay=RETRY_DELAY,
                 proxy_host=None, proxy_port=None, proxy_user=None, proxy_password=None,
                 proxy_pkey=None, forward_ssh_agent=False, keepalive_seconds=60,
                 identity_auth=True, ipv6_only=False):
        """
        Create parallel SSH client for multiple hosts.

        Parameters:
        - hosts (list[str]): List of hostnames or IP addresses
        - user (str, optional): Username for authentication 
        - password (str, optional): Password for authentication
        - port (int, optional): SSH port number (default: 22)
        - pkey (str or bytes, optional): Private key file path or key data
        - num_retries (int, optional): Connection retry attempts (default: 3)
        - timeout (float, optional): Global timeout in seconds for all operations
        - pool_size (int, optional): Greenlet pool size for concurrency (default: 100)
        - allow_agent (bool, optional): Use SSH agent authentication (default: True)
        - host_config (list[HostConfig], optional): Per-host configuration, one entry per host in the same order as hosts
        - retry_delay (float, optional): Seconds between retries (default: 5)
        - proxy_host (str, optional): SSH proxy hostname
        - proxy_port (int, optional): SSH proxy port
        - proxy_user (str, optional): SSH proxy username  
        - proxy_password (str, optional): SSH proxy password
        - proxy_pkey (str or bytes, optional): SSH proxy private key
        - forward_ssh_agent (bool, optional): SSH agent forwarding (default: False)
        - keepalive_seconds (int, optional): Keepalive interval (default: 60)
        - identity_auth (bool, optional): Use default identity files (default: True)
        - ipv6_only (bool, optional): IPv6 addresses only (default: False)
        """

Usage example:

from pssh.clients import ParallelSSHClient
from pssh.config import HostConfig

# Simple parallel client
hosts = ['web1.example.com', 'web2.example.com', 'db.example.com']
client = ParallelSSHClient(hosts, user='admin', port=22)

# With per-host configuration
host_config = [
    HostConfig(user='web_user', port=22),    # web1
    HostConfig(user='web_user', port=22),    # web2  
    HostConfig(user='db_user', port=2222)    # db (different port)
]
client = ParallelSSHClient(hosts, host_config=host_config)
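
Because each host_config entry is matched to a host by position, it can help to derive both lists from a single inventory. The sketch below assumes a hypothetical inventory format (a list of dicts with a `host` key plus `HostConfig` keyword arguments). `split_inventory` and `make_client` are illustrative helpers, not part of parallel-ssh.

```python
def split_inventory(inventory):
    """Split inventory entries into (hosts, per_host_kwargs), index-aligned."""
    hosts, per_host_kwargs = [], []
    for entry in inventory:
        options = dict(entry)            # copy so the caller's dict is not mutated
        hosts.append(options.pop("host"))
        per_host_kwargs.append(options)  # remaining keys, e.g. user, port
    return hosts, per_host_kwargs


def make_client(inventory):
    """Build a ParallelSSHClient from the hypothetical inventory format."""
    # Imported lazily so split_inventory works without parallel-ssh installed.
    from pssh.clients import ParallelSSHClient
    from pssh.config import HostConfig

    hosts, per_host_kwargs = split_inventory(inventory)
    host_config = [HostConfig(**kwargs) for kwargs in per_host_kwargs]
    return ParallelSSHClient(hosts, host_config=host_config)


inventory = [
    {"host": "web1.example.com", "user": "web_user", "port": 22},
    {"host": "db.example.com", "user": "db_user", "port": 2222},
]
hosts, per_host_kwargs = split_inventory(inventory)
```

Keeping the host name and its settings in one record makes it harder for the two lists to drift out of order.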

Parallel Command Execution

Execute commands on all hosts in parallel with comprehensive options for sudo, user switching, and output handling.

def run_command(self, command, sudo=False, user=None, stop_on_errors=True,
               use_pty=False, host_args=None, shell=None, encoding='utf-8',
               read_timeout=None):
    """
    Execute command on all hosts in parallel.

    Parameters:
    - command (str): Command to execute
    - sudo (bool, optional): Run command with sudo (default: False)
    - user (str, optional): Run command as different user
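    - stop_on_errors (bool, optional): Raise on the first host error (default: True); if False, errors are stored on the affected host's output instead
    - use_pty (bool, optional): Use pseudo-terminal (default: False)
    - host_args (list or tuple, optional): Per-host format arguments substituted into command, one entry per host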
    - shell (str, optional): Shell to use for command execution
    - encoding (str, optional): Output encoding (default: 'utf-8')
    - read_timeout (float, optional): Per-host output read timeout

    Returns:
    list[HostOutput]: List of HostOutput objects containing results
    """

Usage examples:

# Basic command execution
output = client.run_command('uptime')

# With sudo
output = client.run_command('systemctl status nginx', sudo=True)

# Run as different user
output = client.run_command('whoami', user='nginx')

# Per-host arguments: each entry fills the placeholders in the command for one host
host_args = [
    {'cmd': 'ls /var/log/httpd'},    # web1
    {'cmd': 'ls /var/log/apache2'},  # web2
    {'cmd': 'ls /var/log/mysql'}     # db
]
output = client.run_command('%(cmd)s', host_args=host_args)
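
To preview what each host would receive when per-host arguments are used, the sketch below mimics the `%` string formatting that parallel-ssh is understood to apply to `command` for each `host_args` entry. `expand_commands` is a hypothetical helper that runs locally and does not call the library.

```python
def expand_commands(template, host_args):
    """Return the command string each host would receive, in host order."""
    return [template % args for args in host_args]


# Dict entries fill named placeholders; tuple entries fill positional ones.
named = expand_commands(
    "ls %(log_dir)s",
    [{"log_dir": "/var/log/httpd"}, {"log_dir": "/var/log/mysql"}],
)
positional = expand_commands(
    "tail -n %s %s",
    [(5, "/var/log/a.log"), (10, "/var/log/b.log")],
)
```

With a real client, the corresponding call would look like `client.run_command('ls %(log_dir)s', host_args=[...])`, with one `host_args` entry per host.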

Connection Management

Connect and authenticate to all hosts ahead of running commands, and check without blocking whether commands have finished.

def connect_auth(self):
    """
    Connect and authenticate to all hosts.

    Returns:
    list[gevent.Greenlet]: List of greenlets for connection operations
    """

def finished(self, output=None):
    """
    Check if all commands in output have finished without blocking.

    Parameters:
    - output (list[HostOutput], optional): Output to check (default: last run_command)

    Returns:
    bool: True if all commands finished, False otherwise
    """

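A non-blocking check such as `finished` lends itself to a polling loop that leaves room for other work between checks. The following is a minimal sketch, not library code: `poll_until_finished` and its `interval`, `max_wait`, `sleep`, and `clock` parameters are hypothetical. It defaults to `gevent.sleep` on the assumption that yielding to gevent lets the client's own greenlets make progress between checks.

```python
import time


def poll_until_finished(client, output, interval=0.5, max_wait=30.0,
                        sleep=None, clock=time.monotonic):
    """Poll client.finished(output); return True if done within max_wait seconds."""
    if sleep is None:
        # Assumption: a cooperative sleep lets gevent-based I/O continue meanwhile.
        import gevent
        sleep = gevent.sleep
    deadline = clock() + max_wait
    while not client.finished(output):
        if clock() >= deadline:
            return False  # gave up; commands may still be running
        sleep(interval)
    return True
```

A `False` return only means the deadline passed; the caller can still call `join` afterwards to block until completion.
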
Result Synchronization

Wait for command completion and control output consumption across all hosts.

def join(self, output=None, consume_output=False, timeout=None):
    """
    Wait for all commands in output to complete.

    Parameters:
    - output (list[HostOutput], optional): Output to wait for (default: last run_command)
    - consume_output (bool, optional): Consume and discard output (default: False)
    - timeout (float, optional): Timeout in seconds for join operation

    Raises:
    - Timeout: If timeout is reached before completion
    """

Usage examples:

# Execute and wait for completion
output = client.run_command('long_running_command')
client.join(output)

# Process results after completion
for host_output in output:
    print(f"Host {host_output.host} exit code: {host_output.exit_code}")
    for line in host_output.stdout:
        print(f"  {line}")

# Join with timeout
from pssh.exceptions import Timeout

try:
    client.join(output, timeout=30)
except Timeout:
    print("Commands did not complete within 30 seconds")

# Consume output without reading (useful with host logger)
from pssh.utils import enable_host_logger
enable_host_logger()
output = client.run_command('echo "Hello from $(hostname)"')
client.join(output, consume_output=True)  # Output logged automatically
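
After `join` returns, the per-host results can be reduced to a quick pass/fail summary. The sketch below relies only on the `host` and `exit_code` attributes of each output object. `group_by_exit_code` is a hypothetical helper, and the stand-in objects exist purely so the snippet runs without SSH access.

```python
from collections import defaultdict
from types import SimpleNamespace


def group_by_exit_code(output):
    """Map each exit code to the list of hosts that returned it."""
    groups = defaultdict(list)
    for host_output in output:
        # Assumption: exit_code is None when no exit status is available yet.
        groups[host_output.exit_code].append(host_output.host)
    return dict(groups)


# Stand-ins for HostOutput objects, for demonstration only.
fake_output = [
    SimpleNamespace(host="web1.example.com", exit_code=0),
    SimpleNamespace(host="web2.example.com", exit_code=0),
    SimpleNamespace(host="db.example.com", exit_code=1),
]
summary = group_by_exit_code(fake_output)
```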

Interactive Shell Management

Open an interactive shell on every host, run commands in those shells in parallel, and wait for them to complete.

def open_shell(self, encoding='utf-8', read_timeout=None):
    """
    Open interactive shell sessions on all hosts.

    Parameters:
    - encoding (str, optional): Shell output encoding (default: 'utf-8')
    - read_timeout (float, optional): Timeout for reading shell output

    Returns:
    list[InteractiveShell]: List of interactive shell objects for all hosts
    """

def run_shell_commands(self, shells, commands):
    """
    Execute commands on multiple shell sessions.

    Parameters:
    - shells (list[InteractiveShell]): List of shell objects
    - commands (list[str]): Commands to execute on all shells

    Returns:
    list[gevent.Greenlet]: List of greenlets for shell operations
    """

def join_shells(self, shells, timeout=None):
    """
    Wait for shell operations to complete.

    Parameters:
    - shells (list[InteractiveShell]): Shell objects to wait for
    - timeout (float, optional): Timeout in seconds

    Raises:
    - Timeout: If timeout is reached before completion
    """

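The three shell methods are typically used together. The sketch below chains them in a hypothetical helper, `run_in_shells`. It assumes that each shell object exposes `stdout` and `exit_code` once `join_shells` has returned, which should be checked against the installed parallel-ssh version.

```python
def run_in_shells(client, commands, timeout=None):
    """Run commands in one interactive shell per host; return results in host order."""
    shells = client.open_shell()
    client.run_shell_commands(shells, commands)
    client.join_shells(shells, timeout=timeout)
    results = []
    for shell in shells:
        # Assumption: stdout and exit_code are readable after join_shells.
        results.append((shell.exit_code, list(shell.stdout)))
    return results
```

Each entry in the returned list is an `(exit_code, lines)` pair for one host, in the order the shells were opened.
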
Output Management

Retrieve the output objects of the most recently executed commands.

def get_last_output(self, cmds=None):
    """
    Get output from last executed commands.

    Parameters:
    - cmds (list, optional): Specific commands to get output for

    Returns:
    list[HostOutput]: Output objects from last command execution
    """

Error Handling

The parallel client raises specific exceptions from pssh.exceptions for DNS resolution, connection, authentication, and timeout failures:

from pssh.exceptions import (
    UnknownHostError, ConnectionError, AuthenticationError,
    Timeout, HostArgumentError
)

try:
    client = ParallelSSHClient(['invalid-host.example.com'])
    output = client.run_command('echo test')
    client.join()
except UnknownHostError as e:
    print(f"DNS resolution failed: {e}")
except ConnectionError as e:
    print(f"Connection failed: {e}")  
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
except Timeout as e:
    print(f"Operation timed out: {e}")
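
When a failure on one host should not abort the whole run, `stop_on_errors=False` is the relevant option. The sketch below assumes that, in this mode, a failed host's error is attached to its output object as an `exception` attribute, which is an assumption about the library rather than something stated here. `partition_output` is a hypothetical helper.

```python
from types import SimpleNamespace


def partition_output(output):
    """Split output into (succeeded, failed) based on an attached exception."""
    succeeded, failed = [], []
    for host_output in output:
        # getattr guards against objects that lack the assumed attribute.
        if getattr(host_output, "exception", None) is None:
            succeeded.append(host_output)
        else:
            failed.append(host_output)
    return succeeded, failed


# Stand-ins for HostOutput objects, for demonstration only.
fake_output = [
    SimpleNamespace(host="web1.example.com", exception=None),
    SimpleNamespace(host="bad.example.com", exception=OSError("unreachable")),
]
ok, bad = partition_output(fake_output)
```

With a real client this would follow `output = client.run_command('uptime', stop_on_errors=False)` and `client.join(output)`.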

Performance Considerations

  • Pool Size: Adjust pool_size parameter based on system resources and network capacity
  • Timeouts: Set appropriate timeout and read_timeout values for your network conditions
  • Connection Reuse: Keep client instances alive for multiple operations to avoid reconnection overhead
  • Memory Usage: For very large host lists, consider batching operations or using streaming output consumption
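
The batching suggestion can be sketched as follows. `batched_hosts` and `run_in_batches` are hypothetical helpers, and the default batch size is an arbitrary illustration rather than a recommended value. `client_factory` is any callable that builds a client for a list of hosts.

```python
def batched_hosts(hosts, batch_size):
    """Yield consecutive slices of hosts with at most batch_size entries each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(hosts), batch_size):
        yield hosts[start:start + batch_size]


def run_in_batches(hosts, command, client_factory, batch_size=500):
    """Run command batch by batch and yield (host, exit_code) pairs."""
    for batch in batched_hosts(hosts, batch_size):
        # e.g. client_factory = lambda h: ParallelSSHClient(h, user="admin")
        client = client_factory(batch)
        output = client.run_command(command)
        client.join(output, consume_output=True)  # discard output to bound memory use
        for host_output in output:
            yield host_output.host, host_output.exit_code


batches = list(batched_hosts(["h1", "h2", "h3", "h4", "h5"], 2))
```

Creating one client per batch trades connection reuse for a bounded number of open connections, so it fits one-off runs over very large host lists better than repeated operations on the same hosts.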
