or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

apply-functions.mdasync-results.mddashboard-integration.mdexception-handling.mdindex.mdparallel-map.mdperformance-insights.mdutility-functions.mdworker-configuration.mdworkerpool-management.md

apply-functions.mddocs/

0

# Apply Functions

1

2

Apply-style parallel execution for single function calls and asynchronous operations. These functions are ideal for submitting individual tasks rather than processing iterables.

3

4

## Capabilities

5

6

### Synchronous Apply

7

8

Execute a single function call synchronously in a worker process.

9

10

```python { .api }

11

def apply(self, func: Callable, args: Any = (), kwargs: Dict = None,

12

callback: Optional[Callable] = None, error_callback: Optional[Callable] = None,

13

worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,

14

task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,

15

worker_exit_timeout: Optional[float] = None) -> Any

16

```

17

18

**Parameters:**

19

- `func` (Callable): Function to execute

20

- `args` (Any): Positional arguments for the function. Default: ()

21

- `kwargs` (Dict): Keyword arguments for the function. Default: None

22

- `callback` (Optional[Callable]): Function called with result when task succeeds

23

- `error_callback` (Optional[Callable]): Function called with exception when task fails

24

- `worker_init` (Optional[Callable]): Function called when worker starts

25

- `worker_exit` (Optional[Callable]): Function called when worker exits

26

- `task_timeout` (Optional[float]): Timeout in seconds for the task

27

- `worker_init_timeout` (Optional[float]): Timeout for worker initialization

28

- `worker_exit_timeout` (Optional[float]): Timeout for worker exit

29

30

**Returns:** The result of the function call

31

32

### Asynchronous Apply

33

34

Execute a single function call asynchronously and return an AsyncResult object.

35

36

```python { .api }

37

def apply_async(self, func: Callable, args: Any = (), kwargs: Dict = None,

38

callback: Optional[Callable] = None, error_callback: Optional[Callable] = None,

39

worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,

40

task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,

41

worker_exit_timeout: Optional[float] = None) -> AsyncResult

42

```

43

44

**Parameters:** Same as `apply()` method

45

46

**Returns:** AsyncResult object for retrieving the result when ready

47

48

## Usage Examples

49

50

### Basic Apply Operations

51

52

```python

53

from mpire import WorkerPool

54

55

def expensive_calculation(x, y, multiplier=1):

56

import time

57

time.sleep(1) # Simulate expensive work

58

return (x + y) * multiplier

59

60

with WorkerPool(n_jobs=4) as pool:

61

# Synchronous apply - blocks until result is ready

62

result = pool.apply(expensive_calculation, args=(10, 20), kwargs={'multiplier': 2})

63

print(result) # 60

64

65

# Asynchronous apply - returns immediately

66

async_result = pool.apply_async(expensive_calculation, args=(5, 15), kwargs={'multiplier': 3})

67

# Do other work...

68

result = async_result.get() # Blocks until ready

69

print(result) # 60

70

```

71

72

### Callback Functions

73

74

```python

75

def process_data(data):

76

# Some processing

77

return data.upper()

78

79

def success_callback(result):

80

print(f"Task completed successfully: {result}")

81

82

def error_callback(exception):

83

print(f"Task failed with error: {exception}")

84

85

with WorkerPool(n_jobs=4) as pool:

86

# Apply with callbacks

87

result = pool.apply(

88

process_data,

89

args=("hello world",),

90

callback=success_callback,

91

error_callback=error_callback

92

)

93

94

# Async apply with callbacks

95

async_result = pool.apply_async(

96

process_data,

97

args=("hello async",),

98

callback=success_callback,

99

error_callback=error_callback

100

)

101

```

102

103

### Multiple Async Tasks

104

105

```python

106

from mpire import WorkerPool

107

108

def compute_factorial(n):

109

import math

110

return math.factorial(n)

111

112

with WorkerPool(n_jobs=4) as pool:

113

# Submit multiple async tasks

114

async_results = []

115

for i in range(10, 20):

116

result = pool.apply_async(compute_factorial, args=(i,))

117

async_results.append(result)

118

119

# Collect results as they become available

120

results = []

121

for async_result in async_results:

122

result = async_result.get(timeout=10) # 10 second timeout

123

results.append(result)

124

125

print("Factorials:", results)

126

```

127

128

### Worker State with Apply

129

130

```python

131

def init_worker(worker_state):

132

worker_state['counter'] = 0

133

134

def increment_counter(worker_state, value):

135

worker_state['counter'] += 1

136

return worker_state['counter'] * value

137

138

with WorkerPool(n_jobs=2, use_worker_state=True) as pool:

139

# Each apply call will reuse the same worker state

140

result1 = pool.apply(increment_counter, args=(10,), worker_init=init_worker)

141

result2 = pool.apply(increment_counter, args=(20,))

142

result3 = pool.apply(increment_counter, args=(30,))

143

144

print(f"Results: {result1}, {result2}, {result3}") # Results depend on worker assignment

145

```

146

147

### Error Handling

148

149

```python

150

def risky_function(x):

151

if x < 0:

152

raise ValueError("Negative values not allowed")

153

return x ** 2

154

155

def handle_error(exception):

156

print(f"Caught exception: {type(exception).__name__}: {exception}")

157

158

with WorkerPool(n_jobs=2) as pool:

159

try:

160

# This will succeed

161

result = pool.apply(risky_function, args=(5,))

162

print(f"Success: {result}")

163

164

# This will fail

165

result = pool.apply(risky_function, args=(-3,), error_callback=handle_error)

166

except Exception as e:

167

print(f"Apply failed: {e}")

168

169

# Async version with error handling

170

async_result = pool.apply_async(risky_function, args=(-5,), error_callback=handle_error)

171

try:

172

result = async_result.get()

173

except Exception as e:

174

print(f"Async apply failed: {e}")

175

```

176

177

### Timeouts

178

179

```python

180

def slow_function(duration):

181

import time

182

time.sleep(duration)

183

return f"Slept for {duration} seconds"

184

185

with WorkerPool(n_jobs=2) as pool:

186

try:

187

# This will succeed

188

result = pool.apply(slow_function, args=(1,), task_timeout=5.0)

189

print(result)

190

191

# This will timeout

192

result = pool.apply(slow_function, args=(10,), task_timeout=2.0)

193

print(result)

194

except TimeoutError:

195

print("Task timed out")

196

197

# Async version with timeout

198

async_result = pool.apply_async(slow_function, args=(3,), task_timeout=5.0)

199

try:

200

result = async_result.get(timeout=2.0) # Different timeout for getting result

201

print(result)

202

except TimeoutError:

203

print("Getting result timed out")

204

```