# Dataset and DataFrame Operations

Core data structures and operations for structured data manipulation. Dataset provides type-safe operations while DataFrame offers untyped flexibility. Both support functional and SQL-style operations with lazy evaluation and Catalyst optimization.

## Capabilities

### Dataset[T]

Strongly-typed collection of domain objects that can be transformed using functional and relational operations.
```scala { .api }
/**
 * Strongly-typed collection of domain objects
 * @tparam T The type of the objects in the dataset
 */
class Dataset[T] {
  /** Dataset schema */
  def schema: StructType

  /** Column names */
  def columns: Array[String]

  /** Print schema to console */
  def printSchema(): Unit

  /** Convert to DataFrame (untyped) */
  def toDF(): DataFrame
  def toDF(colNames: String*): DataFrame

  /** Convert to different type */
  def as[U: Encoder]: Dataset[U]

  /** Get write interface */
  def write: DataFrameWriter[T]

  /** Get write interface for streaming */
  def writeStream: DataStreamWriter[T]
}

// DataFrame is a type alias for Dataset[Row]
type DataFrame = Dataset[Row]
```
### Dataset Transformations

Lazy transformations that return new Datasets without triggering computation.
```scala { .api }
class Dataset[T] {
  /** Select columns by name or expression */
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame
  def selectExpr(exprs: String*): DataFrame

  /** Filter rows based on condition */
  def filter(condition: Column): Dataset[T]
  def filter(conditionExpr: String): Dataset[T]
  def where(condition: Column): Dataset[T]

  /** Add or replace column */
  def withColumn(colName: String, col: Column): DataFrame

  /** Rename column */
  def withColumnRenamed(existingName: String, newName: String): DataFrame

  /** Drop columns */
  def drop(colName: String): DataFrame
  def drop(colNames: String*): DataFrame
  def drop(col: Column): DataFrame

  /** Remove duplicate rows */
  def distinct(): Dataset[T]
  def dropDuplicates(): Dataset[T]
  def dropDuplicates(colNames: String*): Dataset[T]

  /** Limit number of rows */
  def limit(n: Int): Dataset[T]

  /** Sample fraction of rows */
  def sample(fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]

  /** Sort rows */
  def sort(sortCol: String, sortCols: String*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]
  def orderBy(sortCol: String, sortCols: String*): Dataset[T]
  def orderBy(sortExprs: Column*): Dataset[T]

  /** Repartition */
  def repartition(numPartitions: Int): Dataset[T]
  def repartition(partitionExprs: Column*): Dataset[T]
  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
  def coalesce(numPartitions: Int): Dataset[T]
}
```
### Dataset Actions

Eager operations that trigger computation and return results.
```scala { .api }
class Dataset[T] {
  /** Display Dataset contents */
  def show(): Unit
  def show(numRows: Int): Unit
  def show(numRows: Int, truncate: Boolean): Unit
  def show(numRows: Int, truncate: Int): Unit
  def show(numRows: Int, truncate: Int, vertical: Boolean): Unit

  /** Collect all rows to driver */
  def collect(): Array[T]
  def collectAsList(): java.util.List[T]

  /** Take first N rows */
  def take(n: Int): Array[T]
  def takeAsList(n: Int): java.util.List[T]

  /** Get first row */
  def first(): T
  def head(): T
  def head(n: Int): Array[T]

  /** Count rows */
  def count(): Long

  /** Check if Dataset is empty */
  def isEmpty: Boolean

  /** Apply function to each row */
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit

  /** Reduce rows to single value */
  def reduce(func: (T, T) => T): T

  /** Cache Dataset in memory */
  def cache(): Dataset[T]
  def persist(): Dataset[T]
  def persist(newLevel: StorageLevel): Dataset[T]
  def unpersist(): Dataset[T]
  def unpersist(blocking: Boolean): Dataset[T]
}
```
**Usage Examples:**

```scala
import org.apache.spark.sql.functions._

// Basic transformations
val people = spark.read.json("people.json")

val adults = people
  .filter(col("age") > 18)
  .select("name", "age")
  .orderBy(col("age").desc)

adults.show()

// Column operations
val enriched = people
  .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
  .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
  .drop("first_name", "last_name")

// Actions
val totalCount = people.count()
val firstPerson = people.first()
val allPeople = people.collect()
```
### Joins and Set Operations

Operations for combining multiple Datasets.
```scala { .api }
class Dataset[T] {
  /** Join with another Dataset */
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], usingColumn: String): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

  /** Cross join (Cartesian product) */
  def crossJoin(right: Dataset[_]): DataFrame

  /** Union operations */
  def union(other: Dataset[T]): Dataset[T]
  def unionAll(other: Dataset[T]): Dataset[T]
  def unionByName(other: Dataset[T]): Dataset[T]

  /** Set operations */
  def intersect(other: Dataset[T]): Dataset[T]
  def intersectAll(other: Dataset[T]): Dataset[T]
  def except(other: Dataset[T]): Dataset[T]
  def exceptAll(other: Dataset[T]): Dataset[T]
}
```
**Usage Examples:**

```scala
val employees = spark.table("employees")
val departments = spark.table("departments")

// Inner join
val employeesWithDept = employees.join(departments,
  employees("dept_id") === departments("id"))

// Left outer join (keeps employees without a matching department)
val employeesWithOptionalDept = employees.join(departments,
  employees("dept_id") === departments("id"), "left_outer")

// Join on multiple columns
val result = employees.join(departments,
  Seq("dept_id", "location"), "inner")

// Union datasets
val currentEmployees = spark.table("current_employees")
val formerEmployees = spark.table("former_employees")
val allEmployees = currentEmployees.union(formerEmployees)
```
### Aggregations and Grouping

Group data and perform aggregate operations.
```scala { .api }
class Dataset[T] {
  /** Group by columns */
  def groupBy(cols: Column*): RelationalGroupedDataset
  def groupBy(col1: String, cols: String*): RelationalGroupedDataset

  /** Aggregate without grouping */
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame
}

/**
 * Dataset that has been logically grouped by user specified grouping key
 */
class RelationalGroupedDataset {
  /** Count rows in each group */
  def count(): DataFrame

  /** Sum columns for each group */
  def sum(colNames: String*): DataFrame

  /** Average columns for each group */
  def avg(colNames: String*): DataFrame
  def mean(colNames: String*): DataFrame

  /** Min/Max columns for each group */
  def min(colNames: String*): DataFrame
  def max(colNames: String*): DataFrame

  /** Aggregate with expressions */
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame

  /** Pivot on column values */
  def pivot(pivotColumn: String): RelationalGroupedDataset
  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset
}
```
**Usage Examples:**

```scala
import org.apache.spark.sql.functions._

val sales = spark.table("sales")

// Simple aggregation
val totalSales = sales.agg(sum("amount").alias("total"))

// Group by with aggregation
val salesByRegion = sales
  .groupBy("region")
  .agg(
    sum("amount").alias("total_sales"),
    avg("amount").alias("avg_sale"),
    count("*").alias("num_transactions")
  )

// Multiple group by columns
val salesByRegionAndMonth = sales
  .groupBy("region", "month")
  .sum("amount")

// Pivot table
val salesPivot = sales
  .groupBy("region")
  .pivot("month")
  .sum("amount")
```
### Typed Dataset Operations

Type-safe functional operations on Datasets.
```scala { .api }
class Dataset[T] {
  /** Transform each element */
  def map[U: Encoder](func: T => U): Dataset[U]

  /** Transform each element to zero or more elements */
  def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]

  /** Transform each partition */
  def mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]

  /** Group by key and apply function to groups */
  def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
}

/**
 * Dataset that has been logically grouped by a user specified grouping key
 */
class KeyValueGroupedDataset[K, V] {
  /** Apply function to each group */
  def mapGroups[U: Encoder](f: (K, Iterator[V]) => U): Dataset[U]

  /** Transform values in each group */
  def mapValues[U: Encoder](func: V => U): KeyValueGroupedDataset[K, U]

  /** Aggregate each group */
  def agg[U: Encoder](column: TypedColumn[V, U]): Dataset[(K, U)]

  /** Reduce values in each group */
  def reduceGroups(f: (V, V) => V): Dataset[(K, V)]

  /** Count elements in each group */
  def count(): Dataset[(K, Long)]
}
```
**Usage Examples:**

```scala
case class Person(name: String, age: Int, city: String)

val people = spark.read.json("people.json").as[Person]

// Type-safe transformations
val adults = people.filter(_.age >= 18)
val names = people.map(_.name)
val cityAges = people.map(p => (p.city, p.age))

// Group by key
val peopleByCity = people.groupByKey(_.city)
val avgAgeByCity = peopleByCity.agg(avg(col("age")).as[Double])

// Reduce groups
val oldestByCity = peopleByCity.reduceGroups((p1, p2) =>
  if (p1.age > p2.age) p1 else p2
)
```