CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-returns

Functional programming library providing type-safe containers for error handling, side effects, and composable operations with monadic patterns.

Pending
Overview
Eval results
Files

async-operations.mddocs/

Async Operations

Containers and utilities for handling asynchronous operations with type safety and composable error handling in concurrent environments. These provide monadic abstractions over Python's async/await functionality.

Capabilities

Future Container

Container for asynchronous operations that never fail, providing composable async programming.

class Future[T]:
    """Container for async operations that never fail"""
    def __init__(self, awaitable: Awaitable[T]): ...
    async def bind(self, func: Callable[[T], Future[U]]) -> Future[U]: ...
    async def map(self, func: Callable[[T], U]) -> Future[U]: ...
    async def apply(self, wrapped_func: Future[Callable[[T], U]]) -> Future[U]: ...
    async def awaitable(self) -> T: ...
    def __await__(self): ...  # Enable direct awaiting
    
    @classmethod
    def from_value(cls, value: T) -> Future[T]: ...
    @classmethod
    def from_coroutine(cls, coro: Awaitable[T]) -> Future[T]: ...

def future(func: Callable[..., Awaitable[T]]) -> Callable[..., Future[T]]:
    """Decorator to turn coroutine function into Future-returning function"""

async def async_identity(instance: T) -> T:
    """Async version of identity function"""

Usage examples:

import asyncio
from returns.future import Future, future, async_identity

# Creating Future from awaitable
async def fetch_data() -> str:
    await asyncio.sleep(1)
    return "Hello, World!"

future_data = Future(fetch_data())
result = await future_data.awaitable()  # "Hello, World!"

# Direct awaiting
result = await future_data  # "Hello, World!"

# Using decorator
@future
async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.5)
    return {"id": user_id, "name": "John"}

user_future = fetch_user(123)  # Returns Future[dict]
user = await user_future  # {"id": 123, "name": "John"}

# Chaining async operations
async def get_user_name(user: dict) -> str:
    return user["name"]

@future
async def format_greeting(name: str) -> str:
    return f"Hello, {name}!"

# Compose async operations
async def greet_user(user_id: int) -> str:
    return await (
        fetch_user(user_id)
        .bind(lambda user: Future(get_user_name(user)))
        .bind(format_greeting)
    )

greeting = await greet_user(123)  # "Hello, John!"

FutureResult Container

Container for asynchronous operations that can fail, combining Future with Result semantics.

class FutureResult[T, E]:
    """Container for async operations that can fail"""
    def __init__(self, awaitable: Awaitable[Result[T, E]]): ...
    async def bind(self, func: Callable[[T], FutureResult[U, E]]) -> FutureResult[U, E]: ...
    async def map(self, func: Callable[[T], U]) -> FutureResult[U, E]: ...
    async def apply(self, wrapped_func: FutureResult[Callable[[T], U], E]) -> FutureResult[U, E]: ...
    async def alt(self, func: Callable[[E], FutureResult[T, F]]) -> FutureResult[T, F]: ...
    async def lash(self, func: Callable[[E], FutureResult[T, F]]) -> FutureResult[T, F]: ...
    async def awaitable(self) -> Result[T, E]: ...
    def __await__(self): ...  # Enable direct awaiting
    
    @classmethod
    def from_success(cls, value: T) -> FutureResult[T, E]: ...
    @classmethod
    def from_failure(cls, error: E) -> FutureResult[T, E]: ...
    @classmethod
    def from_result(cls, result: Result[T, E]) -> FutureResult[T, E]: ...

# Type alias
FutureResultE[T] = FutureResult[T, Exception]

# Constructor functions
def FutureSuccess(value: T) -> FutureResult[T, E]:
    """Create successful FutureResult"""

def FutureFailure(error: E) -> FutureResult[T, E]:
    """Create failed FutureResult"""

def future_safe(func: Callable[..., Awaitable[T]]) -> Callable[..., FutureResult[T, Exception]]:
    """Convert exception-throwing coroutine to FutureResult"""

Usage examples:

import asyncio
from returns.future import FutureResult, FutureSuccess, FutureFailure, future_safe
from returns.result import Success, Failure

# Creating FutureResult instances
async def risky_operation(x: int) -> Result[int, str]:
    await asyncio.sleep(0.1)
    if x < 0:
        return Failure("Negative value not allowed")
    return Success(x * 2)

future_result = FutureResult(risky_operation(5))
result = await future_result  # Success(10)

# Using constructor functions
success_future = FutureSuccess(42)
failure_future = FutureFailure("Something went wrong")

# Using future_safe decorator
@future_safe
async def divide_async(a: int, b: int) -> float:
    if b == 0:
        raise ValueError("Division by zero")
    await asyncio.sleep(0.1)
    return a / b

division_result = divide_async(10, 2)  # FutureResult[float, Exception]
result = await division_result  # Success(5.0)

error_result = await divide_async(10, 0)  # Failure(ValueError("Division by zero"))

# Chaining FutureResult operations
@future_safe
async def validate_positive(x: int) -> int:
    if x <= 0:
        raise ValueError("Must be positive")
    return x

@future_safe
async def square_async(x: int) -> int:
    await asyncio.sleep(0.1)
    return x * x

# Compose operations
async def process_number(x: int) -> Result[int, Exception]:
    return await (
        FutureSuccess(x)
        .bind(validate_positive)
        .bind(square_async)
    )

result = await process_number(5)   # Success(25)
result = await process_number(-5)  # Failure(ValueError("Must be positive"))

Async Utilities

Utility functions for working with async operations and conversions.

def asyncify(func: Callable[..., T]) -> Callable[..., Awaitable[T]]:
    """Convert sync function to async function"""

async def async_partial(func: Callable, *args, **kwargs) -> Callable:
    """Async version of partial application"""

def from_future(future: asyncio.Future[T]) -> Future[T]:
    """Convert asyncio.Future to Returns Future"""

def to_future(container: Future[T]) -> asyncio.Future[T]:
    """Convert Returns Future to asyncio.Future"""

Usage examples:

import asyncio
from returns.future import asyncify, from_future, to_future, Future

# Convert sync to async
def sync_operation(x: int) -> int:
    return x * 2

async_operation = asyncify(sync_operation)
result = await async_operation(21)  # 42

# Working with asyncio.Future
async def create_asyncio_future() -> str:
    future = asyncio.Future()
    future.set_result("Hello from asyncio")
    return future.result()

asyncio_future = asyncio.ensure_future(create_asyncio_future())
returns_future = from_future(asyncio_future)
result = await returns_future  # "Hello from asyncio"

# Convert back to asyncio.Future
returns_future = Future(async_operation(10))
asyncio_future = to_future(returns_future)
result = await asyncio_future  # 20

Concurrent Operations

Utilities for handling multiple concurrent operations with proper error handling.

async def gather(*futures: Future[T]) -> Future[list[T]]:
    """Gather multiple Future operations"""

async def gather_result(*futures: FutureResult[T, E]) -> FutureResult[list[T], E]:
    """Gather multiple FutureResult operations, failing fast on first error"""

async def gather_all(*futures: FutureResult[T, E]) -> FutureResult[list[Result[T, E]], Never]:
    """Gather all FutureResult operations, collecting all results"""

async def race(*futures: Future[T]) -> Future[T]:
    """Return the first Future to complete"""

Usage examples:

import asyncio
from returns.future import Future, FutureResult, FutureSuccess
from returns.future import gather, gather_result, race

# Gather successful operations
@future
async def fetch_item(item_id: int) -> str:
    await asyncio.sleep(0.1)
    return f"Item {item_id}"

items_future = gather(
    fetch_item(1),
    fetch_item(2),
    fetch_item(3)
)
items = await items_future  # ["Item 1", "Item 2", "Item 3"]

# Gather with error handling
@future_safe
async def fetch_user_data(user_id: int) -> dict:
    if user_id < 0:
        raise ValueError("Invalid user ID")
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": f"User {user_id}"}

# This will fail fast on first error
users_result = await gather_result(
    fetch_user_data(1),
    fetch_user_data(-1),  # This will fail
    fetch_user_data(3)
)  # Failure(ValueError("Invalid user ID"))

# Race for first completion
fast_future = Future(asyncio.sleep(0.1, "fast"))
slow_future = Future(asyncio.sleep(0.2, "slow"))

winner = await race(fast_future, slow_future)  # "fast"

Async Context Operations

Async versions of context-dependent operations combining Reader pattern with Future.

class RequiresContextFuture[T, Deps]:
    """Async context-dependent computation"""
    def __init__(self, func: Callable[[Deps], Future[T]]): ...
    def __call__(self, deps: Deps) -> Future[T]: ...
    async def bind(self, func: Callable[[T], RequiresContextFuture[U, Deps]]) -> RequiresContextFuture[U, Deps]: ...
    async def map(self, func: Callable[[T], U]) -> RequiresContextFuture[U, Deps]: ...

class RequiresContextFutureResult[T, E, Deps]:
    """Async context-dependent computation that can fail"""
    def __init__(self, func: Callable[[Deps], FutureResult[T, E]]): ...
    def __call__(self, deps: Deps) -> FutureResult[T, E]: ...
    async def bind(self, func: Callable[[T], RequiresContextFutureResult[U, E, Deps]]) -> RequiresContextFutureResult[U, E, Deps]: ...
    async def map(self, func: Callable[[T], U]) -> RequiresContextFutureResult[U, E, Deps]: ...
    async def alt(self, func: Callable[[E], RequiresContextFutureResult[T, F, Deps]]) -> RequiresContextFutureResult[T, F, Deps]: ...

Usage examples:

from returns.context import RequiresContextFutureResult
from returns.future import FutureResult, future_safe

class DatabaseConfig:
    def __init__(self, url: str, timeout: int):
        self.url = url
        self.timeout = timeout

@future_safe
async def connect_to_db(config: DatabaseConfig) -> str:
    # Simulate async database connection
    await asyncio.sleep(config.timeout / 1000)
    if not config.url:
        raise ValueError("Database URL required")
    return f"Connected to {config.url}"

def get_connection() -> RequiresContextFutureResult[str, Exception, DatabaseConfig]:
    return RequiresContextFutureResult(connect_to_db)

# Usage with context
config = DatabaseConfig("postgresql://localhost", 100)
connection_result = await get_connection()(config)  # Success("Connected to postgresql://localhost")

Async Patterns

Pipeline with Error Handling

from returns.future import FutureResult, future_safe
from returns.pointfree import bind

@future_safe
async def validate_input(data: str) -> str:
    if not data.strip():
        raise ValueError("Empty input")
    return data.strip()

@future_safe
async def process_data(data: str) -> dict:
    await asyncio.sleep(0.1)
    return {"processed": data.upper()}

@future_safe
async def save_result(result: dict) -> str:
    await asyncio.sleep(0.1)
    return f"Saved: {result}"

# Async pipeline
async def process_pipeline(input_data: str) -> Result[str, Exception]:
    return await (
        FutureResult.from_success(input_data)
        .bind(validate_input)
        .bind(process_data)
        .bind(save_result)
    )

Concurrent Processing with Error Collection

from returns.future import FutureResult, gather_all

async def process_items_concurrently(items: list[str]) -> list[Result[str, Exception]]:
    # Process all items concurrently, collecting all results
    futures = [
        future_safe(lambda item: process_single_item(item))(item)
        for item in items
    ]
    
    all_results = await gather_all(*futures)
    return await all_results  # FutureResult[List[Result[str, Exception]], Never]

Async operations in Returns provide a clean, composable way to handle asynchronous programming with proper error handling and type safety, maintaining functional programming principles in concurrent environments.

Install with Tessl CLI

npx tessl i tessl/pypi-returns

docs

async-operations.md

container-methods.md

context-operations.md

conversions.md

core-containers.md

development-tools.md

functional-utilities.md

index.md

iteration-utilities.md

pointfree.md

trampolines.md

unsafe-operations.md

tile.json