
tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive

Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.jetbrains.kotlinx/kotlinx-coroutines-reactive@1.10.x

To install, run

npx @tessl/cli install tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive@1.10.0


Kotlinx Coroutines Reactive

Kotlin coroutines integration utilities for the Reactive Streams specification, providing interoperability between coroutines and reactive programming. The library bridges coroutines with RxJava, Reactor, or any other Reactive Streams-compliant library while preserving back-pressure and cancellation semantics.

Package Information

  • Package Name: kotlinx-coroutines-reactive
  • Package Type: maven
  • Language: Kotlin
  • Installation: Add dependency to build.gradle.kts:
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:1.10.2")

Core Imports

import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher
import kotlinx.coroutines.flow.Flow

Basic Usage

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher

// The await* and collect calls are suspending, so they must run inside a coroutine.
suspend fun basicUsage(publisher: Publisher<String>) {
    // Convert Publisher to Flow
    val flow: Flow<String> = publisher.asFlow()

    // Convert Flow to Publisher
    val sourceFlow = flowOf("hello", "world", "reactive")
    val convertedPublisher: Publisher<String> = sourceFlow.asPublisher()

    // Await values from publishers (each call creates its own subscription)
    val firstValue = publisher.awaitFirst()
    val lastValue = publisher.awaitLast()
    // awaitSingle() throws unless the publisher emits exactly one element
    val singleValue = publisher.awaitSingle()

    // Collect all values from a publisher
    publisher.collect { value ->
        println("Received: $value")
    }

    // Create a cold reactive publisher from a coroutine
    val coldPublisher: Publisher<String> = publish {
        send("first")
        delay(100)
        send("second")
        delay(100)
        send("third")
    }
}

Architecture

Kotlinx-coroutines-reactive is built around several key integration patterns:

  • Flow/Publisher Conversion: Bidirectional adapters between Flow and Publisher that comply with the Reactive Streams TCK and respect back-pressure
  • Suspending Extensions: await functions that suspend the calling coroutine instead of blocking a thread while consuming a reactive stream
  • Cold Publisher Builder: The publish coroutine builder creates publishers that start running their block only on subscription
  • Context Propagation: Integration with ReactorContext when kotlinx-coroutines-reactor is present
  • Back-pressure Management: Elements are emitted only in response to subscriber demand

Capabilities

Publisher Consumption

Suspending extension functions for consuming values from reactive Publishers without blocking threads. These functions integrate with coroutine cancellation and provide various consumption patterns.

suspend fun <T> Publisher<T>.awaitFirst(): T
suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T  
suspend fun <T> Publisher<T>.awaitFirstOrNull(): T?
suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T
suspend fun <T> Publisher<T>.awaitLast(): T
suspend fun <T> Publisher<T>.awaitSingle(): T
suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit
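
As a rough illustration of how these functions differ, the sketch below runs a few of them against a three-element publisher and an empty one. The helper names `describe` and `sumAll` are hypothetical, and the publishers are built with `asPublisher()` purely for convenience; any Reactive Streams publisher should behave the same way.

```kotlin
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.awaitFirstOrDefault
import kotlinx.coroutines.reactive.awaitFirstOrElse
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper: shows what each "first" variant returns for a publisher.
fun describe(publisher: Publisher<Int>): List<Int?> = runBlocking {
    listOf(
        publisher.awaitFirstOrNull(),        // null when the publisher is empty
        publisher.awaitFirstOrDefault(-1),   // -1 when the publisher is empty
        publisher.awaitFirstOrElse { -2 }    // lambda is evaluated only when empty
    )
}

// Hypothetical helper: consumes every element with the suspending collect.
fun sumAll(publisher: Publisher<Int>): Int = runBlocking {
    var sum = 0
    publisher.collect { sum += it }
    sum
}

fun main() {
    val numbers: Publisher<Int> = flowOf(1, 2, 3).asPublisher()
    val empty: Publisher<Int> = emptyFlow<Int>().asPublisher()
    println(describe(numbers)) // [1, 1, 1]
    println(describe(empty))   // [null, -1, -2]
    println(sumAll(numbers))   // 6
}
```

Each await call creates its own subscription, which is why the same publisher can be queried several times here.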


Flow Publisher Conversion

Bidirectional conversion utilities between Kotlin Flow and Reactive Streams Publisher, maintaining TCK compliance and providing seamless integration between coroutines and reactive paradigms.

fun <T : Any> Publisher<T>.asFlow(): Flow<T>
fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>
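
A minimal sketch of a round trip through both adapters follows. The function name `roundTrip` is hypothetical, and passing `Dispatchers.Default` is only one possible choice for the optional `context` argument, which controls the execution context of the calls made to the subscriber.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper: Flow -> Publisher -> Flow, then collect into a list.
fun roundTrip(): List<String> = runBlocking {
    val source: Flow<String> = flowOf("hello", "world", "reactive")

    // Expose the flow to reactive consumers; subscriber callbacks run on Dispatchers.Default.
    val publisher: Publisher<String> = source.asPublisher(Dispatchers.Default)

    // Bring the publisher back into the coroutine world and apply Flow operators.
    publisher.asFlow()
        .map { it.uppercase() }
        .toList()
}

fun main() {
    println(roundTrip()) // [HELLO, WORLD, REACTIVE]
}
```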


Cold Publisher Creation

Coroutine builder for creating cold reactive Publishers that execute a coroutine block on each subscription, with proper back-pressure handling and subscription lifecycle management.

fun <T> publish(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T>
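
To make the "cold" behaviour concrete, the sketch below counts how often the block runs. The names `collectTwice`, `runs`, and `ticks` are hypothetical; the point is only that the block is expected to execute once per subscription, not once per publisher.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper: subscribes twice to the same cold publisher.
fun collectTwice(): Pair<List<Int>, List<Int>> {
    val runs = AtomicInteger(0)

    // Nothing runs yet: the block starts only when a subscriber arrives.
    val ticks: Publisher<Int> = publish {
        val run = runs.incrementAndGet()
        for (i in 1..3) {
            send(run * 10 + i) // suspends until the subscriber has requested more
            delay(10)
        }
    }

    return runBlocking {
        val first = ticks.asFlow().toList()   // first subscription, first run of the block
        val second = ticks.asFlow().toList()  // second subscription, block runs again
        first to second
    }
}

fun main() {
    println(collectTwice()) // ([11, 12, 13], [21, 22, 23])
}
```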


Types

interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    val channel: SendChannel<E>
}

interface ContextInjector {
    fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
}

class FlowSubscription<T>(
    val flow: Flow<T>,
    val subscriber: Subscriber<in T>,
    context: CoroutineContext
) : Subscription, AbstractCoroutine<Unit>

class PublisherCoroutine<in T>(
    parentContext: CoroutineContext,
    subscriber: Subscriber<T>,
    exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
) : AbstractCoroutine<Unit>, ProducerScope<T>, Subscription

Error Handling

All suspending functions in this library are cancellable and will cancel their reactive stream subscriptions when the coroutine is cancelled.
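
The sketch below is one way to observe this. It uses a hand-written publisher that never emits (an assumption made for the example, not part of the library) and records whether `cancel()` is called on its subscription once the awaiting coroutine times out. The name `cancelsSubscription` is hypothetical.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import org.reactivestreams.Publisher
import org.reactivestreams.Subscription

// Hypothetical helper: returns true if the subscription was cancelled on timeout.
fun cancelsSubscription(): Boolean {
    val cancelled = AtomicBoolean(false)

    // A publisher that accepts demand but never emits or completes.
    val never = Publisher<Int> { subscriber ->
        subscriber.onSubscribe(object : Subscription {
            override fun request(n: Long) { /* intentionally emits nothing */ }
            override fun cancel() { cancelled.set(true) }
        })
    }

    runBlocking {
        // The timeout cancels the coroutine suspended in awaitFirst().
        withTimeoutOrNull(50) { never.awaitFirst() }
    }
    return cancelled.get()
}

fun main() {
    println(cancelsSubscription())
}
```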

Publisher Consumption Functions

Publisher consumption functions may throw:

  • NoSuchElementException - when no elements are emitted for functions requiring at least one element (awaitFirst, awaitLast, awaitSingle)
  • IllegalArgumentException - when multiple elements are emitted for functions requiring exactly one element (awaitSingle)
  • CancellationException - when the coroutine is cancelled during execution
  • Any other exception - errors signalled by the underlying publisher are propagated as-is
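
The cases above can be sketched with `awaitSingle`, which is the strictest of the await functions. The helper name `outcome` and the result strings are hypothetical; the publishers are again derived from flows only to keep the example short.

```kotlin
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper: maps each failure mode of awaitSingle to a short label.
fun outcome(publisher: Publisher<Int>): String = runBlocking {
    try {
        "value ${publisher.awaitSingle()}"
    } catch (e: NoSuchElementException) {
        "empty"                   // publisher completed without emitting
    } catch (e: IllegalArgumentException) {
        "more than one element"   // publisher emitted a second element
    } catch (e: IllegalStateException) {
        "upstream failure"        // error signalled by the publisher itself
    }
}

fun main() {
    println(outcome(flowOf(42).asPublisher()))        // value 42
    println(outcome(emptyFlow<Int>().asPublisher()))  // empty
    println(outcome(flowOf(1, 2).asPublisher()))      // more than one element
    println(outcome(flow<Int> { throw IllegalStateException("boom") }.asPublisher())) // upstream failure
}
```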

Publisher Creation Functions

The publish builder may throw:

  • IllegalArgumentException - thrown by publish itself if the provided context contains a Job instance, because the publisher's lifecycle is managed via its subscription
  • NullPointerException - thrown inside the block when attempting to emit a null value (the Reactive Streams specification prohibits null elements)
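
A short sketch of the first case: passing a `Job` in the context is expected to fail immediately, while a context holding only a dispatcher is accepted. The helper names `rejectsJobInContext` and `acceptsDispatcher` are hypothetical.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns true if publish rejects a context that contains a Job.
fun rejectsJobInContext(): Boolean =
    try {
        publish<Int>(Job()) { send(1) }
        false
    } catch (e: IllegalArgumentException) {
        true
    }

// Hypothetical helper: a dispatcher-only context is fine; the subscription owns the lifecycle.
fun acceptsDispatcher(): Int = runBlocking {
    publish<Int>(Dispatchers.Default) { send(1) }.awaitFirst()
}

fun main() {
    println(rejectsJobInContext()) // true
    println(acceptsDispatcher())   // 1
}
```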

Conversion Functions

Flow/Publisher conversions handle errors as follows:

  • Flow exceptions are propagated to reactive subscribers via Subscriber.onError
  • Publisher errors are propagated as flow exceptions during collection
  • Resource cleanup is guaranteed even when errors occur
  • Cancellation is properly handled in both directions with appropriate subscription cancellation
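
As an illustration of error propagation in both directions, the sketch below sends a failing flow through `asPublisher()` and back through `asFlow()`. The name `failingRoundTrip` is hypothetical. The elements emitted before the failure are expected to arrive first, followed by the original exception type at the collection site.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the elements seen and whether the flow's error reached the collector.
fun failingRoundTrip(): Pair<List<Int>, Boolean> = runBlocking {
    val failing = flow<Int> {
        emit(1)
        emit(2)
        throw IllegalStateException("boom") // becomes Subscriber.onError on the publisher side
    }

    val seen = mutableListOf<Int>()
    var failed = false
    try {
        // Publisher error resurfaces as an exception during collection.
        failing.asPublisher().asFlow().collect { seen += it }
    } catch (e: IllegalStateException) {
        failed = true
    }
    seen.toList() to failed
}

fun main() {
    println(failingRoundTrip()) // ([1, 2], true)
}
```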