0
# Apply Functions
1
2
Apply-style parallel execution for single function calls and asynchronous operations. These functions are ideal for submitting individual tasks rather than processing iterables.
3
4
## Capabilities
5
6
### Synchronous Apply
7
8
Execute a single function call synchronously in a worker process.
9
10
```python { .api }
11
def apply(self, func: Callable, args: Any = (), kwargs: Dict = None,
12
callback: Optional[Callable] = None, error_callback: Optional[Callable] = None,
13
worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
14
task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
15
worker_exit_timeout: Optional[float] = None) -> Any
16
```
17
18
**Parameters:**
19
- `func` (Callable): Function to execute
20
- `args` (Any): Positional arguments for the function. Default: ()
21
- `kwargs` (Dict): Keyword arguments for the function. Default: None
22
- `callback` (Optional[Callable]): Function called with result when task succeeds
23
- `error_callback` (Optional[Callable]): Function called with exception when task fails
24
- `worker_init` (Optional[Callable]): Function called when worker starts
25
- `worker_exit` (Optional[Callable]): Function called when worker exits
26
- `task_timeout` (Optional[float]): Timeout in seconds for the task
27
- `worker_init_timeout` (Optional[float]): Timeout in seconds for the `worker_init` function
28
- `worker_exit_timeout` (Optional[float]): Timeout in seconds for the `worker_exit` function
29
30
**Returns:** The result of the function call
31
32
### Asynchronous Apply
33
34
Execute a single function call asynchronously and return an AsyncResult object.
35
36
```python { .api }
37
def apply_async(self, func: Callable, args: Any = (), kwargs: Dict = None,
38
callback: Optional[Callable] = None, error_callback: Optional[Callable] = None,
39
worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
40
task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
41
worker_exit_timeout: Optional[float] = None) -> AsyncResult
42
```
43
44
**Parameters:** Same as the `apply()` method.
45
46
**Returns:** AsyncResult object for retrieving the result when ready
47
48
## Usage Examples
49
50
### Basic Apply Operations
51
52
```python
53
from mpire import WorkerPool
54
55
def expensive_calculation(x, y, multiplier=1):
56
import time
57
time.sleep(1) # Simulate expensive work
58
return (x + y) * multiplier
59
60
with WorkerPool(n_jobs=4) as pool:
61
# Synchronous apply - blocks until result is ready
62
result = pool.apply(expensive_calculation, args=(10, 20), kwargs={'multiplier': 2})
63
print(result) # 60
64
65
# Asynchronous apply - returns immediately
66
async_result = pool.apply_async(expensive_calculation, args=(5, 15), kwargs={'multiplier': 3})
67
# Do other work...
68
result = async_result.get() # Blocks until ready
69
print(result) # 60
70
```
71
72
### Callback Functions
73
74
```python
75
def process_data(data):
76
# Some processing
77
return data.upper()
78
79
def success_callback(result):
80
print(f"Task completed successfully: {result}")
81
82
def error_callback(exception):
83
print(f"Task failed with error: {exception}")
84
85
with WorkerPool(n_jobs=4) as pool:
86
# Apply with callbacks
87
result = pool.apply(
88
process_data,
89
args=("hello world",),
90
callback=success_callback,
91
error_callback=error_callback
92
)
93
94
# Async apply with callbacks
95
async_result = pool.apply_async(
96
process_data,
97
args=("hello async",),
98
callback=success_callback,
99
error_callback=error_callback
100
)
101
```
102
103
### Multiple Async Tasks
104
105
```python
106
from mpire import WorkerPool
107
108
def compute_factorial(n):
109
import math
110
return math.factorial(n)
111
112
with WorkerPool(n_jobs=4) as pool:
113
# Submit multiple async tasks
114
async_results = []
115
for i in range(10, 20):
116
result = pool.apply_async(compute_factorial, args=(i,))
117
async_results.append(result)
118
119
# Collect results in submission order; each get() blocks until that task has finished
120
results = []
121
for async_result in async_results:
122
result = async_result.get(timeout=10) # 10 second timeout
123
results.append(result)
124
125
print("Factorials:", results)
126
```
127
128
### Worker State with Apply
129
130
```python
131
def init_worker(worker_state):
132
worker_state['counter'] = 0
133
134
def increment_counter(worker_state, value):
135
worker_state['counter'] += 1
136
return worker_state['counter'] * value
137
138
with WorkerPool(n_jobs=2, use_worker_state=True) as pool:
139
# Worker state is kept per worker: calls handled by the same worker share its state
140
result1 = pool.apply(increment_counter, args=(10,), worker_init=init_worker)
141
result2 = pool.apply(increment_counter, args=(20,))
142
result3 = pool.apply(increment_counter, args=(30,))
143
144
print(f"Results: {result1}, {result2}, {result3}") # Results depend on worker assignment
145
```
146
147
### Error Handling
148
149
```python
150
def risky_function(x):
151
if x < 0:
152
raise ValueError("Negative values not allowed")
153
return x ** 2
154
155
def handle_error(exception):
156
print(f"Caught exception: {type(exception).__name__}: {exception}")
157
158
with WorkerPool(n_jobs=2) as pool:
159
try:
160
# This will succeed
161
result = pool.apply(risky_function, args=(5,))
162
print(f"Success: {result}")
163
164
# This will fail
165
result = pool.apply(risky_function, args=(-3,), error_callback=handle_error)
166
except Exception as e:
167
print(f"Apply failed: {e}")
168
169
# Async version with error handling
170
async_result = pool.apply_async(risky_function, args=(-5,), error_callback=handle_error)
171
try:
172
result = async_result.get()
173
except Exception as e:
174
print(f"Async apply failed: {e}")
175
```
176
177
### Timeouts
178
179
```python
180
def slow_function(duration):
181
import time
182
time.sleep(duration)
183
return f"Slept for {duration} seconds"
184
185
with WorkerPool(n_jobs=2) as pool:
186
try:
187
# This will succeed
188
result = pool.apply(slow_function, args=(1,), task_timeout=5.0)
189
print(result)
190
191
# This will timeout
192
result = pool.apply(slow_function, args=(10,), task_timeout=2.0)
193
print(result)
194
except TimeoutError:
195
print("Task timed out")
196
197
# Async version with timeout
198
async_result = pool.apply_async(slow_function, args=(3,), task_timeout=5.0)
199
try:
200
result = async_result.get(timeout=2.0) # Wait at most 2s for the result (separate from task_timeout); the task sleeps 3s, so this raises TimeoutError
201
print(result)
202
except TimeoutError:
203
print("Getting result timed out")
204
```