
# Monitoring and Logging

Parsl's monitoring and logging system provides comprehensive tracking of workflow execution, resource usage, performance metrics, and debugging information through the MonitoringHub and logging utilities.

## Capabilities

### MonitoringHub

Central monitoring system that collects and stores workflow execution data, resource usage metrics, and performance information.

```python { .api }
class MonitoringHub:
    def __init__(self, hub_address, hub_port=None, hub_port_range=(55050, 56000),
                 workflow_name=None, workflow_version=None, logging_endpoint=None,
                 monitoring_debug=False, resource_monitoring_enabled=True,
                 resource_monitoring_interval=30):
        """
        Initialize workflow monitoring and resource tracking system.

        Parameters:
        - hub_address: Address for the monitoring hub (required)
        - hub_port: Port for hub communication (auto-selected if None)
        - hub_port_range: Port range for executor monitoring messages (default: (55050, 56000))
        - workflow_name: Name for the workflow (default: script name)
        - workflow_version: Version of the workflow (default: start datetime)
        - logging_endpoint: Database connection URL for monitoring data
        - monitoring_debug: Enable debug logging (default: False)
        - resource_monitoring_enabled: Enable resource monitoring (default: True)
        - resource_monitoring_interval: Resource polling interval in seconds (default: 30)
        """
```

**Basic Monitoring Setup:**

```python
import parsl
from parsl import python_app
from parsl.config import Config
from parsl.monitoring import MonitoringHub
from parsl.executors import HighThroughputExecutor

@python_app
def my_app(i):
    return i * i

# Configure monitoring
monitoring = MonitoringHub(
    hub_address='localhost',
    hub_port=55055,
    resource_monitoring_interval=10,  # Monitor every 10 seconds
)

# Configure Parsl with monitoring
config = Config(
    executors=[HighThroughputExecutor(max_workers=4)],
    monitoring=monitoring
)

with parsl.load(config):
    # All tasks are automatically monitored
    futures = [my_app(i) for i in range(10)]
    results = [f.result() for f in futures]
```

### Database Storage

Configure persistent storage for monitoring data using SQLite or PostgreSQL databases via the `logging_endpoint` parameter.

```python { .api }
# Database URL formats (passed as logging_endpoint):
# SQLite: 'sqlite:///monitoring.db'
# PostgreSQL: 'postgresql://user:password@host:port/database'
```

69

70

**Database Monitoring Example:**

71

72

```python

73

from parsl.monitoring import MonitoringHub

74

75

# SQLite database storage

76

sqlite_monitoring = MonitoringHub(

77

hub_address='localhost',

78

hub_port=55055,

79

db_url='sqlite:///workflow_monitoring.db',

80

logdir='monitoring_logs'

81

)

82

83

# PostgreSQL database storage

84

postgres_monitoring = MonitoringHub(

85

hub_address='monitoring.example.com',

86

hub_port=55055,

87

db_url='postgresql://parsl:password@db.example.com:5432/monitoring',

88

resource_monitoring_interval=15

89

)

90

91

config = Config(

92

executors=[HighThroughputExecutor(max_workers=8)],

93

monitoring=sqlite_monitoring

94

)

95

```

### Resource Monitoring

Automatic tracking of CPU usage, memory consumption, disk I/O, and network activity for tasks and workers.

```python { .api }
# Resource monitoring metrics collected:
# - CPU utilization per core and total
# - Memory usage (RSS, VMS, available)
# - Disk I/O (read/write bytes and operations)
# - Network I/O (bytes sent/received)
# - Process information (PID, status, runtime)
# - System load and availability
```
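As a rough illustration of the kind of per-process numbers a resource monitor records (Parsl's monitor uses psutil; this sketch uses only the standard library and covers a smaller subset):

```python
import os
import resource

def sample_process_metrics():
    """Sample a few of the metrics a resource monitor typically reports,
    using only the standard library (Unix-only)."""
    usage = resource.getrusage(resource.RUSAGE_SELF)
    return {
        'pid': os.getpid(),               # process information
        'user_cpu_s': usage.ru_utime,     # CPU time spent in user mode
        'system_cpu_s': usage.ru_stime,   # CPU time spent in kernel mode
        'max_rss_kb': usage.ru_maxrss,    # peak resident set size (KiB on Linux)
    }

metrics = sample_process_metrics()
print(metrics)
```

A periodic monitor collects a sample like this every `resource_monitoring_interval` seconds and ships it to the hub for storage.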

**Resource Monitoring Configuration:**

```python
from parsl.monitoring import MonitoringHub

# Detailed resource monitoring
detailed_monitoring = MonitoringHub(
    hub_address='localhost',
    resource_monitoring_interval=5,   # High-frequency monitoring
    resource_monitoring_enabled=True,
    monitoring_debug=True,            # Enable debug logs
)

# Lightweight monitoring for production
production_monitoring = MonitoringHub(
    hub_address='localhost',
    resource_monitoring_interval=60,  # Monitor every minute
    resource_monitoring_enabled=True,
    monitoring_debug=False,
    logging_endpoint='postgresql://monitoring@prod-db:5432/parsl'
)
```

### Workflow Visualization

Integration with visualization tools for workflow analysis and performance review.

```python { .api }
# Parsl provides a web-based visualization tool:
# the parsl-visualize command launches a monitoring dashboard

# Command-line usage (the database URI is a positional argument):
# parsl-visualize --listen 0.0.0.0 --port 8080 sqlite:///monitoring.db
```

**Visualization Example:**

```python
# After workflow execution with monitoring enabled
import subprocess

# Launch visualization server
subprocess.Popen([
    'parsl-visualize',
    '--listen', '0.0.0.0',
    '--port', '8080',
    'sqlite:///workflow_monitoring.db'
])

# Access dashboard at http://localhost:8080
print("Monitoring dashboard available at http://localhost:8080")
```

### Logging Configuration

Configure Parsl's logging system for debugging, development, and production monitoring.

```python { .api }
def set_stream_logger(name='parsl', level=logging.DEBUG, format_string=None):
    """
    Configure stream-based logging to stdout/stderr.

    Parameters:
    - name: Logger name (default: 'parsl')
    - level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    - format_string: Custom log format string

    Returns:
    logging.Logger: Configured logger instance
    """

def set_file_logger(filename, name='parsl', level=logging.DEBUG,
                    format_string=None):
    """
    Configure file-based logging to the specified file.

    Parameters:
    - filename: Log file path (str, or AUTO_LOGNAME for automatic naming)
    - name: Logger name (default: 'parsl')
    - level: Logging level
    - format_string: Custom log format string

    Returns:
    logging.Logger: Configured logger instance
    """

# Constants
AUTO_LOGNAME = -1  # Special value for automatic log filename generation
```

**Logging Examples:**

```python
import logging
from parsl.log_utils import set_stream_logger, set_file_logger
from parsl import AUTO_LOGNAME

# Stream logging for development
dev_logger = set_stream_logger(
    name='parsl',
    level=logging.DEBUG
)

# File logging with automatic filename
file_logger = set_file_logger(
    filename=AUTO_LOGNAME,  # Generates a unique filename
    level=logging.INFO
)

# Custom log format
custom_logger = set_file_logger(
    filename='workflow.log',
    level=logging.WARNING,
    format_string='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Production logging configuration
production_logger = set_file_logger(
    filename='/var/log/parsl/workflow.log',
    level=logging.ERROR
)
```
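These helpers wrap Python's standard `logging` module, so ordinary `logging` levels and format strings behave as usual. A stdlib-only sketch of the same custom-format and level-filtering pattern, writing to an in-memory buffer instead of a file so the output can be inspected (no Parsl required):

```python
import io
import logging

# Equivalent in spirit to set_stream_logger with a custom format_string,
# but the handler writes to a StringIO buffer for inspection
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))

logger = logging.getLogger('parsl.demo')
logger.setLevel(logging.WARNING)
logger.addHandler(handler)
logger.propagate = False  # keep output out of the root logger

logger.info('dropped: below the WARNING threshold')  # filtered by level
logger.warning('kept: at WARNING')

print(buffer.getvalue().strip())
# → parsl.demo - WARNING - kept: at WARNING
```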

### Monitoring Data Analysis

Access and analyze monitoring data programmatically for performance optimization and debugging.

```python { .api }
# Monitoring data can be accessed through:
# 1. Database queries (SQLite/PostgreSQL)
# 2. Log file analysis
# 3. Real-time monitoring API (when available)
```

241

242

**Data Analysis Example:**

243

244

```python

245

import sqlite3

246

import pandas as pd

247

248

# Connect to monitoring database

249

conn = sqlite3.connect('workflow_monitoring.db')

250

251

# Query task execution data

252

task_query = """

253

SELECT task_id, task_func_name, task_time_submitted,

254

task_time_returned, task_status, task_fail_count

255

FROM task

256

WHERE task_time_submitted > datetime('now', '-1 day')

257

"""

258

259

tasks_df = pd.read_sql_query(task_query, conn)

260

261

# Analyze task performance

262

avg_runtime = (pd.to_datetime(tasks_df['task_time_returned']) -

263

pd.to_datetime(tasks_df['task_time_submitted'])).mean()

264

265

print(f"Average task runtime: {avg_runtime}")

266

print(f"Failed tasks: {tasks_df[tasks_df['task_fail_count'] > 0].shape[0]}")

267

268

# Query resource usage

269

resource_query = """

270

SELECT timestamp, cpu_percent, memory_percent, disk_read, disk_write

271

FROM resource

272

ORDER BY timestamp DESC

273

LIMIT 1000

274

"""

275

276

resources_df = pd.read_sql_query(resource_query, conn)

277

278

# Plot resource usage over time

279

import matplotlib.pyplot as plt

280

281

plt.figure(figsize=(12, 8))

282

plt.subplot(2, 2, 1)

283

plt.plot(resources_df['timestamp'], resources_df['cpu_percent'])

284

plt.title('CPU Usage Over Time')

285

plt.xticks(rotation=45)

286

287

plt.subplot(2, 2, 2)

288

plt.plot(resources_df['timestamp'], resources_df['memory_percent'])

289

plt.title('Memory Usage Over Time')

290

plt.xticks(rotation=45)

291

292

plt.tight_layout()

293

plt.show()

294

295

conn.close()

296

```
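Per-function aggregates are often more actionable than per-task rows. The sketch below runs the same style of query against an in-memory SQLite database that mimics the `task` table used above (the monitoring schema itself may vary between Parsl versions):

```python
import sqlite3

# Build a toy 'task' table with the assumed schema
conn = sqlite3.connect(':memory:')
conn.execute("""
    CREATE TABLE task (
        task_id INTEGER,
        task_func_name TEXT,
        task_time_submitted TEXT,
        task_time_returned TEXT,
        task_fail_count INTEGER
    )
""")
conn.executemany(
    "INSERT INTO task VALUES (?, ?, ?, ?, ?)",
    [
        (1, 'my_app', '2024-01-01 00:00:00', '2024-01-01 00:00:10', 0),
        (2, 'my_app', '2024-01-01 00:00:00', '2024-01-01 00:00:20', 0),
        (3, 'other_app', '2024-01-01 00:00:00', '2024-01-01 00:01:00', 1),
    ],
)

# Average runtime per app function, in seconds
# (julianday returns fractional days, so scale by 86400)
query = """
    SELECT task_func_name,
           AVG((julianday(task_time_returned)
                - julianday(task_time_submitted)) * 86400.0) AS avg_s
    FROM task
    GROUP BY task_func_name
"""
averages = {func: round(avg_s, 3) for func, avg_s in conn.execute(query)}
conn.close()

print(sorted(averages.items()))
# → [('my_app', 15.0), ('other_app', 60.0)]
```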

### Advanced Monitoring Configuration

Advanced monitoring features for complex workflows and production environments.

```python
# Hub configurations for different deployments
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.monitoring import MonitoringHub
from parsl.providers import SlurmProvider, LocalProvider

# Central monitoring hub
central_hub = MonitoringHub(
    hub_address='central-monitoring.example.com',
    hub_port=55055,
    logging_endpoint='postgresql://monitoring@central-db:5432/parsl',
    resource_monitoring_interval=30
)

# Site-specific monitoring
site_hub = MonitoringHub(
    hub_address='site-monitor.local',
    hub_port=55056,
    logging_endpoint='sqlite:///site_monitoring.db',
    resource_monitoring_interval=10,
    monitoring_debug=True
)

# A Config accepts a single MonitoringHub, which covers all executors;
# choose the hub appropriate for the deployment
config = Config(
    executors=[
        HighThroughputExecutor(
            label='central_compute',
            provider=SlurmProvider(partition='compute')
        ),
        HighThroughputExecutor(
            label='local_testing',
            provider=LocalProvider()
        )
    ],
    monitoring=central_hub  # Global monitoring configuration
)
```

### Performance Optimization

Use monitoring data to optimize workflow performance and resource utilization.

```python
import parsl
from parsl import python_app

# Monitor workflow execution
@python_app
def monitored_task(task_id, size):
    """Task with per-task performance reporting."""
    import time
    import psutil

    start_time = time.time()
    start_memory = psutil.virtual_memory().used

    # Simulate work
    result = sum(range(size))
    time.sleep(0.1)

    end_time = time.time()
    end_memory = psutil.virtual_memory().used

    # Log performance data
    print(f"Task {task_id}: Runtime={end_time - start_time:.2f}s, "
          f"Memory={end_memory - start_memory} bytes")

    return result

# Execute with monitoring (config defined as in the earlier examples)
with parsl.load(config):
    # Submit various task sizes
    small_tasks = [monitored_task(i, 1000) for i in range(10)]
    large_tasks = [monitored_task(i + 10, 100000) for i in range(5)]

    # Collect results and performance data
    small_results = [f.result() for f in small_tasks]
    large_results = [f.result() for f in large_tasks]

# Analyze monitoring data to optimize future runs
print("Check monitoring dashboard for performance analysis")
```

### Error and Exception Monitoring

Track and analyze errors and exceptions in workflow execution.

```python
import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
from parsl.monitoring import MonitoringHub

# Configure monitoring with error tracking
error_monitoring = MonitoringHub(
    hub_address='localhost',
    monitoring_debug=True,  # Capture detailed error information
)

@python_app
def error_prone_task(task_id):
    """Task that may fail, for monitoring testing."""
    import random

    if random.random() < 0.2:  # 20% failure rate
        raise ValueError(f"Task {task_id} failed randomly")

    return f"Task {task_id} completed successfully"

# Execute with error monitoring
config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    monitoring=error_monitoring,
    retries=2  # Retry failed tasks
)

with parsl.load(config):
    futures = [error_prone_task(i) for i in range(20)]

    # Handle failures gracefully
    for i, future in enumerate(futures):
        try:
            result = future.result()
            print(f"Success: {result}")
        except Exception as e:
            print(f"Task {i} failed: {e}")

# Error data is captured in the monitoring database for analysis
```