CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-parsl

Parallel scripting library for executing workflows across diverse computing resources

Pending
Overview
Eval results
Files

monitoring.mddocs/

Monitoring and Logging

Parsl's monitoring and logging system provides comprehensive tracking of workflow execution, resource usage, performance metrics, and debugging information through the MonitoringHub and logging utilities.

Capabilities

MonitoringHub

Central monitoring system that collects and stores workflow execution data, resource usage metrics, and performance information.

class MonitoringHub:
    def __init__(self, hub_address, hub_port=None, hub_port_range=(55050, 56000),
                 workflow_name=None, workflow_version=None, logging_endpoint=None,
                 monitoring_debug=False, resource_monitoring_enabled=True,
                 resource_monitoring_interval=30):
        """
        Initialize workflow monitoring and resource tracking system.
        
        Parameters:
        - hub_address: Address for monitoring hub (required)
        - hub_port: Port for hub communication (auto-selected if None)
        - hub_port_range: Port range for executor monitoring messages (default: (55050, 56000))
        - workflow_name: Name for the workflow (default: script name)
        - workflow_version: Version of the workflow (default: start datetime)
        - logging_endpoint: Database connection URL for monitoring data
        - monitoring_debug: Enable debug logging (default: False)
        - resource_monitoring_enabled: Enable resource monitoring (default: True)
        - resource_monitoring_interval: Resource polling interval in seconds (default: 30)
        """

Basic Monitoring Setup:

from parsl.config import Config
from parsl.monitoring import MonitoringHub
from parsl.executors import HighThroughputExecutor

# Configure monitoring
monitoring = MonitoringHub(
    hub_address='localhost',
    hub_port=55055,
    resource_monitoring_interval=10,  # Monitor every 10 seconds
    logdir='parsl_monitoring_logs'
)

# Configure Parsl with monitoring
config = Config(
    executors=[HighThroughputExecutor(max_workers=4)],
    monitoring=monitoring
)

import parsl
with parsl.load(config):
    # All tasks are automatically monitored
    futures = [my_app(i) for i in range(10)]
    results = [f.result() for f in futures]

Database Storage

Configure persistent storage for monitoring data using SQLite or PostgreSQL databases.

# Database URL formats:
# SQLite: 'sqlite:///monitoring.db'
# PostgreSQL: 'postgresql://user:password@host:port/database'

Database Monitoring Example:

from parsl.monitoring import MonitoringHub

# SQLite database storage
sqlite_monitoring = MonitoringHub(
    hub_address='localhost',
    hub_port=55055,
    db_url='sqlite:///workflow_monitoring.db',
    logdir='monitoring_logs'
)

# PostgreSQL database storage  
postgres_monitoring = MonitoringHub(
    hub_address='monitoring.example.com',
    hub_port=55055,
    db_url='postgresql://parsl:password@db.example.com:5432/monitoring',
    resource_monitoring_interval=15
)

config = Config(
    executors=[HighThroughputExecutor(max_workers=8)],
    monitoring=sqlite_monitoring
)

Resource Monitoring

Automatic tracking of CPU usage, memory consumption, disk I/O, and network activity for tasks and workers.

# Resource monitoring metrics collected:
# - CPU utilization per core and total
# - Memory usage (RSS, VMS, available)
# - Disk I/O (read/write bytes and operations)
# - Network I/O (bytes sent/received)
# - Process information (PID, status, runtime)
# - System load and availability

Resource Monitoring Configuration:

# Detailed resource monitoring
detailed_monitoring = MonitoringHub(
    resource_monitoring_interval=5,    # High-frequency monitoring
    resource_monitoring_enabled=True,
    monitoring_debug=True,             # Enable debug logs
    logdir='detailed_monitoring'
)

# Lightweight monitoring for production
production_monitoring = MonitoringHub(
    resource_monitoring_interval=60,   # Monitor every minute
    resource_monitoring_enabled=True,
    monitoring_debug=False,
    db_url='postgresql://monitoring@prod-db:5432/parsl'
)

Workflow Visualization

Integration with visualization tools for workflow analysis and performance review.

# Parsl provides a web-based visualization tool:
# parsl-visualize command launches monitoring dashboard

# Command-line usage:
# parsl-visualize --db sqlite:///monitoring.db --host 0.0.0.0 --port 8080

Visualization Example:

# After workflow execution with monitoring enabled
import subprocess

# Launch visualization server
subprocess.Popen([
    'parsl-visualize',
    '--db', 'sqlite:///workflow_monitoring.db',
    '--host', '0.0.0.0',
    '--port', '8080'
])

# Access dashboard at http://localhost:8080
print("Monitoring dashboard available at http://localhost:8080")

Logging Configuration

Configure Parsl's logging system for debugging, development, and production monitoring.

def set_stream_logger(name='parsl', level=logging.DEBUG, format_string=None):
    """
    Configure stream-based logging to stdout/stderr.
    
    Parameters:
    - name: Logger name (default: 'parsl')
    - level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    - format_string: Custom log format string
    
    Returns:
    logging.Logger: Configured logger instance
    """

def set_file_logger(filename, name='parsl', level=logging.DEBUG, 
                   format_string=None):
    """
    Configure file-based logging to specified file.
    
    Parameters:
    - filename: Log file path (str or AUTO_LOGNAME for automatic naming)
    - name: Logger name (default: 'parsl')
    - level: Logging level
    - format_string: Custom log format string
    
    Returns:
    logging.Logger: Configured logger instance
    """

# Constants
AUTO_LOGNAME = -1  # Special value for automatic log filename generation

Logging Examples:

import logging
from parsl.log_utils import set_stream_logger, set_file_logger
from parsl import AUTO_LOGNAME

# Stream logging for development
dev_logger = set_stream_logger(
    name='parsl',
    level=logging.DEBUG
)

# File logging with automatic filename
file_logger = set_file_logger(
    filename=AUTO_LOGNAME,  # Generates unique filename
    level=logging.INFO
)

# Custom log format
custom_logger = set_file_logger(
    filename='workflow.log',
    level=logging.WARNING,
    format_string='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Production logging configuration
production_logger = set_file_logger(
    filename='/var/log/parsl/workflow.log',
    level=logging.ERROR
)

Monitoring Data Analysis

Access and analyze monitoring data programmatically for performance optimization and debugging.

# Monitoring data can be accessed through:
# 1. Database queries (SQLite/PostgreSQL)
# 2. Log file analysis  
# 3. Real-time monitoring API (when available)

Data Analysis Example:

import sqlite3
import pandas as pd

# Connect to monitoring database
conn = sqlite3.connect('workflow_monitoring.db')

# Query task execution data
task_query = """
SELECT task_id, task_func_name, task_time_submitted, 
       task_time_returned, task_status, task_fail_count
FROM task 
WHERE task_time_submitted > datetime('now', '-1 day')
"""

tasks_df = pd.read_sql_query(task_query, conn)

# Analyze task performance
avg_runtime = (pd.to_datetime(tasks_df['task_time_returned']) - 
               pd.to_datetime(tasks_df['task_time_submitted'])).mean()

print(f"Average task runtime: {avg_runtime}")
print(f"Failed tasks: {tasks_df[tasks_df['task_fail_count'] > 0].shape[0]}")

# Query resource usage
resource_query = """
SELECT timestamp, cpu_percent, memory_percent, disk_read, disk_write
FROM resource
ORDER BY timestamp DESC
LIMIT 1000
"""

resources_df = pd.read_sql_query(resource_query, conn)

# Plot resource usage over time
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(resources_df['timestamp'], resources_df['cpu_percent'])
plt.title('CPU Usage Over Time')
plt.xticks(rotation=45)

plt.subplot(2, 2, 2)
plt.plot(resources_df['timestamp'], resources_df['memory_percent'])
plt.title('Memory Usage Over Time')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

conn.close()

Advanced Monitoring Configuration

Advanced monitoring features for complex workflows and production environments.

# Multi-hub monitoring for distributed workflows
from parsl.monitoring import MonitoringHub

# Central monitoring hub
central_hub = MonitoringHub(
    hub_address='central-monitoring.example.com',
    hub_port=55055,
    db_url='postgresql://monitoring@central-db:5432/parsl',
    resource_monitoring_interval=30
)

# Site-specific monitoring
site_hub = MonitoringHub(
    hub_address='site-monitor.local',
    hub_port=55056,
    db_url='sqlite:///site_monitoring.db',
    resource_monitoring_interval=10,
    monitoring_debug=True
)

# Configure different executors with different monitoring
config = Config(
    executors=[
        HighThroughputExecutor(
            label='central_compute',
            provider=SlurmProvider(partition='compute')
        ),
        HighThroughputExecutor(
            label='local_testing',
            provider=LocalProvider()
        )
    ],
    monitoring=central_hub  # Global monitoring configuration
)

Performance Optimization

Use monitoring data to optimize workflow performance and resource utilization.

# Monitor workflow execution
@python_app
def monitored_task(task_id, size):
    """Task with performance monitoring."""
    import time
    import psutil
    
    start_time = time.time()
    start_memory = psutil.virtual_memory().used
    
    # Simulate work
    result = sum(range(size))
    time.sleep(0.1)
    
    end_time = time.time()
    end_memory = psutil.virtual_memory().used
    
    # Log performance data
    print(f"Task {task_id}: Runtime={end_time-start_time:.2f}s, "
          f"Memory={end_memory-start_memory} bytes")
    
    return result

# Execute with monitoring
with parsl.load(config):
    # Submit various task sizes
    small_tasks = [monitored_task(i, 1000) for i in range(10)]
    large_tasks = [monitored_task(i+10, 100000) for i in range(5)]
    
    # Collect results and performance data
    small_results = [f.result() for f in small_tasks]
    large_results = [f.result() for f in large_tasks]
    
    # Analyze monitoring data to optimize future runs
    print("Check monitoring dashboard for performance analysis")

Error and Exception Monitoring

Track and analyze errors and exceptions in workflow execution.

from parsl.monitoring import MonitoringHub

# Configure monitoring with error tracking
error_monitoring = MonitoringHub(
    monitoring_debug=True,  # Capture detailed error information
    logdir='error_monitoring'
)

@python_app
def error_prone_task(task_id):
    """Task that may fail for monitoring testing.""" 
    import random
    
    if random.random() < 0.2:  # 20% failure rate
        raise ValueError(f"Task {task_id} failed randomly")
    
    return f"Task {task_id} completed successfully"

# Execute with error monitoring
config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    monitoring=error_monitoring,
    retries=2  # Retry failed tasks
)

with parsl.load(config):
    futures = [error_prone_task(i) for i in range(20)]
    
    # Handle failures gracefully
    for i, future in enumerate(futures):
        try:
            result = future.result()
            print(f"Success: {result}")
        except Exception as e:
            print(f"Task {i} failed: {e}")
    
# Error data is captured in monitoring database for analysis

Install with Tessl CLI

npx tessl i tessl/pypi-parsl

docs

app-decorators.md

configuration.md

data-management.md

executors.md

index.md

launchers.md

monitoring.md

providers.md

workflow-management.md

tile.json