Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines
—
Arrow Fx Coroutines provides high-level parallel processing capabilities that enable concurrent execution while maintaining structured concurrency principles. All parallel operations respect cancellation, provide proper exception handling, and can be configured for optimal performance.
suspend fun <A, B> Iterable<A>.parMap(transform: suspend CoroutineScope.(A) -> B): List<B>
suspend fun <A, B> Iterable<A>.parMap(context: CoroutineContext = EmptyCoroutineContext, transform: suspend CoroutineScope.(A) -> B): List<B>

Execute a transformation function on each element of a collection in parallel.
val urls = listOf("url1", "url2", "url3")
val responses = urls.parMap { url ->
httpClient.get(url)
}

suspend fun <A, B> Iterable<A>.parMap(concurrency: Int, f: suspend CoroutineScope.(A) -> B): List<B>
suspend fun <A, B> Iterable<A>.parMap(context: CoroutineContext = EmptyCoroutineContext, concurrency: Int, f: suspend CoroutineScope.(A) -> B): List<B>

Control the maximum number of concurrent operations to prevent resource exhaustion.
val largeDataset = (1..1000).toList()
val processed = largeDataset.parMap(concurrency = 10) { item ->
expensiveOperation(item)
}

suspend fun <A, B> Iterable<A>.parMapNotNull(transform: suspend CoroutineScope.(A) -> B?): List<B>
suspend fun <A, B> Iterable<A>.parMapNotNull(context: CoroutineContext = EmptyCoroutineContext, transform: suspend CoroutineScope.(A) -> B?): List<B>
suspend fun <A, B> Iterable<A>.parMapNotNull(concurrency: Int, transform: suspend CoroutineScope.(A) -> B?): List<B>
suspend fun <A, B> Iterable<A>.parMapNotNull(context: CoroutineContext = EmptyCoroutineContext, concurrency: Int, transform: suspend CoroutineScope.(A) -> B?): List<B>

Map in parallel and filter out null results.
val userIds = listOf(1, 2, 3, 4, 5)
val validUsers = userIds.parMapNotNull { id ->
userService.findById(id) // Returns null for non-existent users
}

suspend fun <A, B, C> parZip(fa: suspend CoroutineScope.() -> A, fb: suspend CoroutineScope.() -> B, f: suspend CoroutineScope.(A, B) -> C): C
suspend fun <A, B, C> parZip(context: CoroutineContext, fa: suspend CoroutineScope.() -> A, fb: suspend CoroutineScope.() -> B, f: suspend CoroutineScope.(A, B) -> C): C

Execute two operations in parallel and combine their results.
val result = parZip(
{ fetchUserProfile(userId) },
{ fetchUserPreferences(userId) }
) { profile, preferences ->
UserData(profile, preferences)
}

suspend fun <A, B, C, D> parZip(
fa: suspend CoroutineScope.() -> A,
fb: suspend CoroutineScope.() -> B,
fc: suspend CoroutineScope.() -> C,
f: suspend CoroutineScope.(A, B, C) -> D
): D
// Similar signatures available for 4, 5, 6, 7, 8, and 9 parameters

Execute multiple operations in parallel and combine all results.
val dashboard = parZip(
{ fetchUserStats() },
{ fetchRecentActivity() },
{ fetchNotifications() },
{ fetchSystemStatus() }
) { stats, activity, notifications, status ->
DashboardData(stats, activity, notifications, status)
}

suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(f: suspend ScopedRaiseAccumulate<Error>.(A) -> B): Either<NonEmptyList<Error>, List<B>>
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(context: CoroutineContext, f: suspend ScopedRaiseAccumulate<Error>.(A) -> B): Either<NonEmptyList<Error>, List<B>>
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(concurrency: Int, f: suspend ScopedRaiseAccumulate<Error>.(A) -> B): Either<NonEmptyList<Error>, List<B>>
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(context: CoroutineContext, concurrency: Int, f: suspend ScopedRaiseAccumulate<Error>.(A) -> B): Either<NonEmptyList<Error>, List<B>>

Map in parallel and accumulate all errors instead of failing fast.
val validationResults = users.parMapOrAccumulate { user ->
validateUser(user) // Can raise validation errors
}
when (validationResults) {
is Either.Left -> println("Validation errors: ${validationResults.value}")
is Either.Right -> println("All users valid: ${validationResults.value}")
}

suspend fun <E, A, B, C> Raise<E>.parZipOrAccumulate(
combine: (E, E) -> E,
fa: suspend ScopedRaiseAccumulate<E>.() -> A,
fb: suspend ScopedRaiseAccumulate<E>.() -> B,
f: suspend CoroutineScope.(A, B) -> C
): C
suspend fun <E, A, B, C> Raise<NonEmptyList<E>>.parZipOrAccumulate(
fa: suspend ScopedRaiseAccumulate<E>.() -> A,
fb: suspend ScopedRaiseAccumulate<E>.() -> B,
f: suspend CoroutineScope.(A, B) -> C
): C

Execute operations in parallel within a Raise context, accumulating errors.
either<NonEmptyList<ValidationError>, UserProfile> {
parZipOrAccumulate(
{ validateName(userData.name) },
{ validateEmail(userData.email) },
{ validateAge(userData.age) }
) { name, email, age ->
UserProfile(name, email, age)
}
}

interface ScopedRaiseAccumulate<Error> : CoroutineScope, RaiseAccumulate<Error>

Intersection type that combines CoroutineScope and RaiseAccumulate for advanced error handling scenarios.
val customDispatcher = Dispatchers.IO.limitedParallelism(5)
val results = items.parMap(context = customDispatcher) { item ->
performIOOperation(item)
}

val processedData = inputData
.chunked(100) // Process in batches
.parMap(concurrency = 3) { batch ->
// Each batch processed in parallel
batch.map { item ->
// Items within batch processed sequentially
processItem(item)
}
}
.flatten()

Always consider using concurrency limits for operations that consume significant resources:
// Good: Limited concurrency prevents resource exhaustion
val results = urls.parMap(concurrency = 10) { url ->
httpClient.get(url)
}
// Potentially problematic: Unlimited concurrency
val unboundedResults = urls.parMap { url ->
httpClient.get(url)
}

Use appropriate coroutine contexts for different types of work:
// CPU-intensive work
val cpuResults = data.parMap(context = Dispatchers.Default) { item ->
performCpuIntensiveWork(item)
}
// IO work
val ioResults = urls.parMap(context = Dispatchers.IO) { url ->
fetchFromNetwork(url)
}

For large datasets, consider processing in chunks:
val hugeLists = largeDataset
.chunked(1000)
.parMap(concurrency = 4) { chunk ->
chunk.parMap { item ->
processItem(item)
}
}
.flatten()

resourceScope {
val httpClient = httpClientResource.bind()
val database = databaseResource.bind()
val enrichedData = rawData.parMap(concurrency = 5) { item ->
val apiData = httpClient.fetch(item.url)
val dbData = database.query(item.id)
enrichData(item, apiData, dbData)
}
database.saveBatch(enrichedData)
}

val processedResults = either<NonEmptyList<ProcessingError>, List<ProcessedItem>> {
rawItems.parMapOrAccumulate { item ->
when {
item.isValid() -> processValidItem(item)
else -> raise(ValidationError("Invalid item: ${item.id}"))
}
}.bind()
}

Install with Tessl CLI
npx tessl i tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm