# Parallel Processing

Arrow FX Coroutines provides high-level parallel operators that run work concurrently while following structured concurrency. Every parallel operation respects cancellation, propagates exceptions to the caller, and accepts an optional `CoroutineContext`; the `parMap` family also accepts a concurrency limit.

## Parallel Mapping

### Basic Parallel Map

```kotlin { .api }
suspend fun <A, B> Iterable<A>.parMap(transform: suspend CoroutineScope.(A) -> B): List<B>
suspend fun <A, B> Iterable<A>.parMap(context: CoroutineContext = EmptyCoroutineContext, transform: suspend CoroutineScope.(A) -> B): List<B>
```

Execute a transformation function on each element of a collection in parallel.

```kotlin
val urls = listOf("url1", "url2", "url3")
val responses = urls.parMap { url ->
    httpClient.get(url)
}
```
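
The snippet above depends on an `httpClient` that is not defined here. As a minimal runnable sketch, the following replaces it with a hypothetical `slowLength` function that simply delays, to illustrate that the results come back in input order and that the delays overlap. The helper names and the 200 ms figure are assumptions made for illustration.

```kotlin
import arrow.fx.coroutines.parMap
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a slow call such as an HTTP request.
suspend fun slowLength(s: String): Int {
    delay(200)
    return s.length
}

// Maps every input in parallel; the result list follows the input order.
suspend fun lengthsInParallel(inputs: List<String>): List<Int> =
    inputs.parMap { slowLength(it) }

fun main(): Unit = runBlocking {
    val elapsed = measureTimeMillis {
        println(lengthsInParallel(listOf("a", "bb", "ccc"))) // [1, 2, 3]
    }
    // The three delays overlap, so this should be closer to 200 ms than 600 ms.
    println("took about $elapsed ms")
}
```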

### Parallel Map with Concurrency Control

```kotlin { .api }
suspend fun <A, B> Iterable<A>.parMap(concurrency: Int, f: suspend CoroutineScope.(A) -> B): List<B>
suspend fun <A, B> Iterable<A>.parMap(context: CoroutineContext = EmptyCoroutineContext, concurrency: Int, f: suspend CoroutineScope.(A) -> B): List<B>
```

Control the maximum number of concurrent operations to prevent resource exhaustion.

```kotlin
val largeDataset = (1..1000).toList()
val processed = largeDataset.parMap(concurrency = 10) { item ->
    expensiveOperation(item)
}
```
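
One way to observe the limit is to count how many tasks are in flight at once. The sketch below is JVM-only (it uses `java.util.concurrent.atomic`), and the `peakInFlight` helper and the 20 ms delay are hypothetical stand-ins for `expensiveOperation`.

```kotlin
import arrow.fx.coroutines.parMap
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Runs `count` simulated tasks under the given limit and returns the highest
// number of tasks that were in flight at the same moment.
suspend fun peakInFlight(count: Int, limit: Int): Int {
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)
    (1..count).toList().parMap(concurrency = limit) { _ ->
        val now = inFlight.incrementAndGet()
        peak.updateAndGet { current -> maxOf(current, now) }
        delay(20) // hypothetical stand-in for expensiveOperation
        inFlight.decrementAndGet()
    }
    return peak.get()
}

fun main(): Unit = runBlocking {
    // With a limit of 5, the reported peak should never exceed 5.
    println("peak in flight: ${peakInFlight(count = 50, limit = 5)}")
}
```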

### Parallel Map with Null Filtering

```kotlin { .api }
suspend fun <A, B> Iterable<A>.parMapNotNull(transform: suspend CoroutineScope.(A) -> B?): List<B>
suspend fun <A, B> Iterable<A>.parMapNotNull(context: CoroutineContext = EmptyCoroutineContext, transform: suspend CoroutineScope.(A) -> B?): List<B>
suspend fun <A, B> Iterable<A>.parMapNotNull(concurrency: Int, transform: suspend CoroutineScope.(A) -> B?): List<B>
suspend fun <A, B> Iterable<A>.parMapNotNull(context: CoroutineContext = EmptyCoroutineContext, concurrency: Int, transform: suspend CoroutineScope.(A) -> B?): List<B>
```

Map in parallel and filter out null results.

```kotlin
val userIds = listOf(1, 2, 3, 4, 5)
val validUsers = userIds.parMapNotNull { id ->
    userService.findById(id) // Returns null for non-existent users
}
```
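
A self-contained variant of the same idea, assuming a hypothetical in-memory `knownUsers` map in place of `userService`, might look like this:

```kotlin
import arrow.fx.coroutines.parMapNotNull
import kotlinx.coroutines.runBlocking

// Hypothetical in-memory lookup standing in for userService.findById.
val knownUsers = mapOf(1 to "Ada", 3 to "Grace")

// Looks up every id in parallel and keeps only the ids that resolve to a user.
suspend fun existingUsers(ids: List<Int>): List<String> =
    ids.parMapNotNull { id -> knownUsers[id] }

fun main(): Unit = runBlocking {
    // Ids 2 and 4 are unknown, so they are dropped from the result.
    println(existingUsers(listOf(1, 2, 3, 4)))
}
```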

## Parallel Zipping

### Two-Way Parallel Zip

```kotlin { .api }
suspend fun <A, B, C> parZip(fa: suspend CoroutineScope.() -> A, fb: suspend CoroutineScope.() -> B, f: suspend CoroutineScope.(A, B) -> C): C
suspend fun <A, B, C> parZip(context: CoroutineContext, fa: suspend CoroutineScope.() -> A, fb: suspend CoroutineScope.() -> B, f: suspend CoroutineScope.(A, B) -> C): C
```

Execute two operations in parallel and combine their results.

```kotlin
val result = parZip(
    { fetchUserProfile(userId) },
    { fetchUserPreferences(userId) }
) { profile, preferences ->
    UserData(profile, preferences)
}
```
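
Because `parZip` follows structured concurrency, a failure in one branch should cancel the other branch before the exception reaches the caller. The sketch below illustrates that behaviour on the JVM; `LookupFailed`, `failingProfile`, and the delays are hypothetical names and values chosen for the example.

```kotlin
import arrow.fx.coroutines.parZip
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical failure type, kept separate from CancellationException so the
// catch in siblingCancelledOnFailure cannot swallow a real cancellation.
class LookupFailed(message: String) : RuntimeException(message)

// Hypothetical stand-in for fetchUserProfile that fails after a short delay.
suspend fun failingProfile(): String {
    delay(50)
    throw LookupFailed("profile lookup failed")
}

// Returns true if the slow branch observed cancellation after its sibling failed.
suspend fun siblingCancelledOnFailure(): Boolean {
    val cancelled = AtomicBoolean(false)
    try {
        parZip(
            { failingProfile() },
            {
                try {
                    delay(5_000) // would normally outlive the failing branch
                    "preferences"
                } catch (e: CancellationException) {
                    cancelled.set(true)
                    throw e // always rethrow cancellation
                }
            }
        ) { profile, preferences -> "$profile / $preferences" }
    } catch (e: LookupFailed) {
        // The original failure is rethrown to the caller once both branches have finished.
    }
    return cancelled.get()
}

fun main(): Unit = runBlocking {
    println("sibling cancelled: ${siblingCancelledOnFailure()}")
}
```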

### Multi-Way Parallel Zip (3-9 arity)

```kotlin { .api }
suspend fun <A, B, C, D> parZip(
    fa: suspend CoroutineScope.() -> A,
    fb: suspend CoroutineScope.() -> B,
    fc: suspend CoroutineScope.() -> C,
    f: suspend CoroutineScope.(A, B, C) -> D
): D

// Similar signatures available for 4, 5, 6, 7, 8, and 9 parameters
```

Execute multiple operations in parallel and combine all results.

```kotlin
val dashboard = parZip(
    { fetchUserStats() },
    { fetchRecentActivity() },
    { fetchNotifications() },
    { fetchSystemStatus() }
) { stats, activity, notifications, status ->
    DashboardData(stats, activity, notifications, status)
}
```

## Error-Accumulating Parallel Operations

### Parallel Map with Error Accumulation

```kotlin { .api }
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(f: suspend ScopedRaiseAccumulate<Error>.(A) -> B): Either<NonEmptyList<Error>, List<B>>
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(context: CoroutineContext, f: suspend ScopedRaiseAccumulate<Error>.(A) -> B): Either<NonEmptyList<Error>, List<B>>
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(concurrency: Int, f: suspend ScopedRaiseAccumulate<Error>.(A) -> B): Either<NonEmptyList<Error>, List<B>>
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(context: CoroutineContext, concurrency: Int, f: suspend ScopedRaiseAccumulate<Error>.(A) -> B): Either<NonEmptyList<Error>, List<B>>
```

Map in parallel and accumulate all errors instead of failing fast. Every element is processed; if any of them raise, the result is a `Left` holding all raised errors in a `NonEmptyList`, otherwise it is a `Right` with the mapped values.

```kotlin
val validationResults = users.parMapOrAccumulate { user ->
    validateUser(user) // Can raise validation errors
}

when (validationResults) {
    is Either.Left -> println("Validation errors: ${validationResults.value}")
    is Either.Right -> println("All users valid: ${validationResults.value}")
}
```
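
To make the accumulation concrete, here is a minimal self-contained sketch. The `User` and `InvalidUser` types and the two validation rules are hypothetical; the declared return type is what lets the compiler infer the error type.

```kotlin
import arrow.core.Either
import arrow.core.NonEmptyList
import arrow.core.raise.ensure
import arrow.fx.coroutines.parMapOrAccumulate
import kotlinx.coroutines.runBlocking

// Hypothetical domain types for illustration only.
data class User(val name: String, val age: Int)
data class InvalidUser(val reason: String)

// Validates every user in parallel. A failing user does not stop the others;
// all raised errors are collected into the Left side.
suspend fun validateAll(users: List<User>): Either<NonEmptyList<InvalidUser>, List<User>> =
    users.parMapOrAccumulate { user ->
        ensure(user.name.isNotBlank()) { InvalidUser("blank name") }
        ensure(user.age >= 0) { InvalidUser("negative age for ${user.name}") }
        user
    }

fun main(): Unit = runBlocking {
    val users = listOf(User("Ada", 36), User("", 20), User("Bob", -1))
    when (val result = validateAll(users)) {
        is Either.Left -> result.value.forEach { println(it.reason) } // one line per failed user
        is Either.Right -> println("all valid: ${result.value}")
    }
}
```

Note that within a single element the first failing `ensure` short-circuits that element; accumulation happens across elements.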

### Parallel Zip with Error Accumulation

```kotlin { .api }
suspend fun <E, A, B, C> Raise<E>.parZipOrAccumulate(
    combine: (E, E) -> E,
    fa: suspend ScopedRaiseAccumulate<E>.() -> A,
    fb: suspend ScopedRaiseAccumulate<E>.() -> B,
    f: suspend CoroutineScope.(A, B) -> C
): C

suspend fun <E, A, B, C> Raise<NonEmptyList<E>>.parZipOrAccumulate(
    fa: suspend ScopedRaiseAccumulate<E>.() -> A,
    fb: suspend ScopedRaiseAccumulate<E>.() -> B,
    f: suspend CoroutineScope.(A, B) -> C
): C
```

Execute operations in parallel within a `Raise` context, accumulating errors. The `Raise<E>` overload merges errors with `combine`; the `Raise<NonEmptyList<E>>` overload collects them in a list.

```kotlin
// The error type is NonEmptyList<ValidationError> because every failure is collected
either<NonEmptyList<ValidationError>, UserProfile> {
    parZipOrAccumulate(
        { validateName(userData.name) },
        { validateEmail(userData.email) },
        { validateAge(userData.age) }
    ) { name, email, age ->
        UserProfile(name, email, age)
    }
}
```
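
The following sketch shows the list-accumulating overload end to end with two validations. The `Profile` type and the `validName` and `validEmail` helpers are hypothetical, and plain `String` is used as the error type to keep the example short.

```kotlin
import arrow.core.Either
import arrow.core.NonEmptyList
import arrow.core.raise.Raise
import arrow.core.raise.either
import arrow.core.raise.ensure
import arrow.fx.coroutines.parZipOrAccumulate
import kotlinx.coroutines.runBlocking

// Hypothetical result type for illustration only.
data class Profile(val name: String, val email: String)

// Hypothetical validators that raise a String error on failure.
fun Raise<String>.validName(name: String): String {
    ensure(name.isNotBlank()) { "name must not be blank" }
    return name
}

fun Raise<String>.validEmail(email: String): String {
    ensure('@' in email) { "email must contain '@'" }
    return email
}

// Runs both validations in parallel; if both fail, both errors are reported.
suspend fun buildProfile(name: String, email: String): Either<NonEmptyList<String>, Profile> =
    either {
        parZipOrAccumulate(
            { validName(name) },
            { validEmail(email) }
        ) { n, e -> Profile(n, e) }
    }

fun main(): Unit = runBlocking {
    println(buildProfile("Ada", "ada@example.com"))
    println(buildProfile("", "not-an-email"))
}
```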

## Advanced Parallel Processing

### ScopedRaiseAccumulate Type

```kotlin { .api }
class ScopedRaiseAccumulate<Error>(
    raise: Raise<NonEmptyList<Error>>,
    scope: CoroutineScope
) : CoroutineScope by scope, RaiseAccumulate<Error>(raise)
```

Intersection type that combines `CoroutineScope` and `RaiseAccumulate`. It is the receiver of the lambdas passed to `parMapOrAccumulate` and `parZipOrAccumulate`, so they can both launch coroutines and raise accumulated errors.

### Custom Context Execution

```kotlin
val customDispatcher = Dispatchers.IO.limitedParallelism(5)

val results = items.parMap(context = customDispatcher) { item ->
    performIOOperation(item)
}
```

### Mixing Parallel and Sequential Operations

```kotlin
val processedData = inputData
    .chunked(100) // Process in batches
    .parMap(concurrency = 3) { batch ->
        // Each batch processed in parallel
        batch.map { item ->
            // Items within batch processed sequentially
            processItem(item)
        }
    }
    .flatten()
```

## Performance Considerations

### Concurrency Control

Set a concurrency limit for operations that hold scarce resources such as sockets, file handles, or database connections:

```kotlin
// Good: limited concurrency prevents resource exhaustion
val limited = urls.parMap(concurrency = 10) { url ->
    httpClient.get(url)
}

// Potentially problematic: unlimited concurrency starts one coroutine per URL at once
val unlimited = urls.parMap { url ->
    httpClient.get(url)
}
```

### Context Switching

Pass a dispatcher that matches the workload: `Dispatchers.Default` for CPU-bound work and `Dispatchers.IO` for blocking I/O:

```kotlin
// CPU-intensive work
val cpuResults = data.parMap(context = Dispatchers.Default) { item ->
    performCpuIntensiveWork(item)
}

// IO work
val ioResults = urls.parMap(context = Dispatchers.IO) { url ->
    fetchFromNetwork(url)
}
```
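
### Memory Considerations

For large datasets, process the input in chunks so that only a bounded number of coroutines is in flight at once. In the example below at most 4 chunks run concurrently, so at most 4 × 1000 items are being processed at any moment:

```kotlin
val processedItems = largeDataset
    .chunked(1000)
    .parMap(concurrency = 4) { chunk ->
        // Items inside a chunk run in parallel without a further limit
        chunk.parMap { item ->
            processItem(item)
        }
    }
    .flatten()
```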

## Integration Examples

### With Resource Management

```kotlin
resourceScope {
    val httpClient = httpClientResource.bind()
    val database = databaseResource.bind()

    val enrichedData = rawData.parMap(concurrency = 5) { item ->
        val apiData = httpClient.fetch(item.url)
        val dbData = database.query(item.id)
        enrichData(item, apiData, dbData)
    }

    database.saveBatch(enrichedData)
}
```
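
The example above relies on resources defined elsewhere. As a rough self-contained sketch, the following uses `install` with a hypothetical `FakeClient` and records events in a log, to illustrate that the resource is released only after all parallel work has finished. The class, the log, and the event names are assumptions made for illustration.

```kotlin
import arrow.fx.coroutines.parMap
import arrow.fx.coroutines.resourceScope
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical client that refuses to work once it has been closed.
class FakeClient {
    val closed = AtomicBoolean(false)
    suspend fun fetch(id: Int): String {
        check(!closed.get()) { "client used after close" }
        return "item-$id"
    }
}

// Acquires the client, fetches all ids in parallel, then releases the client.
// `log` records the order of events and is only touched outside the parallel lambdas.
suspend fun fetchAll(ids: List<Int>, log: MutableList<String>): List<String> =
    resourceScope {
        val client = install({ FakeClient().also { log.add("open") } }) { c, _ ->
            c.closed.set(true)
            log.add("close")
        }
        val items = ids.parMap(concurrency = 2) { id -> client.fetch(id) }
        log.add("done")
        items
    }

fun main(): Unit = runBlocking {
    val log = mutableListOf<String>()
    println(fetchAll(listOf(1, 2, 3), log))
    println(log) // the release step runs last, after the parallel work
}
```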

### With Error Handling

```kotlin
// parMapOrAccumulate already returns an Either, so the accumulated errors
// appear as NonEmptyList<ValidationError> on the Left side.
val processedResults: Either<NonEmptyList<ValidationError>, List<ProcessedItem>> =
    rawItems.parMapOrAccumulate { item ->
        when {
            item.isValid() -> processValidItem(item)
            else -> raise(ValidationError("Invalid item: ${item.id}"))
        }
    }
```