0
# Iteration Utilities
1
2
Utilities for working with iterables of containers, providing declarative approaches to collection processing with type-safe error propagation and functional composition patterns.
3
4
## Capabilities
5
6
### Fold Operations
7
8
Declarative processing of iterables containing containers, enabling functional-style iteration without explicit loops.
9
10
```python { .api }
11
class AbstractFold:
12
"""Abstract base for folding operations"""
13
14
class Fold:
15
"""Concrete implementation for declarative iterable actions"""
16
17
@staticmethod
18
def loop(
19
iterable: Iterable[Container[T]],
20
acc: Container[U],
21
func: Callable[[U, T], Container[U]]
22
) -> Container[U]:
23
"""Declarative loops for applicative types"""
24
25
@staticmethod
26
def collect(
27
iterable: Iterable[Container[T]],
28
acc: Container[list[T]]
29
) -> Container[list[T]]:
30
"""Transform iterable of containers to single container of list"""
31
32
@staticmethod
33
def collect_all(
34
iterable: Iterable[Container[T, E]],
35
acc: Container[list[T], E]
36
) -> Container[list[Result[T, E]], Never]:
37
"""Collect all values even if some operations fail"""
38
```
39
40
Usage examples:
41
42
```python
43
from returns.iterables import Fold
44
from returns.result import Success, Failure, Result
45
from returns.maybe import Some, Nothing, Maybe
46
47
# Collect successful results
48
results = [Success(1), Success(2), Success(3)]
49
collected = Fold.collect(results, Success([])) # Success([1, 2, 3])
50
51
# Collect with failures (fails fast)
52
mixed_results = [Success(1), Failure("error"), Success(3)]
53
collected = Fold.collect(mixed_results, Success([])) # Failure("error")
54
55
# Collect all results (including failures)
56
all_collected = Fold.collect_all(mixed_results, Success([]))
57
# Success([Success(1), Failure("error"), Success(3)])
58
59
# Loop with accumulator
60
def sum_values(acc: int, value: int) -> Result[int, str]:
61
return Success(acc + value)
62
63
numbers = [Success(1), Success(2), Success(3)]
64
total = Fold.loop(numbers, Success(0), sum_values) # Success(6)
65
66
# Loop with Maybe values
67
maybe_values = [Some(1), Some(2), Nothing, Some(4)]
68
def sum_maybe(acc: int, value: int) -> Maybe[int]:
69
return Some(acc + value)
70
71
total_maybe = Fold.loop(maybe_values, Some(0), sum_maybe) # Nothing (due to Nothing in list)
72
```
73
74
### Collection Processing Functions
75
76
Utility functions for processing collections of containers with various strategies.
77
78
```python { .api }
79
def partition(containers: Iterable[Result[T, E]]) -> tuple[list[T], list[E]]:
80
"""Partition Result containers into successes and failures"""
81
82
def sequence(containers: Iterable[Container[T]]) -> Container[list[T]]:
83
"""Convert iterable of containers to container of list (fails fast)"""
84
85
def traverse(func: Callable[[T], Container[U]], iterable: Iterable[T]) -> Container[list[U]]:
86
"""Map function over iterable and sequence results"""
87
88
def filter_success(containers: Iterable[Result[T, E]]) -> Iterator[T]:
89
"""Extract only successful values from Result containers"""
90
91
def filter_failure(containers: Iterable[Result[T, E]]) -> Iterator[E]:
92
"""Extract only failure values from Result containers"""
93
94
def filter_some(containers: Iterable[Maybe[T]]) -> Iterator[T]:
95
"""Extract only Some values from Maybe containers"""
96
```
97
98
Usage examples:
99
100
```python
101
from returns.iterables import partition, sequence, traverse, filter_success
102
from returns.result import Success, Failure, safe
103
from returns.maybe import Some, Nothing
104
105
# Partition results
106
results = [Success(1), Failure("error1"), Success(2), Failure("error2")]
107
successes, failures = partition(results)
108
# successes: [1, 2]
109
# failures: ["error1", "error2"]
110
111
# Sequence containers (all must succeed)
112
all_success = [Success(1), Success(2), Success(3)]
113
sequenced = sequence(all_success) # Success([1, 2, 3])
114
115
mixed = [Success(1), Failure("error"), Success(3)]
116
sequenced_mixed = sequence(mixed) # Failure("error")
117
118
# Traverse with function
119
@safe
120
def double(x: int) -> int:
121
return x * 2
122
123
numbers = [1, 2, 3, 4]
124
doubled = traverse(double, numbers) # Success([2, 4, 6, 8])
125
126
# Filter successful values
127
mixed_results = [Success(1), Failure("error"), Success(3), Success(5)]
128
successful_values = list(filter_success(mixed_results)) # [1, 3, 5]
129
130
# Filter Some values
131
maybe_values = [Some(1), Nothing, Some(3), Nothing, Some(5)]
132
some_values = list(filter_some(maybe_values)) # [1, 3, 5]
133
```
134
135
### Async Collection Processing
136
137
Utilities for processing collections of async containers.
138
139
```python { .api }
140
async def async_sequence(containers: Iterable[Future[T]]) -> Future[list[T]]:
141
"""Sequence Future containers asynchronously"""
142
143
async def async_traverse(
144
func: Callable[[T], Future[U]],
145
iterable: Iterable[T]
146
) -> Future[list[U]]:
147
"""Async traverse with function"""
148
149
async def async_collect(
150
containers: Iterable[FutureResult[T, E]]
151
) -> FutureResult[list[T], E]:
152
"""Collect FutureResult containers (fails fast)"""
153
154
async def async_collect_all(
155
containers: Iterable[FutureResult[T, E]]
156
) -> FutureResult[list[Result[T, E]], Never]:
157
"""Collect all FutureResult containers"""
158
```
159
160
Usage examples:
161
162
```python
163
import asyncio
164
from returns.iterables import async_sequence, async_traverse, async_collect
165
from returns.future import Future, FutureResult, future, future_safe
166
167
# Async sequence
168
@future
169
async def fetch_data(id: int) -> str:
170
await asyncio.sleep(0.1)
171
return f"Data {id}"
172
173
futures = [fetch_data(i) for i in range(1, 4)]
174
all_data = await async_sequence(futures) # ["Data 1", "Data 2", "Data 3"]
175
176
# Async traverse
177
@future
178
async def process_item(item: str) -> str:
179
await asyncio.sleep(0.1)
180
return item.upper()
181
182
items = ["hello", "world", "async"]
183
processed = await async_traverse(process_item, items) # ["HELLO", "WORLD", "ASYNC"]
184
185
# Async collect with error handling
186
@future_safe
187
async def risky_operation(x: int) -> int:
188
if x < 0:
189
raise ValueError(f"Negative value: {x}")
190
await asyncio.sleep(0.1)
191
return x * 2
192
193
operations = [risky_operation(i) for i in [1, 2, 3]]
194
results = await async_collect(operations) # Success([2, 4, 6])
195
196
# With errors
197
operations_with_error = [risky_operation(i) for i in [1, -2, 3]]
198
results_with_error = await async_collect(operations_with_error) # Failure(ValueError("Negative value: -2"))
199
```
200
201
### Batching and Chunking
202
203
Utilities for processing large collections in batches.
204
205
```python { .api }
206
def batch_process(
207
items: Iterable[T],
208
batch_size: int,
209
processor: Callable[[list[T]], Container[list[U]]]
210
) -> Container[list[U]]:
211
"""Process items in batches"""
212
213
async def async_batch_process(
214
items: Iterable[T],
215
batch_size: int,
216
processor: Callable[[list[T]], Future[list[U]]]
217
) -> Future[list[U]]:
218
"""Async batch processing"""
219
220
def chunk(iterable: Iterable[T], size: int) -> Iterator[list[T]]:
221
"""Split iterable into chunks of specified size"""
222
```
223
224
Usage examples:
225
226
```python
227
from returns.iterables import batch_process, chunk
228
from returns.result import Success, safe
229
230
# Batch processing
231
@safe
232
def process_batch(items: list[int]) -> list[int]:
233
# Simulate batch processing (e.g., database operations)
234
return [x * 2 for x in items]
235
236
large_dataset = list(range(100))
237
processed = batch_process(large_dataset, 10, process_batch)
238
# Success([0, 2, 4, 6, ..., 198])
239
240
# Chunking
241
items = list(range(10))
242
chunks = list(chunk(items, 3))
243
# [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
244
245
# Manual batch processing with chunks
246
def process_in_batches(items: list[int], batch_size: int) -> Result[list[int], str]:
247
results = []
248
for batch in chunk(items, batch_size):
249
batch_result = process_batch(batch)
250
if isinstance(batch_result, Failure):
251
return batch_result
252
results.extend(batch_result.unwrap())
253
return Success(results)
254
```
255
256
### Stream Processing
257
258
Utilities for lazy stream processing with containers.
259
260
```python { .api }
261
def stream_map(func: Callable[[T], Container[U]], stream: Iterator[T]) -> Iterator[Container[U]]:
262
"""Lazy map over stream with containers"""
263
264
def stream_filter(predicate: Callable[[T], Container[bool]], stream: Iterator[T]) -> Iterator[T]:
265
"""Filter stream with container-returning predicate"""
266
267
def stream_take_while(predicate: Callable[[T], Container[bool]], stream: Iterator[T]) -> Iterator[T]:
268
"""Take elements while predicate returns Success(True)"""
269
270
def stream_fold(
271
func: Callable[[U, T], Container[U]],
272
initial: U,
273
stream: Iterator[T]
274
) -> Container[U]:
275
"""Fold over stream with early termination on failure"""
276
```
277
278
Usage examples:
279
280
```python
281
from returns.iterables import stream_map, stream_filter, stream_fold
282
from returns.result import Success, Failure, safe
283
from returns.maybe import Some, Nothing
284
285
# Stream mapping
286
@safe
287
def parse_int(s: str) -> int:
288
return int(s)
289
290
number_strings = ["1", "2", "invalid", "4", "5"]
291
parsed_stream = stream_map(parse_int, iter(number_strings))
292
results = list(parsed_stream)
293
# [Success(1), Success(2), Failure(ValueError(...)), Success(4), Success(5)]
294
295
# Stream filtering
296
@safe
297
def is_even(x: int) -> bool:
298
return x % 2 == 0
299
300
numbers = [1, 2, 3, 4, 5, 6]
301
even_stream = stream_filter(is_even, iter(numbers))
302
evens = list(even_stream) # [2, 4, 6]
303
304
# Stream folding with early termination
305
def safe_add(acc: int, x: int) -> Result[int, str]:
306
if x < 0:
307
return Failure("Negative number encountered")
308
return Success(acc + x)
309
310
positive_numbers = [1, 2, 3, 4, 5]
311
total = stream_fold(safe_add, 0, iter(positive_numbers)) # Success(15)
312
313
mixed_numbers = [1, 2, -3, 4, 5]
314
total_mixed = stream_fold(safe_add, 0, iter(mixed_numbers)) # Failure("Negative number encountered")
315
```
316
317
## Processing Patterns
318
319
### Error Accumulation
320
321
```python
322
from returns.iterables import Fold, partition
323
from returns.result import Success, Failure
324
325
def validate_all(items: list[str]) -> Result[list[int], list[str]]:
326
"""Validate all items and accumulate errors"""
327
results = [safe(int)(item) for item in items]
328
329
# Separate successes and failures
330
successes, failures = partition(results)
331
332
if failures:
333
return Failure([str(error) for error in failures])
334
return Success(successes)
335
336
# Usage
337
valid_items = ["1", "2", "3"]
338
result = validate_all(valid_items) # Success([1, 2, 3])
339
340
invalid_items = ["1", "invalid", "3", "also_invalid"]
341
result_with_errors = validate_all(invalid_items)
342
# Failure(["invalid literal...", "invalid literal..."])
343
```
344
345
### Pipeline Processing
346
347
```python
348
from returns.iterables import traverse, sequence
349
from returns.result import safe
350
from returns.pipeline import flow
351
352
# Multi-step pipeline
353
@safe
354
def validate_positive(x: int) -> int:
355
if x <= 0:
356
raise ValueError("Must be positive")
357
return x
358
359
@safe
360
def square(x: int) -> int:
361
return x * x
362
363
def process_numbers(numbers: list[int]) -> Result[list[int], Exception]:
364
return flow(
365
numbers,
366
lambda nums: traverse(validate_positive, nums),
367
lambda container: container.bind(lambda vals: traverse(square, vals))
368
)
369
370
# Usage
371
numbers = [1, 2, 3, 4]
372
result = process_numbers(numbers) # Success([1, 4, 9, 16])
373
374
numbers_with_negative = [1, -2, 3]
375
result_with_error = process_numbers(numbers_with_negative) # Failure(ValueError("Must be positive"))
376
```
377
378
Iteration utilities provide powerful abstractions for working with collections of containers, enabling functional-style processing with proper error handling and type safety while maintaining composability and expressiveness.