# Worker Configuration

Advanced worker configuration including CPU pinning, shared objects, worker state management, and initialization/exit functions. These features enable fine-tuned control over worker behavior and resource management.

## Capabilities

### Worker Settings Configuration

Dynamic configuration methods for modifying worker behavior after WorkerPool creation.

```python { .api }
def pass_on_worker_id(self, pass_on: bool = True) -> None
def set_shared_objects(self, shared_objects: Any = None) -> None
def set_use_worker_state(self, use_worker_state: bool = True) -> None
def set_keep_alive(self, keep_alive: bool = True) -> None
def set_order_tasks(self, order_tasks: bool = True) -> None
```

**pass_on_worker_id**: Configure whether to pass the worker ID as the first argument to target functions.

**set_shared_objects**: Set or update shared objects that are passed to all workers (copy-on-write with fork).

**set_use_worker_state**: Enable or disable worker state functionality.

**set_keep_alive**: Configure whether to keep workers alive between map operations.

**set_order_tasks**: Configure whether tasks are distributed to workers in order.
## Usage Examples

### Worker ID Access

```python
from mpire import WorkerPool

def process_with_id(worker_id, data):
    print(f"Worker {worker_id} processing: {data}")
    return f"worker_{worker_id}_{data}"

# Enable worker ID passing during initialization
with WorkerPool(n_jobs=3, pass_worker_id=True) as pool:
    results = pool.map(process_with_id, range(10))

# Or enable it dynamically
with WorkerPool(n_jobs=3) as pool:
    pool.pass_on_worker_id(True)
    results = pool.map(process_with_id, range(10))
```
### Shared Objects

```python
import numpy as np
from mpire import WorkerPool

# Large shared data structure
shared_data = {
    'lookup_table': np.random.rand(1000, 1000),
    'constants': {'pi': 3.14159, 'e': 2.71828},
    'config': {'threshold': 0.5, 'max_iterations': 100}
}

def process_with_shared(shared_objects, item):
    # Access shared data without copying
    threshold = shared_objects['config']['threshold']
    lookup = shared_objects['lookup_table'][item % 1000]
    return (lookup > threshold).sum()

# Set shared objects during initialization
with WorkerPool(n_jobs=4, shared_objects=shared_data) as pool:
    results = pool.map(process_with_shared, range(100))

# Or set them dynamically
with WorkerPool(n_jobs=4) as pool:
    pool.set_shared_objects(shared_data)
    results = pool.map(process_with_shared, range(100))
```
### Worker State Management

```python
def init_worker_state(worker_state):
    """Initialize worker with persistent state"""
    import sqlite3
    worker_state['db'] = sqlite3.connect(':memory:')
    worker_state['processed_count'] = 0
    worker_state['cache'] = {}

def process_with_state(worker_state, item):
    """Process item using worker state"""
    worker_state['processed_count'] += 1

    # Use cache
    if item in worker_state['cache']:
        return worker_state['cache'][item]

    # Expensive computation
    result = item ** 2
    worker_state['cache'][item] = result

    return result

def cleanup_worker_state(worker_state):
    """Clean up worker state"""
    worker_state['db'].close()
    print(f"Worker processed {worker_state['processed_count']} items")

# Enable worker state during initialization
with WorkerPool(n_jobs=3, use_worker_state=True) as pool:
    results = pool.map(
        process_with_state,
        range(100),
        worker_init=init_worker_state,
        worker_exit=cleanup_worker_state
    )

# Or enable it dynamically
with WorkerPool(n_jobs=3) as pool:
    pool.set_use_worker_state(True)
    results = pool.map(
        process_with_state,
        range(100),
        worker_init=init_worker_state,
        worker_exit=cleanup_worker_state
    )
```
### Worker Reuse with Keep Alive

```python
def expensive_init():
    """Simulate expensive initialization"""
    import time
    time.sleep(2)  # Expensive setup
    return "initialized"

def init_worker(worker_state):
    worker_state['resource'] = expensive_init()

def process_item(worker_state, item):
    return f"{worker_state['resource']}_{item}"

# Workers stay alive between operations
pool = WorkerPool(n_jobs=2, use_worker_state=True, keep_alive=True)

# First operation - workers initialize
results1 = pool.map(process_item, range(5), worker_init=init_worker)

# Second operation - reuses existing workers (no re-initialization)
results2 = pool.map(process_item, range(5, 10))

# Cleanup
pool.stop_and_join()

# Or configure dynamically
pool = WorkerPool(n_jobs=2, use_worker_state=True)
pool.set_keep_alive(True)
# ... use pool ...
pool.stop_and_join()
```
### Task Ordering

```python
def process_with_order_info(worker_id, item):
    return f"Worker {worker_id} got item {item}"

# Tasks distributed in order: worker 0 gets first chunk, worker 1 gets second, etc.
with WorkerPool(n_jobs=3, pass_worker_id=True, order_tasks=True) as pool:
    results = pool.map(process_with_order_info, range(15), chunk_size=5)
    for result in results:
        print(result)

# Configure ordering dynamically
with WorkerPool(n_jobs=3, pass_worker_id=True) as pool:
    pool.set_order_tasks(True)
    results = pool.map(process_with_order_info, range(15), chunk_size=5)
```
### CPU Pinning

```python
# Pin workers to specific CPUs during initialization
cpu_assignments = [0, 1, 2, 3]  # One CPU per worker
with WorkerPool(n_jobs=4, cpu_ids=cpu_assignments) as pool:
    results = pool.map(cpu_intensive_function, range(100))

# Pin all workers to the same CPUs
cpu_set = [[0, 1]]  # Every worker can use CPU 0 or 1
with WorkerPool(n_jobs=4, cpu_ids=cpu_set) as pool:
    results = pool.map(cpu_intensive_function, range(100))

# Different CPU sets per worker
cpu_per_worker = [[0], [1], [2, 3], [4, 5]]  # Worker-specific CPU assignments
with WorkerPool(n_jobs=4, cpu_ids=cpu_per_worker) as pool:
    results = pool.map(cpu_intensive_function, range(100))
```
### Combined Configuration

```python
def comprehensive_example():
    """Example showing multiple configuration options together"""

    # Shared data
    shared_resources = {
        'model_weights': load_model_weights(),
        'lookup_tables': load_lookup_tables()
    }

    def init_worker(worker_id, shared_objects, worker_state):
        worker_state['session_id'] = f"session_{worker_id}"
        worker_state['processed'] = 0
        print(f"Worker {worker_id} initialized with shared resources")

    def process_item(worker_id, shared_objects, worker_state, item):
        worker_state['processed'] += 1
        model = shared_objects['model_weights']
        result = f"Worker {worker_id} processed {item} (total: {worker_state['processed']})"
        return result

    def cleanup_worker(worker_id, shared_objects, worker_state):
        print(f"Worker {worker_id} processed {worker_state['processed']} items total")

    with WorkerPool(
        n_jobs=4,
        cpu_ids=[0, 1, 2, 3],            # CPU pinning
        shared_objects=shared_resources,  # Shared data
        pass_worker_id=True,              # Pass worker ID
        use_worker_state=True,            # Enable worker state
        keep_alive=True,                  # Reuse workers
        order_tasks=True,                 # Ordered task distribution
        enable_insights=True              # Performance monitoring
    ) as pool:

        # First batch
        results1 = pool.map(
            process_item,
            range(20),
            worker_init=init_worker,
            worker_exit=cleanup_worker,
            chunk_size=5
        )

        # Second batch reuses workers
        results2 = pool.map(process_item, range(20, 40), chunk_size=5)

        # Print performance insights
        pool.print_insights()

comprehensive_example()
```