Ktor utilities library for JVM platform containing common utility functions, cryptographic operations, date handling, logging utilities, pipeline functionality, I/O adapters, encoding/decoding utilities, network address handling, and various platform-specific implementations for the Ktor framework.
—
Asynchronous execution pipeline with configurable phases for request/response processing. The pipeline system provides a powerful framework for building extensible, interceptable processing chains with support for phases, interceptors, and contextual execution.
Main pipeline implementation for executing asynchronous, extensible computations.
/**
* Represents an execution pipeline for asynchronous extensible computations
*/
open class Pipeline<TSubject : Any, TContext : Any>(vararg phases: PipelinePhase) {
/** Common place to store pipeline attributes */
val attributes: Attributes
/** Indicates if debug mode is enabled for detailed stacktraces */
open val developmentMode: Boolean
/** Phases of this pipeline */
val items: List<PipelinePhase>
/** Returns true if there are no interceptors installed */
val isEmpty: Boolean
/** Executes pipeline in given context with given subject */
suspend fun execute(context: TContext, subject: TSubject): TSubject
/** Adds phase to the end of pipeline */
fun addPhase(phase: PipelinePhase)
/** Inserts phase relative to another phase */
fun insertPhaseAfter(reference: PipelinePhase, phase: PipelinePhase)
fun insertPhaseBefore(reference: PipelinePhase, phase: PipelinePhase)
/** Installs interceptor for specific phase */
fun intercept(phase: PipelinePhase, block: PipelineInterceptor<TSubject, TContext>)
/** Checks if phase exists in pipeline */
fun hasPhase(phase: PipelinePhase): Boolean
/** Merges another pipeline into this one */
fun merge(from: Pipeline<TSubject, TContext>)
}Usage Examples:
import io.ktor.util.pipeline.*
// Define custom phases
val Setup = PipelinePhase("Setup")
val Processing = PipelinePhase("Processing")
val Cleanup = PipelinePhase("Cleanup")
// Create pipeline
val pipeline = Pipeline<String, Unit>(Setup, Processing, Cleanup)
// Install interceptors
pipeline.intercept(Setup) { data ->
println("Setting up processing for: $data")
// Modify subject or context as needed
}
pipeline.intercept(Processing) { data ->
println("Processing: $data")
val processed = data.uppercase()
proceed(processed) // Continue with modified subject
}
pipeline.intercept(Cleanup) { data ->
println("Cleaning up after: $data")
}
// Execute pipeline
val result = pipeline.execute(Unit, "hello world")
println("Final result: $result")Represent distinct phases in pipeline execution with ordering and relationships.
/**
* Represents a phase in pipeline execution
*/
class PipelinePhase(val name: String) {
override fun toString(): String = name
}
/**
* Interface for defining relationships between pipeline phases
*/
interface PipelinePhaseRelation {
/** Phase that this relation is relative to */
val reference: PipelinePhase
/** Insert phase after reference */
object After : PipelinePhaseRelation
/** Insert phase before reference */
object Before : PipelinePhaseRelation
/** Insert phase at the end */
object Last : PipelinePhaseRelation
}Usage Examples:
import io.ktor.util.pipeline.*
// Create phases
val Authentication = PipelinePhase("Authentication")
val Authorization = PipelinePhase("Authorization")
val Processing = PipelinePhase("Processing")
// Create pipeline with phase ordering
val pipeline = Pipeline<Request, Context>()
// Add phases in specific order
pipeline.addPhase(Authentication)
pipeline.insertPhaseAfter(Authentication, Authorization)
pipeline.insertPhaseAfter(Authorization, Processing)
// Or define relationships when adding
pipeline.insertPhaseBefore(Processing, PipelinePhase("Validation"))Execution context providing access to pipeline state and control flow.
/**
* Context for pipeline execution providing access to subject and control flow
*/
class PipelineContext<TSubject : Any, TContext : Any>(
private val context: TContext,
private var subject: TSubject
) {
/** The current subject being processed */
val subject: TSubject get() = this.subject
/** The execution context */
val context: TContext get() = this.context
/** Proceed to next interceptor with current subject */
suspend fun proceed(): TSubject
/** Proceed to next interceptor with modified subject */
suspend fun proceed(subject: TSubject): TSubject
/** Finish pipeline execution with current subject */
suspend fun finish(): TSubject
/** Finish pipeline execution with modified subject */
suspend fun finish(subject: TSubject): TSubject
}Usage Examples:
import io.ktor.util.pipeline.*
data class Request(val path: String, val method: String)
data class Context(val userId: String?)
val pipeline = Pipeline<Request, Context>()
pipeline.intercept(Authentication) { request ->
if (context.userId == null) {
// Modify request or throw exception
throw SecurityException("Authentication required")
}
proceed() // Continue with current request
}
pipeline.intercept(Processing) { request ->
val modifiedRequest = request.copy(path = request.path.lowercase())
proceed(modifiedRequest) // Continue with modified request
}
pipeline.intercept(Cleanup) { request ->
println("Processing complete for: ${request.path}")
finish() // End pipeline execution
}Function types and utilities for defining pipeline interceptors.
/**
* Type alias for pipeline interceptor functions
*/
typealias PipelineInterceptor<TSubject, TContext> =
suspend PipelineContext<TSubject, TContext>.(TSubject) -> Unit
/**
* Creates a pipeline context for execution
*/
fun <TSubject : Any, TContext : Any> Pipeline<TSubject, TContext>.createContext(
context: TContext,
subject: TSubject,
coroutineContext: CoroutineContext
): PipelineContext<TSubject, TContext>Usage Examples:
import io.ktor.util.pipeline.*
// Define reusable interceptors
val loggingInterceptor: PipelineInterceptor<String, Unit> = { subject ->
println("Processing: $subject")
proceed()
}
val transformInterceptor: PipelineInterceptor<String, Unit> = { subject ->
val transformed = subject.trim().lowercase()
proceed(transformed)
}
// Install interceptors
pipeline.intercept(Processing, loggingInterceptor)
pipeline.intercept(Processing, transformInterceptor)
// Create custom interceptor with complex logic
pipeline.intercept(Validation) { subject ->
if (subject.isBlank()) {
throw IllegalArgumentException("Subject cannot be blank")
}
val validated = subject.take(100) // Limit length
proceed(validated)
}Additional utilities for complex pipeline scenarios.
/**
* Suspend function gun for optimized pipeline execution
*/
class SuspendFunctionGun<TSubject : Any, TContext : Any> {
/** Execute pipeline with optimized suspend function handling */
suspend fun execute(
initial: TSubject,
context: PipelineContext<TSubject, TContext>
): TSubject
}
/**
* Phase content container for interceptors and metadata
*/
class PhaseContent<TSubject : Any, TContext : Any>(
val phase: PipelinePhase,
val relation: PipelinePhaseRelation
) {
/** Add interceptor to this phase */
fun addInterceptor(interceptor: PipelineInterceptor<TSubject, TContext>)
/** Remove interceptor from this phase */
fun removeInterceptor(interceptor: PipelineInterceptor<TSubject, TContext>)
/** Get all interceptors for this phase */
val interceptors: List<PipelineInterceptor<TSubject, TContext>>
}Usage Examples:
import io.ktor.util.pipeline.*
// Advanced pipeline with custom phase content
class CustomPipeline<TSubject : Any, TContext : Any> : Pipeline<TSubject, TContext>() {
override val developmentMode: Boolean = true // Enable debug mode
init {
// Configure custom phase relationships
val setupPhase = PipelinePhase("Setup")
val mainPhase = PipelinePhase("Main")
val finishPhase = PipelinePhase("Finish")
addPhase(setupPhase)
insertPhaseAfter(setupPhase, mainPhase)
insertPhaseAfter(mainPhase, finishPhase)
}
}
// Use custom pipeline
val customPipeline = CustomPipeline<String, Map<String, Any>>()
customPipeline.intercept(PipelinePhase("Setup")) { subject ->
// Access pipeline attributes
attributes.put(AttributeKey("startTime"), System.currentTimeMillis())
proceed()
}Pipeline system includes enhanced error handling and debugging capabilities.
/**
* Stack trace recovery utilities for pipeline debugging
*/
object StackTraceRecover {
/** Recover stack trace information from pipeline execution */
fun recoverStackTrace(exception: Throwable, continuation: Continuation<*>)
}
/**
* Debug pipeline context with enhanced error reporting
*/
class DebugPipelineContext<TSubject : Any, TContext : Any> : PipelineContext<TSubject, TContext> {
/** Stack trace information for debugging */
val stackTraceRecover: StackTraceRecover?
}Usage Examples:
import io.ktor.util.pipeline.*
// Enable debug mode for better error reporting
class DebuggablePipeline<TSubject : Any, TContext : Any> : Pipeline<TSubject, TContext>() {
override val developmentMode: Boolean = true
}
val debugPipeline = DebuggablePipeline<String, Unit>()
debugPipeline.intercept(Processing) { subject ->
try {
// Potentially failing operation
val result = processData(subject)
proceed(result)
} catch (e: Exception) {
// Enhanced error information available in development mode
logger.error("Pipeline processing failed", e)
throw e
}
}Install with Tessl CLI
npx tessl i tessl/maven-io-ktor--ktor-utils-jvm