tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive

Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms

Cold Publisher Creation

The publish coroutine builder creates a cold Reactive Streams Publisher that runs a coroutine block anew for each subscription. Emission follows subscriber demand (back-pressure), the coroutine's lifetime is bound to the subscription, and the resulting Publisher can be passed to any Reactive Streams-compatible library.

Capabilities

Publish Coroutine Builder

Creates a cold reactive Publisher that runs a coroutine block for each subscription.

/**
 * Creates a cold reactive Publisher that runs a given block in a coroutine.
 *
 * Every time the returned publisher is subscribed, it starts a new coroutine in the specified context.
 * The coroutine emits (via Subscriber.onNext) values with send, completes (via Subscriber.onComplete) 
 * when the coroutine completes or channel is explicitly closed, and emits errors (via Subscriber.onError) 
 * if the coroutine throws an exception or closes channel with a cause.
 * Unsubscribing cancels the running coroutine.
 *
 * Invocations of send are suspended appropriately when subscribers apply back-pressure and to
 * ensure that onNext is not invoked concurrently.
 *
 * Coroutine context can be specified with context argument.
 * If the context does not have any dispatcher nor any other ContinuationInterceptor, then Dispatchers.Default is used.
 *
 * Note: This is an experimental api. Behaviour of publishers that work as children in a parent scope with respect
 * to cancellation and error handling may change in the future.
 *
 * @throws IllegalArgumentException if the provided context contains a Job instance.
 */
fun <T> publish(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T>

Basic Usage Examples:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ProducerScope
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Simple value emission
val simplePublisher = publish<String> {
    send("Hello")
    send("World")
    send("Reactive")
}

// Subscribe to the publisher
simplePublisher.subscribe(object : Subscriber<String> {
    override fun onSubscribe(s: Subscription) {
        s.request(Long.MAX_VALUE)
    }
    
    override fun onNext(t: String) {
        println("Received: $t")
    }
    
    override fun onError(t: Throwable) {
        println("Error: ${t.message}")
    }
    
    override fun onComplete() {
        println("Completed")
    }
})

// Time-based emission
val timedPublisher = publish<Int> {
    repeat(5) { i ->
        send(i)
        delay(1000) // Emit every second
    }
}

// Conditional emission
val conditionalPublisher = publish<String> {
    val data = fetchDataFromApi() // placeholder for an application call that returns a collection
    if (data.isNotEmpty()) {
        data.forEach { item ->
            send(item.toString())
        }
    } else {
        close(RuntimeException("No data available"))
    }
}
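
The publishers above can also be consumed without a hand-written Subscriber. The following is a minimal sketch, assuming the asFlow() adapter from the same module is on the classpath; the function name collectGreeting is illustrative and not part of the library.

```kotlin
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking

// Collects every element of a small cold publisher into a list.
// collectGreeting is an illustrative name, not a library function.
fun collectGreeting(): List<String> = runBlocking {
    val greeting = publish<String> {
        send("Hello")
        send("World")
        send("Reactive")
    }
    // asFlow() subscribes when the flow is collected and requests elements on demand.
    greeting.asFlow().toList()
}

fun main() {
    println(collectGreeting()) // [Hello, World, Reactive]
}
```

Collecting through a Flow keeps the example short; the explicit Subscriber shown earlier remains the way to control request(n) by hand.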

Producer Scope

The ProducerScope provides the execution context for the publisher coroutine.

interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    val channel: SendChannel<E>
    
    // Inherited from SendChannel
    suspend fun send(element: E)
    fun trySend(element: E): ChannelResult<Unit>
    fun close(cause: Throwable? = null): Boolean
    val isClosedForSend: Boolean
}

ProducerScope Usage:

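import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.ContinuationInterceptor

// Data, generateItems() and finalItem are placeholders for application code
val publisher = publish<Data> {
    // Access to coroutine context (the dispatcher is stored as the ContinuationInterceptor)
    println("Publisher started in: ${coroutineContext[ContinuationInterceptor]}")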
    
    try {
        // Send values
        val items = generateItems()
        items.forEach { item ->
            send(item) // Suspends if back-pressure applied
        }
        
        // Check if channel is still open
        if (!isClosedForSend) {
            send(finalItem)
        }
        
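    } catch (e: CancellationException) {
        // The subscription was cancelled: let cancellation propagate
        throw e
    } catch (e: Exception) {
        // Close with error; the subscriber receives onError(e)
        close(e)
        return@publish
    }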
    
    // Normal completion - close() called automatically
}

Context and Dispatchers

Custom Context

Specify execution context for the publisher coroutine:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*

// Custom dispatcher
val ioPublisher = publish(Dispatchers.IO) {
    val data = performIOOperation()
    send(data)
}

// Custom context with multiple elements
val customContext = Dispatchers.Default + 
                   CoroutineName("DataPublisher") +
                   CoroutineExceptionHandler { _, throwable ->
                       println("Unhandled publisher exception: $throwable")
                   }

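// The element type is given explicitly because nothing in the block lets the compiler infer it
val contextualPublisher = publish<Unit>(customContext) {
    processData() // placeholder for application code
}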

// Job context not allowed
try {
    val invalidPublisher = publish(Job()) { // Throws IllegalArgumentException
        send("This won't work")
    }
} catch (e: IllegalArgumentException) {
    println("Cannot provide Job in context: ${e.message}")
}

Default Context Behavior

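  • If the context contains no dispatcher or other ContinuationInterceptor, Dispatchers.Default is used
  • The context is applied to each new coroutine started per subscription
  • The context must not contain a Job; the coroutine is not a child of any caller scope, and its lifetime is controlled by the subscription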
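
The dispatcher rule can be checked from inside the block by reading the ContinuationInterceptor element of the coroutine context. This is a small sketch under the assumption that awaitSingle() from the same module is available; the two function names are illustrative.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking

// True when a publisher built without a context runs on Dispatchers.Default.
fun usesDefaultDispatcher(): Boolean = runBlocking {
    publish<Boolean> {
        send(coroutineContext[ContinuationInterceptor] === Dispatchers.Default)
    }.awaitSingle()
}

// True when an explicitly supplied dispatcher is the one the block runs on.
fun usesIoDispatcher(): Boolean = runBlocking {
    publish<Boolean>(Dispatchers.IO) {
        send(coroutineContext[ContinuationInterceptor] === Dispatchers.IO)
    }.awaitSingle()
}

fun main() {
    println(usesDefaultDispatcher())
    println(usesIoDispatcher())
}
```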

Back-pressure and Flow Control

The publish builder handles back-pressure automatically:

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

val backPressurePublisher = publish<Int> {
    repeat(10000) { i ->
        send(i) // Suspends when subscriber can't keep up
        // Only proceeds when subscriber requests more
    }
}

// Subscriber controls the flow
backPressurePublisher.subscribe(object : Subscriber<Int> {
    private lateinit var subscription: Subscription
    
    override fun onSubscribe(s: Subscription) {
        subscription = s
        s.request(1) // Request one item at a time
    }
    
    override fun onNext(t: Int) {
        println("Processing: $t")
        // Simulate slow processing
        Thread.sleep(100)
        subscription.request(1) // Request next item
    }
    
    override fun onError(t: Throwable) {
        println("Error: ${t.message}")
    }
    
    override fun onComplete() {
        println("Completed")
    }
})
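
To make the effect of bounded demand visible, the sketch below requests a fixed number of elements from an endless producer and never asks for more. The helper name receiveWithDemand and the 100 ms grace period are assumptions chosen for illustration; demand must be positive.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.reactive.publish
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Requests exactly `demand` elements from an endless producer and returns what arrived.
// receiveWithDemand is an illustrative helper, not a library function.
fun receiveWithDemand(demand: Int): List<Int> {
    val endless = publish<Int> {
        var next = 0
        while (true) send(next++) // suspends once the requested demand is used up
    }
    val received = CopyOnWriteArrayList<Int>()
    val delivered = CountDownLatch(demand)
    lateinit var subscription: Subscription

    endless.subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription) {
            subscription = s
            s.request(demand.toLong()) // bounded demand, never topped up
        }
        override fun onNext(t: Int) {
            received.add(t)
            delivered.countDown()
        }
        override fun onError(t: Throwable) {}
        override fun onComplete() {}
    })

    delivered.await(5, TimeUnit.SECONDS)
    Thread.sleep(100) // grace period: the producer gets a chance to over-deliver, and should not
    subscription.cancel() // releases the suspended producer
    return received.toList()
}

fun main() {
    println(receiveWithDemand(3)) // [0, 1, 2]
}
```

The producer loop has no delay or buffer of its own; it is the outstanding demand alone that limits how far it runs.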

Cold Publisher Semantics

Each subscription creates a new coroutine execution:

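import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Atomic counter: the blocks of different subscriptions may run concurrently
val executionCount = AtomicInteger(0)

val coldPublisher = publish<String> {
    val id = executionCount.incrementAndGet()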
    println("Execution $id started")
    
    send("Message from execution $id")
    delay(1000)
    send("Final message from execution $id")
    
    println("Execution $id completed")
}

// First subscription
coldPublisher.subscribe(subscriber1) // Prints: "Execution 1 started"

// Second subscription  
coldPublisher.subscribe(subscriber2) // Prints: "Execution 2 started"

// Each subscriber gets independent execution
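
A compact way to observe the cold behaviour is to collect the same publisher twice and count how often the block ran. This sketch assumes the asFlow() adapter; collectTwice is an illustrative name.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking

// Collects one cold publisher twice, one collection after the other.
// Returns all received elements and the number of times the block was executed.
fun collectTwice(): Pair<List<String>, Int> = runBlocking {
    val runs = AtomicInteger(0)
    val cold = publish<String> {
        val id = runs.incrementAndGet() // a fresh execution for every subscription
        send("run $id")
    }
    val first = cold.asFlow().toList()
    val second = cold.asFlow().toList()
    Pair(first + second, runs.get())
}

fun main() {
    println(collectTwice()) // ([run 1, run 2], 2)
}
```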

Error Handling and Completion

Normal Completion

Publisher completes when the coroutine block finishes normally:

val completingPublisher = publish<Int> {
    repeat(3) { i ->
        send(i)
    }
    // Completes normally - onComplete() called on subscriber
}

Error Completion

Publisher signals error when exception is thrown or channel is closed with cause:

val errorPublisher = publish<String> {
    send("Before error")
    
    if (someCondition) {
        // The exception is delivered to the subscriber via onError()
        throw RuntimeException("Something went wrong")
    }
    
    // Alternative: close with cause
    close(IllegalStateException("Invalid state"))
}
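
On the consuming side, an error signal arrives after the elements that were sent before it. The sketch below illustrates this by collecting through asFlow(), where onError surfaces as a thrown exception; collectUntilError is an illustrative name.

```kotlin
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking

// Collects a failing publisher; returns the elements seen and the error message, if any.
fun collectUntilError(): Pair<List<String>, String?> = runBlocking {
    val failing = publish<String> {
        send("Before error")
        throw IllegalStateException("Invalid state") // signalled to the subscriber as onError
    }
    val items = mutableListOf<String>()
    val message = try {
        failing.asFlow().toList(items) // fills `items` until the error is rethrown
        null
    } catch (e: IllegalStateException) {
        e.message
    }
    Pair(items.toList(), message)
}

fun main() {
    println(collectUntilError()) // ([Before error], Invalid state)
}
```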

Cancellation Handling

Publisher handles subscription cancellation properly:

val cancellablePublisher = publish<Long> {
    try {
        var counter = 0L
        while (true) {
            send(counter++)
            delay(500)
        }
    } catch (e: CancellationException) {
        println("Publisher was cancelled")
        throw e // Re-throw cancellation
    } finally {
        println("Publisher cleanup")
    }
}

// The Subscription handed to Subscriber.onSubscribe controls the coroutine
lateinit var subscription: Subscription // assign it in onSubscribe
subscription.cancel() // Cancels the coroutine
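
A complete round trip, from subscribing to cancelling to cleanup, might look like the following sketch. The helper name cancelAfter, the 10 ms delay, and the 5 second timeout are assumptions for illustration only.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.delay
import kotlinx.coroutines.reactive.publish
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Cancels the subscription after `limit` elements.
// Returns the received elements and whether the producer's finally block ran.
fun cancelAfter(limit: Int): Pair<List<Long>, Boolean> {
    val cleanedUp = CountDownLatch(1)
    val ticker = publish<Long> {
        try {
            var counter = 0L
            while (true) {
                send(counter++)
                delay(10)
            }
        } finally {
            cleanedUp.countDown() // runs once the subscription has been cancelled
        }
    }
    val received = CopyOnWriteArrayList<Long>()
    ticker.subscribe(object : Subscriber<Long> {
        private lateinit var subscription: Subscription
        override fun onSubscribe(s: Subscription) {
            subscription = s
            s.request(Long.MAX_VALUE)
        }
        override fun onNext(t: Long) {
            received.add(t)
            if (received.size == limit) subscription.cancel()
        }
        override fun onError(t: Throwable) {}
        override fun onComplete() {}
    })
    val ranCleanup = cleanedUp.await(5, TimeUnit.SECONDS)
    return Pair(received.toList(), ranCleanup)
}

fun main() {
    println(cancelAfter(3)) // ([0, 1, 2], true)
}
```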

Integration with Other Libraries

RxJava Integration

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.delay
import io.reactivex.rxjava3.core.Flowable

val coroutinePublisher = publish<String> {
    send("From coroutine")
    delay(1000)
    send("After delay")
}

// Convert to RxJava Flowable
val flowable = Flowable.fromPublisher(coroutinePublisher)

Reactor Integration

import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.delay
import reactor.core.publisher.Flux

val coroutinePublisher = publish<Int> {
    repeat(5) { i ->
        send(i * i)
        delay(100)
    }
}

// Convert to Reactor Flux
val flux = Flux.from(coroutinePublisher)

Legacy/Deprecated Functions

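The following function is deprecated and retained only for binary compatibility. Because its deprecation level is HIDDEN, it cannot be called from new source code; use the top-level publish instead: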

/**
 * @deprecated CoroutineScope.publish is deprecated in favour of top-level publish
 */
@Deprecated("CoroutineScope.publish is deprecated in favour of top-level publish", level = DeprecationLevel.HIDDEN)
fun <T> CoroutineScope.publish(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T>

Performance Considerations

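  • Each subscription starts a new coroutine and re-runs the block, so expensive work is repeated per subscriber unless its result is cached or shared outside the block
  • Choose the dispatcher to match the work: Dispatchers.IO for blocking I/O, Dispatchers.Default for CPU-bound work
  • send suspends until the subscriber has outstanding demand, so the producer does not run ahead of what was requested
  • Memory usage scales with the number of concurrent subscriptions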

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive
