Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines
—
Arrow FX Coroutines provides comprehensive resource management capabilities that ensure safe acquisition and cleanup of resources. All resource operations integrate with structured concurrency and guarantee cleanup even in the presence of exceptions or cancellation.
typealias Resource<A> = suspend ResourceScope.() -> A
A Resource<A> represents a resource that can be safely acquired and will be automatically cleaned up.
interface ResourceScope : AutoCloseScope {
suspend fun <A> Resource<A>.bind(): A
suspend fun <A> install(acquire: suspend AcquireStep.() -> A, release: suspend (A, ExitCase) -> Unit): A
suspend infix fun <A> Resource<A>.release(release: suspend (A) -> Unit): A
suspend infix fun <A> Resource<A>.releaseCase(release: suspend (A, ExitCase) -> Unit): A
fun onClose(release: (Throwable?) -> Unit): Unit
infix fun onRelease(release: suspend (ExitCase) -> Unit)
}
interface AcquireStep
fun <A> resource(block: suspend ResourceScope.() -> A): Resource<A>
fun <A> resource(acquire: suspend () -> A, release: suspend (A, ExitCase) -> Unit): Resource<A>
Create a resource with custom acquisition and release logic.
val fileResource = resource(
acquire = { File("data.txt").also { it.createNewFile() } },
release = { file, _ -> file.delete() }
)
fun <A> resource(acquire: suspend () -> A, release: suspend (A, ExitCase) -> Unit): Resource<A>
Create a resource that receives information about how the scope exited.
val connectionResource = resource(
acquire = { openDatabaseConnection() },
release = { connection, exitCase ->
when (exitCase) {
is ExitCase.Completed -> connection.commit()
is ExitCase.Cancelled -> connection.rollback()
is ExitCase.Failure -> connection.rollback()
}
connection.close()
}
)
fun <A : AutoCloseable> autoCloseable(closingDispatcher: CoroutineDispatcher = IODispatcher, autoCloseable: suspend () -> A): Resource<A>
suspend fun <A : AutoCloseable> ResourceScope.autoCloseable(closingDispatcher: CoroutineDispatcher = IODispatcher, autoCloseable: suspend () -> A): A
Create resources from objects that implement AutoCloseable.
val inputStreamResource = autoCloseable { FileInputStream("input.txt") }
val suspendingResource = autoCloseable { openAsyncConnection() }
suspend fun <A> resourceScope(block: suspend ResourceScope.() -> A): A
Execute code within a resource scope where resources can be safely acquired and will be automatically cleaned up.
val result = resourceScope {
val file = fileResource.bind()
val connection = connectionResource.bind()
// Use resources safely
processData(file, connection)
// Resources automatically cleaned up here
}
suspend fun <A, B> Resource<A>.use(f: suspend (A) -> B): B
Use a single resource: acquire it, pass it to f, and release it once f has finished.
val result = fileResource.use { file ->
file.readText()
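}
@DelicateCoroutinesApi
suspend fun <A> Resource<A>.allocate(): Pair<A, suspend (ExitCase) -> Unit>
Manually allocate a resource, returning the resource and its release function. The caller is responsible for invoking the release function with the ExitCase that describes how the resource was used.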
val (connection, release) = connectionResource.allocate()
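var exitCase: ExitCase = ExitCase.Completed
try {
// Use connection
connection.query("SELECT * FROM users")
} catch (e: Throwable) {
// Record how the block exited so the release logic can react to it
exitCase = ExitCase.ExitCase(e)
throw e
} finally {
release(exitCase)
}
suspend fun <A> Resource<A>.bind(): A
Bind a resource to the current scope, ensuring it will be cleaned up when the scope exits.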
resourceScope {
val file = fileResource.bind()
val stream = streamResource.bind()
// Both resources cleaned up automatically
processFileWithStream(file, stream)
}
suspend fun <A> install(acquire: suspend AcquireStep.() -> A, release: suspend (A, ExitCase) -> Unit): A
Install a resource directly in the current scope.
resourceScope {
val tempFile = install(
acquire = { createTempFile() },
release = { file, _ -> file.delete() }
)
processFile(tempFile)
}
suspend infix fun <A> Resource<A>.release(release: suspend (A) -> Unit): A
suspend infix fun <A> Resource<A>.releaseCase(release: suspend (A, ExitCase) -> Unit): A
Add additional release actions to existing resources. Both functions are ResourceScope members that bind the resource and return the acquired value, so they are called inside a scope rather than chained on a Resource.
val enhancedResource = resource {
val readOnlyOnRelease = resource { fileResource release { file -> file.setReadOnly() } }
readOnlyOnRelease releaseCase { file, exitCase ->
if (exitCase is ExitCase.Failure) {
logError("Resource failed", exitCase.failure)
}
}
}
infix fun onRelease(release: suspend (ExitCase) -> Unit)
Register a release handler that will run when the scope exits.
resourceScope {
onRelease { println("Cleaning up scope") }
val file = fileResource.bind()
processFile(file)
// Scope cleanup message printed after file cleanup
}
fun <A> Resource<A>.asFlow(): Flow<A>
Convert a resource to a Flow that manages the resource lifecycle.
val dataFlow = fileResource.asFlow().transform { file ->
file.readLines().forEach { line -> emit(line) }
}
sealed class ExitCase {
object Completed : ExitCase()
data class Cancelled(val exception: CancellationException) : ExitCase()
data class Failure(val failure: Throwable) : ExitCase()
companion object {
fun ExitCase(error: Throwable): ExitCase
}
}
Exit cases provide information about how a resource scope or operation terminated:
ExitCase.Completed: The operation completed successfully
ExitCase.Cancelled: The operation was cancelled
ExitCase.Failure: The operation failed with an exception
val appResource = resource {
// Bind the dependencies in this scope so they stay open for as long as the Application is in use
val config = configResource.bind()
val database = databaseResource(config).bind()
val cache = cacheResource(database).bind()
install({ Application(database, cache) }) { app, _ -> app.shutdown() }
}
val parallelResources = resource {
// Acquire the services in parallel; binding keeps them open until the enclosing scope exits
val services = parZip(
{ serviceAResource.bind() },
{ serviceBResource.bind() },
{ serviceCResource.bind() }
) { serviceA, serviceB, serviceC ->
CombinedServices(serviceA, serviceB, serviceC)
}
install({ services }) { combined, _ -> combined.shutdown() }
}
class ConnectionPool(private val maxConnections: Int) {
fun getConnection(): Resource<Connection> = resource(
acquire = { acquireFromPool() },
release = { connection, _ -> returnToPool(connection) }
)
private suspend fun acquireFromPool(): Connection = TODO()
private suspend fun returnToPool(connection: Connection) = TODO()
}
The bracket pattern provides fundamental resource safety guarantees without the overhead of the full Resource system. These functions ensure cleanup even when exceptions or cancellation occur.
suspend inline fun <A, B> bracket(
crossinline acquire: suspend () -> A,
use: suspend (A) -> B,
crossinline release: suspend (A) -> Unit
): B
Acquire a resource, use it, and guarantee cleanup regardless of how the operation exits.
val result = bracket(
acquire = { openFile("data.txt") },
use = { file -> file.readText() },
release = { file -> file.close() }
)
suspend inline fun <A, B> bracketCase(
crossinline acquire: suspend () -> A,
use: suspend (A) -> B,
crossinline release: suspend (A, ExitCase) -> Unit
): B
Like bracket, but the release function receives information about how the operation exited.
val result = bracketCase(
acquire = { openDatabaseTransaction() },
use = { tx -> tx.performOperations() },
release = { tx, exitCase ->
when (exitCase) {
is ExitCase.Completed -> tx.commit()
is ExitCase.Cancelled, is ExitCase.Failure -> tx.rollback()
}
}
)
suspend inline fun <A> guarantee(
fa: suspend () -> A,
crossinline finalizer: suspend () -> Unit
): A
Execute an operation and guarantee a finalizer runs afterwards, regardless of how the operation exits.
val result = guarantee(
fa = { performOperation() },
finalizer = { cleanup() }
)
suspend inline fun <A> guaranteeCase(
fa: suspend () -> A,
crossinline finalizer: suspend (ExitCase) -> Unit
): A
Like guarantee, but the finalizer receives information about how the operation exited.
val result = guaranteeCase(
fa = { riskyOperation() },
finalizer = { exitCase ->
when (exitCase) {
is ExitCase.Completed -> logSuccess()
is ExitCase.Cancelled -> logCancellation()
is ExitCase.Failure -> logError(exitCase.failure)
}
}
)
suspend inline fun <A> onCancel(
fa: suspend () -> A,
crossinline onCancel: suspend () -> Unit
): A
Execute an operation and run a specific handler only if the operation is cancelled.
val result = onCancel(
fa = { longRunningOperation() },
onCancel = { notifyCancellation() }
)
Install with Tessl CLI
npx tessl i tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm