
tessl/maven-org-apache-spark--spark-parent-2-12

Apache Spark - Unified analytics engine for large-scale data processing


docs/sql.md

SQL and DataFrames

High-level APIs for working with structured data using DataFrames and Datasets. Built on Spark SQL, which uses the Catalyst optimizer for query optimization and code generation, and integrates with a wide range of data sources and formats.

Capabilities

SparkSession

Entry point for DataFrame and Dataset APIs. The modern way to work with Spark SQL functionality.

/**
 * Entry point for DataFrame and Dataset APIs
 */
class SparkSession {
  /** Get DataFrameReader for loading data */
  def read: DataFrameReader
  /** Get DataStreamReader for streaming data */
  def readStream: DataStreamReader
  /** Execute SQL query */
  def sql(sqlText: String): DataFrame
  /** Access table as DataFrame */
  def table(tableName: String): DataFrame
  /** Create Dataset of numbers (note: returns Dataset[java.lang.Long]) */
  def range(end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long): Dataset[java.lang.Long]
  /** Create DataFrame from sequence */
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
  /** Create Dataset from sequence */
  def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
  /** Create empty DataFrame */
  def emptyDataFrame: DataFrame
  /** Create empty Dataset */
  def emptyDataset[T: Encoder]: Dataset[T]
  /** Access catalog functions */
  def catalog: Catalog
  /** Access runtime configuration */
  def conf: RuntimeConfig
  /** Register user-defined functions */
  def udf: UDFRegistration
  /** Manage streaming queries */
  def streams: StreamingQueryManager
  /** Stop SparkSession */
  def stop(): Unit
}

/**
 * Builder for SparkSession
 */
object SparkSession {
  def builder(): Builder
  
  class Builder {
    def master(master: String): Builder
    def appName(name: String): Builder
    def config(key: String, value: String): Builder
    def config(conf: SparkConf): Builder
    def enableHiveSupport(): Builder
    def getOrCreate(): SparkSession
  }
}

Usage Examples:

import org.apache.spark.sql.SparkSession

// Create SparkSession
val spark = SparkSession.builder()
  .appName("MyApp")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

// Load data
val df = spark.read
  .option("header", "true")
  .csv("people.csv")

// SQL queries
spark.sql("SELECT name, age FROM people WHERE age > 21").show()

// Create from data
val data = Seq(("Alice", 25), ("Bob", 30))
val df2 = spark.createDataFrame(data).toDF("name", "age")

spark.stop()

Python SparkSession API:

class SparkSession:
    """
    Entry point for DataFrame and SQL APIs in Python
    """
    @property
    def read(self) -> DataFrameReader
    @property 
    def readStream(self) -> DataStreamReader
    def sql(self, sqlQuery: str) -> DataFrame
    def table(self, tableName: str) -> DataFrame
    def range(self, start: int, end: int = None, step: int = 1, numPartitions: int = None) -> DataFrame
    def createDataFrame(self, data: List, schema: Optional[Union[List[str], StructType]] = None) -> DataFrame
    @property
    def catalog(self) -> Catalog
    @property
    def conf(self) -> RuntimeConfig
    @property
    def udf(self) -> UDFRegistration
    @property
    def streams(self) -> StreamingQueryManager
    def stop(self) -> None

    # builder is a class attribute, accessed as SparkSession.builder (no parentheses)
    builder: Builder
    
    class Builder:
        def master(self, master: str) -> Builder
        def appName(self, name: str) -> Builder
        def config(self, key: str, value: str) -> Builder
        def enableHiveSupport(self) -> Builder
        def getOrCreate(self) -> SparkSession

Python Usage Examples:

from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Load data
df = spark.read \
    .option("header", "true") \
    .csv("people.csv")

# SQL queries
spark.sql("SELECT name, age FROM people WHERE age > 21").show()

# Create from data
data = [("Alice", 25), ("Bob", 30)]
df2 = spark.createDataFrame(data, ["name", "age"])

spark.stop()

Dataset[T] and DataFrame

Dataset is a distributed collection of data with compile-time type safety. DataFrame is a type alias for Dataset[Row].

/**
 * Distributed collection of data with schema
 */
class Dataset[T] {
  /** Display data in tabular format */
  def show(numRows: Int = 20, truncate: Boolean = true): Unit
  /** Print schema to console */
  def printSchema(): Unit
  /** Show query execution plan */
  def explain(extended: Boolean = false): Unit
  
  /** Select columns */
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame
  /** Filter rows */
  def filter(condition: Column): Dataset[T]
  def where(condition: Column): Dataset[T]
  /** Group by columns */
  def groupBy(cols: Column*): RelationalGroupedDataset
  def groupBy(col1: String, cols: String*): RelationalGroupedDataset
  /** Aggregate expressions */
  def agg(expr: Column, exprs: Column*): DataFrame
  
  /** Join with another Dataset */
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
  /** Union with another Dataset */
  def union(other: Dataset[T]): Dataset[T]
  def unionAll(other: Dataset[T]): Dataset[T]  // deprecated alias for union
  /** Intersection */
  def intersect(other: Dataset[T]): Dataset[T]
  /** Difference */
  def except(other: Dataset[T]): Dataset[T]
  
  /** Sort by columns */
  def orderBy(sortExprs: Column*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]
  /** Limit number of rows */
  def limit(n: Int): Dataset[T]
  /** Remove duplicates */
  def distinct(): Dataset[T]
  def dropDuplicates(): Dataset[T]
  def dropDuplicates(colNames: Array[String]): Dataset[T]
  
  /** Add or replace column */
  def withColumn(colName: String, col: Column): DataFrame
  /** Rename column */
  def withColumnRenamed(existingName: String, newName: String): DataFrame
  /** Drop column */
  def drop(colName: String): DataFrame
  def drop(col: Column): DataFrame
  
  /** Collect all rows to driver */
  def collect(): Array[T]
  /** Collect as Java List */
  def collectAsList(): java.util.List[T]
  /** Count rows */
  def count(): Long
  /** Get first row */
  def first(): T
  def head(): T
  /** Get first n rows */
  def head(n: Int): Array[T] 
  def take(n: Int): Array[T]
  
  /** Get writer for saving data */
  def write: DataFrameWriter[T]
  /** Get writer for streaming */
  def writeStream: DataStreamWriter[T]
  
  /** Cache Dataset */
  def cache(): Dataset[T]
  /** Persist with storage level */
  def persist(newLevel: StorageLevel): Dataset[T]
  /** Remove from cache */
  def unpersist(blocking: Boolean = false): Dataset[T]
  
  /** Convert to different type */
  def as[U](implicit encoder: Encoder[U]): Dataset[U]
  /** Map function with encoder */
  def map[U](func: T => U)(implicit encoder: Encoder[U]): Dataset[U]
  /** FlatMap with encoder */
  def flatMap[U](func: T => TraversableOnce[U])(implicit encoder: Encoder[U]): Dataset[U]
  /** Apply function to each element */
  def foreach(f: T => Unit): Unit
  /** Apply function to each partition */
  def foreachPartition(f: Iterator[T] => Unit): Unit
}

/** DataFrame is Dataset[Row] */
type DataFrame = Dataset[Row]

Usage Examples:

import org.apache.spark.sql.functions._

// Load and explore data
val df = spark.read.json("people.json")
df.show()
df.printSchema()
df.explain()

// Transformations
val adults = df
  .select("name", "age")
  .filter($"age" > 18)
  .orderBy($"age".desc)

// Aggregations
val avgAge = df
  .groupBy("department")
  .agg(avg("age").as("avg_age"), count("*").as("count"))

// Joins
val departments = spark.read.json("departments.json")
val joined = df.join(departments, "dept_id")

// Window functions
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy("department").orderBy("salary")
val ranked = df.withColumn("rank", row_number().over(windowSpec))

// Type-safe operations with case classes
case class Person(name: String, age: Int)
val people = df.as[Person]
val names = people.map(_.name.toUpperCase)

Column Expressions

Column expressions for building queries and transformations.

/**
 * Column expression for DataFrame operations
 */
class Column {
  /** Create alias */
  def alias(alias: String): Column
  def as(alias: String): Column
  /** Cast to different type */
  def cast(to: DataType): Column
  def cast(to: String): Column
  
  /** Null checks */
  def isNull: Column
  def isNotNull: Column
  def isNaN: Column
  
  /** Logical operators */
  def &&(other: Any): Column
  def ||(other: Any): Column
  def unary_! : Column
  
  /** Comparison operators */
  def ===(other: Any): Column
  def =!=(other: Any): Column
  def >(other: Any): Column
  def <(other: Any): Column
  def >=(other: Any): Column
  def <=(other: Any): Column
  
  /** Arithmetic operators */
  def +(other: Any): Column
  def -(other: Any): Column
  def *(other: Any): Column
  def /(other: Any): Column
  def %(other: Any): Column
  
  /** String operations */
  def contains(other: Any): Column
  def startsWith(other: Column): Column
  def endsWith(other: Column): Column
  def like(literal: String): Column
  def rlike(literal: String): Column
  
  /** Sorting */
  def asc: Column
  def desc: Column
  def asc_nulls_first: Column
  def desc_nulls_last: Column
  
  /** Array operations */
  def getItem(key: Any): Column
  def getField(fieldName: String): Column
}
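
Column expressions compose into predicates and projections. A minimal sketch, assuming an active `spark` session and a DataFrame `df` with `name`, `age`, and `tags` (array) columns:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._  // enables the $"col" syntax

// Combine comparison and logical operators into a single predicate
val adults = df.filter($"age" >= 18 && $"name".isNotNull)

// Cast, alias, and sort in one projection
val summary = df
  .select($"name", ($"age" + 1).cast("long").as("age_next_year"))
  .orderBy($"age_next_year".desc_nulls_last)

// Access array elements by index
val firstTag = df.select($"tags".getItem(0).as("first_tag"))
```

Note that equality uses `===` (not `==`), since `==` on a Column would compare object references rather than build an expression.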

Built-in Functions

Comprehensive set of built-in functions for data manipulation.

/**
 * Built-in functions for DataFrame operations
 */
object functions {
  /** Column references */
  def col(colName: String): Column
  def column(colName: String): Column
  def lit(literal: Any): Column
  
  /** Conditional expressions */
  def when(condition: Column, value: Any): Column
  def coalesce(cols: Column*): Column
  def isnull(col: Column): Column
  def isnan(col: Column): Column
  
  /** String functions */
  def upper(e: Column): Column
  def lower(e: Column): Column
  def trim(e: Column): Column
  def ltrim(e: Column): Column
  def rtrim(e: Column): Column
  def length(e: Column): Column
  def substring(str: Column, pos: Int, len: Int): Column
  def concat(exprs: Column*): Column
  def concat_ws(sep: String, exprs: Column*): Column
  def split(str: Column, pattern: String): Column
  def regexp_replace(e: Column, pattern: String, replacement: String): Column
  def regexp_extract(e: Column, exp: String, groupIdx: Int): Column
  
  /** Math functions */
  def abs(e: Column): Column
  def sqrt(e: Column): Column
  def pow(l: Column, r: Column): Column
  def round(e: Column, scale: Int): Column
  def ceil(e: Column): Column
  def floor(e: Column): Column
  def sin(e: Column): Column
  def cos(e: Column): Column
  def tan(e: Column): Column
  def log(e: Column): Column
  def exp(e: Column): Column
  def greatest(exprs: Column*): Column
  def least(exprs: Column*): Column
  
  /** Aggregate functions */
  def sum(e: Column): Column
  def avg(e: Column): Column
  def mean(e: Column): Column
  def count(e: Column): Column
  def countDistinct(expr: Column, exprs: Column*): Column
  def min(e: Column): Column
  def max(e: Column): Column
  def first(e: Column): Column
  def last(e: Column): Column
  def stddev(e: Column): Column
  def variance(e: Column): Column
  def collect_list(e: Column): Column
  def collect_set(e: Column): Column
  
  /** Date/Time functions */
  def current_date(): Column
  def current_timestamp(): Column
  def date_add(start: Column, days: Int): Column
  def date_sub(start: Column, days: Int): Column
  def date_format(dateExpr: Column, format: String): Column
  def year(e: Column): Column
  def month(e: Column): Column
  def dayofmonth(e: Column): Column
  def hour(e: Column): Column
  def minute(e: Column): Column
  def second(e: Column): Column
  def unix_timestamp(): Column
  def from_unixtime(ut: Column): Column
  
  /** Array functions */
  def array(cols: Column*): Column
  def array_contains(column: Column, value: Any): Column
  def explode(e: Column): Column
  def posexplode(e: Column): Column
  def size(e: Column): Column
  def sort_array(e: Column): Column
  def reverse(e: Column): Column
  def array_distinct(e: Column): Column
  
  /** Map functions */
  def map(cols: Column*): Column
  def map_keys(e: Column): Column
  def map_values(e: Column): Column
  
  /** Window functions */
  def row_number(): Column
  def rank(): Column
  def dense_rank(): Column
  def percent_rank(): Column
  def ntile(n: Int): Column
  def lag(e: Column, offset: Int): Column
  def lead(e: Column, offset: Int): Column
  def first_value(e: Column): Column
  def last_value(e: Column): Column
}
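
A short sketch combining conditional, string, date, and window functions, assuming an active `spark` session and a DataFrame `df` with `name`, `age`, `department`, and `salary` columns:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

// Conditional column: chain when(...) clauses, then supply a default
val labeled = df.withColumn("bracket",
  when($"age" < 18, "minor").when($"age" < 65, "adult").otherwise("senior"))

// String cleanup and a date part in one projection
val cleaned = df.select(
  trim(lower($"name")).as("name"),
  year(current_date()).as("current_year"))

// Rank salaries within each department, keeping the top three
val byDept = Window.partitionBy("department").orderBy($"salary".desc)
val topEarners = df
  .withColumn("rank", dense_rank().over(byDept))
  .filter($"rank" <= 3)
```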

Data I/O

Reading and writing data from various sources.

/**
 * Interface for loading DataFrames from external storage
 */
class DataFrameReader {
  /** Specify data source format */
  def format(source: String): DataFrameReader
  /** Add input option */
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  /** Add multiple options */
  def options(options: Map[String, String]): DataFrameReader
  /** Set expected schema */
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader
  
  /** Load data */
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame
  
  /** Format-specific methods */
  def json(path: String): DataFrame
  def json(jsonRDD: RDD[String]): DataFrame
  def json(jsonDataset: Dataset[String]): DataFrame
  def parquet(paths: String*): DataFrame
  def text(paths: String*): DataFrame
  def textFile(paths: String*): Dataset[String]
  def csv(paths: String*): DataFrame
  def orc(paths: String*): DataFrame
  def jdbc(url: String, table: String, properties: Properties): DataFrame
  def table(tableName: String): DataFrame
}

/**
 * Interface for saving DataFrames to external storage
 */
class DataFrameWriter[T] {
  /** Set save mode */
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]
  /** Specify output format */
  def format(source: String): DataFrameWriter[T]
  /** Add output option */
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  /** Add multiple options */
  def options(options: Map[String, String]): DataFrameWriter[T]
  /** Partition output by columns */
  def partitionBy(colNames: String*): DataFrameWriter[T]
  /** Bucket output by columns */
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  /** Sort within buckets */
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
  
  /** Save data */
  def save(): Unit
  def save(path: String): Unit
  /** Insert into existing table */
  def insertInto(tableName: String): Unit
  /** Save as table */
  def saveAsTable(tableName: String): Unit
  
  /** Format-specific methods */
  def json(path: String): Unit
  def parquet(path: String): Unit
  def text(path: String): Unit
  def csv(path: String): Unit
  def orc(path: String): Unit
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit
}

/**
 * Save modes for DataFrameWriter (a Java enum in org.apache.spark.sql)
 */
public enum SaveMode {
  Append, Overwrite, ErrorIfExists, Ignore
}

Usage Examples:

// Reading data
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data.csv")

// Reading with schema
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)
))
val df2 = spark.read.schema(schema).csv("data.csv")

// Writing data
df.write
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .csv("output")

// Partitioned output
df.write
  .partitionBy("year", "month")
  .parquet("partitioned_output")

// Database operations
df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/test")
  .option("dbtable", "people")
  .option("user", "username")
  .option("password", "password")
  .save()

Data Types and Schema

Schema definition and data type system.

/**
 * Base class for data types
 */
abstract class DataType

/** Primitive types */
object StringType extends DataType
object IntegerType extends DataType
object LongType extends DataType
object DoubleType extends DataType
object FloatType extends DataType
object BooleanType extends DataType
object DateType extends DataType
object TimestampType extends DataType
object BinaryType extends DataType

/** Complex types */
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) extends DataType
case class StructType(fields: Array[StructField]) extends DataType

/**
 * Field in a struct
 */
case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)

/**
 * Row in a DataFrame
 */
trait Row {
  def length: Int
  def size: Int
  def get(i: Int): Any
  def getString(i: Int): String
  def getBoolean(i: Int): Boolean
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getFloat(i: Int): Float
  def getDouble(i: Int): Double
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
  def isNullAt(i: Int): Boolean
  def toSeq: Seq[Any]
}
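
The pieces above fit together when building a DataFrame from explicit Rows. A minimal sketch, assuming an active `spark` session:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Define a schema programmatically, including array and nested struct fields
val schema = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("scores", ArrayType(IntegerType, containsNull = false)),
  StructField("address", StructType(Array(
    StructField("city", StringType))))))

// Build a DataFrame from Rows matching the schema
val rows = Seq(Row("Alice", Seq(90, 85), Row("Berlin")))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// Read typed values back out of a Row, by position or by field name
val first = df.head()
val name = first.getString(0)
val scores = first.getAs[Seq[Int]]("scores")
```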

Error Handling

Common SQL exceptions:

  • AnalysisException - SQL analysis errors (invalid columns, type mismatches)
  • ParseException - SQL parsing errors
  • StreamingQueryException - Streaming query failures
  • SparkException - General Spark execution failures (wraps task and job errors)
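
Since Spark evaluates queries lazily, analysis errors can surface when an action runs rather than when the query is defined. A minimal sketch of catching one, assuming a DataFrame `df` in scope:

```scala
import org.apache.spark.sql.AnalysisException

// Referencing a column that does not exist raises AnalysisException
try {
  df.select("no_such_column").show()
} catch {
  case e: AnalysisException =>
    println(s"Query analysis failed: ${e.getMessage}")
}
```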

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12
