Netty-based HTTP server engine for Ktor framework providing high-performance asynchronous server capabilities
—
Extension functions for seamless integration between Netty's Future-based asynchronous API and Kotlin coroutines.
Extension functions that allow Netty Future objects to be used directly in Kotlin coroutines with proper cancellation support.
/**
* Suspend until the future completion.
* Resumes with the same exception if the future completes exceptionally
* @return The result of the completed future
*/
suspend fun <T> Future<T>.suspendAwait(): T
/**
* Suspend until the future completion.
* Wraps futures completion exceptions into ChannelWriteException
* @return The result of the completed future
*/
suspend fun <T> Future<T>.suspendWriteAwait(): T
/**
* Suspend until the future completion handling exception from the future using exception function
* @param exception Function to handle exceptions during future completion
* @return The result of the completed future
*/
suspend fun <T> Future<T>.suspendAwait(
exception: (Throwable, Continuation<T>) -> Unit
): TUsage Examples:
import io.ktor.server.netty.*
import io.netty.channel.*
import kotlinx.coroutines.*
// Basic future await in coroutine
suspend fun writeToChannel(channel: Channel, data: ByteArray) {
val future = channel.writeAndFlush(data)
future.suspendAwait() // Suspends until write completes
println("Write completed successfully")
}
// Write-specific await with exception wrapping
suspend fun safeWriteToChannel(channel: Channel, data: ByteArray) {
try {
val future = channel.writeAndFlush(data)
future.suspendWriteAwait() // Wraps IOException as ChannelWriteException
println("Write completed")
} catch (e: ChannelWriteException) {
println("Write failed: ${e.message}")
// Handle write-specific errors
}
}
// Custom exception handling
suspend fun writeWithCustomErrorHandling(channel: Channel, data: ByteArray) {
val future = channel.writeAndFlush(data)
future.suspendAwait { throwable, continuation ->
when (throwable) {
is IOException -> {
println("I/O error during write: ${throwable.message}")
continuation.resumeWithException(CustomWriteException(throwable))
}
else -> {
continuation.resumeWithException(throwable)
}
}
}
}Coroutine dispatcher that ensures code runs on the appropriate Netty event loop thread.
/**
* Coroutine dispatcher for Netty event loop integration
*/
internal object NettyDispatcher : CoroutineDispatcher() {
/**
* Check if dispatch is needed for the given context
* @param context Coroutine context containing Netty channel context
* @return True if dispatch is needed, false if already on correct thread
*/
override fun isDispatchNeeded(context: CoroutineContext): Boolean
/**
* Dispatch coroutine execution to the appropriate Netty event loop
* @param context Coroutine context
* @param block Runnable to execute
*/
override fun dispatch(context: CoroutineContext, block: Runnable)
/**
* Current Netty channel context wrapper
*/
class CurrentContext(val context: ChannelHandlerContext) : AbstractCoroutineContextElement(CurrentContextKey)
/**
* Coroutine context key for NettyDispatcher.CurrentContext
*/
object CurrentContextKey : CoroutineContext.Key<CurrentContext>
}Usage Context:
The NettyDispatcher is used internally by the Ktor Netty engine to ensure that coroutines run on the correct Netty event loop threads, maintaining thread safety and optimal performance.
The future extension functions properly handle coroutine cancellation, ensuring that cancelled coroutines also cancel their associated Netty futures.
// Example of cancellation handling
suspend fun cancellableChannelOperation(channel: Channel, data: ByteArray) = coroutineScope {
val job = launch {
try {
val future = channel.writeAndFlush(data)
future.suspendAwait()
println("Operation completed")
} catch (e: CancellationException) {
println("Operation was cancelled")
throw e
}
}
// Cancel after timeout
delay(5000)
job.cancel("Operation timed out")
}Different patterns for handling exceptions in Netty-coroutine integration:
Identity Exception Handler (default for suspendAwait):
// Passes through exceptions unchanged
suspend fun basicOperation(future: Future<String>): String {
return future.suspendAwait() // IOException remains IOException
}Write Exception Wrapper (used by suspendWriteAwait):
// Wraps IOExceptions as ChannelWriteException
suspend fun writeOperation(future: Future<Void>) {
future.suspendWriteAwait() // IOException becomes ChannelWriteException
}Custom Exception Handling:
suspend fun customExceptionHandling(future: Future<String>): String {
return future.suspendAwait { throwable, continuation ->
when (throwable) {
is ConnectException -> {
log.warn("Connection failed: ${throwable.message}")
continuation.resumeWithException(ServiceUnavailableException(throwable))
}
is ReadTimeoutException -> {
log.warn("Read timeout: ${throwable.message}")
continuation.resumeWithException(RequestTimeoutException(throwable))
}
else -> {
continuation.resumeWithException(throwable)
}
}
}
}The coroutine integration enables seamless use of Netty operations within Ktor's request processing pipeline:
// Using in route handlers
post("/upload") {
val nettyCall = call as NettyApplicationCall
val channel = nettyCall.context.channel()
// Read request body
val bodyBytes = call.receive<ByteArray>()
// Process asynchronously with Netty
withContext(Dispatchers.IO) {
// Write to external channel or file
val future = externalChannel.writeAndFlush(bodyBytes)
future.suspendWriteAwait() // Integrates with coroutines
}
call.respondText("Upload processed")
}The integration optimizes performance by:
// Optimized usage patterns
suspend fun optimizedNettyOperation(channel: Channel, data: ByteArray) {
// Check if channel is writable before attempting write
if (!channel.isWritable) {
delay(10) // Brief delay before retry
}
val future = channel.writeAndFlush(data)
// suspendAwait will return immediately if future is already complete
future.suspendWriteAwait()
}Common patterns for error recovery in Netty-coroutine integration:
suspend fun robustChannelWrite(channel: Channel, data: ByteArray, retries: Int = 3) {
repeat(retries) { attempt ->
try {
val future = channel.writeAndFlush(data)
future.suspendWriteAwait()
return // Success, exit retry loop
} catch (e: ChannelWriteException) {
if (attempt == retries - 1) {
throw e // Last attempt failed
}
delay(100 * (attempt + 1)) // Exponential backoff
}
}
}Install with Tessl CLI
npx tessl i tessl/maven-io-ktor--ktor-server-netty-jvm