or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

attributes.mdcollections.mdcompression.mdconversion.mdcrypto.mddatetime.mdindex.mdio.mdlogging.mdpipeline.mdstrings.md

pipeline.mddocs/

0

# Pipeline System

1

2

Asynchronous execution pipeline with configurable phases for request/response processing. The pipeline system provides a powerful framework for building extensible, interceptable processing chains with support for phases, interceptors, and contextual execution.

3

4

## Capabilities

5

6

### Pipeline Class

7

8

Main pipeline implementation for executing asynchronous, extensible computations.

9

10

```kotlin { .api }

11

/**

12

* Represents an execution pipeline for asynchronous extensible computations

13

*/

14

open class Pipeline<TSubject : Any, TContext : Any>(vararg phases: PipelinePhase) {

15

/** Common place to store pipeline attributes */

16

val attributes: Attributes

17

18

/** Indicates if debug mode is enabled for detailed stacktraces */

19

open val developmentMode: Boolean

20

21

/** Phases of this pipeline */

22

val items: List<PipelinePhase>

23

24

/** Returns true if there are no interceptors installed */

25

val isEmpty: Boolean

26

27

/** Executes pipeline in given context with given subject */

28

suspend fun execute(context: TContext, subject: TSubject): TSubject

29

30

/** Adds phase to the end of pipeline */

31

fun addPhase(phase: PipelinePhase)

32

33

/** Inserts phase relative to another phase */

34

fun insertPhaseAfter(reference: PipelinePhase, phase: PipelinePhase)

35

fun insertPhaseBefore(reference: PipelinePhase, phase: PipelinePhase)

36

37

/** Installs interceptor for specific phase */

38

fun intercept(phase: PipelinePhase, block: PipelineInterceptor<TSubject, TContext>)

39

40

/** Checks if phase exists in pipeline */

41

fun hasPhase(phase: PipelinePhase): Boolean

42

43

/** Merges another pipeline into this one */

44

fun merge(from: Pipeline<TSubject, TContext>)

45

}

46

```

47

48

**Usage Examples:**

49

50

```kotlin

51

import io.ktor.util.pipeline.*

52

53

// Define custom phases

54

val Setup = PipelinePhase("Setup")

55

val Processing = PipelinePhase("Processing")

56

val Cleanup = PipelinePhase("Cleanup")

57

58

// Create pipeline

59

val pipeline = Pipeline<String, Unit>(Setup, Processing, Cleanup)

60

61

// Install interceptors

62

pipeline.intercept(Setup) { data ->

63

println("Setting up processing for: $data")

64

// Modify subject or context as needed

65

}

66

67

pipeline.intercept(Processing) { data ->

68

println("Processing: $data")

69

val processed = data.uppercase()

70

proceed(processed) // Continue with modified subject

71

}

72

73

pipeline.intercept(Cleanup) { data ->

74

println("Cleaning up after: $data")

75

}

76

77

// Execute pipeline

78

val result = pipeline.execute(Unit, "hello world")

79

println("Final result: $result")

80

```

81

82

### Pipeline Phases

83

84

Represent distinct phases in pipeline execution with ordering and relationships.

85

86

```kotlin { .api }

87

/**

88

* Represents a phase in pipeline execution

89

*/

90

class PipelinePhase(val name: String) {

91

override fun toString(): String = name

92

}

93

94

/**

95

* Interface for defining relationships between pipeline phases

96

*/

97

interface PipelinePhaseRelation {

98

/** Phase that this relation is relative to */

99

val reference: PipelinePhase

100

101

/** Insert phase after reference */

102

object After : PipelinePhaseRelation

103

104

/** Insert phase before reference */

105

object Before : PipelinePhaseRelation

106

107

/** Insert phase at the end */

108

object Last : PipelinePhaseRelation

109

}

110

```

111

112

**Usage Examples:**

113

114

```kotlin

115

import io.ktor.util.pipeline.*

116

117

// Create phases

118

val Authentication = PipelinePhase("Authentication")

119

val Authorization = PipelinePhase("Authorization")

120

val Processing = PipelinePhase("Processing")

121

122

// Create pipeline with phase ordering

123

val pipeline = Pipeline<Request, Context>()

124

125

// Add phases in specific order

126

pipeline.addPhase(Authentication)

127

pipeline.insertPhaseAfter(Authentication, Authorization)

128

pipeline.insertPhaseAfter(Authorization, Processing)

129

130

// Or define relationships when adding

131

pipeline.insertPhaseBefore(Processing, PipelinePhase("Validation"))

132

```

133

134

### Pipeline Context

135

136

Execution context providing access to pipeline state and control flow.

137

138

```kotlin { .api }

139

/**

140

* Context for pipeline execution providing access to subject and control flow

141

*/

142

class PipelineContext<TSubject : Any, TContext : Any>(

143

private val context: TContext,

144

private var subject: TSubject

145

) {

146

/** The current subject being processed */

147

val subject: TSubject get() = this.subject

148

149

/** The execution context */

150

val context: TContext get() = this.context

151

152

/** Proceed to next interceptor with current subject */

153

suspend fun proceed(): TSubject

154

155

/** Proceed to next interceptor with modified subject */

156

suspend fun proceed(subject: TSubject): TSubject

157

158

/** Finish pipeline execution with current subject */

159

suspend fun finish(): TSubject

160

161

/** Finish pipeline execution with modified subject */

162

suspend fun finish(subject: TSubject): TSubject

163

}

164

```

165

166

**Usage Examples:**

167

168

```kotlin

169

import io.ktor.util.pipeline.*

170

171

data class Request(val path: String, val method: String)

172

data class Context(val userId: String?)

173

174

val pipeline = Pipeline<Request, Context>()

175

176

pipeline.intercept(Authentication) { request ->

177

if (context.userId == null) {

178

// Modify request or throw exception

179

throw SecurityException("Authentication required")

180

}

181

proceed() // Continue with current request

182

}

183

184

pipeline.intercept(Processing) { request ->

185

val modifiedRequest = request.copy(path = request.path.lowercase())

186

proceed(modifiedRequest) // Continue with modified request

187

}

188

189

pipeline.intercept(Cleanup) { request ->

190

println("Processing complete for: ${request.path}")

191

finish() // End pipeline execution

192

}

193

```

194

195

### Pipeline Interceptors

196

197

Function types and utilities for defining pipeline interceptors.

198

199

```kotlin { .api }

200

/**

201

* Type alias for pipeline interceptor functions

202

*/

203

typealias PipelineInterceptor<TSubject, TContext> =

204

suspend PipelineContext<TSubject, TContext>.(TSubject) -> Unit

205

206

/**

207

* Creates a pipeline context for execution

208

*/

209

fun <TSubject : Any, TContext : Any> Pipeline<TSubject, TContext>.createContext(

210

context: TContext,

211

subject: TSubject,

212

coroutineContext: CoroutineContext

213

): PipelineContext<TSubject, TContext>

214

```

215

216

**Usage Examples:**

217

218

```kotlin

219

import io.ktor.util.pipeline.*

220

221

// Define reusable interceptors

222

val loggingInterceptor: PipelineInterceptor<String, Unit> = { subject ->

223

println("Processing: $subject")

224

proceed()

225

}

226

227

val transformInterceptor: PipelineInterceptor<String, Unit> = { subject ->

228

val transformed = subject.trim().lowercase()

229

proceed(transformed)

230

}

231

232

// Install interceptors

233

pipeline.intercept(Processing, loggingInterceptor)

234

pipeline.intercept(Processing, transformInterceptor)

235

236

// Create custom interceptor with complex logic

237

pipeline.intercept(Validation) { subject ->

238

if (subject.isBlank()) {

239

throw IllegalArgumentException("Subject cannot be blank")

240

}

241

242

val validated = subject.take(100) // Limit length

243

proceed(validated)

244

}

245

```

246

247

### Advanced Pipeline Features

248

249

Additional utilities for complex pipeline scenarios.

250

251

```kotlin { .api }

252

/**

253

* Suspend function gun for optimized pipeline execution

254

*/

255

class SuspendFunctionGun<TSubject : Any, TContext : Any> {

256

/** Execute pipeline with optimized suspend function handling */

257

suspend fun execute(

258

initial: TSubject,

259

context: PipelineContext<TSubject, TContext>

260

): TSubject

261

}

262

263

/**

264

* Phase content container for interceptors and metadata

265

*/

266

class PhaseContent<TSubject : Any, TContext : Any>(

267

val phase: PipelinePhase,

268

val relation: PipelinePhaseRelation

269

) {

270

/** Add interceptor to this phase */

271

fun addInterceptor(interceptor: PipelineInterceptor<TSubject, TContext>)

272

273

/** Remove interceptor from this phase */

274

fun removeInterceptor(interceptor: PipelineInterceptor<TSubject, TContext>)

275

276

/** Get all interceptors for this phase */

277

val interceptors: List<PipelineInterceptor<TSubject, TContext>>

278

}

279

```

280

281

**Usage Examples:**

282

283

```kotlin

284

import io.ktor.util.pipeline.*

285

286

// Advanced pipeline with custom phase content

287

class CustomPipeline<TSubject : Any, TContext : Any> : Pipeline<TSubject, TContext>() {

288

override val developmentMode: Boolean = true // Enable debug mode

289

290

init {

291

// Configure custom phase relationships

292

val setupPhase = PipelinePhase("Setup")

293

val mainPhase = PipelinePhase("Main")

294

val finishPhase = PipelinePhase("Finish")

295

296

addPhase(setupPhase)

297

insertPhaseAfter(setupPhase, mainPhase)

298

insertPhaseAfter(mainPhase, finishPhase)

299

}

300

}

301

302

// Use custom pipeline

303

val customPipeline = CustomPipeline<String, Map<String, Any>>()

304

305

customPipeline.intercept(PipelinePhase("Setup")) { subject ->

306

// Access pipeline attributes

307

attributes.put(AttributeKey("startTime"), System.currentTimeMillis())

308

proceed()

309

}

310

```

311

312

### Error Handling and Stack Traces

313

314

Pipeline system includes enhanced error handling and debugging capabilities.

315

316

```kotlin { .api }

317

/**

318

* Stack trace recovery utilities for pipeline debugging

319

*/

320

object StackTraceRecover {

321

/** Recover stack trace information from pipeline execution */

322

fun recoverStackTrace(exception: Throwable, continuation: Continuation<*>)

323

}

324

325

/**

326

* Debug pipeline context with enhanced error reporting

327

*/

328

class DebugPipelineContext<TSubject : Any, TContext : Any> : PipelineContext<TSubject, TContext> {

329

/** Stack trace information for debugging */

330

val stackTraceRecover: StackTraceRecover?

331

}

332

```

333

334

**Usage Examples:**

335

336

```kotlin

337

import io.ktor.util.pipeline.*

338

339

// Enable debug mode for better error reporting

340

class DebuggablePipeline<TSubject : Any, TContext : Any> : Pipeline<TSubject, TContext>() {

341

override val developmentMode: Boolean = true

342

}

343

344

val debugPipeline = DebuggablePipeline<String, Unit>()

345

346

debugPipeline.intercept(Processing) { subject ->

347

try {

348

// Potentially failing operation

349

val result = processData(subject)

350

proceed(result)

351

} catch (e: Exception) {

352

// Enhanced error information available in development mode

353

logger.error("Pipeline processing failed", e)

354

throw e

355

}

356

}

357

```