Python API for Apache Spark, providing distributed computing, data analysis, and machine learning capabilities
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Legacy machine learning library providing RDD-based algorithms for classification, regression, clustering, and collaborative filtering. This library is maintained for backward compatibility but new applications should use the DataFrame-based ML package.
RDD-based classification algorithms.
class LogisticRegressionWithSGD:
    """Logistic regression using stochastic gradient descent."""

    @classmethod
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=0.01, regType="l2", intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Train a logistic regression model.

        Parameters:
        - data: Training data RDD of LabeledPoint
        - iterations (int): Number of iterations
        - step (float): Step size for SGD
        - miniBatchFraction (float): Fraction of data for mini-batch
        - initialWeights (Vector): Initial weights
        - regParam (float): Regularization parameter
        - regType (str): Regularization type ("l1", "l2", "none")
        - intercept (bool): Whether to add intercept
        - validateData (bool): Whether to validate input data
        - convergenceTol (float): Convergence tolerance

        Returns:
        LogisticRegressionModel
        """
class LogisticRegressionWithLBFGS:
    """Logistic regression using the L-BFGS optimizer."""

    @classmethod
    def train(cls, data, iterations=100, initialWeights=None, regParam=0.0,
              regType="l2", intercept=False, corrections=10, tolerance=1e-6,
              validateData=True, numClasses=2):
        """
        Train a logistic regression model using L-BFGS.

        Parameters:
        - data: Training data RDD of LabeledPoint
        - iterations (int): Number of iterations
        - initialWeights (Vector): Initial weights
        - regParam (float): Regularization parameter
        - regType (str): Regularization type ("l1", "l2", "none")
        - intercept (bool): Whether to add intercept
        - corrections (int): Number of corrections used in the L-BFGS update
        - tolerance (float): Convergence tolerance of iterations
        - validateData (bool): Whether to validate input data
        - numClasses (int): Number of classes (supports multinomial when > 2)

        Returns:
        LogisticRegressionModel
        """
class SVMWithSGD:
    """Support Vector Machine using stochastic gradient descent."""

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=0.01,
              miniBatchFraction=1.0, initialWeights=None, regType="l2",
              intercept=False, validateData=True, convergenceTol=0.001):
        """
        Train an SVM model using SGD.

        Parameters:
        - data: Training data RDD of LabeledPoint
        - iterations (int): Number of iterations
        - step (float): Step size for SGD
        - regParam (float): Regularization parameter
        - miniBatchFraction (float): Fraction of data for mini-batch
        - initialWeights (Vector): Initial weights
        - regType (str): Regularization type ("l1", "l2", "none")
        - intercept (bool): Whether to add intercept
        - validateData (bool): Whether to validate input data
        - convergenceTol (float): Convergence tolerance

        Returns:
        SVMModel
        """
class NaiveBayes:
    """Naive Bayes classifier."""

    @classmethod
    def train(cls, data, lambda_=1.0, modelType="multinomial"):
        """
        Train a Naive Bayes model.

        Parameters:
        - data: Training data RDD of LabeledPoint
        - lambda_ (float): Smoothing parameter
        - modelType (str): Model type ("multinomial" or "bernoulli")

        Returns:
        NaiveBayesModel
        """

# RDD-based regression algorithms.
class LinearRegressionWithSGD:
    """Linear regression using stochastic gradient descent."""

    @classmethod
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=0.0, regType=None, intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Train a linear regression model using SGD.

        Parameters:
        - data: Training data RDD of LabeledPoint
        - iterations (int): Number of iterations
        - step (float): Step size for SGD
        - miniBatchFraction (float): Fraction of data for mini-batch
        - initialWeights (Vector): Initial weights
        - regParam (float): Regularization parameter
        - regType (str): Regularization type; None means no regularization
        - intercept (bool): Whether to add intercept
        - validateData (bool): Whether to validate input data
        - convergenceTol (float): Convergence tolerance

        Returns:
        LinearRegressionModel
        """
class LassoWithSGD:
    """Lasso regression (L1-regularized) using stochastic gradient descent."""

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=0.01,
              miniBatchFraction=1.0, initialWeights=None, intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Train a Lasso regression model using SGD.

        Parameters:
        - data: Training data RDD of LabeledPoint
        - iterations (int): Number of iterations
        - step (float): Step size for SGD
        - regParam (float): Regularization parameter
        - miniBatchFraction (float): Fraction of data for mini-batch
        - initialWeights (Vector): Initial weights
        - intercept (bool): Whether to add intercept
        - validateData (bool): Whether to validate input data
        - convergenceTol (float): Convergence tolerance

        Returns:
        LassoModel
        """
class RidgeRegressionWithSGD:
    """Ridge regression (L2-regularized) using stochastic gradient descent."""

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=0.01,
              miniBatchFraction=1.0, initialWeights=None, intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Train a Ridge regression model using SGD.

        Parameters:
        - data: Training data RDD of LabeledPoint
        - iterations (int): Number of iterations
        - step (float): Step size for SGD
        - regParam (float): Regularization parameter
        - miniBatchFraction (float): Fraction of data for mini-batch
        - initialWeights (Vector): Initial weights
        - intercept (bool): Whether to add intercept
        - validateData (bool): Whether to validate input data
        - convergenceTol (float): Convergence tolerance

        Returns:
        RidgeRegressionModel
        """

# RDD-based clustering algorithms.
class KMeans:
    """K-means clustering algorithm."""

    @classmethod
    def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||",
              seed=None, initializationSteps=2, epsilon=1e-4, initialModel=None):
        """
        Train a k-means model.

        Parameters:
        - rdd: Training data RDD of vectors
        - k (int): Number of clusters
        - maxIterations (int): Maximum number of iterations
        - runs (int): Number of runs to execute
        - initializationMode (str): Initialization algorithm
        - seed (int): Random seed
        - initializationSteps (int): Number of steps for k-means|| initialization
        - epsilon (float): Convergence tolerance
        - initialModel: Initial cluster centers

        Returns:
        KMeansModel
        """
class GaussianMixture:
    """Gaussian Mixture Model clustering."""

    @classmethod
    def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None,
              initialModel=None):
        """
        Train a Gaussian Mixture Model.

        Parameters:
        - rdd: Training data RDD of vectors
        - k (int): Number of components
        - convergenceTol (float): Convergence tolerance
        - maxIterations (int): Maximum number of iterations
        - seed (int): Random seed
        - initialModel: Initial model

        Returns:
        GaussianMixtureModel
        """
class LDA:
    """Latent Dirichlet Allocation."""

    @classmethod
    def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0,
              topicConcentration=-1.0, seed=None, checkpointInterval=10,
              optimizer="em"):
        """
        Train an LDA model.

        Parameters:
        - rdd: Training data RDD of (document ID, word counts) pairs
        - k (int): Number of topics
        - maxIterations (int): Maximum number of iterations
        - docConcentration (float): Document concentration parameter
        - topicConcentration (float): Topic concentration parameter
        - seed (int): Random seed
        - checkpointInterval (int): Checkpoint interval
        - optimizer (str): Optimizer ("em" or "online")

        Returns:
        LDAModel
        """

# Vector and matrix operations for MLlib.
class Vector:
    """Abstract base class for vectors."""

    def toArray(self):
        """Convert to numpy array."""
class DenseVector(Vector):
    """Dense vector."""

    def __init__(self, ar):
        """
        Create dense vector from array.

        Parameters:
        - ar: Array-like sequence of values
        """
class SparseVector(Vector):
    """Sparse vector."""

    def __init__(self, size, *args):
        """
        Create sparse vector.

        Parameters:
        - size (int): Vector dimension
        - args: Indices and values (format varies by call style)
        """
class Vectors:
    """Factory methods for vectors."""

    @staticmethod
    def dense(*values):
        """Create dense vector."""

    @staticmethod
    def sparse(size, *args):
        """Create sparse vector."""

    @staticmethod
    def zeros(size):
        """Create zero vector."""
class Matrix:
    """Abstract base class for matrices."""

    def numRows(self):
        """Number of rows."""

    def numCols(self):
        """Number of columns."""
class DenseMatrix(Matrix):
    """Dense matrix."""

    def __init__(self, numRows, numCols, values, isTransposed=False):
        """
        Create dense matrix.

        Parameters:
        - numRows (int): Number of rows
        - numCols (int): Number of columns
        - values: Matrix entries (column-major order, per MLlib convention — TODO confirm)
        - isTransposed (bool): Whether the values are stored transposed
        """
class Matrices:
    """Factory methods for matrices."""

    @staticmethod
    def dense(numRows, numCols, values):
        """Create dense matrix."""
class LabeledPoint:
    """Labeled data point for supervised learning."""

    def __init__(self, label, features):
        """
        Create labeled point.

        Parameters:
        - label (float): Point label
        - features (Vector): Feature vector
        """

# Collaborative filtering for recommendation systems.
class ALS:
    """Alternating Least Squares matrix factorization."""

    @classmethod
    def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False,
              seed=None):
        """
        Train an ALS model.

        Parameters:
        - ratings: RDD of Rating objects
        - rank (int): Number of latent factors
        - iterations (int): Number of iterations
        - lambda_ (float): Regularization parameter
        - blocks (int): Number of blocks for parallel computation
        - nonnegative (bool): Whether to enforce non-negative constraints
        - seed (int): Random seed

        Returns:
        MatrixFactorizationModel
        """

    @classmethod
    def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1,
                      alpha=0.01, nonnegative=False, seed=None):
        """
        Train an ALS model for implicit feedback data.

        Parameters mirror train(); alpha (float) is the confidence parameter
        for implicit preference observations.

        Returns:
        MatrixFactorizationModel
        """
class Rating:
    """Rating for collaborative filtering."""

    def __init__(self, user, product, rating):
        """
        Create rating.

        Parameters:
        - user (int): User ID
        - product (int): Product ID
        - rating (float): Rating value
        """

# Feature extraction and transformation utilities.
class HashingTF:
    """Hashing Term Frequency."""

    def __init__(self, numFeatures=1048576):
        """
        Initialize HashingTF.

        Parameters:
        - numFeatures (int): Number of features/buckets
        """

    def transform(self, document):
        """Transform document to TF vector."""
class IDF:
    """Inverse Document Frequency."""

    def __init__(self, minDocFreq=0):
        """
        Initialize IDF.

        Parameters:
        - minDocFreq (int): Minimum document frequency
        """

    def fit(self, dataset):
        """
        Compute IDF from dataset.

        Parameters:
        - dataset: RDD of TF vectors

        Returns:
        IDFModel
        """
class StandardScaler:
    """Feature standardization."""

    def __init__(self, withMean=False, withStd=True):
        """
        Initialize StandardScaler.

        Parameters:
        - withMean (bool): Whether to center data
        - withStd (bool): Whether to scale to unit variance
        """

    def fit(self, data):
        """
        Compute statistics for scaling.

        Parameters:
        - data: RDD of vectors

        Returns:
        StandardScalerModel
        """
class Word2Vec:
    """Word2Vec model for word embeddings."""

    def __init__(self, vectorSize=100, learningRate=0.025, numPartitions=1,
                 numIterations=1, seed=None, minCount=5, windowSize=5):
        """
        Initialize Word2Vec.

        Parameters:
        - vectorSize (int): Size of word vectors
        - learningRate (float): Learning rate
        - numPartitions (int): Number of partitions
        - numIterations (int): Number of iterations
        - seed (int): Random seed
        - minCount (int): Minimum word frequency
        - windowSize (int): Context window size
        """

    def fit(self, data):
        """
        Train Word2Vec model.

        Parameters:
        - data: RDD of sentences (lists of words)

        Returns:
        Word2VecModel
        """

# NOTE(review): duplicate of the LabeledPoint definition above — kept for
# fidelity to the extracted API listing; later definition shadows the earlier one.
class LabeledPoint:
    """Labeled data point for supervised learning."""

    def __init__(self, label, features):
        """
        Create labeled point.

        Parameters:
        - label (float): Point label
        - features (Vector): Feature vector
        """
# NOTE(review): duplicate of the Rating definition above — kept for fidelity
# to the extracted API listing; later definition shadows the earlier one.
class Rating:
    """Rating for collaborative filtering."""

    def __init__(self, user, product, rating):
        """
        Create rating.

        Parameters:
        - user (int): User ID
        - product (int): Product ID
        - rating (float): Rating value
        """

# Install with Tessl CLI
Install with the Tessl CLI: `npx tessl i tessl/pypi-pyspark`