# Heap Operations

Async versions of heapq functions for working with priority queues and sorted data structures in async contexts. These functions enable efficient operations on sorted async iterables.

## Capabilities

### Merging Sorted Iterables

Efficiently merge multiple sorted async iterables into a single sorted output.

```python { .api }
def merge(*iterables, key=None, reverse=False):
    """
    Merge multiple sorted async iterables into a single sorted iterator.

    Parameters:
    - *iterables: AnyIterable - Variable number of sorted iterables to merge
    - key: Callable[[T], Any] or None - Key function for comparison
    - reverse: bool - If True, merge in descending order

    Returns:
    AsyncIterator[T] - Iterator yielding items in sorted order

    Note:
    Input iterables must already be sorted in the same order as specified
    by the key and reverse parameters.
    """
```
### Finding Largest/Smallest Elements

Extract the n largest or smallest elements from async iterables efficiently.

```python { .api }
async def nlargest(iterable, n, key=None):
    """
    Find the n largest elements from an async iterable.

    Parameters:
    - iterable: AsyncIterator[T] - Input iterable
    - n: int - Number of largest elements to return
    - key: Callable[[T], Any] or None - Key function for comparison

    Returns:
    List[T] - List of the n largest elements in descending order

    Note:
    Equivalent to sorted(iterable, key=key, reverse=True)[:n] but more efficient
    for small values of n relative to the iterable length.
    """

async def nsmallest(iterable, n, key=None):
    """
    Find the n smallest elements from an async iterable.

    Parameters:
    - iterable: AsyncIterator[T] - Input iterable
    - n: int - Number of smallest elements to return
    - key: Callable[[T], Any] or None - Key function for comparison

    Returns:
    List[T] - List of the n smallest elements in ascending order

    Note:
    Equivalent to sorted(iterable, key=key)[:n] but more efficient
    for small values of n relative to the iterable length.
    """
```
## Usage Examples

### Merging Sorted Data Streams
```python
from asyncstdlib import merge, list as alist

async def merge_example():
    # Simulate sorted data streams
    async def stream1():
        for x in [1, 4, 7, 10]:
            yield x

    async def stream2():
        for x in [2, 5, 8]:
            yield x

    async def stream3():
        for x in [3, 6, 9, 11, 12]:
            yield x

    # Merge all streams in sorted order
    merged = merge(stream1(), stream2(), stream3())
    result = await alist(merged)
    print(result)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

async def merge_with_key():
    # Merge data with a custom key function
    async def scores1():
        data = [("Alice", 95), ("Charlie", 85)]
        for item in data:
            yield item

    async def scores2():
        data = [("Bob", 90), ("David", 80)]
        for item in data:
            yield item

    # Merge by score (descending); inputs are already sorted descending by score
    merged = merge(scores1(), scores2(), key=lambda x: x[1], reverse=True)
    result = await alist(merged)
    print(result)  # [('Alice', 95), ('Bob', 90), ('Charlie', 85), ('David', 80)]
```
### Finding Top/Bottom Elements
```python
from asyncstdlib import nlargest, nsmallest

async def top_bottom_example():
    async def random_scores():
        import random
        for _ in range(100):
            yield random.randint(1, 1000)

    # Find the top 5 scores efficiently
    top5 = await nlargest(random_scores(), 5)
    print(f"Top 5 scores: {top5}")

    # Find the bottom 3 scores efficiently
    bottom3 = await nsmallest(random_scores(), 3)
    print(f"Bottom 3 scores: {bottom3}")

async def key_based_selection():
    async def students():
        data = [
            {"name": "Alice", "gpa": 3.8, "age": 20},
            {"name": "Bob", "gpa": 3.2, "age": 22},
            {"name": "Charlie", "gpa": 3.9, "age": 19},
            {"name": "David", "gpa": 3.1, "age": 21},
            {"name": "Eve", "gpa": 4.0, "age": 20}
        ]
        for student in data:
            yield student

    # Find the students with the highest GPAs
    top_students = await nlargest(students(), 3, key=lambda s: s["gpa"])
    for student in top_students:
        print(f"{student['name']}: {student['gpa']}")
    # Output:
    # Eve: 4.0
    # Charlie: 3.9
    # Alice: 3.8

    # Find the youngest students
    youngest = await nsmallest(students(), 2, key=lambda s: s["age"])
    for student in youngest:
        print(f"{student['name']}: {student['age']} years old")
    # Output:
    # Charlie: 19 years old
    # Alice: 20 years old (or Eve: 20 years old - stable sort)
```
### Log Processing Example
```python
import asyncio
from asyncstdlib import merge, nlargest
from datetime import datetime

async def log_processing():
    # Simulate log streams from different sources
    async def web_server_logs():
        logs = [
            {"timestamp": "2024-01-01 10:00:00", "level": "INFO", "source": "web"},
            {"timestamp": "2024-01-01 10:05:00", "level": "ERROR", "source": "web"},
            {"timestamp": "2024-01-01 10:10:00", "level": "INFO", "source": "web"}
        ]
        for log in logs:
            yield log

    async def database_logs():
        logs = [
            {"timestamp": "2024-01-01 10:02:00", "level": "INFO", "source": "db"},
            {"timestamp": "2024-01-01 10:07:00", "level": "WARN", "source": "db"},
            {"timestamp": "2024-01-01 10:12:00", "level": "ERROR", "source": "db"}
        ]
        for log in logs:
            yield log

    # Merge logs by timestamp
    def parse_timestamp(log):
        return datetime.fromisoformat(log["timestamp"])

    merged_logs = merge(
        web_server_logs(),
        database_logs(),
        key=parse_timestamp
    )

    # Process merged logs in chronological order
    async for log in merged_logs:
        print(f"{log['timestamp']} [{log['source']}] {log['level']}")

    # Find the most recent error logs (ISO timestamps sort correctly as strings)
    all_logs = merge(web_server_logs(), database_logs(), key=parse_timestamp)
    error_logs = (log async for log in all_logs if log["level"] == "ERROR")
    recent_errors = await nlargest(error_logs, 5, key=lambda log: log["timestamp"])

    print("\nMost recent errors:")
    for error in recent_errors:
        print(f"{error['timestamp']} [{error['source']}] {error['level']}")
```
### Performance Considerations
```python
async def performance_example():
    async def large_dataset():
        # Simulate a large dataset
        for i in range(10000):
            yield i

    # nlargest/nsmallest are more efficient than sorting + slicing
    # for small n relative to the data size

    # Efficient: O(n log k) where k=10, n=10000
    top10 = await nlargest(large_dataset(), 10)

    # Less efficient: O(n log n) where n=10000
    # all_sorted = sorted([x async for x in large_dataset()], reverse=True)
    # top10_slow = all_sorted[:10]

    print(f"Top 10: {top10}")
```