
# Process Pool Executor

The `ProcessPoolExecutor` class provides a robust implementation of parallel task execution using worker processes. Compared to the standard library's `concurrent.futures.ProcessPoolExecutor`, it offers enhanced reliability, consistent spawn behavior, and better error handling.

## Capabilities

### ProcessPoolExecutor Class

Main executor class for managing a pool of worker processes with configurable parameters and robust error handling.

```python { .api }
class ProcessPoolExecutor(Executor):
    """
    Robust ProcessPoolExecutor with enhanced error handling and consistent spawn behavior.

    Parameters:
    - max_workers (int, optional): Maximum number of worker processes. Defaults to cpu_count()
    - job_reducers (dict, optional): Custom reducers for job serialization
    - result_reducers (dict, optional): Custom reducers for result serialization
    - timeout (int, optional): Worker idle timeout in seconds. Default is None
    - context (multiprocessing context, optional): Multiprocessing context for creating processes
    - initializer (callable, optional): Function called at worker startup
    - initargs (tuple): Arguments passed to initializer function
    - env (dict, optional): Environment variables to set in worker processes
    """

    def __init__(
        self,
        max_workers=None,
        job_reducers=None,
        result_reducers=None,
        timeout=None,
        context=None,
        initializer=None,
        initargs=(),
        env=None,
    ): ...
```
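The `initializer`/`initargs` pair follows the same contract as in the standard library's `concurrent.futures` executors: each worker calls `initializer(*initargs)` once, before it runs any task. The sketch below demonstrates that contract with `concurrent.futures.ThreadPoolExecutor`, which is swapped in only so the example runs without spawning processes. Per-worker state lives in a `threading.local` here. A loky worker process would more typically use module-level globals. The names `WORKER_STATE`, `init_worker`, and `tagged` are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-worker storage: each worker thread sees its own copy.
WORKER_STATE = threading.local()


def init_worker(prefix):
    """Runs once per worker, before that worker executes any task."""
    WORKER_STATE.prefix = prefix
    WORKER_STATE.init_calls = getattr(WORKER_STATE, "init_calls", 0) + 1


def tagged(x):
    # Tasks rely on state prepared by the initializer in the same worker.
    return f"{WORKER_STATE.prefix}-{x}", WORKER_STATE.init_calls


with ThreadPoolExecutor(max_workers=2, initializer=init_worker, initargs=("w",)) as ex:
    outputs = list(ex.map(tagged, range(6)))

labels = [label for label, _ in outputs]
print(labels)
```

With loky, you would pass the same two arguments to `ProcessPoolExecutor`. Remember that the initializer and its arguments must be picklable, because they are sent to each worker process.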

### Task Submission

Submit individual tasks for execution on worker processes.

```python { .api }
def submit(self, fn, *args, **kwargs):
    """
    Submit a callable to be executed with the given arguments.

    Parameters:
    - fn (callable): Function to execute
    - *args: Positional arguments for the function
    - **kwargs: Keyword arguments for the function

    Returns:
    Future: Future representing the execution of the callable

    Raises:
    RuntimeError: If the executor has been shut down
    """
```
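`submit` returns a `Future`. That object exposes the standard `concurrent.futures.Future` interface: `result()`, `exception()`, `done()`, and `add_done_callback()`. The sketch below exercises that interface with `ThreadPoolExecutor`, which is swapped in so the example runs in-process. `divide` is a hypothetical task. The sketch shows two behaviors: positional and keyword arguments given to `submit` are forwarded to the task, and an exception raised inside the task is captured on the future instead of being raised at submission time.

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a, b, *, round_to=None):
    """Hypothetical task: positional and keyword args are forwarded by submit."""
    q = a / b
    return round(q, round_to) if round_to is not None else q


completed = []  # filled by done-callbacks, one entry per finished future

with ThreadPoolExecutor(max_workers=2) as ex:
    ok = ex.submit(divide, 10, 4, round_to=1)  # keyword args pass through to divide
    bad = ex.submit(divide, 1, 0)              # raises ZeroDivisionError in the worker
    for fut in (ok, bad):
        fut.add_done_callback(completed.append)

    ok_value = ok.result()       # blocks until done, then returns the value
    bad_error = bad.exception()  # returns the exception instead of raising it

print(ok_value, type(bad_error).__name__)
```

Calling `bad.result()` instead of `bad.exception()` would re-raise the `ZeroDivisionError` in the caller. The error-handling example later on this page relies on that behavior.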

### Bulk Task Execution

Execute a function over multiple input values in parallel.

```python { .api }
def map(self, fn, *iterables, **kwargs):
    """
    Apply fn to every item of the iterables in parallel.

    Parameters:
    - fn (callable): Function to apply to each item
    - *iterables: One or more iterables to process
    - timeout (float, optional, keyword-only): Maximum time in seconds to wait for results. Default None (no limit)
    - chunksize (int, optional, keyword-only): Size of chunks sent to worker processes. Default 1

    Returns:
    Iterator: Iterator over results in the same order as the input.
    An exception raised by fn is re-raised when its result is retrieved from the iterator.

    Raises:
    TimeoutError: If the timeout is reached before all results are available
    """
```
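With `chunksize > 1`, the inputs are grouped into batches, so each round-trip to a worker carries several calls. This amortizes serialization and IPC overhead when individual tasks are small. The pure-Python sketch below models that batching. `make_chunks`, `run_chunk`, and `chunked_map` are hypothetical helpers loosely patterned on the standard library's approach, not loky's actual code, and the "worker" is just a local function call.

```python
import itertools


def make_chunks(chunksize, *iterables):
    """Group zipped argument tuples into lists of at most `chunksize` items."""
    it = zip(*iterables)  # like Executor.map, stops at the shortest iterable
    while True:
        chunk = list(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk


def run_chunk(fn, chunk):
    """What one worker would do with one batch: call fn on each argument tuple."""
    return [fn(*args) for args in chunk]


def chunked_map(fn, *iterables, chunksize=1):
    # Each chunk stands in for one job sent to a worker. Yielding the
    # per-chunk results in submission order preserves input order.
    for chunk in make_chunks(chunksize, *iterables):
        yield from run_chunk(fn, chunk)


chunks = list(make_chunks(3, range(7)))
print(chunks)  # [[(0,), (1,), (2,)], [(3,), (4,), (5,)], [(6,)]]
print(list(chunked_map(pow, [2, 3, 4], [2, 2, 2], chunksize=2)))  # [4, 9, 16]
```

Larger chunks reduce per-call overhead but make load balancing coarser. A single slow item delays every other result in its chunk.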

### Executor Shutdown

Cleanly shut down the executor and its worker processes.

```python { .api }
def shutdown(self, wait=True, kill_workers=False):
    """
    Shut down the executor and free associated resources.

    Parameters:
    - wait (bool): Whether to block until pending tasks complete. Default True
    - kill_workers (bool): Whether to forcibly terminate workers. Default False

    Returns:
    None
    """
```
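Using the executor as a context manager is equivalent to calling `shutdown(wait=True)` when the `with` block exits. The sketch below spells out the explicit form. It uses `ThreadPoolExecutor`, which is swapped in so the example runs without spawning processes, to check two properties of the standard `Executor` contract:

- Once `shutdown(wait=True)` returns, every submitted future is done.
- Any later `submit` call raises `RuntimeError`.

`slow_square` is a hypothetical task.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    """Hypothetical task that takes a little time."""
    time.sleep(0.01)
    return x * x


executor = ThreadPoolExecutor(max_workers=2)
try:
    futures = [executor.submit(slow_square, i) for i in range(5)]
finally:
    # Explicit form of what `with executor:` does on exit.
    executor.shutdown(wait=True)

# After shutdown(wait=True) returns, every pending future has finished.
all_done = all(f.done() for f in futures)
values = [f.result() for f in futures]

try:
    executor.submit(slow_square, 99)
    rejected = False
except RuntimeError:
    # Submitting to a shut-down executor is rejected.
    rejected = True

print(all_done, values, rejected)
```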

## Usage Examples

### Basic ProcessPoolExecutor Usage

```python
from loky import ProcessPoolExecutor


def cpu_bound_task(n):
    """Simulate CPU-intensive work."""
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    # Create an executor with 4 worker processes
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Submit an individual task and wait for its result
        future = executor.submit(cpu_bound_task, 10000)
        result = future.result()
        print(f"Result: {result}")

        # Process multiple inputs; results come back in input order
        inputs = [1000, 2000, 3000, 4000, 5000]
        results = list(executor.map(cpu_bound_task, inputs))
        print(f"Results: {results}")
```

### With Initializer Function

```python
import logging

from loky import ProcessPoolExecutor


def worker_init(level):
    """Initialize logging in each worker process."""
    logging.basicConfig(level=level)
    logging.info("Worker process initialized")


def logged_task(x):
    logging.info(f"Processing {x}")
    return x * 2


if __name__ == "__main__":
    # Each worker runs worker_init(logging.INFO) once at startup
    with ProcessPoolExecutor(
        max_workers=2,
        initializer=worker_init,
        initargs=(logging.INFO,),
    ) as executor:
        results = list(executor.map(logged_task, [1, 2, 3, 4]))
        print(f"Results: {results}")
```

### Custom Environment Variables

```python
import os

from loky import ProcessPoolExecutor


def get_env_var(var_name):
    """Read an environment variable inside the worker process."""
    return os.environ.get(var_name, "Not set")


if __name__ == "__main__":
    # Set custom environment variables in every worker process
    with ProcessPoolExecutor(
        max_workers=2,
        env={"CUSTOM_VAR": "worker_value", "DEBUG": "1"},
    ) as executor:
        results = list(executor.map(get_env_var, ["CUSTOM_VAR", "DEBUG"]))
        print(f"Environment variables: {results}")
```

### Error Handling

```python
from concurrent.futures import TimeoutError as FuturesTimeoutError

from loky import BrokenProcessPool, ProcessPoolExecutor


def failing_task(x):
    if x == 3:
        raise ValueError(f"Task failed for input {x}")
    return x * 2


if __name__ == "__main__":
    try:
        with ProcessPoolExecutor(max_workers=2) as executor:
            futures = [executor.submit(failing_task, i) for i in range(5)]

            for i, future in enumerate(futures):
                try:
                    result = future.result(timeout=5)
                    print(f"Task {i}: {result}")
                except ValueError as e:
                    # Exceptions raised in the worker are re-raised here
                    print(f"Task {i} failed: {e}")
                except FuturesTimeoutError:
                    # Before Python 3.11 this is not the builtin TimeoutError
                    print(f"Task {i} timed out")
    except BrokenProcessPool as e:
        # Raised if a worker process dies unexpectedly
        print(f"Process pool broken: {e}")
```
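Sometimes you only need to know which inputs failed, not to handle each future in submission order. In that case, the standard `concurrent.futures.as_completed` helper combined with `Future.exception()` avoids writing one `try`/`except` per task. The sketch below reuses `failing_task` from the example above. It runs on `ThreadPoolExecutor`, which is swapped in so the sketch runs without spawning processes. The assumption that loky's futures work with `as_completed` in the same way rests on them following the `concurrent.futures.Future` interface.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def failing_task(x):
    # Same task as in the example above: input 3 fails.
    if x == 3:
        raise ValueError(f"Task failed for input {x}")
    return x * 2


successes, failures = {}, {}
with ThreadPoolExecutor(max_workers=2) as executor:
    # Map each future back to its input so outcomes can be attributed.
    future_to_input = {executor.submit(failing_task, i): i for i in range(5)}
    for future in as_completed(future_to_input, timeout=5):
        x = future_to_input[future]
        err = future.exception()  # None if the task succeeded
        if err is None:
            successes[x] = future.result()
        else:
            failures[x] = err

print(sorted(successes.items()), {k: str(v) for k, v in failures.items()})
```

`as_completed` yields futures in completion order, not submission order, which is why the sketch keeps the `future_to_input` mapping.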