or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

classification.md, clustering.md, evaluation-tuning.md, feature-engineering.md, index.md, linear-algebra.md, pipeline-components.md, recommendation.md, regression.md

docs/pipeline-components.md

# Pipeline Components

Core abstractions and utilities for building composable machine learning workflows with automated parameter management, model persistence, and metadata handling.

## Capabilities

### Core Pipeline Abstractions

Fundamental building blocks for creating machine learning pipelines with type-safe composition and parameter management.

```scala { .api }
/**
 * Base pipeline stage that can be part of an ML pipeline
 */
abstract class PipelineStage extends Params with Logging {
  def uid: String
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): PipelineStage
}

/**
 * Abstract estimator that fits models to data
 */
abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M]
  def fit(dataset: Dataset[_], paramMap: ParamMap): M
  def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): M
}

/**
 * Abstract transformer that transforms datasets
 */
abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
  def transform(dataset: Dataset[_], paramMap: ParamMap): DataFrame
  def transform(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): DataFrame
}

/**
 * Abstract model that extends transformer (fitted estimators)
 */
abstract class Model[M <: Model[M]] extends Transformer {
  def parent: Estimator[M]
  def hasParent: Boolean
  def setParent(parent: Estimator[M]): M
}
```

### Pipeline Construction

Composable pipeline classes for chaining multiple ML stages together.

```scala { .api }
/**
 * ML Pipeline for chaining multiple pipeline stages
 */
class Pipeline extends Estimator[PipelineModel] {
  def setStages(value: Array[_ <: PipelineStage]): this.type
  def getStages: Array[PipelineStage]
}

class PipelineModel extends Model[PipelineModel] {
  def stages: Array[Transformer]
  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
}
```

**Usage Example:**

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Configure ML algorithms
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)

// Create pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents
val model = pipeline.fit(training)

// Make predictions on test documents
val predictions = model.transform(test)
```

### Parameter System

Comprehensive parameter management system with type safety and automatic validation.

```scala { .api }
/**
 * Base trait for components with parameters
 */
trait Params extends Identifiable with Serializable {
  def params: Array[Param[_]]
  def explainParam(param: Param[_]): String
  def explainParams(): String
  def isSet(param: Param[_]): Boolean
  def isDefined(param: Param[_]): Boolean
  def hasDefault(param: Param[_]): Boolean
  def getDefault[T](param: Param[T]): Option[T]
  def extractParamMap(): ParamMap
  def extractParamMap(extra: ParamMap): ParamMap
  def copy(extra: ParamMap): Params
  def set[T](param: Param[T], value: T): this.type
  def clear(param: Param[_]): this.type
  def get[T](param: Param[T]): Option[T]
  def getOrDefault[T](param: Param[T]): T
  def $[T](param: Param[T]): T
  def setDefault[T](param: Param[T], value: T): this.type
  def setDefault(paramPairs: ParamPair[_]*): this.type
  def hasParam(paramName: String): Boolean
  def getParam(paramName: String): Param[Any]
}

/**
 * Typed parameter definition
 */
class Param[T] extends Serializable {
  def name: String
  def doc: String
  def parent: String
  def toString: String
  def jsonEncode(value: T): String
  def jsonDecode(json: String): T
  def w(value: T): ParamPair[T]
  def ->(value: T): ParamPair[T]
}

/**
 * Parameter-value pairs
 */
case class ParamPair[T](param: Param[T], value: T)

/**
 * Set of parameter values
 */
class ParamMap extends Serializable {
  def put[T](param: Param[T], value: T): this.type
  def put(paramPairs: ParamPair[_]*): this.type
  def get[T](param: Param[T]): Option[T]
  def apply[T](param: Param[T]): T
  def contains(param: Param[_]): Boolean
  def remove[T](param: Param[T]): this.type
  def filter(f: ParamPair[_] => Boolean): ParamMap
  def copy: ParamMap
  def ++(other: ParamMap): ParamMap
  def toSeq: Seq[ParamPair[_]]
  def size: Int
}
```

### Model Persistence

MLlib's model saving and loading framework for production deployment.

```scala { .api }
/**
 * Trait for ML components that can be written to storage
 */
trait MLWritable {
  def write: MLWriter
  def save(path: String): Unit
}

/**
 * Abstract writer for ML components
 */
abstract class MLWriter extends Logging {
  def context: SparkContext
  def option(key: String, value: String): this.type
  def options(options: Map[String, String]): this.type
  def overwrite(): this.type
  def save(path: String): Unit
  protected def saveImpl(path: String): Unit
}

/**
 * Trait for ML components that can be read from storage
 */
trait MLReadable[T] {
  def read: MLReader[T]
  def load(path: String): T
}

/**
 * Abstract reader for ML components
 */
abstract class MLReader[T] extends Logging {
  def context: SparkContext
  def option(key: String, value: String): this.type
  def options(options: Map[String, String]): this.type
  def load(path: String): T
}

/**
 * Default implementations for parameter persistence
 */
trait DefaultParamsWritable extends MLWritable {
  self: Params =>
  override def write: MLWriter = new DefaultParamsWriter(this)
}

trait DefaultParamsReadable[T] extends MLReadable[T] {
  override def read: MLReader[T] = new DefaultParamsReader[T]
  override def load(path: String): T = super.load(path)
}
```

**Usage Example:**

```scala
// Save a model
val model = pipeline.fit(trainingData)
model.write.overwrite().save("/path/to/model")

// Load a model
val loadedModel = PipelineModel.load("/path/to/model")
val predictions = loadedModel.transform(testData)
```

### Component Identification

Unique identification system for ML pipeline components.

```scala { .api }
/**
 * Trait for objects with unique identifiers
 */
trait Identifiable {
  val uid: String
  def toString: String
}

/**
 * Utilities for generating unique identifiers
 */
object Identifiable {
  def randomUID(prefix: String): String
}
```

### Schema Utilities

Tools for working with DataFrame schemas in ML pipelines.

```scala { .api }
/**
 * Schema validation and manipulation utilities
 */
object SchemaUtils {
  def checkColumnType(
    schema: StructType,
    colName: String,
    dataType: DataType,
    msg: String = ""
  ): Unit

  def checkColumnTypes(
    schema: StructType,
    colName: String,
    dataTypes: Seq[DataType],
    msg: String = ""
  ): Unit

  def checkNumericType(
    schema: StructType,
    colName: String,
    msg: String = ""
  ): Unit

  def appendColumn(
    schema: StructType,
    colName: String,
    dataType: DataType,
    nullable: Boolean = false
  ): StructType

  def appendColumns(
    schema: StructType,
    cols: Seq[StructField]
  ): StructType
}
```

### Metadata Utilities

Tools for handling DataFrame column metadata in ML contexts.

```scala { .api }
/**
 * Metadata manipulation utilities for ML
 */
object MetadataUtils {
  /**
   * Gets the number of features from vector column metadata
   */
  def getNumFeatures(dataset: Dataset[_], vectorCol: String): Int

  /**
   * Gets feature names from vector column metadata
   */
  def getFeatureNames(dataset: Dataset[_], vectorCol: String): Option[Array[String]]

  /**
   * Gets categorical features metadata
   */
  def getCategoricalFeatures(dataset: Dataset[_], featuresCol: String): Set[Int]

  /**
   * Creates metadata for vector columns
   */
  def createVectorMetadata(
    numFeatures: Int,
    featureNames: Option[Array[String]] = None,
    categoricalFeatures: Set[Int] = Set.empty
  ): Metadata
}
```

### Advanced Pipeline Features

Extended pipeline capabilities for complex ML workflows.

```scala { .api }
/**
 * Pipeline utilities for advanced use cases
 */
object PipelineUtils {
  /**
   * Creates a pipeline from a sequence of stages
   */
  def createPipeline(stages: PipelineStage*): Pipeline

  /**
   * Validates pipeline stage compatibility
   */
  def validatePipeline(stages: Array[PipelineStage], inputSchema: StructType): Unit

  /**
   * Extracts the final estimator from a pipeline
   */
  def getFinalEstimator(pipeline: Pipeline): Option[Estimator[_]]

  /**
   * Gets all transformers before the final estimator
   */
  def getFeatureTransformers(pipeline: Pipeline): Array[Transformer]
}

/**
 * Pipeline model utilities
 */
object PipelineModelUtils {
  /**
   * Extracts the final model from a fitted pipeline
   */
  def getFinalModel(pipelineModel: PipelineModel): Option[Model[_]]

  /**
   * Gets all transformer stages except the final model
   */
  def getFeatureTransformers(pipelineModel: PipelineModel): Array[Transformer]

  /**
   * Creates a new pipeline model with replaced stages
   */
  def replaceStages(
    pipelineModel: PipelineModel,
    newStages: Array[Transformer]
  ): PipelineModel
}
```

### Cross-Validation Pipeline Integration

Integration between pipelines and cross-validation for comprehensive model selection.

```scala { .api }
/**
 * Pipeline-aware cross-validation
 */
class PipelineCrossValidator extends CrossValidator {
  /**
   * Gets the best pipeline from cross-validation results
   */
  def getBestPipeline: Pipeline

  /**
   * Gets feature importance from the best model if available
   */
  def getFeatureImportances: Option[Vector]

  /**
   * Extracts the best hyperparameters for each pipeline stage
   */
  def getBestParamsByStage: Map[String, ParamMap]
}
```

## Types

```scala { .api }
// Core pipeline imports
import org.apache.spark.ml._
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.{StructType, StructField, DataType, Metadata}

// Pipeline stage types
import org.apache.spark.ml.{
  Estimator,
  Transformer,
  Model,
  Pipeline,
  PipelineModel,
  PipelineStage
}

// Parameter system types
import org.apache.spark.ml.param.{
  Param,
  ParamMap,
  ParamPair,
  Params
}

// Persistence types
import org.apache.spark.ml.util.{
  MLWritable,
  MLWriter,
  MLReadable,
  MLReader,
  DefaultParamsWritable,
  DefaultParamsReadable
}

// Utility types
import org.apache.spark.ml.util.{
  Identifiable,
  SchemaUtils,
  MetadataUtils
}
```