The missing async toolbox - re-implements functions and classes of the Python standard library to make them compatible with async callables, iterables and context managers
84
Quality
Pending
Does it follow best practices?
Impact
84%
3.36x — Average score across 10 eval scenarios
Custom utilities specific to asyncstdlib for safely working with async iterables, borrowing iterators, scoped iteration, and bridging sync/async contexts. These tools provide essential safety and convenience features for async programming.
Safely borrow async iterables without risking premature closure.
def borrow(iterator: AsyncGenerator[T, S], /) -> AsyncGenerator[T, S]: ...
def borrow(iterator: AsyncIterator[T], /) -> AsyncIterator[T]:
    """
    Safely borrow an async iterable without closing it.

    Parameters:
    - iterator: AsyncIterator[T] or AsyncGenerator[T, S] - Iterator to borrow

    Returns:
    AsyncIterator[T] or AsyncGenerator[T, S] - Borrowed iterator that won't close the original

    Note:
    Prevents the borrowed iterator from closing the original iterable
    when it goes out of scope, allowing safe sharing of iterables.
    """


# Create scoped async iterators with automatic cleanup.
def scoped_iter(iterable: AnyIterable[T], /) -> AsyncContextManager[AsyncIterator[T]]:
    """
    Create a scoped async iterator with automatic cleanup.

    Parameters:
    - iterable: AnyIterable[T] - Iterable to create scoped iterator from

    Returns:
    AsyncContextManager[AsyncIterator[T]] - Context manager yielding scoped iterator

    Usage:
        async with scoped_iter(iterable) as iterator:
            async for item in iterator:
                # Safe to break or return - iterator will be cleaned up
                pass
    """


# Work with multiple awaitables and async functions.
async def await_each(awaitables: Iterable[Awaitable[T]], /) -> AsyncIterable[T]:
    """
    Iterate through awaitables and await each item.

    Parameters:
    - awaitables: Iterable[Awaitable[T]] - Iterable of awaitables to process

    Returns:
    AsyncIterable[T] - Async iterable yielding awaited results

    Note:
    Converts an iterable of awaitables into an async iterator of results;
    each awaitable is awaited in iteration order.
    """
async def apply(__func: Callable[[T1], T], __arg1: Awaitable[T1], /) -> T: ...
async def apply(__func: Callable[[T1, T2], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], /) -> T: ...
async def apply(__func: Callable[[T1, T2, T3], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], __arg3: Awaitable[T3], /) -> T: ...
async def apply(__func: Callable[[T1, T2, T3, T4], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], __arg3: Awaitable[T3], __arg4: Awaitable[T4], /) -> T: ...
async def apply(__func: Callable[[T1, T2, T3, T4, T5], T], __arg1: Awaitable[T1], __arg2: Awaitable[T2], __arg3: Awaitable[T3], __arg4: Awaitable[T4], __arg5: Awaitable[T5], /) -> T: ...
async def apply(__func: Callable[..., T], /, *args: Awaitable[Any], **kwargs: Awaitable[Any]) -> T:
    """
    Apply a function to awaited arguments.

    Parameters:
    - __func: Callable[..., T] - Function to apply
    - *args: Awaitable[Any] - Awaitable arguments to pass to function
    - **kwargs: Awaitable[Any] - Awaitable keyword arguments to pass to function

    Returns:
    T - Result from function call
    """


# Work with multiple async iterables simultaneously.
def any_iter(*iterables: AnyIterable[T]) -> AsyncIterator[T]:
    """
    Iterate over multiple async iterables simultaneously.

    Parameters:
    - *iterables: AnyIterable[T] - Variable number of iterables

    Returns:
    AsyncIterator[T] - Iterator yielding items from any iterable as available

    Note:
    Items are yielded as they become available from any iterable,
    not in round-robin fashion.
    """


# Bridge between synchronous and asynchronous contexts.
def sync(function: Callable[..., Awaitable[T]], /) -> Callable[..., Awaitable[T]]: ...
def sync(function: Callable[..., T], /) -> Callable[..., Awaitable[T]]:
    """
    Convert a function to be async-compatible.

    Parameters:
    - function: Callable[..., Awaitable[T]] or Callable[..., T] - Function to convert

    Returns:
    Callable[..., Awaitable[T]] - Async-compatible function

    Note:
    Creates a new event loop if none is running, or runs in the existing loop.
    Use with caution in async contexts.
    """


from asyncstdlib import borrow, list as alist
async def sharing_example():
    """Demonstrate safely sharing one async generator via borrow()."""
    async def data_source():
        for i in range(10):
            print(f"Generating {i}")
            yield i

    source = data_source()
    # Borrow the iterator safely
    borrowed1 = borrow(source)
    borrowed2 = borrow(source)
    # Both can consume without closing the original
    first_items = []
    async for item in borrowed1:
        first_items.append(item)
        if len(first_items) == 3:
            break  # Safe to break - won't close source
    # Continue with the original or another borrowed iterator
    remaining = await alist(borrowed2)
    print(f"First 3: {first_items}")  # [0, 1, 2]
    print(f"Remaining: {remaining}")  # [3, 4, 5, 6, 7, 8, 9]


from asyncstdlib import scoped_iter
import asyncio


async def scoped_example():
    """Demonstrate scoped_iter() cleaning up the source on early exit."""
    async def cleanup_aware_source():
        try:
            for i in range(10):
                yield i
                await asyncio.sleep(0.1)
        finally:
            print("Source cleaned up")

    # Automatic cleanup even with early exit
    async with scoped_iter(cleanup_aware_source()) as iterator:
        async for item in iterator:
            print(f"Processing {item}")
            if item == 3:
                break  # Cleanup happens automatically
    # "Source cleaned up" is printed here
    print("Scope exited safely")


from asyncstdlib import await_each
import asyncio


async def concurrent_example():
    """Demonstrate await_each() over a sequence of awaitables."""
    async def fetch_data(url, delay):
        await asyncio.sleep(delay)
        return f"Data from {url}"

    # Several pending requests as plain coroutines
    requests = [
        fetch_data("api1.com", 0.3),
        fetch_data("api2.com", 0.1),
        fetch_data("api3.com", 0.2)
    ]
    # NOTE: await_each awaits each awaitable sequentially, in iteration
    # order - results come back in input order, not completion order.
    # Use asyncio.as_completed() if completion-order results are needed.
    async for result in await_each(requests):
        print(f"Completed: {result}")
    # Output (always in input order):
    # Completed: Data from api1.com
    # Completed: Data from api2.com
    # Completed: Data from api3.com


from asyncstdlib import any_iter
import asyncio


async def multiplex_example():
    """Demonstrate any_iter() multiplexing two async sources."""
    async def fast_source():
        for i in range(5):
            await asyncio.sleep(0.1)
            yield f"Fast-{i}"

    async def slow_source():
        for i in range(3):
            await asyncio.sleep(0.25)
            yield f"Slow-{i}"

    # Items yielded as available from any source
    async for item in any_iter(fast_source(), slow_source()):
        print(f"Received: {item}")
    # Possible output (timing-dependent):
    # Received: Fast-0
    # Received: Fast-1
    # Received: Slow-0
    # Received: Fast-2
    # Received: Fast-3
    # Received: Fast-4
    # Received: Slow-1
    # Received: Slow-2


from asyncstdlib import apply, sync
import asyncio


async def function_example():
    """Demonstrate apply() on awaited arguments and the sync() bridge."""
    def computation(x, y, multiplier=1):
        return (x + y) * multiplier

    async def ready(value):
        # apply() awaits every positional and keyword argument, so plain
        # values must be wrapped as trivial awaitables.
        return value

    # Apply the function to awaited arguments
    result = await apply(computation, ready(5), ready(10), multiplier=ready(2))
    print(f"Result: {result}")  # Result: 30

    # sync() takes a single function and returns an awaitable-returning
    # wrapper; arguments go to the wrapper, not to sync() itself.
    awaitable_computation = sync(computation)
    sync_result = await awaitable_computation(3, 7, multiplier=3)
    print(f"Sync result: {sync_result}")  # Sync result: 30


from asyncstdlib import scoped_iter, borrow
import asyncio


async def advanced_scoped():
    """Demonstrate batched processing with guaranteed cursor cleanup."""
    async def database_cursor():
        """Simulate database cursor that needs cleanup."""
        try:
            for i in range(100):
                yield {"id": i, "data": f"record_{i}"}
                await asyncio.sleep(0.01)
        finally:
            print("Database cursor closed")

    # Process data in batches with guaranteed cleanup
    batch_size = 10
    cursor = database_cursor()
    async with scoped_iter(cursor) as scoped_cursor:
        batch = []
        async for record in scoped_cursor:
            batch.append(record)
            if len(batch) >= batch_size:
                # Process batch
                print(f"Processing batch of {len(batch)} records")
                batch.clear()
            # Simulate processing error after 3 batches
            if record["id"] >= 30:
                print("Processing error - early exit")
                break
        # Process remaining records
        if batch:
            print(f"Processing final batch of {len(batch)} records")
    # Database cursor is guaranteed to be closed even with early exit


from asyncstdlib import scoped_iter, borrow
async def error_handling_example():
    """Demonstrate scoped_iter() cleanup when the source raises."""
    async def fallible_source():
        try:
            for i in range(10):
                if i == 5:
                    raise ValueError("Simulated error")
                yield i
        finally:
            print("Source cleanup in finally block")

    # Safe error handling with guaranteed cleanup
    try:
        async with scoped_iter(fallible_source()) as iterator:
            async for item in iterator:
                print(f"Processing {item}")
    except ValueError as e:
        print(f"Caught error: {e}")
    # Source cleanup still happens due to scoped_iter
    print("Error handled, resources cleaned up")


# Install with Tessl CLI
npx tessl i tessl/pypi-asyncstdlib

evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10