# Apache Spark SQL

Apache Spark SQL is a distributed SQL query engine that provides DataFrame and Dataset APIs for working with structured data. It enables users to run SQL queries against structured data sources like JSON, Parquet, and JDBC databases, while providing seamless integration with Spark's core RDD API. The module includes the Catalyst optimizer framework for logical query planning and optimization, support for both SQL and DataFrame/Dataset APIs, integration with Hive for metadata and SerDes, and the ability to cache data in memory for faster query performance.

## Package Information

- **Package Name**: spark-sql_2.11
- **Package Type**: maven
- **Language**: Scala (with Java interoperability)
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```

## Core Imports

```scala
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Column, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
```

For Java:

```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
```

## Basic Usage

```scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._

// Create SparkSession
val spark = SparkSession.builder()
  .appName("Spark SQL Example")
  .master("local[*]")
  .getOrCreate()

// Load data
val df = spark.read
  .format("json")
  .option("multiLine", "true")
  .load("path/to/data.json")

// Transform data
val result = df
  .select(col("name"), col("age").cast("int"))
  .filter(col("age") > 18)
  .groupBy(col("name"))
  .agg(avg(col("age")).alias("avg_age"))

// Execute and show results
result.show()

// SQL queries
df.createOrReplaceTempView("people")
val sqlResult = spark.sql(
  "SELECT name, AVG(age) AS avg_age FROM people WHERE age > 18 GROUP BY name")
sqlResult.show()

spark.stop()
```

## Architecture

Apache Spark SQL is built around several key components:

- **SparkSession**: The unified entry point for all Spark SQL functionality, replacing SQLContext and HiveContext
- **DataFrame/Dataset API**: Distributed collections with schema information and type safety (Dataset[T])
- **Catalyst Optimizer**: Query optimizer that applies rule-based and cost-based optimizations
- **Tungsten Execution Engine**: Code generation and memory management for improved performance
- **Data Source API**: Pluggable interface for reading from and writing to various data formats
- **Catalog API**: Interface for managing databases, tables, functions, and cached data

## Capabilities

### Session Management

SparkSession serves as the unified entry point for all Spark SQL operations, providing access to DataFrames, SQL execution, and configuration management.

```scala { .api }
object SparkSession {
  def builder(): SparkSession.Builder
}

class SparkSession {
  def sql(sqlText: String): DataFrame
  def read: DataFrameReader
  def readStream: DataStreamReader
  def catalog: Catalog
  def conf: RuntimeConfig
  def table(tableName: String): DataFrame
  def stop(): Unit
}
```

[Session Management](./session-management.md)

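A minimal sketch of building a session and using its RuntimeConfig; the app name and the shuffle-partition value are illustrative, not prescriptive:

```scala
import org.apache.spark.sql.SparkSession

// Build (or reuse) a session; config set here seeds the session's RuntimeConfig
val spark = SparkSession.builder()
  .appName("session-demo")  // illustrative name
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "8")
  .getOrCreate()

// Read and adjust runtime configuration after startup via spark.conf
println(spark.conf.get("spark.sql.shuffle.partitions"))
spark.conf.set("spark.sql.shuffle.partitions", "4")

spark.stop()
```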
### DataFrame and Dataset Operations

Core distributed data structures with schema information and type safety, supporting both typed (Dataset[T]) and untyped (DataFrame) operations.

```scala { .api }
class Dataset[T] {
  def select(cols: Column*): DataFrame
  def filter(condition: Column): Dataset[T]
  def groupBy(cols: Column*): RelationalGroupedDataset
  def join(right: Dataset[_]): DataFrame
  def union(other: Dataset[T]): Dataset[T]
  def count(): Long
  def collect(): Array[T]
  def show(): Unit
}

type DataFrame = Dataset[Row]
```

[DataFrame and Dataset Operations](./dataframe-dataset.md)

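A short sketch of moving between the typed and untyped APIs; the `Person` case class and the inline data are illustrative:

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int)

val spark = SparkSession.builder().master("local[*]").appName("ds-demo").getOrCreate()
import spark.implicits._  // brings in Encoders and the .toDS()/$ syntax

// Typed Dataset[Person]; lambda-based transformations keep the element type
val people = Seq(Person("Ana", 34), Person("Bo", 17)).toDS()
val adults = people.filter(_.age >= 18)  // still Dataset[Person]

// select() drops the element type and returns a DataFrame (Dataset[Row])
val names = adults.select($"name")

adults.show()
spark.stop()
```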
### Data Input and Output

Comprehensive I/O capabilities for reading from and writing to various data sources including files, databases, and streaming sources.

```scala { .api }
class DataFrameReader {
  def format(source: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def load(): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  def jdbc(url: String, table: String, properties: Properties): DataFrame
}

class DataFrameWriter[T] {
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def save(): Unit
  def saveAsTable(tableName: String): Unit
}
```

[Data Input and Output](./data-io.md)

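A read/write round-trip sketch with the reader and writer; the input and output paths are placeholders:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("io-demo").getOrCreate()

// Read CSV with header and schema inference; "in/data.csv" is a placeholder path
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("in/data.csv")

// Write as Parquet, replacing any existing output at the placeholder path
df.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .save("out/data.parquet")

spark.stop()
```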
### SQL Functions and Expressions

Extensive library of built-in SQL functions for data manipulation, aggregation, string processing, date/time operations, and mathematical calculations.

```scala { .api }
// Column operations
class Column {
  def +(other: Any): Column
  def ===(other: Any): Column
  def isNull: Column
  def cast(to: DataType): Column
  def alias(alias: String): Column
}

// Built-in functions (from the functions object)
def col(colName: String): Column
def lit(literal: Any): Column
def when(condition: Column, value: Any): Column
def sum(e: Column): Column
def avg(e: Column): Column
def count(e: Column): Column
def max(e: Column): Column
def min(e: Column): Column
```

[SQL Functions and Expressions](./functions-expressions.md)

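A brief sketch combining Column expressions with the `when`/`lit` built-ins; the column names and inline data are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("fn-demo").getOrCreate()
import spark.implicits._

val df = Seq(("Ana", 34), ("Bo", 17)).toDF("name", "age")

// when/otherwise builds a conditional expression; lit wraps a literal value
val labeled = df.select(
  col("name"),
  when(col("age") >= 18, lit("adult")).otherwise(lit("minor")).alias("group"))

labeled.show()
spark.stop()
```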
### Aggregations and Grouping

Powerful aggregation capabilities with both untyped DataFrame aggregations and type-safe Dataset aggregations for complex analytical operations.

```scala { .api }
class RelationalGroupedDataset {
  def agg(expr: Column, exprs: Column*): DataFrame
  def count(): DataFrame
  def sum(colNames: String*): DataFrame
  def avg(colNames: String*): DataFrame
  def pivot(pivotColumn: String): RelationalGroupedDataset
}

class KeyValueGroupedDataset[K, V] {
  def agg[U1](column: TypedColumn[V, U1]): Dataset[(K, U1)]
  def count(): Dataset[(K, Long)]
  def mapGroups[U](f: (K, Iterator[V]) => U): Dataset[U]
  def reduceGroups(f: (V, V) => V): Dataset[(K, V)]
}
```

[Aggregations and Grouping](./aggregations.md)

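A sketch of both aggregation styles on the same data; the inline data and column names are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("agg-demo").getOrCreate()
import spark.implicits._

val sales = Seq(("east", 10), ("east", 20), ("west", 5)).toDF("region", "amount")

// Untyped: groupBy returns a RelationalGroupedDataset
sales.groupBy(col("region"))
  .agg(sum(col("amount")).alias("total"), avg(col("amount")).alias("mean"))
  .show()

// Typed: groupByKey returns a KeyValueGroupedDataset
sales.as[(String, Int)]
  .groupByKey(_._1)
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .show()

spark.stop()
```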
### Streaming Queries

Structured Streaming capabilities for processing continuous data streams with the same DataFrame/Dataset APIs used for batch processing.

```scala { .api }
class DataStreamWriter[T] {
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def trigger(trigger: Trigger): DataStreamWriter[T]
  def start(): StreamingQuery
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
}

trait StreamingQuery {
  def isActive: Boolean
  def awaitTermination(): Unit
  def stop(): Unit
  def status: StreamingQueryStatus
}
```

[Streaming Queries](./streaming.md)

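A sketch of a streaming query using the built-in `rate` source and `console` sink; the rate and trigger interval are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local[*]").appName("stream-demo").getOrCreate()

// The rate source emits (timestamp, value) rows; handy for testing pipelines
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()

val query = stream.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()  // blocks until the query stops or fails
```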
### Catalog and Metadata Management

Comprehensive metadata management for databases, tables, functions, and cached data with full programmatic access to the Spark catalog.

```scala { .api }
// Database, Table, Column, and Function here are the metadata classes from
// org.apache.spark.sql.catalog, not the expression class org.apache.spark.sql.Column
class Catalog {
  def currentDatabase: String
  def listDatabases(): Dataset[Database]
  def listTables(): Dataset[Table]
  def listColumns(tableName: String): Dataset[Column]
  def listFunctions(): Dataset[Function]
  def cacheTable(tableName: String): Unit
  def isCached(tableName: String): Boolean
}
```

[Catalog and Metadata Management](./catalog.md)

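A short sketch of inspecting and caching through the catalog; the temp-view name and inline data are illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("catalog-demo").getOrCreate()
import spark.implicits._

Seq(("a", 1)).toDF("k", "v").createOrReplaceTempView("kv")

println(spark.catalog.currentDatabase)  // typically "default"
spark.catalog.listTables().show()       // includes the temp view "kv"

spark.catalog.cacheTable("kv")
println(spark.catalog.isCached("kv"))   // true once cached

spark.stop()
```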
## Types

```scala { .api }
// Core data types
abstract class DataType
case object StringType extends DataType
case object IntegerType extends DataType
case object LongType extends DataType
case object DoubleType extends DataType
case object BooleanType extends DataType
case object DateType extends DataType
case object TimestampType extends DataType

// Complex types
case class StructType(fields: Array[StructField]) extends DataType
case class StructField(name: String, dataType: DataType, nullable: Boolean)
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) extends DataType

// Row representation
trait Row {
  def get(i: Int): Any
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getBoolean(i: Int): Boolean
}

// Save modes (SaveMode is a Java enum in org.apache.spark.sql) with values:
// SaveMode.Overwrite, SaveMode.Append, SaveMode.ErrorIfExists, SaveMode.Ignore
```
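
A sketch of building a schema programmatically with these types and applying it on read; the field names and input path are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("schema-demo").getOrCreate()

// An explicit schema avoids a schema-inference pass over the data
val schema = StructType(Array(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("signup", TimestampType, nullable = true)))

val df = spark.read
  .schema(schema)
  .json("in/people.json")  // placeholder path

df.printSchema()
spark.stop()
```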