or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregations.md, catalog.md, data-io.md, dataframe-dataset.md, functions-expressions.md, index.md, session-management.md, streaming.md

dataframe-dataset.mddocs/

0

# DataFrame and Dataset Operations

1

2

DataFrames and Datasets are the core distributed data structures in Spark SQL. DataFrame is a type alias for Dataset[Row] and provides an untyped, row-based interface, while Dataset[T] offers compile-time type safety. Both share the same underlying API and are optimized by the Catalyst query optimizer.

3

4

## Core Types

5

6

```scala { .api }

7

type DataFrame = Dataset[Row]

8

9

class Dataset[T] extends Serializable {

10

def schema: StructType

11

def dtypes: Array[(String, String)]

12

def columns: Array[String]

13

def count(): Long

14

def isEmpty: Boolean

15

def isLocal: Boolean

16

def isStreaming: Boolean

17

}

18

19

class Row extends Serializable {

20

def length: Int

21

def size: Int

22

def get(i: Int): Any

23

def getAs[T](i: Int): T

24

def getAs[T](fieldName: String): T

25

def getString(i: Int): String

26

def getInt(i: Int): Int

27

def getLong(i: Int): Long

28

def getDouble(i: Int): Double

29

def getFloat(i: Int): Float

30

def getBoolean(i: Int): Boolean

31

def getDate(i: Int): java.sql.Date

32

def getTimestamp(i: Int): java.sql.Timestamp

33

}

34

```

35

36

## Schema Operations

37

38

```scala { .api }

39

class Dataset[T] {

40

def schema: StructType

41

def dtypes: Array[(String, String)]

42

def columns: Array[String]

43

def printSchema(): Unit

44

def printSchema(level: Int): Unit

45

}

46

```

47

48

**Usage Examples:**

49

50

```scala

51

import org.apache.spark.sql.types._

52

53

// Examine schema

54

val df = spark.read.json("people.json")

55

df.printSchema()

56

df.schema.foreach(field => println(s"${field.name}: ${field.dataType}"))

57

58

// Get column information

59

val columnNames = df.columns

60

val columnTypes = df.dtypes

61

62

// Check schema programmatically

63

val hasNameColumn = df.schema.exists(_.name == "name")

64

val nameField = df.schema.find(_.name == "name")

65

```

66

67

## Column Selection and Projection

68

69

```scala { .api }

70

class Dataset[T] {

71

def select(cols: Column*): DataFrame

72

def select(col: String, cols: String*): DataFrame

73

def selectExpr(exprs: String*): DataFrame

74

def drop(colNames: String*): DataFrame

75

def drop(col: Column): DataFrame

76

def withColumn(colName: String, col: Column): DataFrame

77

def withColumnRenamed(existingName: String, newName: String): DataFrame

78

}

79

```

80

81

**Usage Examples:**

82

83

```scala

84

import org.apache.spark.sql.functions._

85

86

// Select columns

87

val selected = df.select("name", "age")

88

val selectedWithCol = df.select(col("name"), col("age") + 1)

89

90

// Select with expressions

91

val computed = df.selectExpr("name", "age + 1 as next_age", "upper(name) as upper_name")

92

93

// Add columns

94

val withFullName = df.withColumn("full_name", concat(col("first"), lit(" "), col("last")))

95

96

// Rename columns

97

val renamed = df.withColumnRenamed("old_name", "new_name")

98

99

// Drop columns

100

val dropped = df.drop("unwanted_column", "another_column")

101

val droppedByCol = df.drop(col("age"))

102

```

103

104

## Filtering and Conditions

105

106

```scala { .api }

107

class Dataset[T] {

108

def filter(condition: Column): Dataset[T]

109

def filter(conditionExpr: String): Dataset[T]

110

def where(condition: Column): Dataset[T]

111

def where(conditionExpr: String): Dataset[T]

112

}

113

```

114

115

**Usage Examples:**

116

117

```scala

118

// Filter with Column expressions

119

val adults = df.filter(col("age") >= 18)

120

val activeUsers = df.filter(col("active") === true && col("last_login").isNotNull)

121

122

// Filter with SQL expressions

123

val filtered = df.filter("age >= 18 AND active = true")

124

val complex = df.where("age BETWEEN 25 AND 65 AND city IN ('New York', 'San Francisco')")

125

126

// Multiple conditions

127

val result = df

128

.filter(col("age") > 21)

129

.filter(col("country") === "US")

130

.filter(col("score").isNotNull)

131

```

132

133

## Sorting and Ordering

134

135

```scala { .api }

136

class Dataset[T] {

137

def sort(sortCol: String, sortCols: String*): Dataset[T]

138

def sort(sortExprs: Column*): Dataset[T]

139

def orderBy(sortCol: String, sortCols: String*): Dataset[T]

140

def orderBy(sortExprs: Column*): Dataset[T]

141

}

142

```

143

144

**Usage Examples:**

145

146

```scala

147

// Sort by column names

148

val sorted = df.sort("age", "name")

149

150

// Sort with Column expressions and directions

151

val ordered = df.orderBy(col("age").desc, col("name").asc)

152

153

// Complex sorting

154

val complexSort = df.orderBy(

155

col("department"),

156

col("salary").desc,

157

col("hire_date").asc

158

)

159

```

160

161

## Joins

162

163

```scala { .api }

164

class Dataset[T] {

165

def join(right: Dataset[_]): DataFrame

166

def join(right: Dataset[_], usingColumn: String): DataFrame

167

def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame

168

def join(right: Dataset[_], joinExprs: Column): DataFrame

169

def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

170

171

def crossJoin(right: Dataset[_]): DataFrame

172

}

173

```

174

175

**Join Types**: `"inner"`, `"cross"`, `"outer"`, `"full"`, `"full_outer"`, `"left"`, `"left_outer"`, `"right"`, `"right_outer"`, `"left_semi"`, `"left_anti"`

176

177

**Usage Examples:**

178

179

```scala

180

val users = spark.table("users")

181

val orders = spark.table("orders")

182

183

// Inner join (default)

184

val userOrders = users.join(orders, users("id") === orders("user_id"))

185

186

// Left outer join

187

val allUsers = users.join(orders, users("id") === orders("user_id"), "left_outer")

188

189

// Join on multiple columns

190

val joined = users.join(orders,

191

users("id") === orders("user_id") && users("region") === orders("region"))

192

193

// Join using column names (when columns have same names)

194

val simple = users.join(orders, "user_id")

195

val multiple = users.join(orders, Seq("user_id", "region"))

196

197

// Cross join

198

val cartesian = users.crossJoin(orders)

199

```

200

201

## Set Operations

202

203

```scala { .api }

204

class Dataset[T] {

205

def union(other: Dataset[T]): Dataset[T]

206

def unionAll(other: Dataset[T]): Dataset[T]

207

def unionByName(other: Dataset[T]): Dataset[T]

208

def intersect(other: Dataset[T]): Dataset[T]

209

def intersectAll(other: Dataset[T]): Dataset[T]

210

def except(other: Dataset[T]): Dataset[T]

211

def exceptAll(other: Dataset[T]): Dataset[T]

212

}

213

```

214

215

**Usage Examples:**

216

217

```scala

218

val df1 = spark.range(1, 5).toDF("id")

219

val df2 = spark.range(3, 8).toDF("id")

220

221

// Union (keeps duplicates, like SQL UNION ALL; chain .distinct() to remove them)

222

val combined = df1.union(df2)

223

224

// Union by column names (handles different column orders)

225

val byName = df1.unionByName(df2)

226

227

// Set operations

228

val intersection = df1.intersect(df2) // Values in both

229

val difference = df1.except(df2) // Values in df1 but not df2

230

```

231

232

## Deduplication

233

234

```scala { .api }

235

class Dataset[T] {

236

def distinct(): Dataset[T]

237

def dropDuplicates(): Dataset[T]

238

def dropDuplicates(colNames: Array[String]): Dataset[T]

239

def dropDuplicates(colNames: Seq[String]): Dataset[T]

240

def dropDuplicates(col1: String, cols: String*): Dataset[T]

241

}

242

```

243

244

**Usage Examples:**

245

246

```scala

247

// Remove all duplicate rows

248

val unique = df.distinct()

249

250

// Remove duplicates based on specific columns

251

val uniqueUsers = df.dropDuplicates("user_id")

252

val uniqueByMultiple = df.dropDuplicates("user_id", "email")

253

val uniqueBySeq = df.dropDuplicates(Seq("user_id", "email"))

254

```

255

256

## Sampling and Limiting

257

258

```scala { .api }

259

class Dataset[T] {

260

def limit(n: Int): Dataset[T]

261

def sample(fraction: Double): Dataset[T]

262

def sample(fraction: Double, seed: Long): Dataset[T]

263

def sample(withReplacement: Boolean, fraction: Double): Dataset[T]

264

def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]

265

// sampleBy is exposed through df.stat (DataFrameStatFunctions): def sampleBy[K](col: String, fractions: Map[K, Double], seed: Long): DataFrame

266

}

267

```

268

269

**Usage Examples:**

270

271

```scala

272

// Limit results

273

val top100 = df.limit(100)

274

275

// Random sampling

276

val sample10Percent = df.sample(0.1)

277

val sampleWithSeed = df.sample(false, 0.2, seed = 42)

278

279

// Stratified sampling by column values

280

val stratified = df.stat.sampleBy("category", Map("A" -> 0.1, "B" -> 0.2), seed = 123)

281

```

282

283

## Action Operations

284

285

```scala { .api }

286

class Dataset[T] {

287

def count(): Long

288

def collect(): Array[T]

289

def collectAsList(): java.util.List[T]

290

def first(): T

291

def head(): T

292

def head(n: Int): Array[T]

293

def take(n: Int): Array[T]

294

def takeAsList(n: Int): java.util.List[T]

295

def tail(n: Int): Array[T]

296

def show(): Unit

297

def show(numRows: Int): Unit

298

def show(numRows: Int, truncate: Boolean): Unit

299

def show(numRows: Int, truncate: Int): Unit

300

def show(numRows: Int, truncate: Int, vertical: Boolean): Unit

301

}

302

```

303

304

**Usage Examples:**

305

306

```scala

307

// Count rows

308

val totalRows = df.count()

309

310

// Collect data (use with caution on large datasets)

311

val allData = df.collect()

312

val firstRow = df.first()

313

val top10 = df.take(10)

314

315

// Display data

316

df.show() // Show 20 rows, truncate at 20 chars

317

df.show(50) // Show 50 rows

318

df.show(20, false) // Don't truncate strings

319

df.show(10, 100, true) // Vertical format

320

```

321

322

## Persistence and Caching

323

324

```scala { .api }

325

class Dataset[T] {

326

def cache(): Dataset[T]

327

def persist(): Dataset[T]

328

def persist(newLevel: StorageLevel): Dataset[T]

329

def unpersist(): Dataset[T]

330

def unpersist(blocking: Boolean): Dataset[T]

331

def storageLevel: StorageLevel

332

def isStreaming: Boolean

333

}

334

```

335

336

**Usage Examples:**

337

338

```scala

339

import org.apache.spark.storage.StorageLevel

340

341

// Cache with the default storage level (MEMORY_AND_DISK)

342

val cached = df.cache()

343

344

// Persist with specific storage level

345

val persisted = df.persist(StorageLevel.MEMORY_ONLY)

346

val diskOnly = df.persist(StorageLevel.DISK_ONLY)

347

val memoryAndDiskSer = df.persist(StorageLevel.MEMORY_AND_DISK_SER)

348

349

// Check storage level

350

val level = df.storageLevel

351

352

// Remove from cache

353

df.unpersist()

354

df.unpersist(blocking = true) // Wait for removal to complete

355

```

356

357

## Iteration and Functional Operations

358

359

```scala { .api }

360

class Dataset[T] {

361

def foreach(f: T => Unit): Unit

362

def foreachPartition(f: Iterator[T] => Unit): Unit

363

def map[U : Encoder](func: T => U): Dataset[U]

364

def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]

365

def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U]

366

def filter(func: T => Boolean): Dataset[T]

367

def reduce(func: (T, T) => T): T

368

}

369

```

370

371

**Usage Examples:**

372

373

```scala

374

// Type-safe operations (require an Encoder for the result type; import spark.implicits._ to bring the standard encoders into scope)

375

case class Person(name: String, age: Int)

376

val people: Dataset[Person] = spark.createDataFrame(peopleSeq).as[Person]

377

378

// Map transformation

379

val names = people.map(_.name)

380

val ages = people.map(_.age)

381

382

// Filter with function

383

val adults = people.filter(_.age >= 18)

384

385

// FlatMap

386

val words = people.flatMap(_.name.split(" "))

387

388

// Reduce

389

val totalAge = people.map(_.age).reduce(_ + _)

390

391

// Side effects

392

people.foreach(person => println(s"Person: ${person.name}"))

393

people.foreachPartition { iter =>

394

// Process partition

395

iter.foreach(println)

396

}

397

```

398

399

## Type Conversion

400

401

```scala { .api }

402

class Dataset[T] {

403

def as[U : Encoder]: Dataset[U]

404

def toDF(): DataFrame

405

def toDF(colNames: String*): DataFrame

406

def rdd: RDD[T]

407

def javaRDD: JavaRDD[T]

408

def toJavaRDD: JavaRDD[T]

409

}

410

```

411

412

**Usage Examples:**

413

414

```scala

415

// Convert to typed Dataset

416

case class User(id: Long, name: String, age: Int)

417

val typedUsers = df.as[User]

418

419

// Convert to DataFrame

420

val dataFrame = dataset.toDF()

421

val renamedDF = dataset.toDF("col1", "col2", "col3")

422

423

// Convert to RDD

424

val rdd = df.rdd

425

val userRDD = typedUsers.rdd

426

427

// Java interop

428

val javaRDD = df.toJavaRDD

429

```

430

431

## Partitioning Operations

432

433

```scala { .api }

434

class Dataset[T] {

435

def repartition(numPartitions: Int): Dataset[T]

436

def repartition(partitionExprs: Column*): Dataset[T]

437

def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]

438

def coalesce(numPartitions: Int): Dataset[T]

439

def repartitionByRange(partitionExprs: Column*): Dataset[T]

440

def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]

441

}

442

```

443

444

**Usage Examples:**

445

446

```scala

447

// Change number of partitions

448

val repartitioned = df.repartition(200)

449

val coalesced = df.coalesce(10) // Reduce partitions without shuffle

450

451

// Partition by column values

452

val byDept = df.repartition(col("department"))

453

val byMultiple = df.repartition(col("year"), col("month"))

454

455

// Range partitioning (for ordered data)

456

val rangePartitioned = df.repartitionByRange(col("timestamp"))

457

val rangeWithCount = df.repartitionByRange(100, col("id"))

458

```