# Data Input and Output

Spark SQL provides comprehensive I/O capabilities through the DataFrameReader and DataFrameWriter interfaces. These support reading from and writing to a wide range of data sources, including files (JSON, Parquet, CSV, ORC, text), databases via JDBC, and streaming sources.

## DataFrameReader

```scala { .api }
class DataFrameReader {
  def format(source: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame
}
```

## File Format Readers

### JSON Files

```scala { .api }
class DataFrameReader {
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def json(jsonRDD: RDD[String]): DataFrame      // deprecated since 2.2; prefer the Dataset overload
  def json(jsonDataset: Dataset[String]): DataFrame
}
```

**Usage Examples:**

```scala
// Basic JSON reading
val df = spark.read.json("path/to/file.json")
val multipleFiles = spark.read.json("file1.json", "file2.json")

// With options
val jsonDF = spark.read
  .option("multiLine", "true")
  .option("allowComments", "true")
  .option("allowUnquotedFieldNames", "true")
  .json("complex.json")

// With explicit schema for better performance
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("city", StringType, true)
))

val typedJson = spark.read
  .schema(schema)
  .json("people.json")

// From an RDD or Dataset of JSON strings
val jsonStrings = spark.sparkContext.parallelize(Seq(
  """{"name": "Alice", "age": 25}""",
  """{"name": "Bob", "age": 30}"""
))
val fromRDD = spark.read.json(jsonStrings)
```

**Common JSON Options:**
- `multiLine`: Parse multi-line JSON objects (default: false)
- `allowComments`: Allow JavaScript-style comments (default: false)
- `allowUnquotedFieldNames`: Allow unquoted field names (default: false)
- `allowSingleQuotes`: Allow single quotes (default: true)
- `primitivesAsString`: Infer all primitive values as strings (default: false)
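
The `multiLine` option is the one most often tripped over: by default Spark expects one JSON document per line (JSON Lines), so a pretty-printed file parses as corrupt records. A minimal sketch, assuming a local SparkSession and a throwaway path under `/tmp`:

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("json-multiline-demo")
  .getOrCreate()

// One JSON object pretty-printed across several lines
val pretty =
  """{
    |  "name": "Alice",
    |  "age": 25
    |}""".stripMargin
Files.write(Paths.get("/tmp/pretty.json"), pretty.getBytes("UTF-8"))

// With multiLine the whole file is parsed as one record; without it,
// each physical line would be treated as a separate (corrupt) document
val ok = spark.read.option("multiLine", "true").json("/tmp/pretty.json")
```

Without `multiLine`, the same read yields a single `_corrupt_record` column rather than `name`/`age`.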

### Parquet Files

```scala { .api }
class DataFrameReader {
  def parquet(paths: String*): DataFrame
}
```

**Usage Examples:**

```scala
// Read Parquet files
val parquetDF = spark.read.parquet("data.parquet")
val multipleParquet = spark.read.parquet("part1.parquet", "part2.parquet")

// With options
val parquetWithOptions = spark.read
  .option("mergeSchema", "true")
  .parquet("partitioned_data/")

// Read partitioned Parquet data
val partitioned = spark.read.parquet("data/year=2023/month=*/day=*")
```

**Common Parquet Options:**
- `mergeSchema`: Merge schemas from multiple files (default: false)
- `recursiveFileLookup`: Recursively search subdirectories (default: false)

### CSV Files

```scala { .api }
class DataFrameReader {
  def csv(paths: String*): DataFrame
}
```

**Usage Examples:**

```scala
// Basic CSV reading
val csvDF = spark.read.csv("data.csv")

// With options and header
val csvWithHeader = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("sep", ",")
  .csv("people.csv")

// With explicit schema
val csvSchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("salary", DoubleType, true)
))

val typedCSV = spark.read
  .schema(csvSchema)
  .option("header", "true")
  .csv("employees.csv")
```

**Common CSV Options:**
- `header`: Whether the first line is a header (default: false)
- `inferSchema`: Automatically infer column types (default: false)
- `sep`: Field separator character (default: ",")
- `quote`: Quote character (default: "\"")
- `escape`: Escape character (default: "\\")
- `nullValue`: String representing null values (default: "")
- `dateFormat`: Date format string (default: "yyyy-MM-dd")
- `timestampFormat`: Timestamp format string (default: "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
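
A short sketch tying several of these options together: `nullValue` turns sentinel strings into SQL NULLs, and `dateFormat` controls how `DateType` columns are parsed (local SparkSession and `/tmp` path assumed for illustration):

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("csv-options-demo")
  .getOrCreate()

val csv = "id,joined\n1,2023-01-15\n2,NA\n"
Files.write(Paths.get("/tmp/joined.csv"), csv.getBytes("UTF-8"))

val schema = StructType(Seq(
  StructField("id", IntegerType, true),
  StructField("joined", DateType, true)
))

val df = spark.read
  .schema(schema)
  .option("header", "true")
  .option("nullValue", "NA")          // "NA" cells become SQL NULL
  .option("dateFormat", "yyyy-MM-dd") // parse "joined" as DateType
  .csv("/tmp/joined.csv")
```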

### ORC Files

```scala { .api }
class DataFrameReader {
  def orc(paths: String*): DataFrame
}
```

**Usage Examples:**

```scala
// Read ORC files
val orcDF = spark.read.orc("data.orc")

// With options
val orcWithOptions = spark.read
  .option("mergeSchema", "true")
  .orc("orc_data/")
```

### Text Files

```scala { .api }
class DataFrameReader {
  def text(paths: String*): DataFrame
  def textFile(paths: String*): Dataset[String]
}
```

**Usage Examples:**

```scala
// Read as a DataFrame with a single "value" column
val textDF = spark.read.text("log.txt")

// Read as Dataset[String]
val textDS = spark.read.textFile("documents/*.txt")

// With encoding option
val encodedText = spark.read
  .option("encoding", "UTF-8")
  .text("data.txt")
```

## Database Connectivity (JDBC)

```scala { .api }
class DataFrameReader {
  def jdbc(url: String, table: String, properties: Properties): DataFrame
  def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
  def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame
}
```

Note that the partition bounds of the column-based overload are `Long`. To partition on a date or timestamp column, use `format("jdbc")` with the `partitionColumn`, `lowerBound`, and `upperBound` options instead, which accept date/timestamp strings.

**Usage Examples:**

```scala
import java.util.Properties

// Basic JDBC reading
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver")

val jdbcDF = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", connectionProperties)

// With partitioning for performance
val partitionedJDBC = spark.read
  .jdbc(
    url = "jdbc:postgresql://localhost:5432/mydb",
    table = "large_table",
    columnName = "id",
    lowerBound = 1L,
    upperBound = 1000000L,
    numPartitions = 10,
    connectionProperties = connectionProperties
  )

// With a custom query (wrapped as a derived table)
val customQuery = """
  (SELECT u.id, u.name, p.title
   FROM users u JOIN profiles p ON u.id = p.user_id
   WHERE u.active = true) AS query
"""

val queryResult = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", customQuery, connectionProperties)

// With predicates for parallel reading (one partition per predicate)
val predicates = Array(
  "age < 25",
  "age >= 25 AND age < 50",
  "age >= 50"
)

val parallelJDBC = spark.read
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", predicates, connectionProperties)
```

## Generic Data Sources

```scala { .api }
class DataFrameReader {
  def format(source: String): DataFrameReader
  def load(): DataFrame
  def load(path: String): DataFrame
}
```

**Usage Examples:**

```scala
// Delta Lake (third-party format; requires the Delta Lake library)
val deltaDF = spark.read
  .format("delta")
  .load("path/to/delta/table")

// Avro files (requires the external spark-avro package)
val avroDF = spark.read
  .format("avro")
  .load("data.avro")

// Custom data source
val customDF = spark.read
  .format("com.example.CustomDataSource")
  .option("customOption", "value")
  .load("path/to/data")
```

## DataFrameWriter

```scala { .api }
class DataFrameWriter[T] {
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]
  def partitionBy(colNames: String*): DataFrameWriter[T]
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
  def save(): Unit
  def save(path: String): Unit
}

// org.apache.spark.sql.SaveMode is a Java enum (not a Scala Enumeration)
// with the values: Append, Overwrite, ErrorIfExists (the default), Ignore
```
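
The four modes behave as follows when the target already exists: `Append` adds rows, `Overwrite` replaces the data, `ErrorIfExists` (the default) throws, and `Ignore` silently skips the write. The no-op nature of `Ignore` is easy to miss; a quick sketch against a local SparkSession and a throwaway `/tmp` path:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("savemode-demo")
  .getOrCreate()
import spark.implicits._

val out = "/tmp/savemode_demo"
Seq(1, 2, 3).toDF("n").write.mode(SaveMode.Overwrite).parquet(out)

// Ignore: data already exists at `out`, so this write does nothing
Seq(99).toDF("n").write.mode(SaveMode.Ignore).parquet(out)

val back = spark.read.parquet(out)  // still the original three rows
```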

## Writing to Files

### Parquet Files

```scala { .api }
class DataFrameWriter[T] {
  def parquet(path: String): Unit
}
```

**Usage Examples:**

```scala
// Basic Parquet writing
df.write.parquet("output.parquet")

// With save mode
df.write
  .mode(SaveMode.Overwrite)
  .parquet("data/output.parquet")

// Partitioned writing
df.write
  .partitionBy("year", "month")
  .parquet("partitioned_data/")

// With options
df.write
  .option("compression", "snappy")
  .mode("overwrite")
  .parquet("compressed_output.parquet")
```

### JSON Files

```scala { .api }
class DataFrameWriter[T] {
  def json(path: String): Unit
}
```

**Usage Examples:**

```scala
// Write JSON
df.write.json("output.json")

// With formatting options
df.write
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .option("dateFormat", "yyyy-MM-dd")
  .mode("overwrite")
  .json("formatted_output.json")
```

### CSV Files

```scala { .api }
class DataFrameWriter[T] {
  def csv(path: String): Unit
}
```

**Usage Examples:**

```scala
// Write CSV
df.write.csv("output.csv")

// With options
df.write
  .option("header", "true")
  .option("sep", "|")
  .mode("overwrite")
  .csv("data_with_header.csv")
```

### ORC Files

```scala { .api }
class DataFrameWriter[T] {
  def orc(path: String): Unit
}
```

**Usage Examples:**

```scala
// Write ORC
df.write.orc("output.orc")

// With compression
df.write
  .option("compression", "zlib")
  .mode("overwrite")
  .orc("compressed.orc")
```

### Text Files

```scala { .api }
class DataFrameWriter[T] {
  def text(path: String): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.functions.{col, concat_ws}

// Write text (requires a single string column)
df.select("message").write.text("logs.txt")

// Combine columns first
df.select(concat_ws(",", col("name"), col("age")).alias("line"))
  .write
  .text("combined_output.txt")
```

## Table Operations

```scala { .api }
class DataFrameWriter[T] {
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
}
```

**Usage Examples:**

```scala
// Save as managed table
df.write
  .mode("overwrite")
  .saveAsTable("my_database.my_table")

// Insert into an existing table
// (insertInto matches columns by position, not by name)
df.write
  .mode("append")
  .insertInto("existing_table")

// Partitioned table
df.write
  .partitionBy("year", "month")
  .mode("overwrite")
  .saveAsTable("partitioned_table")
```
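
One subtlety worth a concrete example: `insertInto` resolves columns by position, not by name, so a reordered DataFrame silently lands values in the wrong columns. A sketch with a throwaway local table (the table name `demo_scores` is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("insertinto-demo")
  .getOrCreate()
import spark.implicits._

Seq(("a", 1)).toDF("name", "score")
  .write.mode("overwrite").saveAsTable("demo_scores")

// Column ORDER must match the table; names are ignored by insertInto.
// Writing Seq((2, "b")).toDF("score", "name") here would fail or
// scramble the columns.
Seq(("b", 2)).toDF("name", "score")
  .write.mode("append").insertInto("demo_scores")

val t = spark.table("demo_scores")
```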

## JDBC Writing

```scala { .api }
class DataFrameWriter[T] {
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit
}
```

**Usage Examples:**

```scala
// Write to a database
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver")

df.write
  .mode("overwrite")
  .jdbc("jdbc:mysql://localhost:3306/mydb", "users", connectionProperties)

// With additional options
df.write
  .option("batchsize", "10000")
  .option("truncate", "true")   // truncate instead of drop/recreate on overwrite
  .mode("overwrite")
  .jdbc("jdbc:postgresql://localhost:5432/mydb", "large_table", connectionProperties)
```

## Advanced I/O Patterns

### Bucketing

```scala
// Write bucketed data for optimized joins (bucketing requires saveAsTable)
df.write
  .bucketBy(42, "user_id")
  .sortBy("timestamp")
  .mode("overwrite")
  .saveAsTable("bucketed_events")
```

### Multi-format Writing

```scala
// Write the same data to multiple formats
val writer = df.write.mode("overwrite")

writer.parquet("data.parquet")
writer.json("data.json")
writer.option("header", "true").csv("data.csv")
```

### Conditional Writing

```scala
import org.apache.spark.sql.functions.col

// Write different subsets to different locations
df.filter(col("region") === "US")
  .write
  .mode("overwrite")
  .parquet("us_data/")

df.filter(col("region") === "EU")
  .write
  .mode("overwrite")
  .parquet("eu_data/")
```

### Schema Evolution

```scala
// Handle schema changes in Parquet: append files written with a newer
// schema, then merge at read time (mergeSchema is a *read* option)
df.write
  .mode("append")
  .parquet("evolving_schema/")

val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("evolving_schema/")
```

## Data Source Options Reference

### Common Options (All Formats)
- `path`: File system path
- `recursiveFileLookup`: Recursively search subdirectories (default: false)
- `pathGlobFilter`: Glob pattern for file filtering
- `modifiedBefore`: Only include files modified before the given timestamp
- `modifiedAfter`: Only include files modified after the given timestamp

### Compression Options
- Parquet: `none`, `snappy`, `gzip`, `lzo`, `brotli`, `lz4`, `zstd`
- JSON: `none`, `bzip2`, `gzip`, `lz4`, `snappy`, `deflate`
- ORC: `none`, `zlib`, `snappy`, `lzo`, `lz4`, `zstd`

### Performance Tuning
- Use appropriate file sizes (128MB-1GB for Parquet)
- Partition by frequently filtered columns
- Use columnar formats (Parquet, ORC) for analytical workloads
- Enable predicate pushdown with appropriate schemas
- Consider bucketing for frequently joined tables
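
The first two bullets can be sketched concretely: each in-memory partition becomes one output file per directory, so `repartition` before the write is the usual lever for file count and size (local SparkSession and `/tmp` paths are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("write-tuning-demo")
  .getOrCreate()
import spark.implicits._

val df = (1 to 100000).toDF("id").withColumn("bucket", col("id") % 4)

// Each partition writes one file: pick a count that lands files
// in the ~128MB-1GB sweet spot for Parquet
df.repartition(4).write.mode("overwrite").parquet("/tmp/sized_output")

// Align in-memory partitioning with partitionBy so each Hive-style
// directory (bucket=0, bucket=1, ...) gets one well-sized file
df.repartition(col("bucket"))
  .write
  .mode("overwrite")
  .partitionBy("bucket")
  .parquet("/tmp/partitioned_output")
```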