or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md container-methods.md context-operations.md conversions.md core-containers.md development-tools.md functional-utilities.md index.md iteration-utilities.md pointfree.md trampolines.md unsafe-operations.md

docs/async-operations.md

0

# Async Operations

1

2

Containers and utilities for handling asynchronous operations with type safety and composable error handling in concurrent environments. These provide monadic abstractions over Python's async/await functionality.

3

4

## Capabilities

5

6

### Future Container

7

8

Container for asynchronous operations that never fail, providing composable async programming.

9

10

```python { .api }

11

class Future[T]:

12

"""Container for async operations that never fail"""

13

def __init__(self, awaitable: Awaitable[T]): ...

14

async def bind(self, func: Callable[[T], Future[U]]) -> Future[U]: ...

15

async def map(self, func: Callable[[T], U]) -> Future[U]: ...

16

async def apply(self, wrapped_func: Future[Callable[[T], U]]) -> Future[U]: ...

17

async def awaitable(self) -> T: ...

18

def __await__(self): ... # Enable direct awaiting

19

20

@classmethod

21

def from_value(cls, value: T) -> Future[T]: ...

22

@classmethod

23

def from_coroutine(cls, coro: Awaitable[T]) -> Future[T]: ...

24

25

def future(func: Callable[..., Awaitable[T]]) -> Callable[..., Future[T]]:

26

"""Decorator to turn coroutine function into Future-returning function"""

27

28

async def async_identity(instance: T) -> T:

29

"""Async version of identity function"""

30

```

31

32

Usage examples:

33

34

```python

35

import asyncio

36

from returns.future import Future, future, async_identity

37

38

# Creating Future from awaitable

39

async def fetch_data() -> str:

40

await asyncio.sleep(1)

41

return "Hello, World!"

42

43

future_data = Future(fetch_data())

44

result = await future_data.awaitable() # "Hello, World!"

45

46

# Direct awaiting

47

result = await future_data # "Hello, World!"

48

49

# Using decorator

50

@future

51

async def fetch_user(user_id: int) -> dict:

52

await asyncio.sleep(0.5)

53

return {"id": user_id, "name": "John"}

54

55

user_future = fetch_user(123) # Returns Future[dict]

56

user = await user_future # {"id": 123, "name": "John"}

57

58

# Chaining async operations

59

async def get_user_name(user: dict) -> str:

60

return user["name"]

61

62

@future

63

async def format_greeting(name: str) -> str:

64

return f"Hello, {name}!"

65

66

# Compose async operations

67

async def greet_user(user_id: int) -> str:

68

return await (

69

fetch_user(user_id)

70

.bind(lambda user: Future(get_user_name(user)))

71

.bind(format_greeting)

72

)

73

74

greeting = await greet_user(123) # "Hello, John!"

75

```

76

77

### FutureResult Container

78

79

Container for asynchronous operations that can fail, combining Future with Result semantics.

80

81

```python { .api }

82

class FutureResult[T, E]:

83

"""Container for async operations that can fail"""

84

def __init__(self, awaitable: Awaitable[Result[T, E]]): ...

85

async def bind(self, func: Callable[[T], FutureResult[U, E]]) -> FutureResult[U, E]: ...

86

async def map(self, func: Callable[[T], U]) -> FutureResult[U, E]: ...

87

async def apply(self, wrapped_func: FutureResult[Callable[[T], U], E]) -> FutureResult[U, E]: ...

88

async def alt(self, func: Callable[[E], F]) -> FutureResult[T, F]: ...

89

async def lash(self, func: Callable[[E], FutureResult[T, F]]) -> FutureResult[T, F]: ...

90

async def awaitable(self) -> Result[T, E]: ...

91

def __await__(self): ... # Enable direct awaiting

92

93

@classmethod

94

def from_success(cls, value: T) -> FutureResult[T, E]: ...

95

@classmethod

96

def from_failure(cls, error: E) -> FutureResult[T, E]: ...

97

@classmethod

98

def from_result(cls, result: Result[T, E]) -> FutureResult[T, E]: ...

99

100

# Type alias

101

FutureResultE[T] = FutureResult[T, Exception]

102

103

# Constructor functions

104

def FutureSuccess(value: T) -> FutureResult[T, E]:

105

"""Create successful FutureResult"""

106

107

def FutureFailure(error: E) -> FutureResult[T, E]:

108

"""Create failed FutureResult"""

109

110

def future_safe(func: Callable[..., Awaitable[T]]) -> Callable[..., FutureResult[T, Exception]]:

111

"""Convert exception-throwing coroutine to FutureResult"""

112

```

113

114

Usage examples:

115

116

```python

117

import asyncio

118

from returns.future import FutureResult, FutureSuccess, FutureFailure, future_safe

119

from returns.result import Result, Success, Failure

120

121

# Creating FutureResult instances

122

async def risky_operation(x: int) -> Result[int, str]:

123

await asyncio.sleep(0.1)

124

if x < 0:

125

return Failure("Negative value not allowed")

126

return Success(x * 2)

127

128

future_result = FutureResult(risky_operation(5))

129

result = await future_result # Success(10)

130

131

# Using constructor functions

132

success_future = FutureSuccess(42)

133

failure_future = FutureFailure("Something went wrong")

134

135

# Using future_safe decorator

136

@future_safe

137

async def divide_async(a: int, b: int) -> float:

138

if b == 0:

139

raise ValueError("Division by zero")

140

await asyncio.sleep(0.1)

141

return a / b

142

143

division_result = divide_async(10, 2) # FutureResult[float, Exception]

144

result = await division_result # Success(5.0)

145

146

error_result = await divide_async(10, 0) # Failure(ValueError("Division by zero"))

147

148

# Chaining FutureResult operations

149

@future_safe

150

async def validate_positive(x: int) -> int:

151

if x <= 0:

152

raise ValueError("Must be positive")

153

return x

154

155

@future_safe

156

async def square_async(x: int) -> int:

157

await asyncio.sleep(0.1)

158

return x * x

159

160

# Compose operations

161

async def process_number(x: int) -> Result[int, Exception]:

162

return await (

163

FutureSuccess(x)

164

.bind(validate_positive)

165

.bind(square_async)

166

)

167

168

result = await process_number(5) # Success(25)

169

result = await process_number(-5) # Failure(ValueError("Must be positive"))

170

```

171

172

### Async Utilities

173

174

Utility functions for working with async operations and conversions.

175

176

```python { .api }

177

def asyncify(func: Callable[..., T]) -> Callable[..., Awaitable[T]]:

178

"""Convert sync function to async function"""

179

180

async def async_partial(func: Callable, *args, **kwargs) -> Callable:

181

"""Async version of partial application"""

182

183

def from_future(future: asyncio.Future[T]) -> Future[T]:

184

"""Convert asyncio.Future to Returns Future"""

185

186

def to_future(container: Future[T]) -> asyncio.Future[T]:

187

"""Convert Returns Future to asyncio.Future"""

188

```

189

190

Usage examples:

191

192

```python

193

import asyncio

194

from returns.future import asyncify, from_future, to_future, Future

195

196

# Convert sync to async

197

def sync_operation(x: int) -> int:

198

return x * 2

199

200

async_operation = asyncify(sync_operation)

201

result = await async_operation(21) # 42

202

203

# Working with asyncio.Future

204

async def create_asyncio_future() -> str:

205

future = asyncio.Future()

206

future.set_result("Hello from asyncio")

207

return future.result()

208

209

asyncio_future = asyncio.ensure_future(create_asyncio_future())

210

returns_future = from_future(asyncio_future)

211

result = await returns_future # "Hello from asyncio"

212

213

# Convert back to asyncio.Future

214

returns_future = Future(async_operation(10))

215

asyncio_future = to_future(returns_future)

216

result = await asyncio_future # 20

217

```

218

219

### Concurrent Operations

220

221

Utilities for handling multiple concurrent operations with proper error handling.

222

223

```python { .api }

224

async def gather(*futures: Future[T]) -> Future[list[T]]:

225

"""Gather multiple Future operations"""

226

227

async def gather_result(*futures: FutureResult[T, E]) -> FutureResult[list[T], E]:

228

"""Gather multiple FutureResult operations, failing fast on first error"""

229

230

async def gather_all(*futures: FutureResult[T, E]) -> FutureResult[list[Result[T, E]], Never]:

231

"""Gather all FutureResult operations, collecting all results"""

232

233

async def race(*futures: Future[T]) -> Future[T]:

234

"""Return the first Future to complete"""

235

```

236

237

Usage examples:

238

239

```python

240

import asyncio

241

from returns.future import Future, FutureResult, FutureSuccess, future, future_safe

242

from returns.future import gather, gather_result, race

243

244

# Gather successful operations

245

@future

246

async def fetch_item(item_id: int) -> str:

247

await asyncio.sleep(0.1)

248

return f"Item {item_id}"

249

250

items_future = gather(

251

fetch_item(1),

252

fetch_item(2),

253

fetch_item(3)

254

)

255

items = await items_future # ["Item 1", "Item 2", "Item 3"]

256

257

# Gather with error handling

258

@future_safe

259

async def fetch_user_data(user_id: int) -> dict:

260

if user_id < 0:

261

raise ValueError("Invalid user ID")

262

await asyncio.sleep(0.1)

263

return {"id": user_id, "name": f"User {user_id}"}

264

265

# This will fail fast on first error

266

users_result = await gather_result(

267

fetch_user_data(1),

268

fetch_user_data(-1), # This will fail

269

fetch_user_data(3)

270

) # Failure(ValueError("Invalid user ID"))

271

272

# Race for first completion

273

fast_future = Future(asyncio.sleep(0.1, "fast"))

274

slow_future = Future(asyncio.sleep(0.2, "slow"))

275

276

winner = await race(fast_future, slow_future) # "fast"

277

```

278

279

### Async Context Operations

280

281

Async versions of context-dependent operations, combining the Reader pattern with Future.

282

283

```python { .api }

284

class RequiresContextFuture[T, Deps]:

285

"""Async context-dependent computation"""

286

def __init__(self, func: Callable[[Deps], Future[T]]): ...

287

def __call__(self, deps: Deps) -> Future[T]: ...

288

async def bind(self, func: Callable[[T], RequiresContextFuture[U, Deps]]) -> RequiresContextFuture[U, Deps]: ...

289

async def map(self, func: Callable[[T], U]) -> RequiresContextFuture[U, Deps]: ...

290

291

class RequiresContextFutureResult[T, E, Deps]:

292

"""Async context-dependent computation that can fail"""

293

def __init__(self, func: Callable[[Deps], FutureResult[T, E]]): ...

294

def __call__(self, deps: Deps) -> FutureResult[T, E]: ...

295

async def bind(self, func: Callable[[T], RequiresContextFutureResult[U, E, Deps]]) -> RequiresContextFutureResult[U, E, Deps]: ...

296

async def map(self, func: Callable[[T], U]) -> RequiresContextFutureResult[U, E, Deps]: ...

297

async def alt(self, func: Callable[[E], F]) -> RequiresContextFutureResult[T, F, Deps]: ...

298

```

299

300

Usage examples:

301

302

```python

303

import asyncio

from returns.context import RequiresContextFutureResult

304

from returns.future import FutureResult, future_safe

305

306

class DatabaseConfig:

307

def __init__(self, url: str, timeout: int):

308

self.url = url

309

self.timeout = timeout

310

311

@future_safe

312

async def connect_to_db(config: DatabaseConfig) -> str:

313

# Simulate async database connection

314

await asyncio.sleep(config.timeout / 1000)

315

if not config.url:

316

raise ValueError("Database URL required")

317

return f"Connected to {config.url}"

318

319

def get_connection() -> RequiresContextFutureResult[str, Exception, DatabaseConfig]:

320

return RequiresContextFutureResult(connect_to_db)

321

322

# Usage with context

323

config = DatabaseConfig("postgresql://localhost", 100)

324

connection_result = await get_connection()(config) # Success("Connected to postgresql://localhost")

325

```

326

327

## Async Patterns

328

329

### Pipeline with Error Handling

330

331

```python

332

import asyncio

from returns.future import FutureResult, future_safe

from returns.result import Result

333

from returns.pointfree import bind

334

335

@future_safe

336

async def validate_input(data: str) -> str:

337

if not data.strip():

338

raise ValueError("Empty input")

339

return data.strip()

340

341

@future_safe

342

async def process_data(data: str) -> dict:

343

await asyncio.sleep(0.1)

344

return {"processed": data.upper()}

345

346

@future_safe

347

async def save_result(result: dict) -> str:

348

await asyncio.sleep(0.1)

349

return f"Saved: {result}"

350

351

# Async pipeline

352

async def process_pipeline(input_data: str) -> Result[str, Exception]:

353

return await (

354

FutureResult.from_success(input_data)

355

.bind(validate_input)

356

.bind(process_data)

357

.bind(save_result)

358

)

359

```

360

361

### Concurrent Processing with Error Collection

362

363

```python

364

from returns.future import FutureResult, future_safe, gather_all

from returns.result import Result

365

366

async def process_items_concurrently(items: list[str]) -> list[Result[str, Exception]]:

367

# Process all items concurrently, collecting all results

368

futures = [

369

future_safe(lambda item: process_single_item(item))(item)

370

for item in items

371

]

372

373

all_results = await gather_all(*futures)

374

return await all_results # FutureResult[List[Result[str, Exception]], Never]

375

```

376

377

Async operations in Returns provide a clean, composable way to handle asynchronous programming with proper error handling and type safety, maintaining functional programming principles in concurrent environments.