# Performance Insights

Worker performance monitoring and insights for analyzing multiprocessing efficiency. The insights system provides detailed timing data, task completion statistics, and performance metrics to help optimize parallel processing workloads.

## Capabilities

### Insights Management

Functions for retrieving and displaying worker performance insights.

```python { .api }
def print_insights(self) -> None
def get_insights(self) -> Dict
def get_exit_results(self) -> List
```

**print_insights**: Print formatted performance insights to console with timing breakdown and efficiency metrics.

**get_insights**: Return performance insights as a dictionary with detailed worker statistics.

**get_exit_results**: Get results returned by worker exit functions (if any).

### WorkerInsights Class

Internal class for managing worker performance data.

```python { .api }
class WorkerInsights:
    def __init__(self, ctx: multiprocessing.context.BaseContext, n_jobs: int, use_dill: bool) -> None
    def enable_insights(self) -> None
    def disable_insights(self) -> None
    def reset_insights(self, start_time: float) -> None
```
## Usage Examples

### Basic Insights Usage

```python
from mpire import WorkerPool
import time

def slow_function(x):
    time.sleep(0.1) # Simulate work
    return x * x

# Enable insights during pool creation
with WorkerPool(n_jobs=4, enable_insights=True) as pool:
    results = pool.map(slow_function, range(20))

    # Print formatted insights
    pool.print_insights()

    # Get insights as dictionary
    insights = pool.get_insights()
    print("Total tasks:", insights['n_completed_tasks'])
    print("Average task time:", insights['avg_task_duration'])
```
### Detailed Performance Analysis

```python
from mpire import WorkerPool
import time
import random

def variable_workload(x):
    # Simulate variable processing time
    sleep_time = random.uniform(0.01, 0.2)
    time.sleep(sleep_time)
    return x ** 2

with WorkerPool(n_jobs=4, enable_insights=True) as pool:
    results = pool.map(variable_workload, range(100), chunk_size=10)

    insights = pool.get_insights()

    print("=== Performance Insights ===")
    print(f"Total completed tasks: {insights['n_completed_tasks']}")
    print(f"Total execution time: {insights['total_elapsed_time']:.2f}s")
    print(f"Average task duration: {insights['avg_task_duration']:.4f}s")
    print(f"Worker efficiency: {insights['efficiency']:.2%}")

    # Per-worker statistics
    for worker_id in range(4):
        worker_stats = insights[f'worker_{worker_id}']
        print(f"Worker {worker_id}:")
        print(f" Tasks completed: {worker_stats['n_completed_tasks']}")
        print(f" Working time: {worker_stats['working_time']:.2f}s")
        print(f" Waiting time: {worker_stats['waiting_time']:.2f}s")
        print(f" Efficiency: {worker_stats['efficiency']:.2%}")
```
### Comparing Different Configurations

```python
from mpire import WorkerPool
import time

def cpu_bound_task(n):
    # CPU-intensive task
    result = 0
    for i in range(n * 1000):
        result += i
    return result

def benchmark_configuration(n_jobs, chunk_size, data):
    """Benchmark a specific configuration"""
    with WorkerPool(n_jobs=n_jobs, enable_insights=True) as pool:
        results = pool.map(cpu_bound_task, data, chunk_size=chunk_size)
        insights = pool.get_insights()

    return {
        'n_jobs': n_jobs,
        'chunk_size': chunk_size,
        'total_time': insights['total_elapsed_time'],
        'efficiency': insights['efficiency'],
        'avg_task_time': insights['avg_task_duration']
    }

# Test different configurations
data = range(100)
configurations = [
    (2, 5), (2, 10), (2, 25),
    (4, 5), (4, 10), (4, 25),
    (8, 5), (8, 10), (8, 25)
]

results = []
for n_jobs, chunk_size in configurations:
    result = benchmark_configuration(n_jobs, chunk_size, data)
    results.append(result)
    print(f"Jobs: {n_jobs}, Chunk: {chunk_size:2d} => "
          f"Time: {result['total_time']:.2f}s, "
          f"Efficiency: {result['efficiency']:.2%}")

# Find best configuration
best = min(results, key=lambda x: x['total_time'])
print(f"\nBest configuration: {best['n_jobs']} jobs, chunk size {best['chunk_size']}")
```
### Worker Initialization Insights

```python
def expensive_init(worker_state):
    """Simulate expensive worker initialization"""
    import time
    time.sleep(1) # Expensive setup
    worker_state['start_time'] = time.time()
    return "initialization_complete"

def quick_task(worker_state, x):
    return x * 2

with WorkerPool(n_jobs=4, use_worker_state=True, enable_insights=True) as pool:
    results = pool.map(
        quick_task,
        range(20),
        worker_init=expensive_init,
        chunk_size=5
    )

    insights = pool.get_insights()

    print("=== Initialization Impact ===")
    print(f"Total init time: {insights['total_init_time']:.2f}s")
    print(f"Average init time per worker: {insights['avg_init_time']:.2f}s")
    print(f"Total working time: {insights['total_working_time']:.2f}s")
    print(f"Init overhead: {insights['init_overhead_percentage']:.1%}")

    # Check if initialization time dominates
    if insights['init_overhead_percentage'] > 0.5:
        print("Consider using keep_alive=True for better performance")
```
### Memory and Resource Insights

```python
import psutil
import os

def memory_intensive_task(size):
    """Task that uses significant memory"""
    data = list(range(size * 1000))
    return sum(data)

def monitor_memory_usage():
    """Get current memory usage"""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024 # MB

# Monitor memory usage with insights
initial_memory = monitor_memory_usage()

with WorkerPool(n_jobs=2, enable_insights=True) as pool:
    results = pool.map(memory_intensive_task, range(50))

    peak_memory = monitor_memory_usage()
    insights = pool.get_insights()

    print("=== Memory Usage Analysis ===")
    print(f"Initial memory: {initial_memory:.1f} MB")
    print(f"Peak memory: {peak_memory:.1f} MB")
    print(f"Memory increase: {peak_memory - initial_memory:.1f} MB")
    print(f"Tasks completed: {insights['n_completed_tasks']}")
    print(f"Memory per task: {(peak_memory - initial_memory) / insights['n_completed_tasks']:.2f} MB")

    pool.print_insights()
```
### Worker Exit Results

```python
def init_worker(worker_state):
    worker_state['processed_items'] = []
    worker_state['error_count'] = 0

def process_item(worker_state, item):
    try:
        result = item * 2
        worker_state['processed_items'].append(item)
        return result
    except Exception:
        worker_state['error_count'] += 1
        raise

def cleanup_worker(worker_state):
    """Return summary of worker's work"""
    return {
        'total_processed': len(worker_state['processed_items']),
        'error_count': worker_state['error_count'],
        'first_item': worker_state['processed_items'][0] if worker_state['processed_items'] else None,
        'last_item': worker_state['processed_items'][-1] if worker_state['processed_items'] else None
    }

with WorkerPool(n_jobs=3, use_worker_state=True, enable_insights=True) as pool:
    results = pool.map(
        process_item,
        range(30),
        worker_init=init_worker,
        worker_exit=cleanup_worker,
        chunk_size=5
    )

    # Get worker exit results
    exit_results = pool.get_exit_results()

    print("=== Worker Exit Results ===")
    for i, worker_result in enumerate(exit_results):
        if worker_result: # Some workers might not have exit results
            print(f"Worker {i}:")
            print(f" Processed: {worker_result['total_processed']} items")
            print(f" Errors: {worker_result['error_count']}")
            print(f" Range: {worker_result['first_item']} to {worker_result['last_item']}")

    pool.print_insights()
```