CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm

Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines

Pending
Overview
Eval results
Files

resource-management.mddocs/

Resource Management

Arrow Fx Coroutines provides resource management primitives that guarantee safe acquisition and cleanup of resources. All resource operations integrate with structured concurrency, so finalizers run even when the surrounding code throws an exception or is cancelled.

Core Resource Types

Resource Type

typealias Resource<A> = suspend ResourceScope.() -> A

A Resource<A> represents a resource that can be safely acquired and will be automatically cleaned up.

ResourceScope Interface

interface ResourceScope : AutoCloseScope {
    suspend fun <A> Resource<A>.bind(): A
    suspend fun <A> install(acquire: suspend AcquireStep.() -> A, release: suspend (A, ExitCase) -> Unit): A
    suspend infix fun <A> Resource<A>.release(release: suspend (A) -> Unit): A
    suspend infix fun <A> Resource<A>.releaseCase(release: suspend (A, ExitCase) -> Unit): A
    fun onClose(release: (Throwable?) -> Unit): Unit
    infix fun onRelease(release: suspend (ExitCase) -> Unit)
}

interface AcquireStep

Resource Creation

Basic Resource Creation

fun <A> resource(block: suspend ResourceScope.() -> A): Resource<A>
fun <A> resource(acquire: suspend () -> A, release: suspend (A, ExitCase) -> Unit): Resource<A>

Create a resource with custom acquisition and release logic.

// The two-argument `resource` overload passes both the value and the ExitCase to
// `release`, so the lambda must declare two parameters; `_` ignores the exit case.
val fileResource = resource(
    acquire = { File("data.txt").also { it.createNewFile() } },
    release = { file, _ -> file.delete() },
)

Resource with Exit Case Handling

fun <A> resource(acquire: suspend () -> A, release: suspend (A, ExitCase) -> Unit): Resource<A>

Create a resource that receives information about how the scope exited.

// Commits on success and rolls back on cancellation or failure.
val connectionResource = resource(
    acquire = { openDatabaseConnection() },
    release = { connection, exitCase ->
        try {
            when (exitCase) {
                is ExitCase.Completed -> connection.commit()
                is ExitCase.Cancelled, is ExitCase.Failure -> connection.rollback()
            }
        } finally {
            // Close even if commit()/rollback() throws, otherwise the connection leaks.
            connection.close()
        }
    },
)

AutoCloseable Resources

fun <A : AutoCloseable> autoCloseable(closingDispatcher: CoroutineDispatcher = IODispatcher, autoCloseable: suspend () -> A): Resource<A>
suspend fun <A : AutoCloseable> ResourceScope.autoCloseable(closingDispatcher: CoroutineDispatcher = IODispatcher, autoCloseable: suspend () -> A): A

Create resources from objects that implement AutoCloseable.

// Wraps a JDK stream; the factory lambda is what produces the AutoCloseable value.
val inputStreamResource = autoCloseable { FileInputStream("input.txt") }
// The factory parameter is declared `suspend`, so it may call suspending code.
val suspendingResource = autoCloseable { openAsyncConnection() }

Resource Usage

ResourceScope Execution

suspend fun <A> resourceScope(block: suspend ResourceScope.() -> A): A

Execute code within a resource scope where resources can be safely acquired and will be automatically cleaned up.

val result = resourceScope {
    // Arguments are evaluated left to right: the file is bound first, then the connection.
    processData(fileResource.bind(), connectionResource.bind())
    // Both resources are cleaned up automatically when this block exits.
}

Use Pattern

suspend fun <A, B> Resource<A>.use(f: suspend (A) -> B): B

Acquire a single resource, pass it to f, and release it as soon as f finishes. The value must not be used after use returns.

// The file is only valid inside the lambda; the text read from it is what escapes.
val result = fileResource.use { it.readText() }

Manual Resource Allocation (Delicate API)

@DelicateCoroutinesApi
suspend fun <A> Resource<A>.allocate(): Pair<A, suspend (ExitCase) -> Unit>

Manually allocate a resource, returning the resource and its release function.

val (connection, release) = connectionResource.allocate()
// `release` has type `suspend (ExitCase) -> Unit`, so the caller must report how the
// work ended. Start from Completed and overwrite it if anything is thrown.
var exitCase: ExitCase = ExitCase.Completed
try {
    // Use connection
    connection.query("SELECT * FROM users")
} catch (e: Throwable) {
    // Build the exit case that corresponds to the thrown error, then rethrow it.
    exitCase = ExitCase.ExitCase(e)
    throw e
} finally {
    // Runs exactly once on every path.
    release(exitCase)
}

Advanced Resource Operations

Resource Binding

suspend fun <A> Resource<A>.bind(): A

Bind a resource to the current scope, ensuring it will be cleaned up when the scope exits.

resourceScope {
    // bind() acquires each resource and ties its cleanup to this scope;
    // the file is bound before the stream (left-to-right argument evaluation).
    processFileWithStream(fileResource.bind(), streamResource.bind())
    // Both resources cleaned up automatically
}

Resource Installation

suspend fun <A> install(acquire: suspend AcquireStep.() -> A, release: suspend (A, ExitCase) -> Unit): A

Install a resource directly in the current scope.

resourceScope {
    // ResourceScope.install always hands the ExitCase to `release`;
    // `_` ignores it because the temp file is deleted regardless of outcome.
    val tempFile = install(
        acquire = { createTempFile() },
        release = { file, _ -> file.delete() },
    )

    processFile(tempFile)
}

Adding Release Actions

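suspend infix fun <A> Resource<A>.release(release: suspend (A) -> Unit): A
suspend infix fun <A> Resource<A>.releaseCase(release: suspend (A, ExitCase) -> Unit): A

Bind an existing resource inside a ResourceScope and register an additional release action for it. Both functions are ResourceScope members and return the acquired value, not a new Resource.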

// `release` and `releaseCase` are suspend members of ResourceScope that return the
// acquired value, so they must be called inside a `resource { }` block and cannot be
// chained. Both extra actions are therefore combined into one `releaseCase` handler.
val enhancedResource = resource {
    fileResource releaseCase { file, exitCase ->
        if (exitCase is ExitCase.Failure) {
            logError("Resource failed", exitCase.failure)
        }
        file.setReadOnly()
    }
}

Scope-Level Release Handlers

infix fun onRelease(release: suspend (ExitCase) -> Unit)

Register a release handler that will run when the scope exits.

resourceScope {
    // Registered before the file is bound, yet this handler runs after the file's cleanup.
    onRelease { println("Cleaning up scope") }
    
    val file = fileResource.bind()
    processFile(file)
    // On exit the file is cleaned up first, then "Cleaning up scope" is printed.
}

Flow Integration

Resource as Flow

fun <A> Resource<A>.asFlow(): Flow<A>

Convert a resource to a Flow that manages the resource lifecycle.

// Emits each line of the file as its own element (Flow<String>).
// `Flow<List<String>>` has no `flatten()` operator, so the lines are re-emitted
// one by one with `transform`, which is part of the stable Flow API.
val dataFlow = fileResource.asFlow().transform { file ->
    file.readLines().forEach { line -> emit(line) }
}

Exit Cases

ExitCase Sealed Class

sealed class ExitCase {
    object Completed : ExitCase()
    data class Cancelled(val exception: CancellationException) : ExitCase()
    data class Failure(val failure: Throwable) : ExitCase()
    
    companion object {
        fun ExitCase(error: Throwable): ExitCase
    }
}

Exit cases provide information about how a resource scope or operation terminated:

  • ExitCase.Completed: The operation completed successfully
  • ExitCase.Cancelled: The operation was cancelled
  • ExitCase.Failure: The operation failed with an exception

Resource Composition Examples

Sequential Resource Dependencies

val appResource = resource {
    // Bind the dependencies in this resource's own scope so they stay open for as long
    // as the Application does. A nested resourceScope { } would release config, database
    // and cache as soon as its block returned, handing back an Application built on
    // already-closed resources.
    val config = configResource.bind()
    val database = databaseResource(config).bind()
    val cache = cacheResource(database).bind()

    // Installed last so the application's shutdown is registered after its dependencies.
    install({ Application(database, cache) }) { app, _ -> app.shutdown() }
}

Parallel Resource Initialization

val parallelResources = resource {
    // bind() ties each service to this resource's scope, so all three remain open after
    // parZip returns. `use { it }` would release each service the moment its lambda
    // finished and return an already-closed instance.
    val services = parZip(
        { serviceAResource.bind() },
        { serviceBResource.bind() },
        { serviceCResource.bind() },
    ) { serviceA, serviceB, serviceC ->
        CombinedServices(serviceA, serviceB, serviceC)
    }
    // Shut the combined facade down when the enclosing scope exits.
    onRelease { services.shutdown() }
    services
}

Resource Pool Management

/**
 * Sketch of a pool that hands out connections as [Resource]s, so each connection is
 * returned to the pool when the scope that bound it exits.
 */
class ConnectionPool(private val maxConnections: Int) {
    /** A resource that borrows a connection on acquire and returns it on release. */
    fun getConnection(): Resource<Connection> = resource(
        acquire = { acquireFromPool() },
        // `release` receives (value, ExitCase); the exit case is not needed here.
        release = { connection, _ -> returnToPool(connection) },
    )

    private suspend fun acquireFromPool(): Connection = TODO()

    // Explicit Unit return type: `= TODO()` alone would infer Nothing.
    private suspend fun returnToPool(connection: Connection): Unit = TODO()
}

Bracket Pattern Functions

The bracket pattern provides fundamental resource safety guarantees without the overhead of the full Resource system. These functions ensure cleanup even when exceptions or cancellation occur.

Basic Bracket

suspend inline fun <A, B> bracket(
    crossinline acquire: suspend () -> A,
    use: suspend (A) -> B,
    crossinline release: suspend (A) -> Unit
): B

Acquire a resource, use it, and guarantee cleanup regardless of how the operation exits.

// acquire -> use -> release; release runs however `use` exits.
val result = bracket(
    acquire = { openFile("data.txt") },
    use = { it.readText() },
    release = { it.close() },
)

Bracket with Exit Case

suspend inline fun <A, B> bracketCase(
    crossinline acquire: suspend () -> A,
    use: suspend (A) -> B,
    crossinline release: suspend (A, ExitCase) -> Unit
): B

Like bracket, but the release function receives information about how the operation exited.

val result = bracketCase(
    acquire = { openDatabaseTransaction() },
    use = { it.performOperations() },
    release = { tx, exitCase ->
        // Only a normal completion commits; cancellation and failure both roll back.
        if (exitCase is ExitCase.Completed) tx.commit() else tx.rollback()
    },
)

Guarantee Finalizer

suspend inline fun <A> guarantee(
    fa: suspend () -> A,
    crossinline finalizer: suspend () -> Unit
): A

Execute an operation and guarantee a finalizer runs afterwards, regardless of how the operation exits.

// The trailing lambda is the finalizer; it runs after performOperation() however it exits.
val result = guarantee({ performOperation() }) { cleanup() }

Guarantee with Exit Case

suspend inline fun <A> guaranteeCase(
    fa: suspend () -> A,
    crossinline finalizer: suspend (ExitCase) -> Unit
): A

Like guarantee, but the finalizer receives information about how the operation exited.

// The trailing lambda is the finalizer; it is told how riskyOperation() ended.
val result = guaranteeCase({ riskyOperation() }) { exitCase ->
    when (exitCase) {
        ExitCase.Completed -> logSuccess()
        is ExitCase.Cancelled -> logCancellation()
        is ExitCase.Failure -> logError(exitCase.failure)
    }
}

Cancellation Handler

suspend inline fun <A> onCancel(
    fa: suspend () -> A,
    crossinline onCancel: suspend () -> Unit
): A

Execute an operation and run a specific handler only if the operation is cancelled.

// The trailing lambda runs only if longRunningOperation() is cancelled.
val result = onCancel({ longRunningOperation() }) { notifyCancellation() }

Install with Tessl CLI

npx tessl i tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm

docs

error-handling.md

index.md

parallel-processing.md

racing.md

resource-management.md

synchronization-flow.md

tile.json