0
# Iteration Tools
1
2
Async versions of itertools functions providing powerful iteration patterns including infinite iterators, filtering, grouping, and combinatorial functions. These tools enable sophisticated data processing workflows with async iterables.
3
4
## Capabilities
5
6
### Infinite Iterators
7
8
Generate infinite sequences from finite inputs.
9
10
```python { .api }
11
def cycle(iterable: AnyIterable[T]) -> AsyncIterator[T]:
12
"""
13
Cycle through iterable infinitely.
14
15
Parameters:
16
- iterable: AnyIterable[T] - Iterable to cycle through
17
18
Returns:
19
AsyncIterator[T] - Infinite iterator cycling through items
20
"""
21
```
22
23
### Accumulation and Batching
24
25
Functions for accumulating values and grouping items.
26
27
```python { .api }
28
def accumulate(iterable: AnyIterable[ADD]) -> AsyncIterator[ADD]: ...
29
def accumulate(iterable: AnyIterable[ADD], *, initial: ADD) -> AsyncIterator[ADD]: ...
30
def accumulate(iterable: AnyIterable[T], function: Callable[[T, T], T] | Callable[[T, T], Awaitable[T]]) -> AsyncIterator[T]: ...
31
def accumulate(iterable: AnyIterable[T2], function: Callable[[T1, T2], T1] | Callable[[T1, T2], Awaitable[T1]], *, initial: T1) -> AsyncIterator[T1]:
32
"""
33
Running totals or accumulated values.
34
35
Parameters:
36
- iterable: AnyIterable[T] - Input iterable
37
- function: Callable[[T, T], T] or Callable[[T, T], Awaitable[T]], optional - Binary function (default: operator.add)
38
- initial: T, optional - Initial value
39
40
Returns:
41
AsyncIterator[T] - Iterator of accumulated values
42
"""
43
44
def batched(iterable, n, strict=False):
45
"""
46
Group items into fixed-size batches.
47
48
Parameters:
49
- iterable: AnyIterable[T] - Input iterable
50
- n: int - Batch size
51
- strict: bool - If True, raise ValueError if final batch is incomplete
52
53
Returns:
54
AsyncIterator[Tuple[T, ...]] - Iterator of batches
55
56
Raises:
57
ValueError - If strict=True and final batch is incomplete
58
"""
59
```
60
61
### Iterator Chaining and Combination
62
63
Combine multiple iterables in various ways.
64
65
```python { .api }
66
class chain:
67
"""
68
Chain multiple iterables into a single iterator.
69
"""
70
71
def __init__(self, *iterables):
72
"""
73
Parameters:
74
- *iterables: AnyIterable[T] - Variable number of iterables to chain
75
"""
76
77
@classmethod
78
def from_iterable(cls, iterable):
79
"""
80
Create chain from iterable of iterables.
81
82
Parameters:
83
- iterable: AnyIterable[AnyIterable[T]] - Iterable of iterables
84
85
Returns:
86
chain[T] - Chain iterator
87
"""
88
89
async def __anext__(self):
90
"""Get next item from chained iterables."""
91
92
async def aclose(self):
93
"""Close all underlying iterables."""
94
95
def zip_longest(*iterables, fillvalue=None):
96
"""
97
Zip iterables with padding for unequal lengths.
98
99
Parameters:
100
- *iterables: AnyIterable - Iterables to zip
101
- fillvalue: T - Value to use for padding shorter iterables
102
103
Returns:
104
AsyncIterator[Tuple] - Iterator of tuples with padding
105
"""
106
```
107
108
### Filtering and Selection
109
110
Functions for conditional iteration and selection.
111
112
```python { .api }
113
def compress(data, selectors):
114
"""
115
Filter data based on corresponding selectors.
116
117
Parameters:
118
- data: AnyIterable[T] - Data to filter
119
- selectors: AnyIterable[Any] - Boolean selectors
120
121
Returns:
122
AsyncIterator[T] - Iterator of selected data items
123
"""
124
125
def dropwhile(predicate, iterable):
126
"""
127
Drop items while predicate is true, then yield remaining items.
128
129
Parameters:
130
- predicate: Callable[[T], bool] - Test function
131
- iterable: AnyIterable[T] - Input iterable
132
133
Returns:
134
AsyncIterator[T] - Iterator starting after predicate becomes false
135
"""
136
137
def filterfalse(predicate, iterable):
138
"""
139
Filter items where predicate is false.
140
141
Parameters:
142
- predicate: Callable[[T], bool] or None - Test function
143
- iterable: AnyIterable[T] - Input iterable
144
145
Returns:
146
AsyncIterator[T] - Iterator of items where predicate is false
147
"""
148
149
def takewhile(predicate, iterable):
150
"""
151
Take items while predicate is true, then stop.
152
153
Parameters:
154
- predicate: Callable[[T], bool] - Test function
155
- iterable: AnyIterable[T] - Input iterable
156
157
Returns:
158
AsyncIterator[T] - Iterator of items while predicate is true
159
"""
160
161
def islice(iterable, start, stop=None, step=None):
162
"""
163
Slice async iterable like built-in slice.
164
165
Parameters:
166
- iterable: AnyIterable[T] - Input iterable
167
- start: int or None - Start index or stop if stop is None
168
- stop: int or None - Stop index
169
- step: int or None - Step size
170
171
Returns:
172
AsyncIterator[T] - Sliced iterator
173
"""
174
```
175
176
### Function Application and Mapping
177
178
Advanced mapping operations.
179
180
```python { .api }
181
def starmap(function, iterable):
182
"""
183
Apply function to unpacked tuples from iterable.
184
185
Parameters:
186
- function: Callable - Function to apply (can be sync or async)
187
- iterable: AnyIterable[Tuple] - Iterable of argument tuples
188
189
Returns:
190
AsyncIterator[R] - Iterator of function results
191
"""
192
```
193
194
### Iterator Utilities
195
196
Utility functions for working with iterators.
197
198
```python { .api }
199
def pairwise(iterable):
200
"""
201
Return successive overlapping pairs from iterable.
202
203
Parameters:
204
- iterable: AnyIterable[T] - Input iterable
205
206
Returns:
207
AsyncIterator[Tuple[T, T]] - Iterator of overlapping pairs
208
"""
209
210
class tee:
211
"""
212
Split async iterable into n independent iterators.
213
"""
214
215
def __init__(self, iterable, n=2, lock=None):
216
"""
217
Parameters:
218
- iterable: AnyIterable[T] - Source iterable
219
- n: int - Number of independent iterators
220
- lock: AsyncContextManager, optional - Lock for thread safety
221
"""
222
223
def __len__(self):
224
"""Return number of iterators."""
225
226
def __getitem__(self, item):
227
"""Get iterator by index or slice."""
228
229
def __iter__(self):
230
"""Return sync iterator over async iterators."""
231
232
async def __aenter__(self):
233
"""Enter async context."""
234
235
async def __aexit__(self, exc_type, exc_val, exc_tb):
236
"""Exit async context and cleanup."""
237
238
async def aclose(self):
239
"""Close all iterators."""
240
```
241
242
### Grouping Operations
243
244
Group items based on keys.
245
246
```python { .api }
247
def groupby(iterable, key=None):
248
"""
249
Group consecutive items by key function.
250
251
Parameters:
252
- iterable: AnyIterable[T] - Input iterable (should be sorted by key)
253
- key: Callable[[T], K] or None - Key function (default: identity)
254
255
Returns:
256
AsyncIterator[Tuple[K, AsyncIterator[T]]] - Iterator of (key, group) pairs
257
"""
258
```
259
260
## Usage Examples
261
262
### Data Processing Pipeline
263
```python
264
async def process_data():
265
async def sensor_readings():
266
for i in range(100):
267
yield {"temp": 20 + i % 10, "humidity": 50 + i % 20}
268
269
# Batch readings into groups of 10
270
batches = batched(sensor_readings(), 10)
271
272
# Calculate average temperature per batch
273
async for batch in batches:
274
temps = [reading["temp"] for reading in batch]
275
avg_temp = sum(temps) / len(temps)
276
print(f"Average temperature: {avg_temp}")
277
```
278
279
### Combining Multiple Data Sources
280
```python
281
async def combine_sources():
282
async def source1():
283
for i in range(5):
284
yield f"A{i}"
285
286
async def source2():
287
for i in range(3):
288
yield f"B{i}"
289
290
# Chain sources together
291
combined = chain(source1(), source2())
292
items = await list(combined) # ['A0', 'A1', 'A2', 'A3', 'A4', 'B0', 'B1', 'B2']
293
294
# Or create from iterable of iterables
295
sources = [source1(), source2()]
296
combined2 = chain.from_iterable(sources)
297
```
298
299
### Grouping and Filtering
300
```python
301
async def group_and_filter():
302
async def scores():
303
data = [("Alice", 85), ("Bob", 92), ("Alice", 78), ("Charlie", 95), ("Bob", 88)]
304
for item in data:
305
yield item
306
307
# Group by student name (data must be sorted first)
308
sorted_scores = sorted(await list(scores()), key=lambda x: x[0])
309
310
async def sorted_async():
311
for item in sorted_scores:
312
yield item
313
314
# Group consecutive items by student
315
async for student, grades in groupby(sorted_async(), key=lambda x: x[0]):
316
grade_list = [grade for _, grade in await list(grades)]
317
avg = sum(grade_list) / len(grade_list)
318
print(f"{student}: {avg:.1f}")
319
```