0
# Async Utilities
1
2
Custom utilities specific to asyncstdlib for safely working with async iterables, borrowing iterators, scoped iteration, and bridging sync/async contexts. These tools provide essential safety and convenience features for async programming.
3
4
## Capabilities
5
6
### Safe Iterator Borrowing
7
8
Safely borrow async iterables without risking premature closure.
9
10
```python { .api }
11
def borrow(iterator: AsyncGenerator[T, S], /) -> AsyncGenerator[T, S]: ...
12
def borrow(iterator: AsyncIterator[T], /) -> AsyncIterator[T]:
13
"""
14
Safely borrow async iterable without closing it.
15
16
Parameters:
17
- iterator: AsyncIterator[T] or AsyncGenerator[T, S] - Iterator to borrow
18
19
Returns:
20
AsyncIterator[T] or AsyncGenerator[T, S] - Borrowed iterator that won't close the original
21
22
Note:
23
Prevents the borrowed iterator from closing the original iterable
24
when it goes out of scope, allowing safe sharing of iterables.
25
"""
26
```
27
28
### Scoped Iteration
29
30
Create scoped async iterators with automatic cleanup.
31
32
```python { .api }
33
def scoped_iter(iterable: AnyIterable[T], /) -> AsyncContextManager[AsyncIterator[T]]:
34
"""
35
Create scoped async iterator with automatic cleanup.
36
37
Parameters:
38
- iterable: AnyIterable[T] - Iterable to create scoped iterator from
39
40
Returns:
41
AsyncContextManager[AsyncIterator[T]] - Context manager yielding scoped iterator
42
43
Usage:
44
async with scoped_iter(iterable) as iterator:
45
async for item in iterator:
46
# Safe to break or return - iterator will be cleaned up
47
pass
48
"""
49
```
50
51
### Awaitable Processing
52
53
Work with multiple awaitables and async functions.
54
55
```python { .api }
56
async def await_each(awaitables: Iterable[Awaitable[T]], /) -> AsyncIterable[T]:
57
"""
58
Iterate through awaitables and await each item.
59
60
Parameters:
61
- awaitables: Iterable[Awaitable[T]] - Iterable of awaitables to process
62
63
Returns:
64
AsyncIterable[T] - Async iterable yielding awaited results
65
66
Note:
67
Converts an iterable of awaitables into an async iterator of results.
68
"""
69
70
async def apply(__func: Callable[[T1], T], __arg1: Awaitable[T1], /) -> T: ...
71
async def apply(__func: Callable[[T1, T2], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], /) -> T: ...
72
async def apply(__func: Callable[[T1, T2, T3], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], __arg3: Awaitable[T3], /) -> T: ...
73
async def apply(__func: Callable[[T1, T2, T3, T4], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], __arg3: Awaitable[T3], __arg4: Awaitable[T4], /) -> T: ...
74
async def apply(__func: Callable[[T1, T2, T3, T4, T5], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], __arg3: Awaitable[T3], __arg4: Awaitable[T4], __arg5: Awaitable[T5], /) -> T: ...
75
async def apply(__func: Callable[..., T], /, *args: Awaitable[Any], **kwargs: Awaitable[Any]) -> T:
76
"""
77
Apply function to awaited arguments.
78
79
Parameters:
80
- __func: Callable[..., T] - Function to apply
81
- *args: Awaitable[Any] - Awaitable arguments to pass to function
82
- **kwargs: Awaitable[Any] - Awaitable keyword arguments to pass to function
83
84
Returns:
85
T - Result from function call
86
"""
87
```
88
89
### Iterator Multiplexing
90
91
Work with multiple async iterables simultaneously.
92
93
```python { .api }
94
def any_iter(*iterables: AnyIterable[T]) -> AsyncIterator[T]:
95
"""
96
Iterate over multiple async iterables simultaneously.
97
98
Parameters:
99
- *iterables: AnyIterable[T] - Variable number of iterables
100
101
Returns:
102
AsyncIterator[T] - Iterator yielding items from any iterable as available
103
104
Note:
105
Items are yielded as they become available from any iterable,
106
not in round-robin fashion.
107
"""
108
```
109
110
### Sync/Async Bridge
111
112
Bridge between synchronous and asynchronous contexts.
113
114
```python { .api }
115
def sync(function: Callable[..., Awaitable[T]], /) -> Callable[..., Awaitable[T]]: ...
116
def sync(function: Callable[..., T], /) -> Callable[..., Awaitable[T]]:
117
"""
118
Convert function to be async-compatible.
119
120
Parameters:
121
- function: Callable[..., Awaitable[T]] or Callable[..., T] - Function to convert
122
123
Returns:
124
Callable[..., Awaitable[T]] - Async-compatible function
125
126
Note:
127
Creates new event loop if none is running, or runs in existing loop.
128
Use with caution in async contexts.
129
"""
130
```
131
132
## Usage Examples
133
134
### Safe Iterator Sharing
135
```python
136
from asyncstdlib import borrow, list as alist
137
138
async def sharing_example():
139
async def data_source():
140
for i in range(10):
141
print(f"Generating {i}")
142
yield i
143
144
source = data_source()
145
146
# Borrow the iterator safely
147
borrowed1 = borrow(source)
148
borrowed2 = borrow(source)
149
150
# Both can consume without closing the original
151
first_items = []
152
async for item in borrowed1:
153
first_items.append(item)
154
if len(first_items) == 3:
155
break # Safe to break - won't close source
156
157
# Continue with the original or another borrowed iterator
158
remaining = await alist(borrowed2)
159
print(f"First 3: {first_items}") # [0, 1, 2]
160
print(f"Remaining: {remaining}") # [3, 4, 5, 6, 7, 8, 9]
161
```
162
163
### Scoped Iteration with Cleanup
164
```python
165
from asyncstdlib import scoped_iter
166
import asyncio
167
168
async def scoped_example():
169
async def cleanup_aware_source():
170
try:
171
for i in range(10):
172
yield i
173
await asyncio.sleep(0.1)
174
finally:
175
print("Source cleaned up")
176
177
# Automatic cleanup even with early exit
178
async with scoped_iter(cleanup_aware_source()) as iterator:
179
async for item in iterator:
180
print(f"Processing {item}")
181
if item == 3:
182
break # Cleanup happens automatically
183
# "Source cleaned up" is printed here
184
185
print("Scope exited safely")
186
```
187
188
### Concurrent Awaitable Processing
189
```python
190
from asyncstdlib import await_each
191
import asyncio
192
193
async def concurrent_example():
194
async def fetch_data(url, delay):
195
await asyncio.sleep(delay)
196
return f"Data from {url}"
197
198
# Process multiple requests concurrently
199
requests = [
200
fetch_data("api1.com", 0.3),
201
fetch_data("api2.com", 0.1),
202
fetch_data("api3.com", 0.2)
203
]
204
205
# Results yielded as they complete
206
async for result in await_each(requests):
207
print(f"Completed: {result}")
208
209
# Output (order may vary):
210
# Completed: Data from api2.com (completes first - 0.1s)
211
# Completed: Data from api3.com (completes second - 0.2s)
212
# Completed: Data from api1.com (completes last - 0.3s)
213
```
214
215
### Multiplexed Iteration
216
```python
217
from asyncstdlib import any_iter
218
import asyncio
219
220
async def multiplex_example():
221
async def fast_source():
222
for i in range(5):
223
await asyncio.sleep(0.1)
224
yield f"Fast-{i}"
225
226
async def slow_source():
227
for i in range(3):
228
await asyncio.sleep(0.25)
229
yield f"Slow-{i}"
230
231
# Items yielded as available from any source
232
async for item in any_iter(fast_source(), slow_source()):
233
print(f"Received: {item}")
234
235
# Possible output (timing-dependent):
236
# Received: Fast-0
237
# Received: Fast-1
238
# Received: Slow-0
239
# Received: Fast-2
240
# Received: Fast-3
241
# Received: Fast-4
242
# Received: Slow-1
243
# Received: Slow-2
244
```
245
246
### Function Application and Sync Bridge
247
```python
248
from asyncstdlib import apply, sync
249
import asyncio
250
251
async def function_example():
252
async def async_computation(x, y, multiplier=1):
253
await asyncio.sleep(0.1) # Simulate async work
254
return (x + y) * multiplier
255
256
# Apply function with args and kwargs
257
result = await apply(async_computation, 5, 10, multiplier=2)
258
print(f"Result: {result}") # Result: 30
259
260
# Use sync bridge (be careful in async contexts)
261
def sync_wrapper():
262
return sync(async_computation, 3, 7, multiplier=3)
263
264
# sync_result = sync_wrapper() # 30
265
```
266
267
### Advanced Scoped Iteration Patterns
268
```python
269
from asyncstdlib import scoped_iter, borrow
270
import asyncio
271
272
async def advanced_scoped():
273
async def database_cursor():
274
"""Simulate database cursor that needs cleanup."""
275
try:
276
for i in range(100):
277
yield {"id": i, "data": f"record_{i}"}
278
await asyncio.sleep(0.01)
279
finally:
280
print("Database cursor closed")
281
282
# Process data in batches with guaranteed cleanup
283
batch_size = 10
284
cursor = database_cursor()
285
286
async with scoped_iter(cursor) as scoped_cursor:
287
batch = []
288
async for record in scoped_cursor:
289
batch.append(record)
290
291
if len(batch) >= batch_size:
292
# Process batch
293
print(f"Processing batch of {len(batch)} records")
294
batch.clear()
295
296
# Simulate processing error after 3 batches
297
if record["id"] >= 30:
298
print("Processing error - early exit")
299
break
300
301
# Process remaining records
302
if batch:
303
print(f"Processing final batch of {len(batch)} records")
304
305
# Database cursor is guaranteed to be closed even with early exit
306
```
307
308
### Error Handling with Safe Iteration
309
```python
310
from asyncstdlib import scoped_iter, borrow
311
312
async def error_handling_example():
313
async def fallible_source():
314
try:
315
for i in range(10):
316
if i == 5:
317
raise ValueError("Simulated error")
318
yield i
319
finally:
320
print("Source cleanup in finally block")
321
322
# Safe error handling with guaranteed cleanup
323
try:
324
async with scoped_iter(fallible_source()) as iterator:
325
async for item in iterator:
326
print(f"Processing {item}")
327
except ValueError as e:
328
print(f"Caught error: {e}")
329
# Source cleanup still happens due to scoped_iter
330
331
print("Error handled, resources cleaned up")
332
```