Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms
npx @tessl/cli install tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive@1.10.0Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms. This library enables developers to bridge coroutines with reactive frameworks like RxJava, Reactor, or any Reactive Streams-compliant library while maintaining performance and correctness guarantees.
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:1.10.2")import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher
import kotlinx.coroutines.flow.Flowimport kotlinx.coroutines.reactive.*
import kotlinx.coroutines.flow.*
import org.reactivestreams.Publisher
// Convert Publisher to Flow
val publisher: Publisher<String> = // ... some reactive publisher
val flow: Flow<String> = publisher.asFlow()
// Convert Flow to Publisher
val sourceFlow = flowOf("hello", "world", "reactive")
val convertedPublisher: Publisher<String> = sourceFlow.asPublisher()
// Await values from publishers
val firstValue = publisher.awaitFirst()
val lastValue = publisher.awaitLast()
val singleValue = publisher.awaitSingle()
// Collect all values from a publisher
publisher.collect { value ->
println("Received: $value")
}
// Create cold reactive publishers from coroutines
val coldPublisher = publish {
send("first")
delay(100)
send("second")
delay(100)
send("third")
}Kotlinx-coroutines-reactive is built around several key integration patterns:
publish coroutine builder creating reactive publishers that start on subscriptionSuspending extension functions for consuming values from reactive Publishers without blocking threads. These functions integrate with coroutine cancellation and provide various consumption patterns.
suspend fun <T> Publisher<T>.awaitFirst(): T
suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T
suspend fun <T> Publisher<T>.awaitFirstOrNull(): T?
suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T
suspend fun <T> Publisher<T>.awaitLast(): T
suspend fun <T> Publisher<T>.awaitSingle(): T
suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): UnitBidirectional conversion utilities between Kotlin Flow and Reactive Streams Publisher, maintaining TCK compliance and providing seamless integration between coroutines and reactive paradigms.
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>Coroutine builder for creating cold reactive Publishers that execute a coroutine block on each subscription, with proper back-pressure handling and subscription lifecycle management.
fun <T> publish(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend ProducerScope<T>.() -> Unit
): Publisher<T>interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
val channel: SendChannel<E>
}
interface ContextInjector {
fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
}
class FlowSubscription<T>(
val flow: Flow<T>,
val subscriber: Subscriber<in T>,
context: CoroutineContext
) : Subscription, AbstractCoroutine<Unit>
class PublisherCoroutine<in T>(
parentContext: CoroutineContext,
subscriber: Subscriber<T>,
exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
) : AbstractCoroutine<Unit>, ProducerScope<T>, SubscriptionAll suspending functions in this library are cancellable and will cancel their reactive stream subscriptions when the coroutine is cancelled.
Publisher consumption functions may throw:
NoSuchElementException - when no elements are emitted for functions requiring at least one element (awaitFirst, awaitLast, awaitSingle)IllegalArgumentException - when multiple elements are emitted for functions requiring exactly one element (awaitSingle)CancellationException - when the coroutine is cancelled during executionThe publish builder may throw:
IllegalArgumentException - if the provided context contains a Job instance, as the publisher lifecycle should be managed via subscriptionNullPointerException - if attempting to emit null values through the publisher (reactive streams specification prohibits null values)Flow/Publisher conversions handle errors as follows:
Subscriber.onError