Functional programming library providing type-safe containers for error handling, side effects, and composable operations with monadic patterns.
—
Containers and utilities for handling asynchronous operations with type safety and composable error handling in concurrent environments. These provide monadic abstractions over Python's async/await functionality.
Container for asynchronous operations that never fail, providing composable async programming.
class Future[T]:
"""Container for async operations that never fail"""
def __init__(self, awaitable: Awaitable[T]): ...
async def bind(self, func: Callable[[T], Future[U]]) -> Future[U]: ...
async def map(self, func: Callable[[T], U]) -> Future[U]: ...
async def apply(self, wrapped_func: Future[Callable[[T], U]]) -> Future[U]: ...
async def awaitable(self) -> T: ...
def __await__(self): ... # Enable direct awaiting
@classmethod
def from_value(cls, value: T) -> Future[T]: ...
@classmethod
def from_coroutine(cls, coro: Awaitable[T]) -> Future[T]: ...
def future(func: Callable[..., Awaitable[T]]) -> Callable[..., Future[T]]:
"""Decorator to turn coroutine function into Future-returning function"""
async def async_identity(instance: T) -> T:
"""Async version of identity function"""Usage examples:
import asyncio
from returns.future import Future, future, async_identity
# Creating Future from awaitable
async def fetch_data() -> str:
await asyncio.sleep(1)
return "Hello, World!"
future_data = Future(fetch_data())
result = await future_data.awaitable() # "Hello, World!"
# Direct awaiting
result = await future_data # "Hello, World!"
# Using decorator
@future
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.5)
return {"id": user_id, "name": "John"}
user_future = fetch_user(123) # Returns Future[dict]
user = await user_future # {"id": 123, "name": "John"}
# Chaining async operations
async def get_user_name(user: dict) -> str:
return user["name"]
@future
async def format_greeting(name: str) -> str:
return f"Hello, {name}!"
# Compose async operations
async def greet_user(user_id: int) -> str:
return await (
fetch_user(user_id)
.bind(lambda user: Future(get_user_name(user)))
.bind(format_greeting)
)
greeting = await greet_user(123) # "Hello, John!"Container for asynchronous operations that can fail, combining Future with Result semantics.
class FutureResult[T, E]:
"""Container for async operations that can fail"""
def __init__(self, awaitable: Awaitable[Result[T, E]]): ...
async def bind(self, func: Callable[[T], FutureResult[U, E]]) -> FutureResult[U, E]: ...
async def map(self, func: Callable[[T], U]) -> FutureResult[U, E]: ...
async def apply(self, wrapped_func: FutureResult[Callable[[T], U], E]) -> FutureResult[U, E]: ...
async def alt(self, func: Callable[[E], FutureResult[T, F]]) -> FutureResult[T, F]: ...
async def lash(self, func: Callable[[E], FutureResult[T, F]]) -> FutureResult[T, F]: ...
async def awaitable(self) -> Result[T, E]: ...
def __await__(self): ... # Enable direct awaiting
@classmethod
def from_success(cls, value: T) -> FutureResult[T, E]: ...
@classmethod
def from_failure(cls, error: E) -> FutureResult[T, E]: ...
@classmethod
def from_result(cls, result: Result[T, E]) -> FutureResult[T, E]: ...
# Type alias
FutureResultE[T] = FutureResult[T, Exception]
# Constructor functions
def FutureSuccess(value: T) -> FutureResult[T, E]:
"""Create successful FutureResult"""
def FutureFailure(error: E) -> FutureResult[T, E]:
"""Create failed FutureResult"""
def future_safe(func: Callable[..., Awaitable[T]]) -> Callable[..., FutureResult[T, Exception]]:
"""Convert exception-throwing coroutine to FutureResult"""Usage examples:
import asyncio
from returns.future import FutureResult, FutureSuccess, FutureFailure, future_safe
from returns.result import Success, Failure
# Creating FutureResult instances
async def risky_operation(x: int) -> Result[int, str]:
await asyncio.sleep(0.1)
if x < 0:
return Failure("Negative value not allowed")
return Success(x * 2)
future_result = FutureResult(risky_operation(5))
result = await future_result # Success(10)
# Using constructor functions
success_future = FutureSuccess(42)
failure_future = FutureFailure("Something went wrong")
# Using future_safe decorator
@future_safe
async def divide_async(a: int, b: int) -> float:
if b == 0:
raise ValueError("Division by zero")
await asyncio.sleep(0.1)
return a / b
division_result = divide_async(10, 2) # FutureResult[float, Exception]
result = await division_result # Success(5.0)
error_result = await divide_async(10, 0) # Failure(ValueError("Division by zero"))
# Chaining FutureResult operations
@future_safe
async def validate_positive(x: int) -> int:
if x <= 0:
raise ValueError("Must be positive")
return x
@future_safe
async def square_async(x: int) -> int:
await asyncio.sleep(0.1)
return x * x
# Compose operations
async def process_number(x: int) -> Result[int, Exception]:
return await (
FutureSuccess(x)
.bind(validate_positive)
.bind(square_async)
)
result = await process_number(5) # Success(25)
result = await process_number(-5) # Failure(ValueError("Must be positive"))Utility functions for working with async operations and conversions.
def asyncify(func: Callable[..., T]) -> Callable[..., Awaitable[T]]:
"""Convert sync function to async function"""
async def async_partial(func: Callable, *args, **kwargs) -> Callable:
"""Async version of partial application"""
def from_future(future: asyncio.Future[T]) -> Future[T]:
"""Convert asyncio.Future to Returns Future"""
def to_future(container: Future[T]) -> asyncio.Future[T]:
"""Convert Returns Future to asyncio.Future"""Usage examples:
import asyncio
from returns.future import asyncify, from_future, to_future, Future
# Convert sync to async
def sync_operation(x: int) -> int:
return x * 2
async_operation = asyncify(sync_operation)
result = await async_operation(21) # 42
# Working with asyncio.Future
async def create_asyncio_future() -> str:
future = asyncio.Future()
future.set_result("Hello from asyncio")
return future.result()
asyncio_future = asyncio.ensure_future(create_asyncio_future())
returns_future = from_future(asyncio_future)
result = await returns_future # "Hello from asyncio"
# Convert back to asyncio.Future
returns_future = Future(async_operation(10))
asyncio_future = to_future(returns_future)
result = await asyncio_future # 20Utilities for handling multiple concurrent operations with proper error handling.
async def gather(*futures: Future[T]) -> Future[list[T]]:
"""Gather multiple Future operations"""
async def gather_result(*futures: FutureResult[T, E]) -> FutureResult[list[T], E]:
"""Gather multiple FutureResult operations, failing fast on first error"""
async def gather_all(*futures: FutureResult[T, E]) -> FutureResult[list[Result[T, E]], Never]:
"""Gather all FutureResult operations, collecting all results"""
async def race(*futures: Future[T]) -> Future[T]:
"""Return the first Future to complete"""Usage examples:
import asyncio
from returns.future import Future, FutureResult, FutureSuccess
from returns.future import gather, gather_result, race
# Gather successful operations
@future
async def fetch_item(item_id: int) -> str:
await asyncio.sleep(0.1)
return f"Item {item_id}"
items_future = gather(
fetch_item(1),
fetch_item(2),
fetch_item(3)
)
items = await items_future # ["Item 1", "Item 2", "Item 3"]
# Gather with error handling
@future_safe
async def fetch_user_data(user_id: int) -> dict:
if user_id < 0:
raise ValueError("Invalid user ID")
await asyncio.sleep(0.1)
return {"id": user_id, "name": f"User {user_id}"}
# This will fail fast on first error
users_result = await gather_result(
fetch_user_data(1),
fetch_user_data(-1), # This will fail
fetch_user_data(3)
) # Failure(ValueError("Invalid user ID"))
# Race for first completion
fast_future = Future(asyncio.sleep(0.1, "fast"))
slow_future = Future(asyncio.sleep(0.2, "slow"))
winner = await race(fast_future, slow_future) # "fast"Async versions of context-dependent operations combining Reader pattern with Future.
class RequiresContextFuture[T, Deps]:
"""Async context-dependent computation"""
def __init__(self, func: Callable[[Deps], Future[T]]): ...
def __call__(self, deps: Deps) -> Future[T]: ...
async def bind(self, func: Callable[[T], RequiresContextFuture[U, Deps]]) -> RequiresContextFuture[U, Deps]: ...
async def map(self, func: Callable[[T], U]) -> RequiresContextFuture[U, Deps]: ...
class RequiresContextFutureResult[T, E, Deps]:
"""Async context-dependent computation that can fail"""
def __init__(self, func: Callable[[Deps], FutureResult[T, E]]): ...
def __call__(self, deps: Deps) -> FutureResult[T, E]: ...
async def bind(self, func: Callable[[T], RequiresContextFutureResult[U, E, Deps]]) -> RequiresContextFutureResult[U, E, Deps]: ...
async def map(self, func: Callable[[T], U]) -> RequiresContextFutureResult[U, E, Deps]: ...
async def alt(self, func: Callable[[E], RequiresContextFutureResult[T, F, Deps]]) -> RequiresContextFutureResult[T, F, Deps]: ...Usage examples:
from returns.context import RequiresContextFutureResult
from returns.future import FutureResult, future_safe
class DatabaseConfig:
def __init__(self, url: str, timeout: int):
self.url = url
self.timeout = timeout
@future_safe
async def connect_to_db(config: DatabaseConfig) -> str:
# Simulate async database connection
await asyncio.sleep(config.timeout / 1000)
if not config.url:
raise ValueError("Database URL required")
return f"Connected to {config.url}"
def get_connection() -> RequiresContextFutureResult[str, Exception, DatabaseConfig]:
return RequiresContextFutureResult(connect_to_db)
# Usage with context
config = DatabaseConfig("postgresql://localhost", 100)
connection_result = await get_connection()(config) # Success("Connected to postgresql://localhost")from returns.future import FutureResult, future_safe
from returns.pointfree import bind
@future_safe
async def validate_input(data: str) -> str:
if not data.strip():
raise ValueError("Empty input")
return data.strip()
@future_safe
async def process_data(data: str) -> dict:
await asyncio.sleep(0.1)
return {"processed": data.upper()}
@future_safe
async def save_result(result: dict) -> str:
await asyncio.sleep(0.1)
return f"Saved: {result}"
# Async pipeline
async def process_pipeline(input_data: str) -> Result[str, Exception]:
return await (
FutureResult.from_success(input_data)
.bind(validate_input)
.bind(process_data)
.bind(save_result)
)from returns.future import FutureResult, gather_all
async def process_items_concurrently(items: list[str]) -> list[Result[str, Exception]]:
# Process all items concurrently, collecting all results
futures = [
future_safe(lambda item: process_single_item(item))(item)
for item in items
]
all_results = await gather_all(*futures)
return await all_results # FutureResult[List[Result[str, Exception]], Never]Async operations in Returns provide a clean, composable way to handle asynchronous programming with proper error handling and type safety, maintaining functional programming principles in concurrent environments.
Install with Tessl CLI
npx tessl i tessl/pypi-returns