Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms
npx @tessl/cli install tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive@1.10.00
# Kotlinx Coroutines Reactive
1
2
Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms. This library enables developers to bridge coroutines with reactive frameworks like RxJava, Reactor, or any Reactive Streams-compliant library while maintaining performance and correctness guarantees.
3
4
## Package Information
5
6
- **Package Name**: kotlinx-coroutines-reactive
7
- **Package Type**: maven
8
- **Language**: Kotlin
9
- **Installation**: Add dependency to build.gradle.kts:
10
```kotlin
11
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:1.10.2")
12
```
13
14
## Core Imports
15
16
```kotlin
17
import kotlinx.coroutines.reactive.*
18
import org.reactivestreams.Publisher
19
import kotlinx.coroutines.flow.Flow
20
```
21
22
## Basic Usage
23
24
```kotlin
25
import kotlinx.coroutines.reactive.*
26
import kotlinx.coroutines.flow.*
27
import org.reactivestreams.Publisher
28
29
// Convert Publisher to Flow
30
val publisher: Publisher<String> = // ... some reactive publisher
31
val flow: Flow<String> = publisher.asFlow()
32
33
// Convert Flow to Publisher
34
val sourceFlow = flowOf("hello", "world", "reactive")
35
val convertedPublisher: Publisher<String> = sourceFlow.asPublisher()
36
37
// Await values from publishers
38
val firstValue = publisher.awaitFirst()
39
val lastValue = publisher.awaitLast()
40
val singleValue = publisher.awaitSingle()
41
42
// Collect all values from a publisher
43
publisher.collect { value ->
44
println("Received: $value")
45
}
46
47
// Create cold reactive publishers from coroutines
48
val coldPublisher = publish {
49
send("first")
50
delay(100)
51
send("second")
52
delay(100)
53
send("third")
54
}
55
```
56
57
## Architecture
58
59
Kotlinx-coroutines-reactive is built around several key integration patterns:
60
61
- **Flow/Publisher Conversion**: Bidirectional adapters maintaining TCK compliance and proper back-pressure handling
62
- **Suspending Extensions**: Coroutine-based await functions for consuming reactive streams without blocking threads
63
- **Cold Publisher Builder**: `publish` coroutine builder creating reactive publishers that start on subscription
64
- **Context Propagation**: Integration with ReactorContext when kotlinx-coroutines-reactor is present
65
- **Back-pressure Management**: Proper handling of reactive streams demand and flow control mechanisms
66
67
## Capabilities
68
69
### Publisher Consumption
70
71
Suspending extension functions for consuming values from reactive Publishers without blocking threads. These functions integrate with coroutine cancellation and provide various consumption patterns.
72
73
```kotlin { .api }
74
suspend fun <T> Publisher<T>.awaitFirst(): T
75
suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T
76
suspend fun <T> Publisher<T>.awaitFirstOrNull(): T?
77
suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T
78
suspend fun <T> Publisher<T>.awaitLast(): T
79
suspend fun <T> Publisher<T>.awaitSingle(): T
80
suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit
81
```
82
83
[Publisher Consumption](./publisher-consumption.md)
84
85
### Flow Publisher Conversion
86
87
Bidirectional conversion utilities between Kotlin Flow and Reactive Streams Publisher, maintaining TCK compliance and providing seamless integration between coroutines and reactive paradigms.
88
89
```kotlin { .api }
90
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
91
fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>
92
```
93
94
[Flow Publisher Conversion](./flow-publisher-conversion.md)
95
96
### Cold Publisher Creation
97
98
Coroutine builder for creating cold reactive Publishers that execute a coroutine block on each subscription, with proper back-pressure handling and subscription lifecycle management.
99
100
```kotlin { .api }
101
fun <T> publish(
102
context: CoroutineContext = EmptyCoroutineContext,
103
block: suspend ProducerScope<T>.() -> Unit
104
): Publisher<T>
105
```
106
107
[Cold Publisher Creation](./cold-publisher-creation.md)
108
109
## Types
110
111
```kotlin { .api }
112
interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
113
val channel: SendChannel<E>
114
}
115
116
interface ContextInjector {
117
fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
118
}
119
120
class FlowSubscription<T>(
121
val flow: Flow<T>,
122
val subscriber: Subscriber<in T>,
123
context: CoroutineContext
124
) : Subscription, AbstractCoroutine<Unit>
125
126
class PublisherCoroutine<in T>(
127
parentContext: CoroutineContext,
128
subscriber: Subscriber<T>,
129
exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
130
) : AbstractCoroutine<Unit>, ProducerScope<T>, Subscription
131
```
132
133
## Error Handling
134
135
All suspending functions in this library are cancellable and will cancel their reactive stream subscriptions when the coroutine is cancelled.
136
137
### Publisher Consumption Functions
138
139
Publisher consumption functions may throw:
140
141
- `NoSuchElementException` - when no elements are emitted for functions requiring at least one element (`awaitFirst`, `awaitLast`, `awaitSingle`)
142
- `IllegalArgumentException` - when multiple elements are emitted for functions requiring exactly one element (`awaitSingle`)
143
- `CancellationException` - when the coroutine is cancelled during execution
144
- Any exception thrown by the underlying reactive publisher is propagated as-is
145
146
### Publisher Creation Functions
147
148
The `publish` builder may throw:
149
150
- `IllegalArgumentException` - if the provided context contains a Job instance, as the publisher lifecycle should be managed via subscription
151
- `NullPointerException` - if attempting to emit null values through the publisher (reactive streams specification prohibits null values)
152
153
### Conversion Functions
154
155
Flow/Publisher conversions handle errors as follows:
156
157
- Flow exceptions are propagated to reactive subscribers via `Subscriber.onError`
158
- Publisher errors are propagated as flow exceptions during collection
159
- Resource cleanup is guaranteed even when errors occur
160
- Cancellation is properly handled in both directions with appropriate subscription cancellation