# Flow Publisher Conversion

Bidirectional conversion utilities between Kotlin Flow and Reactive Streams Publisher. The conversions are TCK-compliant and preserve back-pressure, context propagation, and error-handling semantics, so coroutine-based and reactive code can be combined in one pipeline.
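As a minimal sketch of the round trip, assuming `kotlinx-coroutines-reactive` is on the classpath (the helper name `roundTrip` is illustrative and not part of the library):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper: Flow -> Publisher -> Flow, then collect the result
fun roundTrip(): List<String> = runBlocking {
    val source: Flow<String> = flowOf("a", "b", "c")
    val publisher: Publisher<String> = source.asPublisher() // Flow -> Publisher
    publisher.asFlow()                                      // Publisher -> Flow
        .map { it.uppercase() }
        .toList()
}

fun main() {
    println(roundTrip())
}
```

Both conversions are cold: nothing is subscribed or collected until `toList()` runs.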

## Capabilities

### Publisher to Flow Conversion

Transforms a reactive Publisher into a Kotlin Flow with configurable back-pressure handling.

```kotlin { .api }
/**
 * Transforms the given reactive Publisher into Flow.
 * Use the buffer operator on the resulting flow to specify the size of the back-pressure.
 * In effect, it specifies the value of the subscription's request.
 * The default buffer capacity for a suspending channel is used by default.
 *
 * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
 * elements are discarded.
 *
 * This function is integrated with ReactorContext from kotlinx-coroutines-reactor module,
 * see its documentation for additional details.
 */
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
```

**Usage Examples:**

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher

// Placeholder for application logic (not part of the library)
fun processValue(value: String) = println("Processing: $value")

// `publisher` is any Reactive Streams publisher supplied by the caller
suspend fun consume(publisher: Publisher<String>) {
    // Basic conversion
    val flow: Flow<String> = publisher.asFlow()

    // Process the flow (each collect call subscribes to the publisher anew)
    flow.collect { value ->
        println("Received: $value")
    }

    // With back-pressure control
    val bufferedFlow = publisher.asFlow().buffer(capacity = 64)
    bufferedFlow.collect { value ->
        // Process with larger buffer
        processValue(value)
    }

    // Chain with other flow operations
    val transformedFlow = publisher.asFlow()
        .filter { it.isNotEmpty() }
        .map { it.uppercase() }
        .take(10)

    transformedFlow.collect { value ->
        println("Transformed: $value")
    }
}
```

### Flow to Publisher Conversion

Transforms a Kotlin Flow into a Publisher that complies with the Reactive Streams specification.

```kotlin { .api }
/**
 * Transforms the given flow into a reactive specification compliant Publisher.
 *
 * This function is integrated with ReactorContext from kotlinx-coroutines-reactor module,
 * see its documentation for additional details.
 *
 * An optional context can be specified to control the execution context of calls to the Subscriber methods.
 * A CoroutineDispatcher can be set to confine them to a specific thread; various ThreadContextElement can be set to
 * inject additional context into the caller thread. By default, the Unconfined dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>
```

**Usage Examples:**

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

fun main() {
    // Basic conversion
    val words = flowOf("hello", "world", "reactive")
    val publisher: Publisher<String> = words.asPublisher()

    // Subscribe to the publisher
    publisher.subscribe(object : Subscriber<String> {
        override fun onSubscribe(s: Subscription) {
            s.request(Long.MAX_VALUE)
        }

        override fun onNext(t: String) {
            println("Publisher emitted: $t")
        }

        override fun onError(t: Throwable) {
            println("Error: ${t.message}")
        }

        override fun onComplete() {
            println("Publisher completed")
        }
    })

    // With custom context
    val customContext = Dispatchers.IO + CoroutineName("FlowPublisher")
    val contextualPublisher = words.asPublisher(customContext)

    // Convert complex flow
    val complexFlow = flow {
        repeat(5) { i ->
            emit("item-$i")
            delay(100)
        }
    }.map { it.uppercase() }
        .filter { it.contains("ITEM") }

    val complexPublisher = complexFlow.asPublisher()
}
```

## Back-pressure Handling

### Publisher to Flow

The `asFlow()` conversion respects Reactive Streams back-pressure:

- Uses `Channel.BUFFERED` capacity by default
- Can be controlled with the `.buffer()` operator on the resulting flow
- Sizes the subscription's `request` from the configured buffer capacity
- Cancels the subscription when flow collection is cancelled

**Back-pressure Configuration:**

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher

// Data stands in for the application's element type
class Data

fun configure(publisher: Publisher<Data>) { // e.g. a high-throughput publisher
    // Different back-pressure strategies
    val rendezvousFlow = publisher.asFlow().buffer(Channel.RENDEZVOUS) // No buffering
    val bufferedFlow = publisher.asFlow().buffer(64) // Fixed buffer
    val unlimitedFlow = publisher.asFlow().buffer(Channel.UNLIMITED) // Unlimited buffer

    // Drop strategies
    val dropOldestFlow = publisher.asFlow().buffer(10, BufferOverflow.DROP_OLDEST)
    val dropLatestFlow = publisher.asFlow().buffer(10, BufferOverflow.DROP_LATEST)
}
```
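The cancellation behaviour can be observed with a small experiment. This is a sketch only: the endless source is built from a flow purely for convenience, the name `takeTwoThenCancel` is hypothetical, and the short wait allows for the cancellation signal arriving asynchronously.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: collect two elements from an endless publisher and
// report whether the upstream side observed the cancellation.
fun takeTwoThenCancel(): Pair<List<Int>, Boolean> = runBlocking {
    val upstreamStopped = AtomicBoolean(false)

    // Endless source; the finally block runs once the subscription is cancelled
    val publisher = flow {
        try {
            var i = 0
            while (true) emit(i++)
        } finally {
            upstreamStopped.set(true)
        }
    }.asPublisher()

    // take(2) ends collection early, which cancels the subscription
    val received = publisher.asFlow().take(2).toList()

    // Wait briefly in case the cancellation is delivered asynchronously
    val stopped = withTimeoutOrNull(1_000) {
        while (!upstreamStopped.get()) delay(10)
        true
    } ?: false

    received to stopped
}
```

Without back-pressure and cancellation the endless source would never stop; here it emits only as much as was requested and is torn down after two elements are taken.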

### Flow to Publisher

The `asPublisher()` conversion provides TCK-compliant back-pressure:

- Respects subscriber demand signals
- Suspends flow collection when demand is exhausted
- Propagates cancellation to the flow and flow failures to the subscriber's `onError`
- Uses `Dispatchers.Unconfined` by default for subscriber callbacks
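The demand-driven behaviour listed above can be sketched with a subscriber that requests elements in two steps. `RecordingSubscriber` and `demandDemo` are hypothetical names, and the snapshot taken after the first request assumes the default Unconfined dispatcher delivers `onNext` on the requesting thread before `request` returns.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical subscriber that records elements and leaves demand to the caller
class RecordingSubscriber : Subscriber<Int> {
    val received = CopyOnWriteArrayList<Int>()
    val terminated = CountDownLatch(1)
    lateinit var subscription: Subscription

    override fun onSubscribe(s: Subscription) { subscription = s } // no demand yet
    override fun onNext(t: Int) { received.add(t) }
    override fun onError(t: Throwable) { terminated.countDown() }
    override fun onComplete() { terminated.countDown() }
}

fun demandDemo(): Pair<List<Int>, List<Int>> {
    val subscriber = RecordingSubscriber()
    (1..5).asFlow().asPublisher().subscribe(subscriber)

    // Demand for two elements only; the flow suspends once they are delivered
    subscriber.subscription.request(2)
    val afterFirstRequest = subscriber.received.toList()

    // Unbounded demand lets the flow run to completion
    subscriber.subscription.request(Long.MAX_VALUE)
    subscriber.terminated.await(1, TimeUnit.SECONDS)

    return afterFirstRequest to subscriber.received.toList()
}
```

The subscriber never sees more elements than it has requested, regardless of how quickly the underlying flow could emit them.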

## Context Propagation

### ReactorContext Integration

When `kotlinx-coroutines-reactor` is on the classpath, these conversions automatically handle ReactorContext propagation:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.reactor.*
import org.reactivestreams.Publisher

fun convert(sourceFlow: Flow<String>, reactivePublisher: Publisher<String>) {
    // Flow -> Publisher: integrated with ReactorContext
    // (see the kotlinx-coroutines-reactor documentation for details)
    val publisher = sourceFlow.asPublisher()

    // Publisher -> Flow: a ReactorContext in the collecting coroutine's context
    // is passed to the upstream publisher as its Reactor subscriber Context
    val flow = reactivePublisher.asFlow()
}
```

### Custom Context Injection

The `asPublisher()` function accepts an optional `CoroutineContext` that controls where the Subscriber methods are invoked:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.*

fun withCustomContext(flow: Flow<String>) {
    // Custom dispatcher: Subscriber methods are invoked on Dispatchers.IO threads
    val publisher = flow.asPublisher(Dispatchers.IO)

    // Custom context elements
    val customContext = Dispatchers.Default +
        CoroutineName("MyFlowPublisher") +
        CoroutineExceptionHandler { _, throwable ->
            println("Unhandled exception: $throwable")
        }

    val contextualPublisher = flow.asPublisher(customContext)
}
```

## Error Handling

Both conversions propagate errors and cancellation:

- Flow exceptions are propagated to reactive subscribers via `onError`
- Publisher errors are propagated as flow exceptions
- Cancellation is handled in both directions
- If a transformation on the resulting flow fails, the subscription is cancelled immediately and in-flight elements are discarded

**Error Handling Examples:**

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher

// Flow error handling
val failingFlow = flow {
    emit("value1")
    throw RuntimeException("Flow error")
}

val failingPublisher: Publisher<String> = failingFlow.asPublisher()
// Subscriber will receive "value1" then onError with RuntimeException

// Publisher error handling: errorPublisher is any publisher that signals onError
suspend fun collectSafely(errorPublisher: Publisher<String>) {
    val errorFlow = errorPublisher.asFlow()

    try {
        errorFlow.collect { value ->
            println(value)
        }
    } catch (e: Exception) {
        println("Caught flow exception: ${e.message}")
    }
}
```

## Type Constraints

Both conversion functions require non-nullable types (`T : Any`) to comply with the Reactive Streams specification, which prohibits null values.

```kotlin
// Assumes publisher: Publisher<String> and flow: Flow<String> are in scope

// Valid - non-nullable types
val stringFlow: Flow<String> = publisher.asFlow()
val stringPublisher: Publisher<String> = flow.asPublisher()

// Invalid - nullable element types are rejected by the T : Any bound
// val nullableFlow: Flow<String?> = flowOf("a", null)
// val nullablePublisher = nullableFlow.asPublisher() // Compilation error
```
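One way to publish a flow whose elements may be null is to drop the nulls first. The sketch below uses the standard `filterNotNull()` flow operator; the helper name `publishNonNull` is hypothetical, and whether dropping nulls suits a given application is an assumption.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper: publish only the non-null elements of a nullable flow
fun publishNonNull(): List<String> = runBlocking {
    val nullable: Flow<String?> = flowOf("a", null, "b")

    // filterNotNull() narrows Flow<String?> to Flow<String>, which satisfies T : Any
    val publisher: Publisher<String> = nullable.filterNotNull().asPublisher()

    publisher.asFlow().toList()
}
```

If null carries meaning in the stream, mapping it to an explicit sentinel or wrapper type before the conversion is an alternative to filtering.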

## Legacy/Deprecated Functions

The following functions are deprecated at `DeprecationLevel.HIDDEN`: they remain available to already-compiled code for backward compatibility but can no longer be called from newly compiled source.

### Deprecated Flow Conversions

```kotlin { .api }
/**
 * @deprecated Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt
 */
@Deprecated("Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt", level = DeprecationLevel.HIDDEN)
fun <T : Any> Publisher<T>.asFlowDeprecated(): Flow<T>

/**
 * @deprecated Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt
 */
@Deprecated("Replaced in favor of ReactiveFlow extension, please import kotlinx.coroutines.reactive.* instead of kotlinx.coroutines.reactive.FlowKt", level = DeprecationLevel.HIDDEN)
fun <T : Any> Flow<T>.asPublisherDeprecated(): Publisher<T>

/**
 * @deprecated batchSize parameter is deprecated, use .buffer() instead to control the backpressure
 */
@Deprecated("batchSize parameter is deprecated, use .buffer() instead to control the backpressure", level = DeprecationLevel.HIDDEN)
fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T>
```

### Deprecated Channel Conversions

```kotlin { .api }
/**
 * @deprecated Deprecated in the favour of consumeAsFlow()
 */
@Deprecated("Deprecated in the favour of consumeAsFlow()", level = DeprecationLevel.HIDDEN)
fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>

/**
 * @deprecated Transforming publisher to channel is deprecated, use asFlow() instead
 */
@Deprecated("Transforming publisher to channel is deprecated, use asFlow() instead", level = DeprecationLevel.HIDDEN)
fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T>
```
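Following the deprecation messages, code that used these functions can usually move to `asFlow()`, `buffer()`, and `consumeAsFlow()`. The sketch below shows one possible migration; the helper names `asBatchedFlow`, `toPublisher`, and `migrated` are hypothetical, and treating `buffer(batchSize)` as the replacement for the old `batchSize` argument is based only on the deprecation message.

```kotlin
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Instead of the deprecated asFlow(batchSize): size requests with buffer()
fun <T : Any> Publisher<T>.asBatchedFlow(batchSize: Int): Flow<T> =
    asFlow().buffer(batchSize)

// Instead of the deprecated ReceiveChannel.asPublisher(): go through consumeAsFlow()
fun <T : Any> ReceiveChannel<T>.toPublisher(): Publisher<T> =
    consumeAsFlow().asPublisher()

// Instead of openSubscription() and iterating a channel: collect the flow
fun migrated(): List<Int> = runBlocking {
    val publisher = (1..100).asFlow().asPublisher()
    publisher.asBatchedFlow(16).toList()
}
```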