0
# Asynchronous Decorators
1
2
AsyncIO-compatible decorators for thread and process-based execution that return `asyncio.Future` objects. Perfect for integration with async/await patterns and AsyncIO applications, allowing seamless mixing of concurrent execution with asynchronous programming.
3
4
## Capabilities
5
6
### AsyncIO Thread Decorator
7
8
Executes the decorated function in a separate thread and returns an `asyncio.Future` object. Ideal for I/O-bound tasks in AsyncIO applications where you need to call synchronous functions without blocking the event loop.
9
10
```python { .api }
11
def thread(
12
func: Callable = None,
13
*,
14
name: Optional[str] = None,
15
daemon: bool = True,
16
pool: Optional[ThreadPool] = None
17
) -> Callable[..., asyncio.Future]:
18
"""
19
AsyncIO decorator for thread-based concurrent execution.
20
21
Parameters:
22
- func: Function to decorate (when used without parameters)
23
- name: Thread name for identification and debugging
24
- daemon: Whether thread runs as daemon (doesn't prevent program exit)
25
- pool: Existing ThreadPool instance to use instead of creating new thread
26
27
Returns:
28
Decorated function that returns asyncio.Future when called
29
"""
30
```
31
32
#### Usage Examples
33
34
```python
35
import asyncio
36
from pebble.asynchronous import thread
37
import time
38
import requests
39
40
# Simple usage for blocking I/O
41
@thread
42
def fetch_data(url):
43
# Synchronous I/O that would block event loop
44
response = requests.get(url)
45
return response.json()
46
47
# Usage with parameters
48
@thread(name="file-processor", daemon=False)
49
def process_file(filename):
50
with open(filename, 'r') as f:
51
return len(f.read())
52
53
# Using with existing pool
54
from pebble import ThreadPool
55
56
pool = ThreadPool(max_workers=4)
57
58
@thread(pool=pool)
59
def cpu_task(n):
60
return sum(i ** 2 for i in range(n))
61
62
# AsyncIO application
63
async def main():
64
# Schedule multiple concurrent operations
65
tasks = [
66
fetch_data("https://api.example.com/data1"),
67
fetch_data("https://api.example.com/data2"),
68
process_file("large_file.txt"),
69
cpu_task(1000)
70
]
71
72
# Wait for all to complete
73
results = await asyncio.gather(*tasks)
74
print(f"Results: {results}")
75
76
# Or process as they complete
77
for coro in asyncio.as_completed(tasks):
78
result = await coro
79
print(f"Completed: {result}")
80
81
# Run the AsyncIO application
82
asyncio.run(main())
83
```
84
85
### AsyncIO Process Decorator
86
87
Executes the decorated function in a separate process and returns an `asyncio.Future` object. Perfect for CPU-intensive tasks in AsyncIO applications that need true parallelism without blocking the event loop.
88
89
```python { .api }
90
def process(
91
func: Callable = None,
92
*,
93
name: Optional[str] = None,
94
daemon: bool = True,
95
timeout: Optional[float] = None,
96
mp_context: Optional[multiprocessing.context.BaseContext] = None,
97
pool: Optional[ProcessPool] = None
98
) -> Callable[..., asyncio.Future]:
99
"""
100
AsyncIO decorator for process-based concurrent execution.
101
102
Parameters:
103
- func: Function to decorate (when used without parameters)
104
- name: Process name for identification and debugging
105
- daemon: Whether process runs as daemon (doesn't prevent program exit)
106
- timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)
107
- mp_context: Multiprocessing context for process creation
108
- pool: Existing ProcessPool instance to use instead of creating new process
109
110
Returns:
111
Decorated function that returns asyncio.Future when called
112
"""
113
```
114
115
#### Usage Examples
116
117
```python
118
import asyncio
119
import multiprocessing
120
from pebble.asynchronous import process
121
122
# CPU-intensive task
123
@process
124
def heavy_computation(data):
125
# Simulate heavy CPU work
126
result = 0
127
for item in data:
128
result += item ** 3
129
return result
130
131
# With timeout for long-running tasks
132
@process(timeout=30.0)
133
def data_analysis(dataset):
134
# Simulate data analysis that might take a while
135
import time
136
time.sleep(5) # Simulate processing
137
return {"mean": sum(dataset) / len(dataset), "size": len(dataset)}
138
139
# Using custom multiprocessing context
140
ctx = multiprocessing.get_context('spawn')
141
142
@process(mp_context=ctx, name="isolated-worker")
143
def isolated_task(config):
144
# Task that needs process isolation
145
return config["value"] * 2
146
147
# AsyncIO application with CPU-intensive tasks
148
async def data_pipeline():
149
# Generate data
150
datasets = [
151
list(range(1000 * i, 1000 * (i + 1)))
152
for i in range(5)
153
]
154
155
# Process datasets concurrently
156
computation_tasks = [
157
heavy_computation(dataset)
158
for dataset in datasets
159
]
160
161
analysis_tasks = [
162
data_analysis(dataset)
163
for dataset in datasets
164
]
165
166
# Wait for computations
167
print("Starting heavy computations...")
168
computation_results = await asyncio.gather(*computation_tasks)
169
170
# Wait for analysis
171
print("Starting data analysis...")
172
analysis_results = await asyncio.gather(*analysis_tasks)
173
174
print(f"Computation results: {computation_results}")
175
print(f"Analysis results: {analysis_results}")
176
177
# Mix with other async operations
178
isolated_result = await isolated_task({"value": 42})
179
print(f"Isolated result: {isolated_result}")
180
181
# Run the pipeline
182
asyncio.run(data_pipeline())
183
```
184
185
### Integration with AsyncIO Patterns
186
187
The asynchronous decorators integrate seamlessly with AsyncIO patterns and utilities:
188
189
```python
190
import asyncio
191
from pebble.asynchronous import thread, process
192
193
@thread
194
def sync_io_operation(url):
195
import requests
196
return requests.get(url).json()
197
198
@process
199
def cpu_bound_task(n):
200
return sum(i * i for i in range(n))
201
202
async def advanced_patterns():
203
# Using asyncio.wait with timeout
204
tasks = [
205
sync_io_operation("https://api1.example.com"),
206
sync_io_operation("https://api2.example.com"),
207
cpu_bound_task(1000000)
208
]
209
210
done, pending = await asyncio.wait(
211
tasks,
212
timeout=10.0,
213
return_when=asyncio.FIRST_COMPLETED
214
)
215
216
# Cancel pending tasks
217
for task in pending:
218
task.cancel()
219
220
# Process completed tasks
221
for task in done:
222
try:
223
result = await task
224
print(f"Completed: {result}")
225
except Exception as e:
226
print(f"Failed: {e}")
227
228
# Error handling with AsyncIO
229
async def error_handling_example():
230
@process(timeout=2.0)
231
def might_timeout():
232
import time
233
time.sleep(5) # Will timeout
234
return "Done"
235
236
@thread
237
def might_fail():
238
raise ValueError("Something went wrong")
239
240
try:
241
result1 = await might_timeout()
242
except asyncio.TimeoutError:
243
print("Process timed out")
244
245
try:
246
result2 = await might_fail()
247
except ValueError as e:
248
print(f"Thread failed: {e}")
249
250
asyncio.run(advanced_patterns())
251
asyncio.run(error_handling_example())
252
```
253
254
### AsyncIO Context Management
255
256
Using asynchronous decorators with AsyncIO context managers and resource management:
257
258
```python
259
import asyncio
260
from pebble.asynchronous import thread
261
from contextlib import asynccontextmanager
262
263
@thread
264
def database_query(query):
265
# Simulate database query
266
import time
267
time.sleep(1)
268
return f"Result for: {query}"
269
270
@asynccontextmanager
271
async def database_session():
272
print("Opening database session")
273
try:
274
yield "session"
275
finally:
276
print("Closing database session")
277
278
async def database_operations():
279
async with database_session() as session:
280
# Execute multiple queries concurrently
281
queries = [
282
database_query("SELECT * FROM users"),
283
database_query("SELECT * FROM orders"),
284
database_query("SELECT * FROM products")
285
]
286
287
results = await asyncio.gather(*queries)
288
return results
289
290
# Resource cleanup with asyncio
291
async def resource_management():
292
tasks = []
293
294
try:
295
# Start multiple background tasks
296
for i in range(5):
297
task = database_query(f"Query {i}")
298
tasks.append(task)
299
300
# Wait for all with timeout
301
results = await asyncio.wait_for(
302
asyncio.gather(*tasks),
303
timeout=10.0
304
)
305
306
return results
307
308
except asyncio.TimeoutError:
309
print("Operations timed out, cleaning up...")
310
# Tasks are automatically cancelled by wait_for
311
return None
312
313
asyncio.run(database_operations())
314
asyncio.run(resource_management())
315
```