or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

apply-functions.mdasync-results.mddashboard-integration.mdexception-handling.mdindex.mdparallel-map.mdperformance-insights.mdutility-functions.mdworker-configuration.mdworkerpool-management.md

performance-insights.mddocs/

# Performance Insights

Worker performance monitoring and insights for analyzing multiprocessing efficiency. The insights system provides detailed timing data, task completion statistics, and performance metrics to help optimize parallel processing workloads.

## Capabilities

### Insights Management

Functions for retrieving and displaying worker performance insights.

```python { .api }
def print_insights(self) -> None
def get_insights(self) -> Dict
def get_exit_results(self) -> List
```

**print_insights**: Print formatted performance insights to console with timing breakdown and efficiency metrics.

**get_insights**: Return performance insights as a dictionary with detailed worker statistics.

**get_exit_results**: Get results returned by worker exit functions (if any).

### WorkerInsights Class

Internal class for managing worker performance data.

```python { .api }
class WorkerInsights:
    def __init__(self, ctx: multiprocessing.context.BaseContext, n_jobs: int, use_dill: bool) -> None
    def enable_insights(self) -> None
    def disable_insights(self) -> None
    def reset_insights(self, start_time: float) -> None
```

## Usage Examples

### Basic Insights Usage

```python
from mpire import WorkerPool
import time

def slow_function(x):
    time.sleep(0.1)  # Simulate work
    return x * x

# Enable insights during pool creation
with WorkerPool(n_jobs=4, enable_insights=True) as pool:
    results = pool.map(slow_function, range(20))

    # Print formatted insights
    pool.print_insights()

    # Get insights as dictionary
    insights = pool.get_insights()
    print("Total tasks:", insights['n_completed_tasks'])
    print("Average task time:", insights['avg_task_duration'])
```

### Detailed Performance Analysis

```python
from mpire import WorkerPool
import time
import random

def variable_workload(x):
    # Simulate variable processing time
    sleep_time = random.uniform(0.01, 0.2)
    time.sleep(sleep_time)
    return x ** 2

with WorkerPool(n_jobs=4, enable_insights=True) as pool:
    results = pool.map(variable_workload, range(100), chunk_size=10)

    insights = pool.get_insights()

    print("=== Performance Insights ===")
    print(f"Total completed tasks: {insights['n_completed_tasks']}")
    print(f"Total execution time: {insights['total_elapsed_time']:.2f}s")
    print(f"Average task duration: {insights['avg_task_duration']:.4f}s")
    print(f"Worker efficiency: {insights['efficiency']:.2%}")

    # Per-worker statistics
    for worker_id in range(4):
        worker_stats = insights[f'worker_{worker_id}']
        print(f"Worker {worker_id}:")
        print(f"  Tasks completed: {worker_stats['n_completed_tasks']}")
        print(f"  Working time: {worker_stats['working_time']:.2f}s")
        print(f"  Waiting time: {worker_stats['waiting_time']:.2f}s")
        print(f"  Efficiency: {worker_stats['efficiency']:.2%}")
```

### Comparing Different Configurations

```python
from mpire import WorkerPool
import time

def cpu_bound_task(n):
    # CPU-intensive task
    result = 0
    for i in range(n * 1000):
        result += i
    return result

def benchmark_configuration(n_jobs, chunk_size, data):
    """Benchmark a specific configuration"""
    with WorkerPool(n_jobs=n_jobs, enable_insights=True) as pool:
        results = pool.map(cpu_bound_task, data, chunk_size=chunk_size)
        insights = pool.get_insights()

        return {
            'n_jobs': n_jobs,
            'chunk_size': chunk_size,
            'total_time': insights['total_elapsed_time'],
            'efficiency': insights['efficiency'],
            'avg_task_time': insights['avg_task_duration']
        }

# Test different configurations
data = range(100)
configurations = [
    (2, 5), (2, 10), (2, 25),
    (4, 5), (4, 10), (4, 25),
    (8, 5), (8, 10), (8, 25)
]

results = []
for n_jobs, chunk_size in configurations:
    result = benchmark_configuration(n_jobs, chunk_size, data)
    results.append(result)
    print(f"Jobs: {n_jobs}, Chunk: {chunk_size:2d} => "
          f"Time: {result['total_time']:.2f}s, "
          f"Efficiency: {result['efficiency']:.2%}")

# Find best configuration
best = min(results, key=lambda x: x['total_time'])
print(f"\nBest configuration: {best['n_jobs']} jobs, chunk size {best['chunk_size']}")
```

### Worker Initialization Insights

```python
def expensive_init(worker_state):
    """Simulate expensive worker initialization"""
    import time
    time.sleep(1)  # Expensive setup
    worker_state['start_time'] = time.time()
    return "initialization_complete"

def quick_task(worker_state, x):
    return x * 2

with WorkerPool(n_jobs=4, use_worker_state=True, enable_insights=True) as pool:
    results = pool.map(
        quick_task,
        range(20),
        worker_init=expensive_init,
        chunk_size=5
    )

    insights = pool.get_insights()

    print("=== Initialization Impact ===")
    print(f"Total init time: {insights['total_init_time']:.2f}s")
    print(f"Average init time per worker: {insights['avg_init_time']:.2f}s")
    print(f"Total working time: {insights['total_working_time']:.2f}s")
    print(f"Init overhead: {insights['init_overhead_percentage']:.1%}")

    # Check if initialization time dominates
    if insights['init_overhead_percentage'] > 0.5:
        print("Consider using keep_alive=True for better performance")
```

### Memory and Resource Insights

```python
import psutil
import os

def memory_intensive_task(size):
    """Task that uses significant memory"""
    data = list(range(size * 1000))
    return sum(data)

def monitor_memory_usage():
    """Get current memory usage"""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # MB

# Monitor memory usage with insights
initial_memory = monitor_memory_usage()

with WorkerPool(n_jobs=2, enable_insights=True) as pool:
    results = pool.map(memory_intensive_task, range(50))

    peak_memory = monitor_memory_usage()
    insights = pool.get_insights()

    print("=== Memory Usage Analysis ===")
    print(f"Initial memory: {initial_memory:.1f} MB")
    print(f"Peak memory: {peak_memory:.1f} MB")
    print(f"Memory increase: {peak_memory - initial_memory:.1f} MB")
    print(f"Tasks completed: {insights['n_completed_tasks']}")
    print(f"Memory per task: {(peak_memory - initial_memory) / insights['n_completed_tasks']:.2f} MB")

    pool.print_insights()
```

### Worker Exit Results

```python
def init_worker(worker_state):
    worker_state['processed_items'] = []
    worker_state['error_count'] = 0

def process_item(worker_state, item):
    try:
        result = item * 2
        worker_state['processed_items'].append(item)
        return result
    except Exception:
        worker_state['error_count'] += 1
        raise

def cleanup_worker(worker_state):
    """Return summary of worker's work"""
    return {
        'total_processed': len(worker_state['processed_items']),
        'error_count': worker_state['error_count'],
        'first_item': worker_state['processed_items'][0] if worker_state['processed_items'] else None,
        'last_item': worker_state['processed_items'][-1] if worker_state['processed_items'] else None
    }

with WorkerPool(n_jobs=3, use_worker_state=True, enable_insights=True) as pool:
    results = pool.map(
        process_item,
        range(30),
        worker_init=init_worker,
        worker_exit=cleanup_worker,
        chunk_size=5
    )

    # Get worker exit results
    exit_results = pool.get_exit_results()

    print("=== Worker Exit Results ===")
    for i, worker_result in enumerate(exit_results):
        if worker_result:  # Some workers might not have exit results
            print(f"Worker {i}:")
            print(f"  Processed: {worker_result['total_processed']} items")
            print(f"  Errors: {worker_result['error_count']}")
            print(f"  Range: {worker_result['first_item']} to {worker_result['last_item']}")

    pool.print_insights()
```