Core abstractions and utilities for building composable machine learning workflows with automated parameter management, model persistence, and metadata handling.
Fundamental building blocks for creating machine learning pipelines with type-safe composition and parameter management.
/**
* Base pipeline stage that can be part of an ML pipeline
*/
abstract class PipelineStage extends Params with Logging {
  def uid: String
  def transformSchema(schema: StructType): StructType
  def copy(extra: ParamMap): PipelineStage
}
/**
* Abstract estimator that fits models to data
*/
abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M]
  def fit(dataset: Dataset[_], paramMap: ParamMap): M
  def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): M
}
/**
* Abstract transformer that transforms datasets
*/
abstract class Transformer extends PipelineStage {
  def transform(dataset: Dataset[_]): DataFrame
  def transform(dataset: Dataset[_], paramMap: ParamMap): DataFrame
  def transform(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): DataFrame
}
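For context, a minimal sketch of a concrete Transformer written against this contract; the class name, the "text"/"text_lower" column names, and the lower-casing logic are illustrative, not part of the API above:
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, lower}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical transformer that appends a lower-cased copy of the "text" column.
class LowerCaseTransformer(override val uid: String) extends Transformer {

  def this() = this(Identifiable.randomUID("lowerCase"))

  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.withColumn("text_lower", lower(col("text")))

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+ StructField("text_lower", StringType, nullable = true))

  // defaultCopy (a helper inherited from Params) copies params onto a new default-constructed instance.
  override def copy(extra: ParamMap): LowerCaseTransformer = defaultCopy(extra)
}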
/**
* Abstract model that extends transformer (fitted estimators)
*/
abstract class Model[M <: Model[M]] extends Transformer {
  def parent: Estimator[M]
  def hasParent: Boolean
  def setParent(parent: Estimator[M]): M
}

Composable pipeline classes for chaining multiple ML stages together.
/**
* ML Pipeline for chaining multiple pipeline stages
*/
class Pipeline extends Estimator[PipelineModel] {
  def setStages(value: Array[_ <: PipelineStage]): this.type
  def getStages: Array[PipelineStage]
}
class PipelineModel extends Model[PipelineModel] {
  def stages: Array[Transformer]
  override def transform(dataset: Dataset[_]): DataFrame
  override def transformSchema(schema: StructType): StructType
}

Usage Example:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
// Configure ML algorithms
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
// Create pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents
// ("training" and "test" are assumed to be DataFrames with "text" and "label" columns)
val model = pipeline.fit(training)
// Make predictions on test documents
val predictions = model.transform(test)

Comprehensive parameter management system with type safety and automatic validation.
/**
* Base trait for components with parameters
*/
trait Params extends Identifiable with Serializable {
  def params: Array[Param[_]]
  def explainParam(param: Param[_]): String
  def explainParams(): String
  def isSet(param: Param[_]): Boolean
  def isDefined(param: Param[_]): Boolean
  def hasDefault(param: Param[_]): Boolean
  def getDefault[T](param: Param[T]): Option[T]
  def extractParamMap(): ParamMap
  def extractParamMap(extra: ParamMap): ParamMap
  def copy(extra: ParamMap): Params
  def set[T](param: Param[T], value: T): this.type
  def clear(param: Param[_]): this.type
  def get[T](param: Param[T]): Option[T]
  def getOrDefault[T](param: Param[T]): T
  def $[T](param: Param[T]): T
  def setDefault[T](param: Param[T], value: T): this.type
  def setDefault(paramPairs: ParamPair[_]*): this.type
  def hasParam(paramName: String): Boolean
  def getParam(paramName: String): Param[Any]
}
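To make the contract concrete, a minimal sketch of a component declaring and reading one typed parameter; the class name ScaleParams, the "factor" parameter, and its default value are hypothetical:
import org.apache.spark.ml.param.{Param, ParamMap, Params}
import org.apache.spark.ml.util.Identifiable

// Hypothetical component with a single "factor" parameter and a default value.
class ScaleParams extends Params {
  override val uid: String = Identifiable.randomUID("scaleParams")

  val factor: Param[Double] = new Param[Double](this, "factor", "multiplicative scaling factor")
  setDefault(factor, 1.0)

  def setFactor(value: Double): this.type = set(factor, value)
  def getFactor: Double = $(factor)  // returns the explicitly set value, or else the default

  // defaultCopy is a helper inherited from Params in Spark.
  override def copy(extra: ParamMap): ScaleParams = defaultCopy(extra)
}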
/**
* Typed parameter definition
*/
class Param[T] extends Serializable {
  def name: String
  def doc: String
  def parent: String
  def toString: String
  def jsonEncode(value: T): String
  def jsonDecode(json: String): T
  def w(value: T): ParamPair[T]
  def ->(value: T): ParamPair[T]
}
/**
* Parameter-value pairs
*/
case class ParamPair[T](param: Param[T], value: T)
/**
* Set of parameter values
*/
class ParamMap extends Serializable {
  def put[T](param: Param[T], value: T): this.type
  def put(paramPairs: ParamPair[_]*): this.type
  def get[T](param: Param[T]): Option[T]
  def apply[T](param: Param[T]): T
  def contains(param: Param[_]): Boolean
  def remove[T](param: Param[T]): this.type
  def filter(f: ParamPair[_] => Boolean): ParamMap
  def copy: ParamMap
  def ++(other: ParamMap): ParamMap
  def toSeq: Seq[ParamPair[_]]
  def size: Int
}
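For example, ParamMap together with the -> operator on Param can supply per-call overrides to fit or transform without reconfiguring the stage; this sketch reuses lr and training from the pipeline example above:
import org.apache.spark.ml.param.ParamMap

// Overrides apply only to this fit() call; lr itself keeps maxIter = 10.
val overrides = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.01)
val tunedModel = lr.fit(training, overrides)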
MLlib's model saving and loading framework for production deployment.

/**
* Trait for ML components that can be written to storage
*/
trait MLWritable {
  def write: MLWriter
  def save(path: String): Unit
}
/**
* Abstract writer for ML components
*/
abstract class MLWriter extends Logging {
  def context: SparkContext
  def option(key: String, value: String): this.type
  def options(options: Map[String, String]): this.type
  def overwrite(): this.type
  def save(path: String): Unit
  protected def saveImpl(path: String): Unit
}
/**
* Trait for ML components that can be read from storage
*/
trait MLReadable[T] {
  def read: MLReader[T]
  def load(path: String): T
}
/**
* Abstract reader for ML components
*/
abstract class MLReader[T] extends Logging {
  def context: SparkContext
  def option(key: String, value: String): this.type
  def options(options: Map[String, String]): this.type
  def load(path: String): T
}
/**
* Default implementations for parameter persistence
*/
trait DefaultParamsWritable extends MLWritable {
  self: Params =>
  override def write: MLWriter = new DefaultParamsWriter(this)
}
trait DefaultParamsReadable[T] extends MLReadable[T] {
  override def read: MLReader[T] = new DefaultParamsReader[T]
  override def load(path: String): T = super.load(path)
}
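Most custom components gain persistence simply by mixing these traits in; a minimal sketch with a hypothetical pass-through transformer:
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// Hypothetical pass-through transformer; DefaultParamsWritable supplies write and save.
class IdentityTransformer(override val uid: String)
  extends Transformer with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("identity"))
  override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF()
  override def transformSchema(schema: StructType): StructType = schema
  override def copy(extra: ParamMap): IdentityTransformer = defaultCopy(extra)
}

// Companion object supplies IdentityTransformer.read and IdentityTransformer.load(path).
object IdentityTransformer extends DefaultParamsReadable[IdentityTransformer]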
Usage Example:
// Save a model
val model = pipeline.fit(trainingData)
model.write.overwrite().save("/path/to/model")
// Load a model
val loadedModel = PipelineModel.load("/path/to/model")
val predictions = loadedModel.transform(testData)

Unique identification system for ML pipeline components.
/**
* Trait for objects with unique identifiers
*/
trait Identifiable {
  val uid: String
  def toString: String
}
/**
* Utilities for generating unique identifiers
*/
object Identifiable {
  def randomUID(prefix: String): String
}

Tools for working with DataFrame schemas in ML pipelines.
/**
* Schema validation and manipulation utilities
*/
object SchemaUtils {
  def checkColumnType(
    schema: StructType,
    colName: String,
    dataType: DataType,
    msg: String = ""
  ): Unit
  def checkColumnTypes(
    schema: StructType,
    colName: String,
    dataTypes: Seq[DataType],
    msg: String = ""
  ): Unit
  def checkNumericType(
    schema: StructType,
    colName: String,
    msg: String = ""
  ): Unit
  def appendColumn(
    schema: StructType,
    colName: String,
    dataType: DataType,
    nullable: Boolean = false
  ): StructType
  def appendColumns(
    schema: StructType,
    cols: Seq[StructField]
  ): StructType
}
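These checks are typically called from a stage's transformSchema; a sketch with illustrative column names (note that SchemaUtils is package-private in stock Spark, so user code often inlines equivalent checks):
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

// Sketch: validate the "text" input column, then declare the "score" output column.
def transformSchema(schema: StructType): StructType = {
  SchemaUtils.checkColumnType(schema, "text", StringType)
  SchemaUtils.appendColumn(schema, "score", DoubleType)
}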
Tools for handling DataFrame column metadata in ML contexts.

/**
* Metadata manipulation utilities for ML
*/
object MetadataUtils {
  /**
  * Gets the number of features from vector column metadata
  */
  def getNumFeatures(dataset: Dataset[_], vectorCol: String): Int
  /**
  * Gets feature names from vector column metadata
  */
  def getFeatureNames(dataset: Dataset[_], vectorCol: String): Option[Array[String]]
  /**
  * Gets categorical features metadata
  */
  def getCategoricalFeatures(dataset: Dataset[_], featuresCol: String): Set[Int]
  /**
  * Creates metadata for vector columns
  */
  def createVectorMetadata(
    numFeatures: Int,
    featureNames: Option[Array[String]] = None,
    categoricalFeatures: Set[Int] = Set.empty
  ): Metadata
}
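A short sketch using these helpers as documented above (the signatures are specific to this guide and may not match stock Spark); dataset is an assumed DataFrame with a "features" vector column:
import org.apache.spark.sql.functions.col

// Build metadata for a 3-feature vector column (feature names are illustrative).
val featureMeta = MetadataUtils.createVectorMetadata(
  numFeatures = 3,
  featureNames = Some(Array("age", "height", "weight")))

// Attach the metadata to the "features" column, then read it back.
val withMeta = dataset.withColumn("features", col("features").as("features", featureMeta))
val numFeatures = MetadataUtils.getNumFeatures(withMeta, "features")  // 3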
Extended pipeline capabilities for complex ML workflows.

/**
* Pipeline utilities for advanced use cases
*/
object PipelineUtils {
  /**
  * Creates a pipeline from a sequence of stages
  */
  def createPipeline(stages: PipelineStage*): Pipeline
  /**
  * Validates pipeline stage compatibility
  */
  def validatePipeline(stages: Array[PipelineStage], inputSchema: StructType): Unit
  /**
  * Extracts the final estimator from a pipeline
  */
  def getFinalEstimator(pipeline: Pipeline): Option[Estimator[_]]
  /**
  * Gets all transformers before the final estimator
  */
  def getFeatureTransformers(pipeline: Pipeline): Array[Transformer]
}
/**
* Pipeline model utilities
*/
object PipelineModelUtils {
  /**
  * Extracts the final model from a fitted pipeline
  */
  def getFinalModel(pipelineModel: PipelineModel): Option[Model[_]]
  /**
  * Gets all transformer stages except the final model
  */
  def getFeatureTransformers(pipelineModel: PipelineModel): Array[Transformer]
  /**
  * Creates a new pipeline model with replaced stages
  */
  def replaceStages(
    pipelineModel: PipelineModel,
    newStages: Array[Transformer]
  ): PipelineModel
}
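A sketch of how these helpers, as documented above, could be used to inspect a fitted pipeline; pipeline and training are the names from the earlier usage example, and the utility objects themselves are specific to this guide:
import org.apache.spark.ml.classification.LogisticRegressionModel

val fitted: PipelineModel = pipeline.fit(training)

// Pull out the final fitted model and inspect it if it is a logistic regression.
PipelineModelUtils.getFinalModel(fitted) match {
  case Some(lrModel: LogisticRegressionModel) =>
    println(s"Coefficients: ${lrModel.coefficients}")
  case _ =>
    println("Pipeline does not end in a fitted model")
}

// The remaining stages are the feature transformers that precede the model.
val featureStages = PipelineModelUtils.getFeatureTransformers(fitted)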
Integration between pipelines and cross-validation for comprehensive model selection.

/**
* Pipeline-aware cross-validation
*/
class PipelineCrossValidator extends CrossValidator {
  /**
  * Gets the best pipeline from cross-validation results
  */
  def getBestPipeline: Pipeline
  /**
  * Gets feature importance from the best model if available
  */
  def getFeatureImportances: Option[Vector]
  /**
  * Extracts the best hyperparameters for each pipeline stage
  */
  def getBestParamsByStage: Map[String, ParamMap]
}
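To make the integration concrete, here is a standard cross-validation setup over the pipeline from the earlier usage example, using stock Spark's CrossValidator and ParamGridBuilder; PipelineCrossValidator above would slot in the same way where available. pipeline, hashingTF, lr, and training are the names from that example.
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Grid over one parameter per stage; the whole pipeline is refit for each combination.
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

// bestModel is the PipelineModel fitted with the best parameter combination.
val cvModel = cv.fit(training)
val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]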
// Core pipeline imports
import org.apache.spark.ml._
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.{StructType, StructField, DataType, Metadata}
// Pipeline stage types
import org.apache.spark.ml.{
  Estimator,
  Transformer,
  Model,
  Pipeline,
  PipelineModel,
  PipelineStage
}
// Parameter system types
import org.apache.spark.ml.param.{
  Param,
  ParamMap,
  ParamPair,
  Params
}
// Persistence types
import org.apache.spark.ml.util.{
  MLWritable,
  MLWriter,
  MLReadable,
  MLReader,
  DefaultParamsWritable,
  DefaultParamsReadable
}
// Utility types
import org.apache.spark.ml.util.{
  Identifiable,
  SchemaUtils,
  MetadataUtils
}