# Stream Watchers

Stream watchers let a task interact with a running subprocess: each watcher inspects the subprocess's output as it arrives and can return text to be written to the subprocess's stdin. The built-in responders match output against a pattern and reply automatically, for example to answer a password prompt.

## Capabilities

### StreamWatcher Class

Base class for watching and processing subprocess output streams.

```python { .api }
class StreamWatcher(threading.local):
    """
    Base class for subprocess stream watchers.

    Provides a framework for monitoring subprocess output streams and
    responding to specific patterns or conditions in real time.

    Subclasses threading.local, so state set in __init__ is kept per
    thread; watchers are consulted from the runner's I/O threads.
    """

    def __init__(self):
        """Initialize StreamWatcher."""

    def submit(self, stream):
        """
        Process stream data.

        Called whenever new data is available from the watched stream.
        Override this method to implement custom stream processing logic.

        Parameters:
        - stream (str): All output read from the stream since the command
          started, not only the newest chunk

        Returns:
        iterable of str: Responses to write to the subprocess's stdin
        (may be empty)
        """
```
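
To make the `submit` contract concrete, the sketch below feeds a growing output buffer to a list of watchers and collects whatever they return. It deliberately does not import Invoke: `SketchWatcher`, `YesWatcher`, and `feed` are hypothetical stand-ins written for this illustration, and the real runner's internals may differ.

```python
class SketchWatcher:
    """Stand-in for a watcher base class (hypothetical, not Invoke's)."""

    def submit(self, stream):
        raise NotImplementedError


class YesWatcher(SketchWatcher):
    """Answers 'y' once for every '[y/N]' prompt seen so far."""

    def __init__(self):
        self.answered = 0  # Number of prompts already answered

    def submit(self, stream):
        # The stream is cumulative, so count all prompts and answer
        # only the ones that have not been answered yet.
        seen = stream.count("[y/N]")
        responses = ["y\n"] * (seen - self.answered)
        self.answered = seen
        return responses


def feed(chunks, watchers):
    """Simulate a runner: append each chunk, then consult every watcher."""
    buffer, written = [], []
    for chunk in chunks:
        buffer.append(chunk)
        stream = "".join(buffer)  # Everything read so far
        for watcher in watchers:
            written.extend(watcher.submit(stream))
    return written


# The second prompt is split across two chunks on purpose
sent = feed(["Install? [y/N]", " ok\nOverwrite? ", "[y/N]"], [YesWatcher()])
print(sent)
```

Because each call sees the whole stream, a watcher that keeps no state would answer the first prompt again on every later call; tracking what has already been handled avoids that.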

### Responder Class

Pattern-based auto-responder for interactive subprocess communication.

```python { .api }
class Responder(StreamWatcher):
    """
    Pattern-matching auto-responder for subprocess interaction.

    Monitors subprocess output for a pattern and automatically sends a
    response each time the pattern appears, enabling automated
    interaction with interactive programs.

    Attributes:
    - pattern (str): Regular expression to watch for
    - response (str): Text written to the subprocess's stdin on each match
    - index (int): Stream offset up to which matches were already handled
    """

    def __init__(self, pattern, response):
        """
        Initialize Responder.

        Parameters:
        - pattern (str): Regular expression to match in stream output
        - response (str): Response to send when the pattern matches
        """

    def pattern_matches(self, stream, pattern, index_attr):
        """
        Find matches for pattern in the not-yet-examined part of the stream.

        Parameters:
        - stream (str): Stream content seen so far
        - pattern (str): Regular expression to match against
        - index_attr (str): Name of the attribute holding the stream offset
          to search from; it is advanced when matches are found

        Returns:
        list: Matches found (empty if there are none)
        """

    def submit(self, stream):
        """
        Process stream and respond to pattern matches.

        Parameters:
        - stream (str): All stream data captured so far

        Returns:
        iterable of str: One copy of the response per new match, empty otherwise
        """
```
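
The offset bookkeeping described above can be sketched in a few lines. This is a standalone illustration under stated assumptions, not the library's code: `SketchResponder` is a hypothetical class that only mirrors the idea of scanning the unseen tail of a cumulative stream.

```python
import re


class SketchResponder:
    """Illustrative responder (hypothetical, not Invoke's implementation)."""

    def __init__(self, pattern, response):
        self.pattern = pattern
        self.response = response
        self.index = 0  # Offset up to which the stream was already scanned

    def submit(self, stream):
        new = stream[self.index:]  # Only the part not scanned yet
        matches = re.findall(self.pattern, new, re.S)
        if matches:
            # Mark everything seen so far as handled to avoid duplicates
            self.index = len(stream)
        return [self.response] * len(matches)


responder = SketchResponder(r"Password:", "s3cret\n")
first = responder.submit("Password:")
second = responder.submit("Password: \nlogin ok\n")
third = responder.submit("Password: \nlogin ok\nPassword:")
print(first, second, third)
```

The second call returns nothing even though the prompt is still present in the stream, because that part was already scanned; the third call answers only the newly arrived prompt.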

### FailingResponder Class

Auto-responder with failure detection capabilities.

```python { .api }
class FailingResponder(Responder):
    """
    Responder that can detect and handle failure conditions.

    Behaves like Responder, but also watches for a sentinel pattern.
    If the sentinel appears after a response has been sent, the response
    is treated as rejected and an exception is raised.

    Attributes:
    - pattern (str): Regular expression to watch for
    - response (str): Response to send when the pattern matches
    - sentinel (str): Regular expression indicating that the response failed
    """

    def __init__(self, pattern, response, sentinel):
        """
        Initialize FailingResponder.

        Parameters:
        - pattern (str): Regular expression to match in stream output
        - response (str): Response to send when the pattern matches
        - sentinel (str): Regular expression whose appearance after a
          response signals failure (for example, a "try again" message)
        """

    def submit(self, stream):
        """
        Process stream with failure detection.

        Parameters:
        - stream (str): All stream data captured so far

        Returns:
        iterable of str: Responses to send, as for Responder.submit

        Raises:
        ResponseNotAccepted: If the sentinel is seen after a response was sent
        """
```
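
The failure-detection rule (respond to the prompt, then raise if a sentinel shows up afterwards) can be sketched without Invoke. Everything here is hypothetical and for illustration only: `SketchFailingResponder` and `ResponseRejected` are stand-ins, and the library's own implementation may differ in detail.

```python
import re


class ResponseRejected(Exception):
    """Hypothetical stand-in for an error such as ResponseNotAccepted."""


class SketchFailingResponder:
    """Illustrative only: respond to a prompt, fail if a sentinel follows."""

    def __init__(self, pattern, response, sentinel):
        self.pattern = pattern
        self.response = response
        self.sentinel = sentinel
        self.index = 0          # Offset already scanned for the prompt
        self.failure_index = 0  # Offset already scanned for the sentinel
        self.tried = False      # Whether a response has been sent

    def _new_matches(self, stream, pattern, attr):
        # Search only the part of the stream not yet scanned for `pattern`
        start = getattr(self, attr)
        matches = re.findall(pattern, stream[start:], re.S)
        if matches:
            setattr(self, attr, len(stream))
        return matches

    def submit(self, stream):
        prompts = self._new_matches(stream, self.pattern, "index")
        failed = self._new_matches(stream, self.sentinel, "failure_index")
        if self.tried and failed:
            raise ResponseRejected(f"response to {self.pattern!r} was rejected")
        if prompts:
            self.tried = True
        return [self.response] * len(prompts)


responder = SketchFailingResponder(r"Password:", "guess\n", r"Sorry, try again")
sent = responder.submit("Password:")
try:
    responder.submit("Password:\nSorry, try again.\nPassword:")
    rejected = False
except ResponseRejected:
    rejected = True
print(sent, rejected)
```

Requiring `tried` before raising means a sentinel that appears before any response was sent is not treated as a failure.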

### Terminal Utilities

Helper functions for terminal and PTY interaction, defined in `invoke.terminals`.

```python { .api }
def pty_size():
    """
    Get current terminal size.

    Returns:
    tuple: (columns, rows) terminal dimensions
    """

def stdin_is_foregrounded_tty(stream):
    """
    Check if stdin is a foregrounded TTY.

    Parameters:
    - stream: Input stream to check

    Returns:
    bool: True if stream is a foregrounded TTY
    """

def character_buffered(stream):
    """
    Context manager for character-level input buffering.

    Parameters:
    - stream: Input stream to modify

    Returns:
    context manager: Character buffering context
    """

def ready_for_reading(input_):
    """
    Check if input stream has data ready for reading.

    Parameters:
    - input_: Input stream to check

    Returns:
    bool: True if data is ready for reading
    """

def bytes_to_read(input_):
    """
    Get number of bytes available to read from input stream.

    Parameters:
    - input_: Input stream to check

    Returns:
    int: Number of bytes available
    """
```
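
For intuition about what two of these helpers report, here is a rough standard-library analogue. It is a sketch under assumptions, not Invoke's implementation: `terminal_size_sketch` and `ready_sketch` are hypothetical names, the 80x24 fallback is an arbitrary choice, and a connected socket pair stands in for a subprocess pipe.

```python
import select
import shutil
import socket


def terminal_size_sketch():
    """Return (columns, rows), falling back to 80x24 without a terminal."""
    size = shutil.get_terminal_size(fallback=(80, 24))
    return (size.columns, size.lines)


def ready_sketch(readable, timeout=0.0):
    """Return True if `readable` has data waiting, without blocking on a read."""
    ready, _, _ = select.select([readable], [], [], timeout)
    return bool(ready)


# A connected socket pair stands in for a subprocess pipe
left, right = socket.socketpair()
before = ready_sketch(right)              # Nothing has been written yet
left.sendall(b"prompt> ")
after = ready_sketch(right, timeout=1.0)  # Data is now waiting
left.close()
right.close()

cols, rows = terminal_size_sketch()
print(before, after, cols, rows)
```

Checking readiness before reading is what lets an I/O loop poll an input stream without stalling when no data has arrived.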

## Usage Examples

### Basic Stream Watching

```python
from invoke import Context
from invoke.watchers import StreamWatcher

class LogWatcher(StreamWatcher):
    """Custom watcher that logs all output."""

    def __init__(self):
        super().__init__()
        self.index = 0  # How much of the stream has been logged already

    def submit(self, stream):
        # submit() receives everything captured so far, so log only the new part
        new = stream[self.index:]
        self.index = len(stream)
        if new.strip():
            print(f"[LOG] {new.strip()}")
        return []  # No responses to send

# Use watcher with command execution
ctx = Context()
watcher = LogWatcher()
result = ctx.run("echo 'Hello World'", watchers=[watcher])
# Logs: [LOG] Hello World
```

### Pattern-based Auto-response

```python
from invoke import Context
from invoke.watchers import Responder

# Create auto-responder for password prompts
password_responder = Responder(
    pattern=r'Password:',
    response='my_secret_password\n'
)

# Use with commands that require interaction
ctx = Context()
result = ctx.run("sudo some-command", watchers=[password_responder], pty=True)
```

### Multiple Response Patterns

```python
from invoke import Context
from invoke.watchers import Responder

# Multiple responders for different prompts
responders = [
    Responder(r'Username:', 'myuser\n'),
    Responder(r'Password:', 'mypass\n'),
    Responder(r'Continue\? \[y/N\]', 'y\n'),
    Responder(r'Are you sure\?', 'yes\n')
]

ctx = Context()
result = ctx.run("interactive-installer", watchers=responders, pty=True)
```

### Conditional Responses

```python
from invoke import Context
from invoke.watchers import Responder

def dynamic_response(target):
    """Generate a response based on the environment named in the prompt."""
    if 'staging' in target:
        return 'staging_password\n'
    elif 'production' in target:
        return 'production_password\n'
    else:
        return 'default_password\n'

class DynamicResponder(Responder):
    """Responder whose reply is computed from each match.

    Responder writes `response` verbatim, so a callable response needs
    a small subclass like this one.
    """

    def submit(self, stream):
        # With one capture group, each match is the captured text
        for match in self.pattern_matches(stream, self.pattern, "index"):
            yield self.response(match)

# Responder with callable response
dynamic_responder = DynamicResponder(
    pattern=r'Enter password for (\w+):',
    response=dynamic_response
)

ctx = Context()
result = ctx.run("deploy-script", watchers=[dynamic_responder], pty=True)
```

### Failure Handling

```python
from invoke import Context
from invoke.watchers import FailingResponder
from invoke.exceptions import Failure, ResponseNotAccepted

# Responder that fails if the sentinel shows up after it has responded
careful_responder = FailingResponder(
    pattern=r'Password:',
    response='wrong_password\n',
    sentinel=r'Sorry, try again'
)

ctx = Context()
try:
    result = ctx.run("sudo ls", watchers=[careful_responder], pty=True)
except Failure as e:
    # Errors raised by watchers surface as a Failure whose `reason`
    # holds the original exception
    if isinstance(e.reason, ResponseNotAccepted):
        print(f"Authentication failed: {e.reason}")
        # Handle failed authentication
    else:
        raise
```

### Custom Stream Processing

```python
from invoke import Context
from invoke.watchers import StreamWatcher
import re

class ProgressWatcher(StreamWatcher):
    """Watch for progress indicators and update display."""

    def __init__(self):
        super().__init__()
        self.progress_pattern = re.compile(r'(\d+)% complete')

    def submit(self, stream):
        # The stream is cumulative, so report the most recent match
        matches = self.progress_pattern.findall(stream)
        if matches:
            progress = int(matches[-1])
            print(f"\rProgress: {progress}%", end='', flush=True)
        return []

# Use with long-running commands
ctx = Context()
watcher = ProgressWatcher()
result = ctx.run("long-running-process", watchers=[watcher])
```

### Interactive Terminal Session

```python
from invoke import Context
from invoke.watchers import Responder

class InteractiveSession:
    """Manage interactive terminal session with automatic responses."""

    def __init__(self):
        self.responders = []

    def add_response(self, pattern, response):
        """Add pattern-response pair."""
        self.responders.append(Responder(pattern, response))

    def run_session(self, command):
        """Run interactive session with all responders."""
        ctx = Context()
        return ctx.run(command, watchers=self.responders, pty=True)

# Set up interactive session
session = InteractiveSession()
session.add_response(r'Name:', 'John Doe\n')
session.add_response(r'Email:', 'john@example.com\n')
session.add_response(r'Confirm \[y/N\]:', 'y\n')

# Run interactive command
result = session.run_session("interactive-setup")
```

### Stream Filtering and Logging

```python
from invoke import Context
from invoke.watchers import StreamWatcher
import logging

class FilteredLogger(StreamWatcher):
    """Filter and log specific stream content."""

    def __init__(self, logger, level=logging.INFO):
        super().__init__()
        self.logger = logger
        self.level = level
        self.index = 0  # Offset of output that has already been logged

    def submit(self, stream):
        # The stream is cumulative; handle only what arrived since the last call
        new = stream[self.index:]
        self.index = len(stream)

        # Filter out sensitive information
        filtered = new.replace('password', '***').strip()
        if not filtered:
            return []

        # Log important messages
        if 'ERROR' in new.upper():
            self.logger.error(filtered)
        elif 'WARNING' in new.upper():
            self.logger.warning(filtered)
        else:
            self.logger.log(self.level, filtered)

        return []

# Set up logging (INFO messages are dropped unless the level is lowered)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('command_output')
watcher = FilteredLogger(logger)

ctx = Context()
result = ctx.run("deployment-script", watchers=[watcher])
```

### Testing with Watchers

```python
from invoke import MockContext, Result
from invoke.watchers import Responder
import unittest

class TestInteractiveCommand(unittest.TestCase):

    def test_password_prompt(self):
        """Test automatic password response."""
        # Create mock context with the expected result for the command
        ctx = MockContext(
            run={"sudo ls": Result(stdout="file1.txt\nfile2.txt\n")}
        )

        # Set up responder
        responder = Responder(r'Password:', 'test_password\n')

        # Run command with watcher; MockContext returns the canned result
        # without starting a subprocess, so the watcher is not exercised
        result = ctx.run("sudo ls", watchers=[responder])

        # Verify result
        self.assertEqual(result.stdout, "file1.txt\nfile2.txt\n")
```

### Advanced Pattern Matching

```python
from invoke import Context
from invoke.watchers import Responder

# Complex regex patterns for different scenarios.
# Patterns are passed as raw strings; the responder applies them as
# regular expressions itself.
patterns = [
    # SSH key fingerprint confirmation
    Responder(
        pattern=r'Are you sure you want to continue connecting \(yes/no\)\?',
        response='yes\n'
    ),

    # GPG passphrase prompt
    Responder(
        pattern=r'Enter passphrase:',
        response='my_gpg_passphrase\n'
    ),

    # Database migration confirmation
    Responder(
        pattern=r'This will delete all data\. Continue\? \[y/N\]',
        response='y\n'
    )
]

ctx = Context()
result = ctx.run("complex-deployment", watchers=patterns, pty=True)
```