or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog.md columns-functions.md data-io.md dataset-dataframe.md index.md session-management.md streaming.md types-encoders.md udfs.md

docs/dataset-dataframe.md

0

# Dataset and DataFrame Operations

1

2

Core data structures and operations for structured data manipulation. Dataset provides type-safe operations while DataFrame offers untyped flexibility. Both support functional and SQL-style operations with lazy evaluation and Catalyst optimization.

3

4

## Capabilities

5

6

### Dataset[T]

7

8

Strongly-typed collection of domain objects that can be transformed using functional and relational operations.

9

10

```scala { .api }

11

/**

12

* Strongly-typed collection of domain objects

13

* @tparam T The type of the objects in the dataset

14

*/

15

class Dataset[T] {

16

/** Dataset schema */

17

def schema: StructType

18

19

/** Column names */

20

def columns: Array[String]

21

22

/** Print schema to console */

23

def printSchema(): Unit

24

25

/** Convert to DataFrame (untyped) */

26

def toDF(): DataFrame

27

def toDF(colNames: String*): DataFrame

28

29

/** Convert to different type */

30

def as[U: Encoder]: Dataset[U]

31

32

/** Get write interface */

33

def write: DataFrameWriter[T]

34

35

/** Get write interface for streaming */

36

def writeStream: DataStreamWriter[T]

37

}

38

39

// DataFrame is a type alias for Dataset[Row]

40

type DataFrame = Dataset[Row]

41

```

42

43

### Dataset Transformations

44

45

Lazy transformations that return new Datasets without triggering computation.

46

47

```scala { .api }

48

class Dataset[T] {

49

/** Select columns by name or expression */

50

def select(cols: Column*): DataFrame

51

def select(col: String, cols: String*): DataFrame

52

def selectExpr(exprs: String*): DataFrame

53

54

/** Filter rows based on condition */

55

def filter(condition: Column): Dataset[T]

56

def filter(conditionExpr: String): Dataset[T]

57

def where(condition: Column): Dataset[T]

58

59

/** Add or replace column */

60

def withColumn(colName: String, col: Column): DataFrame

61

62

/** Rename column */

63

def withColumnRenamed(existingName: String, newName: String): DataFrame

64

65

/** Drop columns */

66

def drop(colName: String): DataFrame

67

def drop(colNames: String*): DataFrame

68

def drop(col: Column): DataFrame

69

70

/** Remove duplicate rows */

71

def distinct(): Dataset[T]

72

def dropDuplicates(): Dataset[T]

73

def dropDuplicates(colNames: String*): Dataset[T]

74

75

/** Limit number of rows */

76

def limit(n: Int): Dataset[T]

77

78

/** Sample fraction of rows */

79

def sample(fraction: Double): Dataset[T]

80

def sample(withReplacement: Boolean, fraction: Double): Dataset[T]

81

def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]

82

83

/** Sort rows */

84

def sort(sortCol: String, sortCols: String*): Dataset[T]

85

def sort(sortExprs: Column*): Dataset[T]

86

def orderBy(sortCol: String, sortCols: String*): Dataset[T]

87

def orderBy(sortExprs: Column*): Dataset[T]

88

89

/** Repartition */

90

def repartition(numPartitions: Int): Dataset[T]

91

def repartition(partitionExprs: Column*): Dataset[T]

92

def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]

93

def coalesce(numPartitions: Int): Dataset[T]

94

}

95

```

96

97

### Dataset Actions

98

99

Eager operations that trigger computation and return results.

100

101

```scala { .api }

102

class Dataset[T] {

103

/** Display DataFrame contents */

104

def show(): Unit

105

def show(numRows: Int): Unit

106

def show(numRows: Int, truncate: Boolean): Unit

107

def show(numRows: Int, truncate: Int): Unit

108

def show(numRows: Int, truncate: Int, vertical: Boolean): Unit

109

110

/** Collect all rows to driver */

111

def collect(): Array[T]

112

def collectAsList(): java.util.List[T]

113

114

/** Take first N rows */

115

def take(n: Int): Array[T]

116

def takeAsList(n: Int): java.util.List[T]

117

118

/** Get first row */

119

def first(): T

120

def head(): T

121

def head(n: Int): Array[T]

122

123

/** Count rows */

124

def count(): Long

125

126

/** Check if Dataset is empty */

127

def isEmpty: Boolean

128

129

/** Apply function to each row */

130

def foreach(f: T => Unit): Unit

131

def foreachPartition(f: Iterator[T] => Unit): Unit

132

133

/** Reduce rows to single value */

134

def reduce(func: (T, T) => T): T

135

136

/** Cache Dataset in memory */

137

def cache(): Dataset[T]

138

def persist(): Dataset[T]

139

def persist(newLevel: StorageLevel): Dataset[T]

140

def unpersist(): Dataset[T]

141

def unpersist(blocking: Boolean): Dataset[T]

142

}

143

```

144

145

**Usage Examples:**

146

147

```scala

148

import org.apache.spark.sql.functions._

149

150

// Basic transformations

151

val people = spark.read.json("people.json")

152

153

val adults = people

154

.filter(col("age") > 18)

155

.select("name", "age")

156

.orderBy(col("age").desc)

157

158

adults.show()

159

160

// Column operations

161

val enriched = people

162

.withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))

163

.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))

164

.drop("first_name", "last_name")

165

166

// Actions

167

val totalCount = people.count()

168

val firstPerson = people.first()

169

val allPeople = people.collect()

170

```

171

172

### Joins and Set Operations

173

174

Operations for combining multiple Datasets.

175

176

```scala { .api }

177

class Dataset[T] {

178

/** Join with another Dataset */

179

def join(right: Dataset[_]): DataFrame

180

def join(right: Dataset[_], usingColumn: String): DataFrame

181

def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame

182

def join(right: Dataset[_], joinExprs: Column): DataFrame

183

def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

184

185

/** Cross join (Cartesian product) */

186

def crossJoin(right: Dataset[_]): DataFrame

187

188

/** Union operations */

189

def union(other: Dataset[T]): Dataset[T]

190

def unionAll(other: Dataset[T]): Dataset[T]

191

def unionByName(other: Dataset[T]): Dataset[T]

192

193

/** Set operations */

194

def intersect(other: Dataset[T]): Dataset[T]

195

def intersectAll(other: Dataset[T]): Dataset[T]

196

def except(other: Dataset[T]): Dataset[T]

197

def exceptAll(other: Dataset[T]): Dataset[T]

198

}

199

```

200

201

**Usage Examples:**

202

203

```scala

204

val employees = spark.table("employees")

205

val departments = spark.table("departments")

206

207

// Inner join

208

val employeesWithDept = employees.join(departments,

209

employees("dept_id") === departments("id"))

210

211

// Left outer join

212

val allEmployees = employees.join(departments,

213

employees("dept_id") === departments("id"), "left_outer")

214

215

// Join on multiple columns

216

val result = employees.join(departments,

217

Seq("dept_id", "location"), "inner")

218

219

// Union datasets

220

val currentEmployees = spark.table("current_employees")

221

val formerEmployees = spark.table("former_employees")

222

val combinedEmployees = currentEmployees.union(formerEmployees)

223

```

224

225

### Aggregations and Grouping

226

227

Group data and perform aggregate operations.

228

229

```scala { .api }

230

class Dataset[T] {

231

/** Group by columns */

232

def groupBy(cols: Column*): RelationalGroupedDataset

233

def groupBy(col1: String, cols: String*): RelationalGroupedDataset

234

235

/** Aggregate without grouping */

236

def agg(expr: Column, exprs: Column*): DataFrame

237

def agg(exprs: Map[String, String]): DataFrame

238

}

239

240

/**

241

* Dataset that has been logically grouped by user specified grouping key

242

*/

243

class RelationalGroupedDataset {

244

/** Count rows in each group */

245

def count(): DataFrame

246

247

/** Sum columns for each group */

248

def sum(colNames: String*): DataFrame

249

250

/** Average columns for each group */

251

def avg(colNames: String*): DataFrame

252

def mean(colNames: String*): DataFrame

253

254

/** Min/Max columns for each group */

255

def min(colNames: String*): DataFrame

256

def max(colNames: String*): DataFrame

257

258

/** Aggregate with expressions */

259

def agg(expr: Column, exprs: Column*): DataFrame

260

def agg(exprs: Map[String, String]): DataFrame

261

262

/** Pivot on column values */

263

def pivot(pivotColumn: String): RelationalGroupedDataset

264

def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset

265

}

266

```

267

268

**Usage Examples:**

269

270

```scala

271

import org.apache.spark.sql.functions._

272

273

val sales = spark.table("sales")

274

275

// Simple aggregation

276

val totalSales = sales.agg(sum("amount").alias("total"))

277

278

// Group by with aggregation

279

val salesByRegion = sales

280

.groupBy("region")

281

.agg(

282

sum("amount").alias("total_sales"),

283

avg("amount").alias("avg_sale"),

284

count("*").alias("num_transactions")

285

)

286

287

// Multiple group by columns

288

val salesByRegionAndMonth = sales

289

.groupBy("region", "month")

290

.sum("amount")

291

292

// Pivot table

293

val salesPivot = sales

294

.groupBy("region")

295

.pivot("month")

296

.sum("amount")

297

```

298

299

### Typed Dataset Operations

300

301

Type-safe functional operations on Datasets.

302

303

```scala { .api }

304

class Dataset[T] {

305

/** Transform each element */

306

def map[U: Encoder](func: T => U): Dataset[U]

307

308

/** Transform each element to zero or more elements */

309

def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]

310

311

/** Transform each partition */

312

def mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]

313

314

/** Group by key and apply function to groups */

315

def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

316

}

317

318

/**

319

* Dataset that has been logically grouped by a user specified grouping key

320

*/

321

class KeyValueGroupedDataset[K, V] {

322

/** Apply function to each group */

323

def mapGroups[U: Encoder](f: (K, Iterator[V]) => U): Dataset[U]

324

325

/** Transform values in each group */

326

def mapValues[U: Encoder](func: V => U): KeyValueGroupedDataset[K, U]

327

328

/** Aggregate each group */

329

def agg[U: Encoder](column: TypedColumn[V, U]): Dataset[(K, U)]

330

331

/** Reduce values in each group */

332

def reduceGroups(f: (V, V) => V): Dataset[(K, V)]

333

334

/** Count elements in each group */

335

def count(): Dataset[(K, Long)]

336

}

337

```

338

339

**Usage Examples:**

340

341

```scala

342

case class Person(name: String, age: Int, city: String)

343

344

val people = spark.read.json("people.json").as[Person]

345

346

// Type-safe transformations

347

val adults = people.filter(_.age >= 18)

348

val names = people.map(_.name)

349

val cityAges = people.map(p => (p.city, p.age))

350

351

// Group by key

352

val peopleByCity = people.groupByKey(_.city)

353

val avgAgeByCity = peopleByCity.agg(avg(col("age")).as[Double])

354

355

// Reduce groups

356

val oldestByCity = peopleByCity.reduceGroups((p1, p2) =>

357

if (p1.age > p2.age) p1 else p2

358

)

359

```