Core channel abstractions for asynchronous byte I/O operations providing the foundation for all streaming operations in Ktor I/O.
Single-reader channel for asynchronous reading of sequences of bytes. Operations cannot be invoked concurrently.
/**
* Channel for asynchronous reading of sequences of bytes.
* This is a single-reader channel.
*/
interface ByteReadChannel {
/** Exception that caused the channel to be closed, null if closed normally */
val closedCause: Throwable?
/** True if the channel is closed for reading */
val isClosedForRead: Boolean
/**
* Suspend until the channel has at least min bytes available or gets closed.
* Returns false if EOF is reached, true otherwise.
*/
suspend fun awaitContent(min: Int = 1): Boolean
/** Cancel the channel with the specified cause */
fun cancel(cause: Throwable?)
companion object {
/** Empty channel instance that is always closed for reading */
val Empty: ByteReadChannel
}
}Usage Examples:
import io.ktor.utils.io.*
suspend fun readChannelExample() {
val channel: ByteReadChannel = ByteReadChannel("Hello")
// Check if channel has data
if (channel.awaitContent()) {
// Channel has data available
val byte = channel.readByte()
}
// Check closure status
if (channel.isClosedForRead) {
println("Channel closed: ${channel.closedCause}")
}
}Single-writer channel for asynchronous writing of sequences of bytes. Most operations cannot be invoked concurrently except close and flush.
/**
* Channel for asynchronous writing of sequences of bytes.
* This is a single-writer channel.
*/
interface ByteWriteChannel {
/** True if the channel is closed for writing */
val isClosedForWrite: Boolean
/** Exception that caused the channel to be closed, null if closed normally */
val closedCause: Throwable?
/** Flush any pending writes */
suspend fun flush()
/** Flush pending writes and close the channel */
suspend fun flushAndClose()
/** Cancel the channel with the specified cause */
fun cancel(cause: Throwable?)
}Usage Examples:
import io.ktor.utils.io.*
suspend fun writeChannelExample() {
val channel = ByteChannel()
// Write data
channel.writeStringUtf8("Hello World")
// Flush to ensure data is written
channel.flush()
// Close when done
if (!channel.isClosedForWrite) {
channel.flushAndClose()
}
}Enhanced write channel interface that extends ByteWriteChannel with immediate flushing capabilities.
/**
* A ByteWriteChannel with buffering and immediate flush capabilities
*/
interface BufferedByteWriteChannel : ByteWriteChannel {
/** Flush the write buffer immediately without suspending */
fun flushWriteBuffer()
/** Close the channel immediately */
fun close()
}Concrete implementation that combines both reading and writing capabilities in a single sequential channel.
/**
* A channel that supports both reading and writing operations sequentially
*/
class ByteChannel(autoFlush: Boolean = false) : ByteReadChannel, BufferedByteWriteChannel {
constructor(autoFlush: Boolean = false)
}Usage Examples:
import io.ktor.utils.io.*
import kotlinx.coroutines.*
suspend fun byteChannelExample() {
val channel = ByteChannel()
// Producer coroutine
launch {
channel.writeStringUtf8("Line 1\n")
channel.writeStringUtf8("Line 2\n")
channel.flushAndClose()
}
// Consumer coroutine
launch {
while (!channel.isClosedForRead) {
val line = channel.readUTF8Line()
if (line != null) {
println("Read: $line")
}
}
}
}Wrapper that tracks the total number of bytes read from the underlying channel.
class CountedByteReadChannel(delegate: ByteReadChannel) : ByteReadChannel {
/** Total number of bytes read through this channel */
val totalBytesRead: Long
}Wrapper that tracks the total number of bytes written to the underlying channel.
class CountedByteWriteChannel(delegate: ByteWriteChannel) : ByteWriteChannel {
/** Total number of bytes written through this channel */
val totalBytesWritten: Long
}Wrapper that executes a custom hook function when the channel is closed.
class CloseHookByteWriteChannel(
delegate: ByteWriteChannel,
hook: () -> Unit
) : ByteWriteChannelAdapters for integrating with kotlinx-io Source and Sink types.
class SourceByteReadChannel(source: Source) : ByteReadChannel
class SinkByteWriteChannel(sink: Sink) : ByteWriteChannelUsage Examples:
import io.ktor.utils.io.*
import kotlinx.io.*
suspend fun adapterExample() {
// Create from kotlinx-io Source
val buffer = Buffer()
buffer.writeUtf8("Hello from Source")
val readChannel = SourceByteReadChannel(buffer)
// Create from kotlinx-io Sink
val sink = Buffer()
val writeChannel = SinkByteWriteChannel(sink)
writeChannel.writeStringUtf8("Hello to Sink")
}