or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

apply-functions.md, async-results.md, dashboard-integration.md, exception-handling.md, index.md, parallel-map.md, performance-insights.md, utility-functions.md, worker-configuration.md, workerpool-management.md

docs/dashboard-integration.md

0

# Dashboard Integration

1

2

Optional web dashboard for monitoring multiprocessing jobs with real-time progress tracking and performance visualization. The dashboard provides a web interface for monitoring MPIRE worker pools and their progress in real-time.

3

4

## Capabilities

5

6

### Dashboard Management

7

8

Functions for starting, connecting to, and shutting down the MPIRE dashboard.

9

10

```python { .api }
def start_dashboard(port_range: Sequence = range(8080, 8100)) -> Dict[str, Union[int, str]]

def shutdown_dashboard() -> None

def connect_to_dashboard(manager_port_nr: int, manager_host: Optional[Union[bytes, str]] = None) -> None
```

15

16

**start_dashboard**: Start the dashboard server on an available port within the specified range.

17

- `port_range` (Sequence): Range of ports to try for the dashboard server

18

- Returns: Dictionary with dashboard URL and port information

19

20

**shutdown_dashboard**: Stop the running dashboard server.

21

22

**connect_to_dashboard**: Connect a WorkerPool to an existing dashboard for monitoring.

23

- `manager_port_nr` (int): Port number of the dashboard manager

24

- `manager_host` (Optional[Union[bytes, str]]): Host address of the dashboard manager

25

26

### Dashboard Utilities

27

28

Utility functions for dashboard configuration and management.

29

30

```python { .api }
def get_stacklevel() -> int

def set_stacklevel(stacklevel: int) -> None
```

34

35

**get_stacklevel**: Get the current stack level for dashboard function detection.

36

37

**set_stacklevel**: Set the stack level for dashboard function detection.

38

39

## Usage Examples

40

41

### Basic Dashboard Usage

42

43

```python
from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time

def slow_computation(x):
    time.sleep(0.1)  # Simulate work
    return x ** 2

# Start the dashboard
dashboard_info = start_dashboard()
print(f"Dashboard started at: {dashboard_info['dashboard_url']}")

try:
    # Use WorkerPool with dashboard progress tracking
    with WorkerPool(n_jobs=4) as pool:
        results = pool.map(
            slow_computation,
            range(100),
            progress_bar=True,
            progress_bar_style='dashboard'  # Use dashboard progress bar
        )

    print("Processing completed!")

finally:
    # Shutdown dashboard when done
    shutdown_dashboard()
```

72

73

### Multiple WorkerPools with Dashboard

74

75

```python
from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard, connect_to_dashboard
import time
import threading

def cpu_task(x):
    # CPU-intensive task
    result = 0
    for i in range(x * 1000):
        result += i
    return result

def io_task(x):
    # I/O simulation
    time.sleep(0.05)
    return f"processed_{x}"

# Start dashboard
dashboard_info = start_dashboard()
print(f"Dashboard available at: {dashboard_info['dashboard_url']}")

def run_cpu_pool():
    """Run CPU-intensive tasks"""
    with WorkerPool(n_jobs=2) as pool:
        connect_to_dashboard(dashboard_info['manager_port'])
        results = pool.map(
            cpu_task,
            range(50),
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'CPU Tasks'}
        )

def run_io_pool():
    """Run I/O tasks"""
    with WorkerPool(n_jobs=4) as pool:
        connect_to_dashboard(dashboard_info['manager_port'])
        results = pool.map(
            io_task,
            range(100),
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'I/O Tasks'}
        )

try:
    # Run both pools concurrently
    cpu_thread = threading.Thread(target=run_cpu_pool)
    io_thread = threading.Thread(target=run_io_pool)

    cpu_thread.start()
    io_thread.start()

    cpu_thread.join()
    io_thread.join()

    print("All tasks completed!")

finally:
    shutdown_dashboard()
```

137

138

### Dashboard with Custom Progress Options

139

140

```python
from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
import random

def variable_workload(task_id):
    # Simulate variable processing time
    sleep_time = random.uniform(0.05, 0.5)
    time.sleep(sleep_time)
    return f"Task {task_id} completed in {sleep_time:.2f}s"

# Start dashboard with custom port range
dashboard_info = start_dashboard(port_range=range(9000, 9010))
print(f"Dashboard URL: {dashboard_info['dashboard_url']}")

try:
    with WorkerPool(n_jobs=3, enable_insights=True) as pool:
        results = pool.map(
            variable_workload,
            range(50),
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={
                'desc': 'Variable Workload',
                'unit': 'tasks',
                'colour': 'green',
                'position': 0,
                'leave': True
            }
        )

    print("Processing complete!")

finally:
    shutdown_dashboard()
```

177

178

### Dashboard with Worker Insights

179

180

```python
from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
import numpy as np

def data_processing_task(data_chunk):
    """Simulate data processing with varying complexity"""
    # Simulate different processing complexities
    complexity = len(data_chunk)
    time.sleep(complexity * 0.01)

    # Process the data
    result = np.sum(data_chunk) if len(data_chunk) > 0 else 0
    return result

# Generate test data
np.random.seed(42)
data_chunks = [np.random.rand(size) for size in np.random.randint(10, 100, 30)]

# Start dashboard
dashboard_info = start_dashboard()
print(f"Monitor progress at: {dashboard_info['dashboard_url']}")

try:
    with WorkerPool(n_jobs=4, enable_insights=True) as pool:
        results = pool.map(
            data_processing_task,
            data_chunks,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={
                'desc': 'Data Processing',
                'unit': 'chunks',
                'miniters': 1,
                'mininterval': 0.1
            }
        )

        # Print insights after completion
        pool.print_insights()
        print(f"Processed {len(results)} data chunks")

finally:
    shutdown_dashboard()
```

226

227

### Dashboard Error Handling

228

229

```python
from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time

def unreliable_task(x):
    time.sleep(0.1)
    # Simulate occasional failures
    if x % 10 == 7:  # Fail when x ends in 7
        raise RuntimeError(f"Task {x} failed")
    return x * 2

# Try to start dashboard with error handling
try:
    dashboard_info = start_dashboard()
    dashboard_started = True
    print(f"Dashboard started: {dashboard_info['dashboard_url']}")
except Exception as e:
    print(f"Could not start dashboard: {e}")
    print("Continuing without dashboard...")
    dashboard_started = False

try:
    with WorkerPool(n_jobs=3) as pool:
        # Use dashboard style if available, otherwise fall back to standard
        progress_style = 'dashboard' if dashboard_started else 'std'

        try:
            results = pool.map(
                unreliable_task,
                range(50),
                progress_bar=True,
                progress_bar_style=progress_style,
                progress_bar_options={'desc': 'Unreliable Tasks'}
            )
            print(f"Completed {len(results)} tasks successfully")

        except Exception as e:
            print(f"Some tasks failed: {e}")
            # Continue processing with error handling...

finally:
    if dashboard_started:
        shutdown_dashboard()
```

274

275

### Dashboard with Multiple Progress Bars

276

277

```python
from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
import concurrent.futures

def preprocessing_task(x):
    time.sleep(0.05)
    return x * 2

def main_processing_task(x):
    time.sleep(0.1)
    return x ** 2

def postprocessing_task(x):
    time.sleep(0.03)
    return x + 10

# Start dashboard
dashboard_info = start_dashboard()
print(f"Monitor at: {dashboard_info['dashboard_url']}")

try:
    # Create data pipeline with multiple stages
    input_data = range(30)

    # Stage 1: Preprocessing
    with WorkerPool(n_jobs=2) as pool:
        preprocessed = pool.map(
            preprocessing_task,
            input_data,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Preprocessing', 'position': 0}
        )

    # Stage 2: Main processing
    with WorkerPool(n_jobs=3) as pool:
        processed = pool.map(
            main_processing_task,
            preprocessed,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Main Processing', 'position': 1}
        )

    # Stage 3: Postprocessing
    with WorkerPool(n_jobs=2) as pool:
        final_results = pool.map(
            postprocessing_task,
            processed,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Postprocessing', 'position': 2}
        )

    print(f"Pipeline completed! Final results: {final_results[:5]}...")

finally:
    shutdown_dashboard()
```

338

339

### Dashboard Installation Check

340

341

```python
def check_dashboard_availability():
    """Check if dashboard dependencies are available"""
    try:
        from mpire.dashboard import start_dashboard, shutdown_dashboard
        return True
    except ImportError:
        return False

def run_with_optional_dashboard():
    """Run processing with dashboard if available"""

    dashboard_available = check_dashboard_availability()
    dashboard_started = False

    if dashboard_available:
        try:
            from mpire.dashboard import start_dashboard, shutdown_dashboard
            dashboard_info = start_dashboard()
            dashboard_started = True
            print(f"Dashboard available at: {dashboard_info['dashboard_url']}")
        except Exception as e:
            print(f"Dashboard start failed: {e}")
    else:
        print("Dashboard dependencies not installed. Install with: pip install mpire[dashboard]")

    # Processing function
    def compute_task(x):
        import time
        time.sleep(0.1)
        return x ** 2

    try:
        from mpire import WorkerPool

        with WorkerPool(n_jobs=4) as pool:
            progress_style = 'dashboard' if dashboard_started else 'std'

            results = pool.map(
                compute_task,
                range(20),
                progress_bar=True,
                progress_bar_style=progress_style
            )

            print(f"Completed {len(results)} tasks")

    finally:
        if dashboard_started:
            shutdown_dashboard()

# Run the example
run_with_optional_dashboard()
```