The missing async toolbox - re-implements functions and classes of the Python standard library to make them compatible with async callables, iterables and context managers
84
Quality
Pending
Does it follow best practices?
Impact
84%
3.36xAverage score across 10 eval scenarios
Build a data processing system that handles incoming data streams and processes them efficiently in batches.
Process async data streams by grouping items into fixed-size batches. This improves efficiency by reducing the number of processing operations.
Implement process_stream(data_stream, batch_size) that:
data_stream containing data itemsbatch_sizebatch_size if the stream doesn't divide evenlyImplement batch_process_records(records, batch_size, processor) that:
records containing record dictionariesbatch_sizeprocessor function to each batch[1, 2, 3, 4, 5] with batch size 2, produces batches [1, 2], [3, 4], [5] @test@generates
from typing import AsyncIterable, List, TypeVar, Callable, Any
T = TypeVar('T')
async def process_stream(data_stream: AsyncIterable[T], batch_size: int) -> AsyncIterable[List[T]]:
"""
Process an async data stream by grouping items into fixed-size batches.
Args:
data_stream: An async iterable of data items
batch_size: The number of items per batch
Yields:
Lists containing batches of items from the stream
"""
pass
async def batch_process_records(
records: AsyncIterable[dict],
batch_size: int,
processor: Callable[[List[dict]], Any]
) -> List[Any]:
"""
Process records in batches using the provided processor function.
Args:
records: An async iterable of record dictionaries
batch_size: The number of records per batch
processor: An async function that processes a batch and returns a result
Returns:
A list of processing results in order
"""
passProvides async-compatible standard library functionality.
@satisfied-by
Install with Tessl CLI
npx tessl i tessl/pypi-asyncstdlibevals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10