0
# Cold Publisher Creation
1
2
Coroutine builder for creating cold reactive Publishers that execute a coroutine block on each subscription. This enables creating reactive streams from coroutines with proper back-pressure handling, subscription lifecycle management, and seamless integration with reactive frameworks.
3
4
## Capabilities
5
6
### Publish Coroutine Builder
7
8
Creates a cold reactive Publisher that runs a coroutine block for each subscription.
9
10
```kotlin { .api }
11
/**
12
* Creates a cold reactive Publisher that runs a given block in a coroutine.
13
*
14
* Every time the returned flux is subscribed, it starts a new coroutine in the specified context.
15
* The coroutine emits (via Subscriber.onNext) values with send, completes (via Subscriber.onComplete)
16
* when the coroutine completes or channel is explicitly closed, and emits errors (via Subscriber.onError)
17
* if the coroutine throws an exception or closes channel with a cause.
18
* Unsubscribing cancels the running coroutine.
19
*
20
* Invocations of send are suspended appropriately when subscribers apply back-pressure and to
21
* ensure that onNext is not invoked concurrently.
22
*
23
* Coroutine context can be specified with context argument.
24
* If the context does not have any dispatcher nor any other ContinuationInterceptor, then Dispatchers.Default is used.
25
*
26
* Note: This is an experimental api. Behaviour of publishers that work as children in a parent scope with respect
27
* to cancellation and error handling may change in the future.
28
*
29
* @throws IllegalArgumentException if the provided context contains a Job instance.
30
*/
31
fun <T> publish(
32
context: CoroutineContext = EmptyCoroutineContext,
33
block: suspend ProducerScope<T>.() -> Unit
34
): Publisher<T>
35
```
36
37
**Basic Usage Examples:**
38
39
```kotlin
40
import kotlinx.coroutines.reactive.*
41
import kotlinx.coroutines.*
42
import kotlinx.coroutines.channels.ProducerScope
43
44
// Simple value emission
45
val simplePublisher = publish<String> {
46
send("Hello")
47
send("World")
48
send("Reactive")
49
}
50
51
// Subscribe to the publisher
52
simplePublisher.subscribe(object : Subscriber<String> {
53
override fun onSubscribe(s: Subscription) {
54
s.request(Long.MAX_VALUE)
55
}
56
57
override fun onNext(t: String) {
58
println("Received: $t")
59
}
60
61
override fun onError(t: Throwable) {
62
println("Error: ${t.message}")
63
}
64
65
override fun onComplete() {
66
println("Completed")
67
}
68
})
69
70
// Time-based emission
71
val timedPublisher = publish<Int> {
72
repeat(5) { i ->
73
send(i)
74
delay(1000) // Emit every second
75
}
76
}
77
78
// Conditional emission
79
val conditionalPublisher = publish<String> {
80
val data = fetchDataFromApi()
81
if (data.isNotEmpty()) {
82
data.forEach { item ->
83
send(item.toString())
84
}
85
} else {
86
close(RuntimeException("No data available"))
87
}
88
}
89
```
90
91
### Producer Scope
92
93
The `ProducerScope` provides the execution context for the publisher coroutine.
94
95
```kotlin { .api }
96
interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
97
val channel: SendChannel<E>
98
99
// Inherited from SendChannel
100
suspend fun send(element: E)
101
fun trySend(element: E): ChannelResult<Unit>
102
fun close(cause: Throwable? = null): Boolean
103
val isClosedForSend: Boolean
104
}
105
```
106
107
**ProducerScope Usage:**
108
109
```kotlin
110
import kotlinx.coroutines.reactive.*
111
import kotlinx.coroutines.*
112
import kotlinx.coroutines.channels.*
113
114
val publisher = publish<Data> {
115
// Access to coroutine context
116
println("Publisher started in: ${coroutineContext[CoroutineDispatcher]}")
117
118
try {
119
// Send values
120
val items = generateItems()
121
items.forEach { item ->
122
send(item) // Suspends if back-pressure applied
123
}
124
125
// Check if channel is still open
126
if (!isClosedForSend) {
127
send(finalItem)
128
}
129
130
} catch (e: Exception) {
131
// Close with error
132
close(e)
133
return@publish
134
}
135
136
// Normal completion - close() called automatically
137
}
138
```
139
140
## Context and Dispatchers
141
142
### Custom Context
143
144
Specify execution context for the publisher coroutine:
145
146
```kotlin
147
import kotlinx.coroutines.reactive.*
148
import kotlinx.coroutines.*
149
150
// Custom dispatcher
151
val ioPublisher = publish(Dispatchers.IO) {
152
val data = performIOOperation()
153
send(data)
154
}
155
156
// Custom context with multiple elements
157
val customContext = Dispatchers.Default +
158
CoroutineName("DataPublisher") +
159
CoroutineExceptionHandler { _, throwable ->
160
println("Unhandled publisher exception: $throwable")
161
}
162
163
val contextualPublisher = publish(customContext) {
164
processData()
165
}
166
167
// Job context not allowed
168
try {
169
val invalidPublisher = publish(Job()) { // Throws IllegalArgumentException
170
send("This won't work")
171
}
172
} catch (e: IllegalArgumentException) {
173
println("Cannot provide Job in context: ${e.message}")
174
}
175
```
176
177
### Default Context Behavior
178
179
- If no dispatcher is specified, `Dispatchers.Default` is used
180
- Context is applied to each new coroutine started per subscription
181
- Global scope is used if no parent scope is available
182
183
## Back-pressure and Flow Control
184
185
The publish builder handles back-pressure automatically:
186
187
```kotlin
188
import kotlinx.coroutines.reactive.*
189
import kotlinx.coroutines.*
190
191
val backPressurePublisher = publish<Int> {
192
repeat(10000) { i ->
193
send(i) // Suspends when subscriber can't keep up
194
// Only proceeds when subscriber requests more
195
}
196
}
197
198
// Subscriber controls the flow
199
backPressurePublisher.subscribe(object : Subscriber<Int> {
200
private lateinit var subscription: Subscription
201
202
override fun onSubscribe(s: Subscription) {
203
subscription = s
204
s.request(1) // Request one item at a time
205
}
206
207
override fun onNext(t: Int) {
208
println("Processing: $t")
209
// Simulate slow processing
210
Thread.sleep(100)
211
subscription.request(1) // Request next item
212
}
213
214
override fun onError(t: Throwable) {
215
println("Error: ${t.message}")
216
}
217
218
override fun onComplete() {
219
println("Completed")
220
}
221
})
222
```
223
224
## Cold Publisher Semantics
225
226
Each subscription creates a new coroutine execution:
227
228
```kotlin
229
import kotlinx.coroutines.reactive.*
230
import kotlinx.coroutines.*
231
232
var executionCount = 0
233
234
val coldPublisher = publish<String> {
235
val id = ++executionCount
236
println("Execution $id started")
237
238
send("Message from execution $id")
239
delay(1000)
240
send("Final message from execution $id")
241
242
println("Execution $id completed")
243
}
244
245
// First subscription
246
coldPublisher.subscribe(subscriber1) // Prints: "Execution 1 started"
247
248
// Second subscription
249
coldPublisher.subscribe(subscriber2) // Prints: "Execution 2 started"
250
251
// Each subscriber gets independent execution
252
```
253
254
## Error Handling and Completion
255
256
### Normal Completion
257
258
Publisher completes when the coroutine block finishes normally:
259
260
```kotlin
261
val completingPublisher = publish<Int> {
262
repeat(3) { i ->
263
send(i)
264
}
265
// Completes normally - onComplete() called on subscriber
266
}
267
```
268
269
### Error Completion
270
271
Publisher signals error when exception is thrown or channel is closed with cause:
272
273
```kotlin
274
val errorPublisher = publish<String> {
275
send("Before error")
276
277
if (someCondition) {
278
throw RuntimeException("Something went wrong")
279
// onError() called on subscriber
280
}
281
282
// Alternative: close with cause
283
close(IllegalStateException("Invalid state"))
284
}
285
```
286
287
### Cancellation Handling
288
289
Publisher handles subscription cancellation properly:
290
291
```kotlin
292
val cancellablePublisher = publish<Long> {
293
try {
294
var counter = 0L
295
while (true) {
296
send(counter++)
297
delay(500)
298
}
299
} catch (e: CancellationException) {
300
println("Publisher was cancelled")
301
throw e // Re-throw cancellation
302
} finally {
303
println("Publisher cleanup")
304
}
305
}
306
307
// Subscription can be cancelled
308
val subscription = // ... get subscription from onSubscribe
309
subscription.cancel() // Cancels the coroutine
310
```
311
312
## Integration with Other Libraries
313
314
### RxJava Integration
315
316
```kotlin
317
import kotlinx.coroutines.reactive.*
318
import io.reactivex.rxjava3.core.Flowable
319
320
val coroutinePublisher = publish<String> {
321
send("From coroutine")
322
delay(1000)
323
send("After delay")
324
}
325
326
// Convert to RxJava Flowable
327
val flowable = Flowable.fromPublisher(coroutinePublisher)
328
```
329
330
### Reactor Integration
331
332
```kotlin
333
import kotlinx.coroutines.reactive.*
334
import reactor.core.publisher.Flux
335
336
val coroutinePublisher = publish<Int> {
337
repeat(5) { i ->
338
send(i * i)
339
delay(100)
340
}
341
}
342
343
// Convert to Reactor Flux
344
val flux = Flux.from(coroutinePublisher)
345
```
346
347
## Legacy/Deprecated Functions
348
349
The following function is deprecated but still available for backward compatibility:
350
351
```kotlin { .api }
352
/**
353
* @deprecated CoroutineScope.publish is deprecated in favour of top-level publish
354
*/
355
@Deprecated("CoroutineScope.publish is deprecated in favour of top-level publish", level = DeprecationLevel.HIDDEN)
356
fun <T> CoroutineScope.publish(
357
context: CoroutineContext = EmptyCoroutineContext,
358
block: suspend ProducerScope<T>.() -> Unit
359
): Publisher<T>
360
```
361
362
## Performance Considerations
363
364
- Each subscription creates a new coroutine - consider caching for expensive operations
365
- Use appropriate dispatchers for I/O vs CPU-bound operations
366
- Back-pressure is handled efficiently through channel mechanics
367
- Memory usage scales with the number of concurrent subscriptions