Parallel scripting library for executing workflows across diverse computing resources
—
Parsl's monitoring and logging system provides comprehensive tracking of workflow execution, resource usage, performance metrics, and debugging information through the MonitoringHub and logging utilities.
Central monitoring system that collects and stores workflow execution data, resource usage metrics, and performance information.
class MonitoringHub:
def __init__(self, hub_address, hub_port=None, hub_port_range=(55050, 56000),
workflow_name=None, workflow_version=None, logging_endpoint=None,
monitoring_debug=False, resource_monitoring_enabled=True,
resource_monitoring_interval=30):
"""
Initialize workflow monitoring and resource tracking system.
Parameters:
- hub_address: Address for monitoring hub (required)
- hub_port: Port for hub communication (auto-selected if None)
- hub_port_range: Port range for executor monitoring messages (default: (55050, 56000))
- workflow_name: Name for the workflow (default: script name)
- workflow_version: Version of the workflow (default: start datetime)
- logging_endpoint: Database connection URL for monitoring data
- monitoring_debug: Enable debug logging (default: False)
- resource_monitoring_enabled: Enable resource monitoring (default: True)
- resource_monitoring_interval: Resource polling interval in seconds (default: 30)
- logdir: Directory for monitoring log files
"""

Basic Monitoring Setup:
from parsl.config import Config
from parsl.monitoring import MonitoringHub
from parsl.executors import HighThroughputExecutor
# Configure monitoring
monitoring = MonitoringHub(
hub_address='localhost',
hub_port=55055,
resource_monitoring_interval=10, # Monitor every 10 seconds
logdir='parsl_monitoring_logs'
)
# Configure Parsl with monitoring
config = Config(
executors=[HighThroughputExecutor(max_workers=4)],
monitoring=monitoring
)
import parsl
with parsl.load(config):
# All tasks are automatically monitored
futures = [my_app(i) for i in range(10)]
results = [f.result() for f in futures]

Configure persistent storage for monitoring data using SQLite or PostgreSQL databases.
# Database URL formats:
# SQLite: 'sqlite:///monitoring.db'
# PostgreSQL: 'postgresql://user:password@host:port/database'

Database Monitoring Example:
from parsl.monitoring import MonitoringHub
# SQLite database storage
sqlite_monitoring = MonitoringHub(
hub_address='localhost',
hub_port=55055,
logging_endpoint='sqlite:///workflow_monitoring.db',
logdir='monitoring_logs'
)
# PostgreSQL database storage
postgres_monitoring = MonitoringHub(
hub_address='monitoring.example.com',
hub_port=55055,
logging_endpoint='postgresql://parsl:password@db.example.com:5432/monitoring',
resource_monitoring_interval=15
)
config = Config(
executors=[HighThroughputExecutor(max_workers=8)],
monitoring=sqlite_monitoring
)

Automatic tracking of CPU usage, memory consumption, disk I/O, and network activity for tasks and workers.
# Resource monitoring metrics collected:
# - CPU utilization per core and total
# - Memory usage (RSS, VMS, available)
# - Disk I/O (read/write bytes and operations)
# - Network I/O (bytes sent/received)
# - Process information (PID, status, runtime)
# - System load and availability

Resource Monitoring Configuration:
# Detailed resource monitoring
detailed_monitoring = MonitoringHub(
resource_monitoring_interval=5, # High-frequency monitoring
resource_monitoring_enabled=True,
monitoring_debug=True, # Enable debug logs
logdir='detailed_monitoring'
)
# Lightweight monitoring for production
production_monitoring = MonitoringHub(
resource_monitoring_interval=60, # Monitor every minute
resource_monitoring_enabled=True,
monitoring_debug=False,
logging_endpoint='postgresql://monitoring@prod-db:5432/parsl'
)

Integration with visualization tools for workflow analysis and performance review.
# Parsl provides a web-based visualization tool:
# parsl-visualize command launches monitoring dashboard
# Command-line usage:
# parsl-visualize --db sqlite:///monitoring.db --host 0.0.0.0 --port 8080

Visualization Example:
# After workflow execution with monitoring enabled
import subprocess
# Launch visualization server
subprocess.Popen([
'parsl-visualize',
'--db', 'sqlite:///workflow_monitoring.db',
'--host', '0.0.0.0',
'--port', '8080'
])
# Access dashboard at http://localhost:8080
print("Monitoring dashboard available at http://localhost:8080")

Configure Parsl's logging system for debugging, development, and production monitoring.
def set_stream_logger(name='parsl', level=logging.DEBUG, format_string=None):
"""
Configure stream-based logging to stdout/stderr.
Parameters:
- name: Logger name (default: 'parsl')
- level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
- format_string: Custom log format string
Returns:
logging.Logger: Configured logger instance
"""
def set_file_logger(filename, name='parsl', level=logging.DEBUG,
format_string=None):
"""
Configure file-based logging to specified file.
Parameters:
- filename: Log file path (str or AUTO_LOGNAME for automatic naming)
- name: Logger name (default: 'parsl')
- level: Logging level
- format_string: Custom log format string
Returns:
logging.Logger: Configured logger instance
"""
# Constants
AUTO_LOGNAME = -1  # Special value for automatic log filename generation

Logging Examples:
import logging
from parsl.log_utils import set_stream_logger, set_file_logger
from parsl import AUTO_LOGNAME
# Stream logging for development
dev_logger = set_stream_logger(
name='parsl',
level=logging.DEBUG
)
# File logging with automatic filename
file_logger = set_file_logger(
filename=AUTO_LOGNAME, # Generates unique filename
level=logging.INFO
)
# Custom log format
custom_logger = set_file_logger(
filename='workflow.log',
level=logging.WARNING,
format_string='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Production logging configuration
production_logger = set_file_logger(
filename='/var/log/parsl/workflow.log',
level=logging.ERROR
)

Access and analyze monitoring data programmatically for performance optimization and debugging.
# Monitoring data can be accessed through:
# 1. Database queries (SQLite/PostgreSQL)
# 2. Log file analysis
# 3. Real-time monitoring API (when available)

Data Analysis Example:
import sqlite3
import pandas as pd
# Connect to monitoring database
conn = sqlite3.connect('workflow_monitoring.db')
# Query task execution data
task_query = """
SELECT task_id, task_func_name, task_time_submitted,
task_time_returned, task_status, task_fail_count
FROM task
WHERE task_time_submitted > datetime('now', '-1 day')
"""
tasks_df = pd.read_sql_query(task_query, conn)
# Analyze task performance
avg_runtime = (pd.to_datetime(tasks_df['task_time_returned']) -
pd.to_datetime(tasks_df['task_time_submitted'])).mean()
print(f"Average task runtime: {avg_runtime}")
print(f"Failed tasks: {tasks_df[tasks_df['task_fail_count'] > 0].shape[0]}")
# Query resource usage
resource_query = """
SELECT timestamp, cpu_percent, memory_percent, disk_read, disk_write
FROM resource
ORDER BY timestamp DESC
LIMIT 1000
"""
resources_df = pd.read_sql_query(resource_query, conn)
# Plot resource usage over time
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(resources_df['timestamp'], resources_df['cpu_percent'])
plt.title('CPU Usage Over Time')
plt.xticks(rotation=45)
plt.subplot(2, 2, 2)
plt.plot(resources_df['timestamp'], resources_df['memory_percent'])
plt.title('Memory Usage Over Time')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
conn.close()

Advanced monitoring features for complex workflows and production environments.
# Multi-hub monitoring for distributed workflows
from parsl.monitoring import MonitoringHub
# Central monitoring hub
central_hub = MonitoringHub(
hub_address='central-monitoring.example.com',
hub_port=55055,
logging_endpoint='postgresql://monitoring@central-db:5432/parsl',
resource_monitoring_interval=30
)
# Site-specific monitoring
site_hub = MonitoringHub(
hub_address='site-monitor.local',
hub_port=55056,
logging_endpoint='sqlite:///site_monitoring.db',
resource_monitoring_interval=10,
monitoring_debug=True
)
# Configure different executors with different monitoring
config = Config(
executors=[
HighThroughputExecutor(
label='central_compute',
provider=SlurmProvider(partition='compute')
),
HighThroughputExecutor(
label='local_testing',
provider=LocalProvider()
)
],
monitoring=central_hub # Global monitoring configuration
)

Use monitoring data to optimize workflow performance and resource utilization.
# Monitor workflow execution
@python_app
def monitored_task(task_id, size):
"""Task with performance monitoring."""
import time
import psutil
start_time = time.time()
start_memory = psutil.virtual_memory().used
# Simulate work
result = sum(range(size))
time.sleep(0.1)
end_time = time.time()
end_memory = psutil.virtual_memory().used
# Log performance data
print(f"Task {task_id}: Runtime={end_time-start_time:.2f}s, "
f"Memory={end_memory-start_memory} bytes")
return result
# Execute with monitoring
with parsl.load(config):
# Submit various task sizes
small_tasks = [monitored_task(i, 1000) for i in range(10)]
large_tasks = [monitored_task(i+10, 100000) for i in range(5)]
# Collect results and performance data
small_results = [f.result() for f in small_tasks]
large_results = [f.result() for f in large_tasks]
# Analyze monitoring data to optimize future runs
print("Check monitoring dashboard for performance analysis")

Track and analyze errors and exceptions in workflow execution.
from parsl.monitoring import MonitoringHub
# Configure monitoring with error tracking
error_monitoring = MonitoringHub(
monitoring_debug=True, # Capture detailed error information
logdir='error_monitoring'
)
@python_app
def error_prone_task(task_id):
"""Task that may fail for monitoring testing."""
import random
if random.random() < 0.2: # 20% failure rate
raise ValueError(f"Task {task_id} failed randomly")
return f"Task {task_id} completed successfully"
# Execute with error monitoring
config = Config(
executors=[ThreadPoolExecutor(max_threads=4)],
monitoring=error_monitoring,
retries=2 # Retry failed tasks
)
with parsl.load(config):
futures = [error_prone_task(i) for i in range(20)]
# Handle failures gracefully
for i, future in enumerate(futures):
try:
result = future.result()
print(f"Success: {result}")
except Exception as e:
print(f"Task {i} failed: {e}")
# Error data is captured in monitoring database for analysis

Install with Tessl CLI
npx tessl i tessl/pypi-parsl