CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-parent-2-12

Apache Spark - Unified analytics engine for large-scale data processing

Pending
Overview
Eval results
Files

docs/ml.md

Machine Learning

Comprehensive machine learning library with algorithms for classification, regression, clustering, and collaborative filtering. Provides both high-level Pipeline API for building ML workflows and lower-level RDD-based APIs for advanced use cases.

Capabilities

Pipeline API

High-level API for building machine learning workflows with transformers and estimators.

/**
 * Machine learning pipeline for chaining transformers and estimators.
 *
 * Stages run in array order: when fit, each Estimator stage is fitted on the
 * data and replaced by its resulting model before data flows to the next stage.
 */
class Pipeline extends Estimator[PipelineModel] {
  /** Set pipeline stages (executed in array order) */
  def setStages(value: Array[PipelineStage]): Pipeline
  /** Get pipeline stages */
  def getStages: Array[PipelineStage]
  /** Fit every stage in sequence and return the fitted pipeline */
  def fit(dataset: Dataset[_]): PipelineModel
}

/**
 * Fitted pipeline ready for predictions; produced by Pipeline.fit.
 */
class PipelineModel extends Transformer {
  /** Transform dataset by applying every fitted stage in order */
  def transform(dataset: Dataset[_]): DataFrame
  /** Get fitted stages (all stages are Transformers after fitting) */
  def stages: Array[Transformer]
}

/**
 * Base class for pipeline components (both Estimators and Transformers).
 */
abstract class PipelineStage {
  /** Unique identifier for this stage instance */
  def uid: String
  /** Copy this stage, applying extra parameter overrides */
  def copy(extra: ParamMap): PipelineStage
}

/**
 * Algorithm that can be fit on a DataFrame to produce a Model.
 *
 * @tparam M the concrete model type produced by fit
 */
abstract class Estimator[M <: Model[M]] extends PipelineStage {
  /** Fit model to dataset */
  def fit(dataset: Dataset[_]): M
}

/**
 * Algorithm that transforms one DataFrame into another, typically by
 * appending one or more derived columns.
 */
abstract class Transformer extends PipelineStage {
  /** Transform dataset */
  def transform(dataset: Dataset[_]): DataFrame
}

/**
 * Result of fitting an Estimator; a Model is itself a Transformer.
 */
abstract class Model[M <: Model[M]] extends Transformer

Usage Examples:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.classification.LogisticRegression

// Create pipeline stages.
// Index the string column "category" into numeric "categoryIndex".
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

// Combine raw numeric columns plus the index into a single feature vector.
val assembler = new VectorAssembler()
  .setInputCols(Array("age", "income", "categoryIndex"))
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")

// Create and fit pipeline (stages run in the given order).
val pipeline = new Pipeline()
  .setStages(Array(indexer, assembler, lr))

// NOTE(review): trainingData and testData are assumed to be DataFrames
// defined elsewhere, with "category", "age", "income" and "label" columns.
val model = pipeline.fit(trainingData)
val predictions = model.transform(testData)

Classification

Algorithms for supervised learning with discrete target variables.

/**
 * Logistic regression classifier.
 */
class LogisticRegression extends Classifier[Vector, LogisticRegression, LogisticRegressionModel] {
  def setFeaturesCol(value: String): LogisticRegression
  def setLabelCol(value: String): LogisticRegression
  def setPredictionCol(value: String): LogisticRegression
  def setProbabilityCol(value: String): LogisticRegression
  /** Set maximum number of optimizer iterations */
  def setMaxIter(value: Int): LogisticRegression
  /** Set regularization strength */
  def setRegParam(value: Double): LogisticRegression
  /** Set ElasticNet mixing parameter between L1 and L2 penalties */
  def setElasticNetParam(value: Double): LogisticRegression
  /** Set label family, e.g. "binomial" or "multinomial" — see Spark docs */
  def setFamily(value: String): LogisticRegression
  def fit(dataset: Dataset[_]): LogisticRegressionModel
}

/**
 * Decision tree classifier.
 */
class DecisionTreeClassifier extends Classifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel] {
  def setMaxDepth(value: Int): DecisionTreeClassifier
  def setMinInstancesPerNode(value: Int): DecisionTreeClassifier
  /** Set split criterion (e.g. "gini" or "entropy") */
  def setImpurity(value: String): DecisionTreeClassifier
  def setMaxBins(value: Int): DecisionTreeClassifier
  def setSeed(value: Long): DecisionTreeClassifier
  def fit(dataset: Dataset[_]): DecisionTreeClassificationModel
}

/**
 * Random forest classifier.
 */
class RandomForestClassifier extends Classifier[Vector, RandomForestClassifier, RandomForestClassificationModel] {
  def setNumTrees(value: Int): RandomForestClassifier
  def setMaxDepth(value: Int): RandomForestClassifier
  def setSubsamplingRate(value: Double): RandomForestClassifier
  def setFeatureSubsetStrategy(value: String): RandomForestClassifier
  // Added: these setters are used by the usage example below and are part
  // of the Spark RandomForestClassifier API (shared tree params).
  /** Set split criterion (e.g. "gini" or "entropy") */
  def setImpurity(value: String): RandomForestClassifier
  /** Set maximum number of bins for discretizing continuous features */
  def setMaxBins(value: Int): RandomForestClassifier
  def fit(dataset: Dataset[_]): RandomForestClassificationModel
}

/**
 * Gradient-boosted tree classifier.
 */
class GBTClassifier extends Classifier[Vector, GBTClassifier, GBTClassificationModel] {
  def setMaxIter(value: Int): GBTClassifier
  /** Set learning rate (shrinkage) */
  def setStepSize(value: Double): GBTClassifier
  def setMaxDepth(value: Int): GBTClassifier
  def fit(dataset: Dataset[_]): GBTClassificationModel
}

/**
 * Naive Bayes classifier.
 */
class NaiveBayes extends Classifier[Vector, NaiveBayes, NaiveBayesModel] {
  /** Set model type (e.g. "multinomial" or "bernoulli") — see Spark docs */
  def setModelType(value: String): NaiveBayes
  /** Set additive (Laplace) smoothing amount */
  def setSmoothing(value: Double): NaiveBayes
  def fit(dataset: Dataset[_]): NaiveBayesModel
}

/**
 * Linear Support Vector Machine.
 */
class LinearSVC extends Classifier[Vector, LinearSVC, LinearSVCModel] {
  def setMaxIter(value: Int): LinearSVC
  def setRegParam(value: Double): LinearSVC
  /** Set convergence tolerance for the optimizer */
  def setTol(value: Double): LinearSVC
  def fit(dataset: Dataset[_]): LinearSVCModel
}

Usage Examples:

import org.apache.spark.ml.classification._
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Logistic Regression
val lr = new LogisticRegression()
  .setMaxIter(20)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

val lrModel = lr.fit(trainingData)
val lrPredictions = lrModel.transform(testData)

// Random Forest
val rf = new RandomForestClassifier()
  .setNumTrees(100)
  .setFeatureSubsetStrategy("auto")
  .setImpurity("gini")
  .setMaxDepth(4)
  .setMaxBins(32)

val rfModel = rf.fit(trainingData)
val rfPredictions = rfModel.transform(testData)

// Evaluation
// NOTE(review): "indexedLabel" assumes the label column was produced by a
// StringIndexer stage — confirm against the surrounding pipeline.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

// Fixed: the snippet previously evaluated an undefined `predictions` value;
// only `lrPredictions` and `rfPredictions` are defined above.
val accuracy = evaluator.evaluate(rfPredictions)

Regression

Algorithms for supervised learning with continuous target variables.

/**
 * Linear regression with optional L1/L2/elastic-net regularization.
 */
class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegressionModel] {
  def setFeaturesCol(value: String): LinearRegression
  def setLabelCol(value: String): LinearRegression
  def setPredictionCol(value: String): LinearRegression
  def setMaxIter(value: Int): LinearRegression
  /** Set regularization strength */
  def setRegParam(value: Double): LinearRegression
  /** Set ElasticNet mixing parameter between L1 and L2 penalties */
  def setElasticNetParam(value: Double): LinearRegression
  /** Set convergence tolerance for the optimizer */
  def setTol(value: Double): LinearRegression
  /** Set whether to fit an intercept term */
  def setFitIntercept(value: Boolean): LinearRegression
  def fit(dataset: Dataset[_]): LinearRegressionModel
}

/**
 * Decision tree regressor.
 */
class DecisionTreeRegressor extends Regressor[Vector, DecisionTreeRegressor, DecisionTreeRegressionModel] {
  def setMaxDepth(value: Int): DecisionTreeRegressor
  def setMinInstancesPerNode(value: Int): DecisionTreeRegressor
  /** Set split criterion (regression trees typically use "variance") */
  def setImpurity(value: String): DecisionTreeRegressor
  def fit(dataset: Dataset[_]): DecisionTreeRegressionModel
}

/**
 * Random forest regressor.
 */
class RandomForestRegressor extends Regressor[Vector, RandomForestRegressor, RandomForestRegressionModel] {
  def setNumTrees(value: Int): RandomForestRegressor
  def setMaxDepth(value: Int): RandomForestRegressor
  /** Set fraction of training data sampled per tree */
  def setSubsamplingRate(value: Double): RandomForestRegressor
  def fit(dataset: Dataset[_]): RandomForestRegressionModel
}

/**
 * Gradient-boosted tree regressor.
 */
class GBTRegressor extends Regressor[Vector, GBTRegressor, GBTRegressionModel] {
  def setMaxIter(value: Int): GBTRegressor
  /** Set learning rate (shrinkage) */
  def setStepSize(value: Double): GBTRegressor
  def setMaxDepth(value: Int): GBTRegressor
  def fit(dataset: Dataset[_]): GBTRegressionModel
}

/**
 * Generalized linear regression (GLM-style families and link functions).
 */
class GeneralizedLinearRegression extends Regressor[Vector, GeneralizedLinearRegression, GeneralizedLinearRegressionModel] {
  /** Set error distribution family (e.g. "gaussian", "poisson") — see Spark docs */
  def setFamily(value: String): GeneralizedLinearRegression
  /** Set link function name (must be compatible with the chosen family) */
  def setLink(value: String): GeneralizedLinearRegression
  def setMaxIter(value: Int): GeneralizedLinearRegression
  def setRegParam(value: Double): GeneralizedLinearRegression
  def fit(dataset: Dataset[_]): GeneralizedLinearRegressionModel
}

Clustering

Unsupervised learning algorithms for grouping data points.

/**
 * K-means clustering.
 */
class KMeans extends Estimator[KMeansModel] {
  def setFeaturesCol(value: String): KMeans
  def setPredictionCol(value: String): KMeans
  /** Set number of clusters */
  def setK(value: Int): KMeans
  def setMaxIter(value: Int): KMeans
  /** Set convergence tolerance */
  def setTol(value: Double): KMeans
  /** Set centroid initialization mode (e.g. "random" or "k-means||") — see Spark docs */
  def setInitMode(value: String): KMeans
  /** Set number of steps for k-means|| initialization */
  def setInitSteps(value: Int): KMeans
  def setSeed(value: Long): KMeans
  def fit(dataset: Dataset[_]): KMeansModel
}

/**
 * Bisecting K-means clustering (hierarchical, divisive variant).
 */
class BisectingKMeans extends Estimator[BisectingKMeansModel] {
  /** Set number of leaf clusters */
  def setK(value: Int): BisectingKMeans
  def setMaxIter(value: Int): BisectingKMeans
  def setSeed(value: Long): BisectingKMeans
  def fit(dataset: Dataset[_]): BisectingKMeansModel
}

/**
 * Gaussian Mixture Model (soft clustering with Gaussian components).
 */
class GaussianMixture extends Estimator[GaussianMixtureModel] {
  /** Set number of mixture components */
  def setK(value: Int): GaussianMixture
  def setMaxIter(value: Int): GaussianMixture
  def setTol(value: Double): GaussianMixture
  def setSeed(value: Long): GaussianMixture
  def fit(dataset: Dataset[_]): GaussianMixtureModel
}

/**
 * Latent Dirichlet Allocation for topic modeling.
 */
class LDA extends Estimator[LDAModel] {
  /** Set number of topics */
  def setK(value: Int): LDA
  def setMaxIter(value: Int): LDA
  /** Set topic-word concentration (a.k.a. "beta") — see Spark docs */
  def setTopicConcentration(value: Double): LDA
  /** Set document-topic concentration (a.k.a. "alpha") — see Spark docs */
  def setDocConcentration(value: Double): LDA
  def setSeed(value: Long): LDA
  def fit(dataset: Dataset[_]): LDAModel
}

Usage Examples:

import org.apache.spark.ml.clustering._

// K-Means: 3 clusters, fixed seed for reproducibility.
val kmeans = new KMeans()
  .setK(3)
  .setSeed(1L)
  .setMaxIter(20)

// NOTE(review): `dataset` is assumed to be a DataFrame with a vector
// "features" column, defined elsewhere.
val kmeansModel = kmeans.fit(dataset)
val predictions = kmeansModel.transform(dataset)

// Evaluate clustering by computing Within Set Sum of Squared Errors
// NOTE(review): computeCost is deprecated in newer Spark releases in favor
// of ClusteringEvaluator — confirm against the target Spark version.
val wssse = kmeansModel.computeCost(dataset)
println(s"Within Set Sum of Squared Errors = $wssse")

// Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(3)
  .setSeed(538009335L)

val gmmModel = gmm.fit(dataset)
val gmmPredictions = gmmModel.transform(dataset)

Feature Processing

Data preprocessing and feature engineering transformers.

/**
 * Combine multiple columns into a single vector column.
 */
class VectorAssembler extends Transformer {
  def setInputCols(value: Array[String]): VectorAssembler
  def setOutputCol(value: String): VectorAssembler
  /** Set policy for invalid entries (e.g. "error", "skip", "keep") — see Spark docs */
  def setHandleInvalid(value: String): VectorAssembler
  def transform(dataset: Dataset[_]): DataFrame
}

/**
 * Standardize features by removing mean and scaling to unit variance.
 */
class StandardScaler extends Estimator[StandardScalerModel] {
  def setInputCol(value: String): StandardScaler
  def setOutputCol(value: String): StandardScaler
  /** Set whether to center features to zero mean */
  def setWithMean(value: Boolean): StandardScaler
  /** Set whether to scale features to unit standard deviation */
  def setWithStd(value: Boolean): StandardScaler
  def fit(dataset: Dataset[_]): StandardScalerModel
}

/**
 * Scale features to a given [min, max] range.
 */
class MinMaxScaler extends Estimator[MinMaxScalerModel] {
  def setInputCol(value: String): MinMaxScaler
  def setOutputCol(value: String): MinMaxScaler
  /** Set lower bound of the output range */
  def setMin(value: Double): MinMaxScaler
  /** Set upper bound of the output range */
  def setMax(value: Double): MinMaxScaler
  def fit(dataset: Dataset[_]): MinMaxScalerModel
}

/**
 * Map string labels to numeric indices.
 */
class StringIndexer extends Estimator[StringIndexerModel] {
  def setInputCol(value: String): StringIndexer
  def setOutputCol(value: String): StringIndexer
  /** Set policy for unseen labels at transform time — see Spark docs */
  def setHandleInvalid(value: String): StringIndexer
  /** Set index ordering (e.g. by frequency or alphabetically) — see Spark docs */
  def setStringOrderType(value: String): StringIndexer
  def fit(dataset: Dataset[_]): StringIndexerModel
}

/**
 * One-hot encode categorical index columns into binary vectors.
 *
 * NOTE(review): this stub declares plural setInputCols/setOutputCols (the
 * Spark 3 / OneHotEncoderEstimator API) but extends Transformer; in Spark 3
 * OneHotEncoder is an Estimator that must be fit before transforming.
 * Confirm which Spark version this documentation targets.
 */
class OneHotEncoder extends Transformer {
  def setInputCols(value: Array[String]): OneHotEncoder
  def setOutputCols(value: Array[String]): OneHotEncoder
  /** Set whether to drop the last category (avoids collinearity) */
  def setDropLast(value: Boolean): OneHotEncoder
  def setHandleInvalid(value: String): OneHotEncoder
  def transform(dataset: Dataset[_]): DataFrame
}

/**
 * Tokenize text into lowercase words by simple whitespace splitting.
 */
class Tokenizer extends Transformer {
  def setInputCol(value: String): Tokenizer
  def setOutputCol(value: String): Tokenizer
  def transform(dataset: Dataset[_]): DataFrame
}

/**
 * Regular-expression-based tokenizer.
 */
class RegexTokenizer extends Transformer {
  def setInputCol(value: String): RegexTokenizer
  def setOutputCol(value: String): RegexTokenizer
  /** Set the regex pattern used for tokenizing */
  def setPattern(value: String): RegexTokenizer
  /** Set whether the pattern matches gaps (delimiters) rather than tokens */
  def setGaps(value: Boolean): RegexTokenizer
  /** Set minimum token length; shorter tokens are dropped */
  def setMinTokenLength(value: Int): RegexTokenizer
  def transform(dataset: Dataset[_]): DataFrame
}

/**
 * Hash text features into fixed-length term-frequency vectors.
 */
class HashingTF extends Transformer {
  def setInputCol(value: String): HashingTF
  def setOutputCol(value: String): HashingTF
  /** Set output vector dimensionality (number of hash buckets) */
  def setNumFeatures(value: Int): HashingTF
  /** Set whether to record binary presence instead of term counts */
  def setBinary(value: Boolean): HashingTF
  def transform(dataset: Dataset[_]): DataFrame
}

/**
 * Compute Inverse Document Frequency weights from term-frequency vectors.
 */
class IDF extends Estimator[IDFModel] {
  def setInputCol(value: String): IDF
  def setOutputCol(value: String): IDF
  /** Set minimum number of documents a term must appear in */
  def setMinDocFreq(value: Int): IDF
  def fit(dataset: Dataset[_]): IDFModel
}

/**
 * Principal Component Analysis for dimensionality reduction.
 */
class PCA extends Estimator[PCAModel] {
  def setInputCol(value: String): PCA
  def setOutputCol(value: String): PCA
  /** Set number of principal components to keep */
  def setK(value: Int): PCA
  def fit(dataset: Dataset[_]): PCAModel
}

/**
 * Chi-squared feature selection against a categorical label.
 */
class ChiSqSelector extends Estimator[ChiSqSelectorModel] {
  def setFeaturesCol(value: String): ChiSqSelector
  def setOutputCol(value: String): ChiSqSelector
  def setLabelCol(value: String): ChiSqSelector
  /** Set how many top-ranked features to keep */
  def setNumTopFeatures(value: Int): ChiSqSelector
  /** Set selection criterion (e.g. "numTopFeatures", "percentile") — see Spark docs */
  def setSelectorType(value: String): ChiSqSelector
  def fit(dataset: Dataset[_]): ChiSqSelectorModel
}

Usage Examples:

import org.apache.spark.ml.feature._

// Feature assembly: merge numeric columns into one vector column.
// NOTE(review): `df` and `textDF` are assumed to be DataFrames defined elsewhere.
val assembler = new VectorAssembler()
  .setInputCols(Array("age", "income", "education"))
  .setOutputCol("features")

val assembled = assembler.transform(df)

// String indexing and one-hot encoding
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)

val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))

// NOTE(review): calling transform directly assumes a Transformer-style
// OneHotEncoder; in Spark 3 the encoder must be fit first — confirm version.
val encoded = encoder.transform(indexed)

// Text processing pipeline: tokenize -> hashed TF -> IDF weighting.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .setNumFeatures(10000)

val idf = new IDF()
  .setInputCol("rawFeatures")
  .setOutputCol("features")

// Fit IDF on the hashed term-frequency vectors of the tokenized text.
val idfModel = idf.fit(hashingTF.transform(tokenizer.transform(textDF)))

Model Evaluation

Metrics and evaluators for assessing model performance.

/**
 * Evaluator for multiclass classification; evaluate returns the chosen metric.
 */
class MulticlassClassificationEvaluator extends Evaluator {
  def setLabelCol(value: String): MulticlassClassificationEvaluator
  def setPredictionCol(value: String): MulticlassClassificationEvaluator
  /** Set metric name (e.g. "accuracy", "f1") — see Spark docs for full list */
  def setMetricName(value: String): MulticlassClassificationEvaluator
  def evaluate(dataset: Dataset[_]): Double
}

/**
 * Evaluator for binary classification; operates on raw prediction scores.
 */
class BinaryClassificationEvaluator extends Evaluator {
  def setLabelCol(value: String): BinaryClassificationEvaluator
  /** Set column of raw scores/probabilities rather than hard predictions */
  def setRawPredictionCol(value: String): BinaryClassificationEvaluator
  /** Set metric name (e.g. "areaUnderROC") — see Spark docs */
  def setMetricName(value: String): BinaryClassificationEvaluator
  def evaluate(dataset: Dataset[_]): Double
}

/**
 * Evaluator for regression.
 */
class RegressionEvaluator extends Evaluator {
  def setLabelCol(value: String): RegressionEvaluator
  def setPredictionCol(value: String): RegressionEvaluator
  /** Set metric name (e.g. "rmse", "mae", "r2") — see Spark docs */
  def setMetricName(value: String): RegressionEvaluator
  def evaluate(dataset: Dataset[_]): Double
}

/**
 * Evaluator for clustering quality.
 */
class ClusteringEvaluator extends Evaluator {
  def setFeaturesCol(value: String): ClusteringEvaluator
  def setPredictionCol(value: String): ClusteringEvaluator
  /** Set metric name (e.g. "silhouette") — see Spark docs */
  def setMetricName(value: String): ClusteringEvaluator
  def evaluate(dataset: Dataset[_]): Double
}

Model Selection

Cross-validation and hyperparameter tuning utilities.

/**
 * Cross-validator for model selection: fits the estimator over every
 * ParamMap on each of k folds and keeps the best-scoring configuration.
 */
class CrossValidator extends Estimator[CrossValidatorModel] {
  /** Set the estimator (or pipeline) to tune */
  def setEstimator(value: Estimator[_]): CrossValidator
  /** Set the grid of parameter combinations to try */
  def setEstimatorParamMaps(value: Array[ParamMap]): CrossValidator
  /** Set the evaluator used to score each configuration */
  def setEvaluator(value: Evaluator): CrossValidator
  /** Set number of cross-validation folds */
  def setNumFolds(value: Int): CrossValidator
  def setSeed(value: Long): CrossValidator
  def fit(dataset: Dataset[_]): CrossValidatorModel
}

/**
 * Train-validation split for model selection: like CrossValidator but with
 * a single random train/validation split instead of k folds.
 */
class TrainValidationSplit extends Estimator[TrainValidationSplitModel] {
  def setEstimator(value: Estimator[_]): TrainValidationSplit
  def setEstimatorParamMaps(value: Array[ParamMap]): TrainValidationSplit
  def setEvaluator(value: Evaluator): TrainValidationSplit
  /** Set fraction of data used for training (remainder validates) */
  def setTrainRatio(value: Double): TrainValidationSplit
  def setSeed(value: Long): TrainValidationSplit
  def fit(dataset: Dataset[_]): TrainValidationSplitModel
}

/**
 * Builder for the cartesian product of parameter values to search over.
 */
class ParamGridBuilder {
  /** Add a parameter with the candidate values to try */
  def addGrid[T](param: Param[T], values: Array[T]): ParamGridBuilder
  /** Build all combinations of the added parameter values */
  def build(): Array[ParamMap]
}

Usage Examples:

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Parameter grid: 2 x 2 = 4 candidate configurations.
// NOTE(review): `lr` is assumed to be a LogisticRegression defined earlier.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.maxIter, Array(10, 100))
  .build()

// Cross validation: each candidate is scored over 3 folds.
val cv = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
  .setSeed(1234L)

val cvModel = cv.fit(trainingData)
// Best-performing model across the parameter grid.
val bestModel = cvModel.bestModel

Error Handling

Common ML exceptions:

  • IllegalArgumentException - Invalid parameters or configurations
  • SparkException - General Spark execution errors during ML operations
  • UnsupportedOperationException - Unsupported operations on specific model types

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12

docs

core.md

deployment.md

graphx.md

index.md

ml.md

sql.md

streaming.md

tile.json