0
# Coroutine Integration
1
2
Extension functions for seamless integration between Netty's Future-based asynchronous API and Kotlin coroutines.
3
4
## Capabilities
5
6
### Future Suspend Extensions
7
8
Extension functions that allow Netty Future objects to be used directly in Kotlin coroutines with proper cancellation support.
9
10
```kotlin { .api }
11
/**
12
* Suspend until the future completion.
13
* Resumes with the same exception if the future completes exceptionally
14
* @return The result of the completed future
15
*/
16
suspend fun <T> Future<T>.suspendAwait(): T
17
18
/**
19
* Suspend until the future completion.
20
* Wraps futures completion exceptions into ChannelWriteException
21
* @return The result of the completed future
22
*/
23
suspend fun <T> Future<T>.suspendWriteAwait(): T
24
25
/**
26
* Suspend until the future completion handling exception from the future using exception function
27
* @param exception Function to handle exceptions during future completion
28
* @return The result of the completed future
29
*/
30
suspend fun <T> Future<T>.suspendAwait(
31
exception: (Throwable, Continuation<T>) -> Unit
32
): T
33
```
34
35
**Usage Examples:**
36
37
```kotlin
38
import io.ktor.server.netty.*
39
import io.netty.channel.*
40
import kotlinx.coroutines.*
41
42
// Basic future await in coroutine
43
suspend fun writeToChannel(channel: Channel, data: ByteArray) {
44
val future = channel.writeAndFlush(data)
45
future.suspendAwait() // Suspends until write completes
46
println("Write completed successfully")
47
}
48
49
// Write-specific await with exception wrapping
50
suspend fun safeWriteToChannel(channel: Channel, data: ByteArray) {
51
try {
52
val future = channel.writeAndFlush(data)
53
future.suspendWriteAwait() // Wraps IOException as ChannelWriteException
54
println("Write completed")
55
} catch (e: ChannelWriteException) {
56
println("Write failed: ${e.message}")
57
// Handle write-specific errors
58
}
59
}
60
61
// Custom exception handling
62
suspend fun writeWithCustomErrorHandling(channel: Channel, data: ByteArray) {
63
val future = channel.writeAndFlush(data)
64
65
future.suspendAwait { throwable, continuation ->
66
when (throwable) {
67
is IOException -> {
68
println("I/O error during write: ${throwable.message}")
69
continuation.resumeWithException(CustomWriteException(throwable))
70
}
71
else -> {
72
continuation.resumeWithException(throwable)
73
}
74
}
75
}
76
}
77
```
78
79
### NettyDispatcher
80
81
Coroutine dispatcher that ensures code runs on the appropriate Netty event loop thread.
82
83
```kotlin { .api }
84
/**
85
* Coroutine dispatcher for Netty event loop integration
86
*/
87
internal object NettyDispatcher : CoroutineDispatcher() {
88
/**
89
* Check if dispatch is needed for the given context
90
* @param context Coroutine context containing Netty channel context
91
* @return True if dispatch is needed, false if already on correct thread
92
*/
93
override fun isDispatchNeeded(context: CoroutineContext): Boolean
94
95
/**
96
* Dispatch coroutine execution to the appropriate Netty event loop
97
* @param context Coroutine context
98
* @param block Runnable to execute
99
*/
100
override fun dispatch(context: CoroutineContext, block: Runnable)
101
102
/**
103
* Current Netty channel context wrapper
104
*/
105
class CurrentContext(val context: ChannelHandlerContext) : AbstractCoroutineContextElement(CurrentContextKey)
106
107
/**
108
* Coroutine context key for NettyDispatcher.CurrentContext
109
*/
110
object CurrentContextKey : CoroutineContext.Key<CurrentContext>
111
}
112
```
113
114
**Usage Context:**
115
116
The NettyDispatcher is used internally by the Ktor Netty engine to ensure that coroutines run on the correct Netty event loop threads, maintaining thread safety and optimal performance.
117
118
### Coroutine Cancellation Support
119
120
The future extension functions properly handle coroutine cancellation, ensuring that cancelled coroutines also cancel their associated Netty futures.
121
122
```kotlin
123
// Example of cancellation handling
124
suspend fun cancellableChannelOperation(channel: Channel, data: ByteArray) = coroutineScope {
125
val job = launch {
126
try {
127
val future = channel.writeAndFlush(data)
128
future.suspendAwait()
129
println("Operation completed")
130
} catch (e: CancellationException) {
131
println("Operation was cancelled")
132
throw e
133
}
134
}
135
136
// Cancel after timeout
137
delay(5000)
138
job.cancel("Operation timed out")
139
}
140
```
141
142
### Exception Handling Patterns
143
144
Different patterns for handling exceptions in Netty-coroutine integration:
145
146
**Identity Exception Handler (default for `suspendAwait`):**
147
```kotlin
148
// Passes through exceptions unchanged
149
suspend fun basicOperation(future: Future<String>): String {
150
return future.suspendAwait() // IOException remains IOException
151
}
152
```
153
154
**Write Exception Wrapper (used by `suspendWriteAwait`):**
155
```kotlin
156
// Wraps IOExceptions as ChannelWriteException
157
suspend fun writeOperation(future: Future<Void>) {
158
future.suspendWriteAwait() // IOException becomes ChannelWriteException
159
}
160
```
161
162
**Custom Exception Handling:**
163
```kotlin
164
suspend fun customExceptionHandling(future: Future<String>): String {
165
return future.suspendAwait { throwable, continuation ->
166
when (throwable) {
167
is ConnectException -> {
168
log.warn("Connection failed: ${throwable.message}")
169
continuation.resumeWithException(ServiceUnavailableException(throwable))
170
}
171
is ReadTimeoutException -> {
172
log.warn("Read timeout: ${throwable.message}")
173
continuation.resumeWithException(RequestTimeoutException(throwable))
174
}
175
else -> {
176
continuation.resumeWithException(throwable)
177
}
178
}
179
}
180
}
181
```
182
183
### Integration with Ktor Pipeline
184
185
The coroutine integration enables seamless use of Netty operations within Ktor's request processing pipeline:
186
187
```kotlin
188
// Using in route handlers
189
post("/upload") {
190
val nettyCall = call as NettyApplicationCall
191
val channel = nettyCall.context.channel()
192
193
// Read request body
194
val bodyBytes = call.receive<ByteArray>()
195
196
// Process asynchronously with Netty
197
withContext(Dispatchers.IO) {
198
// Write to external channel or file
199
val future = externalChannel.writeAndFlush(bodyBytes)
200
future.suspendWriteAwait() // Integrates with coroutines
201
}
202
203
call.respondText("Upload processed")
204
}
205
```
206
207
### Performance Optimization
208
209
The integration optimizes performance by:
210
211
1. **Immediate Return**: If the future is already completed, returns immediately without suspension
212
2. **Cancellation Support**: Properly handles coroutine cancellation to avoid resource leaks
213
3. **Thread Affinity**: Ensures operations run on appropriate Netty event loop threads
214
4. **Exception Optimization**: Unwraps ExecutionException to provide cleaner exception handling
215
216
```kotlin
217
// Optimized usage patterns
218
suspend fun optimizedNettyOperation(channel: Channel, data: ByteArray) {
219
// Check if channel is writable before attempting write
220
if (!channel.isWritable) {
221
delay(10) // Brief delay before retry
222
}
223
224
val future = channel.writeAndFlush(data)
225
226
// suspendAwait will return immediately if future is already complete
227
future.suspendWriteAwait()
228
}
229
```
230
231
### Error Recovery Patterns
232
233
Common patterns for error recovery in Netty-coroutine integration:
234
235
```kotlin
236
suspend fun robustChannelWrite(channel: Channel, data: ByteArray, retries: Int = 3) {
237
repeat(retries) { attempt ->
238
try {
239
val future = channel.writeAndFlush(data)
240
future.suspendWriteAwait()
241
return // Success, exit retry loop
242
} catch (e: ChannelWriteException) {
243
if (attempt == retries - 1) {
244
throw e // Last attempt failed
245
}
246
247
delay(100 * (attempt + 1)) // Exponential backoff
248
}
249
}
250
}
251
```