0
# Dashboard Integration
1
2
Optional web dashboard for monitoring MPIRE worker pools. It provides a web interface with real-time progress tracking and performance visualization for multiprocessing jobs.
3
4
## Capabilities
5
6
### Dashboard Management
7
8
Functions for starting, connecting to, and shutting down the MPIRE dashboard.
9
10
```python { .api }
11
def start_dashboard(port_range: Sequence = range(8080, 8100)) -> Dict[str, Union[int, str]]
12
def shutdown_dashboard() -> None
13
def connect_to_dashboard(manager_port_nr: int, manager_host: Optional[Union[bytes, str]] = None) -> None
14
```
15
16
**start_dashboard**: Start the dashboard server on an available port within the specified range.
17
- `port_range` (Sequence): Range of ports to try for the dashboard server
18
- Returns: Dictionary with the dashboard port number and the manager host and port number (keys `dashboard_port_nr`, `manager_host`, `manager_port_nr`)
19
20
**shutdown_dashboard**: Stop the running dashboard server.
21
22
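**connect_to_dashboard**: Connect the current process to an existing dashboard (for example, one started in another process or on another machine) so that its WorkerPools report their progress there.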
23
- `manager_port_nr` (int): Port number of the dashboard manager
24
- `manager_host` (Optional[Union[bytes, str]]): Host address of the dashboard manager; when `None`, the local host is used
25
26
### Dashboard Utilities
27
28
Utility functions for dashboard configuration and management.
29
30
```python { .api }
31
def get_stacklevel() -> int
32
def set_stacklevel(stacklevel: int) -> None
33
```
34
35
**get_stacklevel**: Get the stack level currently used when looking up the details of the calling function that are shown on the dashboard.
36
37
**set_stacklevel**: Set the stack level used when looking up the details of the calling function that are shown on the dashboard.
38
39
## Usage Examples
40
41
### Basic Dashboard Usage
42
43
```python
44
from mpire import WorkerPool
45
from mpire.dashboard import start_dashboard, shutdown_dashboard
46
import time
47
48
def slow_computation(x):
    """Return ``x`` squared after a short pause that simulates real work."""
    time.sleep(0.1)  # Simulate work
    return x ** 2
51
52
# Start the dashboard. The returned dictionary holds the dashboard port number
# and the manager host/port ('dashboard_port_nr', 'manager_host',
# 'manager_port_nr'); it has no ready-made URL entry, so build one from the port.
dashboard_info = start_dashboard()
print(f"Dashboard started at: http://localhost:{dashboard_info['dashboard_port_nr']}")

try:
    # Use WorkerPool with dashboard progress tracking
    with WorkerPool(n_jobs=4) as pool:
        results = pool.map(
            slow_computation,
            range(100),
            progress_bar=True,
            progress_bar_style='dashboard',  # Use dashboard progress bar
        )

    print("Processing completed!")

finally:
    # Shutdown dashboard when done, even if the processing above raised
    shutdown_dashboard()
71
```
72
73
### Multiple WorkerPools with Dashboard
74
75
```python
76
from mpire import WorkerPool
77
from mpire.dashboard import start_dashboard, shutdown_dashboard, connect_to_dashboard
78
import time
79
import threading
80
81
def cpu_task(x):
    """CPU-intensive task: return the sum of all integers below ``x * 1000``."""
    return sum(range(x * 1000))
87
88
def io_task(x):
    """I/O simulation: wait briefly, then return a ``processed_<x>`` label."""
    time.sleep(0.05)
    return f"processed_{x}"
92
93
# Start dashboard. The returned dictionary exposes 'dashboard_port_nr',
# 'manager_host' and 'manager_port_nr'; build the URL from the port number.
dashboard_info = start_dashboard()
print(f"Dashboard available at: http://localhost:{dashboard_info['dashboard_port_nr']}")
96
97
def run_cpu_pool():
    """Run CPU-intensive tasks"""
    # No connect_to_dashboard() call is needed here: this process started the
    # dashboard itself, so its pools already report to it.
    # connect_to_dashboard(manager_port_nr, manager_host) is meant for pools
    # that run in a different process or on a different machine.
    # NOTE(review): MPIRE appears to reject connecting when the process already
    # has a running dashboard - confirm against the installed version.
    with WorkerPool(n_jobs=2) as pool:
        results = pool.map(
            cpu_task,
            range(50),
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'CPU Tasks'},
        )
108
109
def run_io_pool():
    """Run I/O tasks"""
    # No connect_to_dashboard() call is needed here: this process started the
    # dashboard itself, so its pools already report to it.
    # connect_to_dashboard(manager_port_nr, manager_host) is meant for pools
    # that run in a different process or on a different machine.
    # NOTE(review): MPIRE appears to reject connecting when the process already
    # has a running dashboard - confirm against the installed version.
    with WorkerPool(n_jobs=4) as pool:
        results = pool.map(
            io_task,
            range(100),
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'I/O Tasks'},
        )
120
121
try:
    # Run both pools concurrently, each driven from its own thread
    cpu_thread = threading.Thread(target=run_cpu_pool)
    io_thread = threading.Thread(target=run_io_pool)

    cpu_thread.start()
    io_thread.start()

    # Wait for both pools to finish before reporting completion
    cpu_thread.join()
    io_thread.join()

    print("All tasks completed!")

finally:
    # Always stop the dashboard, even if a thread could not be started
    shutdown_dashboard()
136
```
137
138
### Dashboard with Custom Progress Options
139
140
```python
141
from mpire import WorkerPool
142
from mpire.dashboard import start_dashboard, shutdown_dashboard
143
import time
144
import random
145
146
def variable_workload(task_id):
    """Sleep for a random 0.05-0.5 s and return a message reporting the duration."""
    # Simulate variable processing time
    sleep_time = random.uniform(0.05, 0.5)
    time.sleep(sleep_time)
    return f"Task {task_id} completed in {sleep_time:.2f}s"
151
152
# Start dashboard with custom port range. The returned dictionary exposes
# 'dashboard_port_nr', 'manager_host' and 'manager_port_nr'; build the URL
# from the port number that was actually chosen within the range.
dashboard_info = start_dashboard(port_range=range(9000, 9010))
print(f"Dashboard URL: http://localhost:{dashboard_info['dashboard_port_nr']}")

try:
    with WorkerPool(n_jobs=3, enable_insights=True) as pool:
        results = pool.map(
            variable_workload,
            range(50),
            progress_bar=True,
            progress_bar_style='dashboard',
            # Extra keyword arguments forwarded to the progress bar
            progress_bar_options={
                'desc': 'Variable Workload',
                'unit': 'tasks',
                'colour': 'green',
                'position': 0,
                'leave': True,
            },
        )

    print("Processing complete!")

finally:
    shutdown_dashboard()
176
```
177
178
### Dashboard with Worker Insights
179
180
```python
181
from mpire import WorkerPool
182
from mpire.dashboard import start_dashboard, shutdown_dashboard
183
import time
184
import numpy as np
185
186
def data_processing_task(data_chunk):
    """Simulate data processing with varying complexity.

    Sleeps 0.01 s per element of ``data_chunk`` and returns the sum of its
    elements, or ``0`` for an empty chunk.
    """
    # Simulate different processing complexities: longer chunks take longer
    complexity = len(data_chunk)
    time.sleep(complexity * 0.01)

    # Process the data
    return np.sum(data_chunk) if complexity > 0 else 0
195
196
# Generate test data: 30 chunks of random length (10-99 elements), seeded so
# the example is reproducible
np.random.seed(42)
data_chunks = [np.random.rand(size) for size in np.random.randint(10, 100, 30)]

# Start dashboard. The returned dictionary exposes 'dashboard_port_nr',
# 'manager_host' and 'manager_port_nr'; build the URL from the port number.
dashboard_info = start_dashboard()
print(f"Monitor progress at: http://localhost:{dashboard_info['dashboard_port_nr']}")

try:
    with WorkerPool(n_jobs=4, enable_insights=True) as pool:
        results = pool.map(
            data_processing_task,
            data_chunks,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={
                'desc': 'Data Processing',
                'unit': 'chunks',
                'miniters': 1,
                'mininterval': 0.1,
            },
        )

        # Print insights after completion
        pool.print_insights()
        print(f"Processed {len(results)} data chunks")

finally:
    shutdown_dashboard()
225
```
226
227
### Dashboard Error Handling
228
229
```python
230
from mpire import WorkerPool
231
from mpire.dashboard import start_dashboard, shutdown_dashboard
232
import time
233
234
def unreliable_task(x):
    """Return ``x * 2``, but raise ``RuntimeError`` for inputs whose last digit is 7."""
    time.sleep(0.1)
    # Simulate occasional failures
    if x % 10 == 7:  # Fail on numbers ending in 7 (7, 17, 27, ...)
        raise RuntimeError(f"Task {x} failed")
    return x * 2
240
241
# Try to start dashboard with error handling
try:
    dashboard_info = start_dashboard()
except Exception as e:
    print(f"Could not start dashboard: {e}")
    print("Continuing without dashboard...")
    dashboard_started = False
else:
    # Report success outside the try body: a problem while formatting the
    # message must not be mistaken for a failed start, because that would
    # leave a running dashboard that is never shut down.
    dashboard_started = True
    print(f"Dashboard started: http://localhost:{dashboard_info['dashboard_port_nr']}")

try:
    with WorkerPool(n_jobs=3) as pool:
        # Use dashboard style if available, otherwise fall back to standard
        progress_style = 'dashboard' if dashboard_started else 'std'

        try:
            results = pool.map(
                unreliable_task,
                range(50),
                progress_bar=True,
                progress_bar_style=progress_style,
                progress_bar_options={'desc': 'Unreliable Tasks'},
            )
            print(f"Completed {len(results)} tasks successfully")

        except Exception as e:
            print(f"Some tasks failed: {e}")
            # Continue processing with error handling...

finally:
    # Only shut down a dashboard that this script actually started
    if dashboard_started:
        shutdown_dashboard()
273
```
274
275
### Dashboard with Multiple Progress Bars
276
277
```python
278
from mpire import WorkerPool
279
from mpire.dashboard import start_dashboard, shutdown_dashboard
280
import time
281
import concurrent.futures
282
283
def preprocessing_task(x):
    """Pipeline stage 1: return ``x`` doubled after a short simulated delay."""
    time.sleep(0.05)
    return x * 2
286
287
def main_processing_task(x):
    """Pipeline stage 2: return ``x`` squared after a short simulated delay."""
    time.sleep(0.1)
    return x ** 2
290
291
def postprocessing_task(x):
    """Pipeline stage 3: return ``x + 10`` after a short simulated delay."""
    time.sleep(0.03)
    return x + 10
294
295
# Start dashboard. The returned dictionary exposes 'dashboard_port_nr',
# 'manager_host' and 'manager_port_nr'; build the URL from the port number.
dashboard_info = start_dashboard()
print(f"Monitor at: http://localhost:{dashboard_info['dashboard_port_nr']}")

try:
    # Create data pipeline with multiple stages; each stage consumes the
    # complete output of the previous one
    input_data = range(30)

    # Stage 1: Preprocessing
    with WorkerPool(n_jobs=2) as pool:
        preprocessed = pool.map(
            preprocessing_task,
            input_data,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Preprocessing', 'position': 0},
        )

    # Stage 2: Main processing
    with WorkerPool(n_jobs=3) as pool:
        processed = pool.map(
            main_processing_task,
            preprocessed,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Main Processing', 'position': 1},
        )

    # Stage 3: Postprocessing
    with WorkerPool(n_jobs=2) as pool:
        final_results = pool.map(
            postprocessing_task,
            processed,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Postprocessing', 'position': 2},
        )

    print(f"Pipeline completed! Final results: {final_results[:5]}...")

finally:
    shutdown_dashboard()
337
```
338
339
### Dashboard Installation Check
340
341
```python
342
def check_dashboard_availability():
    """Check if dashboard dependencies are available.

    Returns ``True`` when the dashboard functions can be imported from
    ``mpire.dashboard`` and ``False`` when that import raises ``ImportError``.
    """
    # NOTE(review): some MPIRE versions may export placeholder functions when
    # the optional dashboard dependencies are missing, in which case this
    # import succeeds and the failure only shows up when start_dashboard() is
    # called - confirm against the installed version.
    try:
        from mpire.dashboard import start_dashboard, shutdown_dashboard
    except ImportError:
        return False
    return True
349
350
def run_with_optional_dashboard():
    """Run processing with dashboard if available.

    Starts the dashboard when its dependencies can be imported, falls back to
    the standard progress bar otherwise, and shuts the dashboard down again
    if (and only if) it was started here.
    """
    dashboard_available = check_dashboard_availability()
    dashboard_started = False

    if dashboard_available:
        try:
            from mpire.dashboard import start_dashboard, shutdown_dashboard
            dashboard_info = start_dashboard()
        except Exception as e:
            print(f"Dashboard start failed: {e}")
        else:
            # Report success outside the try body so that a problem while
            # formatting the message is not mistaken for a failed start,
            # which would leave the dashboard running without a shutdown.
            dashboard_started = True
            print(f"Dashboard available at: http://localhost:{dashboard_info['dashboard_port_nr']}")
    else:
        print("Dashboard dependencies not installed. Install with: pip install mpire[dashboard]")

    # Processing function
    def compute_task(x):
        import time
        time.sleep(0.1)
        return x ** 2

    try:
        from mpire import WorkerPool

        with WorkerPool(n_jobs=4) as pool:
            # Use the dashboard style only when the dashboard is really running
            progress_style = 'dashboard' if dashboard_started else 'std'

            results = pool.map(
                compute_task,
                range(20),
                progress_bar=True,
                progress_bar_style=progress_style,
            )

        print(f"Completed {len(results)} tasks")

    finally:
        # shutdown_dashboard is only bound when the dashboard import succeeded,
        # which is guaranteed whenever dashboard_started is True
        if dashboard_started:
            shutdown_dashboard()


# Run the example (guarded so that importing this module, e.g. from a worker
# process that re-imports the main module, does not start the job again)
if __name__ == "__main__":
    run_with_optional_dashboard()
394
```