CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-asyncstdlib

The missing async toolbox - re-implements functions and classes of the Python standard library to make them compatible with async callables, iterables and context managers

84

3.36x

Quality

Pending

Does it follow best practices?

Impact

84%

3.36x

Average score across 10 eval scenarios

Overview
Eval results
Files

docs/heapq.md

Heap Operations

Async versions of heapq functions for working with priority queues and sorted data structures in async contexts. These functions enable efficient operations on sorted async iterables.

Capabilities

Merging Sorted Iterables

Efficiently merge multiple sorted async iterables into a single sorted output.

def merge(*iterables, key=None, reverse=False):
    """
    Merge multiple sorted async iterables into a single sorted iterator.

    Note this is a plain ``def``, not ``async def``: calling it returns an
    async iterator immediately, which is then consumed with ``async for``.

    Parameters:
    - *iterables: AnyIterable - Variable number of sorted iterables to merge
    - key: Callable[[T], Any] or None - Key function for comparison
    - reverse: bool - If True, merge in descending order

    Returns:
    AsyncIterator[T] - Iterator yielding items in sorted order

    Note:
    Input iterables must already be sorted in the same order as specified
    by the key and reverse parameters.
    """

Finding Largest/Smallest Elements

Extract the n largest or smallest elements from async iterables efficiently.

async def nlargest(iterable, n, key=None):
    """
    Find the n largest elements from an async iterable.

    Parameters:
    - iterable: AsyncIterator[T] - Input iterable (fully consumed)
    - n: int - Number of largest elements to return
    - key: Callable[[T], Any] or None - Key function for comparison

    Returns:
    List[T] - List of the n largest elements in descending order

    Note:
    Equivalent to sorted(iterable, key=key, reverse=True)[:n] but more
    efficient for small values of n relative to the iterable's length.
    """

async def nsmallest(iterable, n, key=None):
    """
    Find the n smallest elements from an async iterable.

    Parameters:
    - iterable: AsyncIterator[T] - Input iterable (fully consumed)
    - n: int - Number of smallest elements to return
    - key: Callable[[T], Any] or None - Key function for comparison

    Returns:
    List[T] - List of the n smallest elements in ascending order

    Note:
    Equivalent to sorted(iterable, key=key)[:n] but more efficient
    for small values of n relative to the iterable's length.
    """

Usage Examples

Merging Sorted Data Streams

from asyncstdlib import merge, list as alist

async def merge_example():
    """Combine three pre-sorted async streams into one sorted list."""
    # A single parameterized emitter replaces three near-identical generators.
    async def ascending(values):
        for value in values:
            yield value

    first = ascending([1, 4, 7, 10])
    second = ascending([2, 5, 8])
    third = ascending([3, 6, 9, 11, 12])

    # merge() interleaves the streams while preserving global sort order.
    result = await alist(merge(first, second, third))
    print(result)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

async def merge_with_key():
    """Merge two (name, score) streams in descending score order."""
    async def emit(records):
        for record in records:
            yield record

    # Both inputs are already sorted descending by score, matching the
    # key/reverse combination passed to merge() below.
    group_a = emit([("Alice", 95), ("Charlie", 85)])
    group_b = emit([("Bob", 90), ("David", 80)])

    ranked = merge(group_a, group_b, key=lambda x: x[1], reverse=True)
    result = await alist(ranked)
    print(result)  # [('Alice', 95), ('Bob', 90), ('Charlie', 85), ('David', 80)]

Finding Top/Bottom Elements

from asyncstdlib import nlargest, nsmallest

async def top_bottom_example():
    """Pick the extremes out of a stream of random scores."""
    async def random_scores():
        import random
        for _ in range(100):
            yield random.randint(1, 1000)

    # NOTE(review): each random_scores() call yields an independent random
    # sample, so the top-5 and bottom-3 below come from different datasets.
    top5 = await nlargest(random_scores(), 5)
    print(f"Top 5 scores: {top5}")

    bottom3 = await nsmallest(random_scores(), 3)
    print(f"Bottom 3 scores: {bottom3}")

async def key_based_selection():
    """Select records from an async stream by a derived key (GPA, age)."""
    roster = [
        {"name": "Alice", "gpa": 3.8, "age": 20},
        {"name": "Bob", "gpa": 3.2, "age": 22},
        {"name": "Charlie", "gpa": 3.9, "age": 19},
        {"name": "David", "gpa": 3.1, "age": 21},
        {"name": "Eve", "gpa": 4.0, "age": 20},
    ]

    # Each call produces a fresh stream over the same roster.
    async def students():
        for student in roster:
            yield student

    # Highest GPAs first.
    top_students = await nlargest(students(), 3, key=lambda s: s["gpa"])
    for student in top_students:
        print(f"{student['name']}: {student['gpa']}")
    # Output:
    # Eve: 4.0
    # Charlie: 3.9
    # Alice: 3.8

    # Youngest students; ties keep stream order (stable selection).
    youngest = await nsmallest(students(), 2, key=lambda s: s["age"])
    for student in youngest:
        print(f"{student['name']}: {student['age']} years old")
    # Output:
    # Charlie: 19 years old
    # Alice: 20 years old (or Eve: 20 years old - stable sort)

Log Processing Example

import asyncio
from asyncstdlib import merge, nlargest
from datetime import datetime

async def log_processing():
    """Merge two log streams chronologically and extract recent errors."""
    web_entries = [
        {"timestamp": "2024-01-01 10:00:00", "level": "INFO", "source": "web"},
        {"timestamp": "2024-01-01 10:05:00", "level": "ERROR", "source": "web"},
        {"timestamp": "2024-01-01 10:10:00", "level": "INFO", "source": "web"},
    ]
    db_entries = [
        {"timestamp": "2024-01-01 10:02:00", "level": "INFO", "source": "db"},
        {"timestamp": "2024-01-01 10:07:00", "level": "WARN", "source": "db"},
        {"timestamp": "2024-01-01 10:12:00", "level": "ERROR", "source": "db"},
    ]

    # One replay helper stands in for a per-source generator function.
    async def replay(entries):
        for entry in entries:
            yield entry

    # Sort key: parse the "YYYY-MM-DD HH:MM:SS" stamp into a datetime.
    def parse_timestamp(log):
        return datetime.fromisoformat(log["timestamp"])

    merged_logs = merge(
        replay(web_entries),
        replay(db_entries),
        key=parse_timestamp,
    )

    # First pass: print everything in chronological order.
    async for log in merged_logs:
        print(f"{log['timestamp']} [{log['source']}] {log['level']}")

    # Second pass over fresh streams: keep only ERROR entries, newest first.
    all_logs = merge(replay(web_entries), replay(db_entries), key=parse_timestamp)
    error_logs = (log async for log in all_logs if log["level"] == "ERROR")
    # ISO-style timestamps sort lexicographically in chronological order,
    # so the raw string works as the comparison key here.
    recent_errors = await nlargest(error_logs, 5, key=lambda log: log["timestamp"])

    print("\nMost recent errors:")
    for error in recent_errors:
        print(f"{error['timestamp']} [{error['source']}] {error['level']}")

Performance Considerations

async def performance_example():
    """Contrast nlargest with sort-then-slice on a large stream."""
    async def large_dataset():
        # 10,000 sequential integers stand in for a large data source.
        for value in range(10000):
            yield value

    # Heap-based selection is O(n log k) with k=10, n=10000 — the whole
    # dataset never needs to be held in memory or fully sorted.
    top10 = await nlargest(large_dataset(), 10)

    # The naive alternative sorts everything first: O(n log n).
    # all_sorted = await sorted(list(large_dataset()), reverse=True)
    # top10_slow = all_sorted[:10]

    print(f"Top 10: {top10}")

Install with Tessl CLI

npx tessl i tessl/pypi-asyncstdlib

docs

asynctools.md

builtins.md

contextlib.md

functools.md

heapq.md

index.md

itertools.md

tile.json