tessl/maven-org-apache-spark--spark-sql_2-11

Apache Spark SQL is a distributed SQL query engine that provides DataFrame and Dataset APIs for working with structured data.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-sql_2.11@2.4.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql_2-11@2.4.0


Apache Spark SQL

Apache Spark SQL is a distributed SQL query engine that provides DataFrame and Dataset APIs for working with structured data. It enables users to run SQL queries against structured data sources like JSON, Parquet, and JDBC databases, while providing seamless integration with Spark's core RDD API. The module includes the Catalyst optimizer framework for logical query planning and optimization, support for both SQL and DataFrame/Dataset APIs, integration with Hive for metadata and SerDes, and the ability to cache data in memory for faster query performance.

Package Information

  • Package Name: spark-sql_2.11
  • Package Type: maven
  • Language: Scala (with Java interoperability)
  • Installation:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.8</version>
    </dependency>
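
If you build with sbt instead of Maven, the equivalent dependency for a Scala 2.11 build is:

// build.sbt
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.8"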

Core Imports

import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Column, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

For Java:

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

Basic Usage

import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._

// Create SparkSession
val spark = SparkSession.builder()
  .appName("Spark SQL Example")
  .master("local[*]")
  .getOrCreate()

// Load data
val df = spark.read
  .format("json")
  .option("multiline", "true")
  .load("path/to/data.json")

// Transform data
val result = df
  .select(col("name"), col("age").cast("int"))
  .filter(col("age") > 18)
  .groupBy(col("name"))
  .agg(avg(col("age")).alias("avg_age"))

// Execute and show results
result.show()

// SQL queries
df.createOrReplaceTempView("people")
val sqlResult = spark.sql("SELECT name, AVG(age) as avg_age FROM people WHERE age > 18 GROUP BY name")
sqlResult.show()

spark.stop()

Architecture

Apache Spark SQL is built around several key components; a brief plan-inspection sketch follows the list:

  • SparkSession: The unified entry point for all Spark SQL functionality, replacing SQLContext and HiveContext
  • DataFrame/Dataset API: Distributed collections with schema information and type safety (Dataset[T])
  • Catalyst Optimizer: Query optimizer that applies rule-based and cost-based optimizations
  • Tungsten Execution Engine: Code generation and memory management for improved performance
  • Data Source API: Pluggable interface for reading from and writing to various data formats
  • Catalog API: Interface for managing databases, tables, functions, and cached data
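
To see the optimizer and execution engine at work, you can print a query's plans: the optimized logical plan reflects Catalyst's rewrites, and WholeStageCodegen nodes in the physical plan come from Tungsten. A minimal sketch, reusing the spark session, df DataFrame, and imports from Basic Usage:

// explain(true) prints the parsed, analyzed, and optimized logical plans
// plus the physical plan chosen for execution
val adults = df
  .filter(col("age") > 18)
  .groupBy(col("name"))
  .count()

adults.explain(true)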

Capabilities

Session Management

SparkSession serves as the unified entry point for all Spark SQL operations, providing access to DataFrames, SQL execution, and configuration management.

object SparkSession {
  def builder(): SparkSession.Builder
}

class SparkSession {
  def sql(sqlText: String): DataFrame
  def read: DataFrameReader
  def readStream: DataStreamReader  
  def catalog: Catalog
  def conf: RuntimeConfig
  def table(tableName: String): DataFrame
  def stop(): Unit
}

See docs/session-management.md for details.
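
A minimal sketch of typical session usage, combining the builder, runtime configuration, SQL execution, and catalog access; the file path and view name are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Session Example")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "8")  // build-time configuration
  .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", "4")  // runtime configuration

val people = spark.read.json("path/to/people.json")
people.createOrReplaceTempView("people")

spark.sql("SELECT name FROM people WHERE age > 18").show()
println(spark.catalog.currentDatabase)

spark.stop()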

DataFrame and Dataset Operations

Core distributed data structures with schema information and type safety, supporting both typed (Dataset[T]) and untyped (DataFrame) operations.

class Dataset[T] {
  def select(cols: Column*): DataFrame
  def filter(condition: Column): Dataset[T]
  def groupBy(cols: Column*): RelationalGroupedDataset
  def join(right: Dataset[_]): DataFrame
  def union(other: Dataset[T]): Dataset[T]
  def count(): Long
  def collect(): Array[T]
  def show(): Unit
}

type DataFrame = Dataset[Row]

See docs/dataframe-dataset.md for details.
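
A short sketch contrasting typed Dataset operations with untyped DataFrame operations; the Person case class and sample rows are illustrative, and spark is an existing SparkSession:

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.col
import spark.implicits._

case class Person(name: String, age: Long)

// Typed: the filter predicate is a plain Scala function over Person
val people: Dataset[Person] = Seq(Person("Ada", 36), Person("Linus", 17)).toDS()
val adultsTyped: Dataset[Person] = people.filter(_.age > 18)

// Untyped: the same logic expressed with Column expressions on a DataFrame
val adultsUntyped: DataFrame = people.toDF()
  .filter(col("age") > 18)
  .select(col("name"))

println(adultsTyped.count())
adultsUntyped.show()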

Data Input and Output

Comprehensive I/O capabilities for reading from and writing to various data sources including files, databases, and streaming sources.

class DataFrameReader {
  def format(source: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def load(): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  def jdbc(url: String, table: String, properties: Properties): DataFrame
}

class DataFrameWriter[T] {
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def save(): Unit
  def saveAsTable(tableName: String): Unit
}

See docs/data-io.md for details.
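
A sketch of a typical read/transform/write round trip; paths are placeholders and spark is an existing SparkSession:

import org.apache.spark.sql.SaveMode

// Read JSON through the generic format/option/load pipeline
val orders = spark.read
  .format("json")
  .option("multiLine", "true")
  .load("path/to/orders.json")

// Write the result as Parquet, replacing any existing output
orders.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .save("path/to/orders.parquet")

// Format-specific shorthands are also available
val ordersBack = spark.read.parquet("path/to/orders.parquet")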

SQL Functions and Expressions

Extensive library of built-in SQL functions for data manipulation, aggregation, string processing, date/time operations, and mathematical calculations.

// Column operations
class Column {
  def +(other: Any): Column
  def ===(other: Any): Column
  def isNull: Column
  def cast(to: DataType): Column
  def alias(alias: String): Column
}

// Built-in functions (from functions object)
def col(colName: String): Column
def lit(literal: Any): Column
def when(condition: Column, value: Any): Column
def sum(e: Column): Column
def avg(e: Column): Column
def count(e: Column): Column
def max(e: Column): Column
def min(e: Column): Column

See docs/functions-expressions.md for details.
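
A small sketch combining Column expressions with built-in functions; df is assumed to be a DataFrame with name and age columns, as in Basic Usage:

import org.apache.spark.sql.functions._

val labelled = df
  .withColumn("age_plus_one", col("age") + 1)
  .withColumn("is_adult", when(col("age") >= lit(18), lit(true)).otherwise(lit(false)))
  .withColumn("name_missing", col("name").isNull)

labelled
  .groupBy(col("is_adult"))
  .agg(count(col("name")).alias("people"), avg(col("age")).alias("avg_age"))
  .show()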

Aggregations and Grouping

Powerful aggregation capabilities with both untyped DataFrame aggregations and type-safe Dataset aggregations for complex analytical operations.

class RelationalGroupedDataset {
  def agg(expr: Column, exprs: Column*): DataFrame
  def count(): DataFrame
  def sum(colNames: String*): DataFrame
  def avg(colNames: String*): DataFrame
  def pivot(pivotColumn: String): RelationalGroupedDataset
}

class KeyValueGroupedDataset[K, V] {
  def agg[U1](column: TypedColumn[V, U1]): Dataset[(K, U1)]
  def count(): Dataset[(K, Long)]
  def mapGroups[U](f: (K, Iterator[V]) => U): Dataset[U]
  def reduceGroups(f: (V, V) => V): Dataset[(K, V)]
}

See docs/aggregations.md for details.
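
A sketch of untyped and type-safe aggregation over the same data; the Sale case class is illustrative and spark is an existing SparkSession:

import org.apache.spark.sql.functions._
import spark.implicits._

case class Sale(region: String, product: String, amount: Double)

val sales = Seq(
  Sale("EU", "book", 10.0),
  Sale("EU", "pen", 2.0),
  Sale("US", "book", 12.0)
).toDS()

// Untyped: group by region, pivot on product, and sum the amounts
sales.groupBy(col("region"))
  .pivot("product")
  .agg(sum(col("amount")))
  .show()

// Type-safe: group by a key function and reduce each group
val totalsByRegion = sales
  .groupByKey(_.region)
  .reduceGroups((a, b) => a.copy(amount = a.amount + b.amount))

totalsByRegion.show()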

Streaming Queries

Structured streaming capabilities for processing continuous data streams with the same DataFrame/Dataset APIs used for batch processing.

class DataStreamWriter[T] {
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def trigger(trigger: Trigger): DataStreamWriter[T]
  def start(): StreamingQuery
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
}

class StreamingQuery {
  def isActive: Boolean
  def awaitTermination(): Unit
  def stop(): Unit
  def status: StreamingQueryStatus
}

See docs/streaming.md for details.
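
A minimal structured streaming sketch using the built-in rate source and console sink, intended only for local experimentation; spark is an existing SparkSession:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// The rate source emits (timestamp, value) rows at a fixed rate
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()

// Count rows per 10-second window and print each result to the console
val query = stream
  .groupBy(window(col("timestamp"), "10 seconds"))
  .count()
  .writeStream
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .format("console")
  .start()

query.awaitTermination(30000)  // run for up to 30 seconds, then return
query.stop()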

Catalog and Metadata Management

Comprehensive metadata management for databases, tables, functions, and cached data with full programmatic access to the Spark catalog.

class Catalog {
  // Database, Table, Column, and Function below are the metadata classes
  // from org.apache.spark.sql.catalog (this Column is not the expression Column)
  def currentDatabase: String
  def listDatabases(): Dataset[Database]
  def listTables(): Dataset[Table]
  def listColumns(tableName: String): Dataset[Column]
  def listFunctions(): Dataset[Function]
  def cacheTable(tableName: String): Unit
  def isCached(tableName: String): Boolean
}

See docs/catalog.md for details.
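
A sketch of programmatic catalog access; it assumes a temporary view named people has already been registered, as in Basic Usage:

// List tables and views visible in the current database
spark.catalog.listTables().show()

// Cache the view in memory, check the flag, then release it
spark.catalog.cacheTable("people")
println(spark.catalog.isCached("people"))
spark.catalog.uncacheTable("people")

// Inspect column metadata for the view
spark.catalog.listColumns("people").show()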

Types

// Core data types
abstract class DataType
case object StringType extends DataType
case object IntegerType extends DataType
case object LongType extends DataType
case object DoubleType extends DataType
case object BooleanType extends DataType
case object DateType extends DataType
case object TimestampType extends DataType

// Complex types
case class StructType(fields: Array[StructField]) extends DataType
case class StructField(name: String, dataType: DataType, nullable: Boolean)
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) extends DataType

// Row representation
trait Row {
  def get(i: Int): Any
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getBoolean(i: Int): Boolean
}

// Save modes (SaveMode is a Java enum in org.apache.spark.sql, usable directly from Scala)
// Values: SaveMode.Append, SaveMode.Overwrite, SaveMode.ErrorIfExists, SaveMode.Ignore
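
A sketch that builds an explicit schema, applies it when reading, and extracts typed values from a Row; the path and field names are placeholders and spark is an existing SparkSession:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true),
  StructField("tags", ArrayType(StringType, containsNull = true), nullable = true)
))

// Supplying a schema skips inference and enforces the declared types
val people = spark.read
  .schema(schema)
  .json("path/to/people.json")

val first: Row = people.head()
val name = first.getString(0)
val age = if (first.isNullAt(1)) -1 else first.getInt(1)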