or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

apply-functions.md · async-results.md · dashboard-integration.md · exception-handling.md · index.md · parallel-map.md · performance-insights.md · utility-functions.md · worker-configuration.md · workerpool-management.md

docs/worker-configuration.md

0

# Worker Configuration

Advanced worker configuration including CPU pinning, shared objects, worker state management, and initialization/exit functions. These features enable fine-tuned control over worker behavior and resource management.

## Capabilities

### Worker Settings Configuration

Dynamic configuration methods for modifying worker behavior after WorkerPool creation.

```python { .api }
def pass_on_worker_id(self, pass_on: bool = True) -> None
def set_shared_objects(self, shared_objects: Any = None) -> None
def set_use_worker_state(self, use_worker_state: bool = True) -> None
def set_keep_alive(self, keep_alive: bool = True) -> None
def set_order_tasks(self, order_tasks: bool = True) -> None
```

**pass_on_worker_id**: Configure whether to pass worker ID as the first argument to target functions.

**set_shared_objects**: Set or update shared objects that are passed to all workers (copy-on-write with fork).

**set_use_worker_state**: Enable or disable worker state functionality.

**set_keep_alive**: Configure whether to keep workers alive between map operations.

**set_order_tasks**: Configure whether tasks are distributed to workers in order.

## Usage Examples

### Worker ID Access

```python
from mpire import WorkerPool

def process_with_id(worker_id, data):
    print(f"Worker {worker_id} processing: {data}")
    return f"worker_{worker_id}_{data}"

# Enable worker ID passing during initialization
with WorkerPool(n_jobs=3, pass_worker_id=True) as pool:
    results = pool.map(process_with_id, range(10))

# Or enable it dynamically
with WorkerPool(n_jobs=3) as pool:
    pool.pass_on_worker_id(True)
    results = pool.map(process_with_id, range(10))
```

### Shared Objects

```python
import numpy as np
from mpire import WorkerPool

# Large shared data structure
shared_data = {
    'lookup_table': np.random.rand(1000, 1000),
    'constants': {'pi': 3.14159, 'e': 2.71828},
    'config': {'threshold': 0.5, 'max_iterations': 100}
}

def process_with_shared(shared_objects, item):
    # Access shared data without copying
    threshold = shared_objects['config']['threshold']
    lookup = shared_objects['lookup_table'][item % 1000]
    return (lookup > threshold).sum()

# Set shared objects during initialization
with WorkerPool(n_jobs=4, shared_objects=shared_data) as pool:
    results = pool.map(process_with_shared, range(100))

# Or set them dynamically
with WorkerPool(n_jobs=4) as pool:
    pool.set_shared_objects(shared_data)
    results = pool.map(process_with_shared, range(100))
```

### Worker State Management

```python
def init_worker_state(worker_state):
    """Initialize worker with persistent state"""
    import sqlite3
    worker_state['db'] = sqlite3.connect(':memory:')
    worker_state['processed_count'] = 0
    worker_state['cache'] = {}

def process_with_state(worker_state, item):
    """Process item using worker state"""
    worker_state['processed_count'] += 1

    # Use cache
    if item in worker_state['cache']:
        return worker_state['cache'][item]

    # Expensive computation
    result = item ** 2
    worker_state['cache'][item] = result

    return result

def cleanup_worker_state(worker_state):
    """Clean up worker state"""
    worker_state['db'].close()
    print(f"Worker processed {worker_state['processed_count']} items")

# Enable worker state during initialization
with WorkerPool(n_jobs=3, use_worker_state=True) as pool:
    results = pool.map(
        process_with_state,
        range(100),
        worker_init=init_worker_state,
        worker_exit=cleanup_worker_state
    )

# Or enable it dynamically
with WorkerPool(n_jobs=3) as pool:
    pool.set_use_worker_state(True)
    results = pool.map(
        process_with_state,
        range(100),
        worker_init=init_worker_state,
        worker_exit=cleanup_worker_state
    )
```

### Worker Reuse with Keep Alive

```python
def expensive_init():
    """Simulate expensive initialization"""
    import time
    time.sleep(2)  # Expensive setup
    return "initialized"

def init_worker(worker_state):
    worker_state['resource'] = expensive_init()

def process_item(worker_state, item):
    return f"{worker_state['resource']}_{item}"

# Workers stay alive between operations
pool = WorkerPool(n_jobs=2, use_worker_state=True, keep_alive=True)

# First operation - workers initialize
results1 = pool.map(process_item, range(5), worker_init=init_worker)

# Second operation - reuses existing workers (no re-initialization)
results2 = pool.map(process_item, range(5, 10))

# Cleanup
pool.stop_and_join()

# Or configure dynamically
pool = WorkerPool(n_jobs=2, use_worker_state=True)
pool.set_keep_alive(True)
# ... use pool ...
pool.stop_and_join()
```

### Task Ordering

```python
def process_with_order_info(worker_id, item):
    return f"Worker {worker_id} got item {item}"

# Tasks distributed in order: worker 0 gets first chunk, worker 1 gets second, etc.
with WorkerPool(n_jobs=3, pass_worker_id=True, order_tasks=True) as pool:
    results = pool.map(process_with_order_info, range(15), chunk_size=5)
    for result in results:
        print(result)

# Configure ordering dynamically
with WorkerPool(n_jobs=3, pass_worker_id=True) as pool:
    pool.set_order_tasks(True)
    results = pool.map(process_with_order_info, range(15), chunk_size=5)
```

### CPU Pinning

```python
# Pin workers to specific CPUs during initialization
cpu_assignments = [0, 1, 2, 3]  # One CPU per worker
with WorkerPool(n_jobs=4, cpu_ids=cpu_assignments) as pool:
    results = pool.map(cpu_intensive_function, range(100))

# Pin all workers to the same CPUs
cpu_set = [0, 1]  # Workers can use CPU 0 or 1
with WorkerPool(n_jobs=4, cpu_ids=cpu_set) as pool:
    results = pool.map(cpu_intensive_function, range(100))

# Different CPU sets per worker
cpu_per_worker = [[0], [1], [2, 3], [4, 5]]  # Worker-specific CPU assignments
with WorkerPool(n_jobs=4, cpu_ids=cpu_per_worker) as pool:
    results = pool.map(cpu_intensive_function, range(100))
```

### Combined Configuration

```python
def comprehensive_example():
    """Example showing multiple configuration options together"""

    # Shared data
    shared_resources = {
        'model_weights': load_model_weights(),
        'lookup_tables': load_lookup_tables()
    }

    def init_worker(worker_id, shared_objects, worker_state):
        worker_state['session_id'] = f"session_{worker_id}"
        worker_state['processed'] = 0
        print(f"Worker {worker_id} initialized with shared resources")

    def process_item(worker_id, shared_objects, worker_state, item):
        worker_state['processed'] += 1
        model = shared_objects['model_weights']
        result = f"Worker {worker_id} processed {item} (total: {worker_state['processed']})"
        return result

    def cleanup_worker(worker_id, shared_objects, worker_state):
        print(f"Worker {worker_id} processed {worker_state['processed']} items total")

    with WorkerPool(
        n_jobs=4,
        cpu_ids=[0, 1, 2, 3],             # CPU pinning
        shared_objects=shared_resources,  # Shared data
        pass_worker_id=True,              # Pass worker ID
        use_worker_state=True,            # Enable worker state
        keep_alive=True,                  # Reuse workers
        order_tasks=True,                 # Ordered task distribution
        enable_insights=True              # Performance monitoring
    ) as pool:

        # First batch
        results1 = pool.map(
            process_item,
            range(20),
            worker_init=init_worker,
            worker_exit=cleanup_worker,
            chunk_size=5
        )

        # Second batch reuses workers
        results2 = pool.map(process_item, range(20, 40), chunk_size=5)

        # Print performance insights
        pool.print_insights()

comprehensive_example()
```