# Monitoring and Logging

Parsl's monitoring and logging system provides comprehensive tracking of workflow execution, resource usage, performance metrics, and debugging information through the MonitoringHub and logging utilities.

## Capabilities

### MonitoringHub

Central monitoring system that collects and stores workflow execution data, resource usage metrics, and performance information.

```python { .api }
class MonitoringHub:
    def __init__(self, hub_address, hub_port=None, hub_port_range=(55050, 56000),
                 workflow_name=None, workflow_version=None, logging_endpoint=None,
                 monitoring_debug=False, resource_monitoring_enabled=True,
                 resource_monitoring_interval=30):
        """
        Initialize workflow monitoring and resource tracking system.

        Parameters:
        - hub_address: Address for monitoring hub (required)
        - hub_port: Port for hub communication (auto-selected if None)
        - hub_port_range: Port range for executor monitoring messages (default: (55050, 56000))
        - workflow_name: Name for the workflow (default: script name)
        - workflow_version: Version of the workflow (default: start datetime)
        - logging_endpoint: Database connection URL for monitoring data
        - monitoring_debug: Enable debug logging (default: False)
        - resource_monitoring_enabled: Enable resource monitoring (default: True)
        - resource_monitoring_interval: Resource polling interval in seconds (default: 30)
        """
```
**Basic Monitoring Setup:**

```python
from parsl.config import Config
from parsl.monitoring import MonitoringHub
from parsl.executors import HighThroughputExecutor

# Configure monitoring
monitoring = MonitoringHub(
    hub_address='localhost',
    hub_port=55055,
    resource_monitoring_interval=10,  # Monitor every 10 seconds
)

# Configure Parsl with monitoring
config = Config(
    executors=[HighThroughputExecutor(max_workers=4)],
    monitoring=monitoring
)

import parsl
with parsl.load(config):
    # All tasks are automatically monitored
    futures = [my_app(i) for i in range(10)]
    results = [f.result() for f in futures]
```
### Database Storage

Configure persistent storage for monitoring data using SQLite or PostgreSQL databases.

```python { .api }
# Database URL formats:
# SQLite: 'sqlite:///monitoring.db'
# PostgreSQL: 'postgresql://user:password@host:port/database'
```

**Database Monitoring Example:**

```python
from parsl.monitoring import MonitoringHub

# SQLite database storage
sqlite_monitoring = MonitoringHub(
    hub_address='localhost',
    hub_port=55055,
    logging_endpoint='sqlite:///workflow_monitoring.db',
)

# PostgreSQL database storage
postgres_monitoring = MonitoringHub(
    hub_address='monitoring.example.com',
    hub_port=55055,
    logging_endpoint='postgresql://parsl:password@db.example.com:5432/monitoring',
    resource_monitoring_interval=15
)

config = Config(
    executors=[HighThroughputExecutor(max_workers=8)],
    monitoring=sqlite_monitoring
)
```
### Resource Monitoring

Automatic tracking of CPU usage, memory consumption, disk I/O, and network activity for tasks and workers.

```python { .api }
# Resource monitoring metrics collected:
# - CPU utilization per core and total
# - Memory usage (RSS, VMS, available)
# - Disk I/O (read/write bytes and operations)
# - Network I/O (bytes sent/received)
# - Process information (PID, status, runtime)
# - System load and availability
```

**Resource Monitoring Configuration:**

```python
# Detailed resource monitoring
detailed_monitoring = MonitoringHub(
    hub_address='localhost',
    resource_monitoring_interval=5,  # High-frequency monitoring
    resource_monitoring_enabled=True,
    monitoring_debug=True,  # Enable debug logs
)

# Lightweight monitoring for production
production_monitoring = MonitoringHub(
    hub_address='localhost',
    resource_monitoring_interval=60,  # Monitor every minute
    resource_monitoring_enabled=True,
    monitoring_debug=False,
    logging_endpoint='postgresql://monitoring@prod-db:5432/parsl'
)
```
### Workflow Visualization

Integration with visualization tools for workflow analysis and performance review.

```python { .api }
# Parsl provides a web-based visualization tool:
# parsl-visualize command launches monitoring dashboard

# Command-line usage:
# parsl-visualize --db sqlite:///monitoring.db --host 0.0.0.0 --port 8080
```

**Visualization Example:**

```python
# After workflow execution with monitoring enabled
import subprocess

# Launch visualization server
subprocess.Popen([
    'parsl-visualize',
    '--db', 'sqlite:///workflow_monitoring.db',
    '--host', '0.0.0.0',
    '--port', '8080'
])

# Access dashboard at http://localhost:8080
print("Monitoring dashboard available at http://localhost:8080")
```
### Logging Configuration

Configure Parsl's logging system for debugging, development, and production monitoring.

```python { .api }
def set_stream_logger(name='parsl', level=logging.DEBUG, format_string=None):
    """
    Configure stream-based logging to stdout/stderr.

    Parameters:
    - name: Logger name (default: 'parsl')
    - level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    - format_string: Custom log format string

    Returns:
        logging.Logger: Configured logger instance
    """

def set_file_logger(filename, name='parsl', level=logging.DEBUG,
                    format_string=None):
    """
    Configure file-based logging to specified file.

    Parameters:
    - filename: Log file path (str or AUTO_LOGNAME for automatic naming)
    - name: Logger name (default: 'parsl')
    - level: Logging level
    - format_string: Custom log format string

    Returns:
        logging.Logger: Configured logger instance
    """

# Constants
AUTO_LOGNAME = -1  # Special value for automatic log filename generation
```
**Logging Examples:**

```python
import logging
from parsl.log_utils import set_stream_logger, set_file_logger
from parsl import AUTO_LOGNAME

# Stream logging for development
dev_logger = set_stream_logger(
    name='parsl',
    level=logging.DEBUG
)

# File logging with automatic filename
file_logger = set_file_logger(
    filename=AUTO_LOGNAME,  # Generates unique filename
    level=logging.INFO
)

# Custom log format
custom_logger = set_file_logger(
    filename='workflow.log',
    level=logging.WARNING,
    format_string='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Production logging configuration
production_logger = set_file_logger(
    filename='/var/log/parsl/workflow.log',
    level=logging.ERROR
)
```
### Monitoring Data Analysis

Access and analyze monitoring data programmatically for performance optimization and debugging.

```python { .api }
# Monitoring data can be accessed through:
# 1. Database queries (SQLite/PostgreSQL)
# 2. Log file analysis
# 3. Real-time monitoring API (when available)
```
**Data Analysis Example:**

```python
import sqlite3
import pandas as pd

# Connect to monitoring database
conn = sqlite3.connect('workflow_monitoring.db')

# Query task execution data
task_query = """
SELECT task_id, task_func_name, task_time_submitted,
       task_time_returned, task_status, task_fail_count
FROM task
WHERE task_time_submitted > datetime('now', '-1 day')
"""

tasks_df = pd.read_sql_query(task_query, conn)

# Analyze task performance
avg_runtime = (pd.to_datetime(tasks_df['task_time_returned']) -
               pd.to_datetime(tasks_df['task_time_submitted'])).mean()

print(f"Average task runtime: {avg_runtime}")
print(f"Failed tasks: {tasks_df[tasks_df['task_fail_count'] > 0].shape[0]}")

# Query resource usage
resource_query = """
SELECT timestamp, cpu_percent, memory_percent, disk_read, disk_write
FROM resource
ORDER BY timestamp DESC
LIMIT 1000
"""

resources_df = pd.read_sql_query(resource_query, conn)

# Plot resource usage over time
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(resources_df['timestamp'], resources_df['cpu_percent'])
plt.title('CPU Usage Over Time')
plt.xticks(rotation=45)

plt.subplot(2, 2, 2)
plt.plot(resources_df['timestamp'], resources_df['memory_percent'])
plt.title('Memory Usage Over Time')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

conn.close()
```
### Advanced Monitoring Configuration

Advanced monitoring features for complex workflows and production environments.

```python
# Multi-hub monitoring for distributed workflows
from parsl.monitoring import MonitoringHub

# Central monitoring hub
central_hub = MonitoringHub(
    hub_address='central-monitoring.example.com',
    hub_port=55055,
    logging_endpoint='postgresql://monitoring@central-db:5432/parsl',
    resource_monitoring_interval=30
)

# Site-specific monitoring
site_hub = MonitoringHub(
    hub_address='site-monitor.local',
    hub_port=55056,
    logging_endpoint='sqlite:///site_monitoring.db',
    resource_monitoring_interval=10,
    monitoring_debug=True
)

# Configure different executors with different monitoring
config = Config(
    executors=[
        HighThroughputExecutor(
            label='central_compute',
            provider=SlurmProvider(partition='compute')
        ),
        HighThroughputExecutor(
            label='local_testing',
            provider=LocalProvider()
        )
    ],
    monitoring=central_hub  # Global monitoring configuration
)
```
### Performance Optimization

Use monitoring data to optimize workflow performance and resource utilization.

```python
# Monitor workflow execution
@python_app
def monitored_task(task_id, size):
    """Task with performance monitoring."""
    import time
    import psutil

    start_time = time.time()
    start_memory = psutil.virtual_memory().used

    # Simulate work
    result = sum(range(size))
    time.sleep(0.1)

    end_time = time.time()
    end_memory = psutil.virtual_memory().used

    # Log performance data
    print(f"Task {task_id}: Runtime={end_time-start_time:.2f}s, "
          f"Memory={end_memory-start_memory} bytes")

    return result

# Execute with monitoring
with parsl.load(config):
    # Submit various task sizes
    small_tasks = [monitored_task(i, 1000) for i in range(10)]
    large_tasks = [monitored_task(i+10, 100000) for i in range(5)]

    # Collect results and performance data
    small_results = [f.result() for f in small_tasks]
    large_results = [f.result() for f in large_tasks]

# Analyze monitoring data to optimize future runs
print("Check monitoring dashboard for performance analysis")
```
### Error and Exception Monitoring

Track and analyze errors and exceptions in workflow execution.

```python
from parsl.monitoring import MonitoringHub

# Configure monitoring with error tracking
error_monitoring = MonitoringHub(
    hub_address='localhost',
    monitoring_debug=True,  # Capture detailed error information
)

@python_app
def error_prone_task(task_id):
    """Task that may fail for monitoring testing."""
    import random

    if random.random() < 0.2:  # 20% failure rate
        raise ValueError(f"Task {task_id} failed randomly")

    return f"Task {task_id} completed successfully"

# Execute with error monitoring
config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    monitoring=error_monitoring,
    retries=2  # Retry failed tasks
)

with parsl.load(config):
    futures = [error_prone_task(i) for i in range(20)]

    # Handle failures gracefully
    for i, future in enumerate(futures):
        try:
            result = future.result()
            print(f"Success: {result}")
        except Exception as e:
            print(f"Task {i} failed: {e}")

# Error data is captured in monitoring database for analysis
```