
Workspace: tessl
Visibility: Public
Describes: pkg:pypi/asyncstdlib@3.13.x (pypi)

tessl/pypi-asyncstdlib

tessl install tessl/pypi-asyncstdlib@3.13.0

The missing async toolbox - re-implements functions and classes of the Python standard library to make them compatible with async callables, iterables and context managers

Agent Success: 84% (agent success rate when using this tile)
Improvement: 3.36x (improvement in agent success rate compared to baseline)
Baseline: 25% (agent success rate without this tile)

evals/scenario-7/task.md

Async Data Stream Processor

Build a data processing system that handles incoming data streams and processes them efficiently in batches.

Requirements

Process async data streams by grouping items into fixed-size batches. This improves efficiency by reducing the number of processing operations.

Implement process_stream(data_stream, batch_size) that:

  • Takes an async iterable data_stream containing data items
  • Groups items into batches of size batch_size
  • Returns an async iterable where each item is a list containing a batch of items
  • The last batch may contain fewer items than batch_size if the stream doesn't divide evenly
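As a rough illustration, one possible shape for process_stream (matching the stub in the API section below) is a plain async generator that accumulates items into a list and yields the list whenever it reaches batch_size; this is only a sketch, not the required implementation:

from typing import AsyncIterable, List, TypeVar

T = TypeVar('T')

async def process_stream(data_stream: AsyncIterable[T], batch_size: int) -> AsyncIterable[List[T]]:
    batch: List[T] = []
    async for item in data_stream:
        batch.append(item)
        if len(batch) == batch_size:
            # A full batch is ready: hand it to the consumer and start a new one
            yield batch
            batch = []
    if batch:
        # The stream did not divide evenly: emit the shorter final batch
        yield batch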

Implement batch_process_records(records, batch_size, processor) that:

  • Takes an async iterable records containing record dictionaries
  • Groups records into batches of size batch_size
  • Applies the async processor function to each batch
  • Returns a list of all processing results in order
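A matching sketch for batch_process_records, assuming the process_stream sketch above is in scope and using asyncstdlib's map and list helpers (async-aware counterparts of the built-ins) to apply the async processor to each batch and collect the results in order:

from typing import Any, AsyncIterable, Callable, List
import asyncstdlib as a

async def batch_process_records(
    records: AsyncIterable[dict],
    batch_size: int,
    processor: Callable[[List[dict]], Any],
) -> List[Any]:
    # a.map awaits the async processor once per batch, in stream order;
    # a.list collects the awaited results into a regular list
    return await a.list(a.map(processor, process_stream(records, batch_size)))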

Test Cases

Basic batching

  • Given an async stream of integers [1, 2, 3, 4, 5] with batch size 2, produces batches [1, 2], [3, 4], [5] @test

Empty stream

  • Given an empty async stream with batch size 3, produces no batches @test

Batch processing

  • Given an async stream of records with batch size 2 and a processor that sums record values, correctly processes all batches and returns results @test

Single item batches

  • Given an async stream of 3 items with batch size 1, produces 3 batches each containing 1 item @test
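As a concrete illustration of the first case above, a check along these lines would exercise basic batching; it assumes process_stream can be imported from the generated module, whose path this spec does not pin down:

import asyncio

async def check_basic_batching():
    # Throwaway async generator standing in for a real data source
    async def stream():
        for item in [1, 2, 3, 4, 5]:
            yield item

    batches = [batch async for batch in process_stream(stream(), 2)]
    assert batches == [[1, 2], [3, 4], [5]]

asyncio.run(check_basic_batching())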

Implementation

@generates

API

from typing import AsyncIterable, List, TypeVar, Callable, Any

T = TypeVar('T')

async def process_stream(data_stream: AsyncIterable[T], batch_size: int) -> AsyncIterable[List[T]]:
    """
    Process an async data stream by grouping items into fixed-size batches.

    Args:
        data_stream: An async iterable of data items
        batch_size: The number of items per batch

    Yields:
        Lists containing batches of items from the stream
    """
    pass

async def batch_process_records(
    records: AsyncIterable[dict],
    batch_size: int,
    processor: Callable[[List[dict]], Any]
) -> List[Any]:
    """
    Process records in batches using the provided processor function.

    Args:
        records: An async iterable of record dictionaries
        batch_size: The number of records per batch
        processor: An async function that processes a batch and returns a result

    Returns:
        A list of processing results in order
    """
    pass
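For context, a small driver sketch showing how the two functions might be used together, mirroring the "Batch processing" test case above; the names records and sum_values are illustrative only, and the functions are assumed to be importable from the generated module:

import asyncio

async def main():
    # Toy record source and an async processor that sums each batch's values
    async def records():
        for value in [1, 2, 3, 4, 5]:
            yield {"value": value}

    async def sum_values(batch):
        return sum(record["value"] for record in batch)

    results = await batch_process_records(records(), 2, sum_values)
    print(results)  # [3, 7, 5] with batch size 2

asyncio.run(main())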

Dependencies { .dependencies }

asyncstdlib { .dependency }

Provides async-compatible standard library functionality.

@satisfied-by