# Apache Spark SQL

Apache Spark SQL is a distributed SQL query engine built on top of Apache Spark's core engine. It lets users execute SQL queries, work with DataFrames and Datasets, and perform complex transformations on large-scale distributed data. It provides a comprehensive SQL interface with an ANSI SQL compliance mode, advanced query optimization through the Catalyst optimizer, and built-in support for data sources such as JSON, Parquet, ORC, CSV, and JDBC databases. Table formats such as Delta Lake are added through separately distributed connectors.

## Package Information

- **Package Name**: spark-sql_2.13
- **Package Type**: maven
- **Language**: Scala
- **Installation**: Maven dependency `org.apache.spark:spark-sql_2.13:4.0.1`

## Core Imports

```scala
// Core Spark SQL entry points and data abstractions
import org.apache.spark.sql.{SparkSession, Dataset, DataFrame, Column, Row}
// Schema types (StructType, StructField, IntegerType, ...)
import org.apache.spark.sql.types._
// Built-in column functions (col, lit, when, count, ...)
import org.apache.spark.sql.functions._
```

For Java:

```java
22
import org.apache.spark.sql.SparkSession;
23
import org.apache.spark.sql.Dataset;
24
import org.apache.spark.sql.Row;
25
import static org.apache.spark.sql.functions.*;
26
```

## Basic Usage

```scala
import org.apache.spark.sql.{SparkSession, functions => F}

// Create SparkSession - the entry point to Spark SQL
val spark = SparkSession.builder()
  .appName("Spark SQL Example")
  .master("local[*]")
  .getOrCreate()

// Create a DataFrame from JSON data
// (this example assumes records with name, age and department fields)
val df = spark.read.json("path/to/data.json")

// Perform transformations. `department` must stay in the projection:
// groupBy can only reference columns that survive the preceding select.
val result = df
  .select(F.col("name"), F.col("age"), F.col("department"))
  .filter(F.col("age") > 18)
  .groupBy(F.col("department"))
  .agg(F.count("*").as("employee_count"))

// Register the DataFrame as a temporary view so SQL can refer to it by name
df.createOrReplaceTempView("employees")

// Execute the equivalent SQL query directly
val sqlResult = spark.sql("SELECT department, COUNT(*) FROM employees WHERE age > 18 GROUP BY department")

// Show results
result.show()
sqlResult.show()

// Clean up
spark.stop()
```

## Architecture

Apache Spark SQL is built around several key components:

- **SparkSession**: The unified entry point providing the DataFrame/Dataset APIs and SQL execution
- **Catalyst Optimizer**: Query optimization engine with rule-based and cost-based optimization
- **Tungsten Execution**: Code generation and memory management for high-performance query execution
- **Data Source API**: Pluggable interface for reading from and writing to various data formats
- **Streaming Engine**: Unified batch and streaming processing with micro-batch and continuous processing modes
- **Type System**: Rich data type hierarchy supporting complex nested structures and user-defined types

## Capabilities

### Session Management

The primary entry point for all Spark SQL operations. It manages the underlying Spark context and provides unified APIs for both batch and streaming workloads.

```scala { .api }
/**
 * Entry point to Spark SQL: exposes configuration, SQL execution, data readers,
 * the catalog, UDF registration and streaming query management for one session.
 */
abstract class SparkSession extends Serializable with Closeable {
  // Configuration and lifecycle
  def version: String
  def conf: RuntimeConfig
  def sparkContext: SparkContext
  def sessionState: SessionState // internal state; marked unstable upstream
  def sharedState: SharedState   // state shared across sessions; marked unstable upstream
  def newSession(): SparkSession
  def stop(): Unit
  def close(): Unit

  // SQL execution
  def sql(sqlText: String): DataFrame
  def sqlContext: SQLContext

  // Data access
  def read: DataFrameReader
  def readStream: DataStreamReader
  def catalog: Catalog
  def table(tableName: String): DataFrame
  // range() yields boxed java.lang.Long elements, not scala.Long.
  def range(end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

  // DataFrame creation
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
  def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
  def createDataset[T : Encoder](data: RDD[T]): Dataset[T]
  def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T]

  // UDF registration
  def udf: UDFRegistration

  // Streaming
  def streams: StreamingQueryManager

  // Experimental and advanced features
  def experimental: ExperimentalMethods
  def listenerManager: ExecutionListenerManager

  // Timing helper (Scala only): runs `f`, prints the elapsed time to stdout and
  // returns f's result. This is not a table time-travel API.
  def time[T](f: => T): T
}

/** Factory and active/default-session management for [[SparkSession]]. */
object SparkSession {
  def builder(): Builder
  def getActiveSession: Option[SparkSession]
  def getDefaultSession: Option[SparkSession]
  def setActiveSession(session: SparkSession): Unit
  def setDefaultSession(session: SparkSession): Unit
  def clearActiveSession(): Unit
  def clearDefaultSession(): Unit

  /** Fluent builder; `getOrCreate()` returns an existing session or builds a new one. */
  class Builder {
    def appName(name: String): Builder
    def master(master: String): Builder
    def config(key: String, value: String): Builder
    def config(key: String, value: Boolean): Builder
    def config(key: String, value: Long): Builder
    def config(key: String, value: Double): Builder
    def config(conf: SparkConf): Builder
    def enableHiveSupport(): Builder
    def getOrCreate(): SparkSession
  }
}
```

[Session Management](./session-management.md)

### DataFrame and Dataset Operations

Strongly typed (`Dataset[T]`) and untyped (`DataFrame = Dataset[Row]`) distributed collections with functional programming APIs and SQL-like operations for data transformation and analysis.

```scala { .api }
/**
 * Distributed collection of `T` values. Members grouped under `// Actions` return
 * results (or perform side effects) instead of a new Dataset.
 */
abstract class Dataset[T] extends Serializable {
  // Basic transformations
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame
  def selectExpr(exprs: String*): DataFrame
  def filter(condition: Column): Dataset[T]
  def filter(conditionExpr: String): Dataset[T]
  def where(condition: Column): Dataset[T]
  def where(conditionExpr: String): Dataset[T]

  // Grouping and aggregation
  def groupBy(cols: Column*): RelationalGroupedDataset
  def groupBy(col1: String, cols: String*): RelationalGroupedDataset
  def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame

  // Joins
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
  def join(right: Dataset[_], usingColumn: String): DataFrame
  def crossJoin(right: Dataset[_]): DataFrame
  def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]

  // Sorting
  def sort(sortExprs: Column*): Dataset[T]
  def sort(sortCol: String, sortCols: String*): Dataset[T]
  def orderBy(sortExprs: Column*): Dataset[T]
  def orderBy(sortCol: String, sortCols: String*): Dataset[T]

  // Set operations
  def union(other: Dataset[T]): Dataset[T]
  def unionByName(other: Dataset[T]): Dataset[T]
  def intersect(other: Dataset[T]): Dataset[T]
  def except(other: Dataset[T]): Dataset[T]
  def distinct(): Dataset[T]
  def dropDuplicates(): Dataset[T]
  def dropDuplicates(colNames: Array[String]): Dataset[T]

  // Column operations
  def withColumn(colName: String, col: Column): DataFrame
  def withColumnRenamed(existingName: String, newName: String): DataFrame
  def drop(colName: String): DataFrame
  def drop(col: Column): DataFrame

  // Typed transformations
  def map[U: Encoder](func: T => U): Dataset[U]
  def flatMap[U: Encoder](func: T => IterableOnce[U]): Dataset[U]
  def mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]

  // Sampling and partitioning
  def sample(fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
  def randomSplit(weights: Array[Double]): Array[Dataset[T]]
  def repartition(numPartitions: Int): Dataset[T]
  def repartition(partitionExprs: Column*): Dataset[T]
  def coalesce(numPartitions: Int): Dataset[T]
  def limit(n: Int): Dataset[T]

  // Actions
  def count(): Long
  def collect(): Array[T]
  def collectAsList(): java.util.List[T]
  def take(n: Int): Array[T]
  def first(): T
  def head(): T
  def head(n: Int): Array[T]
  def show(): Unit
  def show(numRows: Int): Unit
  def show(numRows: Int, truncate: Boolean): Unit
  def reduce(func: (T, T) => T): T
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit

  // Schema and metadata
  def schema: StructType
  def printSchema(): Unit
  def dtypes: Array[(String, String)]
  def columns: Array[String]
  def isLocal: Boolean
  def isEmpty: Boolean
  def isStreaming: Boolean

  // Type conversions and casting
  def as[U: Encoder]: Dataset[U]
  def toDF(): DataFrame
  def toDF(colNames: String*): DataFrame

  // Persistence
  def cache(): Dataset[T]
  def persist(): Dataset[T]
  def persist(newLevel: StorageLevel): Dataset[T]
  def unpersist(): Dataset[T]
  def unpersist(blocking: Boolean): Dataset[T]

  // I/O
  def write: DataFrameWriter[T]
  def writeStream: DataStreamWriter[T]
}

// Declared upstream in the `org.apache.spark.sql` package object.
type DataFrame = Dataset[Row]
```

[DataFrame and Dataset Operations](./dataframe-dataset.md)

### SQL Functions

A comprehensive library of 749+ built-in functions for data manipulation, including aggregate, string, date/time, mathematical, array/map, and window functions.

```scala { .api }
/** Selected built-in column functions (a small excerpt of the full library). */
object functions {
  def col(colName: String): Column
  def lit(literal: Any): Column
  def when(condition: Column, value: Any): Column
  def sum(e: Column): Column
  def count(e: Column): Column
  // Name-based overload; this is the form used as F.count("*") in the usage example.
  def count(columnName: String): TypedColumn[Any, Long]
  def concat(exprs: Column*): Column
  def date_format(dateExpr: Column, format: String): Column
}
```

[SQL Functions](./sql-functions.md)

### Data Types and Schema

A rich type system supporting primitive types, complex nested structures, and user-defined types, with comprehensive schema management capabilities.

```scala { .api }
/** Base of the Spark SQL type hierarchy. */
abstract class DataType {
  def typeName: String
  def json: String
}

/** Schema of a row: an ordered collection of named fields. */
case class StructType(fields: Array[StructField]) extends DataType

/** One named field of a [[StructType]]; `metadata` carries optional per-field annotations. */
case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty
)
```

[Data Types and Schema](./data-types.md)

### Streaming Processing

Real-time data processing with support for multiple execution modes, stateful operations, watermarking, and exactly-once semantics (end to end when sources are replayable and sinks are idempotent or transactional).

```scala { .api }
/** Configures and opens a streaming source; obtained from `SparkSession.readStream`. */
class DataStreamReader {
  def format(source: String): DataStreamReader
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  def options(options: Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader
  def load(): DataFrame
  def load(path: String): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  def text(path: String): DataFrame
  def csv(path: String): DataFrame
  def orc(path: String): DataFrame
  def table(tableName: String): DataFrame
}

/** Configures a streaming sink; obtained from `Dataset.writeStream`, started with `start`/`toTable`. */
class DataStreamWriter[T] {
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T]
  def trigger(trigger: Trigger): DataStreamWriter[T]
  def format(source: String): DataStreamWriter[T]
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  def options(options: Map[String, String]): DataStreamWriter[T]
  def options(options: java.util.Map[String, String]): DataStreamWriter[T]
  def partitionBy(colNames: String*): DataStreamWriter[T]
  def queryName(queryName: String): DataStreamWriter[T]
  def start(): StreamingQuery
  def start(path: String): StreamingQuery
  def toTable(tableName: String): StreamingQuery
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
  // Receives each micro-batch as a Dataset together with its batch id.
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
}
```

[Streaming Processing](./streaming.md)

### Data Sources and I/O

Comprehensive data source support for reading from and writing to various formats, including Parquet, JSON, CSV, JDBC databases, and cloud storage systems.

```scala { .api }
/** Configures and loads a batch data source; obtained from `SparkSession.read`. */
class DataFrameReader {
  def format(source: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def load(): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  // `properties` is a java.util.Properties carrying JDBC connection settings.
  def jdbc(url: String, table: String, properties: Properties): DataFrame
}
```

[Data Sources and I/O](./data-sources.md)

### Catalog Operations

Metadata management for databases, tables, views, and functions, with comprehensive catalog inspection and manipulation capabilities.

```scala { .api }
/** Session catalog interface; obtained from `SparkSession.catalog`. */
abstract class Catalog {
  def currentDatabase: String
  def listDatabases(): Dataset[Database]
  def listTables(): Dataset[Table]
  def listFunctions(): Dataset[Function]
  def cacheTable(tableName: String): Unit
}
```

[Catalog Operations](./catalog.md)

### User-Defined Functions

Support for registering custom functions so they can be called from both the DataFrame API and SQL. The JVM API below serves Scala and Java. Python and R functions are registered through PySpark and SparkR.

```scala { .api }
/** Registers UDFs by name for use in SQL; obtained from `SparkSession.udf`. */
abstract class UDFRegistration {
  def register[RT: TypeTag](name: String, func: Function0[RT]): UserDefinedFunction
  def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction
}

/**
 * A user-defined function usable as a Column expression. Since Spark 3.0 this is a
 * sealed abstract class (not a case class exposing the wrapped function).
 */
sealed abstract class UserDefinedFunction {
  def nullable: Boolean
  def deterministic: Boolean
  def apply(exprs: Column*): Column
  def withName(name: String): UserDefinedFunction
  def asNonNullable(): UserDefinedFunction
  def asNondeterministic(): UserDefinedFunction
}
```

[User-Defined Functions](./udfs.md)

## Core Types

```scala { .api }
/** One row of a DataFrame; fields are accessed by ordinal with typed getters. */
trait Row extends Serializable {
  def get(i: Int): Any
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getBoolean(i: Int): Boolean
  def schema: StructType
}
/**
 * An expression over the columns of a DataFrame/Dataset. In Spark 4.0 a Column wraps a
 * `ColumnNode` (the same node type `TypedColumn` is constructed from) and is not a case class.
 */
class Column(val node: ColumnNode) {
  // Type conversion and encoding
  def as[U: Encoder]: TypedColumn[Any, U]
  def as(alias: String): Column
  def as(aliases: Seq[String]): Column
  def as(aliases: Array[String]): Column
  def as(alias: Symbol): Column
  def as(alias: String, metadata: Metadata): Column
  def cast(to: DataType): Column
  def cast(to: String): Column
  // Like cast, but yields null instead of failing when a value cannot be converted.
  def try_cast(to: DataType): Column
  def try_cast(to: String): Column

  // Naming and aliasing
  def name(alias: String): Column
  def alias(alias: String): Column

  // Arithmetic operations (symbolic form and its named Java-friendly equivalent)
  def unary_- : Column
  def +(other: Any): Column
  def plus(other: Any): Column
  def -(other: Any): Column
  def minus(other: Any): Column
  def *(other: Any): Column
  def multiply(other: Any): Column
  def /(other: Any): Column
  def divide(other: Any): Column
  def %(other: Any): Column
  def mod(other: Any): Column

  // Comparison operations
  def ===(other: Any): Column
  def equalTo(other: Any): Column
  def =!=(other: Any): Column
  def notEqual(other: Any): Column
  def >(other: Any): Column
  def gt(other: Any): Column
  def <(other: Any): Column
  def lt(other: Any): Column
  def <=(other: Any): Column
  def leq(other: Any): Column
  def >=(other: Any): Column
  def geq(other: Any): Column
  // Null-safe equality: true when both sides are null.
  def <=>(other: Any): Column
  def eqNullSafe(other: Any): Column
  def between(lowerBound: Any, upperBound: Any): Column

  // Logical operations
  def unary_! : Column
  def ||(other: Any): Column
  def or(other: Column): Column
  def &&(other: Any): Column
  def and(other: Column): Column

  // Bitwise operations
  def bitwiseOR(other: Any): Column
  def bitwiseAND(other: Any): Column
  def bitwiseXOR(other: Any): Column

  // Null/NaN testing
  def isNaN: Column
  def isNull: Column
  def isNotNull: Column

  // String operations
  def like(literal: String): Column
  def rlike(literal: String): Column
  def ilike(literal: String): Column
  def contains(other: Any): Column
  def startsWith(other: Column): Column
  def startsWith(literal: String): Column
  def endsWith(other: Column): Column
  def endsWith(literal: String): Column
  def substr(startPos: Column, len: Column): Column
  def substr(startPos: Int, len: Int): Column

  // Collection and structure operations
  def apply(extraction: Any): Column
  def getItem(key: Any): Column
  def getField(fieldName: String): Column
  def withField(fieldName: String, col: Column): Column
  def dropFields(fieldNames: String*): Column
  def isin(list: Any*): Column
  def isInCollection(values: scala.collection.Iterable[_]): Column
  def isInCollection(values: java.lang.Iterable[_]): Column

  // Conditional logic
  def when(condition: Column, value: Any): Column
  def otherwise(value: Any): Column

  // Sorting operations
  def desc: Column
  def desc_nulls_first: Column
  def desc_nulls_last: Column
  def asc: Column
  def asc_nulls_first: Column
  def asc_nulls_last: Column

  // Window operations
  def over(window: expressions.WindowSpec): Column
  def over(): Column
}
/** Untyped grouping produced by `Dataset.groupBy`; aggregations return DataFrames. */
class RelationalGroupedDataset(
    df: DataFrame,
    groupingExprs: Seq[Expression],
    groupType: RelationalGroupedDataset.GroupType
) {
  def agg(expr: Column, exprs: Column*): DataFrame
  def count(): DataFrame
  def mean(colNames: String*): DataFrame
  def max(colNames: String*): DataFrame
  def min(colNames: String*): DataFrame
  def sum(colNames: String*): DataFrame
  def pivot(pivotColumn: String): RelationalGroupedDataset
  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset
}

/** Typed grouping produced by `Dataset.groupByKey`: values of type V grouped by key K. */
class KeyValueGroupedDataset[K, V](
    kEncoder: Encoder[K],
    vEncoder: Encoder[V]
) {
  def agg[U1](col1: TypedColumn[V, U1]): Dataset[(K, U1)]
  def count(): Dataset[(K, Long)]
  // Co-groups with another grouping on the same key type; `f` sees both value iterators per key.
  def cogroup[U, R: Encoder](other: KeyValueGroupedDataset[K, U])(
      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Dataset[R]
  def flatMapGroups[U: Encoder](f: (K, Iterator[V]) => IterableOnce[U]): Dataset[U]
  // The result is not paired with the key; include the key in U if it is needed downstream.
  def mapGroups[U: Encoder](f: (K, Iterator[V]) => U): Dataset[U]
  def reduceGroups(f: (V, V) => V): Dataset[(K, V)]
}
/** Catalyst expression tree node (internal API underlying query plans). */
abstract class Expression extends Serializable {
  def dataType: DataType
  def nullable: Boolean
  def eval(input: InternalRow): Any
  def sql: String
}
/** Output mode of a streaming write, passed to `DataStreamWriter.outputMode`. */
abstract class OutputMode {
  def name: String
}

// Upstream these are static factory methods, called as OutputMode.Append() etc.
object OutputMode {
  def Append(): OutputMode
  def Complete(): OutputMode
  def Update(): OutputMode
}

/** When a streaming query fires, passed to `DataStreamWriter.trigger`. */
abstract class Trigger {
  def name: String
}

object Trigger {
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(interval: Duration): Trigger
  // Deprecated upstream (since 3.4.0) in favour of AvailableNow().
  def Once(): Trigger
  def Continuous(interval: String): Trigger
  def Continuous(interval: Duration): Trigger
  def AvailableNow(): Trigger
}

/** Handle to a running streaming query, returned by `DataStreamWriter.start`/`toTable`. */
abstract class StreamingQuery {
  def id: UUID
  def name: String
  def isActive: Boolean
  def awaitTermination(): Unit
  def awaitTermination(timeoutMs: Long): Boolean
  def stop(): Unit
  def processAllAvailable(): Unit
  def lastProgress: StreamingQueryProgress
  def recentProgress: Array[StreamingQueryProgress]
  def status: StreamingQueryStatus
  def exception: Option[StreamingQueryException]
}
/** Element type of `Catalog.listDatabases()`. */
case class Database(
    name: String,
    catalog: Option[String],
    description: Option[String],
    locationUri: String
)

/** Element type of `Catalog.listTables()`. */
case class Table(
    name: String,
    catalog: Option[String],
    namespace: Array[String],
    description: Option[String],
    tableType: String,
    isTemporary: Boolean
)

/**
 * Element type of `Catalog.listFunctions()`. When imported unqualified it shadows
 * `scala.Function`.
 */
case class Function(
    name: String,
    catalog: Option[String],
    namespace: Array[String],
    description: Option[String],
    className: String,
    isTemporary: Boolean
)
/** Converts JVM objects of type T to and from Spark's internal representation (a trait upstream). */
trait Encoder[T] extends Serializable {
  def schema: StructType
  def clsTag: ClassTag[T]
}

/** A Column whose result type U is known through its encoder; built from a ColumnNode. */
class TypedColumn[-T, U](
    node: ColumnNode,
    encoder: Encoder[U]
) extends Column(node) {
  def name(alias: String): TypedColumn[T, U]
}
/** Where and how cached data is kept; passed to `Dataset.persist`. */
abstract class StorageLevel extends Serializable {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}

// Predefined levels; the `_2` suffix is the replicated variant of the same level.
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}
/** Configures and performs a batch write; obtained from `Dataset.write`. */
class DataFrameWriter[T] {
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def options(options: Map[String, String]): DataFrameWriter[T]
  def partitionBy(colNames: String*): DataFrameWriter[T]
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
  def save(): Unit
  def save(path: String): Unit
  def insertInto(tableName: String): Unit
  def saveAsTable(name: String): Unit
  def json(path: String): Unit
  def parquet(path: String): Unit
  def orc(path: String): Unit
  def text(path: String): Unit
  def csv(path: String): Unit
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit
}

/** Behaviour when the write target already exists (a Java enum upstream). */
abstract class SaveMode {
  def name(): String
}

object SaveMode {
  val Append: SaveMode
  val Overwrite: SaveMode
  val ErrorIfExists: SaveMode
  val Ignore: SaveMode
}
/** Internal row format consumed by Catalyst expressions (see `Expression.eval`). */
abstract class InternalRow extends Serializable {
  def get(ordinal: Int, dataType: DataType): Any
  def isNullAt(ordinal: Int): Boolean
  def getBoolean(ordinal: Int): Boolean
  def getByte(ordinal: Int): Byte
  def getShort(ordinal: Int): Short
  def getInt(ordinal: Int): Int
  def getLong(ordinal: Int): Long
  def getFloat(ordinal: Int): Float
  def getDouble(ordinal: Int): Double
}
/** Metrics for one trigger of a streaming query (see `StreamingQuery.lastProgress`). */
case class StreamingQueryProgress(
    id: UUID,
    runId: UUID,
    name: String,
    timestamp: String,
    batchId: Long,
    batchDuration: Long,
    durationMs: Map[String, Long],
    eventTime: Map[String, String],
    stateOperators: Seq[StateOperatorProgress],
    sources: Seq[SourceProgress],
    sink: SinkProgress,
    observedMetrics: Map[String, Row]
)

/** Current activity of a streaming query (see `StreamingQuery.status`). */
case class StreamingQueryStatus(
    message: String,
    isDataAvailable: Boolean,
    isTriggerActive: Boolean
)

/**
 * Failure that terminated a streaming query. Upstream the underlying error is a Throwable
 * and the offsets are string descriptions; the constructor is private to Spark.
 */
class StreamingQueryException(
    val message: String,
    val cause: Throwable,
    val startOffset: String,
    val endOffset: String
) extends Exception(message, cause)
/** Per-stateful-operator metrics inside a [[StreamingQueryProgress]]. */
case class StateOperatorProgress(
    operatorName: String,
    numRowsTotal: Long,
    numRowsUpdated: Long,
    allUpdatesTimeMs: Long,
    numRowsRemoved: Long,
    allRemovalsTimeMs: Long,
    commitTimeMs: Long,
    memoryUsedBytes: Long,
    numRowsDroppedByWatermark: Long,
    numShufflePartitions: Long,
    numStateStoreInstances: Long,
    customMetrics: Map[String, Long]
)

/** Per-source metrics inside a [[StreamingQueryProgress]]. */
case class SourceProgress(
    description: String,
    startOffset: Option[String],
    endOffset: Option[String],
    latestOffset: Option[String],
    numInputRows: Long,
    inputRowsPerSecond: Double,
    processedRowsPerSecond: Double,
    metrics: Map[String, String]
)

/** Sink metrics inside a [[StreamingQueryProgress]]. */
case class SinkProgress(
    description: String,
    numOutputRows: Option[Long],
    metrics: Map[String, String]
)

// NOTE(review): OffsetSeq is an internal execution class; this signature is not confirmed
// against the Spark 4.0 sources.
abstract class OffsetSeq extends Serializable {
  def toStreamProgress(sources: Seq[Source]): StreamingQueryProgress
}
/** Partitioning, ordering and frame definition for window functions (`Column.over`). */
abstract class WindowSpec {
  def partitionBy(cols: Column*): WindowSpec
  def partitionBy(colNames: String*): WindowSpec
  def orderBy(cols: Column*): WindowSpec
  def orderBy(colNames: String*): WindowSpec
  def rowsBetween(start: Long, end: Long): WindowSpec
  def rangeBetween(start: Long, end: Long): WindowSpec
}

/** Factory for [[WindowSpec]] plus the frame-boundary constants for rowsBetween/rangeBetween. */
object Window {
  def partitionBy(cols: Column*): WindowSpec
  def partitionBy(colNames: String*): WindowSpec
  def orderBy(cols: Column*): WindowSpec
  def orderBy(colNames: String*): WindowSpec
  val unboundedPreceding: Long
  val unboundedFollowing: Long
  val currentRow: Long
}
/** Typed key/value annotations attached to a column or struct field. */
abstract class Metadata extends Serializable {
  def json: String
  def contains(key: String): Boolean
  def getLong(key: String): Long
  def getDouble(key: String): Double
  def getBoolean(key: String): Boolean
  def getString(key: String): String
  def getMetadata(key: String): Metadata
}

object Metadata {
  val empty: Metadata
  def fromJson(json: String): Metadata
}

/** Expression node underlying a Column in Spark 4.0's API layer. */
abstract class ColumnNode extends Serializable {
  def sql: String
  def normalized: ColumnNode
}

/** Streaming source abstraction (internal). */
abstract class Source extends Serializable {
  def schema: StructType
}
// `Properties` and `ClassTag` in the signatures of this listing are not Spark SQL classes;
// they are the standard JDK and Scala library types.
import java.util.Properties
import scala.reflect.ClassTag

/**
 * Row-by-row streaming sink used with `DataStreamWriter.foreach`: `open` is called with a
 * partition id and epoch id, `process` once per value, and `close` with the error (or null).
 */
abstract class ForeachWriter[T] extends Serializable {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}
/** Session-scoped configuration; obtained from `SparkSession.conf`. Setters return Unit upstream. */
abstract class RuntimeConfig extends Serializable {
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def set(key: String, value: String): Unit
  def set(key: String, value: Boolean): Unit
  def set(key: String, value: Long): Unit
  def set(key: String, value: Double): Unit
  def unset(key: String): Unit
  def isModifiable(key: String): Boolean
}

/** Core Spark connection underlying a session (`SparkSession.sparkContext`). */
abstract class SparkContext extends Serializable {
  def version: String
  def applicationId: String
  def applicationAttemptId: Option[String]
  def master: String
  def appName: String
  def jars: Seq[String]
  def startTime: Long
  def defaultParallelism: Int
  def defaultMinPartitions: Int
  def hadoopConfiguration: Configuration
  def stop(): Unit
}

/** Application-level configuration (fluent setters return the same SparkConf). */
abstract class SparkConf extends Serializable {
  def set(key: String, value: String): SparkConf
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def getAll: Array[(String, String)]
  def contains(key: String): Boolean
  def remove(key: String): SparkConf
}
/** Per-session internal state (internal, unstable API). */
abstract class SessionState extends Serializable {
  def catalog: SessionCatalog
  def conf: SQLConf
  def experimentalMethods: ExperimentalMethods
  def functionRegistry: FunctionRegistry
  // Named `udfRegistration` upstream; user code should go through `SparkSession.udf`.
  def udfRegistration: UDFRegistration
  def analyzer: Analyzer
  def optimizer: Optimizer
  def planner: SparkPlanner
  def streamingQueryManager: StreamingQueryManager
}

/** State shared by sessions of the same SparkContext (internal, unstable API). */
abstract class SharedState extends Serializable {
  def sparkContext: SparkContext
  def cacheManager: CacheManager
  // NOTE(review): SQLListener looks stale (replaced by SQL app-status listener/store in later
  // releases); confirm against the Spark 4.0 sources.
  def listener: SQLListener
  def externalCatalog: ExternalCatalog
  def globalTempViewManager: GlobalTempViewManager
  // NOTE(review): the StreamingQueryManager is per session; confirm this member exists here.
  def streamingQueryManager: StreamingQueryManager
}

/** Legacy SQL entry point retained for compatibility; reachable via `SparkSession.sqlContext`. */
abstract class SQLContext extends Serializable {
  def sparkSession: SparkSession
  def sparkContext: SparkContext
  def sql(sqlText: String): DataFrame
  def table(tableName: String): DataFrame
  def tableNames(): Array[String]
  def tableNames(databaseName: String): Array[String]
  def tables(): DataFrame
  def tables(databaseName: String): DataFrame
}
/** Tracks the streaming queries of a session; obtained from `SparkSession.streams`. */
abstract class StreamingQueryManager extends Serializable {
  def active: Array[StreamingQuery]
  def get(id: String): StreamingQuery
  def get(id: UUID): StreamingQuery
  def resetTerminated(): Unit
  def awaitAnyTermination(): Unit
  def awaitAnyTermination(timeoutMs: Long): Boolean
}

abstract class ExperimentalMethods extends Serializable

/** Registers [[QueryExecutionListener]]s; obtained from `SparkSession.listenerManager`. */
abstract class ExecutionListenerManager extends Serializable {
  def register(listener: QueryExecutionListener): Unit
  def unregister(listener: QueryExecutionListener): Unit
  def clear(): Unit
}

// NOTE(review): not a public Spark SQL type; kept only as a placeholder for this listing.
abstract class TimeTravel extends Serializable

// Hadoop configuration (org.apache.hadoop.conf.Configuration upstream).
abstract class Configuration extends Serializable

// Opaque placeholders for internal types referenced by the signatures above.
abstract class SessionCatalog extends Serializable
abstract class SQLConf extends Serializable
abstract class FunctionRegistry extends Serializable
abstract class Analyzer extends Serializable
abstract class Optimizer extends Serializable
abstract class SparkPlanner extends Serializable
abstract class CacheManager extends Serializable
abstract class SQLListener extends Serializable
abstract class ExternalCatalog extends Serializable
abstract class GlobalTempViewManager extends Serializable
abstract class QueryExecutionListener extends Serializable
/** Spark's low-level distributed collection, accepted by `createDataFrame`/`createDataset`. */
abstract class RDD[T] extends Serializable {
  def collect(): Array[T]
  def count(): Long
  def first(): T
  def take(num: Int): Array[T]
  def foreach(f: T => Unit): Unit
  def map[U: ClassTag](f: T => U): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def cache(): RDD[T]
  def persist(): RDD[T]
  def unpersist(): RDD[T]
}
```