0
# Publisher Consumption
1
2
Suspending extension functions for consuming values from reactive Publishers without blocking threads. These functions integrate with coroutine cancellation and provide various consumption patterns for different use cases.
3
4
## Capabilities
5
6
### Await First Value
7
8
Suspends until the first value is emitted from the publisher, then returns that value.
9
10
```kotlin { .api }
11
/**
12
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
13
* the publisher has produced an error, throws the corresponding exception.
14
*
15
* This suspending function is cancellable.
16
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
17
* function immediately cancels its Subscription and resumes with CancellationException.
18
*
19
* @throws NoSuchElementException if the publisher does not emit any value
20
*/
21
suspend fun <T> Publisher<T>.awaitFirst(): T
22
```
23
24
**Usage Example:**
25
26
```kotlin
27
import kotlinx.coroutines.reactive.*
28
import kotlinx.coroutines.*
29
30
runBlocking {
31
val publisher: Publisher<String> = // ... some reactive publisher
32
try {
33
val firstValue = publisher.awaitFirst()
34
println("First value: $firstValue")
35
} catch (e: NoSuchElementException) {
36
println("Publisher emitted no values")
37
}
38
}
39
```
40
41
### Await First Value with Default
42
43
Suspends until the first value is emitted, or returns a default value if no values are emitted.
44
45
```kotlin { .api }
46
/**
47
* Awaits the first value from the given publisher, or returns the default value if none is emitted, without blocking
48
* the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
49
* exception.
50
*
51
* This suspending function is cancellable.
52
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
53
* function immediately cancels its Subscription and resumes with CancellationException.
54
*/
55
suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T
56
```
57
58
### Await First Value or Null
59
60
Suspends until the first value is emitted, or returns null if no values are emitted.
61
62
```kotlin { .api }
63
/**
64
* Awaits the first value from the given publisher, or returns null if none is emitted, without blocking the thread,
65
* and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
66
*
67
* This suspending function is cancellable.
68
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
69
* function immediately cancels its Subscription and resumes with CancellationException.
70
*/
71
suspend fun <T> Publisher<T>.awaitFirstOrNull(): T?
72
```
73
74
### Await First Value or Computed Default
75
76
Suspends until the first value is emitted, or calls a function to compute a default value if no values are emitted.
77
78
```kotlin { .api }
79
/**
80
* Awaits the first value from the given publisher, or calls defaultValue to get a value if none is emitted, without
81
* blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
82
* corresponding exception.
83
*
84
* This suspending function is cancellable.
85
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
86
* function immediately cancels its Subscription and resumes with CancellationException.
87
*/
88
suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T
89
```
90
91
**Usage Example:**
92
93
```kotlin
94
import kotlinx.coroutines.reactive.*
95
import kotlinx.coroutines.*
96
97
runBlocking {
98
val emptyPublisher: Publisher<String> = // ... publisher that emits no values
99
100
// Using default value
101
val withDefault = emptyPublisher.awaitFirstOrDefault("default")
102
103
// Using null fallback
104
val withNull = emptyPublisher.awaitFirstOrNull()
105
106
// Using computed default
107
val withComputed = emptyPublisher.awaitFirstOrElse {
108
"computed at ${System.currentTimeMillis()}"
109
}
110
}
111
```
112
113
### Await Last Value
114
115
Suspends until all values are emitted, then returns the last value.
116
117
```kotlin { .api }
118
/**
119
* Awaits the last value from the given publisher without blocking the thread and
120
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
121
*
122
* This suspending function is cancellable.
123
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
124
* function immediately cancels its Subscription and resumes with CancellationException.
125
*
126
* @throws NoSuchElementException if the publisher does not emit any value
127
*/
128
suspend fun <T> Publisher<T>.awaitLast(): T
129
```
130
131
**Usage Example:**
132
133
```kotlin
134
import kotlinx.coroutines.reactive.*
135
import kotlinx.coroutines.*
136
137
runBlocking {
138
val publisher: Publisher<Int> = // ... publisher that emits 1, 2, 3, 4, 5
139
val lastValue = publisher.awaitLast() // Returns 5
140
println("Last value: $lastValue")
141
}
142
```
143
144
### Await Single Value
145
146
Suspends until exactly one value is emitted, validating that no more than one value is produced.
147
148
```kotlin { .api }
149
/**
150
* Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
151
* if this publisher has produced an error, throws the corresponding exception.
152
*
153
* This suspending function is cancellable.
154
* If the Job of the current coroutine is cancelled while the suspending function is waiting, this
155
* function immediately cancels its Subscription and resumes with CancellationException.
156
*
157
* @throws NoSuchElementException if the publisher does not emit any value
158
* @throws IllegalArgumentException if the publisher emits more than one value
159
*/
160
suspend fun <T> Publisher<T>.awaitSingle(): T
161
```
162
163
**Usage Example:**
164
165
```kotlin
166
import kotlinx.coroutines.reactive.*
167
import kotlinx.coroutines.*
168
169
runBlocking {
170
val singleValuePublisher: Publisher<String> = // ... publisher that emits exactly one value
171
try {
172
val singleValue = singleValuePublisher.awaitSingle()
173
println("Single value: $singleValue")
174
} catch (e: NoSuchElementException) {
175
println("Publisher emitted no values")
176
} catch (e: IllegalArgumentException) {
177
println("Publisher emitted more than one value")
178
}
179
}
180
```
181
182
### Collect Publisher Values
183
184
Subscribes to the publisher and performs an action for each received element.
185
186
```kotlin { .api }
187
/**
188
* Subscribes to this Publisher and performs the specified action for each received element.
189
*
190
* If action throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
191
* collect. Also, if the publisher signals an error, that error is rethrown from collect.
192
*/
193
suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit
194
```
195
196
**Usage Example:**
197
198
```kotlin
199
import kotlinx.coroutines.reactive.*
200
import kotlinx.coroutines.*
201
202
runBlocking {
203
val publisher: Publisher<String> = // ... some reactive publisher
204
205
// Collect all values and process them
206
publisher.collect { value ->
207
println("Received: $value")
208
// Process each value as it arrives
209
}
210
}
211
```
212
213
### Legacy/Deprecated Functions
214
215
The following functions are deprecated but still available for backward compatibility:
216
217
```kotlin { .api }
218
/**
219
* @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.
220
* Please consider using awaitFirstOrDefault().
221
*/
222
@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. Please consider using awaitFirstOrDefault().", level = DeprecationLevel.HIDDEN)
223
suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T
224
225
/**
226
* @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.
227
* There is a specialized version for Reactor's Mono, please use that where applicable.
228
* Alternatively, please consider using awaitFirstOrNull().
229
*/
230
@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. There is a specialized version for Reactor's Mono, please use that where applicable. Alternatively, please consider using awaitFirstOrNull().", level = DeprecationLevel.HIDDEN)
231
suspend fun <T> Publisher<T>.awaitSingleOrNull(): T?
232
233
/**
234
* @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.
235
* Please consider using awaitFirstOrElse().
236
*/
237
@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. Please consider using awaitFirstOrElse().", level = DeprecationLevel.HIDDEN)
238
suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T
239
```
240
241
## Cancellation Behavior
242
243
All await functions support coroutine cancellation:
244
245
- When the coroutine is cancelled, the reactive stream subscription is immediately cancelled
246
- The function resumes with `CancellationException`
247
- Any pending reactive stream operations are cleaned up properly
248
249
## Error Propagation
250
251
Errors from the reactive publisher are propagated as exceptions:
252
253
- Publisher errors are thrown as-is from the await functions
254
- `NoSuchElementException` is thrown when no values are emitted for functions requiring values
255
- `IllegalArgumentException` is thrown when multiple values are emitted for single-value functions
256
257
## Thread Safety
258
259
These functions are thread-safe and designed to be used from any coroutine context. They do not block threads and properly handle reactive streams threading requirements.