Pythonic task execution library for managing shell-oriented subprocesses and organizing executable Python code into CLI-invokable tasks
Overall
score
96%
Stream watchers provide interactive subprocess communication through pattern-based responders and stream processors, enabling automated responses to subprocess output and sophisticated process interaction.
Base class for watching and processing subprocess output streams.
class StreamWatcher:
    """
    Base class for subprocess stream watchers.

    Provides framework for monitoring subprocess output streams and
    responding to specific patterns or conditions in real-time.

    Attributes:
    - thread (threading.Thread): Background processing thread
    - reader (callable): Stream reading function
    - writer (callable): Stream writing function

    NOTE(review): the upstream invoke API reference describes this class
    as a ``threading.local`` subclass and does not list the attributes
    above -- confirm against the installed release before relying on them.
    """

    def __init__(self):
        """Initialize StreamWatcher."""

    def submit(self, stream):
        """
        Process stream data.

        Called whenever new data is available from the watched stream.
        Override this method to implement custom stream processing logic.

        Parameters:
        - stream (str): Stream data to process
          NOTE(review): upstream documents this as all data read on the
          stream since the start of the session, not only the newest
          chunk -- subclasses should track how much they already handled.

        Returns:
        list: List of responses to send to subprocess stdin
        (upstream accepts any iterable of strings, possibly empty)
        """


# Pattern-based auto-responder for interactive subprocess communication.
class Responder(StreamWatcher):
    """
    Pattern-matching auto-responder for subprocess interaction.

    Monitors subprocess output for specific patterns and automatically
    sends responses when patterns are detected, enabling automated
    interaction with interactive programs.

    Attributes:
    - pattern (str or regex): Pattern to watch for
    - response (str or callable): Response to send when pattern matches
    - sentinel (str, optional): Sentinel value for pattern matching

    NOTE(review): the upstream invoke API reference documents
    ``Responder(pattern, response)`` with a raw-string pattern, a string
    response and no sentinel argument -- confirm this listing against the
    installed release.
    """

    def __init__(self, pattern, response, sentinel=None):
        """
        Initialize Responder.

        Parameters:
        - pattern (str or regex): Pattern to match in stream output
        - response (str or callable): Response to send when pattern matches
        - sentinel (str, optional): Additional pattern matching control
        """

    def pattern_matches(self, stream, pattern):
        """
        Test if pattern matches stream content.

        Parameters:
        - stream (str): Stream content to test
        - pattern (str or regex): Pattern to match against

        Returns:
        bool: True if pattern matches stream content

        NOTE(review): upstream documents a third ``index_attr`` argument
        and an iterable of matches as the return value -- verify.
        """

    def submit(self, stream):
        """
        Process stream and respond to pattern matches.

        Parameters:
        - stream (str): Stream data to process

        Returns:
        list: List containing response if pattern matched, empty otherwise
        """


# Auto-responder with failure detection capabilities.
class FailingResponder(Responder):
    """
    Responder that can detect and handle failure conditions.

    Extends Responder with the ability to detect failure patterns
    and handle error conditions in subprocess communication.

    Attributes:
    - pattern (str or regex): Pattern to watch for
    - response (str or callable): Response to send when pattern matches
    - sentinel (str, optional): Sentinel value for pattern matching
    - failure_pattern (str or regex): Pattern indicating failure

    NOTE(review): the upstream invoke API reference documents ``sentinel``
    as a required argument (the text whose appearance in the stream marks
    the response as rejected) and does not list ``failure_pattern`` --
    confirm against the installed release.
    """

    def __init__(self, pattern, response, sentinel=None):
        """
        Initialize FailingResponder.

        Parameters:
        - pattern (str or regex): Pattern to match in stream output
        - response (str or callable): Response to send when pattern matches
        - sentinel (str, optional): Additional pattern matching control
        """

    def submit(self, stream):
        """
        Process stream with failure detection.

        Parameters:
        - stream (str): Stream data to process

        Returns:
        list: Response list or raises exception on failure

        Raises:
        ResponseNotAccepted: If response is rejected or fails
        """


# Additional utilities for terminal and PTY interaction.
def pty_size():
    """
    Get current terminal size.

    Returns:
    tuple: (columns, rows) terminal dimensions
    """
def stdin_is_foregrounded_tty(stream):
    """
    Check if stdin is a foregrounded TTY.

    Parameters:
    - stream: Input stream to check

    Returns:
    bool: True if stream is a foregrounded TTY
    """
def character_buffered(stream):
    """
    Context manager for character-level input buffering.

    Parameters:
    - stream: Input stream to modify

    Returns:
    context manager: Character buffering context
    """
def ready_for_reading(input_):
    """
    Check if input stream has data ready for reading.

    Parameters:
    - input_: Input stream to check

    Returns:
    bool: True if data is ready for reading
    """
def bytes_to_read(input_):
    """
    Get number of bytes available to read from input stream.

    Parameters:
    - input_: Input stream to check

    Returns:
    int: Number of bytes available
    """


from invoke import Context
from invoke.watchers import StreamWatcher
class LogWatcher(StreamWatcher):
    """Custom watcher that logs all output."""

    def __init__(self):
        super().__init__()
        # invoke hands submit() everything read so far on each call, so
        # remember how much has already been logged to avoid repeats.
        self._logged = 0

    def submit(self, stream):
        """
        Print the not-yet-logged tail of ``stream``.

        Parameters:
        - stream (str): Stream data to process

        Returns:
        list: Always empty; this watcher never writes to stdin
        """
        fresh = stream[self._logged:]
        self._logged = len(stream)
        if fresh.strip():
            print(f"[LOG] {fresh.strip()}")
        return []  # No responses to send
# Use watcher with command execution
ctx = Context()
watcher = LogWatcher()
result = ctx.run("echo 'Hello World'", watchers=[watcher])
# Output: [LOG] Hello World

from invoke import Context
from invoke.watchers import Responder
# Create auto-responder for password prompts
password_responder = Responder(
    pattern=r'Password:',
    response='my_secret_password\n'
)
# Use with commands that require interaction
ctx = Context()
result = ctx.run("sudo some-command", watchers=[password_responder], pty=True)

from invoke import Context
from invoke.watchers import Responder
# Multiple responders for different prompts
responders = [
    Responder(r'Username:', 'myuser\n'),
    Responder(r'Password:', 'mypass\n'),
    Responder(r'Continue\? \[y/N\]', 'y\n'),
    Responder(r'Are you sure\?', 'yes\n')
]
ctx = Context()
result = ctx.run("interactive-installer", watchers=responders, pty=True)

from invoke import Context
from invoke.watchers import Responder
def dynamic_response(stream):
    """
    Generate response based on stream content.

    Parameters:
    - stream (str): Stream content to inspect

    Returns:
    str: Password line for the environment named in ``stream``;
    'staging' is checked before 'production', and anything else gets
    the default password
    """
    if 'staging' in stream:
        return 'staging_password\n'
    if 'production' in stream:
        return 'production_password\n'
    return 'default_password\n'
# Responder with callable response
# NOTE(review): the upstream invoke API reference documents ``response`` as
# the string written to stdin -- confirm that the installed release accepts
# a callable here before relying on this example.
dynamic_responder = Responder(
    pattern=r'Enter password for (\w+):',
    response=dynamic_response
)
ctx = Context()
result = ctx.run("deploy-script", watchers=[dynamic_responder], pty=True)

from invoke import Context
from invoke.watchers import FailingResponder
from invoke.exceptions import ResponseNotAccepted
# Responder that can detect failures. The sentinel is the text whose
# appearance after the response was sent marks it as rejected; without
# one there is nothing that could trigger ResponseNotAccepted.
careful_responder = FailingResponder(
    pattern=r'Password:',
    response='wrong_password\n',
    sentinel=r'Sorry, try again',
)
ctx = Context()
try:
    # Only the command itself can raise the error handled below
    result = ctx.run("sudo ls", watchers=[careful_responder], pty=True)
except ResponseNotAccepted as e:
    # NOTE(review): invoke's own sudo() catches Failure and inspects
    # failure.reason for ResponseNotAccepted -- confirm which exception
    # ctx.run actually surfaces in the installed release.
    print(f"Authentication failed: {e}")
    # Handle failed authentication

from invoke import Context
from invoke.watchers import StreamWatcher
import re
class ProgressWatcher(StreamWatcher):
    """Watch for progress indicators and update display."""

    def __init__(self):
        super().__init__()
        self.progress_pattern = re.compile(r'(\d+)% complete')

    def submit(self, stream):
        """
        Show the most recent percentage found in ``stream``.

        Parameters:
        - stream (str): Stream data to process

        Returns:
        list: Always empty; this watcher never writes to stdin
        """
        # Take the LAST match: search() returns the earliest one, which
        # leaves the display stuck at the first percentage whenever the
        # stream holds more than one progress line.
        percentages = self.progress_pattern.findall(stream)
        if percentages:
            progress = int(percentages[-1])
            print(f"\rProgress: {progress}%", end='', flush=True)
        return []
# Use with long-running commands
ctx = Context()
watcher = ProgressWatcher()
result = ctx.run("long-running-process", watchers=[watcher])

from invoke import Context
from invoke.watchers import Responder
import time
class InteractiveSession:
    """Manage interactive terminal session with automatic responses."""

    def __init__(self):
        # One Responder per registered prompt, in registration order
        self.responders = []

    def add_response(self, pattern, response):
        """
        Add pattern-response pair.

        Parameters:
        - pattern: Pattern handed to Responder
        - response: Response handed to Responder
        """
        self.responders.append(Responder(pattern, response))

    def run_session(self, command):
        """
        Run interactive session with all responders.

        Parameters:
        - command (str): Command line to execute under a PTY

        Returns:
        Whatever ``Context.run`` returns for the command
        """
        ctx = Context()
        return ctx.run(command, watchers=self.responders, pty=True)
# Set up interactive session
session = InteractiveSession()
session.add_response(r'Name:', 'John Doe\n')
session.add_response(r'Email:', 'john@example.com\n')
session.add_response(r'Confirm \[y/N\]:', 'y\n')
# Run interactive command
result = session.run_session("interactive-setup")

from invoke import Context
from invoke.watchers import StreamWatcher
import logging
class FilteredLogger(StreamWatcher):
    """Filter and log specific stream content."""

    def __init__(self, logger, level=logging.INFO):
        super().__init__()
        self.logger = logger
        self.level = level
        # invoke hands submit() everything read so far on each call, so
        # remember how much has already been logged to avoid repeats.
        self._logged = 0

    def submit(self, stream):
        """
        Log the not-yet-seen tail of ``stream`` at a level chosen by content.

        Parameters:
        - stream (str): Stream data to process

        Returns:
        list: Always empty; this watcher never writes to stdin
        """
        fresh = stream[self._logged:]
        self._logged = len(stream)
        if not fresh.strip():
            return []
        # Filter out sensitive information
        # (only the literal word 'password' is masked, not any secret value)
        filtered = fresh.replace('password', '***')
        # Log important messages; classify on the new text only so that one
        # early "ERROR" does not mark every later message as an error.
        shouted = fresh.upper()
        if 'ERROR' in shouted:
            self.logger.error(filtered.strip())
        elif 'WARNING' in shouted:
            self.logger.warning(filtered.strip())
        else:
            self.logger.log(self.level, filtered.strip())
        return []
# Set up logging
logger = logging.getLogger('command_output')
watcher = FilteredLogger(logger)
ctx = Context()
result = ctx.run("deployment-script", watchers=[watcher])

from invoke import MockContext
from invoke.watchers import Responder
import unittest
class TestInteractiveCommand(unittest.TestCase):
    """Exercise a watcher-enabled command against a MockContext."""

    def test_password_prompt(self):
        """Test automatic password response."""
        # Result is used below but the example's imports only bring in
        # MockContext, so import it here.
        from invoke import Result

        # Create mock context with expected result
        ctx = MockContext()
        # set_result_for takes the mocked method name ("run") first,
        # then the command string, then the canned Result.
        ctx.set_result_for("run", "sudo ls", Result(stdout="file1.txt\nfile2.txt\n"))
        # Set up responder
        responder = Responder(r'Password:', 'test_password\n')
        # Run command with watcher
        result = ctx.run("sudo ls", watchers=[responder])
        # Verify result
        self.assertEqual(result.stdout, "file1.txt\nfile2.txt\n")


from invoke import Context
from invoke.watchers import Responder
import re
# Complex regex patterns for different scenarios
# Patterns are passed as raw strings rather than re.compile() objects:
# Responder runs them through re.findall(pattern, ..., re.S), and the re
# module rejects a flags argument combined with an already-compiled pattern.
patterns = [
    # SSH key fingerprint confirmation
    Responder(
        pattern=r'Are you sure you want to continue connecting \(yes/no\)\?',
        response='yes\n'
    ),
    # GPG passphrase prompt
    Responder(
        pattern=r'Enter passphrase:',
        response='my_gpg_passphrase\n'
    ),
    # Database migration confirmation
    Responder(
        pattern=r'This will delete all data\. Continue\? \[y/N\]',
        response='y\n'
    )
]
ctx = Context()
result = ctx.run("complex-deployment", watchers=patterns, pty=True)

Install with Tessl CLI
npx tessl i tessl/pypi-invoke
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10