
# Output Processing and Result Handling

Comprehensive result objects provide access to command output, exit codes, and execution metadata for both parallel and single-host operations, enabling flexible processing of command results with generators, buffering, and error handling.
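
As a quick orientation, here is a minimal sketch of the typical flow, assuming two reachable hosts with working SSH authentication (the hostnames are placeholders):

```python
from pssh.clients import ParallelSSHClient

# Placeholder hosts - substitute machines you can actually reach
client = ParallelSSHClient(['server1.example.com', 'server2.example.com'])

output = client.run_command('uname -a')
client.join()  # wait for completion so exit codes are populated

for host_output in output:
    for line in host_output.stdout:
        print(f"[{host_output.host}] {line}")
    print(f"[{host_output.host}] exit code: {host_output.exit_code}")
```

The sections below cover each part of this flow in detail.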

## Capabilities

### HostOutput Class

Primary result object containing all information about command execution including output streams, exit codes, and error conditions.

```python { .api }
class HostOutput:
    """
    Container for SSH command execution results.

    Properties:
    - host (str): Hostname or IP address
    - alias (str): Host alias if configured
    - channel (object): SSH channel object used for execution
    - stdin (object): Stdin stream object
    - stdout (generator): Generator yielding stdout lines
    - stderr (generator): Generator yielding stderr lines
    - exit_code (int or None): Command exit code (None until command completes)
    - client (object): Reference to SSH client that created this output
    - exception (Exception or None): Exception if command execution failed
    """
```
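
A short sketch of inspecting these properties, assuming the client from the quickstart above:

```python
# Illustrative property access on a HostOutput object
output = client.run_command('hostname')
client.join()

for host_output in output:
    print(host_output.host)       # target hostname or IP
    print(host_output.exit_code)  # int once the command has completed
    if host_output.exception is not None:
        print(f"Execution failed: {host_output.exception}")
```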

### Output Stream Processing

Process command output using generators that yield lines as they become available, enabling real-time output processing and memory-efficient handling of large outputs.

```python
from pssh.clients import ParallelSSHClient

# Basic output processing
hosts = ['server1.example.com', 'server2.example.com']
client = ParallelSSHClient(hosts)

output = client.run_command('find /var/log -name "*.log" -type f')

# Process output from each host
for host_output in output:
    print(f"\n=== Results from {host_output.host} ===")

    # Stdout is a generator - iterate to get lines
    for line in host_output.stdout:
        print(f"Found: {line}")

    # Check for errors
    error_lines = []
    for line in host_output.stderr:
        error_lines.append(line)

    if error_lines:
        print("Errors:")
        for error in error_lines:
            print(f"ERROR: {error}")

    print(f"Exit code: {host_output.exit_code}")
```

### Exit Code Handling

Monitor command completion and handle exit codes for success/failure detection.

```python
# Wait for completion before checking exit codes
output = client.run_command('test -f /etc/passwd && echo "exists" || echo "missing"')
client.join()  # Wait for all commands to complete

successful_hosts = []
failed_hosts = []

for host_output in output:
    if host_output.exit_code == 0:
        successful_hosts.append(host_output.host)
        # Process successful output
        for line in host_output.stdout:
            print(f"[{host_output.host}] {line}")
    else:
        failed_hosts.append((host_output.host, host_output.exit_code))
        # Process error output
        for line in host_output.stderr:
            print(f"[{host_output.host}] ERROR: {line}")

print(f"Successful: {len(successful_hosts)}, Failed: {len(failed_hosts)}")
```
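
Since `exit_code` stays `None` until a command completes, a guard makes partially finished results explicit. A small sketch:

```python
output = client.run_command('sleep 2 && echo done')

# Before join(), exit codes may not be populated yet
for host_output in output:
    if host_output.exit_code is None:
        print(f"[{host_output.host}] still running")

client.join()  # after join(), exit codes are available

for host_output in output:
    print(f"[{host_output.host}] exit code: {host_output.exit_code}")
```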

### Exception Handling in Output

Handle exceptions that occur during command execution, including connection errors, timeouts, and command failures.

```python
from pssh.exceptions import Timeout, ConnectionError, AuthenticationError

# stop_on_errors=False stores per-host failures on each host's output
# object instead of raising them from run_command immediately
output = client.run_command('long_running_command', read_timeout=30,
                            stop_on_errors=False)

for host_output in output:
    if host_output.exception:
        print(f"Host {host_output.host} had an exception:")
        print(f"  Exception type: {type(host_output.exception).__name__}")
        print(f"  Exception message: {host_output.exception}")

        # Handle specific exception types
        if isinstance(host_output.exception, Timeout):
            print("  -> Command timed out, consider increasing timeout")
        elif isinstance(host_output.exception, ConnectionError):
            print("  -> Connection failed, host may be unreachable")
        elif isinstance(host_output.exception, AuthenticationError):
            print("  -> Authentication failed, check credentials")
    else:
        # Process normal output
        for line in host_output.stdout:
            print(f"[{host_output.host}] {line}")
```
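
Note that `stop_on_errors=False` (used above) is what routes per-host failures into `host_output.exception`. With the default `stop_on_errors=True`, connection-level failures raise from `run_command` itself, so the alternative is a try/except around the call; a sketch:

```python
from pssh.exceptions import ConnectionError

try:
    # Default stop_on_errors=True: connection failures raise immediately
    output = client.run_command('uptime')
    client.join()
except ConnectionError as exc:
    print(f"Connection failed: {exc}")
```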

### Real-Time Output Processing

Process output as it becomes available without waiting for command completion, useful for long-running commands and real-time monitoring.

```python
import time
from threading import Thread

def process_output_realtime(host_output):
    """Process output in real-time as it becomes available"""
    host = host_output.host

    try:
        for line in host_output.stdout:
            timestamp = time.strftime('%H:%M:%S')
            print(f"[{timestamp}] [{host}] {line}")
    except Exception as e:
        print(f"[{host}] Output processing error: {e}")

# Start long-running command
output = client.run_command('tail -f /var/log/system.log')

# Process output from each host in separate threads
threads = []
for host_output in output:
    thread = Thread(target=process_output_realtime, args=(host_output,))
    thread.daemon = True
    thread.start()
    threads.append(thread)

# Let it run for 60 seconds
time.sleep(60)

# Commands are still running - you could join() to wait for completion
# or handle them as needed for your use case
print("Stopping real-time monitoring...")
```
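
For commands that never terminate, such as `tail -f`, an alternative to threads is `read_timeout` on `run_command`: pssh raises `Timeout` when no output arrives within the window, and reading can be resumed afterwards. A sketch, assuming the same client:

```python
from pssh.exceptions import Timeout

output = client.run_command('tail -f /var/log/system.log', read_timeout=5)

for host_output in output:
    while True:
        try:
            for line in host_output.stdout:
                print(f"[{host_output.host}] {line}")
            break  # generator exhausted - the command ended
        except Timeout:
            # No output within read_timeout; decide whether to keep waiting
            print(f"[{host_output.host}] no output for 5s, still watching...")
```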

### Output Buffering and Collection

Collect and buffer output for post-processing, analysis, or storage.

```python
def collect_all_output(host_output):
    """Collect all output into structured data"""
    result = {
        'host': host_output.host,
        'alias': host_output.alias,
        'stdout_lines': [],
        'stderr_lines': [],
        'exit_code': None,
        'exception': None
    }

    # Collect stdout
    try:
        for line in host_output.stdout:
            result['stdout_lines'].append(line)
    except Exception as e:
        result['exception'] = e

    # Collect stderr
    try:
        for line in host_output.stderr:
            result['stderr_lines'].append(line)
    except Exception as e:
        if not result['exception']:
            result['exception'] = e

    result['exit_code'] = host_output.exit_code
    if host_output.exception:
        result['exception'] = host_output.exception

    return result

# Execute command and collect all results
output = client.run_command('ps aux | head -20')
client.join()

results = []
for host_output in output:
    result = collect_all_output(host_output)
    results.append(result)

# Analyze collected results
for result in results:
    print(f"\n=== {result['host']} ===")
    print(f"Exit code: {result['exit_code']}")
    print(f"Stdout lines: {len(result['stdout_lines'])}")
    print(f"Stderr lines: {len(result['stderr_lines'])}")

    if result['exception']:
        print(f"Exception: {result['exception']}")

    # Show first few lines of output
    for line in result['stdout_lines'][:5]:
        print(f"  {line}")
```
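
Buffering matters because `stdout` and `stderr` are generators and can be consumed only once; collect the lines first (as above) if they are needed more than once. A quick illustration:

```python
output = client.run_command('echo hello')
client.join()

host_output = output[0]
first_pass = list(host_output.stdout)   # buffers all lines
second_pass = list(host_output.stdout)  # generator is already exhausted

print(first_pass)   # ['hello']
print(second_pass)  # []
```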

### Output Filtering and Processing

Filter and process output streams for specific patterns, log levels, or data extraction.

```python
import re

def filter_and_process_output(output, filters=None):
    """Filter output based on patterns and extract structured data"""
    filters = filters or {}

    results = {}

    for host_output in output:
        host = host_output.host
        results[host] = {
            'filtered_lines': [],
            'matched_patterns': {},
            'line_count': 0
        }

        # Process each line of stdout
        for line in host_output.stdout:
            results[host]['line_count'] += 1

            # Apply filters
            include_line = True
            if 'exclude_patterns' in filters:
                for pattern in filters['exclude_patterns']:
                    if re.search(pattern, line):
                        include_line = False
                        break

            if include_line and 'include_patterns' in filters:
                include_line = False
                for pattern in filters['include_patterns']:
                    if re.search(pattern, line):
                        include_line = True
                        break

            if include_line:
                results[host]['filtered_lines'].append(line)

            # Extract specific patterns
            if 'extract_patterns' in filters:
                for name, pattern in filters['extract_patterns'].items():
                    matches = re.findall(pattern, line)
                    if matches:
                        if name not in results[host]['matched_patterns']:
                            results[host]['matched_patterns'][name] = []
                        results[host]['matched_patterns'][name].extend(matches)

    return results

# Example: Filter log files for errors and extract timestamps
output = client.run_command('cat /var/log/application.log')
client.join()

filters = {
    'include_patterns': [r'ERROR', r'WARN', r'CRITICAL'],
    'exclude_patterns': [r'DEBUG'],
    'extract_patterns': {
        'timestamps': r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}',
        'error_codes': r'ERROR_(\d+)'
    }
}

results = filter_and_process_output(output, filters)

for host, data in results.items():
    print(f"\n=== Log Analysis for {host} ===")
    print(f"Total lines processed: {data['line_count']}")
    print(f"Filtered lines (errors/warnings): {len(data['filtered_lines'])}")

    if 'timestamps' in data['matched_patterns']:
        timestamps = data['matched_patterns']['timestamps']
        print(f"Error time range: {timestamps[0]} to {timestamps[-1]}")

    if 'error_codes' in data['matched_patterns']:
        error_codes = set(data['matched_patterns']['error_codes'])
        print(f"Error codes found: {sorted(error_codes)}")

    # Show filtered lines
    for line in data['filtered_lines'][:10]:  # First 10 filtered lines
        print(f"  {line}")
```

### Output Export and Persistence

Save command output to files for later analysis or record keeping.

```python
import csv
import json
import os
from datetime import datetime

def export_output_to_files(output, base_path='/tmp/ssh_results'):
    """Export command output to various file formats"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    os.makedirs(base_path, exist_ok=True)  # ensure the output directory exists

    # Collect all data
    all_results = []

    for host_output in output:
        host_data = {
            'host': host_output.host,
            'alias': host_output.alias,
            'timestamp': timestamp,
            'exit_code': host_output.exit_code,
            'stdout_lines': list(host_output.stdout),
            'stderr_lines': list(host_output.stderr),
            'has_exception': host_output.exception is not None,
            'exception_str': str(host_output.exception) if host_output.exception else None
        }
        all_results.append(host_data)

        # Export individual host files
        host_file = f"{base_path}/{host_output.host}_{timestamp}.txt"
        with open(host_file, 'w') as f:
            f.write(f"Host: {host_data['host']}\n")
            f.write(f"Exit Code: {host_data['exit_code']}\n")
            f.write(f"Timestamp: {host_data['timestamp']}\n\n")

            f.write("=== STDOUT ===\n")
            for line in host_data['stdout_lines']:
                f.write(f"{line}\n")

            if host_data['stderr_lines']:
                f.write("\n=== STDERR ===\n")
                for line in host_data['stderr_lines']:
                    f.write(f"{line}\n")

            if host_data['exception_str']:
                f.write(f"\n=== EXCEPTION ===\n{host_data['exception_str']}\n")

    # Export JSON summary
    json_file = f"{base_path}/summary_{timestamp}.json"
    with open(json_file, 'w') as f:
        json.dump(all_results, f, indent=2)

    # Export CSV summary
    csv_file = f"{base_path}/summary_{timestamp}.csv"
    with open(csv_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['host', 'alias', 'exit_code', 'stdout_lines', 'stderr_lines', 'has_exception'])

        for result in all_results:
            writer.writerow([
                result['host'],
                result['alias'],
                result['exit_code'],
                len(result['stdout_lines']),
                len(result['stderr_lines']),
                result['has_exception']
            ])

    return {
        'json_file': json_file,
        'csv_file': csv_file,
        'individual_files': [f"{base_path}/{r['host']}_{timestamp}.txt" for r in all_results]
    }

# Execute command and export results
output = client.run_command('df -h && free -m && uptime')
client.join()

exported_files = export_output_to_files(output)
print("Results exported to:")
for file_type, files in exported_files.items():
    if isinstance(files, list):
        print(f"  {file_type}: {len(files)} files")
    else:
        print(f"  {file_type}: {files}")
```

## Built-in Host Logger

Use the built-in host logger for automatic output logging without manual processing.

```python
from pssh.utils import enable_host_logger

# Enable automatic host output logging
enable_host_logger()

# Execute commands - output will be automatically logged
output = client.run_command('echo "Hello from $(hostname)"')
client.join(consume_output=True)  # Consume output to trigger logging

# Output will appear like:
# [timestamp] pssh.host_logger [server1.example.com] Hello from server1
# [timestamp] pssh.host_logger [server2.example.com] Hello from server2
```
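
Since the output above names the logger `pssh.host_logger`, it can be treated as a standard Python logger and given extra handlers; a sketch that also writes host output to a file (the file path is arbitrary):

```python
import logging

from pssh.utils import enable_host_logger

enable_host_logger()

# Attach a file handler to the logger named in the sample output above
host_logger = logging.getLogger('pssh.host_logger')
file_handler = logging.FileHandler('/tmp/host_output.log')
file_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
host_logger.addHandler(file_handler)

output = client.run_command('uptime')
client.join(consume_output=True)
```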

## Performance Considerations

### Memory Management

```python
# For large outputs, process incrementally to avoid memory issues
def process_large_output_incrementally(host_output, chunk_size=1000):
    """Process large output in chunks to manage memory"""
    lines_processed = 0
    current_chunk = []

    for line in host_output.stdout:
        current_chunk.append(line)
        lines_processed += 1

        if len(current_chunk) >= chunk_size:
            # Process chunk
            process_chunk(current_chunk)
            current_chunk = []

    # Process remaining lines
    if current_chunk:
        process_chunk(current_chunk)

    return lines_processed

def process_chunk(lines):
    """Process a chunk of output lines"""
    # Your processing logic here
    print(f"Processed chunk of {len(lines)} lines")
```
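
A possible invocation of the chunked processor, assuming a command that produces a large amount of output:

```python
# Hypothetical large-output command - substitute your own
output = client.run_command('journalctl --no-pager')

for host_output in output:
    total = process_large_output_incrementally(host_output, chunk_size=500)
    print(f"[{host_output.host}] processed {total} lines")
```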

### Concurrent Output Processing

Process output from multiple hosts concurrently using a thread pool, so slow per-host processing does not serialize across hosts.

```python
from concurrent.futures import ThreadPoolExecutor

def process_host_output_concurrent(output):
    """Process output from multiple hosts concurrently"""

    def process_single_host(host_output):
        result = {
            'host': host_output.host,
            'line_count': 0,
            'error_count': 0
        }

        for line in host_output.stdout:
            result['line_count'] += 1
            if 'ERROR' in line:
                result['error_count'] += 1

        return result

    # Process all hosts concurrently
    with ThreadPoolExecutor(max_workers=len(output)) as executor:
        results = list(executor.map(process_single_host, output))

    return results

# Use concurrent processing for large outputs
output = client.run_command('find /var -type f -name "*.log" | head -1000')
client.join()

results = process_host_output_concurrent(output)
for result in results:
    print(f"Host {result['host']}: {result['line_count']} lines, {result['error_count']} errors")
```