Kotlin coroutines integration utilities for the Reactive Streams specification, providing interoperability between coroutines and reactive programming paradigms.
Bidirectional conversion utilities between Kotlin Flow and Reactive Streams Publisher, maintaining TCK compliance and providing seamless integration between coroutines and reactive paradigms. These conversions preserve back-pressure, context propagation, and error handling semantics.
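As a quick illustration of the two directions, the sketch below sends a flow through a Publisher and back. It assumes kotlinx-coroutines-reactive and reactive-streams are on the classpath; `roundTrip` is a hypothetical helper introduced only for this example and is not part of the library.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper (not a library function): Flow -> Publisher -> Flow.
fun <T : Any> roundTrip(source: Flow<T>): Flow<T> {
    // Expose the flow through the Reactive Streams interface.
    val publisher: Publisher<T> = source.asPublisher()
    // Wrap the publisher back into a cold flow.
    return publisher.asFlow()
}

fun main() = runBlocking {
    // Collect the round-tripped flow into a list and print it.
    val result = roundTrip(flowOf("a", "b", "c")).toList()
    println(result)
}
```

Both conversions are cold: nothing is subscribed or collected until `toList()` starts collecting the resulting flow.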
Transforms a reactive Publisher into a Kotlin Flow with configurable back-pressure handling.
/**
* Transforms the given reactive Publisher into Flow.
* Use the buffer operator on the resulting flow to specify the size of the back-pressure.
* In effect, it specifies the value of the subscription's request.
* The default buffer capacity for a suspending channel is used by default.
*
* If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
* elements are discarded.
*
* This function is integrated with ReactorContext from kotlinx-coroutines-reactor module,
* see its documentation for additional details.
*/
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
Usage Examples:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.flow.*
import org.reactivestreams.Publisher
// Basic conversion
val publisher: Publisher<String> = TODO("some reactive publisher")
val flow: Flow<String> = publisher.asFlow()
// Process the flow
flow.collect { value ->
println("Received: $value")
}
// With back-pressure control
val bufferedFlow = publisher.asFlow().buffer(capacity = 64)
bufferedFlow.collect { value ->
// Process with larger buffer
processValue(value)
}
// Chain with other flow operations
val transformedFlow = publisher.asFlow()
.filter { it.isNotEmpty() }
.map { it.uppercase() }
.take(10)
transformedFlow.collect { value ->
println("Transformed: $value")
}
Transforms a Kotlin Flow into a reactive specification compliant Publisher.
/**
* Transforms the given flow into a reactive specification compliant Publisher.
*
* This function is integrated with ReactorContext from kotlinx-coroutines-reactor module,
* see its documentation for additional details.
*
* An optional context can be specified to control the execution context of calls to the Subscriber methods.
* A CoroutineDispatcher can be set to confine them to a specific thread; various ThreadContextElement can be set to
* inject additional context into the caller thread. By default, the Unconfined dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>
Usage Examples:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
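import kotlin.coroutines.CoroutineContext
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription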
// Basic conversion
val flow = flowOf("hello", "world", "reactive")
val publisher: Publisher<String> = flow.asPublisher()
// Subscribe to the publisher
publisher.subscribe(object : Subscriber<String> {
override fun onSubscribe(s: Subscription) {
s.request(Long.MAX_VALUE)
}
override fun onNext(t: String) {
println("Publisher emitted: $t")
}
override fun onError(t: Throwable) {
println("Error: ${t.message}")
}
override fun onComplete() {
println("Publisher completed")
}
})
// With custom context
val customContext = Dispatchers.IO + CoroutineName("FlowPublisher")
val contextualPublisher = flow.asPublisher(customContext)
// Convert complex flow
val complexFlow = flow {
repeat(5) { i ->
emit("item-$i")
delay(100)
}
}.map { it.uppercase() }
.filter { it.contains("ITEM") }
val complexPublisher = complexFlow.asPublisher()
The asFlow() conversion respects reactive streams back-pressure:
It uses Channel.BUFFERED capacity by default, and the capacity can be changed with the .buffer() operator on the resulting flow.
Back-pressure Configuration:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import org.reactivestreams.Publisher
val publisher: Publisher<Data> = TODO("high-throughput publisher")
// Different back-pressure strategies
val rendezvousFlow = publisher.asFlow().buffer(Channel.RENDEZVOUS) // No buffering
val bufferedFlow = publisher.asFlow().buffer(64) // Fixed buffer
val unlimitedFlow = publisher.asFlow().buffer(Channel.UNLIMITED) // Unlimited buffer
// Drop strategies
val dropOldestFlow = publisher.asFlow().buffer(10, BufferOverflow.DROP_OLDEST)
val dropLatestFlow = publisher.asFlow().buffer(10, BufferOverflow.DROP_LATEST)
The asPublisher() conversion provides TCK-compliant back-pressure:
It uses Dispatchers.Unconfined by default for subscriber callbacks.
When kotlinx-coroutines-reactor is on the classpath, these conversions automatically handle ReactorContext propagation:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.reactor.*
// ReactorContext is automatically propagated
val publisher = flow.asPublisher()
// Context flows from coroutine to Reactor subscriber context
val reactorFlow = reactivePublisher.asFlow()
// Context flows from Reactor context to coroutine context
The asPublisher() function accepts a custom CoroutineContext:
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
// Custom dispatcher
val publisher = flow.asPublisher(Dispatchers.IO)
// Custom context elements
val customContext = Dispatchers.Default +
CoroutineName("MyFlowPublisher") +
CoroutineExceptionHandler { _, throwable ->
println("Unhandled exception: $throwable")
}
val contextualPublisher = flow.asPublisher(customContext)
Both conversions properly handle and propagate errors:
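A failure in the flow reaches the subscriber through onError, and a publisher error surfaces as an exception in the collecting coroutine.
Error Handling Examples: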
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.flow.*
// Flow error handling
val errorFlow = flow {
emit("value1")
throw RuntimeException("Flow error")
}
val errorPublisher = errorFlow.asPublisher()
// Subscriber will receive "value1" then onError with RuntimeException
// Publisher error handling
val failingPublisher: Publisher<String> = TODO("publisher that emits an error")
val failingFlow = failingPublisher.asFlow()
try {
failingFlow.collect { value ->
println(value)
}
} catch (e: Exception) {
println("Caught flow exception: ${e.message}")
}
Both conversion functions require non-nullable types (T : Any) to comply with the Reactive Streams specification, which prohibits null values.
// Valid - non-nullable types
val stringFlow: Flow<String> = publisher.asFlow()
val stringPublisher: Publisher<String> = flow.asPublisher()
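// Invalid - nullable element types violate the T : Any bound
// val nullablePublisher = flowOf("a", null).asPublisher() // Compilation error
The following functions are deprecated with DeprecationLevel.HIDDEN: they remain in the binary for backward compatibility, but newly compiled source code can no longer call them.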
/**
* @deprecated Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt
*/
@Deprecated("Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt", level = DeprecationLevel.HIDDEN)
fun <T : Any> Publisher<T>.asFlowDeprecated(): Flow<T>
/**
* @deprecated Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt
*/
@Deprecated("Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt", level = DeprecationLevel.HIDDEN)
fun <T : Any> Flow<T>.asPublisherDeprecated(): Publisher<T>
/**
* @deprecated batchSize parameter is deprecated, use .buffer() instead to control the backpressure
*/
@Deprecated("batchSize parameter is deprecated, use .buffer() instead to control the backpressure", level = DeprecationLevel.HIDDEN)
fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T>
/**
* @deprecated Deprecated in the favour of consumeAsFlow()
*/
@Deprecated("Deprecated in the favour of consumeAsFlow()", level = DeprecationLevel.HIDDEN)
fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>
/**
* @deprecated Transforming publisher to channel is deprecated, use asFlow() instead
*/
@Deprecated("Transforming publisher to channel is deprecated, use asFlow() instead", level = DeprecationLevel.HIDDEN)
fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T>
Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive
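For code that still relies on the deprecated channel-based and batchSize conversions listed earlier, the deprecation messages point to .buffer(), consumeAsFlow() and asFlow() as replacements. The following is a minimal migration sketch under that reading; `asBatchedFlow` and `toPublisher` are hypothetical names introduced here for illustration, and treating the buffer capacity as a stand-in for the old batch size is an assumption based on the deprecation message.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical stand-in for the hidden asFlow(batchSize):
// the buffer capacity is assumed to take over the role of the batch size.
fun <T : Any> Publisher<T>.asBatchedFlow(batchSize: Int): Flow<T> =
    asFlow().buffer(batchSize)

// Hypothetical stand-in for the hidden ReceiveChannel.asPublisher():
// consume the channel as a flow first, then convert that flow.
fun <T : Any> ReceiveChannel<T>.toPublisher(): Publisher<T> =
    consumeAsFlow().asPublisher()

fun main() = runBlocking {
    // Pre-fill and close a channel so the resulting publisher completes.
    val channel = Channel<Int>(Channel.UNLIMITED)
    channel.trySend(1)
    channel.trySend(2)
    channel.close()

    // Channel -> Publisher -> Flow, collected into a list.
    val items = channel.toPublisher().asBatchedFlow(batchSize = 2).toList()
    println(items)
}
```

Because consumeAsFlow() can be collected only once, the publisher returned by `toPublisher` in this sketch should be subscribed to a single time.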