SQL and DataFrames

Spark SQL provides structured data processing through DataFrames and Datasets, backed by a SQL query engine. It integrates with a wide range of data sources and formats and supports both batch and streaming execution.

Package Information

SQL and DataFrame functionality is available through:

import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Row, Column}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

Basic Usage

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Create Spark session
val spark = SparkSession.builder()
  .appName("SQL Example")
  .master("local[*]")
  .getOrCreate()

// Read data
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/data.csv")

// DataFrame operations
val result = df
  .select("name", "age", "salary")
  .filter(col("age") > 25)
  .groupBy("department")
  .agg(avg("salary").alias("avg_salary"))
  .orderBy(desc("avg_salary"))

result.show()

// SQL queries
df.createOrReplaceTempView("employees")
val sqlResult = spark.sql("""
  SELECT department, AVG(salary) as avg_salary
  FROM employees 
  WHERE age > 25
  GROUP BY department
  ORDER BY avg_salary DESC
""")

spark.stop()

Capabilities

Spark Session

The unified entry point for Spark SQL functionality, replacing SQLContext and HiveContext.

class SparkSession private(sparkContext: SparkContext, existingSharedState: Option[SharedState]) {
  // Data reading
  def read: DataFrameReader
  def readStream: DataStreamReader
  
  // SQL execution
  def sql(sqlText: String): DataFrame
  def table(tableName: String): DataFrame
  
  // DataFrame creation
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
  def emptyDataFrame: DataFrame
  def range(end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
  
  // Catalog and metadata
  def catalog: Catalog
  def conf: RuntimeConfig
  def sessionState: SessionState
  def sharedState: SharedState
  
  // Streaming
  def streams: StreamingQueryManager
  
  // Resources and control
  def sparkContext: SparkContext
  def version: String
  def stop(): Unit
  def close(): Unit
  def newSession(): SparkSession
}

object SparkSession {
  def builder(): Builder
  def active: SparkSession
  def getActiveSession: Option[SparkSession]
  def getDefaultSession: Option[SparkSession]
  def setActiveSession(session: SparkSession): Unit
  def clearActiveSession(): Unit
  def setDefaultSession(session: SparkSession): Unit
  def clearDefaultSession(): Unit
}

class Builder {
  def appName(name: String): Builder
  def master(master: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def config(conf: SparkConf): Builder
  def enableHiveSupport(): Builder
  def getOrCreate(): SparkSession
}

Usage example:

val spark = SparkSession.builder()
  .appName("My Application")
  .master("local[4]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .enableHiveSupport()
  .getOrCreate()

DataFrames and Datasets

A DataFrame is simply a Dataset of Row objects; DataFrames and Datasets share a DSL for structured data manipulation.

abstract class Dataset[T] extends Serializable {
  // Column selection and projection
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame
  def selectExpr(exprs: String*): DataFrame
  def drop(colName: String): DataFrame
  def drop(colNames: String*): DataFrame
  def drop(col: Column): DataFrame
  def withColumn(colName: String, col: Column): DataFrame
  def withColumnRenamed(existingName: String, newName: String): DataFrame
  
  // Filtering and conditions
  def filter(condition: Column): Dataset[T]
  def filter(conditionExpr: String): Dataset[T]
  def where(condition: Column): Dataset[T]
  
  // Grouping and aggregation
  def groupBy(cols: Column*): RelationalGroupedDataset
  def groupBy(col1: String, cols: String*): RelationalGroupedDataset
  def rollup(cols: Column*): RelationalGroupedDataset
  def cube(cols: Column*): RelationalGroupedDataset
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame
  
  // Joins
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], usingColumn: String): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
  def crossJoin(right: Dataset[_]): DataFrame
  
  // Set operations
  def union(other: Dataset[T]): Dataset[T]
  def unionAll(other: Dataset[T]): Dataset[T]
  def unionByName(other: Dataset[T]): Dataset[T]
  def intersect(other: Dataset[T]): Dataset[T]
  def intersectAll(other: Dataset[T]): Dataset[T]
  def except(other: Dataset[T]): Dataset[T]
  def exceptAll(other: Dataset[T]): Dataset[T]
  
  // Sorting
  def sort(sortCol: String, sortCols: String*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]
  def orderBy(sortCol: String, sortCols: String*): Dataset[T]
  def orderBy(sortExprs: Column*): Dataset[T]
  
  // Sampling and limiting
  def sample(fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
  def limit(n: Int): Dataset[T]
  
  // Actions
  def collect(): Array[T]
  def collectAsList(): java.util.List[T]
  def count(): Long
  def describe(cols: String*): DataFrame
  def first(): T
  def head(): T
  def head(n: Int): Array[T]
  def take(n: Int): Array[T]
  def takeAsList(n: Int): java.util.List[T]
  def tail(n: Int): Array[T]
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit
  
  // Display
  def show(): Unit
  def show(numRows: Int): Unit
  def show(numRows: Int, truncate: Boolean): Unit
  def show(numRows: Int, truncate: Int): Unit
  def show(numRows: Int, truncate: Int, vertical: Boolean): Unit
  
  // Schema and metadata
  def schema: StructType
  def printSchema(): Unit
  def dtypes: Array[(String, String)]
  def columns: Array[String]
  
  // Persistence
  def cache(): Dataset[T]
  def persist(): Dataset[T]
  def persist(newLevel: StorageLevel): Dataset[T]
  def unpersist(): Dataset[T]
  def unpersist(blocking: Boolean): Dataset[T]
  
  // Type conversion
  def as[U : Encoder]: Dataset[U]
  def alias(alias: String): Dataset[T]
  
  // I/O
  def write: DataFrameWriter[T]
  def writeStream: DataStreamWriter[T]
  
  // Advanced transformations
  def unpivot(ids: Array[Column], values: Array[Column], variableColumnName: String, valueColumnName: String): DataFrame
  def melt(ids: Array[Column], values: Array[Column], variableColumnName: String, valueColumnName: String): DataFrame
  def transpose(): DataFrame
  def observe(name: String, expr: Column, exprs: Column*): Dataset[T] 
  def lateralJoin(tableFunctionCall: Column): DataFrame
  
  // Utility operations
  def exists(condition: Column): Boolean
  def scalar(): Any
  
  // SQL operations
  def createOrReplaceTempView(viewName: String): Unit
  def createGlobalTempView(viewName: String): Unit
  def createTempView(viewName: String): Unit
}

type DataFrame = Dataset[Row]
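
A minimal sketch of common Dataset and DataFrame transformations, assuming an active spark session; the Employee case class, column names, and sample rows are purely illustrative:

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical input data, used only for illustration
case class Employee(name: String, department: String, age: Int, salary: Double)
val employees: DataFrame = Seq(
  Employee("Alice", "Engineering", 34, 95000),
  Employee("Bob", "Sales", 28, 61000)
).toDF()

// Untyped (DataFrame) transformations
val adjusted = employees
  .withColumn("salary_k", col("salary") / 1000)
  .withColumnRenamed("name", "employee_name")
  .filter(col("age") >= 30)
adjusted.show()

// Typed (Dataset) view of the same data
val typed: Dataset[Employee] = employees.as[Employee]
typed.filter(_.salary > 70000).show()

// Join against a second, hypothetical lookup table
val departments = Seq(("Engineering", "Building A"), ("Sales", "Building B"))
  .toDF("department", "location")
employees.join(departments, Seq("department"), "left").show()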

Column Operations

Represents a column in a DataFrame and provides operators and methods for building column expressions.

class Column(expr: Expression) {
  // Comparison operators
  def ===(other: Any): Column
  def =!=(other: Any): Column
  def !==(other: Any): Column  // deprecated; use =!= instead
  def >(other: Any): Column
  def >=(other: Any): Column
  def <(other: Any): Column
  def <=(other: Any): Column
  def <=>(other: Any): Column  // null-safe equality
  
  // Logical operators
  def &&(other: Any): Column
  def ||(other: Any): Column
  def unary_! : Column
  
  // Arithmetic operators
  def +(other: Any): Column
  def -(other: Any): Column
  def *(other: Any): Column
  def /(other: Any): Column
  def %(other: Any): Column
  
  // Null handling
  def isNull: Column
  def isNotNull: Column
  def isNaN: Column
  
  // String operations
  def contains(other: Any): Column
  def startsWith(other: Column): Column
  def startsWith(literal: String): Column
  def endsWith(other: Column): Column
  def endsWith(literal: String): Column
  def substr(startPos: Column, len: Column): Column
  def substr(startPos: Int, len: Int): Column
  def like(literal: String): Column
  def rlike(literal: String): Column
  
  // Array operations
  def getItem(key: Any): Column
  def getField(fieldName: String): Column
  
  // Type conversion
  def cast(to: DataType): Column
  def cast(to: String): Column
  
  // Naming
  def alias(alias: String): Column
  def as(alias: String): Column
  def as(alias: String, metadata: Metadata): Column
  def name(alias: String): Column
  
  // Sorting
  def asc: Column
  def asc_nulls_first: Column
  def asc_nulls_last: Column
  def desc: Column
  def desc_nulls_first: Column
  def desc_nulls_last: Column
  
  // Window functions
  def over(): Column
  def over(window: WindowSpec): Column
}
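
A brief sketch of composing column expressions with these operators; df and its columns (name, age, salary, tags) are hypothetical:

import org.apache.spark.sql.functions._

// Comparison, logical, and arithmetic operators compose into expressions
val seniorWellPaid = (col("age") >= 40) && (col("salary") > 100000)
df.filter(seniorWellPaid).show()

// Null handling, string predicates, element access, and casting
df.select(
  col("name").startsWith("A").alias("starts_with_a"),
  col("salary").isNull.alias("salary_missing"),
  col("age").cast("string").alias("age_str"),
  col("tags").getItem(0).alias("first_tag")  // first element of an array column
).show()

// Sort-order modifiers
df.orderBy(col("salary").desc_nulls_last).show()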

Built-in Functions

The functions object provides a rich set of built-in functions for DataFrame operations.

object functions {
  // Column creation
  def col(colName: String): Column
  def column(colName: String): Column
  def lit(literal: Any): Column
  def typedLit[T : TypeTag](literal: T): Column
  
  // Conditional expressions
  def when(condition: Column, value: Any): Column
  def coalesce(cols: Column*): Column
  def isnull(col: Column): Column
  def nanvl(col1: Column, col2: Column): Column
  
  // Aggregate functions
  def count(e: Column): Column
  def count(columnName: String): TypedColumn[Any, Long]
  def countDistinct(expr: Column, exprs: Column*): Column
  def approx_count_distinct(e: Column): Column
  def sum(e: Column): Column
  def sum(columnName: String): Column
  def avg(e: Column): Column
  def avg(columnName: String): Column
  def mean(e: Column): Column
  def min(e: Column): Column
  def max(e: Column): Column
  def first(e: Column): Column
  def first(e: Column, ignoreNulls: Boolean): Column
  def last(e: Column): Column
  def last(e: Column, ignoreNulls: Boolean): Column
  def collect_list(e: Column): Column
  def collect_set(e: Column): Column
  
  // String functions
  def ascii(e: Column): Column
  def base64(e: Column): Column
  def concat(exprs: Column*): Column
  def concat_ws(sep: String, exprs: Column*): Column
  def length(e: Column): Column
  def lower(e: Column): Column
  def upper(e: Column): Column
  def ltrim(e: Column): Column
  def rtrim(e: Column): Column
  def trim(e: Column): Column
  def regexp_extract(e: Column, pattern: String, idx: Int): Column
  def regexp_replace(e: Column, pattern: String, replacement: String): Column
  def split(str: Column, pattern: String): Column
  def substring(str: Column, pos: Int, len: Int): Column
  
  // Math functions
  def abs(e: Column): Column
  def acos(e: Column): Column
  def asin(e: Column): Column
  def atan(e: Column): Column
  def atan2(y: Column, x: Column): Column
  def ceil(e: Column): Column
  def cos(e: Column): Column
  def exp(e: Column): Column
  def floor(e: Column): Column
  def log(e: Column): Column
  def log10(e: Column): Column
  def pow(l: Column, r: Column): Column
  def round(e: Column): Column
  def round(e: Column, scale: Int): Column
  def sin(e: Column): Column
  def sqrt(e: Column): Column
  def tan(e: Column): Column
  
  // Date and time functions
  def current_date(): Column
  def current_timestamp(): Column
  def date_add(start: Column, days: Int): Column
  def date_sub(start: Column, days: Int): Column
  def datediff(end: Column, start: Column): Column
  def date_format(dateExpr: Column, format: String): Column
  def dayofmonth(e: Column): Column
  def dayofweek(e: Column): Column
  def dayofyear(e: Column): Column
  def hour(e: Column): Column
  def minute(e: Column): Column
  def month(e: Column): Column
  def quarter(e: Column): Column
  def second(e: Column): Column
  def to_date(e: Column): Column
  def to_date(e: Column, fmt: String): Column
  def to_timestamp(s: Column): Column
  def to_timestamp(s: Column, fmt: String): Column
  def unix_timestamp(): Column
  def unix_timestamp(s: Column): Column
  def unix_timestamp(s: Column, p: String): Column
  def year(e: Column): Column
  
  // Array functions
  def array(cols: Column*): Column
  def array_contains(column: Column, value: Any): Column
  def array_distinct(e: Column): Column
  def array_max(e: Column): Column
  def array_min(e: Column): Column
  def array_position(column: Column, value: Any): Column
  def array_remove(column: Column, element: Any): Column
  def array_sort(e: Column): Column
  def arrays_overlap(a1: Column, a2: Column): Column
  def explode(e: Column): Column
  def explode_outer(e: Column): Column
  def posexplode(e: Column): Column
  def posexplode_outer(e: Column): Column
  def size(e: Column): Column
  def slice(x: Column, start: Int, length: Int): Column
  def sort_array(e: Column): Column
  def sort_array(e: Column, asc: Boolean): Column
  
  // Map functions
  def map(cols: Column*): Column
  def map_keys(e: Column): Column
  def map_values(e: Column): Column
  
  // Struct functions
  def struct(cols: Column*): Column
  
  // Window functions
  def row_number(): Column
  def rank(): Column
  def dense_rank(): Column
  def percent_rank(): Column
  def ntile(n: Int): Column
  def cume_dist(): Column
  def lag(e: Column, offset: Int): Column
  def lag(e: Column, offset: Int, defaultValue: Any): Column
  def lead(e: Column, offset: Int): Column
  def lead(e: Column, offset: Int, defaultValue: Any): Column
  
  // UDF creation
  def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction
  def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction
}
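
A sketch combining several of these functions, including a window function and a simple UDF; the df columns (department, salary, name, hired_at) and the date format are illustrative assumptions:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Conditional and string functions
val labeled = df
  .withColumn("band",
    when(col("salary") >= 100000, "high")
      .when(col("salary") >= 60000, "mid")
      .otherwise("low"))
  .withColumn("name_upper", upper(col("name")))

// Date and time functions
val withDates = labeled
  .withColumn("hire_date", to_date(col("hired_at"), "yyyy-MM-dd"))
  .withColumn("hire_year", year(col("hire_date")))
  .withColumn("tenure_days", datediff(current_date(), col("hire_date")))

// Window function: rank rows by salary within each department
val byDept = Window.partitionBy("department").orderBy(col("salary").desc)
val ranked = withDates.withColumn("salary_rank", rank().over(byDept))

// Simple UDF (prefer built-in functions where possible; UDFs are opaque to the optimizer)
val initials = udf((name: String) => name.split(" ").map(_.head).mkString)
ranked.withColumn("initials", initials(col("name"))).show()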

Data Types

Spark SQL data type system for schema definition.

abstract class DataType extends AbstractDataType {
  def json: String
  def prettyJson: String
  def simpleString: String
  def catalogString: String
  def sql: String
}

object DataTypes {
  def createArrayType(elementType: DataType): ArrayType
  def createMapType(keyType: DataType, valueType: DataType): MapType
  def createStructField(name: String, dataType: DataType, nullable: Boolean): StructField
  def createStructType(fields: Array[StructField]): StructType
  
  val StringType: StringType
  val BinaryType: BinaryType
  val BooleanType: BooleanType
  val DateType: DateType
  val TimestampType: TimestampType
  val CalendarIntervalType: CalendarIntervalType
  val DoubleType: DoubleType
  val FloatType: FloatType
  val ByteType: ByteType
  val IntegerType: IntegerType
  val LongType: LongType
  val ShortType: ShortType
  val NullType: NullType
}

case class StructType(fields: Array[StructField]) extends DataType {
  def add(field: StructField): StructType
  def add(name: String, dataType: DataType): StructType
  def add(name: String, dataType: DataType, nullable: Boolean): StructType
  def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType
  def apply(name: String): StructField
  def apply(index: Int): StructField
  def fieldNames: Array[String]
  def length: Int
  def size: Int
}

case class StructField(
  name: String,
  dataType: DataType,
  nullable: Boolean = true,
  metadata: Metadata = Metadata.empty
) {
  def getComment(): Option[String]
}

case class ArrayType(elementType: DataType, containsNull: Boolean = true) extends DataType

case class MapType(
  keyType: DataType,
  valueType: DataType,
  valueContainsNull: Boolean = true
) extends DataType
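
A minimal sketch defining a schema programmatically and using it to build a DataFrame from rows, assuming an active spark session; the field names and sample values are purely illustrative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Explicit schema with nested struct, array, and map fields
val schema = StructType(Array(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType),
  StructField("scores", ArrayType(DoubleType)),
  StructField("attributes", MapType(StringType, StringType)),
  StructField("address", StructType(Array(
    StructField("city", StringType),
    StructField("zip", StringType)
  )))
))

val rows = Seq(
  Row(1L, "Alice", Seq(91.5, 88.0), Map("team" -> "blue"), Row("Austin", "78701"))
)

// createDataFrame(RDD[Row], schema) expects an RDD, so parallelize the local rows first
val people = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
people.printSchema()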

Row

Represents a row in a DataFrame.

trait Row extends Serializable {
  def size: Int
  def length: Int
  def schema: StructType
  def apply(i: Int): Any
  def get(i: Int): Any
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
  def fieldIndex(name: String): Int
  
  // Type-specific getters
  def isNullAt(i: Int): Boolean
  def getBoolean(i: Int): Boolean
  def getByte(i: Int): Byte
  def getShort(i: Int): Short
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getFloat(i: Int): Float
  def getDouble(i: Int): Double
  def getString(i: Int): String
  def getDecimal(i: Int): java.math.BigDecimal
  def getDate(i: Int): java.sql.Date
  def getTimestamp(i: Int): java.sql.Timestamp
  def getSeq[T](i: Int): Seq[T]
  def getList[T](i: Int): java.util.List[T]
  def getMap[K, V](i: Int): scala.collection.Map[K, V]
  def getJavaMap[K, V](i: Int): java.util.Map[K, V]
  def getStruct(i: Int): Row
  
  // Array conversion
  def toSeq: Seq[Any]
  def copy(): Row
}

object Row {
  def empty: Row
  def apply(values: Any*): Row
  def fromSeq(values: Seq[Any]): Row
  def fromTuple(tuple: Product): Row
  def merge(rows: Row*): Row
  def unapplySeq(row: Row): Some[Seq[Any]]
}
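
A short sketch of reading field values back out of a Row, for example after collect(); it reuses the hypothetical people DataFrame sketched above:

import org.apache.spark.sql.Row

val firstRow: Row = people.first()

// Positional and name-based access
val id = firstRow.getLong(0)
val name = firstRow.getAs[String]("name")

// Null-safe access by field name
val nameIdx = firstRow.fieldIndex("name")
val maybeName = if (firstRow.isNullAt(nameIdx)) None else Some(firstRow.getString(nameIdx))

// Pattern matching via Row.unapplySeq
firstRow match {
  case Row(rowId: Long, rowName: String, _*) => println(s"$rowId -> $rowName")
  case _ => ()
}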

Data I/O

DataFrameReader

Interface for loading DataFrames from external storage systems.

class DataFrameReader {
  // Configuration
  def format(source: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader
  
  // Data sources
  def csv(path: String): DataFrame
  def csv(paths: String*): DataFrame
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def parquet(path: String): DataFrame
  def parquet(paths: String*): DataFrame
  def orc(path: String): DataFrame
  def orc(paths: String*): DataFrame
  def text(path: String): DataFrame
  def text(paths: String*): DataFrame
  def textFile(path: String): Dataset[String]
  def textFile(paths: String*): Dataset[String]
  def table(tableName: String): DataFrame
  def jdbc(url: String, table: String, properties: java.util.Properties): DataFrame
  
  // Generic load
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame
}
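
A sketch of reading with an explicit schema instead of inferSchema, which avoids an extra pass over the input; the paths, options, and field names are illustrative assumptions:

import org.apache.spark.sql.types._

// DDL-style schema string accepted by schema(String)
val events = spark.read
  .schema("user_id BIGINT, event STRING, ts TIMESTAMP")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .json("path/to/events/*.json")

// Equivalent StructType-based schema for CSV
val csvSchema = StructType(Array(
  StructField("user_id", LongType),
  StructField("event", StringType)
))
val csvEvents = spark.read
  .schema(csvSchema)
  .option("header", "true")
  .csv("path/to/events.csv")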

DataFrameWriter

Interface for saving DataFrames to external storage systems.

class DataFrameWriter[T] {
  // Configuration
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]
  def partitionBy(colNames: String*): DataFrameWriter[T]
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
  
  // Data sources
  def csv(path: String): Unit
  def json(path: String): Unit
  def parquet(path: String): Unit
  def orc(path: String): Unit
  def text(path: String): Unit
  def jdbc(url: String, table: String, connectionProperties: java.util.Properties): Unit
  def insertInto(tableName: String): Unit
  def saveAsTable(tableName: String): Unit
  
  // Generic save
  def save(): Unit
  def save(path: String): Unit
}

// SaveMode is a Java enum (org.apache.spark.sql.SaveMode)
public enum SaveMode {
  Append,         // append rows to the existing data
  Overwrite,      // replace the existing data
  ErrorIfExists,  // throw an exception if data already exists (default)
  Ignore          // silently skip the write if data already exists
}

Usage example:

// Reading data
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("path/to/file.csv")

// Writing data
df.write
  .format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .partitionBy("year", "month")
  .save("path/to/output")

Grouped Data Operations

class RelationalGroupedDataset protected(
  df: DataFrame,
  groupingExprs: Seq[Expression],
  groupType: RelationalGroupedDataset.GroupType
) {
  // Aggregation functions
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame
  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
  def count(): DataFrame
  def mean(colNames: String*): DataFrame
  def max(colNames: String*): DataFrame
  def min(colNames: String*): DataFrame
  def sum(colNames: String*): DataFrame
  def avg(colNames: String*): DataFrame
  
  // Pivot operations
  def pivot(pivotColumn: String): RelationalGroupedDataset
  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset
}

Usage example:

// Group by operations
val grouped = df.groupBy("department", "level")
  .agg(
    count("*").alias("employee_count"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary")
  )

// Pivot operations  
val pivoted = df.groupBy("department")
  .pivot("level")
  .agg(avg("salary"))

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-13
