or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cold-publisher-creation.mdflow-publisher-conversion.mdindex.mdpublisher-consumption.md

cold-publisher-creation.mddocs/

0

# Cold Publisher Creation

1

2

Coroutine builder for creating cold reactive Publishers that execute a coroutine block on each subscription. This enables creating reactive streams from coroutines with proper back-pressure handling, subscription lifecycle management, and seamless integration with reactive frameworks.

3

4

## Capabilities

5

6

### Publish Coroutine Builder

7

8

Creates a cold reactive Publisher that runs a coroutine block for each subscription.

9

10

```kotlin { .api }

11

/**

12

* Creates a cold reactive Publisher that runs a given block in a coroutine.

13

*

14

* Every time the returned flux is subscribed, it starts a new coroutine in the specified context.

15

* The coroutine emits (via Subscriber.onNext) values with send, completes (via Subscriber.onComplete)

16

* when the coroutine completes or channel is explicitly closed, and emits errors (via Subscriber.onError)

17

* if the coroutine throws an exception or closes channel with a cause.

18

* Unsubscribing cancels the running coroutine.

19

*

20

* Invocations of send are suspended appropriately when subscribers apply back-pressure and to

21

* ensure that onNext is not invoked concurrently.

22

*

23

* Coroutine context can be specified with context argument.

24

* If the context does not have any dispatcher nor any other ContinuationInterceptor, then Dispatchers.Default is used.

25

*

26

* Note: This is an experimental api. Behaviour of publishers that work as children in a parent scope with respect

27

* to cancellation and error handling may change in the future.

28

*

29

* @throws IllegalArgumentException if the provided context contains a Job instance.

30

*/

31

fun <T> publish(

32

context: CoroutineContext = EmptyCoroutineContext,

33

block: suspend ProducerScope<T>.() -> Unit

34

): Publisher<T>

35

```

36

37

**Basic Usage Examples:**

38

39

```kotlin

40

import kotlinx.coroutines.reactive.*

41

import kotlinx.coroutines.*

42

import kotlinx.coroutines.channels.ProducerScope

43

44

// Simple value emission

45

val simplePublisher = publish<String> {

46

send("Hello")

47

send("World")

48

send("Reactive")

49

}

50

51

// Subscribe to the publisher

52

simplePublisher.subscribe(object : Subscriber<String> {

53

override fun onSubscribe(s: Subscription) {

54

s.request(Long.MAX_VALUE)

55

}

56

57

override fun onNext(t: String) {

58

println("Received: $t")

59

}

60

61

override fun onError(t: Throwable) {

62

println("Error: ${t.message}")

63

}

64

65

override fun onComplete() {

66

println("Completed")

67

}

68

})

69

70

// Time-based emission

71

val timedPublisher = publish<Int> {

72

repeat(5) { i ->

73

send(i)

74

delay(1000) // Emit every second

75

}

76

}

77

78

// Conditional emission

79

val conditionalPublisher = publish<String> {

80

val data = fetchDataFromApi()

81

if (data.isNotEmpty()) {

82

data.forEach { item ->

83

send(item.toString())

84

}

85

} else {

86

close(RuntimeException("No data available"))

87

}

88

}

89

```

90

91

### Producer Scope

92

93

The `ProducerScope` provides the execution context for the publisher coroutine.

94

95

```kotlin { .api }

96

interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {

97

val channel: SendChannel<E>

98

99

// Inherited from SendChannel

100

suspend fun send(element: E)

101

fun trySend(element: E): ChannelResult<Unit>

102

fun close(cause: Throwable? = null): Boolean

103

val isClosedForSend: Boolean

104

}

105

```

106

107

**ProducerScope Usage:**

108

109

```kotlin

110

import kotlinx.coroutines.reactive.*

111

import kotlinx.coroutines.*

112

import kotlinx.coroutines.channels.*

113

114

val publisher = publish<Data> {

115

// Access to coroutine context

116

println("Publisher started in: ${coroutineContext[CoroutineDispatcher]}")

117

118

try {

119

// Send values

120

val items = generateItems()

121

items.forEach { item ->

122

send(item) // Suspends if back-pressure applied

123

}

124

125

// Check if channel is still open

126

if (!isClosedForSend) {

127

send(finalItem)

128

}

129

130

} catch (e: Exception) {

131

// Close with error

132

close(e)

133

return@publish

134

}

135

136

// Normal completion - close() called automatically

137

}

138

```

139

140

## Context and Dispatchers

141

142

### Custom Context

143

144

Specify execution context for the publisher coroutine:

145

146

```kotlin

147

import kotlinx.coroutines.reactive.*

148

import kotlinx.coroutines.*

149

150

// Custom dispatcher

151

val ioPublisher = publish(Dispatchers.IO) {

152

val data = performIOOperation()

153

send(data)

154

}

155

156

// Custom context with multiple elements

157

val customContext = Dispatchers.Default +

158

CoroutineName("DataPublisher") +

159

CoroutineExceptionHandler { _, throwable ->

160

println("Unhandled publisher exception: $throwable")

161

}

162

163

val contextualPublisher = publish(customContext) {

164

processData()

165

}

166

167

// Job context not allowed

168

try {

169

val invalidPublisher = publish(Job()) { // Throws IllegalArgumentException

170

send("This won't work")

171

}

172

} catch (e: IllegalArgumentException) {

173

println("Cannot provide Job in context: ${e.message}")

174

}

175

```

176

177

### Default Context Behavior

178

179

- If no dispatcher is specified, `Dispatchers.Default` is used

180

- Context is applied to each new coroutine started per subscription

181

- Global scope is used if no parent scope is available

182

183

## Back-pressure and Flow Control

184

185

The publish builder handles back-pressure automatically:

186

187

```kotlin

188

import kotlinx.coroutines.reactive.*

189

import kotlinx.coroutines.*

190

191

val backPressurePublisher = publish<Int> {

192

repeat(10000) { i ->

193

send(i) // Suspends when subscriber can't keep up

194

// Only proceeds when subscriber requests more

195

}

196

}

197

198

// Subscriber controls the flow

199

backPressurePublisher.subscribe(object : Subscriber<Int> {

200

private lateinit var subscription: Subscription

201

202

override fun onSubscribe(s: Subscription) {

203

subscription = s

204

s.request(1) // Request one item at a time

205

}

206

207

override fun onNext(t: Int) {

208

println("Processing: $t")

209

// Simulate slow processing

210

Thread.sleep(100)

211

subscription.request(1) // Request next item

212

}

213

214

override fun onError(t: Throwable) {

215

println("Error: ${t.message}")

216

}

217

218

override fun onComplete() {

219

println("Completed")

220

}

221

})

222

```

223

224

## Cold Publisher Semantics

225

226

Each subscription creates a new coroutine execution:

227

228

```kotlin

229

import kotlinx.coroutines.reactive.*

230

import kotlinx.coroutines.*

231

232

var executionCount = 0

233

234

val coldPublisher = publish<String> {

235

val id = ++executionCount

236

println("Execution $id started")

237

238

send("Message from execution $id")

239

delay(1000)

240

send("Final message from execution $id")

241

242

println("Execution $id completed")

243

}

244

245

// First subscription

246

coldPublisher.subscribe(subscriber1) // Prints: "Execution 1 started"

247

248

// Second subscription

249

coldPublisher.subscribe(subscriber2) // Prints: "Execution 2 started"

250

251

// Each subscriber gets independent execution

252

```

253

254

## Error Handling and Completion

255

256

### Normal Completion

257

258

Publisher completes when the coroutine block finishes normally:

259

260

```kotlin

261

val completingPublisher = publish<Int> {

262

repeat(3) { i ->

263

send(i)

264

}

265

// Completes normally - onComplete() called on subscriber

266

}

267

```

268

269

### Error Completion

270

271

Publisher signals error when exception is thrown or channel is closed with cause:

272

273

```kotlin

274

val errorPublisher = publish<String> {

275

send("Before error")

276

277

if (someCondition) {

278

throw RuntimeException("Something went wrong")

279

// onError() called on subscriber

280

}

281

282

// Alternative: close with cause

283

close(IllegalStateException("Invalid state"))

284

}

285

```

286

287

### Cancellation Handling

288

289

Publisher handles subscription cancellation properly:

290

291

```kotlin

292

val cancellablePublisher = publish<Long> {

293

try {

294

var counter = 0L

295

while (true) {

296

send(counter++)

297

delay(500)

298

}

299

} catch (e: CancellationException) {

300

println("Publisher was cancelled")

301

throw e // Re-throw cancellation

302

} finally {

303

println("Publisher cleanup")

304

}

305

}

306

307

// Subscription can be cancelled

308

val subscription = // ... get subscription from onSubscribe

309

subscription.cancel() // Cancels the coroutine

310

```

311

312

## Integration with Other Libraries

313

314

### RxJava Integration

315

316

```kotlin

317

import kotlinx.coroutines.reactive.*

318

import io.reactivex.rxjava3.core.Flowable

319

320

val coroutinePublisher = publish<String> {

321

send("From coroutine")

322

delay(1000)

323

send("After delay")

324

}

325

326

// Convert to RxJava Flowable

327

val flowable = Flowable.fromPublisher(coroutinePublisher)

328

```

329

330

### Reactor Integration

331

332

```kotlin

333

import kotlinx.coroutines.reactive.*

334

import reactor.core.publisher.Flux

335

336

val coroutinePublisher = publish<Int> {

337

repeat(5) { i ->

338

send(i * i)

339

delay(100)

340

}

341

}

342

343

// Convert to Reactor Flux

344

val flux = Flux.from(coroutinePublisher)

345

```

346

347

## Legacy/Deprecated Functions

348

349

The following function is deprecated but still available for backward compatibility:

350

351

```kotlin { .api }

352

/**

353

* @deprecated CoroutineScope.publish is deprecated in favour of top-level publish

354

*/

355

@Deprecated("CoroutineScope.publish is deprecated in favour of top-level publish", level = DeprecationLevel.HIDDEN)

356

fun <T> CoroutineScope.publish(

357

context: CoroutineContext = EmptyCoroutineContext,

358

block: suspend ProducerScope<T>.() -> Unit

359

): Publisher<T>

360

```

361

362

## Performance Considerations

363

364

- Each subscription creates a new coroutine - consider caching for expensive operations

365

- Use appropriate dispatchers for I/O vs CPU-bound operations

366

- Back-pressure is handled efficiently through channel mechanics

367

- Memory usage scales with the number of concurrent subscriptions