CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-invoke

Pythonic task execution library for managing shell-oriented subprocesses and organizing executable Python code into CLI-invokable tasks

Overall
score

96%

Overview
Eval results
Files

watchers.mddocs/

Stream Watchers

Stream watchers provide interactive subprocess communication through pattern-based responders and stream processors, enabling automated responses to subprocess output and sophisticated process interaction.

Capabilities

StreamWatcher Class

Base class for watching and processing subprocess output streams.

class StreamWatcher:
    """
    Base class for subprocess stream watchers.

    Subclasses override `submit` to inspect subprocess output and,
    optionally, return text that should be written to the subprocess'
    stdin in reaction to it.

    Attributes (as listed by this page):
    - thread (threading.Thread): Background processing thread
    - reader (callable): Stream reading function
    - writer (callable): Stream writing function

    NOTE(review): upstream invoke declares this class as a
    ``threading.local`` subclass and documents none of the attributes
    listed above -- confirm against the installed invoke version before
    relying on them.
    """

    def __init__(self) -> None:
        """Initialize StreamWatcher."""

    def submit(self, stream: str):
        """
        Inspect stream data and return any responses to send.

        Called when data has been read from the watched stream. Override
        this method to implement custom stream processing logic.

        Parameters:
        - stream (str): Stream data to process.
          NOTE(review): upstream documents this as *all* data read on the
          stream since the session began, not only the newest chunk --
          subclasses that act per-chunk should track how much they have
          already consumed. Confirm against the installed version.

        Returns:
        list: Strings to write to the subprocess' stdin; empty when there
        is nothing to send (upstream accepts any iterable of strings)
        """

Responder Class

Pattern-based auto-responder for interactive subprocess communication.

class Responder(StreamWatcher):
    """
    Pattern-matching auto-responder for subprocess interaction.

    Watches subprocess output for `pattern` and, when it is detected,
    supplies `response` to be sent to the subprocess, so that interactive
    prompts can be answered without a human at the keyboard.

    Attributes:
    - pattern (str or regex): Pattern to watch for
    - response (str or callable): Response to send when pattern matches
    - sentinel (str, optional): Sentinel value for pattern matching

    NOTE(review): upstream invoke's Responder accepts only `pattern` (a
    regular-expression string) and `response` (a string). `sentinel` is a
    FailingResponder argument, and neither precompiled patterns nor
    callable responses are documented upstream -- confirm against the
    installed invoke version.
    """

    def __init__(self, pattern, response, sentinel=None) -> None:
        """
        Initialize Responder.

        Parameters:
        - pattern (str or regex): Pattern to match in stream output
        - response (str or callable): Response to send when pattern matches
        - sentinel (str, optional): Additional pattern matching control
          (NOTE(review): not part of upstream Responder.__init__ -- confirm)
        """

    def pattern_matches(self, stream, pattern):
        """
        Test if pattern matches stream content.

        Parameters:
        - stream (str): Stream content to test
        - pattern (str or regex): Pattern to match against

        Returns:
        bool: True if pattern matches stream content

        NOTE(review): upstream's signature is
        ``pattern_matches(stream, pattern, index_attr)`` and it returns the
        list of matches found in not-yet-scanned output rather than a
        bool -- confirm before overriding or calling this directly.
        """

    def submit(self, stream: str):
        """
        Process stream and respond to pattern matches.

        Parameters:
        - stream (str): Stream data to process

        Returns:
        list: List containing response if pattern matched, empty otherwise
        (NOTE(review): upstream yields the response once per new match,
        i.e. returns a generator -- confirm)
        """

FailingResponder Class

Auto-responder with failure detection capabilities.

class FailingResponder(Responder):
    """
    Responder that can detect and handle failure conditions.

    Behaves like Responder, and additionally reports an error when the
    subprocess output indicates that the submitted response was rejected.

    Attributes:
    - pattern (str or regex): Pattern to watch for
    - response (str or callable): Response to send when pattern matches
    - sentinel (str, optional): Sentinel value for pattern matching
    - failure_pattern (str or regex): Pattern indicating failure

    NOTE(review): `failure_pattern` is not an __init__ argument on this
    page; upstream invoke uses `sentinel` as the failure-indicating
    pattern and requires it (no default) -- confirm against the installed
    invoke version.
    """

    def __init__(self, pattern, response, sentinel=None) -> None:
        """
        Initialize FailingResponder.

        Parameters:
        - pattern (str or regex): Pattern to match in stream output
        - response (str or callable): Response to send when pattern matches
        - sentinel (str, optional): Additional pattern matching control.
          NOTE(review): upstream treats this as the regex whose appearance
          after a response was sent means the response failed, and it is a
          required argument there -- confirm.
        """

    def submit(self, stream: str):
        """
        Process stream with failure detection.

        Parameters:
        - stream (str): Stream data to process

        Returns:
        list: Response list or raises exception on failure

        Raises:
        ResponseNotAccepted: If response is rejected or fails
        """

Terminal Utilities

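Additional utilities for terminal and PTY interaction. Note that upstream Invoke defines these helpers in the `invoke.terminals` module, not in `invoke.watchers`.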

def pty_size() -> tuple:
    """
    Get current terminal size.

    Returns:
    tuple: (columns, rows) terminal dimensions, in that order
    """

def stdin_is_foregrounded_tty(stream) -> bool:
    """
    Check if stdin is a foregrounded TTY.

    Parameters:
    - stream: Input stream to check (presumably a file-like object such
      as ``sys.stdin`` -- TODO confirm)

    Returns:
    bool: True if stream is a foregrounded TTY
    """

def character_buffered(stream):
    """
    Context manager for character-level input buffering.

    Intended for use in a ``with`` statement wrapping reads from `stream`.

    Parameters:
    - stream: Input stream to modify

    Returns:
    context manager: Character buffering context

    NOTE(review): upstream describes this as forcing the local terminal to
    be character-buffered rather than line-buffered, on Unix-like systems
    only -- confirm for the installed invoke version.
    """

def ready_for_reading(input_) -> bool:
    """
    Check if input stream has data ready for reading.

    Parameters:
    - input_: Input stream to check (presumably a file-like object --
      TODO confirm)

    Returns:
    bool: True if data is ready for reading
    """

def bytes_to_read(input_) -> int:
    """
    Get number of bytes available to read from input stream.

    Parameters:
    - input_: Input stream to check (presumably a file-like object --
      TODO confirm)

    Returns:
    int: Number of bytes available
    """

Usage Examples

Basic Stream Watching

from invoke import Context
from invoke.watchers import StreamWatcher

class LogWatcher(StreamWatcher):
    """Custom watcher that logs all output.

    invoke passes ``submit`` everything read from the stream so far, not
    just the newest chunk, so the watcher remembers how much it has
    already printed and only logs the unseen tail.
    """

    def __init__(self):
        super().__init__()
        # Number of characters of the stream that have already been logged
        self.index = 0

    def submit(self, stream):
        """Print the not-yet-logged part of `stream`; never responds.

        Parameters:
        - stream (str): All output captured so far

        Returns:
        list: Always empty -- this watcher only observes
        """
        new_output = stream[self.index:]
        self.index = len(stream)
        print(f"[LOG] {new_output.strip()}")
        return []  # No responses to send

# Attach a LogWatcher instance to a single command run
watcher = LogWatcher()
ctx = Context()
result = ctx.run(
    "echo 'Hello World'",
    watchers=[watcher],
)
# Output: [LOG] Hello World

Pattern-based Auto-response

from invoke import Context
from invoke.watchers import Responder

# Answer any "Password:" prompt automatically. The literal below is a
# placeholder; real credentials should not be hard-coded.
password_responder = Responder(r'Password:', 'my_secret_password\n')

# Run an interactive command under a pty with the responder attached
ctx = Context()
result = ctx.run(
    "sudo some-command",
    watchers=[password_responder],
    pty=True,
)

Multiple Response Patterns

from invoke import Context
from invoke.watchers import Responder

# Prompt regex -> canned answer; one Responder is built per entry
prompt_answers = [
    (r'Username:', 'myuser\n'),
    (r'Password:', 'mypass\n'),
    (r'Continue\? \[y/N\]', 'y\n'),
    (r'Are you sure\?', 'yes\n'),
]
responders = [Responder(prompt, answer) for prompt, answer in prompt_answers]

ctx = Context()
result = ctx.run(
    "interactive-installer",
    watchers=responders,
    pty=True,
)

Conditional Responses

from invoke import Context
from invoke.watchers import Responder

def dynamic_response(stream):
    """Pick the password to send based on which environment `stream` names.

    'staging' is checked before 'production'; when neither word occurs in
    the text the default password is returned. Every result ends with a
    newline so it can be submitted as a complete line of input.
    """
    for environment in ('staging', 'production'):
        if environment in stream:
            return f'{environment}_password\n'
    return 'default_password\n'

class DynamicResponder(Responder):
    """Responder whose reply is computed by a callable.

    The stock Responder forwards `response` to the subprocess verbatim,
    so it must be a string; handing it a function fails when invoke tries
    to write it to stdin. This subclass keeps the matching logic of
    Responder and calls the function to produce the text to send.
    """

    def __init__(self, pattern, response):
        # The base class only needs a placeholder string; the callable is
        # kept separately and invoked on each match.
        super().__init__(pattern, "")
        self.make_response = response

    def submit(self, stream):
        """Yield ``make_response(new_output)`` once per new pattern match."""
        # `index` is where the base class stopped scanning on its previous
        # match, so stream[seen:] is the output that produced this match.
        seen = self.index
        for _ in super().submit(stream):
            yield self.make_response(stream[seen:])


# Responder with callable response
dynamic_responder = DynamicResponder(
    pattern=r'Enter password for (\w+):',
    response=dynamic_response
)

ctx = Context()
result = ctx.run("deploy-script", watchers=[dynamic_responder], pty=True)

Failure Handling

from invoke import Context
from invoke.watchers import FailingResponder
from invoke.exceptions import Failure, ResponseNotAccepted

# Responder that can detect failures. `sentinel` is the output that means
# the submitted response was rejected; FailingResponder cannot detect a
# failure without it.
careful_responder = FailingResponder(
    pattern=r'Password:',
    response='wrong_password\n',
    sentinel=r'Sorry, try again',
)

ctx = Context()
try:
    result = ctx.run("sudo ls", watchers=[careful_responder], pty=True)
except Failure as e:
    # Errors raised by watchers reach the caller wrapped in Failure, with
    # the original exception in `.reason`; anything else (for example a
    # plain non-zero exit) is re-raised untouched.
    if not isinstance(e.reason, ResponseNotAccepted):
        raise
    print(f"Authentication failed: {e.reason}")
    # Handle failed authentication

Custom Stream Processing

from invoke import Context
from invoke.watchers import StreamWatcher
import re

class ProgressWatcher(StreamWatcher):
    """Watch for progress indicators and update display."""

    def __init__(self):
        super().__init__()
        # Matches e.g. "42% complete" and captures the number
        self.progress_pattern = re.compile(r'(\d+)% complete')

    def submit(self, stream):
        """Show the most recent "<n>% complete" value found in `stream`.

        Parameters:
        - stream (str): All output captured so far

        Returns:
        list: Always empty -- this watcher never writes to stdin
        """
        # `stream` is cumulative, so the first match never changes once it
        # has appeared; the latest progress report is the last match.
        percentages = self.progress_pattern.findall(stream)
        if percentages:
            progress = int(percentages[-1])
            # "\r" rewinds to the start of the line so updates overwrite
            print(f"\rProgress: {progress}%", end='', flush=True)
        return []

# Attach the progress display to a long-running command
watcher = ProgressWatcher()
ctx = Context()
result = ctx.run(
    "long-running-process",
    watchers=[watcher],
)

Interactive Terminal Session

from invoke import Context
from invoke.watchers import Responder
import time

class InteractiveSession:
    """Collects prompt/answer pairs and runs a command that uses them all."""

    def __init__(self):
        # Responders are accumulated here in the order they are added
        self.responders = []

    def add_response(self, pattern, response):
        """Register `response` as the automatic answer to `pattern`."""
        responder = Responder(pattern, response)
        self.responders.append(responder)

    def run_session(self, command):
        """Run `command` under a pty with every registered responder.

        Returns whatever ``Context.run`` returns for the command.
        """
        run_options = {"watchers": self.responders, "pty": True}
        return Context().run(command, **run_options)

# Build the session and register one answer per expected prompt
session = InteractiveSession()
for prompt, answer in (
    (r'Name:', 'John Doe\n'),
    (r'Email:', 'john@example.com\n'),
    (r'Confirm \[y/N\]:', 'y\n'),
):
    session.add_response(prompt, answer)

# Run the command with all registered answers active
result = session.run_session("interactive-setup")

Stream Filtering and Logging

from invoke import Context
from invoke.watchers import StreamWatcher
import logging

class FilteredLogger(StreamWatcher):
    """Filter and log specific stream content.

    invoke passes ``submit`` all output captured so far, so the watcher
    keeps an offset and only logs text it has not seen before; otherwise
    earlier output would be re-logged on every call and a single early
    "ERROR" would mark all later output as an error.
    """

    def __init__(self, logger, level=logging.INFO):
        """
        Parameters:
        - logger (logging.Logger): Destination for the filtered output
        - level (int): Level used for output that is neither an error nor
          a warning
        """
        super().__init__()
        self.logger = logger
        self.level = level
        # Number of characters of the stream that have already been logged
        self.index = 0

    def submit(self, stream):
        """Log the unseen part of `stream`; never responds.

        Returns:
        list: Always empty -- this watcher never writes to stdin
        """
        new_output = stream[self.index:]
        self.index = len(stream)

        # Mask the literal word "password" only; this does not hide the
        # secret values themselves.
        filtered = new_output.replace('password', '***')

        # Pick the log level from markers in the new output
        upper = new_output.upper()
        if 'ERROR' in upper:
            self.logger.error(filtered.strip())
        elif 'WARNING' in upper:
            self.logger.warning(filtered.strip())
        else:
            self.logger.log(self.level, filtered.strip())

        return []

# Set up logging. Without a configured handler and level, records below
# WARNING are discarded, so the watcher's default INFO output would never
# be shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('command_output')
watcher = FilteredLogger(logger)

ctx = Context()
result = ctx.run("deployment-script", watchers=[watcher])

Testing with Watchers

from invoke import MockContext, Result
from invoke.watchers import Responder
import unittest

class TestInteractiveCommand(unittest.TestCase):
    """Example of exercising watcher-using code against a MockContext."""

    def test_password_prompt(self):
        """Run a watcher-equipped command against a canned result.

        MockContext returns the preconfigured Result without starting a
        subprocess, so this checks that the call accepts `watchers` and
        yields the expected output; the responder itself is not triggered.
        """
        expected_stdout = "file1.txt\nfile2.txt\n"

        # Canned results are supplied per method ("run") and per command
        ctx = MockContext(run={"sudo ls": Result(stdout=expected_stdout)})

        # Set up responder
        responder = Responder(r'Password:', 'test_password\n')

        # Run command with watcher
        result = ctx.run("sudo ls", watchers=[responder])

        # Verify result
        self.assertEqual(result.stdout, expected_stdout)

Advanced Pattern Matching

from invoke import Context
from invoke.watchers import Responder
import re

# Complex regex patterns for different scenarios.
# Patterns are given as raw regex strings rather than re.compile() objects:
# Responder passes the pattern to re.findall() together with a flags
# argument, and the re module raises ValueError when flags accompany an
# already-compiled pattern.
patterns = [
    # SSH key fingerprint confirmation
    Responder(
        pattern=r'Are you sure you want to continue connecting \(yes/no\)\?',
        response='yes\n'
    ),

    # GPG passphrase prompt
    Responder(
        pattern=r'Enter passphrase:',
        response='my_gpg_passphrase\n'
    ),

    # Database migration confirmation
    Responder(
        pattern=r'This will delete all data\. Continue\? \[y/N\]',
        response='y\n'
    )
]

ctx = Context()
result = ctx.run("complex-deployment", watchers=patterns, pty=True)

Install with Tessl CLI

npx tessl i tessl/pypi-invoke

docs

cli-parsing.md

collections.md

configuration.md

index.md

subprocess-runners.md

task-execution.md

watchers.md

tile.json