CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive

Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms

Pending
Overview
Eval results
Files

publisher-consumption.mddocs/

Publisher Consumption

Suspending extension functions for consuming values from reactive Publishers without blocking threads. These functions integrate with coroutine cancellation and provide various consumption patterns for different use cases.

Capabilities

Await First Value

Suspends until the first value is emitted from the publisher, then returns that value.

/**
 * Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
 * the publisher has produced an error, throws the corresponding exception.
 *
 * This suspending function is cancellable.
 * If the Job of the current coroutine is cancelled while the suspending function is waiting, this
 * function immediately cancels its Subscription and resumes with CancellationException.
 *
 * @throws NoSuchElementException if the publisher does not emit any value
 */
suspend fun <T> Publisher<T>.awaitFirst(): T

Usage Example:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*

runBlocking {
    val publisher: Publisher<String> = // ... some reactive publisher
    try {
        val firstValue = publisher.awaitFirst()
        println("First value: $firstValue")
    } catch (e: NoSuchElementException) {
        println("Publisher emitted no values")
    }
}

Await First Value with Default

Suspends until the first value is emitted, or returns a default value if no values are emitted.

/**
 * Awaits the first value from the given publisher, or returns the default value if none is emitted, without blocking
 * the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
 * exception.
 *
 * This suspending function is cancellable.
 * If the Job of the current coroutine is cancelled while the suspending function is waiting, this
 * function immediately cancels its Subscription and resumes with CancellationException.
 */
suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T

Await First Value or Null

Suspends until the first value is emitted, or returns null if no values are emitted.

/**
 * Awaits the first value from the given publisher, or returns null if none is emitted, without blocking the thread,
 * and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
 *
 * This suspending function is cancellable.
 * If the Job of the current coroutine is cancelled while the suspending function is waiting, this
 * function immediately cancels its Subscription and resumes with CancellationException.
 */
suspend fun <T> Publisher<T>.awaitFirstOrNull(): T?

Await First Value or Computed Default

Suspends until the first value is emitted, or calls a function to compute a default value if no values are emitted.

/**
 * Awaits the first value from the given publisher, or calls defaultValue to get a value if none is emitted, without
 * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
 * corresponding exception.
 *
 * This suspending function is cancellable.
 * If the Job of the current coroutine is cancelled while the suspending function is waiting, this
 * function immediately cancels its Subscription and resumes with CancellationException.
 */
suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T

Usage Example:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*

runBlocking {
    val emptyPublisher: Publisher<String> = // ... publisher that emits no values
    
    // Using default value
    val withDefault = emptyPublisher.awaitFirstOrDefault("default")
    
    // Using null fallback
    val withNull = emptyPublisher.awaitFirstOrNull()
    
    // Using computed default
    val withComputed = emptyPublisher.awaitFirstOrElse {
        "computed at ${System.currentTimeMillis()}"
    }
}

Await Last Value

Suspends until all values are emitted, then returns the last value.

/**
 * Awaits the last value from the given publisher without blocking the thread and
 * returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
 *
 * This suspending function is cancellable.
 * If the Job of the current coroutine is cancelled while the suspending function is waiting, this
 * function immediately cancels its Subscription and resumes with CancellationException.
 *
 * @throws NoSuchElementException if the publisher does not emit any value
 */
suspend fun <T> Publisher<T>.awaitLast(): T

Usage Example:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*

runBlocking {
    val publisher: Publisher<Int> = // ... publisher that emits 1, 2, 3, 4, 5
    val lastValue = publisher.awaitLast() // Returns 5
    println("Last value: $lastValue")
}

Await Single Value

Suspends until exactly one value is emitted, validating that no more than one value is produced.

/**
 * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
 * if this publisher has produced an error, throws the corresponding exception.
 *
 * This suspending function is cancellable.
 * If the Job of the current coroutine is cancelled while the suspending function is waiting, this
 * function immediately cancels its Subscription and resumes with CancellationException.
 *
 * @throws NoSuchElementException if the publisher does not emit any value
 * @throws IllegalArgumentException if the publisher emits more than one value
 */
suspend fun <T> Publisher<T>.awaitSingle(): T

Usage Example:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*

runBlocking {
    val singleValuePublisher: Publisher<String> = // ... publisher that emits exactly one value
    try {
        val singleValue = singleValuePublisher.awaitSingle()
        println("Single value: $singleValue")
    } catch (e: NoSuchElementException) {
        println("Publisher emitted no values")
    } catch (e: IllegalArgumentException) {
        println("Publisher emitted more than one value")
    }
}

Collect Publisher Values

Subscribes to the publisher and performs an action for each received element.

/**
 * Subscribes to this Publisher and performs the specified action for each received element.
 *
 * If action throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
 * collect. Also, if the publisher signals an error, that error is rethrown from collect.
 */
suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit

Usage Example:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*

runBlocking {
    val publisher: Publisher<String> = // ... some reactive publisher
    
    // Collect all values and process them
    publisher.collect { value ->
        println("Received: $value")
        // Process each value as it arrives
    }
}

Legacy/Deprecated Functions

The following functions are deprecated but still available for backward compatibility:

/**
 * @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.
 * Please consider using awaitFirstOrDefault().
 */
@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. Please consider using awaitFirstOrDefault().", level = DeprecationLevel.HIDDEN)
suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T

/**
 * @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.
 * There is a specialized version for Reactor's Mono, please use that where applicable.
 * Alternatively, please consider using awaitFirstOrNull().
 */
@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. There is a specialized version for Reactor's Mono, please use that where applicable. Alternatively, please consider using awaitFirstOrNull().", level = DeprecationLevel.HIDDEN)
suspend fun <T> Publisher<T>.awaitSingleOrNull(): T?

/**
 * @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.
 * Please consider using awaitFirstOrElse().
 */
@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. Please consider using awaitFirstOrElse().", level = DeprecationLevel.HIDDEN)
suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T

Cancellation Behavior

All await functions support coroutine cancellation:

  • When the coroutine is cancelled, the reactive stream subscription is immediately cancelled
  • The function resumes with CancellationException
  • Any pending reactive stream operations are cleaned up properly

Error Propagation

Errors from the reactive publisher are propagated as exceptions:

  • Publisher errors are thrown as-is from the await functions
  • NoSuchElementException is thrown when no values are emitted for functions requiring values
  • IllegalArgumentException is thrown when multiple values are emitted for single-value functions

Thread Safety

These functions are thread-safe and designed to be used from any coroutine context. They do not block threads and properly handle reactive streams threading requirements.

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive

docs

cold-publisher-creation.md

flow-publisher-conversion.md

index.md

publisher-consumption.md

tile.json