
# Data Input and Output

Spark SQL provides comprehensive I/O capabilities through the `DataFrameReader` and `DataFrameWriter` interfaces. These support reading from and writing to a variety of data sources, including files (JSON, Parquet, CSV, ORC, text), databases (JDBC), and streaming sources.

## DataFrameReader

```scala { .api }
class DataFrameReader {
  def format(source: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame
}
```

## File Format Readers

### JSON Files

```scala { .api }
class DataFrameReader {
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def json(jsonRDD: RDD[String]): DataFrame
  def json(jsonDataset: Dataset[String]): DataFrame
}
```

**Usage Examples:**

```scala
// Basic JSON reading
val df = spark.read.json("path/to/file.json")
val multipleFiles = spark.read.json("file1.json", "file2.json")

// With options
val jsonDF = spark.read
  .option("multiline", "true")
  .option("allowComments", "true")
  .option("allowUnquotedFieldNames", "true")
  .json("complex.json")

// With an explicit schema for better performance
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("city", StringType, true)
))

val typedJson = spark.read
  .schema(schema)
  .json("people.json")

// From an RDD or Dataset of JSON strings
val jsonStrings = spark.sparkContext.parallelize(Seq(
  """{"name": "Alice", "age": 25}""",
  """{"name": "Bob", "age": 30}"""
))
val fromRDD = spark.read.json(jsonStrings)
```

**Common JSON Options:**

- `multiline`: Parse multi-line JSON objects (default: `false`)
- `allowComments`: Allow JavaScript-style comments (default: `false`)
- `allowUnquotedFieldNames`: Allow unquoted field names (default: `false`)
- `allowSingleQuotes`: Allow single quotes (default: `true`)
- `primitivesAsString`: Infer all primitive values as strings (default: `false`)
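As an illustrative sketch of how these options interact (the file name and sample records below are hypothetical), `allowSingleQuotes` and `primitivesAsString` change how the same input is parsed:

```scala
// Hypothetical events.json, one record per line:
//   {'id': 1, 'score': 9.5}
//   {'id': 2, 'score': 7.0}

// allowSingleQuotes (on by default) accepts the single-quoted field names;
// primitivesAsString makes Spark infer id and score as strings
// instead of long/double
val asStrings = spark.read
  .option("allowSingleQuotes", "true")
  .option("primitivesAsString", "true")
  .json("events.json")

asStrings.printSchema()
```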

### Parquet Files

```scala { .api }
class DataFrameReader {
  def parquet(paths: String*): DataFrame
}
```

**Usage Examples:**

```scala
// Read Parquet files
val parquetDF = spark.read.parquet("data.parquet")
val multipleParquet = spark.read.parquet("part1.parquet", "part2.parquet")

// With options
val parquetWithOptions = spark.read
  .option("mergeSchema", "true")
  .parquet("partitioned_data/")

// Read partitioned Parquet data
val partitioned = spark.read.parquet("data/year=2023/month=*/day=*")
```

**Common Parquet Options:**

- `mergeSchema`: Merge schemas from multiple files (default: `false`)
- `recursiveFileLookup`: Recursively search subdirectories (default: `false`)

### CSV Files

```scala { .api }
class DataFrameReader {
  def csv(paths: String*): DataFrame
}
```

**Usage Examples:**

```scala
// Basic CSV reading
val csvDF = spark.read.csv("data.csv")

// With options and header
val csvWithHeader = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("sep", ",")
  .csv("people.csv")

// With an explicit schema
val csvSchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("salary", DoubleType, true)
))

val typedCSV = spark.read
  .schema(csvSchema)
  .option("header", "true")
  .csv("employees.csv")
```

**Common CSV Options:**

- `header`: Whether the first line is a header (default: `false`)
- `inferSchema`: Automatically infer column types (default: `false`)
- `sep`: Field separator character (default: `,`)
- `quote`: Quote character (default: `"`)
- `escape`: Escape character (default: `\`)
- `nullValue`: String representing null values (default: empty string)
- `dateFormat`: Date format string (default: `yyyy-MM-dd`)
- `timestampFormat`: Timestamp format string (default: `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`)
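To show how several of these options fit together (the file name, delimiter, and column values below are hypothetical), a reader that treats `NA` as null and parses a custom date format might look like this. Note that `dateFormat` only applies to columns declared as `DateType`, so an explicit schema is used:

```scala
import org.apache.spark.sql.types._

// Hypothetical orders.csv:
//   order_id;order_date;amount
//   1;01/15/2023;99.90
//   2;NA;NA

val orderSchema = StructType(Array(
  StructField("order_id", IntegerType, true),
  StructField("order_date", DateType, true),
  StructField("amount", DoubleType, true)
))

val orders = spark.read
  .schema(orderSchema)
  .option("header", "true")
  .option("sep", ";")                 // semicolon-delimited file
  .option("nullValue", "NA")          // "NA" cells are read as null
  .option("dateFormat", "MM/dd/yyyy") // parse order_date with this pattern
  .csv("orders.csv")
```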

### ORC Files

```scala { .api }
class DataFrameReader {
  def orc(paths: String*): DataFrame
}
```

**Usage Examples:**

```scala
// Read ORC files
val orcDF = spark.read.orc("data.orc")

// With options
val orcWithOptions = spark.read
  .option("mergeSchema", "true")
  .orc("orc_data/")
```

### Text Files

```scala { .api }
class DataFrameReader {
  def text(paths: String*): DataFrame
  def textFile(paths: String*): Dataset[String]
}
```

**Usage Examples:**

```scala
// Read as a DataFrame with a single "value" column
val textDF = spark.read.text("log.txt")

// Read as Dataset[String]
val textDS = spark.read.textFile("documents/*.txt")

// With a custom line separator
val customSep = spark.read
  .option("lineSep", "\r\n")
  .text("data.txt")
```

## Database Connectivity (JDBC)

```scala { .api }
class DataFrameReader {
  def jdbc(url: String, table: String, properties: Properties): DataFrame
  def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
  def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame
}
```

For date or timestamp partition columns, set `partitionColumn`, `lowerBound`, and `upperBound` as reader options instead of using the `Long`-bounded overload.

**Usage Examples:**

```scala
import java.util.Properties

// Basic JDBC reading
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver")

val jdbcDF = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", connectionProperties)

// With partitioning for performance
val partitionedJDBC = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "large_table",
    columnName = "id",
    lowerBound = 1L,
    upperBound = 1000000L,
    numPartitions = 10,
    connectionProperties = connectionProperties
  )

// With a custom query (a subquery aliased as a table)
val customQuery = """
  (SELECT u.id, u.name, p.title
   FROM users u JOIN profiles p ON u.id = p.user_id
   WHERE u.active = true) as query
"""

val queryResult = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", customQuery, connectionProperties)

// With predicates for parallel reading (one partition per predicate)
val predicates = Array(
  "age < 25",
  "age >= 25 AND age < 50",
  "age >= 50"
)

val parallelJDBC = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", predicates, connectionProperties)
```

## Generic Data Sources

```scala { .api }
class DataFrameReader {
  def format(source: String): DataFrameReader
  def load(): DataFrame
  def load(path: String): DataFrame
}
```

**Usage Examples:**

```scala
// Delta Lake (third-party format)
val deltaDF = spark.read
  .format("delta")
  .load("path/to/delta/table")

// Avro files (requires the external spark-avro module)
val avroDF = spark.read
  .format("avro")
  .load("data.avro")

// Custom data source
val customDF = spark.read
  .format("com.example.CustomDataSource")
  .option("customOption", "value")
  .load("path/to/data")
```

## DataFrameWriter

```scala { .api }
class DataFrameWriter[T] {
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]
  def partitionBy(colNames: String*): DataFrameWriter[T]
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
  def save(): Unit
  def save(path: String): Unit
}

// org.apache.spark.sql.SaveMode is a Java enum with the values:
// Append, Overwrite, ErrorIfExists (the default), Ignore
```
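A sketch of how the four save modes differ (output paths are hypothetical); the mode only matters when the write target already exists:

```scala
import org.apache.spark.sql.SaveMode

// Overwrite: replace any existing data at the target
df.write.mode(SaveMode.Overwrite).parquet("out/overwrite/")

// Append: add new files alongside existing data
df.write.mode(SaveMode.Append).parquet("out/append/")

// ErrorIfExists (the default): fail if the target already exists
df.write.mode(SaveMode.ErrorIfExists).parquet("out/fresh/")

// Ignore: silently skip the write if the target already exists
df.write.mode(SaveMode.Ignore).parquet("out/maybe/")

// String equivalents are accepted as well
df.write.mode("overwrite").parquet("out/overwrite/")
```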

## Writing to Files

### Parquet Files

```scala { .api }
class DataFrameWriter[T] {
  def parquet(path: String): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SaveMode

// Basic Parquet writing
df.write.parquet("output.parquet")

// With save mode
df.write
  .mode(SaveMode.Overwrite)
  .parquet("data/output.parquet")

// Partitioned writing
df.write
  .partitionBy("year", "month")
  .parquet("partitioned_data/")

// With options
df.write
  .option("compression", "snappy")
  .mode("overwrite")
  .parquet("compressed_output.parquet")
```

### JSON Files

```scala { .api }
class DataFrameWriter[T] {
  def json(path: String): Unit
}
```

**Usage Examples:**

```scala
// Write JSON
df.write.json("output.json")

// With formatting options
df.write
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .option("dateFormat", "yyyy-MM-dd")
  .mode("overwrite")
  .json("formatted_output.json")
```

### CSV Files

```scala { .api }
class DataFrameWriter[T] {
  def csv(path: String): Unit
}
```

**Usage Examples:**

```scala
// Write CSV
df.write.csv("output.csv")

// With options
df.write
  .option("header", "true")
  .option("sep", "|")
  .mode("overwrite")
  .csv("data_with_header.csv")
```

### ORC Files

```scala { .api }
class DataFrameWriter[T] {
  def orc(path: String): Unit
}
```

**Usage Examples:**

```scala
// Write ORC
df.write.orc("output.orc")

// With compression
df.write
  .option("compression", "zlib")
  .mode("overwrite")
  .orc("compressed.orc")
```

### Text Files

```scala { .api }
class DataFrameWriter[T] {
  def text(path: String): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.functions.{col, concat_ws}

// Write text (requires a single string column)
df.select("message").write.text("logs.txt")

// Combine columns first
df.select(concat_ws(",", col("name"), col("age")).alias("line"))
  .write
  .text("combined_output.txt")
```

## Table Operations

```scala { .api }
class DataFrameWriter[T] {
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
}
```

**Usage Examples:**

```scala
// Save as a managed table
df.write
  .mode("overwrite")
  .saveAsTable("my_database.my_table")

// Insert into an existing table (matches columns by position, not name)
df.write
  .mode("append")
  .insertInto("existing_table")

// Partitioned table
df.write
  .partitionBy("year", "month")
  .mode("overwrite")
  .saveAsTable("partitioned_table")
```

453

454

## JDBC Writing

455

456

```scala { .api }

457

class DataFrameWriter[T] {

458

def jdbc(url: String, table: String, connectionProperties: Properties): Unit

459

}

460

```

**Usage Examples:**

```scala
import java.util.Properties

// Write to a database
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver")

df.write
  .mode("overwrite")
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", connectionProperties)

// With additional options
df.write
  .option("batchsize", "10000")
  .option("truncate", "true") // truncate instead of drop/recreate on overwrite
  .mode("overwrite")
  .jdbc("jdbc:postgresql://localhost:5432/mydb", "large_table", connectionProperties)
```

482

483

## Advanced I/O Patterns

484

485

### Bucketing

486

487

```scala

488

// Write bucketed data for optimized joins

489

df.write

490

.bucketBy(42, "user_id")

491

.sortBy("timestamp")

492

.mode("overwrite")

493

.saveAsTable("bucketed_events")

494

```

### Multi-format Writing

```scala
// Write the same data to multiple formats
val writer = df.write.mode("overwrite")

writer.parquet("data.parquet")
writer.json("data.json")
writer.option("header", "true").csv("data.csv")
```

### Conditional Writing

```scala
// Write different partitions to different locations
df.filter(col("region") === "US")
  .write
  .mode("overwrite")
  .parquet("us_data/")

df.filter(col("region") === "EU")
  .write
  .mode("overwrite")
  .parquet("eu_data/")
```

### Schema Evolution

```scala
// Handle schema changes in Parquet: append files whose schema may differ,
// then merge schemas at read time (mergeSchema is a read option, not a write option)
df.write
  .mode("append")
  .parquet("evolving_schema/")

val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("evolving_schema/")
```

531

532

## Data Source Options Reference

533

534

### Common Options (All Formats)

535

- `path`: File system path

536

- `recursiveFileLookup`: Recursively search subdirectories (default: false)

537

- `pathGlobFilter`: Glob pattern for file filtering

538

- `modifiedBefore`: Only files modified before timestamp

539

- `modifiedAfter`: Only files modified after timestamp

540

541
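A sketch combining these options (the directory name and cutoff timestamp are hypothetical):

```scala
// Read only Parquet part-files (skipping markers such as _SUCCESS),
// searching subdirectories, and only files modified after the cutoff
val recent = spark.read
  .option("recursiveFileLookup", "true")
  .option("pathGlobFilter", "*.parquet")
  .option("modifiedAfter", "2023-06-01T00:00:00")
  .parquet("landing_zone/")
```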

### Compression Options

- Parquet: `none`, `snappy`, `gzip`, `lzo`, `brotli`, `lz4`, `zstd`
- JSON: `none`, `bzip2`, `gzip`, `lz4`, `snappy`, `deflate`
- ORC: `none`, `zlib`, `snappy`, `lzo`, `lz4`, `zstd`

### Performance Tuning

- Use appropriate file sizes (roughly 128 MB to 1 GB per Parquet file)
- Partition by frequently filtered columns
- Use columnar formats (Parquet, ORC) for analytical workloads
- Enable predicate pushdown with appropriate schemas
- Consider bucketing for frequently joined tables