
tessl/pypi-parallel-ssh

Asynchronous parallel SSH client library that enables developers to execute SSH commands across many servers simultaneously with minimal system load on the client host.

Interactive Shell Sessions

Open and manage interactive shell sessions on remote hosts for complex multi-command workflows and real-time interaction. Enables stateful command execution where commands can build upon previous commands' results.

Capabilities

Opening Interactive Shells

Create interactive shell sessions that maintain state between command executions, useful for complex workflows that require environment setup or multi-step operations.

def open_shell(self, encoding='utf-8', read_timeout=None):
    """
    Open interactive shell sessions on hosts.

    Parameters:
    - encoding (str, optional): Shell output encoding (default: 'utf-8')
    - read_timeout (float, optional): Timeout for reading shell output

    Returns:
    list[InteractiveShell]: List of shell objects (ParallelSSHClient)
    InteractiveShell: Single shell object (SSHClient)
    """

Usage examples:

from pssh.clients import ParallelSSHClient, SSHClient

# Parallel interactive shells
hosts = ['server1.example.com', 'server2.example.com']
client = ParallelSSHClient(hosts, user='admin')

shells = client.open_shell()
print(f"Opened {len(shells)} interactive shells")

# Single host interactive shell
single_client = SSHClient('server.example.com', user='admin')
shell = single_client.open_shell()
print("Opened interactive shell")

InteractiveShell Class

Manage individual interactive shell sessions with stateful command execution and output access.

class InteractiveShell:
    def run(self, cmd):
        """
        Execute command in the interactive shell.

        Parameters:
        - cmd (str): Command to execute in shell

        Note: Command is executed in the shell's current context,
        maintaining environment variables, working directory, etc.
        """

    def close(self):
        """
        Close the interactive shell and wait for completion.
        
        This should be called when finished with the shell to
        properly clean up resources and get final exit code.
        """

    def __enter__(self):
        """
        Context manager entry - returns self.
        
        Returns:
        InteractiveShell: Self for use in with statement
        """
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Context manager exit - automatically closes the shell.
        
        Parameters:
        - exc_type: Exception type if any
        - exc_val: Exception value if any  
        - exc_tb: Exception traceback if any
        """

    # Properties
    stdout: object      # Generator yielding stdout lines
    stderr: object      # Generator yielding stderr lines  
    stdin: object       # Stdin stream for shell input
    exit_code: int      # Shell exit code (available after close())
    output: HostOutput  # Complete output object
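The properties above imply a fixed lifecycle: run commands, close the shell, then read the buffered output and exit code. A small helper can enforce that order (`run_and_collect` is a name invented here for illustration, not part of the library):

```python
def run_and_collect(shell, commands):
    """Run commands on an interactive shell, close it, then return
    (output_lines, exit_code). Output is read only after close(),
    since shell output is buffered until the session ends."""
    for cmd in commands:
        shell.run(cmd)
    shell.close()
    return [line for line in shell.stdout], shell.exit_code
```

This works with any object exposing run()/close()/stdout/exit_code, including pssh's InteractiveShell.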

Stdin Class

Manage stdin input for interactive shells, allowing you to send data to running commands.

class Stdin:
    def write(self, data):
        """
        Write data to stdin.

        Parameters:
        - data (str or bytes): Data to write to stdin
        
        Note: Data should end with newline if intended as command input
        """

    def flush(self):
        """
        Flush any pending stdin data.
        
        Ensures all written data is sent to the remote shell immediately.
        """

Stateful Command Execution

Execute multiple related commands in the same shell context, maintaining environment variables, working directory, and other shell state.

# Single host interactive workflow
client = SSHClient('server.example.com', user='admin')
shell = client.open_shell()

# Set up environment (state persists)
shell.run('export APP_ENV=production')
shell.run('cd /opt/myapp')
shell.run('source venv/bin/activate')

# Execute commands in prepared environment
shell.run('python manage.py migrate')
shell.run('python manage.py collectstatic --noinput')
shell.run('systemctl restart myapp')

# Close the shell first - output is buffered and fully readable after close()
shell.close()

# Read output from all commands
for line in shell.stdout:
    print(line)

print(f"Shell exit code: {shell.exit_code}")

Interactive Input with Stdin

Send input to interactive commands using the stdin stream for commands that require user input.

# Interactive command requiring input
client = SSHClient('server.example.com', user='admin')
shell = client.open_shell()

# Start interactive command
shell.run('python -c "name = input(\'Enter name: \'); print(f\'Hello {name}\')"')

# Send input via stdin
shell.stdin.write('Alice\n')
shell.stdin.flush()

# Close the shell, then read the response
shell.close()

for line in shell.stdout:
    print(line)  # Output includes: Hello Alice

# Multi-step interactive session
shell = client.open_shell()

# Start database client
shell.run('psql -U postgres -d mydb')

# Send SQL commands via stdin
shell.stdin.write('SELECT COUNT(*) FROM users;\n')
shell.stdin.flush()

shell.stdin.write('\\q\n')  # Quit database client
shell.stdin.flush()

# Close the shell, then process output
shell.close()

for line in shell.stdout:
    print(line)

Parallel Interactive Workflows

Execute complex workflows across multiple hosts simultaneously while maintaining shell state on each host.

# Parallel deployment workflow
hosts = ['web1.example.com', 'web2.example.com', 'web3.example.com']
client = ParallelSSHClient(hosts, user='deploy')

shells = client.open_shell()

# Execute deployment steps on all hosts
deployment_commands = [
    'cd /opt/webapp',
    'git fetch origin',
    'git checkout v2.1.0',
    'source venv/bin/activate',
    'pip install -r requirements.txt',
    'python manage.py migrate',
    'systemctl restart webapp',
    'systemctl status webapp'
]

import time

# Run each command on all shells
for cmd in deployment_commands:
    print(f"Executing: {cmd}")
    for shell in shells:
        shell.run(cmd)

    # Brief pause between commands
    time.sleep(2)

# Close each shell, then collect its buffered output
for i, shell in enumerate(shells):
    print(f"\n--- Output from {hosts[i]} ---")
    shell.close()
    for line in shell.stdout:
        print(line)
    print(f"Shell exit code: {shell.exit_code}")

Error Handling in Interactive Shells

Handle errors and monitor command execution in interactive shell sessions.

from pssh.exceptions import ShellError

try:
    client = SSHClient('server.example.com', user='admin')
    shell = client.open_shell()

    # Execute potentially failing commands
    shell.run('command_that_might_fail')
    shell.run('echo "Command completed"')

    # Close the shell, then scan the buffered output for errors
    shell.close()

    output_lines = []
    for line in shell.stdout:
        output_lines.append(line)
        if 'ERROR' in line or 'FAILED' in line:
            print(f"Error detected: {line}")

    if shell.exit_code != 0:
        print(f"Shell ended with non-zero exit code: {shell.exit_code}")
        
except ShellError as e:
    print(f"Shell error: {e}")

Interactive Shells with Custom Encoding

Handle shells with different character encodings or binary output.

# Shell with custom encoding (single-host SSHClient)
shell = client.open_shell(encoding='latin1')

# Commands that might produce non-UTF8 output
shell.run('cat /etc/passwd')
shell.run('ls -la /var/log')

# Close, then read - lines are decoded using latin1
shell.close()
for line in shell.stdout:
    print(line)

# Shell with binary/raw handling
shell = client.open_shell(encoding=None)  # Raw bytes mode

shell.run('cat /bin/ls | head -n 1')  # Binary output
shell.close()
for line in shell.stdout:
    print(repr(line))  # Show raw bytes

Advanced Interactive Shell Patterns

Long-Running Interactive Sessions

Manage long-running interactive processes and real-time output monitoring.

import time
from threading import Thread

def monitor_shell_output(shell, host):
    """Monitor shell output in real time.

    Note: reading before close() only works when the shell was opened
    with a read_timeout; reads raise pssh.exceptions.Timeout when no
    output is available."""
    try:
        for line in shell.stdout:
            print(f"[{host}] {line}")
    except Exception:
        pass

# Start long-running process with real-time monitoring
client = SSHClient('server.example.com', user='admin')
shell = client.open_shell(read_timeout=1)  # read_timeout enables incremental reads

# Start background monitoring
monitor_thread = Thread(target=monitor_shell_output, args=(shell, 'server'))
monitor_thread.daemon = True
monitor_thread.start()

# Start long-running process
shell.run('tail -f /var/log/application.log')

# Let it run for a while
time.sleep(60)

# Interrupt tail and close the shell. Note: the shell is busy running
# tail, so a new run() call would be fed to tail's stdin rather than
# executed; send Ctrl-C through stdin instead.
shell.stdin.write('\x03')
shell.stdin.flush()
shell.close()
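As noted above, reading output before close() only succeeds when the shell was opened with a read_timeout, in which case pssh.exceptions.Timeout is raised once no more output is waiting. A small drain helper captures that pattern; the `stop_exc` parameter defaults to the builtin TimeoutError purely so this sketch is self-contained, and in practice you would pass pssh.exceptions.Timeout:

```python
def read_available(stdout_iter, stop_exc=TimeoutError):
    """Collect lines from a shell stdout generator until it is exhausted
    or a read-timeout exception fires, returning what was read so far."""
    lines = []
    try:
        for line in stdout_iter:
            lines.append(line)
    except stop_exc:
        pass
    return lines
```

Usage with pssh: `from pssh.exceptions import Timeout` then `lines = read_available(shell.stdout, Timeout)`.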

Multi-Stage Deployments

Execute complex deployment workflows with error checking and rollback capabilities.

def safe_deployment_workflow(shell, host):
    """Execute deployment with rollback on failure.

    Assumes the shell was opened with a read_timeout, so output can be
    read between commands without blocking until close().
    """
    
    commands = [
        ('echo "Starting deployment"', True),
        ('cd /opt/webapp', True),
        ('cp -r current backup_$(date +%Y%m%d_%H%M%S)', True),
        ('git fetch origin', True),
        ('git checkout v2.0.0', True),
        ('source venv/bin/activate', True),
        ('pip install -r requirements.txt', True),
        ('python manage.py migrate', True),
        ('python manage.py test', False),  # Allow this to fail
        ('systemctl restart webapp', True),
        ('sleep 5', True),
        ('curl -f http://localhost:8000/health', True),
        ('echo "Deployment successful"', True)
    ]
    
    for cmd, critical in commands:
        print(f"[{host}] Executing: {cmd}")
        shell.run(cmd)
        shell.run('echo "EXIT_CODE:$?"')  # Capture exit code
        
        # Check last command's exit code
        last_output = []
        for line in shell.stdout:
            last_output.append(line)
            if line.startswith('EXIT_CODE:'):
                exit_code = int(line.split(':')[1])
                if exit_code != 0 and critical:
                    print(f"[{host}] Critical command failed: {cmd}")
                    # Rollback
                    shell.run('systemctl stop webapp')
                    shell.run('git checkout HEAD~1')
                    shell.run('systemctl start webapp')
                    return False
                elif exit_code != 0:
                    print(f"[{host}] Non-critical command failed: {cmd}")
                break
    
    return True

# Execute safe deployment on multiple hosts
hosts = ['web1.example.com', 'web2.example.com']
client = ParallelSSHClient(hosts, user='deploy')
shells = client.open_shell(read_timeout=5)  # allow reads between commands

results = []
for i, shell in enumerate(shells):
    success = safe_deployment_workflow(shell, hosts[i])
    results.append((hosts[i], success))
    shell.close()

# Report results
for host, success in results:
    status = "SUCCESS" if success else "FAILED"
    print(f"Deployment on {host}: {status}")

Database Maintenance Workflows

Execute database maintenance tasks that require multiple sequential commands.

def database_maintenance(shell, host):
    """Execute database maintenance workflow"""
    
    # Set up database environment
    shell.run('export PGPASSWORD=secretpassword')
    shell.run('export PGUSER=dbadmin')
    shell.run('export PGHOST=localhost')
    shell.run('export PGDATABASE=production')
    
    # Pre-maintenance checks
    shell.run('echo "=== Database Status ==="')
    shell.run('psql -c "SELECT version();"')
    shell.run('psql -c "SELECT pg_size_pretty(pg_database_size(current_database()));"')
    
    # Create backup
    shell.run('echo "=== Creating Backup ==="')
    shell.run('pg_dump production > /backup/production_$(date +%Y%m%d_%H%M%S).sql')
    
    # Maintenance operations
    shell.run('echo "=== Running Maintenance ==="')
    shell.run('psql -c "VACUUM ANALYZE;"')
    shell.run('psql -c "REINDEX DATABASE production;"')
    
    # Post-maintenance verification
    shell.run('echo "=== Verification ==="')
    shell.run('psql -c "SELECT schemaname,tablename,n_tup_ins,n_tup_upd,n_tup_del FROM pg_stat_user_tables WHERE n_tup_ins+n_tup_upd+n_tup_del > 0;"')
    
    # Close the shell to flush remaining output, then collect it
    shell.close()

    output_lines = []
    for line in shell.stdout:
        output_lines.append(line)

    return output_lines

# Run maintenance on database servers
db_hosts = ['db1.example.com', 'db2.example.com']
client = ParallelSSHClient(db_hosts, user='postgres')
shells = client.open_shell()

for i, shell in enumerate(shells):
    print(f"\n=== Maintenance on {db_hosts[i]} ===")
    output = database_maintenance(shell, db_hosts[i])

    for line in output:
        print(f"[{db_hosts[i]}] {line}")

    print(f"[{db_hosts[i]}] Shell exit code: {shell.exit_code}")

Best Practices

Resource Management

# Always close shells to free resources
shell = client.open_shell()
try:
    shell.run('some_command')
finally:
    shell.close()

# Output is readable once the shell is closed
for line in shell.stdout:
    print(line)

# Use the context manager for automatic cleanup (single-host SSHClient)
with client.open_shell() as shell:
    shell.run('export ENV=production')
    shell.run('cd /opt/app')
    shell.run('python manage.py status')
# Shell is closed automatically when the with block exits

# Output is buffered and readable after close
for line in shell.stdout:
    print(line)

# Or wrap the pattern in a helper function
def with_shell(client, commands):
    shell = client.open_shell()
    try:
        for cmd in commands:
            shell.run(cmd)
    finally:
        shell.close()
    return list(shell.stdout)

Output Processing

# Process output incrementally for long-running commands.
# Incremental reads require opening the shell with a read_timeout;
# reads raise pssh.exceptions.Timeout when no output is waiting.
shell = client.open_shell(read_timeout=1)
shell.run('long_running_command')

processed_lines = 0
for line in shell.stdout:
    print(f"Line {processed_lines}: {line}")
    processed_lines += 1
    
    # Process in chunks
    if processed_lines % 100 == 0:
        print(f"Processed {processed_lines} lines so far...")

shell.close()

Error Detection

# Monitor for specific error patterns
shell = client.open_shell()
shell.run('risky_operation')

# Close first so all buffered output is available
shell.close()

error_patterns = ['ERROR', 'FAILED', 'Exception', 'Traceback']
errors_found = []

for line in shell.stdout:
    for pattern in error_patterns:
        if pattern in line:
            errors_found.append(line)
    print(line)

if errors_found:
    print("Errors detected:")
    for error in errors_found:
        print(f"  {error}")

Install with Tessl CLI

npx tessl i tessl/pypi-parallel-ssh
