# Pipeline Framework

Apache Flink ML provides a pipeline framework for building complex machine learning workflows by chaining transformers and predictors. The design follows scikit-learn, with `Estimator`, `Predictor`, and `Transformer` abstractions.

## Core Pipeline Components

### Estimator

Base trait for components that can be fitted to data to learn parameters.

```scala { .api }
trait Estimator[Self] extends WithParameters {
  def fit[Training](training: DataSet[Training])(implicit fitOperation: FitOperation[Self, Training]): Self
}
```

All machine learning algorithms in Flink ML extend `Estimator` to provide the `fit` method for training.

### Predictor

Extends `Estimator` to add prediction capabilities for supervised learning algorithms.

```scala { .api }
trait Predictor[Self] extends Estimator[Self] {
  def predict[Testing, Prediction](
    testing: DataSet[Testing]
  )(implicit predictOperation: PredictDataSetOperation[Self, Testing, Prediction]): DataSet[Prediction]

  def evaluate[Testing, Prediction](
    testing: DataSet[Testing]
  )(implicit evaluateOperation: EvaluateDataSetOperation[Self, Testing, Prediction]): DataSet[Prediction]
}
```

**Usage Example:**

```scala
import org.apache.flink.ml.classification.SVM

val svm: SVM = SVM()
  .setIterations(100)
  .setRegularization(0.01)

// Fit the predictor
val trainedModel = svm.fit(trainingData)

// Make predictions
val predictions = trainedModel.predict(testData)

// Evaluate model performance
val evaluationResults = trainedModel.evaluate(testData)
```

### Transformer

Base trait for unsupervised components that transform data without using labels. `Transformer` extends `Estimator`, so transformers can be fitted (for example, a scaler learns mean and standard deviation during `fit`).

```scala { .api }
trait Transformer[Self] extends Estimator[Self] {
  def transform[Input, Output](
    input: DataSet[Input]
  )(implicit transformOperation: TransformDataSetOperation[Self, Input, Output]): DataSet[Output]

  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T]

  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P]
}
```

**Usage Example:**

```scala
import org.apache.flink.ml.preprocessing.StandardScaler

val scaler: StandardScaler = StandardScaler()
  .setMean(true)
  .setStd(true)

// Fit the transformer (learn mean and std)
val fittedScaler = scaler.fit(trainingData)

// Transform data
val scaledData = fittedScaler.transform(trainingData)
```

## Chained Components

### Chained Transformer

Combines multiple transformers into a single pipeline component. The left transformer is applied first, then the right.

```scala { .api }
case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](
  left: L,
  right: R
) extends Transformer[ChainedTransformer[L, R]] {
  def transform[Input, Output](input: DataSet[Input]): DataSet[Output]
}
```
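
The left-then-right semantics can be illustrated without Flink. The following is a minimal sketch of the pattern only, not the Flink ML classes: plain Scala functions over a `List` stand in for fitted transformers, and `Chained` mirrors how `ChainedTransformer(left, right)` composes them.

```scala
// Minimal sketch of transformer chaining (illustrative, NOT Flink ML API):
// a fitted "transformer" is modeled as a function over a dataset (here, a List).
object ChainingSketch {
  case class Chained[A, B, C](left: A => B, right: B => C) {
    // Mirrors ChainedTransformer.transform: apply left first, then right
    def transform(input: A): C = right(left(input))
  }

  def main(args: Array[String]): Unit = {
    val scale: List[Double] => List[Double] = _.map(_ * 2.0) // e.g. a scaler
    val shift: List[Double] => List[Double] = _.map(_ + 1.0) // e.g. another transformer

    val chained = Chained(scale, shift)
    // Element-wise (x * 2) + 1: the left transformer runs before the right
    println(chained.transform(List(1.0, 2.0, 3.0))) // List(3.0, 5.0, 7.0)
  }
}
```

Because `ChainedTransformer` is itself a `Transformer`, further `chainTransformer` calls nest chains the same way, which is what makes multi-stage pipelines possible.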

### Chained Predictor

Combines transformers with a final predictor for end-to-end ML pipelines.

```scala { .api }
case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](
  transformer: T,
  predictor: P
) extends Predictor[ChainedPredictor[T, P]] {
  def fit[Training](training: DataSet[Training]): ChainedPredictor[T, P]
  def predict[Testing, Prediction](testing: DataSet[Testing]): DataSet[Prediction]
}
```

## Building Pipelines

### Simple Transformer Chain

```scala
import org.apache.flink.ml.preprocessing.{StandardScaler, MinMaxScaler, PolynomialFeatures}

val scaler1 = MinMaxScaler().setMin(0.0).setMax(1.0)
val polyFeatures = PolynomialFeatures().setDegree(2)
val scaler2 = StandardScaler()

// Chain transformers
val preprocessingPipeline = scaler1
  .chainTransformer(polyFeatures)
  .chainTransformer(scaler2)

// Fit and transform
val fittedPipeline = preprocessingPipeline.fit(trainingData)
val transformedData = fittedPipeline.transform(trainingData)
```

### Complete ML Pipeline

```scala
import org.apache.flink.ml.preprocessing.StandardScaler
import org.apache.flink.ml.classification.SVM

val scaler = StandardScaler()
val svm = SVM().setIterations(100).setRegularization(0.01)

// Create end-to-end pipeline
val mlPipeline = scaler.chainPredictor(svm)

// Fit entire pipeline
val trainedPipeline = mlPipeline.fit(trainingData)

// Make predictions (automatically applies scaling, then SVM)
val predictions = trainedPipeline.predict(testData)
```

### Complex Multi-Stage Pipeline

```scala
import org.apache.flink.ml.preprocessing.{MinMaxScaler, PolynomialFeatures, StandardScaler}
import org.apache.flink.ml.regression.MultipleLinearRegression

// Multi-stage preprocessing
val minMaxScaler = MinMaxScaler().setMin(-1.0).setMax(1.0)
val polyFeatures = PolynomialFeatures().setDegree(3)
val standardScaler = StandardScaler()

// Regression model
val regression = MultipleLinearRegression()
  .setIterations(200)
  .setStepsize(0.01)

// Build complex pipeline
val complexPipeline = minMaxScaler
  .chainTransformer(polyFeatures)
  .chainTransformer(standardScaler)
  .chainPredictor(regression)

// Train pipeline
val trainedComplexPipeline = complexPipeline.fit(trainingData)

// Use pipeline
val predictions = trainedComplexPipeline.predict(testData)
```

## Type Class Operations

The pipeline framework uses type classes to provide flexible operations for different data types: an algorithm supports a given input type exactly when an implicit operation instance exists for that type.

### Fit Operation

```scala { .api }
trait FitOperation[Self, Training] {
  def fit(instance: Self, fitParameters: ParameterMap, input: DataSet[Training]): Self
}
```
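
The dispatch mechanism behind `fit` can be shown in miniature, independent of Flink. This sketch uses invented names and a plain `List` in place of `DataSet`; it only demonstrates how an implicit `FitOperation`-style instance supplies the training logic for a given input type.

```scala
// Miniature version of the FitOperation type-class pattern (illustrative only).
object FitSketch {
  trait FitOp[Self, Training] {
    def fit(instance: Self, input: List[Training]): Self
  }

  // A toy "estimator" that learns the mean of its training data
  case class MeanEstimator(mean: Double = 0.0) {
    // Like Estimator.fit: delegates to whatever implicit FitOp is in scope
    def fit[T](input: List[T])(implicit op: FitOp[MeanEstimator, T]): MeanEstimator =
      op.fit(this, input)
  }

  // The implicit instance carries the training logic for Double input;
  // calling fit on an unsupported input type fails at compile time.
  implicit val fitDoubles: FitOp[MeanEstimator, Double] =
    new FitOp[MeanEstimator, Double] {
      def fit(instance: MeanEstimator, input: List[Double]): MeanEstimator =
        instance.copy(mean = input.sum / input.size)
    }

  def main(args: Array[String]): Unit = {
    val fitted = MeanEstimator().fit(List(1.0, 2.0, 3.0))
    println(fitted.mean) // 2.0
  }
}
```

This is why unsupported input types surface as missing-implicit compile errors rather than runtime failures.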

193

194

### Transform Operations

195

196

```scala { .api }

197

trait TransformOperation[Instance, Model, Input, Output] {

198

def transform(

199

instance: Instance,

200

model: Model,

201

input: Input

202

): Output

203

}

204

205

trait TransformDataSetOperation[Instance, Input, Output] {

206

def transformDataSet(

207

instance: Instance,

208

transformParameters: ParameterMap,

209

input: DataSet[Input]

210

): DataSet[Output]

211

}

212

```

### Predict Operations

```scala { .api }
trait PredictOperation[Instance, Model, Testing, Prediction] {
  def predict(instance: Instance, model: Model, testing: Testing): Prediction
}

trait PredictDataSetOperation[Instance, Testing, Prediction] {
  def predictDataSet(
    instance: Instance,
    predictParameters: ParameterMap,
    testing: DataSet[Testing]
  ): DataSet[Prediction]
}
```
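
The two traits differ in granularity: `PredictOperation` scores a single element, while `PredictDataSetOperation` handles a whole dataset. A dataset-level operation can typically be derived by mapping the element-level one, as in this hedged sketch (invented names, `List` standing in for `DataSet`):

```scala
// Sketch: deriving a dataset-level predict from an element-level one
// (illustrative only, not the Flink ML API).
object PredictSketch {
  trait ElementPredict[Model, Testing, Prediction] {
    def predict(model: Model, testing: Testing): Prediction
  }

  // The dataset-level operation just maps the element-level one
  def predictAll[M, T, P](model: M, testing: List[T])(
      implicit op: ElementPredict[M, T, P]): List[P] =
    testing.map(op.predict(model, _))

  // Toy model: a learned threshold classifier
  case class Threshold(t: Double)

  implicit val thresholdPredict: ElementPredict[Threshold, Double, Int] =
    new ElementPredict[Threshold, Double, Int] {
      def predict(model: Threshold, x: Double): Int = if (x >= model.t) 1 else 0
    }

  def main(args: Array[String]): Unit = {
    println(predictAll(Threshold(0.5), List(0.2, 0.7, 0.5))) // List(0, 1, 1)
  }
}
```

Providing the element-level operation is usually the easier extension point; the dataset-level trait exists so implementations can also do batched or model-broadcast variants when needed.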

229

230

### Evaluate Operation

231

232

```scala { .api }

233

trait EvaluateDataSetOperation[Instance, Testing, Prediction] {

234

def evaluateDataSet(

235

instance: Instance,

236

evaluateParameters: ParameterMap,

237

testing: DataSet[Testing]

238

): DataSet[Prediction]

239

}

240

```

## Parameter Management in Pipelines

Pipelines preserve and merge parameters from all components.

```scala
// Configure individual components
val scaler = StandardScaler()
  .setMean(true)
  .setStd(false)

val svm = SVM()
  .setIterations(50)
  .setRegularization(0.1)

// Create pipeline - parameters are preserved
val pipeline = scaler.chainPredictor(svm)

// Access combined parameters
val allParameters = pipeline.parameters

// You can still modify parameters of the chained pipeline
val modifiedPipeline = pipeline.set(SVM.Iterations, 100)
```

## Custom Pipeline Components

You can create custom transformers and predictors by implementing the respective traits and providing implicit operations for the input types they support.

### Custom Transformer Example

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.ParameterMap
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer}

// The Transformer trait already provides the transform method; at the call
// site it resolves the implicit TransformDataSetOperation defined below.
class LogTransformer extends Transformer[LogTransformer]

object LogTransformer {
  def apply(): LogTransformer = new LogTransformer()

  // Implicit operation for DataSet[Vector] input
  implicit val logTransformVectors: TransformDataSetOperation[LogTransformer, Vector, Vector] =
    new TransformDataSetOperation[LogTransformer, Vector, Vector] {
      def transformDataSet(
        instance: LogTransformer,
        transformParameters: ParameterMap,
        input: DataSet[Vector]
      ): DataSet[Vector] = {
        input.map { vector =>
          // Element-wise natural log; non-positive entries map to 0.0
          val logData = vector.toArray.map(x => if (x > 0) math.log(x) else 0.0)
          DenseVector(logData): Vector
        }
      }
    }
}

// Usage
val logTransform = LogTransformer()
val scaler = StandardScaler()
val pipeline = logTransform.chainTransformer(scaler)
```

## Pipeline Persistence

While the core framework doesn't provide built-in persistence, you can save pipeline parameters and recreate pipelines from them:

```scala
// Save pipeline configuration
val scaler = StandardScaler().setMean(true).setStd(true)
val svm = SVM().setIterations(100).setRegularization(0.01)
val pipeline = scaler.chainPredictor(svm)

// Extract parameters for serialization
val scalerParams = scaler.parameters
val svmParams = svm.parameters

// Recreate pipeline later
val recreatedScaler = StandardScaler().setParameters(scalerParams)
val recreatedSVM = SVM().setParameters(svmParams)
val recreatedPipeline = recreatedScaler.chainPredictor(recreatedSVM)
```

## Error Handling in Pipelines

Pipeline components should handle errors gracefully:

```scala
try {
  val trainedPipeline = pipeline.fit(trainingData)
  val predictions = trainedPipeline.predict(testData)
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid parameters: ${e.getMessage}")
  case e: RuntimeException =>
    println(s"Runtime error in pipeline: ${e.getMessage}")
}
```

## Best Practices

1. **Chain transformers before predictors**: Always apply data transformations before supervised learning
2. **Fit on training data only**: Only use training data to fit transformers and predictors
3. **Preserve test data integrity**: Apply the same fitted transformations to test data
4. **Parameter validation**: Validate parameters before fitting pipelines
5. **Memory management**: Consider data persistence for large datasets in complex pipelines

```scala
// Good practice example
val pipeline = StandardScaler()
  .chainTransformer(PolynomialFeatures().setDegree(2))
  .chainPredictor(SVM().setIterations(100))

// Fit on training data only
val trainedPipeline = pipeline.fit(trainingData)

// Apply to test data
val predictions = trainedPipeline.predict(testData)
```