or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

coroutine-integration.mdengine-implementation.mdindex.mdplatform-optimization.mdrequest-response.mdserver-configuration.mdstandalone-server.md

coroutine-integration.mddocs/

0

# Coroutine Integration

1

2

Extension functions for seamless integration between Netty's Future-based asynchronous API and Kotlin coroutines.

3

4

## Capabilities

5

6

### Future Suspend Extensions

7

8

Extension functions that allow Netty Future objects to be used directly in Kotlin coroutines with proper cancellation support.

9

10

```kotlin { .api }

11

/**

12

* Suspend until the future completion.

13

* Resumes with the same exception if the future completes exceptionally

14

* @return The result of the completed future

15

*/

16

suspend fun <T> Future<T>.suspendAwait(): T

17

18

/**

19

* Suspend until the future completion.

20

* Wraps futures completion exceptions into ChannelWriteException

21

* @return The result of the completed future

22

*/

23

suspend fun <T> Future<T>.suspendWriteAwait(): T

24

25

/**

26

* Suspend until the future completion handling exception from the future using exception function

27

* @param exception Function to handle exceptions during future completion

28

* @return The result of the completed future

29

*/

30

suspend fun <T> Future<T>.suspendAwait(

31

exception: (Throwable, Continuation<T>) -> Unit

32

): T

33

```

34

35

**Usage Examples:**

36

37

```kotlin

38

import io.ktor.server.netty.*

39

import io.netty.channel.*

40

import kotlinx.coroutines.*

41

42

// Basic future await in coroutine

43

suspend fun writeToChannel(channel: Channel, data: ByteArray) {

44

val future = channel.writeAndFlush(data)

45

future.suspendAwait() // Suspends until write completes

46

println("Write completed successfully")

47

}

48

49

// Write-specific await with exception wrapping

50

suspend fun safeWriteToChannel(channel: Channel, data: ByteArray) {

51

try {

52

val future = channel.writeAndFlush(data)

53

future.suspendWriteAwait() // Wraps IOException as ChannelWriteException

54

println("Write completed")

55

} catch (e: ChannelWriteException) {

56

println("Write failed: ${e.message}")

57

// Handle write-specific errors

58

}

59

}

60

61

// Custom exception handling

62

suspend fun writeWithCustomErrorHandling(channel: Channel, data: ByteArray) {

63

val future = channel.writeAndFlush(data)

64

65

future.suspendAwait { throwable, continuation ->

66

when (throwable) {

67

is IOException -> {

68

println("I/O error during write: ${throwable.message}")

69

continuation.resumeWithException(CustomWriteException(throwable))

70

}

71

else -> {

72

continuation.resumeWithException(throwable)

73

}

74

}

75

}

76

}

77

```

78

79

### NettyDispatcher

80

81

Coroutine dispatcher that ensures code runs on the appropriate Netty event loop thread.

82

83

```kotlin { .api }

84

/**

85

* Coroutine dispatcher for Netty event loop integration

86

*/

87

internal object NettyDispatcher : CoroutineDispatcher() {

88

/**

89

* Check if dispatch is needed for the given context

90

* @param context Coroutine context containing Netty channel context

91

* @return True if dispatch is needed, false if already on correct thread

92

*/

93

override fun isDispatchNeeded(context: CoroutineContext): Boolean

94

95

/**

96

* Dispatch coroutine execution to the appropriate Netty event loop

97

* @param context Coroutine context

98

* @param block Runnable to execute

99

*/

100

override fun dispatch(context: CoroutineContext, block: Runnable)

101

102

/**

103

* Current Netty channel context wrapper

104

*/

105

class CurrentContext(val context: ChannelHandlerContext) : AbstractCoroutineContextElement(CurrentContextKey)

106

107

/**

108

* Coroutine context key for NettyDispatcher.CurrentContext

109

*/

110

object CurrentContextKey : CoroutineContext.Key<CurrentContext>

111

}

112

```

113

114

**Usage Context:**

115

116

The NettyDispatcher is used internally by the Ktor Netty engine to ensure that coroutines run on the correct Netty event loop threads, maintaining thread safety and optimal performance.

117

118

### Coroutine Cancellation Support

119

120

The future extension functions properly handle coroutine cancellation, ensuring that cancelled coroutines also cancel their associated Netty futures.

121

122

```kotlin

123

// Example of cancellation handling

124

suspend fun cancellableChannelOperation(channel: Channel, data: ByteArray) = coroutineScope {

125

val job = launch {

126

try {

127

val future = channel.writeAndFlush(data)

128

future.suspendAwait()

129

println("Operation completed")

130

} catch (e: CancellationException) {

131

println("Operation was cancelled")

132

throw e

133

}

134

}

135

136

// Cancel after timeout

137

delay(5000)

138

job.cancel("Operation timed out")

139

}

140

```

141

142

### Exception Handling Patterns

143

144

Different patterns for handling exceptions in Netty-coroutine integration:

145

146

**Identity Exception Handler (default for `suspendAwait`):**

147

```kotlin

148

// Passes through exceptions unchanged

149

suspend fun basicOperation(future: Future<String>): String {

150

return future.suspendAwait() // IOException remains IOException

151

}

152

```

153

154

**Write Exception Wrapper (used by `suspendWriteAwait`):**

155

```kotlin

156

// Wraps IOExceptions as ChannelWriteException

157

suspend fun writeOperation(future: Future<Void>) {

158

future.suspendWriteAwait() // IOException becomes ChannelWriteException

159

}

160

```

161

162

**Custom Exception Handling:**

163

```kotlin

164

suspend fun customExceptionHandling(future: Future<String>): String {

165

return future.suspendAwait { throwable, continuation ->

166

when (throwable) {

167

is ConnectException -> {

168

log.warn("Connection failed: ${throwable.message}")

169

continuation.resumeWithException(ServiceUnavailableException(throwable))

170

}

171

is ReadTimeoutException -> {

172

log.warn("Read timeout: ${throwable.message}")

173

continuation.resumeWithException(RequestTimeoutException(throwable))

174

}

175

else -> {

176

continuation.resumeWithException(throwable)

177

}

178

}

179

}

180

}

181

```

182

183

### Integration with Ktor Pipeline

184

185

The coroutine integration enables seamless use of Netty operations within Ktor's request processing pipeline:

186

187

```kotlin

188

// Using in route handlers

189

post("/upload") {

190

val nettyCall = call as NettyApplicationCall

191

val channel = nettyCall.context.channel()

192

193

// Read request body

194

val bodyBytes = call.receive<ByteArray>()

195

196

// Process asynchronously with Netty

197

withContext(Dispatchers.IO) {

198

// Write to external channel or file

199

val future = externalChannel.writeAndFlush(bodyBytes)

200

future.suspendWriteAwait() // Integrates with coroutines

201

}

202

203

call.respondText("Upload processed")

204

}

205

```

206

207

### Performance Optimization

208

209

The integration optimizes performance by:

210

211

1. **Immediate Return**: If the future is already completed, returns immediately without suspension

212

2. **Cancellation Support**: Properly handles coroutine cancellation to avoid resource leaks

213

3. **Thread Affinity**: Ensures operations run on appropriate Netty event loop threads

214

4. **Exception Optimization**: Unwraps ExecutionException to provide cleaner exception handling

215

216

```kotlin

217

// Optimized usage patterns

218

suspend fun optimizedNettyOperation(channel: Channel, data: ByteArray) {

219

// Check if channel is writable before attempting write

220

if (!channel.isWritable) {

221

delay(10) // Brief delay before retry

222

}

223

224

val future = channel.writeAndFlush(data)

225

226

// suspendAwait will return immediately if future is already complete

227

future.suspendWriteAwait()

228

}

229

```

230

231

### Error Recovery Patterns

232

233

Common patterns for error recovery in Netty-coroutine integration:

234

235

```kotlin

236

suspend fun robustChannelWrite(channel: Channel, data: ByteArray, retries: Int = 3) {

237

repeat(retries) { attempt ->

238

try {

239

val future = channel.writeAndFlush(data)

240

future.suspendWriteAwait()

241

return // Success, exit retry loop

242

} catch (e: ChannelWriteException) {

243

if (attempt == retries - 1) {

244

throw e // Last attempt failed

245

}

246

247

delay(100 * (attempt + 1)) // Exponential backoff

248

}

249

}

250

}

251

```