0
# Pipeline System
1
2
Asynchronous execution pipeline with configurable phases for request/response processing. The pipeline system provides a powerful framework for building extensible, interceptable processing chains with support for phases, interceptors, and contextual execution.
3
4
## Capabilities
5
6
### Pipeline Class
7
8
Main pipeline implementation for executing asynchronous, extensible computations.
9
10
```kotlin { .api }
/**
 * Represents an execution pipeline for asynchronous extensible computations.
 *
 * @param phases phases the pipeline is created with
 */
open class Pipeline<TSubject : Any, TContext : Any>(vararg phases: PipelinePhase) {
    /** Common place to store pipeline attributes */
    val attributes: Attributes

    /** Indicates if debug mode is enabled for detailed stacktraces */
    open val developmentMode: Boolean

    /** Phases of this pipeline */
    val items: List<PipelinePhase>

    /** Returns true if there are no interceptors installed */
    val isEmpty: Boolean

    /** Executes pipeline in given context with given subject */
    suspend fun execute(context: TContext, subject: TSubject): TSubject

    /** Adds phase to the end of pipeline */
    fun addPhase(phase: PipelinePhase)

    /** Inserts [phase] after the [reference] phase */
    fun insertPhaseAfter(reference: PipelinePhase, phase: PipelinePhase)

    /** Inserts [phase] before the [reference] phase */
    fun insertPhaseBefore(reference: PipelinePhase, phase: PipelinePhase)

    /** Installs interceptor for specific phase */
    fun intercept(phase: PipelinePhase, block: PipelineInterceptor<TSubject, TContext>)

    /** Checks if phase exists in pipeline */
    fun hasPhase(phase: PipelinePhase): Boolean

    /** Merges another pipeline into this one */
    fun merge(from: Pipeline<TSubject, TContext>)
}
```
47
48
**Usage Examples:**
49
50
```kotlin
import io.ktor.util.pipeline.*

// Define custom phases
val Setup = PipelinePhase("Setup")
val Processing = PipelinePhase("Processing")
val Cleanup = PipelinePhase("Cleanup")

// Create pipeline
val pipeline = Pipeline<String, Unit>(Setup, Processing, Cleanup)

// Install interceptors
pipeline.intercept(Setup) { data ->
    println("Setting up processing for: $data")
    // Modify subject or context as needed
}

pipeline.intercept(Processing) { data ->
    println("Processing: $data")
    val processed = data.uppercase()
    proceed(processed) // Continue with modified subject
}

pipeline.intercept(Cleanup) { data ->
    println("Cleaning up after: $data")
}

// Execute pipeline. `execute` is a suspend function, so this call must be made
// from a coroutine or another suspend function.
val result = pipeline.execute(Unit, "hello world")
println("Final result: $result")
```
81
82
### Pipeline Phases
83
84
Phases represent distinct, ordered stages of pipeline execution and can be positioned relative to one another.
85
86
```kotlin { .api }
/**
 * Represents a phase in pipeline execution.
 *
 * The class does not override `equals`, so two phases created with the same
 * name are still distinct phases.
 *
 * @property name name of the phase; also returned by [toString]
 */
class PipelinePhase(val name: String) {
    override fun toString(): String = name
}

/**
 * Describes how a pipeline phase is positioned relative to other phases.
 *
 * Modelled as a sealed hierarchy so that each relation carries the phase it
 * refers to; a singleton could not hold a per-use reference phase.
 */
sealed class PipelinePhaseRelation {
    /** Insert phase after [relativeTo] */
    class After(val relativeTo: PipelinePhase) : PipelinePhaseRelation()

    /** Insert phase before [relativeTo] */
    class Before(val relativeTo: PipelinePhase) : PipelinePhaseRelation()

    /** Insert phase at the end */
    object Last : PipelinePhaseRelation()
}
```
111
112
**Usage Examples:**
113
114
```kotlin
import io.ktor.util.pipeline.*

// Create phases
val Authentication = PipelinePhase("Authentication")
val Authorization = PipelinePhase("Authorization")
val Processing = PipelinePhase("Processing")
val Validation = PipelinePhase("Validation")

// Create an empty pipeline (Request and Context are application-defined types)
val pipeline = Pipeline<Request, Context>()

// Add phases in specific order
pipeline.addPhase(Authentication)
pipeline.insertPhaseAfter(Authentication, Authorization)
pipeline.insertPhaseAfter(Authorization, Processing)

// Insert a phase before an existing one. The phase is kept in a `val` because
// PipelinePhase does not override equals: installing an interceptor later
// requires this same instance, not a new PipelinePhase("Validation").
pipeline.insertPhaseBefore(Processing, Validation)
```
133
134
### Pipeline Context
135
136
Execution context providing access to pipeline state and control flow.
137
138
```kotlin { .api }
/**
 * Context for pipeline execution providing access to subject and control flow.
 *
 * Declared `abstract` because it is extended by other context types and its
 * control-flow members are listed here without bodies.
 *
 * NOTE(review): upstream Ktor appears to name the subject-replacing call
 * `proceedWith` and to declare `finish()` as a non-suspending function;
 * confirm against the targeted Ktor version.
 */
abstract class PipelineContext<TSubject : Any, TContext : Any>(
    /** The execution context */
    val context: TContext,
    subject: TSubject
) {
    /** The current subject being processed; readable by everyone, writable only by the context itself */
    var subject: TSubject = subject
        private set

    /** Proceed to next interceptor with current subject */
    suspend fun proceed(): TSubject

    /** Proceed to next interceptor with modified subject */
    suspend fun proceed(subject: TSubject): TSubject

    /** Finish pipeline execution with current subject */
    suspend fun finish(): TSubject

    /** Finish pipeline execution with modified subject */
    suspend fun finish(subject: TSubject): TSubject
}
```
165
166
**Usage Examples:**
167
168
```kotlin
import io.ktor.util.pipeline.*

data class Request(val path: String, val method: String)
data class Context(val userId: String?)

// Phases have to be part of the pipeline before interceptors can target them
val Authentication = PipelinePhase("Authentication")
val Processing = PipelinePhase("Processing")
val Cleanup = PipelinePhase("Cleanup")

val pipeline = Pipeline<Request, Context>(Authentication, Processing, Cleanup)

pipeline.intercept(Authentication) { request ->
    if (context.userId == null) {
        // Reject unauthenticated requests
        throw SecurityException("Authentication required")
    }
    proceed() // Continue with current request
}

pipeline.intercept(Processing) { request ->
    val modifiedRequest = request.copy(path = request.path.lowercase())
    proceed(modifiedRequest) // Continue with modified request
}

pipeline.intercept(Cleanup) { request ->
    println("Processing complete for: ${request.path}")
    finish() // End pipeline execution
}
```
194
195
### Pipeline Interceptors
196
197
Function types and utilities for defining pipeline interceptors.
198
199
```kotlin { .api }
/**
 * Type alias for pipeline interceptor functions: a suspending function with a
 * [PipelineContext] receiver that takes the subject as its argument.
 */
typealias PipelineInterceptor<TSubject, TContext> =
    suspend PipelineContext<TSubject, TContext>.(TSubject) -> Unit

/**
 * Creates a pipeline context for execution.
 *
 * @param context execution context exposed to interceptors
 * @param subject initial subject of the execution
 * @param coroutineContext coroutine context used for the execution
 * @return a [PipelineContext] for this pipeline's subject and context types
 */
fun <TSubject : Any, TContext : Any> Pipeline<TSubject, TContext>.createContext(
    context: TContext,
    subject: TSubject,
    coroutineContext: CoroutineContext
): PipelineContext<TSubject, TContext>
```
215
216
**Usage Examples:**
217
218
```kotlin
import io.ktor.util.pipeline.*

// Phases and pipeline the interceptors below are installed into
val Validation = PipelinePhase("Validation")
val Processing = PipelinePhase("Processing")
val pipeline = Pipeline<String, Unit>(Validation, Processing)

// Define reusable interceptors
val loggingInterceptor: PipelineInterceptor<String, Unit> = { subject ->
    println("Processing: $subject")
    proceed()
}

val transformInterceptor: PipelineInterceptor<String, Unit> = { subject ->
    val transformed = subject.trim().lowercase()
    proceed(transformed)
}

// Install interceptors
pipeline.intercept(Processing, loggingInterceptor)
pipeline.intercept(Processing, transformInterceptor)

// Create custom interceptor with complex logic
pipeline.intercept(Validation) { subject ->
    if (subject.isBlank()) {
        throw IllegalArgumentException("Subject cannot be blank")
    }

    val validated = subject.take(100) // Limit length
    proceed(validated)
}
```
246
247
### Advanced Pipeline Features
248
249
Additional utilities for complex pipeline scenarios.
250
251
```kotlin { .api }
/**
 * Suspend function gun for optimized pipeline execution
 */
class SuspendFunctionGun<TSubject : Any, TContext : Any> {
    /**
     * Execute pipeline with optimized suspend function handling.
     *
     * @param initial subject the execution starts with
     * @param context pipeline context the execution runs in
     */
    suspend fun execute(
        initial: TSubject,
        context: PipelineContext<TSubject, TContext>
    ): TSubject
}

/**
 * Phase content container for interceptors and metadata.
 *
 * @property phase phase this content belongs to
 * @property relation relationship of [phase] to other pipeline phases
 */
class PhaseContent<TSubject : Any, TContext : Any>(
    val phase: PipelinePhase,
    val relation: PipelinePhaseRelation
) {
    /** Add interceptor to this phase */
    fun addInterceptor(interceptor: PipelineInterceptor<TSubject, TContext>)

    /** Remove interceptor from this phase */
    fun removeInterceptor(interceptor: PipelineInterceptor<TSubject, TContext>)

    /** Get all interceptors for this phase */
    val interceptors: List<PipelineInterceptor<TSubject, TContext>>
}
```
280
281
**Usage Examples:**
282
283
```kotlin
import io.ktor.util.*
import io.ktor.util.pipeline.*

// Advanced pipeline that declares and registers its own phases
class CustomPipeline<TSubject : Any, TContext : Any> :
    Pipeline<TSubject, TContext>(Setup, Main, Finish) {
    override val developmentMode: Boolean = true // Enable debug mode

    // Phases are exposed as shared instances: interceptors must be installed
    // with the very same PipelinePhase objects that the pipeline registered.
    companion object Phases {
        val Setup = PipelinePhase("Setup")
        val Main = PipelinePhase("Main")
        val Finish = PipelinePhase("Finish")
    }
}

// Use custom pipeline
val customPipeline = CustomPipeline<String, Map<String, Any>>()

// Typed key under which the start time is stored
val StartTimeKey = AttributeKey<Long>("startTime")

customPipeline.intercept(CustomPipeline.Setup) { subject ->
    // Access pipeline attributes (they belong to the pipeline, not to the interceptor receiver)
    customPipeline.attributes.put(StartTimeKey, System.currentTimeMillis())
    proceed()
}
```
311
312
### Error Handling and Stack Traces
313
314
The pipeline system includes enhanced error handling and debugging capabilities.
315
316
```kotlin { .api }
/**
 * Stack trace recovery utilities for pipeline debugging
 */
object StackTraceRecover {
    /**
     * Recover stack trace information from pipeline execution.
     *
     * @param exception exception whose stack trace is being recovered
     * @param continuation continuation of the pipeline execution the exception relates to
     */
    fun recoverStackTrace(exception: Throwable, continuation: Continuation<*>)
}

/**
 * Debug pipeline context with enhanced error reporting.
 *
 * @param context execution context passed on to [PipelineContext]
 * @param subject initial subject passed on to [PipelineContext]
 */
class DebugPipelineContext<TSubject : Any, TContext : Any>(
    context: TContext,
    subject: TSubject
) : PipelineContext<TSubject, TContext>(context, subject) {
    /** Stack trace information for debugging */
    val stackTraceRecover: StackTraceRecover?
}
```
333
334
**Usage Examples:**
335
336
```kotlin
import io.ktor.util.pipeline.*

// Phase the interceptor below is installed into
val Processing = PipelinePhase("Processing")

// Enable debug mode for better error reporting
class DebuggablePipeline<TSubject : Any, TContext : Any> : Pipeline<TSubject, TContext>(Processing) {
    override val developmentMode: Boolean = true
}

val debugPipeline = DebuggablePipeline<String, Unit>()

debugPipeline.intercept(Processing) { subject ->
    try {
        // Potentially failing operation (processData is an application-defined function)
        val result = processData(subject)
        proceed(result)
    } catch (e: Exception) {
        // Enhanced error information available in development mode
        // (logger is an application-defined logger)
        logger.error("Pipeline processing failed", e)
        throw e
    }
}
```