or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-parsing.md · collections.md · configuration.md · index.md · subprocess-runners.md · task-execution.md · watchers.md

docs/watchers.md

0

# Stream Watchers

1

2

Stream watchers provide interactive subprocess communication through pattern-based responders and stream processors, enabling automated responses to subprocess output and sophisticated process interaction.

3

4

## Capabilities

5

6

### StreamWatcher Class

7

8

Base class for watching and processing subprocess output streams.

9

10

```python { .api }

11

class StreamWatcher:

12

"""

13

Base class for subprocess stream watchers.

14

15

Provides framework for monitoring subprocess output streams and

16

responding to specific patterns or conditions in real-time.

17

18

Attributes:

19

- thread (threading.Thread): Background processing thread

20

- reader (callable): Stream reading function

21

- writer (callable): Stream writing function

22

"""

23

24

def __init__(self):

25

"""Initialize StreamWatcher."""

26

27

def submit(self, stream):

28

"""

29

Process stream data.

30

31

Called whenever new data is available from the watched stream.

32

Override this method to implement custom stream processing logic.

33

34

Parameters:

35

- stream (str): Stream data to process

36

37

Returns:

38

list: List of responses to send to subprocess stdin

39

"""

40

```

41

42

### Responder Class

43

44

Pattern-based auto-responder for interactive subprocess communication.

45

46

```python { .api }

47

class Responder(StreamWatcher):

48

"""

49

Pattern-matching auto-responder for subprocess interaction.

50

51

Monitors subprocess output for specific patterns and automatically

52

sends responses when patterns are detected, enabling automated

53

interaction with interactive programs.

54

55

Attributes:

56

- pattern (str or regex): Pattern to watch for

57

- response (str or callable): Response to send when pattern matches

58

- sentinel (str, optional): Sentinel value for pattern matching

59

"""

60

61

def __init__(self, pattern, response, sentinel=None):

62

"""

63

Initialize Responder.

64

65

Parameters:

66

- pattern (str or regex): Pattern to match in stream output

67

- response (str or callable): Response to send when pattern matches

68

- sentinel (str, optional): Additional pattern matching control

69

"""

70

71

def pattern_matches(self, stream, pattern):

72

"""

73

Test if pattern matches stream content.

74

75

Parameters:

76

- stream (str): Stream content to test

77

- pattern (str or regex): Pattern to match against

78

79

Returns:

80

bool: True if pattern matches stream content

81

"""

82

83

def submit(self, stream):

84

"""

85

Process stream and respond to pattern matches.

86

87

Parameters:

88

- stream (str): Stream data to process

89

90

Returns:

91

list: List containing response if pattern matched, empty otherwise

92

"""

93

```

94

95

### FailingResponder Class

96

97

Auto-responder with failure detection capabilities.

98

99

```python { .api }

100

class FailingResponder(Responder):

101

"""

102

Responder that can detect and handle failure conditions.

103

104

Extends Responder with the ability to detect failure patterns

105

and handle error conditions in subprocess communication.

106

107

Attributes:

108

- pattern (str or regex): Pattern to watch for

109

- response (str or callable): Response to send when pattern matches

110

- sentinel (str): Pattern that, when seen in the stream after the response has been sent, signals that the response was rejected

111

- failure_pattern (str or regex): Pattern indicating failure

112

"""

113

114

def __init__(self, pattern, response, sentinel):

115

"""

116

Initialize FailingResponder.

117

118

Parameters:

119

- pattern (str or regex): Pattern to match in stream output

120

- response (str or callable): Response to send when pattern matches

121

- sentinel (str): Pattern that, when seen after the response has been sent, indicates the response was rejected and causes ResponseNotAccepted to be raised

122

"""

123

124

def submit(self, stream):

125

"""

126

Process stream with failure detection.

127

128

Parameters:

129

- stream (str): Stream data to process

130

131

Returns:

132

list: Response list or raises exception on failure

133

134

Raises:

135

ResponseNotAccepted: If response is rejected or fails

136

"""

137

```

138

139

### Terminal Utilities

140

141

Additional utilities for terminal and PTY interaction.

142

143

```python { .api }

144

def pty_size():

145

"""

146

Get current terminal size.

147

148

Returns:

149

tuple: (columns, rows) terminal dimensions

150

"""

151

152

def stdin_is_foregrounded_tty(stream):

153

"""

154

Check if stdin is a foregrounded TTY.

155

156

Parameters:

157

- stream: Input stream to check

158

159

Returns:

160

bool: True if stream is a foregrounded TTY

161

"""

162

163

def character_buffered(stream):

164

"""

165

Context manager for character-level input buffering.

166

167

Parameters:

168

- stream: Input stream to modify

169

170

Returns:

171

context manager: Character buffering context

172

"""

173

174

def ready_for_reading(input_):

175

"""

176

Check if input stream has data ready for reading.

177

178

Parameters:

179

- input_: Input stream to check

180

181

Returns:

182

bool: True if data is ready for reading

183

"""

184

185

def bytes_to_read(input_):

186

"""

187

Get number of bytes available to read from input stream.

188

189

Parameters:

190

- input_: Input stream to check

191

192

Returns:

193

int: Number of bytes available

194

"""

195

```

196

197

## Usage Examples

198

199

### Basic Stream Watching

200

201

```python

202

from invoke import Context

203

from invoke.watchers import StreamWatcher

204

205

class LogWatcher(StreamWatcher):

206

"""Custom watcher that logs all output."""

207

208

def submit(self, stream):

209

print(f"[LOG] {stream.strip()}")

210

return [] # No responses to send

211

212

# Use watcher with command execution

213

ctx = Context()

214

watcher = LogWatcher()

215

result = ctx.run("echo 'Hello World'", watchers=[watcher])

216

# Output: [LOG] Hello World

217

```

218

219

### Pattern-based Auto-response

220

221

```python

222

from invoke import Context

223

from invoke.watchers import Responder

224

225

# Create auto-responder for password prompts

226

password_responder = Responder(

227

pattern=r'Password:',

228

response='my_secret_password\n'

229

)

230

231

# Use with commands that require interaction

232

ctx = Context()

233

result = ctx.run("sudo some-command", watchers=[password_responder], pty=True)

234

```

235

236

### Multiple Response Patterns

237

238

```python

239

from invoke import Context

240

from invoke.watchers import Responder

241

242

# Multiple responders for different prompts

243

responders = [

244

Responder(r'Username:', 'myuser\n'),

245

Responder(r'Password:', 'mypass\n'),

246

Responder(r'Continue\? \[y/N\]', 'y\n'),

247

Responder(r'Are you sure\?', 'yes\n')

248

]

249

250

ctx = Context()

251

result = ctx.run("interactive-installer", watchers=responders, pty=True)

252

```

253

254

### Conditional Responses

255

256

```python

257

from invoke import Context

258

from invoke.watchers import Responder

259

260

def dynamic_response(stream):

261

"""Generate response based on stream content."""

262

if 'staging' in stream:

263

return 'staging_password\n'

264

elif 'production' in stream:

265

return 'production_password\n'

266

else:

267

return 'default_password\n'

268

269

# Responder with callable response

270

dynamic_responder = Responder(

271

pattern=r'Enter password for (\w+):',

272

response=dynamic_response

273

)

274

275

ctx = Context()

276

result = ctx.run("deploy-script", watchers=[dynamic_responder], pty=True)

277

```

278

279

### Failure Handling

280

281

```python

282

from invoke import Context

283

from invoke.watchers import FailingResponder

284

from invoke.exceptions import ResponseNotAccepted

285

286

try:

287

# Responder that can detect failures

288

careful_responder = FailingResponder(

289

pattern=r'Password:',

290

response='wrong_password\n',

sentinel=r'Sorry, try again'

291

)

292

293

ctx = Context()

294

result = ctx.run("sudo ls", watchers=[careful_responder], pty=True)

295

296

except ResponseNotAccepted as e:

297

print(f"Authentication failed: {e}")

298

# Handle failed authentication

299

```

300

301

### Custom Stream Processing

302

303

```python

304

from invoke import Context

305

from invoke.watchers import StreamWatcher

306

import re

307

308

class ProgressWatcher(StreamWatcher):

309

"""Watch for progress indicators and update display."""

310

311

def __init__(self):

312

super().__init__()

313

self.progress_pattern = re.compile(r'(\d+)% complete')

314

315

def submit(self, stream):

316

match = self.progress_pattern.search(stream)

317

if match:

318

progress = int(match.group(1))

319

print(f"\rProgress: {progress}%", end='', flush=True)

320

return []

321

322

# Use with long-running commands

323

ctx = Context()

324

watcher = ProgressWatcher()

325

result = ctx.run("long-running-process", watchers=[watcher])

326

```

327

328

### Interactive Terminal Session

329

330

```python

331

from invoke import Context

332

from invoke.watchers import Responder

333

import time

334

335

class InteractiveSession:

336

"""Manage interactive terminal session with automatic responses."""

337

338

def __init__(self):

339

self.responders = []

340

341

def add_response(self, pattern, response):

342

"""Add pattern-response pair."""

343

self.responders.append(Responder(pattern, response))

344

345

def run_session(self, command):

346

"""Run interactive session with all responders."""

347

ctx = Context()

348

return ctx.run(command, watchers=self.responders, pty=True)

349

350

# Set up interactive session

351

session = InteractiveSession()

352

session.add_response(r'Name:', 'John Doe\n')

353

session.add_response(r'Email:', 'john@example.com\n')

354

session.add_response(r'Confirm \[y/N\]:', 'y\n')

355

356

# Run interactive command

357

result = session.run_session("interactive-setup")

358

```

359

360

### Stream Filtering and Logging

361

362

```python

363

from invoke import Context

364

from invoke.watchers import StreamWatcher

365

import logging

366

367

class FilteredLogger(StreamWatcher):

368

"""Filter and log specific stream content."""

369

370

def __init__(self, logger, level=logging.INFO):

371

super().__init__()

372

self.logger = logger

373

self.level = level

374

375

def submit(self, stream):

376

# Filter out sensitive information

377

filtered = stream.replace('password', '***')

378

379

# Log important messages

380

if 'ERROR' in stream.upper():

381

self.logger.error(filtered.strip())

382

elif 'WARNING' in stream.upper():

383

self.logger.warning(filtered.strip())

384

else:

385

self.logger.log(self.level, filtered.strip())

386

387

return []

388

389

# Set up logging

390

logger = logging.getLogger('command_output')

391

watcher = FilteredLogger(logger)

392

393

ctx = Context()

394

result = ctx.run("deployment-script", watchers=[watcher])

395

```

396

397

### Testing with Watchers

398

399

```python

400

from invoke import MockContext, Result

401

from invoke.watchers import Responder

402

import unittest

403

404

class TestInteractiveCommand(unittest.TestCase):

405

406

def test_password_prompt(self):

407

"""Test automatic password response."""

408

# Create mock context with expected result

409

ctx = MockContext(run={})

410

ctx.set_result_for("run", "sudo ls", Result(stdout="file1.txt\nfile2.txt\n"))

411

412

# Set up responder

413

responder = Responder(r'Password:', 'test_password\n')

414

415

# Run command with watcher

416

result = ctx.run("sudo ls", watchers=[responder])

417

418

# Verify result

419

self.assertEqual(result.stdout, "file1.txt\nfile2.txt\n")

420

```

421

422

### Advanced Pattern Matching

423

424

```python

425

from invoke import Context

426

from invoke.watchers import Responder

427

import re

428

429

# Complex regex patterns for different scenarios

430

patterns = [

431

# SSH key fingerprint confirmation

432

Responder(

433

pattern=re.compile(r'Are you sure you want to continue connecting \(yes/no\)\?'),

434

response='yes\n'

435

),

436

437

# GPG passphrase prompt

438

Responder(

439

pattern=re.compile(r'Enter passphrase:'),

440

response='my_gpg_passphrase\n'

441

),

442

443

# Database migration confirmation

444

Responder(

445

pattern=re.compile(r'This will delete all data\. Continue\? \[y/N\]'),

446

response='y\n'

447

)

448

]

449

450

ctx = Context()

451

result = ctx.run("complex-deployment", watchers=patterns, pty=True)

452

```