Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms
—
Suspending extension functions for consuming values from reactive Publishers without blocking threads. These functions integrate with coroutine cancellation and provide various consumption patterns for different use cases.
Suspends until the first value is emitted from the publisher, then returns that value.
/**
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
* the publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
* function immediately cancels its Subscription and resumes with CancellationException.
*
* @throws NoSuchElementException if the publisher does not emit any value
*/
suspend fun <T> Publisher<T>.awaitFirst(): TUsage Example:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
runBlocking {
val publisher: Publisher<String> = // ... some reactive publisher
try {
val firstValue = publisher.awaitFirst()
println("First value: $firstValue")
} catch (e: NoSuchElementException) {
println("Publisher emitted no values")
}
}Suspends until the first value is emitted, or returns a default value if no values are emitted.
/**
* Awaits the first value from the given publisher, or returns the default value if none is emitted, without blocking
* the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
* exception.
*
* This suspending function is cancellable.
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
* function immediately cancels its Subscription and resumes with CancellationException.
*/
suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): TSuspends until the first value is emitted, or returns null if no values are emitted.
/**
* Awaits the first value from the given publisher, or returns null if none is emitted, without blocking the thread,
* and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
* function immediately cancels its Subscription and resumes with CancellationException.
*/
suspend fun <T> Publisher<T>.awaitFirstOrNull(): T?Suspends until the first value is emitted, or calls a function to compute a default value if no values are emitted.
/**
* Awaits the first value from the given publisher, or calls defaultValue to get a value if none is emitted, without
* blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
* corresponding exception.
*
* This suspending function is cancellable.
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
* function immediately cancels its Subscription and resumes with CancellationException.
*/
suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): TUsage Example:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
runBlocking {
val emptyPublisher: Publisher<String> = // ... publisher that emits no values
// Using default value
val withDefault = emptyPublisher.awaitFirstOrDefault("default")
// Using null fallback
val withNull = emptyPublisher.awaitFirstOrNull()
// Using computed default
val withComputed = emptyPublisher.awaitFirstOrElse {
"computed at ${System.currentTimeMillis()}"
}
}Suspends until all values are emitted, then returns the last value.
/**
* Awaits the last value from the given publisher without blocking the thread and
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
* function immediately cancels its Subscription and resumes with CancellationException.
*
* @throws NoSuchElementException if the publisher does not emit any value
*/
suspend fun <T> Publisher<T>.awaitLast(): TUsage Example:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
runBlocking {
val publisher: Publisher<Int> = // ... publisher that emits 1, 2, 3, 4, 5
val lastValue = publisher.awaitLast() // Returns 5
println("Last value: $lastValue")
}Suspends until exactly one value is emitted, validating that no more than one value is produced.
/**
* Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
* if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
* function immediately cancels its Subscription and resumes with CancellationException.
*
* @throws NoSuchElementException if the publisher does not emit any value
* @throws IllegalArgumentException if the publisher emits more than one value
*/
suspend fun <T> Publisher<T>.awaitSingle(): TUsage Example:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
runBlocking {
val singleValuePublisher: Publisher<String> = // ... publisher that emits exactly one value
try {
val singleValue = singleValuePublisher.awaitSingle()
println("Single value: $singleValue")
} catch (e: NoSuchElementException) {
println("Publisher emitted no values")
} catch (e: IllegalArgumentException) {
println("Publisher emitted more than one value")
}
}Subscribes to the publisher and performs an action for each received element.
/**
* Subscribes to this Publisher and performs the specified action for each received element.
*
* If action throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
* collect. Also, if the publisher signals an error, that error is rethrown from collect.
*/
suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): UnitUsage Example:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
runBlocking {
val publisher: Publisher<String> = // ... some reactive publisher
// Collect all values and process them
publisher.collect { value ->
println("Received: $value")
// Process each value as it arrives
}
}The following functions are deprecated but still available for backward compatibility:
/**
* @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.
* Please consider using awaitFirstOrDefault().
*/
@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. Please consider using awaitFirstOrDefault().", level = DeprecationLevel.HIDDEN)
suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T
/**
* @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.
* There is a specialized version for Reactor's Mono, please use that where applicable.
* Alternatively, please consider using awaitFirstOrNull().
*/
@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. There is a specialized version for Reactor's Mono, please use that where applicable. Alternatively, please consider using awaitFirstOrNull().", level = DeprecationLevel.HIDDEN)
suspend fun <T> Publisher<T>.awaitSingleOrNull(): T?
/**
* @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.
* Please consider using awaitFirstOrElse().
*/
@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. Please consider using awaitFirstOrElse().", level = DeprecationLevel.HIDDEN)
suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): TAll await functions support coroutine cancellation:
CancellationExceptionErrors from the reactive publisher are propagated as exceptions:
NoSuchElementException is thrown when no values are emitted for functions requiring valuesIllegalArgumentException is thrown when multiple values are emitted for single-value functionsThese functions are thread-safe and designed to be used from any coroutine context. They do not block threads and properly handle reactive streams threading requirements.
Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive