A modern I/O library for Android, Java, and Kotlin Multiplatform
—
This document covers Okio's helper functions, timeout management, advanced buffer operations, selection utilities, and platform-specific extensions.
Timeout provides configurable timeout policies for I/O operations, allowing you to set deadlines and prevent operations from blocking indefinitely.
expect open class Timeout {
// Timeout configuration
fun timeout(timeout: Long, unit: TimeUnit): Timeout
fun deadlineNanoTime(deadlineNanoTime: Long): Timeout
fun clearTimeout(): Timeout
fun clearDeadline(): Timeout
// Timeout checking
fun hasDeadline(): Boolean
fun deadlineNanoTime(): Long
fun timeoutNanos(): Long
// Timeout enforcement
fun throwIfReached()
fun waitUntilNotified(monitor: Any)
companion object {
val NONE: Timeout // Timeout that never times out
}
}// Basic timeout configuration
val timeout = Timeout()
.timeout(5, TimeUnit.SECONDS)
.deadlineNanoTime(System.nanoTime() + TimeUnit.MINUTES.toNanos(1))
// Check timeout status
if (timeout.hasDeadline()) {
val remainingTime = timeout.deadlineNanoTime() - System.nanoTime()
println("Time remaining: ${remainingTime / 1_000_000}ms")
}
// Using timeout with I/O operations
val fs = FileSystem.SYSTEM
val path = "/tmp/large-file.txt".toPath()
try {
val source = fs.source(path)
source.timeout().timeout(30, TimeUnit.SECONDS)
source.buffer().use { bufferedSource ->
while (!bufferedSource.exhausted()) {
// This will throw if timeout is exceeded
val data = bufferedSource.readByteString(8192)
processData(data)
}
}
} catch (e: IOException) {
if (e.message?.contains("timeout") == true) {
println("Operation timed out")
} else {
println("I/O error: ${e.message}")
}
}// Monitor long-running operation with timeout
fun monitoredOperation(operation: () -> Unit, timeoutSeconds: Long) {
val timeout = Timeout().timeout(timeoutSeconds, TimeUnit.SECONDS)
val startTime = System.nanoTime()
try {
operation()
val duration = (System.nanoTime() - startTime) / 1_000_000
println("Operation completed in ${duration}ms")
} catch (e: Exception) {
timeout.throwIfReached()
throw e
}
}Okio provides efficient selection mechanisms for matching byte sequences from streams.
class Options private constructor() : List<ByteString> {
override val size: Int
override fun get(index: Int): ByteString
override fun iterator(): Iterator<ByteString>
companion object {
fun of(vararg byteStrings: ByteString): Options
}
}class TypedOptions<T : Any> private constructor() : List<T> {
override val size: Int
override fun get(index: Int): T
companion object {
inline fun <T : Any> of(
values: Iterable<T>,
encode: (T) -> ByteString
): TypedOptions<T>
}
}expect sealed interface BufferedSource {
// Selection operations
fun select(options: Options): Int
fun <T : Any> select(options: TypedOptions<T>): T?
}// HTTP method parsing
val httpMethods = Options.of(
"GET".encodeUtf8(),
"POST".encodeUtf8(),
"PUT".encodeUtf8(),
"DELETE".encodeUtf8(),
"PATCH".encodeUtf8()
)
fun parseHttpMethod(request: BufferedSource): String? {
return when (request.select(httpMethods)) {
0 -> "GET"
1 -> "POST"
2 -> "PUT"
3 -> "DELETE"
4 -> "PATCH"
else -> null // No match found
}
}
// Usage
val requestBuffer = Buffer().writeUtf8("POST /api/users HTTP/1.1\r\n")
val method = parseHttpMethod(requestBuffer)
println("HTTP Method: $method")
// Typed options for enum-like behavior
enum class ResponseCode(val code: String) {
OK("200 OK"),
NOT_FOUND("404 Not Found"),
SERVER_ERROR("500 Internal Server Error")
}
val responseOptions = TypedOptions.of(
ResponseCode.values().asIterable()
) { code -> code.code.encodeUtf8() }
fun parseResponseCode(response: BufferedSource): ResponseCode? {
return response.select(responseOptions)
}
// Usage
val responseBuffer = Buffer().writeUtf8("404 Not Found\r\n")
val responseCode = parseResponseCode(responseBuffer)
println("Response Code: $responseCode")// Parse different message types in a protocol
sealed class Message
object PingMessage : Message()
data class DataMessage(val payload: String) : Message()
data class ErrorMessage(val error: String) : Message()
val messageTypes = Options.of(
"PING".encodeUtf8(),
"DATA".encodeUtf8(),
"ERROR".encodeUtf8()
)
fun parseMessage(source: BufferedSource): Message? {
return when (source.select(messageTypes)) {
0 -> {
source.readUtf8Line() // Consume rest of line
PingMessage
}
1 -> {
val length = source.readUtf8Line()?.toIntOrNull() ?: return null
val payload = source.readUtf8(length.toLong())
DataMessage(payload)
}
2 -> {
val error = source.readUtf8Line() ?: return null
ErrorMessage(error)
}
else -> null
}
}
// Test protocol parsing
val protocolBuffer = Buffer()
.writeUtf8("DATA\n")
.writeUtf8("12\n")
.writeUtf8("Hello, World")
val message = parseMessage(protocolBuffer)
println("Parsed message: $message")The UnsafeCursor provides low-level access to Buffer's internal segments for high-performance operations.
class Buffer.UnsafeCursor {
var buffer: Buffer?
var readWrite: Boolean
var offset: Long
var data: ByteArray?
var start: Int
var end: Int
fun next(): Int
fun seek(offset: Long): Int
fun resizeBuffer(newSize: Long): Long
fun expandBuffer(minByteCount: Int): Long
}
expect class Buffer {
fun readUnsafe(unsafeCursor: UnsafeCursor = UnsafeCursor()): UnsafeCursor
fun readAndWriteUnsafe(unsafeCursor: UnsafeCursor = UnsafeCursor()): UnsafeCursor
}// High-performance byte array search
fun findBytePattern(buffer: Buffer, pattern: ByteArray): Long {
var position = -1L
buffer.readUnsafe().use { cursor ->
cursor.seek(0L)
while (cursor.next() != -1) {
val segment = cursor.data ?: continue
// Search within current segment
for (i in cursor.start until cursor.end - pattern.size + 1) {
var match = true
for (j in pattern.indices) {
if (segment[i + j] != pattern[j]) {
match = false
break
}
}
if (match) {
position = cursor.offset + (i - cursor.start)
return position
}
}
}
}
return position
}
// Efficient byte manipulation
fun xorBuffer(buffer: Buffer, key: ByteArray) {
buffer.readAndWriteUnsafe().use { cursor ->
cursor.seek(0L)
var keyIndex = 0
while (cursor.next() != -1) {
val segment = cursor.data ?: continue
for (i in cursor.start until cursor.end) {
segment[i] = (segment[i].toInt() xor key[keyIndex % key.size].toInt()).toByte()
keyIndex++
}
}
}
}
// Usage
val buffer = Buffer().writeUtf8("Hello, Okio UnsafeCursor!")
val pattern = "Okio".encodeUtf8().toByteArray()
val position = findBytePattern(buffer, pattern)
println("Pattern found at position: $position")
// XOR encryption/decryption
val key = "SECRET".toByteArray()
val originalText = buffer.copy().readUtf8()
xorBuffer(buffer, key)
val encrypted = buffer.copy().readUtf8()
xorBuffer(buffer, key) // XOR again to decrypt
val decrypted = buffer.readUtf8()
println("Original: $originalText")
println("Encrypted: ${encrypted.encodeUtf8().hex()}")
println("Decrypted: $decrypted")// Automatic resource cleanup
inline fun <T : Closeable?, R> T.use(block: (T) -> R): R// Source and Sink utilities
fun Source.buffer(): BufferedSource
fun Sink.buffer(): BufferedSink
fun blackholeSink(): Sink// Safe resource handling
fun processFileWithCleanup(path: Path): String {
return FileSystem.SYSTEM.source(path).use { source ->
source.buffer().use { buffered ->
buffered.readUtf8()
}
}
}
// Blackhole sink for testing or discarding data
fun measureCompressionRatio(data: String): Double {
val originalSize = data.length
val compressedSize = Buffer()
blackholeSink().gzip().use { compressor ->
compressor.writeUtf8(data)
// Data is compressed but discarded
}
// In real scenario, you'd measure compressed size
return 0.5 // Placeholder
}
// Chaining buffer operations
fun transformData(input: String): String {
return Buffer()
.writeUtf8(input)
.let { buffer ->
// Transform to uppercase
val upperData = buffer.readUtf8().uppercase()
Buffer().writeUtf8(upperData)
}
.let { buffer ->
// Add prefix and suffix
buffer.writeUtf8(" [PROCESSED]")
"[DATA] ${buffer.readUtf8()}"
}
}
val result = transformData("hello world")
println(result) // [DATA] HELLO WORLD [PROCESSED]Okio provides specific extensions for Apple platforms to integrate with Foundation framework.
// Foundation NSData integration
fun NSData.toByteString(): ByteString// Converting Foundation NSData to Okio ByteString (iOS/macOS)
import platform.Foundation.NSData
import platform.Foundation.NSString
import platform.Foundation.dataUsingEncoding
import platform.Foundation.NSUTF8StringEncoding
fun foundationIntegration() {
// Create NSData from NSString
val nsString = NSString.create(string = "Hello from Foundation!")
val nsData = nsString.dataUsingEncoding(NSUTF8StringEncoding) ?: return
// Convert to Okio ByteString
val byteString = nsData.toByteString()
println("NSData converted to ByteString: ${byteString.utf8()}")
println("ByteString hex: ${byteString.hex()}")
// Use with Okio operations
val hash = byteString.sha256()
println("SHA-256: ${hash.hex()}")
}On native platforms (including iOS), Okio provides optimized implementations:
// Platform-specific lock implementation
expect class Lock() {
inline fun <T> withLock(action: () -> T): T
}// Thread-safe buffer operations
class ThreadSafeBuffer {
private val buffer = Buffer()
private val lock = Lock()
fun write(data: ByteString) {
lock.withLock {
buffer.write(data)
}
}
fun read(): ByteString? {
return lock.withLock {
if (buffer.size > 0) {
buffer.readByteString()
} else {
null
}
}
}
fun size(): Long {
return lock.withLock {
buffer.size
}
}
}
// Usage in multi-threaded environment
val safeBuffer = ThreadSafeBuffer()
// Thread 1: Writing data
Thread {
repeat(100) { i ->
safeBuffer.write("Message $i\n".encodeUtf8())
Thread.sleep(10)
}
}.start()
// Thread 2: Reading data
Thread {
repeat(50) {
val data = safeBuffer.read()
if (data != null) {
println("Read: ${data.utf8().trim()}")
}
Thread.sleep(20)
}
}.start()// Platform-specific exception implementations
expect open class IOException : Exception
expect class EOFException : IOException
expect class FileNotFoundException : IOException
expect class ProtocolException : IOException
expect class ArrayIndexOutOfBoundsException : Exception// Retry mechanism for I/O operations
fun <T> retryOperation(
maxAttempts: Int = 3,
delayMs: Long = 1000,
operation: () -> T
): T? {
repeat(maxAttempts) { attempt ->
try {
return operation()
} catch (e: IOException) {
println("Attempt ${attempt + 1} failed: ${e.message}")
if (attempt < maxAttempts - 1) {
Thread.sleep(delayMs)
}
}
}
return null
}
// Safe file operations with fallback
fun safeReadFile(path: Path, fallbackContent: String = ""): String {
return try {
FileSystem.SYSTEM.read(path) { readUtf8() }
} catch (e: FileNotFoundException) {
println("File not found, using fallback: $path")
fallbackContent
} catch (e: IOException) {
println("I/O error reading file: ${e.message}")
fallbackContent
}
}
// Graceful resource cleanup
fun <T : Closeable> safeClose(resource: T?) {
try {
resource?.close()
} catch (e: Exception) {
println("Error closing resource: ${e.message}")
}
}
// Usage
val result = retryOperation(maxAttempts = 3) {
FileSystem.SYSTEM.read("/tmp/unreliable-file.txt".toPath()) {
readUtf8()
}
}
if (result != null) {
println("Successfully read file: ${result.take(50)}...")
} else {
println("Failed to read file after retries")
}// Efficient buffer reuse
class BufferPool(private val maxSize: Int = 10) {
private val pool = mutableListOf<Buffer>()
private val lock = Lock()
fun acquire(): Buffer {
return lock.withLock {
if (pool.isNotEmpty()) {
pool.removeAt(pool.size - 1)
} else {
Buffer()
}
}
}
fun release(buffer: Buffer) {
lock.withLock {
if (pool.size < maxSize) {
buffer.clear()
pool.add(buffer)
}
}
}
inline fun <T> withBuffer(action: (Buffer) -> T): T {
val buffer = acquire()
try {
return action(buffer)
} finally {
release(buffer)
}
}
}
// Usage
val bufferPool = BufferPool()
// Efficient data processing without allocations
fun processData(data: List<String>): List<String> {
return data.map { input ->
bufferPool.withBuffer { buffer ->
buffer.writeUtf8(input.uppercase())
buffer.writeUtf8(" [PROCESSED]")
buffer.readUtf8()
}
}
}// Efficient batch file processing
fun processBatchFiles(filePaths: List<Path>, processor: (String) -> String) {
val fs = FileSystem.SYSTEM
filePaths.chunked(10).forEach { batch ->
batch.forEach { path ->
try {
val content = fs.read(path) { readUtf8() }
val processed = processor(content)
val outputPath = path.parent!! / "${path.name}.processed"
fs.write(outputPath) { writeUtf8(processed) }
} catch (e: Exception) {
println("Error processing $path: ${e.message}")
}
}
// Small delay between batches to prevent resource exhaustion
Thread.sleep(100)
}
}// Debug buffer contents
fun debugBuffer(buffer: Buffer, name: String = "Buffer") {
println("=== $name Debug Info ===")
println("Size: ${buffer.size} bytes")
if (buffer.size > 0) {
val snapshot = buffer.snapshot()
println("Hex: ${snapshot.hex()}")
// Try to display as text if printable
val text = try {
snapshot.utf8()
} catch (e: Exception) {
"[Non-UTF8 data]"
}
println("Text: ${text.take(100)}${if (text.length > 100) "..." else ""}")
}
println("========================")
}
// Trace I/O operations
class TracingSource(private val delegate: Source, private val name: String) : Source by delegate {
override fun read(sink: Buffer, byteCount: Long): Long {
val bytesRead = delegate.read(sink, byteCount)
println("$name: Read $bytesRead bytes (requested $byteCount)")
return bytesRead
}
}
class TracingSink(private val delegate: Sink, private val name: String) : Sink by delegate {
override fun write(source: Buffer, byteCount: Long) {
println("$name: Writing $byteCount bytes")
delegate.write(source, byteCount)
}
}
// Usage
val buffer = Buffer().writeUtf8("Hello, debugging!")
debugBuffer(buffer, "Test Buffer")
val tracingSource = TracingSource(buffer, "Debug Source")
val output = Buffer()
tracingSource.read(output, 5) // Will print tracing infoInstall with Tessl CLI
npx tessl i tessl/maven-com-squareup-okio--okio