CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-asyncstdlib

The missing async toolbox - re-implements functions and classes of the Python standard library to make them compatible with async callables, iterables and context managers

84

3.36x

Quality

Pending

Does it follow best practices?

Impact

84%

3.36x

Average score across 10 eval scenarios

Overview
Eval results
Files

task.mdevals/scenario-7/

Async Data Stream Processor

Build a data processing system that handles incoming data streams and processes them efficiently in batches.

Requirements

Process async data streams by grouping items into fixed-size batches. This improves efficiency by reducing the number of processing operations.

Implement process_stream(data_stream, batch_size) that:

  • Takes an async iterable data_stream containing data items
  • Groups items into batches of size batch_size
  • Returns an async iterable where each item is a list containing a batch of items
  • The last batch may contain fewer items than batch_size if the stream doesn't divide evenly

Implement batch_process_records(records, batch_size, processor) that:

  • Takes an async iterable records containing record dictionaries
  • Groups records into batches of size batch_size
  • Applies the async processor function to each batch
  • Returns a list of all processing results in order

Test Cases

Basic batching

  • Given an async stream of integers [1, 2, 3, 4, 5] with batch size 2, produces batches [1, 2], [3, 4], [5] @test

Empty stream

  • Given an empty async stream with batch size 3, produces no batches @test

Batch processing

  • Given an async stream of records with batch size 2 and a processor that sums record values, correctly processes all batches and returns results @test

Single item batches

  • Given an async stream of 3 items with batch size 1, produces 3 batches each containing 1 item @test

Implementation

@generates

API

from typing import AsyncIterable, List, TypeVar, Callable, Any

T = TypeVar('T')

async def process_stream(data_stream: AsyncIterable[T], batch_size: int) -> AsyncIterable[List[T]]:
    """
    Process an async data stream by grouping items into fixed-size batches.

    Args:
        data_stream: An async iterable of data items
        batch_size: The number of items per batch

    Yields:
        Lists containing batches of items from the stream
    """
    pass

async def batch_process_records(
    records: AsyncIterable[dict],
    batch_size: int,
    processor: Callable[[List[dict]], Any]
) -> List[Any]:
    """
    Process records in batches using the provided processor function.

    Args:
        records: An async iterable of record dictionaries
        batch_size: The number of records per batch
        processor: An async function that processes a batch and returns a result

    Returns:
        A list of processing results in order
    """
    pass

Dependencies { .dependencies }

asyncstdlib { .dependency }

Provides async-compatible standard library functionality.

@satisfied-by

Install with Tessl CLI

npx tessl i tessl/pypi-asyncstdlib

tile.json