# Output Processing and Result Handling

Comprehensive result objects provide access to command output, exit codes, and execution metadata for both parallel and single-host operations. They enable flexible processing of command results with generators, buffering, and error handling.

## Capabilities

### HostOutput Class

The primary result object, containing all information about a command execution, including output streams, exit codes, and error conditions.

```python { .api }
class HostOutput:
    """
    Container for SSH command execution results.

    Properties:
    - host (str): Hostname or IP address
    - alias (str): Host alias if configured
    - channel (object): SSH channel object used for execution
    - stdin (object): Stdin stream object
    - stdout (generator): Generator yielding stdout lines
    - stderr (generator): Generator yielding stderr lines
    - exit_code (int or None): Command exit code (None until command completes)
    - client (object): Reference to SSH client that created this output
    - exception (Exception or None): Exception if command execution failed
    """
```

### Output Stream Processing

Process command output using generators that yield lines as they become available. This enables real-time output processing and memory-efficient handling of large outputs.

```python
from pssh.clients import ParallelSSHClient
34
35
# Basic output processing
hosts = ['server1.example.com', 'server2.example.com']
client = ParallelSSHClient(hosts)

output = client.run_command('find /var/log -name "*.log" -type f')

# Process output from each host
for host_output in output:
    print(f"\n=== Results from {host_output.host} ===")

    # Stdout is a generator - iterate to get lines
    for line in host_output.stdout:
        print(f"Found: {line}")

    # Check for errors: drain stderr first so the "Errors:" header is only
    # printed when there is something to show
    error_lines = list(host_output.stderr)

    if error_lines:
        print("Errors:")
        for error in error_lines:
            print(f"ERROR: {error}")

    print(f"Exit code: {host_output.exit_code}")
```

### Exit Code Handling

Monitor command completion and use exit codes to detect success or failure.

```python
# Wait for completion before checking exit codes
output = client.run_command('test -f /etc/passwd && echo "exists" || echo "missing"')
client.join()  # Wait for all commands to complete

successful_hosts = []
failed_hosts = []

for host_output in output:
    if host_output.exit_code == 0:
        successful_hosts.append(host_output.host)
        # Process successful output
        for line in host_output.stdout:
            print(f"[{host_output.host}] {line}")
    else:
        # Keep the exit code alongside the host so failures can be reported later
        failed_hosts.append((host_output.host, host_output.exit_code))
        # Process error output
        for line in host_output.stderr:
            print(f"[{host_output.host}] ERROR: {line}")

print(f"Successful: {len(successful_hosts)}, Failed: {len(failed_hosts)}")
```

### Exception Handling in Output

Handle exceptions that occur during command execution, including connection errors, timeouts, and authentication failures.

```python
from pssh.exceptions import Timeout, ConnectionError, AuthenticationError
95
96
# stop_on_errors=False makes run_command record per-host failures on
# host_output.exception instead of raising on the first failing host.
output = client.run_command('long_running_command', read_timeout=30,
                            stop_on_errors=False)

for host_output in output:
    if host_output.exception:
        print(f"Host {host_output.host} had an exception:")
        print(f"  Exception type: {type(host_output.exception).__name__}")
        print(f"  Exception message: {host_output.exception}")

        # Handle specific exception types
        if isinstance(host_output.exception, Timeout):
            print("  -> Command timed out, consider increasing timeout")
        elif isinstance(host_output.exception, ConnectionError):
            print("  -> Connection failed, host may be unreachable")
        elif isinstance(host_output.exception, AuthenticationError):
            print("  -> Authentication failed, check credentials")
    else:
        # Process normal output. A read_timeout expiry is raised while the
        # output is being read, so it has to be caught around the loop.
        try:
            for line in host_output.stdout:
                print(f"[{host_output.host}] {line}")
        except Timeout:
            print(f"[{host_output.host}] Timed out waiting for output")
```

### Real-Time Output Processing

Process output as it becomes available without waiting for the command to complete. This is useful for long-running commands and real-time monitoring.

```python
import time
123
from threading import Thread
124
125
def process_output_realtime(host_output):
    """Process output in real-time as it becomes available.

    Prints every stdout line of one host, prefixed with the current
    wall-clock time and the host name.

    Args:
        host_output: Object exposing ``host`` and an iterable ``stdout``.

    Returns:
        None. Errors raised while reading are printed, not propagated.
    """
    host = host_output.host

    try:
        for line in host_output.stdout:
            # Timestamp is taken when the line is received, not when it was
            # produced on the remote side
            timestamp = time.strftime('%H:%M:%S')
            print(f"[{timestamp}] [{host}] {line}")
    except Exception as e:
        # Deliberately broad: this runs as a thread target, where an
        # uncaught exception would otherwise only end the worker
        print(f"[{host}] Output processing error: {e}")
135
136
# Start long-running command
output = client.run_command('tail -f /var/log/system.log')

# Process output from each host in separate threads
# NOTE(review): parallel-ssh is built on gevent - confirm that reading these
# output generators from native threads is supported in your environment.
threads = []
for host_output in output:
    # Daemon threads do not keep the interpreter alive once the main
    # script finishes
    thread = Thread(target=process_output_realtime, args=(host_output,), daemon=True)
    thread.start()
    threads.append(thread)

# Let it run for 60 seconds
time.sleep(60)

# Commands are still running - you could join() to wait for completion
# or handle them as needed for your use case
print("Stopping real-time monitoring...")
```

### Output Buffering and Collection

Collect and buffer output for post-processing, analysis, or storage.

```python
def collect_all_output(host_output):
    """Collect all output into structured data.

    Args:
        host_output: Object exposing ``host``, ``alias``, iterable ``stdout``
            and ``stderr``, ``exit_code`` and ``exception``.

    Returns:
        dict with keys ``host``, ``alias``, ``stdout_lines``,
        ``stderr_lines``, ``exit_code`` and ``exception``. Lines read before
        a stream failed are kept. ``exception`` holds the host's own
        exception if set, otherwise the first error raised while reading.
    """
    result = {
        'host': host_output.host,
        'alias': host_output.alias,
        'stdout_lines': [],
        'stderr_lines': [],
        'exit_code': None,
        'exception': None,
    }

    # Collect stdout
    try:
        for line in host_output.stdout:
            result['stdout_lines'].append(line)
    except Exception as e:
        result['exception'] = e

    # Collect stderr
    try:
        for line in host_output.stderr:
            result['stderr_lines'].append(line)
    except Exception as e:
        # Keep the earlier stdout error if there was one
        if result['exception'] is None:
            result['exception'] = e

    # Read the exit code only after both streams have been drained
    result['exit_code'] = host_output.exit_code
    if host_output.exception:
        # An exception recorded on the host output wins over read errors
        result['exception'] = host_output.exception

    return result
191
192
# Execute command and collect all results
output = client.run_command('ps aux | head -20')
client.join()

results = [collect_all_output(host_output) for host_output in output]

# Analyze collected results
for result in results:
    print(f"\n=== {result['host']} ===")
    print(f"Exit code: {result['exit_code']}")
    print(f"Stdout lines: {len(result['stdout_lines'])}")
    print(f"Stderr lines: {len(result['stderr_lines'])}")

    if result['exception']:
        print(f"Exception: {result['exception']}")

    # Show first few lines of output
    for line in result['stdout_lines'][:5]:
        print(f"  {line}")
```

### Output Filtering and Processing

Filter and process output streams to match specific patterns, select log levels, or extract data.

```python
import re
222
223
def filter_and_process_output(output, filters=None):
    """Filter output based on patterns and extract structured data.

    Args:
        output: Iterable of objects exposing ``host`` and an iterable
            ``stdout``.
        filters: Optional dict with any of these keys:
            ``exclude_patterns`` - list of regexes; a line matching any of
            them is dropped.
            ``include_patterns`` - list of regexes; when the key is present a
            line is kept only if it matches at least one of them.
            ``extract_patterns`` - dict mapping a name to a regex whose
            ``re.findall`` matches are gathered under that name.

    Returns:
        dict mapping each host to a dict with ``filtered_lines`` (retained
        lines), ``matched_patterns`` (name -> list of matches) and
        ``line_count`` (every stdout line read, retained or not).
    """
    filters = filters or {}
    exclude_patterns = filters.get('exclude_patterns', ())
    # Presence of the key matters: an empty include list rejects every line
    has_include = 'include_patterns' in filters
    extract_patterns = filters.get('extract_patterns', {})

    results = {}

    for host_output in output:
        host_result = {
            'filtered_lines': [],
            'matched_patterns': {},
            'line_count': 0,
        }
        results[host_output.host] = host_result

        # Process each line of stdout
        for line in host_output.stdout:
            host_result['line_count'] += 1

            # Apply filters: exclusion is checked first and always wins
            if any(re.search(pattern, line) for pattern in exclude_patterns):
                continue
            if has_include and not any(
                re.search(pattern, line) for pattern in filters['include_patterns']
            ):
                continue

            host_result['filtered_lines'].append(line)

            # Extract specific patterns. Only retained lines are scanned, so
            # the extracted values describe the filtered output.
            for name, pattern in extract_patterns.items():
                matches = re.findall(pattern, line)
                if matches:
                    host_result['matched_patterns'].setdefault(name, []).extend(matches)

    return results
269
270
# Example: Filter log files for errors and extract timestamps
output = client.run_command('cat /var/log/application.log')
client.join()

filters = {
    'include_patterns': [r'ERROR', r'WARN', r'CRITICAL'],
    'exclude_patterns': [r'DEBUG'],
    'extract_patterns': {
        'timestamps': r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}',
        'error_codes': r'ERROR_(\d+)',
    },
}

results = filter_and_process_output(output, filters)

for host, data in results.items():
    print(f"\n=== Log Analysis for {host} ===")
    print(f"Total lines processed: {data['line_count']}")
    print(f"Filtered lines (errors/warnings): {len(data['filtered_lines'])}")

    # A name only appears in matched_patterns once it has at least one
    # match, so indexing [0] and [-1] below is safe
    if 'timestamps' in data['matched_patterns']:
        timestamps = data['matched_patterns']['timestamps']
        print(f"Error time range: {timestamps[0]} to {timestamps[-1]}")

    if 'error_codes' in data['matched_patterns']:
        error_codes = set(data['matched_patterns']['error_codes'])
        print(f"Error codes found: {sorted(error_codes)}")

    # Show filtered lines
    for line in data['filtered_lines'][:10]:  # First 10 filtered lines
        print(f"  {line}")
```

### Output Export and Persistence

Save command output to files for later analysis or record keeping.

```python
import json
309
import csv
310
from datetime import datetime
311
312
def export_output_to_files(output, base_path='/tmp/ssh_results'):
    """Export command output to various file formats.

    Writes one text file per host plus a JSON and a CSV summary into
    ``base_path``, creating the directory if it does not exist yet.

    Args:
        output: Iterable of objects exposing ``host``, ``alias``,
            ``exit_code``, iterable ``stdout`` / ``stderr`` and
            ``exception``.
        base_path: Directory that receives the exported files.

    Returns:
        dict with ``json_file`` and ``csv_file`` (paths) and
        ``individual_files`` (list of per-host text file paths).

    Raises:
        OSError: If the directory or any of the files cannot be written.
    """
    # Local import so this example does not depend on the surrounding
    # import block
    import os

    os.makedirs(base_path, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # Collect all data
    all_results = []
    individual_files = []

    for host_output in output:
        # Drain both streams before reading exit_code, which stays None
        # until the command has completed
        stdout_lines = list(host_output.stdout)
        stderr_lines = list(host_output.stderr)
        exception = host_output.exception

        host_data = {
            'host': host_output.host,
            'alias': host_output.alias,
            'timestamp': timestamp,
            'exit_code': host_output.exit_code,
            'stdout_lines': stdout_lines,
            'stderr_lines': stderr_lines,
            'has_exception': exception is not None,
            'exception_str': str(exception) if exception else None,
        }
        all_results.append(host_data)

        # Export individual host files
        host_file = f"{base_path}/{host_output.host}_{timestamp}.txt"
        individual_files.append(host_file)
        with open(host_file, 'w', encoding='utf-8') as f:
            f.write(f"Host: {host_data['host']}\n")
            f.write(f"Exit Code: {host_data['exit_code']}\n")
            f.write(f"Timestamp: {host_data['timestamp']}\n\n")

            f.write("=== STDOUT ===\n")
            f.writelines(f"{line}\n" for line in stdout_lines)

            if stderr_lines:
                f.write("\n=== STDERR ===\n")
                f.writelines(f"{line}\n" for line in stderr_lines)

            if host_data['exception_str']:
                f.write(f"\n=== EXCEPTION ===\n{host_data['exception_str']}\n")

    # Export JSON summary
    json_file = f"{base_path}/summary_{timestamp}.json"
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(all_results, f, indent=2)

    # Export CSV summary (line counts only; full output lives in the JSON)
    csv_file = f"{base_path}/summary_{timestamp}.csv"
    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['host', 'alias', 'exit_code', 'stdout_lines', 'stderr_lines', 'has_exception'])

        for result in all_results:
            writer.writerow([
                result['host'],
                result['alias'],
                result['exit_code'],
                len(result['stdout_lines']),
                len(result['stderr_lines']),
                result['has_exception'],
            ])

    return {
        'json_file': json_file,
        'csv_file': csv_file,
        'individual_files': individual_files,
    }
377
378
# Execute command and export results
output = client.run_command('df -h && free -m && uptime')
client.join()

exported_files = export_output_to_files(output)
print("Results exported to:")
for file_type, files in exported_files.items():
    # 'individual_files' is a list of paths; the summaries are single paths
    if isinstance(files, list):
        print(f"  {file_type}: {len(files)} files")
    else:
        print(f"  {file_type}: {files}")
```

## Built-in Host Logger

Use the built-in host logger to log output automatically, without manual processing.

```python
from pssh.utils import enable_host_logger
397
398
# Enable automatic host output logging
enable_host_logger()

# Execute commands - output will be automatically logged
output = client.run_command('echo "Hello from $(hostname)"')
client.join(consume_output=True)  # Consume output to trigger logging

# Output will appear like:
# [timestamp] pssh.host_logger [server1.example.com] Hello from server1
# [timestamp] pssh.host_logger [server2.example.com] Hello from server2
```

## Performance Considerations

### Memory Management

```python
# For large outputs, process incrementally to avoid memory issues
def process_large_output_incrementally(host_output, chunk_size=1000):
    """Process large output in chunks to manage memory.

    Reads ``host_output.stdout`` line by line and hands the lines to
    ``process_chunk`` in batches, so at most ``chunk_size`` lines are held
    by this function at a time.

    Args:
        host_output: Object exposing an iterable ``stdout``.
        chunk_size: Number of lines per batch.

    Returns:
        int: Total number of lines read.
    """
    lines_processed = 0
    current_chunk = []

    for line in host_output.stdout:
        current_chunk.append(line)
        lines_processed += 1

        if len(current_chunk) >= chunk_size:
            # Process chunk, then start a fresh list rather than clearing
            # the one that was just handed out
            process_chunk(current_chunk)
            current_chunk = []

    # Process remaining lines (final, possibly partial, batch)
    if current_chunk:
        process_chunk(current_chunk)

    return lines_processed
435
436
def process_chunk(lines):
    """Process a chunk of output lines.

    Placeholder implementation that only reports the chunk size.

    Args:
        lines: Sized collection of output lines.
    """
    # Your processing logic here
    print(f"Processed chunk of {len(lines)} lines")
```

### Concurrent Output Processing

```python
from concurrent.futures import ThreadPoolExecutor
446
447
def process_host_output_concurrent(output):
    """Process output from multiple hosts concurrently.

    Args:
        output: Iterable of objects exposing ``host`` and an iterable
            ``stdout``.

    Returns:
        list of dicts, in the same order as ``output``, each with ``host``,
        ``line_count`` (stdout lines read) and ``error_count`` (lines
        containing the substring ``ERROR``). Empty input gives ``[]``.
    """

    def process_single_host(host_output):
        # Count lines and ERROR occurrences for a single host
        result = {
            'host': host_output.host,
            'line_count': 0,
            'error_count': 0,
        }

        for line in host_output.stdout:
            result['line_count'] += 1
            if 'ERROR' in line:
                result['error_count'] += 1

        return result

    # Materialize first: len() is needed below and the input may be a
    # one-shot iterable
    host_outputs = list(output)
    if not host_outputs:
        # ThreadPoolExecutor rejects max_workers=0
        return []

    # Process all hosts concurrently, one worker per host
    with ThreadPoolExecutor(max_workers=len(host_outputs)) as executor:
        return list(executor.map(process_single_host, host_outputs))
469
470
# Use concurrent processing for large outputs
output = client.run_command('find /var -type f -name "*.log" | head -1000')
client.join()

results = process_host_output_concurrent(output)
for result in results:
    print(f"Host {result['host']}: {result['line_count']} lines, {result['error_count']} errors")
```