Cross-platform library for retrieving information on running processes and system utilization (CPU, memory, disks, network, sensors) in Python.
—
psutil defines a comprehensive exception hierarchy for handling various error conditions that can occur during system monitoring and process management operations. Proper exception handling is essential for robust applications using psutil.
# psutil exception hierarchy:
# Exception (built-in)
# └── Error (psutil base exception)
#     ├── NoSuchProcess
#     │   └── ZombieProcess (subclass of NoSuchProcess)
#     ├── AccessDenied
#     └── TimeoutExpired
import psutil
# Base exception class for all psutil exceptions
try:
    p = psutil.Process(99999)  # Non-existent PID
    print(p.name())
except psutil.Error as e:
    # psutil.Error catches every psutil-specific exception in one clause.
    print(f"psutil error occurred: {e}")
    print(f"Exception type: {type(e).__name__}")
Raised when a process no longer exists or was never found.
# NoSuchProcess - Process not found or terminated
try:
    p = psutil.Process(99999)  # Non-existent PID
    print(p.name())
except psutil.NoSuchProcess as e:
    # The exception carries the PID, the process name and a message.
    print(f"Process not found: {e}")
    print(f"PID: {e.pid}")
    print(f"Process name: {e.name}")
    print(f"Message: {e.msg}")
# Common scenario - process terminates between operations
def monitor_process_safely(pid):
    """Safely monitor a process that might terminate.

    Prints the CPU percentage and ``memory_info().rss`` (in MiB) of the
    process once per second until psutil reports that it no longer exists.

    Args:
        pid: PID of the process to monitor.

    Returns:
        None. All output is printed.
    """
    import time  # local import so the snippet is self-contained

    try:
        p = psutil.Process(pid)
    except psutil.NoSuchProcess:
        print(f"Process {pid} not found")
        return

    while True:
        try:
            print(f"CPU: {p.cpu_percent()}")
            print(f"Memory: {p.memory_info().rss / 1024**2:.1f} MB")
            time.sleep(1)
        except psutil.NoSuchProcess:
            # The process went away between two samples.
            print(f"Process {pid} terminated")
            break
Raised when insufficient permissions prevent accessing process information.
# AccessDenied - Permission denied
import os

try:
    # Try to access system process (may require admin/root)
    if psutil.WINDOWS:
        system_pid = 4  # System process on Windows
    else:
        system_pid = 1  # Init process on Unix
    p = psutil.Process(system_pid)
    print(p.cmdline())  # May require elevated privileges
except psutil.AccessDenied as e:
    print(f"Access denied: {e}")
    print(f"PID: {e.pid}")
    print(f"Process name: {e.name}")
    # USER is set on Unix shells, USERNAME on Windows.
    print(f"Current user: {os.getenv('USER') or os.getenv('USERNAME')}")
    print("Try running with elevated privileges")
# Graceful handling of access denied
def get_accessible_process_info(pid):
    """Get process info with graceful handling of access denied.

    Each field is fetched independently, so an ``AccessDenied`` on one
    field only replaces that field with its fallback value.

    Args:
        pid: PID of the process to inspect.

    Returns:
        A dict with ``pid``, ``accessible`` and the keys ``name``,
        ``status`` and ``cpu_percent``; or, when the process does not
        exist, ``{'pid': pid, 'accessible': False, 'error': ...}``.
    """
    try:
        p = psutil.Process(pid)
        info = {'pid': pid, 'accessible': True}
        # (result key, bound getter, value used when access is denied)
        fields = (
            ('name', p.name, 'Access Denied'),
            ('status', p.status, 'Unknown'),
            ('cpu_percent', p.cpu_percent, None),
        )
        for key, getter, fallback in fields:
            try:
                info[key] = getter()
            except psutil.AccessDenied:
                info[key] = fallback
        return info
    except psutil.NoSuchProcess:
        return {'pid': pid, 'accessible': False, 'error': 'Process not found'}
# Get info for current process (should work)
info = get_accessible_process_info(os.getpid())
print(f"Current process info: {info}")
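Raised when trying to access a zombie process (Unix systems). ZombieProcess is a subclass of NoSuchProcess, so an `except psutil.ZombieProcess` clause must come before `except psutil.NoSuchProcess` to be reachable.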
# ZombieProcess - Process is in zombie state
# Zombie processes are common on Unix systems
for proc in psutil.process_iter():
    # Handle errors per process so one inaccessible process does not
    # abort the whole scan.
    try:
        if proc.status() == psutil.STATUS_ZOMBIE:
            print(f"Found zombie: PID {proc.pid}")
            # Trying to access zombie process info may raise ZombieProcess
            print(proc.name())  # This might raise ZombieProcess
    except psutil.ZombieProcess as e:
        # Listed before NoSuchProcess because ZombieProcess subclasses it.
        print(f"Zombie process detected: {e}")
        print(f"PID: {e.pid}")
        print(f"Process name: {e.name}")
        # Zombie processes can't provide most information
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        pass  # Skip inaccessible processes
# Handling zombie processes in monitoring
def safe_process_iterator():
    """Iterate processes safely handling zombies.

    Processes whose prefetched status is ``STATUS_ZOMBIE``, or that raise
    ``ZombieProcess`` when queried, are counted rather than returned.
    Processes raising ``NoSuchProcess`` or ``AccessDenied`` are skipped.

    Returns:
        A list of the ``psutil.Process`` objects that could be queried.
    """
    accessible_procs = []
    zombie_count = 0
    for proc in psutil.process_iter(['pid', 'name', 'status']):
        try:
            # Check if process is zombie
            if proc.info['status'] == psutil.STATUS_ZOMBIE:
                zombie_count += 1
                continue
            # Try to get additional info
            proc.cpu_percent()  # This might raise ZombieProcess
            accessible_procs.append(proc)
        except psutil.ZombieProcess:
            # Listed before NoSuchProcess because ZombieProcess subclasses it.
            zombie_count += 1
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass
    print(f"Found {len(accessible_procs)} accessible processes")
    print(f"Found {zombie_count} zombie processes")
    return accessible_procs
# accessible_procs = safe_process_iterator()
Raised when an operation exceeds the specified timeout.
# TimeoutExpired - Operation timed out
import subprocess
import time

# Create a process to demonstrate timeout.  It is started outside the
# try block so that `proc` is always bound when the cleanup runs.
proc = subprocess.Popen(['sleep', '10'])  # Unix command
try:
    p = psutil.Process(proc.pid)
    # Wait with timeout
    p.wait(timeout=2)  # Will timeout after 2 seconds
except psutil.TimeoutExpired as e:
    print(f"Operation timed out: {e}")
    print(f"PID: {e.pid}")
    print(f"Process name: {e.name}")
    print(f"Timeout seconds: {e.seconds}")
    # Clean up the process
    try:
        p.terminate()
        p.wait(timeout=5)  # Give it time to terminate gracefully
    except psutil.TimeoutExpired:
        p.kill()  # Force kill if it won't terminate
finally:
    # Ensure cleanup: stop the child if it is still running, then reap it
    # so it does not linger as a zombie.
    if proc.poll() is None:
        proc.terminate()
    proc.wait()
# Timeout handling in monitoring
def wait_for_process_with_retry(pid, timeout=30, retries=3):
    """Wait for process termination with retry logic.

    Args:
        pid: PID of the process to wait for.
        timeout: Seconds to wait on each attempt.
        retries: Maximum number of attempts.

    Returns:
        True if the process terminated or no longer exists; False if it
        was still running after the last attempt, or if ``retries`` allows
        no attempt at all.
    """
    for attempt in range(retries):
        try:
            p = psutil.Process(pid)
            p.wait(timeout=timeout)
            print(f"Process {pid} terminated normally")
            return True
        except psutil.TimeoutExpired:
            print(f"Attempt {attempt + 1}: Process {pid} did not terminate within {timeout}s")
            if attempt < retries - 1:
                print("Retrying...")
            else:
                print("Giving up - process still running")
                return False
        except psutil.NoSuchProcess:
            print(f"Process {pid} already terminated")
            return True
    return False
def _error_result(kind, message, exc, **extra):
    """Build the error dict returned by robust_process_operation."""
    return {"error": kind, "message": message, "details": str(exc), **extra}


def robust_process_operation(pid, operation_name="operation"):
    """Perform process operations with comprehensive exception handling.

    Args:
        pid: PID of the target process.
        operation_name: ``"get_info"`` or ``"terminate"``. Any other value
            performs no operation.

    Returns:
        For ``"get_info"`` a dict of process details; for ``"terminate"``
        ``{"result": "terminated"}``; ``None`` for an unrecognized
        operation. On failure, a dict with ``error``, ``message`` and
        ``details`` keys (plus ``suggestion`` for access errors).
    """
    try:
        p = psutil.Process(pid)
        # Perform the actual operation
        if operation_name == "get_info":
            return {
                'pid': p.pid,
                'name': p.name(),
                'status': p.status(),
                'cpu_percent': p.cpu_percent(),
                'memory_info': p.memory_info(),
            }
        elif operation_name == "terminate":
            p.terminate()
            p.wait(timeout=10)
            return {"result": "terminated"}
        return None  # unrecognized operation: nothing was done
    except psutil.ZombieProcess as e:
        # Must come before NoSuchProcess: ZombieProcess is its subclass and
        # would otherwise never reach this handler.
        return _error_result(
            "ZombieProcess",
            f"Process {e.pid} ({e.name}) is in zombie state",
            e,
        )
    except psutil.NoSuchProcess as e:
        return _error_result(
            "NoSuchProcess",
            f"Process {e.pid} ({e.name}) not found or terminated",
            e,
        )
    except psutil.AccessDenied as e:
        return _error_result(
            "AccessDenied",
            f"Permission denied for process {e.pid} ({e.name})",
            e,
            suggestion="Try running with elevated privileges",
        )
    except psutil.TimeoutExpired as e:
        return _error_result(
            "TimeoutExpired",
            f"Operation on process {e.pid} ({e.name}) timed out after {e.seconds}s",
            e,
        )
    except psutil.Error as e:
        return _error_result(
            "GeneralPsutilError",
            f"Unexpected psutil error: {str(e)}",
            e,
        )
    except Exception as e:
        # Deliberate catch-all: this function reports errors as data.
        return _error_result(
            "UnexpectedError",
            f"Unexpected error: {str(e)}",
            e,
        )
# Example usage
result = robust_process_operation(os.getpid(), "get_info")
print(f"Operation result: {result}")
def process_batch_operation(pids, operation_func):
    """Perform operations on multiple processes with proper exception handling.

    Args:
        pids: Sized iterable of PIDs to process.
        operation_func: Callable taking a PID and returning a result.

    Returns:
        A dict with three keys: ``successful`` (list of
        ``{'pid', 'result'}``), ``failed`` (list of
        ``{'pid', 'error', 'details'}``) and ``summary`` (per-category
        counters plus ``total``).
    """
    results = {
        'successful': [],
        'failed': [],
        'summary': {
            'total': len(pids),
            'successful': 0,
            'no_such_process': 0,
            'access_denied': 0,
            'zombie_process': 0,
            'timeout_expired': 0,
            'other_errors': 0,
        },
    }

    def record_failure(pid, error, counter, exc):
        """Append one failure entry and bump its summary counter."""
        results['failed'].append({
            'pid': pid,
            'error': error,
            'details': str(exc),
        })
        results['summary'][counter] += 1

    for pid in pids:
        try:
            result = operation_func(pid)
        except psutil.ZombieProcess as e:
            # Must come before NoSuchProcess: ZombieProcess is its subclass,
            # so the opposite order would never count zombies.
            record_failure(pid, 'ZombieProcess', 'zombie_process', e)
        except psutil.NoSuchProcess as e:
            record_failure(pid, 'NoSuchProcess', 'no_such_process', e)
        except psutil.AccessDenied as e:
            record_failure(pid, 'AccessDenied', 'access_denied', e)
        except psutil.TimeoutExpired as e:
            record_failure(pid, 'TimeoutExpired', 'timeout_expired', e)
        except Exception as e:
            # Catch-all so a single failing PID never aborts the batch.
            record_failure(pid, 'OtherError', 'other_errors', e)
        else:
            results['successful'].append({
                'pid': pid,
                'result': result,
            })
            results['summary']['successful'] += 1
    return results
# Example: Get CPU usage for multiple processes
def get_cpu_percent(pid):
    """Return ``cpu_percent()`` for the process with the given PID.

    A fresh ``psutil.Process`` is created on every call.
    NOTE(review): psutil documents the first non-blocking ``cpu_percent()``
    call on a Process object as a meaningless 0.0, so this value is only
    useful for exercising the error paths.
    """
    p = psutil.Process(pid)
    return p.cpu_percent()
# Test with some PIDs
test_pids = [os.getpid(), 1, 99999]  # Current, init, non-existent
batch_results = process_batch_operation(test_pids, get_cpu_percent)
print("Batch operation results:")
print(f"Summary: {batch_results['summary']}")
class SafeProcess:
    """Context manager for safe process operations.

    Entering yields a ``psutil.Process`` for ``pid``, or ``None`` when it
    cannot be created; the psutil error is then kept in ``self.error``.
    A psutil exception raised inside the ``with`` body is printed and
    suppressed. Any other exception propagates.
    """

    def __init__(self, pid):
        self.pid = pid
        self.process = None  # set by __enter__ on success
        self.error = None    # psutil.Error raised while creating the process

    def __enter__(self):
        try:
            self.process = psutil.Process(self.pid)
            return self.process
        except psutil.Error as e:
            self.error = e
            return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Handle any cleanup if needed
        if exc_type and issubclass(exc_type, psutil.Error):
            # Log the psutil exception
            print(f"psutil exception in context: {exc_val}")
            return True  # Suppress the exception
        return False
# Usage example
def monitor_with_context_manager(pid):
    """Monitor process using context manager.

    Args:
        pid: PID of the process to inspect.

    Returns:
        A dict with ``name``, ``cpu_percent`` and ``memory_info``, or
        ``None`` when the process could not be accessed or a psutil error
        occurred while reading it.
    """
    with SafeProcess(pid) as p:
        if p is None:
            print(f"Could not access process {pid}")
            return None
        try:
            return {
                'name': p.name(),
                'cpu_percent': p.cpu_percent(),
                'memory_info': p.memory_info(),
            }
        except psutil.Error as e:
            print(f"Error during monitoring: {e}")
            return None
# Test the context manager
info = monitor_with_context_manager(os.getpid())
print(f"Process info: {info}")
def analyze_psutil_exception(exception):
    """Analyze a psutil exception and extract useful information.

    Args:
        exception: The exception instance to inspect.

    Returns:
        A dict with ``type``, ``message`` and ``is_psutil_exception``,
        plus whichever of ``pid``, ``name``, ``msg`` and ``seconds`` the
        exception has, and a ``suggestion`` for the known psutil types.
    """
    analysis = {
        'type': type(exception).__name__,
        'message': str(exception),
        'is_psutil_exception': isinstance(exception, psutil.Error),
    }
    # Extract exception-specific attributes
    for attr in ('pid', 'name', 'msg', 'seconds'):
        if hasattr(exception, attr):
            analysis[attr] = getattr(exception, attr)
    # Provide suggestions based on exception type.  ZombieProcess is tested
    # first because it is a subclass of NoSuchProcess and would otherwise
    # receive the NoSuchProcess suggestion.
    if isinstance(exception, psutil.ZombieProcess):
        analysis['suggestion'] = "Process is zombie - limited information available"
    elif isinstance(exception, psutil.NoSuchProcess):
        analysis['suggestion'] = "Process may have terminated - check if PID is still valid"
    elif isinstance(exception, psutil.AccessDenied):
        analysis['suggestion'] = "Run with elevated privileges or check process ownership"
    elif isinstance(exception, psutil.TimeoutExpired):
        analysis['suggestion'] = "Increase timeout value or check if process is responsive"
    return analysis
# Example exception analysis
try:
    p = psutil.Process(99999)
    p.name()
except psutil.Error as e:
    analysis = analyze_psutil_exception(e)
    print("Exception analysis:", analysis)
import logging
# Set up logging for psutil exceptions
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def logged_process_operation(pid, operation_name):
    """Process operation with comprehensive logging.

    Args:
        pid: PID of the target process.
        operation_name: ``"info"`` returns selected process attributes;
            any other value performs no operation.

    Returns:
        The ``as_dict`` result for ``"info"``, otherwise ``None``.

    Raises:
        psutil.Error: Any psutil exception is logged and re-raised.
    """
    logger.info("Starting %s for PID %s", operation_name, pid)
    try:
        p = psutil.Process(pid)
        if operation_name == "info":
            result = p.as_dict(['pid', 'name', 'status', 'cpu_percent'])
            logger.info("Successfully retrieved info for PID %s: %s", pid, result)
            return result
        return None  # unrecognized operation: nothing was done
    except psutil.ZombieProcess as e:
        # Must come before NoSuchProcess: ZombieProcess is its subclass and
        # would otherwise be logged as "Process not found".
        logger.warning("Zombie process - PID: %s, Name: %s", e.pid, e.name)
        raise
    except psutil.NoSuchProcess as e:
        logger.warning("Process not found - PID: %s, Name: %s", e.pid, e.name)
        raise
    except psutil.AccessDenied as e:
        logger.error("Access denied - PID: %s, Name: %s", e.pid, e.name)
        raise
    except psutil.TimeoutExpired as e:
        logger.error(
            "Timeout expired - PID: %s, Name: %s, Timeout: %ss",
            e.pid, e.name, e.seconds,
        )
        raise
    except psutil.Error as e:
        logger.error("General psutil error for PID %s: %s", pid, e)
        raise
# Example with logging
try:
    info = logged_process_operation(os.getpid(), "info")
except psutil.Error:
    pass  # Exception already logged
def defensive_process_monitor():
    """Example of defensive programming with psutil.

    Polls the process list every 5 seconds and prints each PID that was
    not present in the previous poll. Runs until interrupted with
    Ctrl-C. Any other exception is printed and the loop retries after
    one second.
    """
    # Always expect processes to disappear
    active_processes = set()
    while True:
        try:
            # Get current processes
            current_pids = {p.pid for p in psutil.process_iter()}
            # Find new processes
            new_pids = current_pids - active_processes
            for pid in new_pids:
                try:
                    p = psutil.Process(pid)
                    print(f"New process: {pid} ({p.name()})")
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    pass  # Process already gone or inaccessible
            # Update active set
            active_processes = current_pids
            time.sleep(5)
        except KeyboardInterrupt:
            print("Monitoring stopped")
            break
        except Exception as e:
            # Deliberate catch-all: a monitoring loop should keep running.
            print(f"Unexpected error: {e}")
            time.sleep(1)  # Brief pause before retry
# Run defensive monitor
# defensive_process_monitor()
Install with Tessl CLI
npx tessl i tessl/pypi-psutil