Factory functions and constructors for creating various types of byte channels from different data sources.
Create read-only channels from various data sources.
/** Create a ByteReadChannel from a byte array */
fun ByteReadChannel(
content: ByteArray,
offset: Int = 0,
length: Int = content.size
): ByteReadChannel
/** Create a ByteReadChannel from a string with charset encoding */
fun ByteReadChannel(
text: String,
charset: Charset = Charsets.UTF_8
): ByteReadChannel
/** Create a ByteReadChannel from a kotlinx-io Source */
fun ByteReadChannel(source: Source): ByteReadChannelUsage Examples:
import io.ktor.utils.io.*
import io.ktor.utils.io.charsets.Charsets
suspend fun factoryExamples() {
// From byte array
val data = byteArrayOf(0x48, 0x65, 0x6C, 0x6C, 0x6F) // "Hello"
val fromBytes = ByteReadChannel(data)
// From partial byte array
val fromPartialBytes = ByteReadChannel(data, offset = 1, length = 3) // "ell"
// From string with default UTF-8
val fromString = ByteReadChannel("Hello, World!")
// From string with specific charset
val fromStringLatin1 = ByteReadChannel("Café", Charsets.ISO_8859_1)
// Read content
val content1 = fromString.toByteArray()
println(String(content1)) // "Hello, World!"
val content2 = fromPartialBytes.toByteArray()
println(String(content2)) // "ell"
}Create bidirectional channels for both reading and writing.
/**
* Sequential byte channel implementation supporting both read and write operations
* @param autoFlush if true, automatically flush after each write operation
*/
class ByteChannel(autoFlush: Boolean = false) : ByteReadChannel, BufferedByteWriteChannelUsage Examples:
import io.ktor.utils.io.*
import kotlinx.coroutines.*
suspend fun byteChannelExamples() {
// Basic channel
val channel = ByteChannel()
// Auto-flushing channel
val autoFlushChannel = ByteChannel(autoFlush = true)
// Producer-consumer pattern
val communicationChannel = ByteChannel()
// Producer coroutine
launch {
communicationChannel.writeStringUtf8("Message 1\n")
communicationChannel.writeStringUtf8("Message 2\n")
communicationChannel.writeStringUtf8("Message 3\n")
communicationChannel.flushAndClose()
}
// Consumer coroutine
launch {
while (!communicationChannel.isClosedForRead) {
val line = communicationChannel.readUTF8Line()
if (line != null) {
println("Received: $line")
}
}
}
}Create channels with additional functionality through wrapper implementations.
/** Channel that tracks total bytes read */
class CountedByteReadChannel(delegate: ByteReadChannel) : ByteReadChannel {
val totalBytesRead: Long
}
/** Channel that tracks total bytes written */
class CountedByteWriteChannel(delegate: ByteWriteChannel) : ByteWriteChannel {
val totalBytesWritten: Long
}
/** Channel that executes a hook when closed */
class CloseHookByteWriteChannel(
delegate: ByteWriteChannel,
hook: () -> Unit
) : ByteWriteChannel
/** ByteReadChannel backed by kotlinx-io Source */
class SourceByteReadChannel(source: Source) : ByteReadChannel
/** ByteWriteChannel backed by kotlinx-io Sink */
class SinkByteWriteChannel(sink: Sink) : ByteWriteChannelUsage Examples:
import io.ktor.utils.io.*
import kotlinx.io.*
suspend fun specializedChannelExamples() {
// Counted read channel
val baseChannel = ByteReadChannel("Hello World!")
val countedRead = CountedByteReadChannel(baseChannel)
val data = countedRead.toByteArray()
println("Read ${countedRead.totalBytesRead} bytes: ${String(data)}")
// Counted write channel
val writeChannel = ByteChannel()
val countedWrite = CountedByteWriteChannel(writeChannel)
countedWrite.writeStringUtf8("Test message")
countedWrite.flush()
println("Wrote ${countedWrite.totalBytesWritten} bytes")
// Close hook channel
var closed = false
val hookChannel = CloseHookByteWriteChannel(ByteChannel()) {
closed = true
println("Channel was closed!")
}
hookChannel.writeStringUtf8("Data")
hookChannel.flushAndClose()
// Prints: "Channel was closed!"
// Source/Sink adapters
val buffer = Buffer()
buffer.writeUtf8("From kotlinx-io")
val sourceChannel = SourceByteReadChannel(buffer)
val content = sourceChannel.toByteArray()
println(String(content)) // "From kotlinx-io"
val sink = Buffer()
val sinkChannel = SinkByteWriteChannel(sink)
sinkChannel.writeStringUtf8("To kotlinx-io")
sinkChannel.flush()
}Pre-built channel instances for common use cases.
/** Empty read channel that is always closed */
val ByteReadChannel.Empty: ByteReadChannelUsage Examples:
import io.ktor.utils.io.*
suspend fun emptyChannelExample() {
val empty = ByteReadChannel.Empty
println("Is closed: ${empty.isClosedForRead}") // true
println("Has content: ${empty.awaitContent()}") // false
// Attempting to read will immediately return EOF
val line = empty.readUTF8Line()
println("Line read: $line") // null
}Factory-like coroutine builders that create channels with associated coroutines.
/** Create a reader coroutine with an associated ByteReadChannel */
fun CoroutineScope.reader(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend ByteWriteChannel.() -> Unit
): ReaderJob
/** Create a writer coroutine with an associated ByteWriteChannel */
fun CoroutineScope.writer(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend ByteWriteChannel.() -> Unit
): WriterJobUsage Examples:
import io.ktor.utils.io.*
import kotlinx.coroutines.*
suspend fun coroutineFactoryExamples() {
val scope = CoroutineScope(Dispatchers.IO)
// Create a reader that produces data
val readerJob = scope.reader {
repeat(5) { i ->
writeStringUtf8("Line $i\n")
}
flushAndClose()
}
// Consume data from the reader's channel
val channel = readerJob.channel
while (!channel.isClosedForRead) {
val line = channel.readUTF8Line()
if (line != null) {
println("Consumed: $line")
}
}
readerJob.join()
// Create a writer that consumes data
val writerJob = scope.writer {
writeStringUtf8("Data written by writer coroutine")
flushAndClose()
}
// Write data to the writer's channel
val writerChannel = writerJob.channel
val result = writerChannel.toByteArray()
println("Writer produced: ${String(result)}")
writerJob.join()
}// From existing data
val channel1 = ByteReadChannel("text content")
val channel2 = ByteReadChannel(byteArrayOf(1, 2, 3, 4))
// From kotlinx-io primitives
val buffer = Buffer()
val channel3 = ByteReadChannel(buffer)// For communication between coroutines
val pipe = ByteChannel()
// For auto-flushing scenarios
val autoFlush = ByteChannel(autoFlush = true)// Add counting capability
val counted = CountedByteReadChannel(baseChannel)
// Add close hooks
val withHook = CloseHookByteWriteChannel(baseChannel) {
println("Cleanup completed")
}
// Adapt kotlinx-io types
val adapted = SourceByteReadChannel(source)