0
# Flow Publisher Conversion
1
2
Bidirectional conversion utilities between Kotlin Flow and Reactive Streams Publisher, maintaining TCK compliance and providing seamless integration between coroutines and reactive paradigms. These conversions preserve back-pressure, context propagation, and error handling semantics.
3
4
## Capabilities
5
6
### Publisher to Flow Conversion
7
8
Transforms a reactive Publisher into a Kotlin Flow with configurable back-pressure handling.
9
10
```kotlin { .api }
11
/**
12
* Transforms the given reactive Publisher into Flow.
13
* Use the buffer operator on the resulting flow to specify the size of the back-pressure.
14
* In effect, it specifies the value of the subscription's request.
15
* The default buffer capacity for a suspending channel is used by default.
16
*
17
* If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
18
* elements are discarded.
19
*
20
* This function is integrated with ReactorContext from kotlinx-coroutines-reactor module,
21
* see its documentation for additional details.
22
*/
23
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
24
```
25
26
**Usage Examples:**
27
28
```kotlin
29
import kotlinx.coroutines.reactive.*
30
import kotlinx.coroutines.flow.*
31
import org.reactivestreams.Publisher
32
33
// Basic conversion
34
val publisher: Publisher<String> = // ... some reactive publisher
35
val flow: Flow<String> = publisher.asFlow()
36
37
// Process the flow
38
flow.collect { value ->
39
println("Received: $value")
40
}
41
42
// With back-pressure control
43
val bufferedFlow = publisher.asFlow().buffer(capacity = 64)
44
bufferedFlow.collect { value ->
45
// Process with larger buffer
46
processValue(value)
47
}
48
49
// Chain with other flow operations
50
val transformedFlow = publisher.asFlow()
51
.filter { it.isNotEmpty() }
52
.map { it.uppercase() }
53
.take(10)
54
55
transformedFlow.collect { value ->
56
println("Transformed: $value")
57
}
58
```
59
60
### Flow to Publisher Conversion
61
62
Transforms a Kotlin Flow into a reactive specification compliant Publisher.
63
64
```kotlin { .api }
65
/**
66
* Transforms the given flow into a reactive specification compliant Publisher.
67
*
68
* This function is integrated with ReactorContext from kotlinx-coroutines-reactor module,
69
* see its documentation for additional details.
70
*
71
* An optional context can be specified to control the execution context of calls to the Subscriber methods.
72
* A CoroutineDispatcher can be set to confine them to a specific thread; various ThreadContextElement can be set to
73
* inject additional context into the caller thread. By default, the Unconfined dispatcher
74
* is used, so calls are performed from an arbitrary thread.
75
*/
76
fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>
77
```
78
79
**Usage Examples:**
80
81
```kotlin
82
import kotlinx.coroutines.reactive.*
83
import kotlinx.coroutines.flow.*
84
import kotlinx.coroutines.*
85
import kotlin.coroutines.CoroutineContext
86
87
// Basic conversion
88
val flow = flowOf("hello", "world", "reactive")
89
val publisher: Publisher<String> = flow.asPublisher()
90
91
// Subscribe to the publisher
92
publisher.subscribe(object : Subscriber<String> {
93
override fun onSubscribe(s: Subscription) {
94
s.request(Long.MAX_VALUE)
95
}
96
97
override fun onNext(t: String) {
98
println("Publisher emitted: $t")
99
}
100
101
override fun onError(t: Throwable) {
102
println("Error: ${t.message}")
103
}
104
105
override fun onComplete() {
106
println("Publisher completed")
107
}
108
})
109
110
// With custom context
111
val customContext = Dispatchers.IO + CoroutineName("FlowPublisher")
112
val contextualPublisher = flow.asPublisher(customContext)
113
114
// Convert complex flow
115
val complexFlow = flow {
116
repeat(5) { i ->
117
emit("item-$i")
118
delay(100)
119
}
120
}.map { it.uppercase() }
121
.filter { it.contains("ITEM") }
122
123
val complexPublisher = complexFlow.asPublisher()
124
```
125
126
## Back-pressure Handling
127
128
### Publisher to Flow
129
130
The `asFlow()` conversion respects reactive streams back-pressure:
131
132
- Uses `Channel.BUFFERED` capacity by default
133
- Can be controlled with `.buffer()` operator on the resulting flow
134
- Properly handles subscriber demand and publisher supply rates
135
- Cancels subscription when flow collection is cancelled
136
137
**Back-pressure Configuration:**
138
139
```kotlin
140
import kotlinx.coroutines.reactive.*
141
import kotlinx.coroutines.flow.*
142
import kotlinx.coroutines.channels.Channel
143
144
val publisher: Publisher<Data> = // ... high-throughput publisher
145
146
// Different back-pressure strategies
147
val rendezvousFlow = publisher.asFlow().buffer(Channel.RENDEZVOUS) // No buffering
148
val bufferedFlow = publisher.asFlow().buffer(64) // Fixed buffer
149
val unlimitedFlow = publisher.asFlow().buffer(Channel.UNLIMITED) // Unlimited buffer
150
151
// Drop strategies
152
val dropOldestFlow = publisher.asFlow().buffer(10, BufferOverflow.DROP_OLDEST)
153
val dropLatestFlow = publisher.asFlow().buffer(10, BufferOverflow.DROP_LATEST)
154
```
155
156
### Flow to Publisher
157
158
The `asPublisher()` conversion provides TCK-compliant back-pressure:
159
160
- Respects subscriber demand signals
161
- Suspends flow collection when demand is exhausted
162
- Properly handles cancellation and error propagation
163
- Uses `Dispatchers.Unconfined` by default for subscriber callbacks
164
165
## Context Propagation
166
167
### ReactorContext Integration
168
169
When `kotlinx-coroutines-reactor` is in the classpath, these conversions automatically handle ReactorContext propagation:
170
171
```kotlin
172
import kotlinx.coroutines.reactive.*
173
import kotlinx.coroutines.reactor.*
174
175
// ReactorContext is automatically propagated
176
val publisher = flow.asPublisher()
177
// Context flows from coroutine to Reactor subscriber context
178
179
val flow = reactivePublisher.asFlow()
180
// Context flows from Reactor context to coroutine context
181
```
182
183
### Custom Context Injection
184
185
The `asPublisher()` function accepts a custom CoroutineContext:
186
187
```kotlin
188
import kotlinx.coroutines.*
189
import kotlinx.coroutines.reactive.*
190
191
// Custom dispatcher
192
val publisher = flow.asPublisher(Dispatchers.IO)
193
194
// Custom context elements
195
val customContext = Dispatchers.Default +
196
CoroutineName("MyFlowPublisher") +
197
CoroutineExceptionHandler { _, throwable ->
198
println("Unhandled exception: $throwable")
199
}
200
201
val contextualPublisher = flow.asPublisher(customContext)
202
```
203
204
## Error Handling
205
206
Both conversions properly handle and propagate errors:
207
208
- Flow exceptions are propagated to reactive subscribers via `onError`
209
- Publisher errors are propagated as flow exceptions
210
- Cancellation is properly handled in both directions
211
- Resource cleanup is guaranteed even on failures
212
213
**Error Handling Examples:**
214
215
```kotlin
216
import kotlinx.coroutines.reactive.*
217
import kotlinx.coroutines.flow.*
218
219
// Flow error handling
220
val errorFlow = flow {
221
emit("value1")
222
throw RuntimeException("Flow error")
223
}
224
225
val errorPublisher = errorFlow.asPublisher()
226
// Subscriber will receive "value1" then onError with RuntimeException
227
228
// Publisher error handling
229
val errorPublisher: Publisher<String> = // ... publisher that emits error
230
val errorFlow = errorPublisher.asFlow()
231
232
try {
233
errorFlow.collect { value ->
234
println(value)
235
}
236
} catch (e: Exception) {
237
println("Caught flow exception: ${e.message}")
238
}
239
```
240
241
## Type Constraints
242
243
Both conversion functions require non-nullable types (`T : Any`) to comply with reactive streams specification which prohibits null values.
244
245
```kotlin
246
// Valid - non-nullable types
247
val stringFlow: Flow<String> = publisher.asFlow()
248
val stringPublisher: Publisher<String> = flow.asPublisher()
249
250
// Invalid - nullable types not supported
251
// val nullableFlow: Flow<String?> = publisher.asFlow() // Compilation error
252
```
253
254
## Legacy/Deprecated Functions
255
256
The following functions are deprecated but still available for backward compatibility:
257
258
### Deprecated Flow Conversions
259
260
```kotlin { .api }
261
/**
262
* @deprecated Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt
263
*/
264
@Deprecated("Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt", level = DeprecationLevel.HIDDEN)
265
fun <T : Any> Publisher<T>.asFlowDeprecated(): Flow<T>
266
267
/**
268
* @deprecated Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt
269
*/
270
@Deprecated("Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt", level = DeprecationLevel.HIDDEN)
271
fun <T : Any> Flow<T>.asPublisherDeprecated(): Publisher<T>
272
273
/**
274
* @deprecated batchSize parameter is deprecated, use .buffer() instead to control the backpressure
275
*/
276
@Deprecated("batchSize parameter is deprecated, use .buffer() instead to control the backpressure", level = DeprecationLevel.HIDDEN)
277
fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T>
278
```
279
280
### Deprecated Channel Conversions
281
282
```kotlin { .api }
283
/**
284
* @deprecated Deprecated in the favour of consumeAsFlow()
285
*/
286
@Deprecated("Deprecated in the favour of consumeAsFlow()", level = DeprecationLevel.HIDDEN)
287
fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>
288
289
/**
290
* @deprecated Transforming publisher to channel is deprecated, use asFlow() instead
291
*/
292
@Deprecated("Transforming publisher to channel is deprecated, use asFlow() instead", level = DeprecationLevel.HIDDEN)
293
fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T>
294
```