Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms
Coroutine builder for creating cold reactive Publishers that execute a coroutine block on each subscription. Publishers built this way suspend the producer until the subscriber signals demand (back-pressure), cancel the coroutine when the subscription is cancelled, and can be handed to any framework that accepts a Reactive Streams Publisher.
/**
 * Creates a cold reactive Publisher that runs a given block in a coroutine.
 *
 * Every time the returned publisher is subscribed, it starts a new coroutine in the specified context.
 * The coroutine emits (via Subscriber.onNext) values with send, completes (via Subscriber.onComplete)
 * when the coroutine completes or the channel is explicitly closed, and emits errors (via Subscriber.onError)
 * if the coroutine throws an exception or closes the channel with a cause.
 * Unsubscribing cancels the running coroutine.
 *
 * Invocations of send are suspended appropriately when subscribers apply back-pressure and to
 * ensure that onNext is not invoked concurrently.
 *
 * Coroutine context can be specified with the context argument.
 * If the context has neither a dispatcher nor any other ContinuationInterceptor, Dispatchers.Default is used.
 *
 * Note: This is an experimental API. The behaviour of publishers that work as children in a parent scope with respect
 * to cancellation and error handling may change in the future.
 *
 * @throws IllegalArgumentException if the provided context contains a Job instance.
 */
fun <T> publish(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T>

Basic Usage Examples:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ProducerScope
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Simple value emission
val simplePublisher = publish<String> {
    send("Hello")
    send("World")
    send("Reactive")
}

// Subscribe to the publisher
simplePublisher.subscribe(object : Subscriber<String> {
    override fun onSubscribe(s: Subscription) {
        // Request everything up front (effectively no back-pressure)
        s.request(Long.MAX_VALUE)
    }
    override fun onNext(t: String) {
        println("Received: $t")
    }
    override fun onError(t: Throwable) {
        println("Error: ${t.message}")
    }
    override fun onComplete() {
        println("Completed")
    }
})
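
Inside coroutine code, the hand-written Subscriber is often unnecessary. The sketch below is one possible alternative, not part of the original examples: it collects the same three values through `asFlow()` from `kotlinx.coroutines.reactive` and `toList()` from `kotlinx.coroutines.flow`. The function name `collectGreetings` is made up for illustration.

```kotlin
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects every element of a small publisher into a list.
fun collectGreetings(): List<String> = runBlocking {
    val greetings = publish<String> {
        send("Hello")
        send("World")
        send("Reactive")
    }
    // asFlow() subscribes when the flow is collected and completes on onComplete
    greetings.asFlow().toList()
}

fun main() {
    println(collectGreetings()) // [Hello, World, Reactive]
}
```

Because the publisher is cold, nothing runs until `toList()` starts collecting.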

// Time-based emission
val timedPublisher = publish<Int> {
    repeat(5) { i ->
        send(i)
        delay(1000) // Emit every second
    }
}
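
When only a single element of such a stream is needed, the `awaitFirst()` and `awaitLast()` extensions from `kotlinx.coroutines.reactive` can be used from a coroutine. The following is a minimal sketch under the assumption that a shorter delay is acceptable for demonstration; `firstAndLast` is an illustrative name, not a library function.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitLast
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reads the first and the last element of a timed publisher.
fun firstAndLast(): Pair<Int, Int> = runBlocking {
    val ticks = publish<Int> {
        repeat(5) { i ->
            send(i)
            delay(10) // Shortened from one second to keep the demo fast
        }
    }
    // Each await call subscribes separately, so the block runs twice (cold publisher).
    // awaitFirst() cancels its subscription as soon as the first value arrives.
    val first = ticks.awaitFirst()
    val last = ticks.awaitLast()
    Pair(first, last)
}

fun main() {
    println(firstAndLast()) // (0, 4)
}
```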

// Conditional emission (fetchDataFromApi() is a placeholder for application code)
val conditionalPublisher = publish<String> {
    val data = fetchDataFromApi()
    if (data.isNotEmpty()) {
        data.forEach { item ->
            send(item.toString())
        }
    } else {
        // Closing with a cause signals onError to the subscriber
        close(RuntimeException("No data available"))
    }
}

The ProducerScope provides the execution context for the publisher coroutine.
interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    val channel: SendChannel<E>
    // Inherited from SendChannel
    suspend fun send(element: E)
    fun trySend(element: E): ChannelResult<Unit>
    fun close(cause: Throwable? = null): Boolean
    val isClosedForSend: Boolean
}

ProducerScope Usage:
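import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.ContinuationInterceptor

// Data, generateItems() and finalItem are placeholders for application code
val publisher = publish<Data> {
    // Access to coroutine context (the interceptor is normally the dispatcher)
    println("Publisher started in: ${coroutineContext[ContinuationInterceptor]}")
    try {
        // Send values
        val items = generateItems()
        items.forEach { item ->
            send(item) // Suspends if back-pressure applied
        }
        // Check if channel is still open (isClosedForSend is a delicate API in recent versions)
        if (!isClosedForSend) {
            send(finalItem)
        }
    } catch (e: CancellationException) {
        // The subscriber cancelled: let the cancellation propagate
        throw e
    } catch (e: Exception) {
        // Close with error; the subscriber receives onError
        close(e)
        return@publish
    }
    // Normal completion - close() called automatically
}

Specify execution context for the publisher coroutine: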
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*

// Custom dispatcher (performIOOperation() is a placeholder)
val ioPublisher = publish(Dispatchers.IO) {
    val data = performIOOperation()
    send(data)
}

// Custom context with multiple elements
val customContext = Dispatchers.Default +
    CoroutineName("DataPublisher") +
    CoroutineExceptionHandler { _, throwable ->
        println("Unhandled publisher exception: $throwable")
    }
// The element type must be given explicitly when the block never calls send
val contextualPublisher = publish<Unit>(customContext) {
    processData() // placeholder
}
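
To see that elements of the supplied context are visible inside the block, the sketch below reads the `CoroutineName` back from within the publisher coroutine. It is an illustration only; `publisherCoroutineName` is a made-up helper, and `awaitFirst()` is used simply to fetch the single emitted value.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits the name of the coroutine that runs the publish block.
fun publisherCoroutineName(): String = runBlocking {
    val named = publish<String>(Dispatchers.Default + CoroutineName("DataPublisher")) {
        // ProducerScope is a CoroutineScope, so coroutineContext is available here
        send(coroutineContext[CoroutineName]?.name ?: "unnamed")
    }
    named.awaitFirst()
}

fun main() {
    println(publisherCoroutineName()) // DataPublisher
}
```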

// Job context not allowed
try {
    val invalidPublisher = publish<String>(Job()) { // Throws IllegalArgumentException
        send("This won't work")
    }
} catch (e: IllegalArgumentException) {
    println("Cannot provide Job in context: ${e.message}")
}

If the context contains no dispatcher, Dispatchers.Default is used.

The publish builder handles back-pressure automatically:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

val backPressurePublisher = publish<Int> {
    repeat(10000) { i ->
        send(i) // Suspends when subscriber can't keep up
        // Only proceeds when subscriber requests more
    }
}

// Subscriber controls the flow
backPressurePublisher.subscribe(object : Subscriber<Int> {
    private lateinit var subscription: Subscription
    override fun onSubscribe(s: Subscription) {
        subscription = s
        s.request(1) // Request one item at a time
    }
    override fun onNext(t: Int) {
        println("Processing: $t")
        // Simulate slow processing (blocks the delivering thread; for demonstration only)
        Thread.sleep(100)
        subscription.request(1) // Request next item
    }
    override fun onError(t: Throwable) {
        println("Error: ${t.message}")
    }
    override fun onComplete() {
        println("Completed")
    }
})

Each subscription creates a new coroutine execution:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Atomic counter: executions may run concurrently on Dispatchers.Default
val executionCount = AtomicInteger()
val coldPublisher = publish<String> {
    val id = executionCount.incrementAndGet()
    println("Execution $id started")
    send("Message from execution $id")
    delay(1000)
    send("Final message from execution $id")
    println("Execution $id completed")
}

// subscriber1 and subscriber2 are placeholder Subscriber<String> instances
// First subscription
coldPublisher.subscribe(subscriber1) // Starts one execution of the block
// Second subscription
coldPublisher.subscribe(subscriber2) // Starts another execution (ids may interleave if the two race)
// Each subscriber gets an independent execution

Publisher completes when the coroutine block finishes normally:
val completingPublisher = publish<Int> {
    repeat(3) { i ->
        send(i)
    }
    // Completes normally - onComplete() called on subscriber
}

Publisher signals error when exception is thrown or channel is closed with cause:
val errorPublisher = publish<String> {
    send("Before error")
    if (someCondition) { // someCondition is a placeholder
        // The exception is delivered to the subscriber via onError()
        throw RuntimeException("Something went wrong")
    }
    // Alternative: close with cause
    close(IllegalStateException("Invalid state"))
}

Publisher handles subscription cancellation properly:
val cancellablePublisher = publish<Long> {
    try {
        var counter = 0L
        while (true) {
            send(counter++)
            delay(500)
        }
    } catch (e: CancellationException) {
        println("Publisher was cancelled")
        throw e // Re-throw cancellation
    } finally {
        println("Publisher cleanup")
    }
}
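
The cancellation path can also be exercised without a manual Subscription. The sketch below is an illustration with assumed names (`takeThreeThenCancel`, `cleanedUp`): it takes three values from an endless publisher through `asFlow().take(3)`, which cancels the subscription, and then waits for the `finally` block to signal that cleanup ran.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical helper: returns the first three values and whether cleanup was observed.
fun takeThreeThenCancel(): Pair<List<Long>, Boolean> = runBlocking {
    val cleanedUp = CompletableDeferred<Boolean>()
    val counter = publish<Long> {
        try {
            var next = 0L
            while (true) {
                send(next++)
                delay(10)
            }
        } finally {
            // Runs once the subscription is cancelled and the coroutine unwinds
            cleanedUp.complete(true)
        }
    }
    // take(3) stops collecting after three values, which cancels the subscription
    val values = counter.asFlow().take(3).toList()
    // Cancellation is asynchronous, so wait (with an upper bound) for the finally block
    Pair(values, withTimeout(5_000) { cleanedUp.await() })
}

fun main() {
    println(takeThreeThenCancel()) // ([0, 1, 2], true)
}
```

The bounded wait is a design choice for the demo: the `finally` block runs on the publisher's dispatcher shortly after cancellation, not synchronously with it.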

// Subscription can be cancelled
val subscription = // ... get subscription from onSubscribe
subscription.cancel() // Cancels the coroutine

Usage with RxJava 3:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.delay
import io.reactivex.rxjava3.core.Flowable

val coroutinePublisher = publish<String> {
    send("From coroutine")
    delay(1000)
    send("After delay")
}
// Convert to RxJava Flowable
val flowable = Flowable.fromPublisher(coroutinePublisher)

Usage with Reactor:
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.delay
import reactor.core.publisher.Flux

val coroutinePublisher = publish<Int> {
    repeat(5) { i ->
        send(i * i)
        delay(100)
    }
}
// Convert to Reactor Flux
val flux = Flux.from(coroutinePublisher)

The following function is deprecated and kept only for binary compatibility. Because it is marked DeprecationLevel.HIDDEN, new source code cannot call it:
/**
 * @deprecated CoroutineScope.publish is deprecated in favour of top-level publish
 */
@Deprecated("CoroutineScope.publish is deprecated in favour of top-level publish", level = DeprecationLevel.HIDDEN)
fun <T> CoroutineScope.publish(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T>
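
Code that used to call the scope extension can usually move to the top-level builder. The sketch below shows one possible migration, under the assumption that reusing the scope's dispatcher and name is all that is wanted; `squares` and `collectSquares` are hypothetical names. The Job is removed from the context because publish throws IllegalArgumentException when one is present.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper: builds a publisher that reuses a scope's context elements.
// minusKey(Job) strips the scope's Job, which the top-level publish would reject.
fun CoroutineScope.squares(count: Int): Publisher<Int> =
    publish(coroutineContext.minusKey(Job)) {
        repeat(count) { i -> send(i * i) }
    }

// Hypothetical helper: collects the squares through a Flow for inspection.
fun collectSquares(): List<Int> = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default + CoroutineName("squares"))
    scope.squares(4).asFlow().toList()
}

fun main() {
    println(collectSquares()) // [0, 1, 4, 9]
}
```

Note the trade-off in this sketch: without the Job, cancelling the scope no longer cancels running publisher coroutines; each one is tied only to its own subscription.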