or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive

Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.jetbrains.kotlinx/kotlinx-coroutines-reactive@1.10.x

To install, run

npx @tessl/cli install tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-reactive@1.10.0

0

# Kotlinx Coroutines Reactive

1

2

Kotlin coroutines integration utilities for Reactive Streams specification providing seamless interoperability between coroutines and reactive programming paradigms. This library enables developers to bridge coroutines with reactive frameworks like RxJava, Reactor, or any Reactive Streams-compliant library while maintaining performance and correctness guarantees.

3

4

## Package Information

5

6

- **Package Name**: kotlinx-coroutines-reactive

7

- **Package Type**: maven

8

- **Language**: Kotlin

9

- **Installation**: Add dependency to build.gradle.kts:

10

```kotlin

11

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:1.10.2")

12

```

13

14

## Core Imports

15

16

```kotlin

17

import kotlinx.coroutines.reactive.*

18

import org.reactivestreams.Publisher

19

import kotlinx.coroutines.flow.Flow

20

```

21

22

## Basic Usage

23

24

```kotlin

25

import kotlinx.coroutines.reactive.*

26

import kotlinx.coroutines.flow.*

27

import org.reactivestreams.Publisher

28

29

// Convert Publisher to Flow

30

val publisher: Publisher<String> = // ... some reactive publisher

31

val flow: Flow<String> = publisher.asFlow()

32

33

// Convert Flow to Publisher

34

val sourceFlow = flowOf("hello", "world", "reactive")

35

val convertedPublisher: Publisher<String> = sourceFlow.asPublisher()

36

37

// Await values from publishers

38

val firstValue = publisher.awaitFirst()

39

val lastValue = publisher.awaitLast()

40

val singleValue = publisher.awaitSingle()

41

42

// Collect all values from a publisher

43

publisher.collect { value ->

44

println("Received: $value")

45

}

46

47

// Create cold reactive publishers from coroutines

48

val coldPublisher = publish {

49

send("first")

50

delay(100)

51

send("second")

52

delay(100)

53

send("third")

54

}

55

```

56

57

## Architecture

58

59

Kotlinx-coroutines-reactive is built around several key integration patterns:

60

61

- **Flow/Publisher Conversion**: Bidirectional adapters maintaining TCK compliance and proper back-pressure handling

62

- **Suspending Extensions**: Coroutine-based await functions for consuming reactive streams without blocking threads

63

- **Cold Publisher Builder**: `publish` coroutine builder creating reactive publishers that start on subscription

64

- **Context Propagation**: Integration with ReactorContext when kotlinx-coroutines-reactor is present

65

- **Back-pressure Management**: Proper handling of reactive streams demand and flow control mechanisms

66

67

## Capabilities

68

69

### Publisher Consumption

70

71

Suspending extension functions for consuming values from reactive Publishers without blocking threads. These functions integrate with coroutine cancellation and provide various consumption patterns.

72

73

```kotlin { .api }

74

suspend fun <T> Publisher<T>.awaitFirst(): T

75

suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T

76

suspend fun <T> Publisher<T>.awaitFirstOrNull(): T?

77

suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T

78

suspend fun <T> Publisher<T>.awaitLast(): T

79

suspend fun <T> Publisher<T>.awaitSingle(): T

80

suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit

81

```

82

83

[Publisher Consumption](./publisher-consumption.md)

84

85

### Flow Publisher Conversion

86

87

Bidirectional conversion utilities between Kotlin Flow and Reactive Streams Publisher, maintaining TCK compliance and providing seamless integration between coroutines and reactive paradigms.

88

89

```kotlin { .api }

90

fun <T : Any> Publisher<T>.asFlow(): Flow<T>

91

fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T>

92

```

93

94

[Flow Publisher Conversion](./flow-publisher-conversion.md)

95

96

### Cold Publisher Creation

97

98

Coroutine builder for creating cold reactive Publishers that execute a coroutine block on each subscription, with proper back-pressure handling and subscription lifecycle management.

99

100

```kotlin { .api }

101

fun <T> publish(

102

context: CoroutineContext = EmptyCoroutineContext,

103

block: suspend ProducerScope<T>.() -> Unit

104

): Publisher<T>

105

```

106

107

[Cold Publisher Creation](./cold-publisher-creation.md)

108

109

## Types

110

111

```kotlin { .api }

112

interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {

113

val channel: SendChannel<E>

114

}

115

116

interface ContextInjector {

117

fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>

118

}

119

120

class FlowSubscription<T>(

121

val flow: Flow<T>,

122

val subscriber: Subscriber<in T>,

123

context: CoroutineContext

124

) : Subscription, AbstractCoroutine<Unit>

125

126

class PublisherCoroutine<in T>(

127

parentContext: CoroutineContext,

128

subscriber: Subscriber<T>,

129

exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit

130

) : AbstractCoroutine<Unit>, ProducerScope<T>, Subscription

131

```

132

133

## Error Handling

134

135

All suspending functions in this library are cancellable and will cancel their reactive stream subscriptions when the coroutine is cancelled.

136

137

### Publisher Consumption Functions

138

139

Publisher consumption functions may throw:

140

141

- `NoSuchElementException` - when no elements are emitted for functions requiring at least one element (`awaitFirst`, `awaitLast`, `awaitSingle`)

142

- `IllegalArgumentException` - when multiple elements are emitted for functions requiring exactly one element (`awaitSingle`)

143

- `CancellationException` - when the coroutine is cancelled during execution

144

- Any exception thrown by the underlying reactive publisher is propagated as-is

145

146

### Publisher Creation Functions

147

148

The `publish` builder may throw:

149

150

- `IllegalArgumentException` - if the provided context contains a Job instance, as the publisher lifecycle should be managed via subscription

151

- `NullPointerException` - if attempting to emit null values through the publisher (reactive streams specification prohibits null values)

152

153

### Conversion Functions

154

155

Flow/Publisher conversions handle errors as follows:

156

157

- Flow exceptions are propagated to reactive subscribers via `Subscriber.onError`

158

- Publisher errors are propagated as flow exceptions during collection

159

- Resource cleanup is guaranteed even when errors occur

160

- Cancellation is properly handled in both directions with appropriate subscription cancellation