CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm

Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines

Pending
Overview
Eval results
Files

parallel-processing.mddocs/

Parallel Processing

Arrow FX Coroutines provides high-level parallel processing capabilities that enable concurrent execution while maintaining structured concurrency principles. All parallel operations respect cancellation, provide proper exception handling, and can be configured for optimal performance.

Parallel Mapping

Basic Parallel Map

suspend fun <A, B> Iterable<A>.parMap(transform: suspend CoroutineScope.(A) -> B): List<B>
suspend fun <A, B> Iterable<A>.parMap(context: CoroutineContext = EmptyCoroutineContext, transform: suspend CoroutineScope.(A) -> B): List<B>

Execute a transformation function on each element of a collection in parallel.

val urls = listOf("url1", "url2", "url3")
val responses = urls.parMap { url ->
    httpClient.get(url)
}

Parallel Map with Concurrency Control

suspend fun <A, B> Iterable<A>.parMap(concurrency: Int, f: suspend CoroutineScope.(A) -> B): List<B>
suspend fun <A, B> Iterable<A>.parMap(context: CoroutineContext = EmptyCoroutineContext, concurrency: Int, f: suspend CoroutineScope.(A) -> B): List<B>

Control the maximum number of concurrent operations to prevent resource exhaustion.

val largeDataset = (1..1000).toList()
val processed = largeDataset.parMap(concurrency = 10) { item ->
    expensiveOperation(item)
}

Parallel Map with Null Filtering

suspend fun <A, B> Iterable<A>.parMapNotNull(transform: suspend CoroutineScope.(A) -> B?): List<B>
suspend fun <A, B> Iterable<A>.parMapNotNull(context: CoroutineContext = EmptyCoroutineContext, transform: suspend CoroutineScope.(A) -> B?): List<B>
suspend fun <A, B> Iterable<A>.parMapNotNull(concurrency: Int, transform: suspend CoroutineScope.(A) -> B?): List<B>
suspend fun <A, B> Iterable<A>.parMapNotNull(context: CoroutineContext = EmptyCoroutineContext, concurrency: Int, transform: suspend CoroutineScope.(A) -> B?): List<B>

Map in parallel and filter out null results.

val userIds = listOf(1, 2, 3, 4, 5)
val validUsers = userIds.parMapNotNull { id ->
    userService.findById(id) // Returns null for non-existent users
}

Parallel Zipping

Two-Way Parallel Zip

suspend fun <A, B, C> parZip(fa: suspend CoroutineScope.() -> A, fb: suspend CoroutineScope.() -> B, f: suspend CoroutineScope.(A, B) -> C): C
suspend fun <A, B, C> parZip(context: CoroutineContext, fa: suspend CoroutineScope.() -> A, fb: suspend CoroutineScope.() -> B, f: suspend CoroutineScope.(A, B) -> C): C

Execute two operations in parallel and combine their results.

val result = parZip(
    { fetchUserProfile(userId) },
    { fetchUserPreferences(userId) }
) { profile, preferences ->
    UserData(profile, preferences)
}

Multi-Way Parallel Zip (3-9 arity)

suspend fun <A, B, C, D> parZip(
    fa: suspend CoroutineScope.() -> A,
    fb: suspend CoroutineScope.() -> B,
    fc: suspend CoroutineScope.() -> C,
    f: suspend CoroutineScope.(A, B, C) -> D
): D

// Similar signatures available for 4, 5, 6, 7, 8, and 9 parameters

Execute multiple operations in parallel and combine all results.

val dashboard = parZip(
    { fetchUserStats() },
    { fetchRecentActivity() },
    { fetchNotifications() },
    { fetchSystemStatus() }
) { stats, activity, notifications, status ->
    DashboardData(stats, activity, notifications, status)
}

Error-Accumulating Parallel Operations

Parallel Map with Error Accumulation

suspend fun <A, B> Iterable<A>.parMapOrAccumulate(f: suspend CoroutineScope.(A) -> B): Either<NonEmptyList<Error>, List<B>>
suspend fun <A, B> Iterable<A>.parMapOrAccumulate(context: CoroutineContext, f: suspend CoroutineScope.(A) -> B): Either<NonEmptyList<Error>, List<B>>
suspend fun <A, B> Iterable<A>.parMapOrAccumulate(concurrency: Int, f: suspend CoroutineScope.(A) -> B): Either<NonEmptyList<Error>, List<B>>
suspend fun <A, B> Iterable<A>.parMapOrAccumulate(context: CoroutineContext, concurrency: Int, f: suspend CoroutineScope.(A) -> B): Either<NonEmptyList<Error>, List<B>>

Map in parallel and accumulate all errors instead of failing fast.

val validationResults = users.parMapOrAccumulate { user ->
    validateUser(user) // Can raise validation errors
}

when (validationResults) {
    is Either.Left -> println("Validation errors: ${validationResults.value}")
    is Either.Right -> println("All users valid: ${validationResults.value}")
}

Parallel Zip with Error Accumulation

suspend fun <A, B, C> Raise<E>.parZipOrAccumulate(
    fa: suspend CoroutineScope.() -> A,
    fb: suspend CoroutineScope.() -> B,
    f: suspend CoroutineScope.(A, B) -> C
): C

suspend fun <A, B, C> Raise<NonEmptyList<E>>.parZipOrAccumulate(
    fa: suspend CoroutineScope.() -> A,
    fb: suspend CoroutineScope.() -> B,
    f: suspend CoroutineScope.(A, B) -> C
): C

Execute operations in parallel within a Raise context, accumulating errors.

either<ValidationError, UserProfile> {
    parZipOrAccumulate(
        { validateName(userData.name) },
        { validateEmail(userData.email) },
        { validateAge(userData.age) }
    ) { name, email, age ->
        UserProfile(name, email, age)
    }
}

Advanced Parallel Processing

ScopedRaiseAccumulate Type

interface ScopedRaiseAccumulate<Error> : CoroutineScope, RaiseAccumulate<Error>

Intersection type that combines CoroutineScope and RaiseAccumulate for advanced error handling scenarios.

Custom Context Execution

val customDispatcher = Dispatchers.IO.limitedParallelism(5)

val results = items.parMap(context = customDispatcher) { item ->
    performIOOperation(item)
}

Mixing Parallel and Sequential Operations

val processedData = inputData
    .chunked(100) // Process in batches
    .parMap(concurrency = 3) { batch ->
        // Each batch processed in parallel
        batch.map { item ->
            // Items within batch processed sequentially
            processItem(item)
        }
    }
    .flatten()

Performance Considerations

Concurrency Control

Always consider using concurrency limits for operations that consume significant resources:

// Good: Limited concurrency prevents resource exhaustion
val results = urls.parMap(concurrency = 10) { url ->
    httpClient.get(url)
}

// Potentially problematic: Unlimited concurrency
val results = urls.parMap { url ->
    httpClient.get(url)
}

Context Switching

Use appropriate coroutine contexts for different types of work:

// CPU-intensive work
val cpuResults = data.parMap(context = Dispatchers.CPU) { item ->
    performCpuIntensiveWork(item)
}

// IO work
val ioResults = urls.parMap(context = Dispatchers.IO) { url ->
    fetchFromNetwork(url)
}

Memory Considerations

For large datasets, consider processing in chunks:

val hugeLists = largeDataset
    .chunked(1000)
    .parMap(concurrency = 4) { chunk ->
        chunk.parMap { item ->
            processItem(item)
        }
    }
    .flatten()

Integration Examples

With Resource Management

resourceScope {
    val httpClient = httpClientResource.bind()
    val database = databaseResource.bind()
    
    val enrichedData = rawData.parMap(concurrency = 5) { item ->
        val apiData = httpClient.fetch(item.url)
        val dbData = database.query(item.id)
        enrichData(item, apiData, dbData)
    }
    
    database.saveBatch(enrichedData)
}

With Error Handling

val processedResults = either<ProcessingError, List<ProcessedItem>> {
    rawItems.parMapOrAccumulate { item ->
        when {
            item.isValid() -> processValidItem(item)
            else -> raise(ValidationError("Invalid item: ${item.id}"))
        }
    }.bind()
}

Install with Tessl CLI

npx tessl i tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm

docs

error-handling.md

index.md

parallel-processing.md

racing.md

resource-management.md

synchronization-flow.md

tile.json