Pipeline Components

Core abstractions and utilities for building composable machine learning workflows with automated parameter management, model persistence, and metadata handling.

Capabilities

Core Pipeline Abstractions

Fundamental building blocks for creating machine learning pipelines with type-safe composition and parameter management.

/**
 * Base pipeline stage that can be part of an ML pipeline
 */
abstract class PipelineStage extends Params with Logging {
  def uid: String
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): PipelineStage
}

/**
 * Abstract estimator that fits models to data
 */
abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M]
  def fit(dataset: Dataset[_], paramMap: ParamMap): M
  def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): M
}

/**
 * Abstract transformer that transforms datasets
 */
abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
  def transform(dataset: Dataset[_], paramMap: ParamMap): DataFrame
  def transform(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): DataFrame
}

/**
 * Abstract model produced by fitting an Estimator; extends Transformer
 */
abstract class Model[M <: Model[M]] extends Transformer {
  def parent: Estimator[M]
  def hasParent: Boolean
  def setParent(parent: Estimator[M]): M
}
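
A minimal sketch of a custom Transformer built on these abstractions. The LowercaseTransformer class, its param names, and the column names are hypothetical, used only to illustrate the contract:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, lower}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
 * Hypothetical transformer that lower-cases a string column
 */
class LowercaseTransformer(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("lowercase"))

  val inputCol = new Param[String](this, "inputCol", "input column name")
  val outputCol = new Param[String](this, "outputCol", "output column name")

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  // Append the lower-cased output column to the dataset
  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.withColumn($(outputCol), lower(col($(inputCol))))

  // Declare the output schema without touching any data
  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+ StructField($(outputCol), StringType, nullable = true))

  override def copy(extra: ParamMap): LowercaseTransformer = defaultCopy(extra)
}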

Pipeline Construction

Composable pipeline classes for chaining multiple ML stages together.

/**
 * ML Pipeline for chaining multiple pipeline stages
 */
class Pipeline extends Estimator[PipelineModel] {
  def setStages(value: Array[_ <: PipelineStage]): this.type
  def getStages: Array[PipelineStage]
}

class PipelineModel extends Model[PipelineModel] {
  def stages: Array[Transformer]
  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
}

Usage Example:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Configure ML algorithms
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)

// Create pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents
val model = pipeline.fit(training)

// Make predictions on test documents
val predictions = model.transform(test)

Parameter System

Comprehensive parameter management system with type safety and automatic validation.

/**
 * Base trait for components with parameters
 */
trait Params extends Identifiable with Serializable {
  def params: Array[Param[_]]
  def explainParam(param: Param[_]): String
  def explainParams(): String
  def isSet(param: Param[_]): Boolean
  def isDefined(param: Param[_]): Boolean
  def hasDefault(param: Param[_]): Boolean
  def getDefault[T](param: Param[T]): Option[T]
  def extractParamMap(): ParamMap
  def extractParamMap(extra: ParamMap): ParamMap
  def copy(extra: ParamMap): Params
  def set[T](param: Param[T], value: T): this.type
  def clear(param: Param[_]): this.type
  def get[T](param: Param[T]): Option[T]
  def getOrDefault[T](param: Param[T]): T
  def $[T](param: Param[T]): T
  def setDefault[T](param: Param[T], value: T): this.type
  def setDefault(paramPairs: ParamPair[_]*): this.type
  def hasParam(paramName: String): Boolean
  def getParam(paramName: String): Param[Any]
}

/**
 * Typed parameter definition
 */
class Param[T] extends Serializable {
  def name: String
  def doc: String
  def parent: String
  def toString: String
  def jsonEncode(value: T): String
  def jsonDecode(json: String): T
  def w(value: T): ParamPair[T]
  def ->(value: T): ParamPair[T]
}

/**
 * Parameter-value pairs
 */
case class ParamPair[T](param: Param[T], value: T)

/**
 * Set of parameter values
 */
class ParamMap extends Serializable {
  def put[T](param: Param[T], value: T): this.type
  def put(paramPairs: ParamPair[_]*): this.type
  def get[T](param: Param[T]): Option[T]
  def apply[T](param: Param[T]): T
  def contains(param: Param[_]): Boolean
  def remove[T](param: Param[T]): this.type
  def filter(f: ParamPair[_] => Boolean): ParamMap
  def copy: ParamMap
  def ++(other: ParamMap): ParamMap
  def toSeq: Seq[ParamPair[_]]
  def size: Int
}
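
Usage Example (reusing the lr estimator and training DataFrame from the pipeline example above):

import org.apache.spark.ml.param.ParamMap

// Inspect parameter state
println(lr.explainParams())    // every param with its doc, default, and current value
println(lr.isSet(lr.regParam)) // true after setRegParam(0.001)
println(lr.getMaxIter)         // 10

// Fit with explicit ParamMap overrides instead of mutating the estimator
val overrides = ParamMap(lr.regParam -> 0.1, lr.maxIter -> 20)
val alternativeModel = lr.fit(training, overrides)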

Model Persistence

MLlib's model saving and loading framework for production deployment.

/**
 * Trait for ML components that can be written to storage
 */
trait MLWritable {
  def write: MLWriter
  def save(path: String): Unit
}

/**
 * Abstract writer for ML components
 */
abstract class MLWriter extends Logging {
  def context: SparkContext
  def option(key: String, value: String): this.type
  def options(options: Map[String, String]): this.type
  def overwrite(): this.type
  def save(path: String): Unit
  protected def saveImpl(path: String): Unit
}

/**
 * Trait for ML components that can be read from storage
 */
trait MLReadable[T] {
  def read: MLReader[T]
  def load(path: String): T
}

/**
 * Abstract reader for ML components
 */
abstract class MLReader[T] extends Logging {
  def context: SparkContext
  def option(key: String, value: String): this.type
  def options(options: Map[String, String]): this.type
  def load(path: String): T
}

/**
 * Default implementations for parameter persistence
 */
trait DefaultParamsWritable extends MLWritable {
  self: Params =>
  override def write: MLWriter = new DefaultParamsWriter(this)
}

trait DefaultParamsReadable[T] extends MLReadable[T] {
  override def read: MLReader[T] = new DefaultParamsReader[T]
  override def load(path: String): T = super.load(path)
}

Usage Example:

// Save a model
val model = pipeline.fit(trainingData)
model.write.overwrite().save("/path/to/model")

// Load a model
val loadedModel = PipelineModel.load("/path/to/model")
val predictions = loadedModel.transform(testData)
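
For custom components, mixing in DefaultParamsWritable and providing a companion DefaultParamsReadable is usually enough when all params are JSON-serializable. A minimal sketch (the IdentityTransformer name is hypothetical):

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// A no-op transformer whose params are persisted by the default writer
class IdentityTransformer(override val uid: String)
  extends Transformer with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("identity"))
  override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF()
  override def transformSchema(schema: StructType): StructType = schema
  override def copy(extra: ParamMap): IdentityTransformer = defaultCopy(extra)
}

// Companion object supplies the matching reader for IdentityTransformer.load(path)
object IdentityTransformer extends DefaultParamsReadable[IdentityTransformer]

// Round trip
new IdentityTransformer().write.overwrite().save("/tmp/identity-stage")
val restored = IdentityTransformer.load("/tmp/identity-stage")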

Component Identification

Unique identification system for ML pipeline components.

/**
 * Trait for objects with unique identifiers
 */
trait Identifiable {
  val uid: String
  def toString: String
}

/**
 * Utilities for generating unique identifiers
 */
object Identifiable {
  def randomUID(prefix: String): String
}
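
Usage Example (the generated suffix is random; the value shown is illustrative):

import org.apache.spark.ml.util.Identifiable

val uid = Identifiable.randomUID("logreg")
// e.g. "logreg_8d3f0a2b91ce" - the prefix plus a random hex suffix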

Schema Utilities

Tools for working with DataFrame schemas in ML pipelines.

/**
 * Schema validation and manipulation utilities
 */
object SchemaUtils {
  def checkColumnType(
    schema: StructType,
    colName: String,
    dataType: DataType,
    msg: String = ""
  ): Unit
  
  def checkColumnTypes(
    schema: StructType,
    colName: String,
    dataTypes: Seq[DataType],
    msg: String = ""
  ): Unit
  
  def checkNumericType(
    schema: StructType,
    colName: String,
    msg: String = ""
  ): Unit
  
  def appendColumn(
    schema: StructType,
    colName: String,
    dataType: DataType,
    nullable: Boolean = false
  ): StructType
  
  def appendColumns(
    schema: StructType,
    cols: Seq[StructField]
  ): StructType
}
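
A sketch of a transformSchema body using the interface above, assuming SchemaUtils is accessible from your code and using hypothetical "text"/"score" column names:

import org.apache.spark.ml.util.SchemaUtils
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

def transformSchema(schema: StructType): StructType = {
  // Fail fast if the input column is missing or has the wrong type
  SchemaUtils.checkColumnType(schema, "text", StringType)
  // Declare the output column this stage will append
  SchemaUtils.appendColumn(schema, "score", DoubleType)
}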

Metadata Utilities

Tools for handling DataFrame column metadata in ML contexts.

/**
 * Metadata manipulation utilities for ML
 */
object MetadataUtils {
  /**
   * Gets the number of features from vector column metadata
   */
  def getNumFeatures(dataset: Dataset[_], vectorCol: String): Int
  
  /**
   * Gets feature names from vector column metadata
   */
  def getFeatureNames(dataset: Dataset[_], vectorCol: String): Option[Array[String]]
  
  /**
   * Gets categorical features metadata
   */
  def getCategoricalFeatures(dataset: Dataset[_], featuresCol: String): Set[Int]
  
  /**
   * Creates metadata for vector columns
   */
  def createVectorMetadata(
    numFeatures: Int,
    featureNames: Option[Array[String]] = None,
    categoricalFeatures: Set[Int] = Set.empty
  ): Metadata
}
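
Usage Example (assuming the MetadataUtils interface documented above and a dataset with an assembled "features" vector column):

import org.apache.spark.ml.util.MetadataUtils

val numFeatures = MetadataUtils.getNumFeatures(dataset, "features")
val categorical = MetadataUtils.getCategoricalFeatures(dataset, "features")
println(s"$numFeatures features, ${categorical.size} of them categorical")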

Advanced Pipeline Features

Extended pipeline capabilities for complex ML workflows.

/**
 * Pipeline utilities for advanced use cases
 */
object PipelineUtils {
  /**
   * Creates a pipeline from a sequence of stages
   */
  def createPipeline(stages: PipelineStage*): Pipeline
  
  /**
   * Validates pipeline stage compatibility
   */
  def validatePipeline(stages: Array[PipelineStage], inputSchema: StructType): Unit
  
  /**
   * Extracts the final estimator from a pipeline
   */
  def getFinalEstimator(pipeline: Pipeline): Option[Estimator[_]]
  
  /**
   * Gets all transformers before the final estimator
   */
  def getFeatureTransformers(pipeline: Pipeline): Array[Transformer]
}

/**
 * Pipeline model utilities
 */
object PipelineModelUtils {
  /**
   * Extracts the final model from a fitted pipeline
   */
  def getFinalModel(pipelineModel: PipelineModel): Option[Model[_]]
  
  /**
   * Gets all transformer stages except the final model
   */
  def getFeatureTransformers(pipelineModel: PipelineModel): Array[Transformer]
  
  /**
   * Creates a new pipeline model with replaced stages
   */
  def replaceStages(
    pipelineModel: PipelineModel,
    newStages: Array[Transformer]
  ): PipelineModel
}
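
The same stage extraction is available through the public PipelineModel API, reusing the fitted model from the pipeline example above: stages holds the fitted transformers in order, with the final model last.

import org.apache.spark.ml.classification.LogisticRegressionModel

val fittedStages = model.stages                // tokenizer, hashingTF, LogisticRegressionModel
val featureStages = fittedStages.dropRight(1)  // everything before the final model
val lrModel = fittedStages.last.asInstanceOf[LogisticRegressionModel]
println(s"Coefficients: ${lrModel.coefficients}")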

Cross-Validation Pipeline Integration

Integration between pipelines and cross-validation for comprehensive model selection.

/**
 * Pipeline-aware cross-validation
 */
class PipelineCrossValidator extends CrossValidator {
  /**
   * Gets the best pipeline from cross-validation results
   */
  def getBestPipeline: Pipeline
  
  /**
   * Gets feature importance from the best model if available
   */
  def getFeatureImportances: Option[Vector]
  
  /**
   * Extracts the best hyperparameters for each pipeline stage
   */
  def getBestParamsByStage: Map[String, ParamMap]
}
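
Since PipelineCrossValidator extends CrossValidator, the standard cross-validation setup applies. A sketch using the core CrossValidator API, reusing the pipeline, hashingTF, lr, and training values from the pipeline example above:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Grid over params of individual pipeline stages
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(training)

// Best fitted pipeline and the chosen params per stage
val bestPipeline = cvModel.bestModel.asInstanceOf[PipelineModel]
bestPipeline.stages.foreach(stage => println(s"${stage.uid}: ${stage.extractParamMap()}"))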

Types

// Core pipeline imports
import org.apache.spark.ml._
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.{StructType, StructField, DataType, Metadata}

// Pipeline stage types
import org.apache.spark.ml.{
  Estimator,
  Transformer,
  Model,
  Pipeline,
  PipelineModel,
  PipelineStage
}

// Parameter system types
import org.apache.spark.ml.param.{
  Param,
  ParamMap,
  ParamPair,
  Params
}

// Persistence types
import org.apache.spark.ml.util.{
  MLWritable,
  MLWriter,
  MLReadable,
  MLReader,
  DefaultParamsWritable,
  DefaultParamsReadable
}

// Utility types
import org.apache.spark.ml.util.{
  Identifiable,
  SchemaUtils,
  MetadataUtils
}