CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive

Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms

Pending
Overview
Eval results
Files

flow-publisher-conversion.mddocs/

Flow Publisher Conversion

Bidirectional conversion utilities between Kotlin Flow and Reactive Streams Publisher, maintaining TCK compliance and providing seamless integration between coroutines and reactive paradigms. These conversions preserve back-pressure, context propagation, and error handling semantics.

Capabilities

Publisher to Flow Conversion

Transforms a reactive Publisher into a Kotlin Flow with configurable back-pressure handling.

/**
 * Transforms the given reactive Publisher into Flow.
 * Use the buffer operator on the resulting flow to specify the size of the back-pressure.
 * In effect, it specifies the value of the subscription's request.
 * The default buffer capacity for a suspending channel is used by default.
 *
 * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
 * elements are discarded.
 *
 * This function is integrated with ReactorContext from kotlinx-coroutines-reactor module,
 * see its documentation for additional details.
 */
fun <T : Any> Publisher<T>.asFlow(): Flow<T>

Usage Examples:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.flow.*
import org.reactivestreams.Publisher

// Basic conversion
val publisher: Publisher<String> = // ... some reactive publisher
val flow: Flow<String> = publisher.asFlow()

// Process the flow
flow.collect { value ->
    println("Received: $value")
}

// With back-pressure control
val bufferedFlow = publisher.asFlow().buffer(capacity = 64)
bufferedFlow.collect { value ->
    // Process with larger buffer
    processValue(value)
}

// Chain with other flow operations
val transformedFlow = publisher.asFlow()
    .filter { it.isNotEmpty() }
    .map { it.uppercase() }
    .take(10)

transformedFlow.collect { value ->
    println("Transformed: $value")
}

Flow to Publisher Conversion

Transforms a Kotlin Flow into a reactive specification compliant Publisher.

/**
 * Transforms the given flow into a reactive specification compliant Publisher.
 *
 * This function is integrated with ReactorContext from kotlinx-coroutines-reactor module,
 * see its documentation for additional details.
 *
 * An optional context can be specified to control the execution context of calls to the Subscriber methods.
 * A CoroutineDispatcher can be set to confine them to a specific thread; various ThreadContextElement can be set to
 * inject additional context into the caller thread. By default, the Unconfined dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>

Usage Examples:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

// Basic conversion
val flow = flowOf("hello", "world", "reactive")
val publisher: Publisher<String> = flow.asPublisher()

// Subscribe to the publisher
publisher.subscribe(object : Subscriber<String> {
    override fun onSubscribe(s: Subscription) {
        s.request(Long.MAX_VALUE)
    }
    
    override fun onNext(t: String) {
        println("Publisher emitted: $t")
    }
    
    override fun onError(t: Throwable) {
        println("Error: ${t.message}")
    }
    
    override fun onComplete() {
        println("Publisher completed")
    }
})

// With custom context
val customContext = Dispatchers.IO + CoroutineName("FlowPublisher")
val contextualPublisher = flow.asPublisher(customContext)

// Convert complex flow
val complexFlow = flow {
    repeat(5) { i ->
        emit("item-$i")
        delay(100)
    }
}.map { it.uppercase() }
  .filter { it.contains("ITEM") }

val complexPublisher = complexFlow.asPublisher()

Back-pressure Handling

Publisher to Flow

The asFlow() conversion respects reactive streams back-pressure:

  • Uses Channel.BUFFERED capacity by default
  • Can be controlled with .buffer() operator on the resulting flow
  • Properly handles subscriber demand and publisher supply rates
  • Cancels subscription when flow collection is cancelled

Back-pressure Configuration:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.Channel

val publisher: Publisher<Data> = // ... high-throughput publisher

// Different back-pressure strategies
val rendezvousFlow = publisher.asFlow().buffer(Channel.RENDEZVOUS)     // No buffering
val bufferedFlow = publisher.asFlow().buffer(64)                       // Fixed buffer
val unlimitedFlow = publisher.asFlow().buffer(Channel.UNLIMITED)       // Unlimited buffer

// Drop strategies
val dropOldestFlow = publisher.asFlow().buffer(10, BufferOverflow.DROP_OLDEST)
val dropLatestFlow = publisher.asFlow().buffer(10, BufferOverflow.DROP_LATEST)

Flow to Publisher

The asPublisher() conversion provides TCK-compliant back-pressure:

  • Respects subscriber demand signals
  • Suspends flow collection when demand is exhausted
  • Properly handles cancellation and error propagation
  • Uses Dispatchers.Unconfined by default for subscriber callbacks

Context Propagation

ReactorContext Integration

When kotlinx-coroutines-reactor is in the classpath, these conversions automatically handle ReactorContext propagation:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.reactor.*

// ReactorContext is automatically propagated
val publisher = flow.asPublisher()
// Context flows from coroutine to Reactor subscriber context

val flow = reactivePublisher.asFlow()  
// Context flows from Reactor context to coroutine context

Custom Context Injection

The asPublisher() function accepts a custom CoroutineContext:

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*

// Custom dispatcher
val publisher = flow.asPublisher(Dispatchers.IO)

// Custom context elements
val customContext = Dispatchers.Default + 
                   CoroutineName("MyFlowPublisher") +
                   CoroutineExceptionHandler { _, throwable ->
                       println("Unhandled exception: $throwable")
                   }

val contextualPublisher = flow.asPublisher(customContext)

Error Handling

Both conversions properly handle and propagate errors:

  • Flow exceptions are propagated to reactive subscribers via onError
  • Publisher errors are propagated as flow exceptions
  • Cancellation is properly handled in both directions
  • Resource cleanup is guaranteed even on failures

Error Handling Examples:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.flow.*

// Flow error handling
val errorFlow = flow {
    emit("value1")
    throw RuntimeException("Flow error")
}

val errorPublisher = errorFlow.asPublisher()
// Subscriber will receive "value1" then onError with RuntimeException

// Publisher error handling  
val errorPublisher: Publisher<String> = // ... publisher that emits error
val errorFlow = errorPublisher.asFlow()

try {
    errorFlow.collect { value ->
        println(value)
    }
} catch (e: Exception) {
    println("Caught flow exception: ${e.message}")
}

Type Constraints

Both conversion functions require non-nullable types (T : Any) to comply with reactive streams specification which prohibits null values.

// Valid - non-nullable types
val stringFlow: Flow<String> = publisher.asFlow()
val stringPublisher: Publisher<String> = flow.asPublisher()

// Invalid - nullable types not supported
// val nullableFlow: Flow<String?> = publisher.asFlow() // Compilation error

Legacy/Deprecated Functions

The following functions are deprecated but still available for backward compatibility:

Deprecated Flow Conversions

/**
 * @deprecated Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt
 */
@Deprecated("Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt", level = DeprecationLevel.HIDDEN)
fun <T : Any> Publisher<T>.asFlowDeprecated(): Flow<T>

/**
 * @deprecated Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt
 */
@Deprecated("Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt", level = DeprecationLevel.HIDDEN)
fun <T : Any> Flow<T>.asPublisherDeprecated(): Publisher<T>

/**
 * @deprecated batchSize parameter is deprecated, use .buffer() instead to control the backpressure
 */
@Deprecated("batchSize parameter is deprecated, use .buffer() instead to control the backpressure", level = DeprecationLevel.HIDDEN)
fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T>

Deprecated Channel Conversions

/**
 * @deprecated Deprecated in the favour of consumeAsFlow()
 */
@Deprecated("Deprecated in the favour of consumeAsFlow()", level = DeprecationLevel.HIDDEN)
fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>

/**
 * @deprecated Transforming publisher to channel is deprecated, use asFlow() instead
 */
@Deprecated("Transforming publisher to channel is deprecated, use asFlow() instead", level = DeprecationLevel.HIDDEN)
fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T>

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive

docs

cold-publisher-creation.md

flow-publisher-conversion.md

index.md

publisher-consumption.md

tile.json