or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cold-publisher-creation.mdflow-publisher-conversion.mdindex.mdpublisher-consumption.md

publisher-consumption.mddocs/

0

# Publisher Consumption

1

2

Suspending extension functions for consuming values from reactive Publishers without blocking threads. These functions integrate with coroutine cancellation and provide various consumption patterns for different use cases.

3

4

## Capabilities

5

6

### Await First Value

7

8

Suspends until the first value is emitted from the publisher, then returns that value.

9

10

```kotlin { .api }

11

/**

12

* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if

13

* the publisher has produced an error, throws the corresponding exception.

14

*

15

* This suspending function is cancellable.

16

* If the Job of the current coroutine is cancelled while the suspending function is waiting, this

17

* function immediately cancels its Subscription and resumes with CancellationException.

18

*

19

* @throws NoSuchElementException if the publisher does not emit any value

20

*/

21

suspend fun <T> Publisher<T>.awaitFirst(): T

22

```

23

24

**Usage Example:**

25

26

```kotlin

27

import kotlinx.coroutines.reactive.*

28

import kotlinx.coroutines.*

29

30

runBlocking {

31

val publisher: Publisher<String> = // ... some reactive publisher

32

try {

33

val firstValue = publisher.awaitFirst()

34

println("First value: $firstValue")

35

} catch (e: NoSuchElementException) {

36

println("Publisher emitted no values")

37

}

38

}

39

```

40

41

### Await First Value with Default

42

43

Suspends until the first value is emitted, or returns a default value if no values are emitted.

44

45

```kotlin { .api }

46

/**

47

* Awaits the first value from the given publisher, or returns the default value if none is emitted, without blocking

48

* the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding

49

* exception.

50

*

51

* This suspending function is cancellable.

52

* If the Job of the current coroutine is cancelled while the suspending function is waiting, this

53

* function immediately cancels its Subscription and resumes with CancellationException.

54

*/

55

suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T

56

```

57

58

### Await First Value or Null

59

60

Suspends until the first value is emitted, or returns null if no values are emitted.

61

62

```kotlin { .api }

63

/**

64

* Awaits the first value from the given publisher, or returns null if none is emitted, without blocking the thread,

65

* and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.

66

*

67

* This suspending function is cancellable.

68

* If the Job of the current coroutine is cancelled while the suspending function is waiting, this

69

* function immediately cancels its Subscription and resumes with CancellationException.

70

*/

71

suspend fun <T> Publisher<T>.awaitFirstOrNull(): T?

72

```

73

74

### Await First Value or Computed Default

75

76

Suspends until the first value is emitted, or calls a function to compute a default value if no values are emitted.

77

78

```kotlin { .api }

79

/**

80

* Awaits the first value from the given publisher, or calls defaultValue to get a value if none is emitted, without

81

* blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the

82

* corresponding exception.

83

*

84

* This suspending function is cancellable.

85

* If the Job of the current coroutine is cancelled while the suspending function is waiting, this

86

* function immediately cancels its Subscription and resumes with CancellationException.

87

*/

88

suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T

89

```

90

91

**Usage Example:**

92

93

```kotlin

94

import kotlinx.coroutines.reactive.*

95

import kotlinx.coroutines.*

96

97

runBlocking {

98

val emptyPublisher: Publisher<String> = // ... publisher that emits no values

99

100

// Using default value

101

val withDefault = emptyPublisher.awaitFirstOrDefault("default")

102

103

// Using null fallback

104

val withNull = emptyPublisher.awaitFirstOrNull()

105

106

// Using computed default

107

val withComputed = emptyPublisher.awaitFirstOrElse {

108

"computed at ${System.currentTimeMillis()}"

109

}

110

}

111

```

112

113

### Await Last Value

114

115

Suspends until all values are emitted, then returns the last value.

116

117

```kotlin { .api }

118

/**

119

* Awaits the last value from the given publisher without blocking the thread and

120

* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.

121

*

122

* This suspending function is cancellable.

123

* If the Job of the current coroutine is cancelled while the suspending function is waiting, this

124

* function immediately cancels its Subscription and resumes with CancellationException.

125

*

126

* @throws NoSuchElementException if the publisher does not emit any value

127

*/

128

suspend fun <T> Publisher<T>.awaitLast(): T

129

```

130

131

**Usage Example:**

132

133

```kotlin

134

import kotlinx.coroutines.reactive.*

135

import kotlinx.coroutines.*

136

137

runBlocking {

138

val publisher: Publisher<Int> = // ... publisher that emits 1, 2, 3, 4, 5

139

val lastValue = publisher.awaitLast() // Returns 5

140

println("Last value: $lastValue")

141

}

142

```

143

144

### Await Single Value

145

146

Suspends until exactly one value is emitted, validating that no more than one value is produced.

147

148

```kotlin { .api }

149

/**

150

* Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,

151

* if this publisher has produced an error, throws the corresponding exception.

152

*

153

* This suspending function is cancellable.

154

* If the Job of the current coroutine is cancelled while the suspending function is waiting, this

155

* function immediately cancels its Subscription and resumes with CancellationException.

156

*

157

* @throws NoSuchElementException if the publisher does not emit any value

158

* @throws IllegalArgumentException if the publisher emits more than one value

159

*/

160

suspend fun <T> Publisher<T>.awaitSingle(): T

161

```

162

163

**Usage Example:**

164

165

```kotlin

166

import kotlinx.coroutines.reactive.*

167

import kotlinx.coroutines.*

168

169

runBlocking {

170

val singleValuePublisher: Publisher<String> = // ... publisher that emits exactly one value

171

try {

172

val singleValue = singleValuePublisher.awaitSingle()

173

println("Single value: $singleValue")

174

} catch (e: NoSuchElementException) {

175

println("Publisher emitted no values")

176

} catch (e: IllegalArgumentException) {

177

println("Publisher emitted more than one value")

178

}

179

}

180

```

181

182

### Collect Publisher Values

183

184

Subscribes to the publisher and performs an action for each received element.

185

186

```kotlin { .api }

187

/**

188

* Subscribes to this Publisher and performs the specified action for each received element.

189

*

190

* If action throws an exception at some point, the subscription is cancelled, and the exception is rethrown from

191

* collect. Also, if the publisher signals an error, that error is rethrown from collect.

192

*/

193

suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit

194

```

195

196

**Usage Example:**

197

198

```kotlin

199

import kotlinx.coroutines.reactive.*

200

import kotlinx.coroutines.*

201

202

runBlocking {

203

val publisher: Publisher<String> = // ... some reactive publisher

204

205

// Collect all values and process them

206

publisher.collect { value ->

207

println("Received: $value")

208

// Process each value as it arrives

209

}

210

}

211

```

212

213

### Legacy/Deprecated Functions

214

215

The following functions are deprecated but still available for backward compatibility:

216

217

```kotlin { .api }

218

/**

219

* @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.

220

* Please consider using awaitFirstOrDefault().

221

*/

222

@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. Please consider using awaitFirstOrDefault().", level = DeprecationLevel.HIDDEN)

223

suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T

224

225

/**

226

* @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.

227

* There is a specialized version for Reactor's Mono, please use that where applicable.

228

* Alternatively, please consider using awaitFirstOrNull().

229

*/

230

@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. There is a specialized version for Reactor's Mono, please use that where applicable. Alternatively, please consider using awaitFirstOrNull().", level = DeprecationLevel.HIDDEN)

231

suspend fun <T> Publisher<T>.awaitSingleOrNull(): T?

232

233

/**

234

* @deprecated Deprecated without a replacement due to its name incorrectly conveying the behavior.

235

* Please consider using awaitFirstOrElse().

236

*/

237

@Deprecated("Deprecated without a replacement due to its name incorrectly conveying the behavior. Please consider using awaitFirstOrElse().", level = DeprecationLevel.HIDDEN)

238

suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T

239

```

240

241

## Cancellation Behavior

242

243

All await functions support coroutine cancellation:

244

245

- When the coroutine is cancelled, the reactive stream subscription is immediately cancelled

246

- The function resumes with `CancellationException`

247

- Any pending reactive stream operations are cleaned up properly

248

249

## Error Propagation

250

251

Errors from the reactive publisher are propagated as exceptions:

252

253

- Publisher errors are thrown as-is from the await functions

254

- `NoSuchElementException` is thrown when no values are emitted for functions requiring values

255

- `IllegalArgumentException` is thrown when multiple values are emitted for single-value functions

256

257

## Thread Safety

258

259

These functions are thread-safe and designed to be used from any coroutine context. They do not block threads and properly handle reactive streams threading requirements.