The missing async toolbox - re-implements functions and classes of the Python standard library to make them compatible with async callables, iterables and context managers
84
Quality
Pending
Does it follow best practices?
Impact
84%
3.36xAverage score across 10 eval scenarios
Async versions of itertools functions providing powerful iteration patterns including infinite iterators, filtering, grouping, and combinatorial functions. These tools enable sophisticated data processing workflows with async iterables.
Generate infinite sequences from finite inputs.
def cycle(iterable: AnyIterable[T]) -> AsyncIterator[T]:
"""
Cycle through iterable infinitely.
Parameters:
- iterable: AnyIterable[T] - Iterable to cycle through
Returns:
AsyncIterator[T] - Infinite iterator cycling through items
"""Functions for accumulating values and grouping items.
def accumulate(iterable: AnyIterable[ADD]) -> AsyncIterator[ADD]: ...
def accumulate(iterable: AnyIterable[ADD], *, initial: ADD) -> AsyncIterator[ADD]: ...
def accumulate(iterable: AnyIterable[T], function: Callable[[T, T], T] | Callable[[T, T], Awaitable[T]]) -> AsyncIterator[T]: ...
def accumulate(iterable: AnyIterable[T2], function: Callable[[T1, T2], T1] | Callable[[T1, T2], Awaitable[T1]], *, initial: T1) -> AsyncIterator[T1]:
"""
Running totals or accumulated values.
Parameters:
- iterable: AnyIterable[T] - Input iterable
- function: Callable[[T, T], T] or Callable[[T, T], Awaitable[T]], optional - Binary function (default: operator.add)
- initial: T, optional - Initial value
Returns:
AsyncIterator[T] - Iterator of accumulated values
"""
def batched(iterable, n, strict=False):
"""
Group items into fixed-size batches.
Parameters:
- iterable: AnyIterable[T] - Input iterable
- n: int - Batch size
- strict: bool - If True, raise ValueError if final batch is incomplete
Returns:
AsyncIterator[Tuple[T, ...]] - Iterator of batches
Raises:
ValueError - If strict=True and final batch is incomplete
"""Combine multiple iterables in various ways.
class chain:
"""
Chain multiple iterables into a single iterator.
"""
def __init__(self, *iterables):
"""
Parameters:
- *iterables: AnyIterable[T] - Variable number of iterables to chain
"""
@classmethod
def from_iterable(cls, iterable):
"""
Create chain from iterable of iterables.
Parameters:
- iterable: AnyIterable[AnyIterable[T]] - Iterable of iterables
Returns:
chain[T] - Chain iterator
"""
async def __anext__(self):
"""Get next item from chained iterables."""
async def aclose(self):
"""Close all underlying iterables."""
def zip_longest(*iterables, fillvalue=None):
"""
Zip iterables with padding for unequal lengths.
Parameters:
- *iterables: AnyIterable - Iterables to zip
- fillvalue: T - Value to use for padding shorter iterables
Returns:
AsyncIterator[Tuple] - Iterator of tuples with padding
"""Functions for conditional iteration and selection.
def compress(data, selectors):
"""
Filter data based on corresponding selectors.
Parameters:
- data: AnyIterable[T] - Data to filter
- selectors: AnyIterable[Any] - Boolean selectors
Returns:
AsyncIterator[T] - Iterator of selected data items
"""
def dropwhile(predicate, iterable):
"""
Drop items while predicate is true, then yield remaining items.
Parameters:
- predicate: Callable[[T], bool] - Test function
- iterable: AnyIterable[T] - Input iterable
Returns:
AsyncIterator[T] - Iterator starting after predicate becomes false
"""
def filterfalse(predicate, iterable):
"""
Filter items where predicate is false.
Parameters:
- predicate: Callable[[T], bool] or None - Test function
- iterable: AnyIterable[T] - Input iterable
Returns:
AsyncIterator[T] - Iterator of items where predicate is false
"""
def takewhile(predicate, iterable):
"""
Take items while predicate is true, then stop.
Parameters:
- predicate: Callable[[T], bool] - Test function
- iterable: AnyIterable[T] - Input iterable
Returns:
AsyncIterator[T] - Iterator of items while predicate is true
"""
def islice(iterable, start, stop=None, step=None):
"""
Slice async iterable like built-in slice.
Parameters:
- iterable: AnyIterable[T] - Input iterable
- start: int or None - Start index or stop if stop is None
- stop: int or None - Stop index
- step: int or None - Step size
Returns:
AsyncIterator[T] - Sliced iterator
"""Advanced mapping operations.
def starmap(function, iterable):
"""
Apply function to unpacked tuples from iterable.
Parameters:
- function: Callable - Function to apply (can be sync or async)
- iterable: AnyIterable[Tuple] - Iterable of argument tuples
Returns:
AsyncIterator[R] - Iterator of function results
"""Utility functions for working with iterators.
def pairwise(iterable):
"""
Return successive overlapping pairs from iterable.
Parameters:
- iterable: AnyIterable[T] - Input iterable
Returns:
AsyncIterator[Tuple[T, T]] - Iterator of overlapping pairs
"""
class tee:
"""
Split async iterable into n independent iterators.
"""
def __init__(self, iterable, n=2, lock=None):
"""
Parameters:
- iterable: AnyIterable[T] - Source iterable
- n: int - Number of independent iterators
- lock: AsyncContextManager, optional - Lock for thread safety
"""
def __len__(self):
"""Return number of iterators."""
def __getitem__(self, item):
"""Get iterator by index or slice."""
def __iter__(self):
"""Return sync iterator over async iterators."""
async def __aenter__(self):
"""Enter async context."""
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit async context and cleanup."""
async def aclose(self):
"""Close all iterators."""Group items based on keys.
def groupby(iterable, key=None):
"""
Group consecutive items by key function.
Parameters:
- iterable: AnyIterable[T] - Input iterable (should be sorted by key)
- key: Callable[[T], K] or None - Key function (default: identity)
Returns:
AsyncIterator[Tuple[K, AsyncIterator[T]]] - Iterator of (key, group) pairs
"""async def process_data():
async def sensor_readings():
for i in range(100):
yield {"temp": 20 + i % 10, "humidity": 50 + i % 20}
# Batch readings into groups of 10
batches = batched(sensor_readings(), 10)
# Calculate average temperature per batch
async for batch in batches:
temps = [reading["temp"] for reading in batch]
avg_temp = sum(temps) / len(temps)
print(f"Average temperature: {avg_temp}")async def combine_sources():
async def source1():
for i in range(5):
yield f"A{i}"
async def source2():
for i in range(3):
yield f"B{i}"
# Chain sources together
combined = chain(source1(), source2())
items = await list(combined) # ['A0', 'A1', 'A2', 'A3', 'A4', 'B0', 'B1', 'B2']
# Or create from iterable of iterables
sources = [source1(), source2()]
combined2 = chain.from_iterable(sources)async def group_and_filter():
async def scores():
data = [("Alice", 85), ("Bob", 92), ("Alice", 78), ("Charlie", 95), ("Bob", 88)]
for item in data:
yield item
# Group by student name (data must be sorted first)
sorted_scores = sorted(await list(scores()), key=lambda x: x[0])
async def sorted_async():
for item in sorted_scores:
yield item
# Group consecutive items by student
async for student, grades in groupby(sorted_async(), key=lambda x: x[0]):
grade_list = [grade for _, grade in await list(grades)]
avg = sum(grade_list) / len(grade_list)
print(f"{student}: {avg:.1f}")Install with Tessl CLI
npx tessl i tessl/pypi-asyncstdlibevals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10