# Parallel Map Functions

Map-style parallel execution functions for processing iterables across multiple workers. Includes ordered and unordered variants, plus iterator versions for memory-efficient processing of large datasets.

## Capabilities

### Ordered Map Functions

Process iterables in parallel while maintaining result order.

```python { .api }
def map(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
        iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
        chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
        worker_lifespan: Optional[int] = None, progress_bar: bool = False,
        concatenate_numpy_output: bool = True, worker_init: Optional[Callable] = None,
        worker_exit: Optional[Callable] = None, task_timeout: Optional[float] = None,
        worker_init_timeout: Optional[float] = None, worker_exit_timeout: Optional[float] = None,
        progress_bar_options: Optional[Dict[str, Any]] = None,
        progress_bar_style: Optional[str] = None, enable_insights: bool = False) -> Any
```

**Parameters:**

- `func` (Callable): Function to apply to each item
- `iterable_of_args` (Union[Sized, Iterable]): Arguments to process
- `iterable_len` (Optional[int]): Length of iterable if not sized
- `max_tasks_active` (Optional[int]): Maximum number of active tasks to prevent memory issues
- `chunk_size` (Optional[int]): Number of tasks per chunk for worker processing
- `n_splits` (Optional[int]): Number of splits for automatic chunking
- `worker_lifespan` (Optional[int]): Number of tasks before worker restart
- `progress_bar` (bool): Show progress bar during execution
- `concatenate_numpy_output` (bool): Whether to concatenate numpy array outputs
- `progress_bar_options` (Optional[Dict]): Custom tqdm progress bar options
- `progress_bar_style` (Optional[str]): Progress bar style ('std', 'notebook', 'dashboard')
- `enable_insights` (bool): Enable worker performance insights
- `worker_init` (Optional[Callable]): Function called when worker starts
- `worker_exit` (Optional[Callable]): Function called when worker exits
- `task_timeout` (Optional[float]): Timeout in seconds for individual tasks
- `worker_init_timeout` (Optional[float]): Timeout for worker initialization
- `worker_exit_timeout` (Optional[float]): Timeout for worker exit

### Unordered Map Functions

Process iterables in parallel without maintaining result order for better performance.

```python { .api }
def map_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
                  iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
                  chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
                  worker_lifespan: Optional[int] = None, progress_bar: bool = False,
                  progress_bar_options: Optional[Dict[str, Any]] = None,
                  progress_bar_style: Optional[str] = None, enable_insights: bool = False,
                  worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
                  task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
                  worker_exit_timeout: Optional[float] = None) -> List
```

### Iterator Map Functions

Memory-efficient iterator versions that yield results as they become available.

```python { .api }
def imap(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
         iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
         chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
         worker_lifespan: Optional[int] = None, progress_bar: bool = False,
         progress_bar_options: Optional[Dict[str, Any]] = None,
         progress_bar_style: Optional[str] = None, enable_insights: bool = False,
         worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
         task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
         worker_exit_timeout: Optional[float] = None) -> Iterator

def imap_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
                   iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
                   chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
                   worker_lifespan: Optional[int] = None, progress_bar: bool = False,
                   progress_bar_options: Optional[Dict[str, Any]] = None,
                   progress_bar_style: Optional[str] = None, enable_insights: bool = False,
                   worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
                   task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
                   worker_exit_timeout: Optional[float] = None) -> Iterator
```

## Usage Examples

### Basic Map Operations

```python
from mpire import WorkerPool

def square(x):
    return x * x

with WorkerPool(n_jobs=4) as pool:
    # Ordered results
    results = pool.map(square, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

    # Unordered results (potentially faster)
    results = pool.map_unordered(square, range(10))
    print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

### Iterator Processing

```python
# Memory-efficient processing of large datasets
with WorkerPool(n_jobs=4) as pool:
    # Process results as they become available
    for result in pool.imap(expensive_function, large_dataset):
        process_result(result)

    # Unordered iterator for maximum performance
    for result in pool.imap_unordered(expensive_function, large_dataset):
        process_result(result)
```

### Progress Tracking

```python
# Basic progress bar
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(slow_function, range(100), progress_bar=True)

# Custom progress bar options
progress_options = {
    'desc': 'Processing items',
    'unit': 'items',
    'disable': False
}

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        slow_function,
        range(100),
        progress_bar=True,
        progress_bar_options=progress_options
    )
```

### Task Chunking and Performance Tuning

```python
# Manual chunk size control
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        quick_function,
        range(10000),
        chunk_size=50  # Process 50 items per chunk
    )

# Automatic chunking with splits
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        function,
        data,
        n_splits=20  # Split into 20 chunks automatically
    )

# Memory management with active task limit
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        memory_intensive_function,
        large_dataset,
        max_tasks_active=8  # Limit active tasks to prevent memory issues
    )
```

### Worker Lifecycle Management

```python
def init_worker(worker_state):
    """Initialize worker with expensive resources"""
    worker_state['model'] = load_machine_learning_model()
    worker_state['database'] = connect_to_database()

def exit_worker(worker_state):
    """Clean up worker resources"""
    worker_state['database'].close()

def process_item(worker_state, item):
    """Process item using worker state"""
    prediction = worker_state['model'].predict(item)
    worker_state['database'].save_result(item, prediction)
    return prediction

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    results = pool.map(
        process_item,
        items,
        worker_init=init_worker,
        worker_exit=exit_worker,
        worker_lifespan=100  # Restart workers every 100 tasks
    )
```

### Timeout Management

```python
# Function that might hang
def unreliable_function(x):
    import random, time
    if random.random() < 0.1:  # 10% chance of hanging
        time.sleep(1000)
    return x * 2

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        unreliable_function,
        range(100),
        task_timeout=5.0,          # 5 second timeout per task
        worker_init_timeout=10.0,  # 10 second worker init timeout
        worker_exit_timeout=5.0    # 5 second worker exit timeout
    )
```