0
# Async Operations
1
2
Containers and utilities for handling asynchronous operations with type safety and composable error handling in concurrent environments. These provide monadic abstractions over Python's async/await functionality.
3
4
## Capabilities
5
6
### Future Container
7
8
Container for asynchronous operations that never fail, providing composable async programming.
9
10
```python { .api }
11
class Future[T]:
12
"""Container for async operations that never fail"""
13
def __init__(self, awaitable: Awaitable[T]): ...
14
async def bind(self, func: Callable[[T], Future[U]]) -> Future[U]: ...
15
async def map(self, func: Callable[[T], U]) -> Future[U]: ...
16
async def apply(self, wrapped_func: Future[Callable[[T], U]]) -> Future[U]: ...
17
async def awaitable(self) -> T: ...
18
def __await__(self): ... # Enable direct awaiting
19
20
@classmethod
21
def from_value(cls, value: T) -> Future[T]: ...
22
@classmethod
23
def from_coroutine(cls, coro: Awaitable[T]) -> Future[T]: ...
24
25
def future(func: Callable[..., Awaitable[T]]) -> Callable[..., Future[T]]:
26
"""Decorator to turn coroutine function into Future-returning function"""
27
28
async def async_identity(instance: T) -> T:
29
"""Async version of identity function"""
30
```
31
32
Usage examples:
33
34
```python
35
import asyncio
36
from returns.future import Future, future, async_identity
37
38
# Creating Future from awaitable
39
async def fetch_data() -> str:
40
await asyncio.sleep(1)
41
return "Hello, World!"
42
43
future_data = Future(fetch_data())
44
result = await future_data.awaitable() # "Hello, World!"
45
46
# Direct awaiting
47
result = await future_data # "Hello, World!"
48
49
# Using decorator
50
@future
51
async def fetch_user(user_id: int) -> dict:
52
await asyncio.sleep(0.5)
53
return {"id": user_id, "name": "John"}
54
55
user_future = fetch_user(123) # Returns Future[dict]
56
user = await user_future # {"id": 123, "name": "John"}
57
58
# Chaining async operations
59
async def get_user_name(user: dict) -> str:
60
return user["name"]
61
62
@future
63
async def format_greeting(name: str) -> str:
64
return f"Hello, {name}!"
65
66
# Compose async operations
67
async def greet_user(user_id: int) -> str:
68
return await (
69
fetch_user(user_id)
70
.bind(lambda user: Future(get_user_name(user)))
71
.bind(format_greeting)
72
)
73
74
greeting = await greet_user(123) # "Hello, John!"
75
```
76
77
### FutureResult Container
78
79
Container for asynchronous operations that can fail, combining Future with Result semantics.
80
81
```python { .api }
82
class FutureResult[T, E]:
83
"""Container for async operations that can fail"""
84
def __init__(self, awaitable: Awaitable[Result[T, E]]): ...
85
async def bind(self, func: Callable[[T], FutureResult[U, E]]) -> FutureResult[U, E]: ...
86
async def map(self, func: Callable[[T], U]) -> FutureResult[U, E]: ...
87
async def apply(self, wrapped_func: FutureResult[Callable[[T], U], E]) -> FutureResult[U, E]: ...
88
async def alt(self, func: Callable[[E], FutureResult[T, F]]) -> FutureResult[T, F]: ...
89
async def lash(self, func: Callable[[E], FutureResult[T, F]]) -> FutureResult[T, F]: ...
90
async def awaitable(self) -> Result[T, E]: ...
91
def __await__(self): ... # Enable direct awaiting
92
93
@classmethod
94
def from_success(cls, value: T) -> FutureResult[T, E]: ...
95
@classmethod
96
def from_failure(cls, error: E) -> FutureResult[T, E]: ...
97
@classmethod
98
def from_result(cls, result: Result[T, E]) -> FutureResult[T, E]: ...
99
100
# Type alias
101
FutureResultE[T] = FutureResult[T, Exception]
102
103
# Constructor functions
104
def FutureSuccess(value: T) -> FutureResult[T, E]:
105
"""Create successful FutureResult"""
106
107
def FutureFailure(error: E) -> FutureResult[T, E]:
108
"""Create failed FutureResult"""
109
110
def future_safe(func: Callable[..., Awaitable[T]]) -> Callable[..., FutureResult[T, Exception]]:
111
"""Convert exception-throwing coroutine to FutureResult"""
112
```
113
114
Usage examples:
115
116
```python
117
import asyncio
118
from returns.future import FutureResult, FutureSuccess, FutureFailure, future_safe
119
from returns.result import Success, Failure
120
121
# Creating FutureResult instances
122
async def risky_operation(x: int) -> Result[int, str]:
123
await asyncio.sleep(0.1)
124
if x < 0:
125
return Failure("Negative value not allowed")
126
return Success(x * 2)
127
128
future_result = FutureResult(risky_operation(5))
129
result = await future_result # Success(10)
130
131
# Using constructor functions
132
success_future = FutureSuccess(42)
133
failure_future = FutureFailure("Something went wrong")
134
135
# Using future_safe decorator
136
@future_safe
137
async def divide_async(a: int, b: int) -> float:
138
if b == 0:
139
raise ValueError("Division by zero")
140
await asyncio.sleep(0.1)
141
return a / b
142
143
division_result = divide_async(10, 2) # FutureResult[float, Exception]
144
result = await division_result # Success(5.0)
145
146
error_result = await divide_async(10, 0) # Failure(ValueError("Division by zero"))
147
148
# Chaining FutureResult operations
149
@future_safe
150
async def validate_positive(x: int) -> int:
151
if x <= 0:
152
raise ValueError("Must be positive")
153
return x
154
155
@future_safe
156
async def square_async(x: int) -> int:
157
await asyncio.sleep(0.1)
158
return x * x
159
160
# Compose operations
161
async def process_number(x: int) -> Result[int, Exception]:
162
return await (
163
FutureSuccess(x)
164
.bind(validate_positive)
165
.bind(square_async)
166
)
167
168
result = await process_number(5) # Success(25)
169
result = await process_number(-5) # Failure(ValueError("Must be positive"))
170
```
171
172
### Async Utilities
173
174
Utility functions for working with async operations and conversions.
175
176
```python { .api }
177
def asyncify(func: Callable[..., T]) -> Callable[..., Awaitable[T]]:
178
"""Convert sync function to async function"""
179
180
async def async_partial(func: Callable, *args, **kwargs) -> Callable:
181
"""Async version of partial application"""
182
183
def from_future(future: asyncio.Future[T]) -> Future[T]:
184
"""Convert asyncio.Future to Returns Future"""
185
186
def to_future(container: Future[T]) -> asyncio.Future[T]:
187
"""Convert Returns Future to asyncio.Future"""
188
```
189
190
Usage examples:
191
192
```python
193
import asyncio
194
from returns.future import asyncify, from_future, to_future, Future
195
196
# Convert sync to async
197
def sync_operation(x: int) -> int:
198
return x * 2
199
200
async_operation = asyncify(sync_operation)
201
result = await async_operation(21) # 42
202
203
# Working with asyncio.Future
204
async def create_asyncio_future() -> str:
205
future = asyncio.Future()
206
future.set_result("Hello from asyncio")
207
return future.result()
208
209
asyncio_future = asyncio.ensure_future(create_asyncio_future())
210
returns_future = from_future(asyncio_future)
211
result = await returns_future # "Hello from asyncio"
212
213
# Convert back to asyncio.Future
214
returns_future = Future(async_operation(10))
215
asyncio_future = to_future(returns_future)
216
result = await asyncio_future # 20
217
```
218
219
### Concurrent Operations
220
221
Utilities for handling multiple concurrent operations with proper error handling.
222
223
```python { .api }
224
async def gather(*futures: Future[T]) -> Future[list[T]]:
225
"""Gather multiple Future operations"""
226
227
async def gather_result(*futures: FutureResult[T, E]) -> FutureResult[list[T], E]:
228
"""Gather multiple FutureResult operations, failing fast on first error"""
229
230
async def gather_all(*futures: FutureResult[T, E]) -> FutureResult[list[Result[T, E]], Never]:
231
"""Gather all FutureResult operations, collecting all results"""
232
233
async def race(*futures: Future[T]) -> Future[T]:
234
"""Return the first Future to complete"""
235
```
236
237
Usage examples:
238
239
```python
240
import asyncio
241
from returns.future import Future, FutureResult, FutureSuccess
242
from returns.future import gather, gather_result, race
243
244
# Gather successful operations
245
@future
246
async def fetch_item(item_id: int) -> str:
247
await asyncio.sleep(0.1)
248
return f"Item {item_id}"
249
250
items_future = gather(
251
fetch_item(1),
252
fetch_item(2),
253
fetch_item(3)
254
)
255
items = await items_future # ["Item 1", "Item 2", "Item 3"]
256
257
# Gather with error handling
258
@future_safe
259
async def fetch_user_data(user_id: int) -> dict:
260
if user_id < 0:
261
raise ValueError("Invalid user ID")
262
await asyncio.sleep(0.1)
263
return {"id": user_id, "name": f"User {user_id}"}
264
265
# This will fail fast on first error
266
users_result = await gather_result(
267
fetch_user_data(1),
268
fetch_user_data(-1), # This will fail
269
fetch_user_data(3)
270
) # Failure(ValueError("Invalid user ID"))
271
272
# Race for first completion
273
fast_future = Future(asyncio.sleep(0.1, "fast"))
274
slow_future = Future(asyncio.sleep(0.2, "slow"))
275
276
winner = await race(fast_future, slow_future) # "fast"
277
```
278
279
### Async Context Operations
280
281
Async versions of context-dependent operations combining Reader pattern with Future.
282
283
```python { .api }
284
class RequiresContextFuture[T, Deps]:
285
"""Async context-dependent computation"""
286
def __init__(self, func: Callable[[Deps], Future[T]]): ...
287
def __call__(self, deps: Deps) -> Future[T]: ...
288
async def bind(self, func: Callable[[T], RequiresContextFuture[U, Deps]]) -> RequiresContextFuture[U, Deps]: ...
289
async def map(self, func: Callable[[T], U]) -> RequiresContextFuture[U, Deps]: ...
290
291
class RequiresContextFutureResult[T, E, Deps]:
292
"""Async context-dependent computation that can fail"""
293
def __init__(self, func: Callable[[Deps], FutureResult[T, E]]): ...
294
def __call__(self, deps: Deps) -> FutureResult[T, E]: ...
295
async def bind(self, func: Callable[[T], RequiresContextFutureResult[U, E, Deps]]) -> RequiresContextFutureResult[U, E, Deps]: ...
296
async def map(self, func: Callable[[T], U]) -> RequiresContextFutureResult[U, E, Deps]: ...
297
async def alt(self, func: Callable[[E], RequiresContextFutureResult[T, F, Deps]]) -> RequiresContextFutureResult[T, F, Deps]: ...
298
```
299
300
Usage examples:
301
302
```python
303
from returns.context import RequiresContextFutureResult
304
from returns.future import FutureResult, future_safe
305
306
class DatabaseConfig:
307
def __init__(self, url: str, timeout: int):
308
self.url = url
309
self.timeout = timeout
310
311
@future_safe
312
async def connect_to_db(config: DatabaseConfig) -> str:
313
# Simulate async database connection
314
await asyncio.sleep(config.timeout / 1000)
315
if not config.url:
316
raise ValueError("Database URL required")
317
return f"Connected to {config.url}"
318
319
def get_connection() -> RequiresContextFutureResult[str, Exception, DatabaseConfig]:
320
return RequiresContextFutureResult(connect_to_db)
321
322
# Usage with context
323
config = DatabaseConfig("postgresql://localhost", 100)
324
connection_result = await get_connection()(config) # Success("Connected to postgresql://localhost")
325
```
326
327
## Async Patterns
328
329
### Pipeline with Error Handling
330
331
```python
332
from returns.future import FutureResult, future_safe
333
from returns.pointfree import bind
334
335
@future_safe
336
async def validate_input(data: str) -> str:
337
if not data.strip():
338
raise ValueError("Empty input")
339
return data.strip()
340
341
@future_safe
342
async def process_data(data: str) -> dict:
343
await asyncio.sleep(0.1)
344
return {"processed": data.upper()}
345
346
@future_safe
347
async def save_result(result: dict) -> str:
348
await asyncio.sleep(0.1)
349
return f"Saved: {result}"
350
351
# Async pipeline
352
async def process_pipeline(input_data: str) -> Result[str, Exception]:
353
return await (
354
FutureResult.from_success(input_data)
355
.bind(validate_input)
356
.bind(process_data)
357
.bind(save_result)
358
)
359
```
360
361
### Concurrent Processing with Error Collection
362
363
```python
364
from returns.future import FutureResult, gather_all
365
366
async def process_items_concurrently(items: list[str]) -> list[Result[str, Exception]]:
367
# Process all items concurrently, collecting all results
368
futures = [
369
future_safe(lambda item: process_single_item(item))(item)
370
for item in items
371
]
372
373
all_results = await gather_all(*futures)
374
return await all_results # FutureResult[List[Result[str, Exception]], Never]
375
```
376
377
Async operations in Returns provide a clean, composable way to handle asynchronous programming with proper error handling and type safety, maintaining functional programming principles in concurrent environments.