# DataFrame and Dataset Operations

DataFrames and Datasets are the core distributed data structures in Spark SQL. DataFrame is an alias for Dataset[Row] and provides an untyped interface, while Dataset[T] offers compile-time type safety. Both share the same underlying APIs and are optimized by the Catalyst query optimizer.
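
The difference is easiest to see side by side. A minimal sketch, assuming a local SparkSession; `Person` is a hypothetical case class used only for illustration:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

val spark = SparkSession.builder().appName("dataset-example").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical case class used only for this example
case class Person(name: String, age: Int)

// Untyped view: column names are resolved at runtime
val df: DataFrame = Seq(Person("Alice", 29), Person("Bob", 31)).toDF()

// Typed view: field access is checked at compile time
val ds: Dataset[Person] = df.as[Person]
val names: Dataset[String] = ds.map(_.name)
```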

## Core Types

```scala { .api }
type DataFrame = Dataset[Row]

class Dataset[T] extends Serializable {
  def schema: StructType
  def dtypes: Array[(String, String)]
  def columns: Array[String]
  def count(): Long
  def isEmpty: Boolean
  def isLocal: Boolean
  def isStreaming: Boolean
}

trait Row extends Serializable {
  def length: Int
  def size: Int
  def get(i: Int): Any
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getFloat(i: Int): Float
  def getBoolean(i: Int): Boolean
  def getDate(i: Int): java.sql.Date
  def getTimestamp(i: Int): java.sql.Timestamp
}
```
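
Row access is untyped at compile time, so a mismatched type surfaces only at runtime. A short sketch, assuming a DataFrame `df` with `name` (string) and `age` (long) columns such as the one read from `people.json` below:

```scala
import org.apache.spark.sql.Row

val firstRow: Row = df.first()

// Access by position or by field name
val rawValue: Any   = firstRow.get(0)
val name: String    = firstRow.getAs[String]("name")
val age: Long       = firstRow.getAs[Long]("age")
val fieldCount: Int = firstRow.length
```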

## Schema Operations

```scala { .api }
class Dataset[T] {
  def schema: StructType
  def dtypes: Array[(String, String)]
  def columns: Array[String]
  def printSchema(): Unit
  def printSchema(level: Int): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.types._

// Examine schema
val df = spark.read.json("people.json")
df.printSchema()
df.schema.foreach(field => println(s"${field.name}: ${field.dataType}"))

// Get column information
val columnNames = df.columns
val columnTypes = df.dtypes

// Check schema programmatically
val hasNameColumn = df.schema.exists(_.name == "name")
val nameField = df.schema.find(_.name == "name")
```

## Column Selection and Projection

```scala { .api }
class Dataset[T] {
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame
  def selectExpr(exprs: String*): DataFrame
  def drop(colNames: String*): DataFrame
  def drop(col: Column): DataFrame
  def withColumn(colName: String, col: Column): DataFrame
  def withColumnRenamed(existingName: String, newName: String): DataFrame
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.functions._

// Select columns
val selected = df.select("name", "age")
val selectedWithCol = df.select(col("name"), col("age") + 1)

// Select with expressions
val computed = df.selectExpr("name", "age + 1 as next_age", "upper(name) as upper_name")

// Add columns
val withFullName = df.withColumn("full_name", concat(col("first"), lit(" "), col("last")))

// Rename columns
val renamed = df.withColumnRenamed("old_name", "new_name")

// Drop columns
val dropped = df.drop("unwanted_column", "another_column")
val droppedByCol = df.drop(col("age"))
```

## Filtering and Conditions

```scala { .api }
class Dataset[T] {
  def filter(condition: Column): Dataset[T]
  def filter(conditionExpr: String): Dataset[T]
  def where(condition: Column): Dataset[T]
  def where(conditionExpr: String): Dataset[T]
}
```

**Usage Examples:**

```scala
// Filter with Column expressions
val adults = df.filter(col("age") >= 18)
val activeUsers = df.filter(col("active") === true && col("last_login").isNotNull)

// Filter with SQL expressions
val filtered = df.filter("age >= 18 AND active = true")
val complex = df.where("age BETWEEN 25 AND 65 AND city IN ('New York', 'San Francisco')")

// Multiple conditions
val result = df
  .filter(col("age") > 21)
  .filter(col("country") === "US")
  .filter(col("score").isNotNull)
```
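
The SQL-style BETWEEN and IN predicates above also have Column-API equivalents, `between` and `isin`, which are convenient when conditions are built programmatically:

```scala
import org.apache.spark.sql.functions.col

// Same predicate as the SQL string above, expressed with Column methods
val columnApi = df.where(
  col("age").between(25, 65) &&
  col("city").isin("New York", "San Francisco")
)
```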

## Sorting and Ordering

```scala { .api }
class Dataset[T] {
  def sort(sortCol: String, sortCols: String*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]
  def orderBy(sortCol: String, sortCols: String*): Dataset[T]
  def orderBy(sortExprs: Column*): Dataset[T]
}
```

**Usage Examples:**

```scala
// Sort by column names
val sorted = df.sort("age", "name")

// Sort with Column expressions and directions
val ordered = df.orderBy(col("age").desc, col("name").asc)

// Complex sorting
val complexSort = df.orderBy(
  col("department"),
  col("salary").desc,
  col("hire_date").asc
)
```
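
Column also exposes null-ordering variants (`asc_nulls_first`, `asc_nulls_last`, `desc_nulls_first`, `desc_nulls_last`) that are not part of the listing above; a brief sketch:

```scala
import org.apache.spark.sql.functions.col

// Descending by salary, but rows with a null salary go last
val nullsLast = df.orderBy(col("salary").desc_nulls_last)
```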

## Joins

```scala { .api }
class Dataset[T] {
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], usingColumn: String): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

  def crossJoin(right: Dataset[_]): DataFrame
}
```

**Join Types**: `"inner"`, `"cross"`, `"outer"`, `"full"`, `"full_outer"`, `"left"`, `"left_outer"`, `"right"`, `"right_outer"`, `"left_semi"`, `"left_anti"`

**Usage Examples:**

```scala
val users = spark.table("users")
val orders = spark.table("orders")

// Inner join (default)
val userOrders = users.join(orders, users("id") === orders("user_id"))

// Left outer join
val allUsers = users.join(orders, users("id") === orders("user_id"), "left_outer")

// Join on multiple columns
val joined = users.join(orders,
  users("id") === orders("user_id") && users("region") === orders("region"))

// Join using column names (when columns have same names)
val simple = users.join(orders, "user_id")
val multiple = users.join(orders, Seq("user_id", "region"))

// Cross join
val cartesian = users.crossJoin(orders)
```
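
The `left_semi` and `left_anti` types behave like filters: the result keeps only the left side's columns. A short sketch using the same users and orders tables:

```scala
// Users that have at least one matching order (users' columns only)
val usersWithOrders = users.join(orders, users("id") === orders("user_id"), "left_semi")

// Users that have no matching order
val usersWithoutOrders = users.join(orders, users("id") === orders("user_id"), "left_anti")
```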

## Set Operations

```scala { .api }
class Dataset[T] {
  def union(other: Dataset[T]): Dataset[T]
  def unionAll(other: Dataset[T]): Dataset[T]
  def unionByName(other: Dataset[T]): Dataset[T]
  def intersect(other: Dataset[T]): Dataset[T]
  def intersectAll(other: Dataset[T]): Dataset[T]
  def except(other: Dataset[T]): Dataset[T]
  def exceptAll(other: Dataset[T]): Dataset[T]
}
```

**Usage Examples:**

```scala
val df1 = spark.range(1, 5).toDF("id")
val df2 = spark.range(3, 8).toDF("id")

// Union keeps duplicates (SQL UNION ALL semantics)
val combined = df1.union(df2)

// Union by column names (handles different column orders)
val byName = df1.unionByName(df2)

// Set operations
val intersection = df1.intersect(df2) // Values in both
val difference = df1.except(df2)      // Values in df1 but not df2
```
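
Because `union` follows UNION ALL semantics, deduplicate explicitly when SQL UNION behaviour is needed:

```scala
val unionAllSemantics = df1.union(df2)            // duplicates preserved
val unionSemantics    = df1.union(df2).distinct() // duplicates removed
```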

## Deduplication

```scala { .api }
class Dataset[T] {
  def distinct(): Dataset[T]
  def dropDuplicates(): Dataset[T]
  def dropDuplicates(colNames: Array[String]): Dataset[T]
  def dropDuplicates(colNames: Seq[String]): Dataset[T]
  def dropDuplicates(col1: String, cols: String*): Dataset[T]
}
```

**Usage Examples:**

```scala
// Remove all duplicate rows
val unique = df.distinct()

// Remove duplicates based on specific columns
val uniqueUsers = df.dropDuplicates("user_id")
val uniqueByMultiple = df.dropDuplicates("user_id", "email")
val uniqueBySeq = df.dropDuplicates(Seq("user_id", "email"))
```

## Sampling and Limiting

```scala { .api }
class Dataset[T] {
  def limit(n: Int): Dataset[T]
  def sample(fraction: Double): Dataset[T]
  def sample(fraction: Double, seed: Long): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
}

// Stratified sampling is exposed through Dataset.stat
class DataFrameStatFunctions {
  def sampleBy[T](col: String, fractions: Map[T, Double], seed: Long): DataFrame
}
```

**Usage Examples:**

```scala
// Limit results
val top100 = df.limit(100)

// Random sampling
val sample10Percent = df.sample(0.1)
val sampleWithSeed = df.sample(false, 0.2, seed = 42)

// Stratified sampling by column values
val stratified = df.stat.sampleBy("category", Map("A" -> 0.1, "B" -> 0.2), seed = 123)
```

## Action Operations

```scala { .api }
class Dataset[T] {
  def count(): Long
  def collect(): Array[T]
  def collectAsList(): java.util.List[T]
  def first(): T
  def head(): T
  def head(n: Int): Array[T]
  def take(n: Int): Array[T]
  def takeAsList(n: Int): java.util.List[T]
  def tail(n: Int): Array[T]
  def show(): Unit
  def show(numRows: Int): Unit
  def show(numRows: Int, truncate: Boolean): Unit
  def show(numRows: Int, truncate: Int): Unit
  def show(numRows: Int, truncate: Int, vertical: Boolean): Unit
}
```

**Usage Examples:**

```scala
// Count rows
val totalRows = df.count()

// Collect data (use with caution on large datasets)
val allData = df.collect()
val firstRow = df.first()
val top10 = df.take(10)

// Display data
df.show()               // Show 20 rows, truncate strings at 20 chars
df.show(50)             // Show 50 rows
df.show(20, false)      // Don't truncate strings
df.show(10, 100, true)  // Show 10 rows vertically, truncate at 100 chars
```

## Persistence and Caching

```scala { .api }
class Dataset[T] {
  def cache(): Dataset[T]
  def persist(): Dataset[T]
  def persist(newLevel: StorageLevel): Dataset[T]
  def unpersist(): Dataset[T]
  def unpersist(blocking: Boolean): Dataset[T]
  def storageLevel: StorageLevel
}
```

**Usage Examples:**

```scala
import org.apache.spark.storage.StorageLevel

// Cache in memory (default: MEMORY_AND_DISK)
val cached = df.cache()

// Persist with specific storage level
val persisted = df.persist(StorageLevel.MEMORY_ONLY)
val diskOnly = df.persist(StorageLevel.DISK_ONLY)
val memoryAndDiskSer = df.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Check storage level
val level = df.storageLevel

// Remove from cache
df.unpersist()
df.unpersist(blocking = true) // Wait for removal to complete
```

## Iteration and Functional Operations

```scala { .api }
class Dataset[T] {
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit
  def map[U : Encoder](func: T => U): Dataset[U]
  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U]
  def filter(func: T => Boolean): Dataset[T]
  def reduce(func: (T, T) => T): T
}
```

**Usage Examples:**

```scala
// Type-safe operations (the required Encoders come from spark.implicits._)
import spark.implicits._

case class Person(name: String, age: Int)
val peopleSeq = Seq(Person("Alice Smith", 34), Person("Bob Jones", 17))
val people: Dataset[Person] = spark.createDataFrame(peopleSeq).as[Person]

// Map transformation
val names = people.map(_.name)
val ages = people.map(_.age)

// Filter with function
val adults = people.filter(_.age >= 18)

// FlatMap
val words = people.flatMap(_.name.split(" "))

// Reduce
val totalAge = people.map(_.age).reduce(_ + _)

// Side effects
people.foreach(person => println(s"Person: ${person.name}"))
people.foreachPartition { iter =>
  // Process partition
  iter.foreach(println)
}
```

## Type Conversion

```scala { .api }
class Dataset[T] {
  def as[U : Encoder]: Dataset[U]
  def toDF(): DataFrame
  def toDF(colNames: String*): DataFrame
  def rdd: RDD[T]
  def javaRDD: JavaRDD[T]
  def toJavaRDD: JavaRDD[T]
}
```

**Usage Examples:**

```scala
// Convert to typed Dataset (the Encoder comes from spark.implicits._)
import spark.implicits._

case class User(id: Long, name: String, age: Int)
val typedUsers = df.as[User]

// Convert to DataFrame
val dataFrame = dataset.toDF()
val renamedDF = dataset.toDF("col1", "col2", "col3")

// Convert to RDD
val rdd = df.rdd
val userRDD = typedUsers.rdd

// Java interop
val javaRDD = df.toJavaRDD
```

## Partitioning Operations

```scala { .api }
class Dataset[T] {
  def repartition(numPartitions: Int): Dataset[T]
  def repartition(partitionExprs: Column*): Dataset[T]
  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
  def coalesce(numPartitions: Int): Dataset[T]
  def repartitionByRange(partitionExprs: Column*): Dataset[T]
  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
}
```

**Usage Examples:**

```scala
// Change number of partitions
val repartitioned = df.repartition(200)
val coalesced = df.coalesce(10) // Reduce partitions without a full shuffle

// Partition by column values
val byDept = df.repartition(col("department"))
val byMultiple = df.repartition(col("year"), col("month"))

// Range partitioning (for ordered data)
val rangePartitioned = df.repartitionByRange(col("timestamp"))
val rangeWithCount = df.repartitionByRange(100, col("id"))
```
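
To confirm the effect of repartition or coalesce, the partition count can be read from the underlying RDD (a quick check, not part of the listing above):

```scala
println(df.rdd.getNumPartitions)                  // current partition count
println(df.repartition(200).rdd.getNumPartitions) // 200
```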