or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

backend-context.md · cloudpickle-integration.md · error-handling.md · index.md · process-pool-executor.md · reusable-executor.md

docs/reusable-executor.md

# Reusable Executor

The reusable executor system provides singleton executor management to avoid the overhead of repeatedly creating and destroying ProcessPoolExecutor instances. This is particularly useful for workflows with frequent parallel processing needs.

## Capabilities

### Get Reusable Executor

Returns a singleton ProcessPoolExecutor instance, creating a new one if needed or reusing an existing one.

```python { .api }
def get_reusable_executor(
    max_workers=None,
    context=None,
    timeout=10,
    kill_workers=False,
    reuse="auto",
    job_reducers=None,
    result_reducers=None,
    initializer=None,
    initargs=(),
    env=None
):
    """
    Return the current reusable executor instance.

    Parameters:
    - max_workers (int, optional): Maximum number of worker processes
    - context (multiprocessing context, optional): Context for creating processes
    - timeout (int): Worker idle timeout in seconds before automatic shutdown
    - kill_workers (bool): Whether to forcibly terminate previous workers
    - reuse (bool or "auto"): Reuse strategy — True to always reuse the existing
      executor, False to always create a new one, or "auto" to reuse only when
      the requested configuration is compatible with the current executor
    - job_reducers (dict, optional): Custom reducers for job serialization
    - result_reducers (dict, optional): Custom reducers for result serialization
    - initializer (callable, optional): Function called at worker startup
    - initargs (tuple): Arguments passed to initializer function
    - env (dict, optional): Environment variables for worker processes

    Returns:
    ProcessPoolExecutor: Singleton executor instance
    """
```

## Usage Examples

### Basic Reusable Executor

```python
from loky import get_reusable_executor
import time

def compute_square(x):
    """Simple computation task."""
    time.sleep(0.1)  # Simulate work
    return x * x

# Get reusable executor - will create new one if none exists
executor = get_reusable_executor(max_workers=4, timeout=2)

# First batch of tasks
results1 = list(executor.map(compute_square, range(5)))
print(f"First batch: {results1}")

# Second batch reuses same executor
results2 = list(executor.map(compute_square, range(5, 10)))
print(f"Second batch: {results2}")

# Executor will automatically shutdown after 2 seconds of inactivity
```

### Dynamic Resizing

```python
from loky import get_reusable_executor

def cpu_task(x):
    return sum(i * i for i in range(x * 1000))

# Start with 2 workers
executor = get_reusable_executor(max_workers=2)
results1 = list(executor.map(cpu_task, [1, 2, 3]))

# Resize to 4 workers for larger workload
executor = get_reusable_executor(max_workers=4)
results2 = list(executor.map(cpu_task, range(1, 9)))

print(f"Small batch: {results1}")
print(f"Large batch: {results2}")
```

### Forcing Executor Restart

```python
from loky import get_reusable_executor

def worker_state_task(x):
    # This task might modify global state in workers
    import os
    os.environ["WORKER_STATE"] = str(x)
    return os.environ.get("WORKER_STATE")

# Run tasks that modify worker state
executor = get_reusable_executor(max_workers=2)
results1 = list(executor.map(worker_state_task, [1, 2]))

# Force restart of workers to clear state
executor = get_reusable_executor(max_workers=2, kill_workers=True)
results2 = list(executor.map(worker_state_task, [3, 4]))

print(f"First run: {results1}")
print(f"After restart: {results2}")
```

### Timeout Configuration

```python
from loky import get_reusable_executor
import time

def quick_task(x):
    return x * 2

def setup_long_running_executor():
    # Executor with longer timeout for persistent workflows
    return get_reusable_executor(
        max_workers=4,
        timeout=60  # Workers stay alive for 60 seconds
    )

def setup_short_lived_executor():
    # Executor that shuts down quickly to free resources
    return get_reusable_executor(
        max_workers=2,
        timeout=5  # Workers shutdown after 5 seconds
    )

# For workflows with frequent but spaced-out tasks
long_executor = setup_long_running_executor()
results = list(long_executor.map(quick_task, range(10)))

time.sleep(10)  # Simulate gap between task batches

# Executor still available due to longer timeout
more_results = list(long_executor.map(quick_task, range(10, 20)))
```

### Custom Initializer with Reusable Executor

```python
from loky import get_reusable_executor
import logging

def setup_worker_logging(log_level):
    """Initialize logging in each worker process."""
    logging.basicConfig(
        level=log_level,
        format='%(processName)s: %(message)s'
    )
    logging.info("Worker initialized")

def logged_computation(x):
    """Task that uses logging."""
    logging.info(f"Computing square of {x}")
    result = x * x
    logging.info(f"Result: {result}")
    return result

# Reusable executor with worker initialization
executor = get_reusable_executor(
    max_workers=3,
    timeout=30,
    initializer=setup_worker_logging,
    initargs=(logging.INFO,)
)

# All tasks will run on workers with logging configured
results = list(executor.map(logged_computation, [1, 2, 3, 4, 5]))
print(f"Results with logging: {results}")
```

### Environment Variable Management

```python
from loky import get_reusable_executor
import os

def get_worker_env(var_name):
    """Get environment variable from worker."""
    return f"{var_name}={os.environ.get(var_name, 'UNSET')}"

# Executor with custom environment
executor = get_reusable_executor(
    max_workers=2,
    env={
        "PROCESSING_MODE": "parallel",
        "WORKER_POOL": "loky",
        "DEBUG_LEVEL": "2"
    }
)

# Check environment variables in workers
env_vars = ["PROCESSING_MODE", "WORKER_POOL", "DEBUG_LEVEL", "PATH"]
results = list(executor.map(get_worker_env, env_vars))

for result in results:
    print(result)
```

## Benefits

### Performance Advantages

- **Reduced Startup Overhead**: Workers remain alive between task batches, eliminating process creation costs
- **Memory Efficiency**: Imported modules and initialized state persist across task submissions
- **Dynamic Scaling**: Executor automatically adjusts worker count based on configuration

### Resource Management

- **Automatic Cleanup**: Workers automatically shutdown after configured idle timeout
- **Memory Leak Protection**: Built-in monitoring prevents worker memory accumulation
- **Graceful Shutdown**: Clean termination of workers when timeout expires

### Use Cases

- **Interactive Computing**: Jupyter notebooks and REPL environments
- **Batch Processing**: Multiple rounds of parallel computation
- **Scientific Computing**: Data analysis workflows with repeated parallel operations
- **Web Applications**: Background task processing with consistent resource usage