0
# Data I/O Operations
1
2
Reading and writing data from various sources and formats. Supports batch and streaming data with extensive configuration options, built-in format support, and custom data source integration.
3
4
## Capabilities
5
6
### DataFrameReader
7
8
Interface for reading data from external storage systems into DataFrames.
9
10
```scala { .api }
11
/**
12
* Interface for reading data from external storage systems
13
*/
14
class DataFrameReader {
15
/** Specify data source format */
16
def format(source: String): DataFrameReader
17
18
/** Set schema for the data */
19
def schema(schema: StructType): DataFrameReader
20
def schema(schemaString: String): DataFrameReader
21
22
/** Set options for the data source */
23
def option(key: String, value: String): DataFrameReader
24
def option(key: String, value: Boolean): DataFrameReader
25
def option(key: String, value: Long): DataFrameReader
26
def option(key: String, value: Double): DataFrameReader
27
def options(options: scala.collection.Map[String, String]): DataFrameReader
28
def options(options: java.util.Map[String, String]): DataFrameReader
29
30
/** Load data using generic interface */
31
def load(): DataFrame
32
def load(path: String): DataFrame
33
def load(paths: String*): DataFrame
34
35
/** Built-in format readers */
36
def json(path: String): DataFrame
37
def json(paths: String*): DataFrame
38
def json(jsonRDD: RDD[String]): DataFrame // deprecated since Spark 2.2; prefer json(Dataset[String])
39
def json(jsonDataset: Dataset[String]): DataFrame
40
41
def parquet(paths: String*): DataFrame
42
43
def orc(paths: String*): DataFrame
44
45
def text(paths: String*): DataFrame
46
def textFile(paths: String*): Dataset[String]
47
48
def csv(paths: String*): DataFrame
49
50
def table(tableName: String): DataFrame
51
52
/** JDBC data source */
53
def jdbc(url: String, table: String, properties: java.util.Properties): DataFrame
54
def jdbc(url: String, table: String, predicates: Array[String],
55
connectionProperties: java.util.Properties): DataFrame
56
def jdbc(url: String, table: String, columnName: String,
57
lowerBound: Long, upperBound: Long, numPartitions: Int,
58
connectionProperties: java.util.Properties): DataFrame
59
}
60
```
61
62
**Usage Examples:**
63
64
```scala
65
// JSON with schema inference
66
val df1 = spark.read
67
.option("multiline", "true")
68
.json("path/to/file.json")
69
70
// CSV with custom options
71
val df2 = spark.read
72
.option("header", "true")
73
.option("inferSchema", "true")
74
.option("delimiter", ",")
75
.csv("path/to/file.csv")
76
77
// Parquet (schema preserved)
78
val df3 = spark.read.parquet("path/to/*.parquet")
79
80
// With explicit schema
81
import org.apache.spark.sql.types._
82
83
val schema = StructType(Seq(
84
StructField("name", StringType, nullable = true),
85
StructField("age", IntegerType, nullable = true),
86
StructField("salary", DoubleType, nullable = true)
87
))
88
89
val df4 = spark.read
90
.schema(schema)
91
.option("header", "true")
92
.csv("employees.csv")
93
94
// JDBC connection
95
val df5 = spark.read
96
.format("jdbc")
97
.option("url", "jdbc:postgresql://localhost:5432/mydb")
98
.option("dbtable", "employees")
99
.option("user", "username")
100
.option("password", "password")
101
.load()
102
103
// Custom data source
104
val df6 = spark.read
105
.format("org.apache.spark.sql.cassandra")
106
.option("keyspace", "mykeyspace")
107
.option("table", "mytable")
108
.load()
109
```
110
111
### DataFrameWriter
112
113
Interface for writing Dataset to external storage systems.
114
115
```scala { .api }
116
/**
117
* Interface for writing Dataset to external storage systems
118
* @tparam T Type of the Dataset
119
*/
120
class DataFrameWriter[T] {
121
/** Specify output format */
122
def format(source: String): DataFrameWriter[T]
123
124
/** Set save mode */
125
def mode(saveMode: SaveMode): DataFrameWriter[T]
126
def mode(saveMode: String): DataFrameWriter[T]
127
128
/** Set options for the data source */
129
def option(key: String, value: String): DataFrameWriter[T]
130
def option(key: String, value: Boolean): DataFrameWriter[T]
131
def option(key: String, value: Long): DataFrameWriter[T]
132
def option(key: String, value: Double): DataFrameWriter[T]
133
def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
134
def options(options: java.util.Map[String, String]): DataFrameWriter[T]
135
136
/** Partition output by columns */
137
def partitionBy(colNames: String*): DataFrameWriter[T]
138
139
/** Bucket output by columns */
140
def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
141
def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
142
143
/** Save using generic interface */
144
def save(): Unit
145
def save(path: String): Unit
146
147
/** Built-in format writers */
148
def json(path: String): Unit
149
def parquet(path: String): Unit
150
def orc(path: String): Unit
151
def text(path: String): Unit
152
def csv(path: String): Unit
153
154
/** Save as table */
155
def saveAsTable(tableName: String): Unit
156
def insertInto(tableName: String): Unit
157
158
/** JDBC output */
159
def jdbc(url: String, table: String, connectionProperties: java.util.Properties): Unit
160
}
161
162
/**
163
* Save modes for writing data
164
*/
165
// SaveMode is a Java enum (org.apache.spark.sql.SaveMode), not a Scala Enumeration.
// Its values:
//   SaveMode.Append        - append contents to existing data
//   SaveMode.Overwrite     - overwrite existing data
//   SaveMode.ErrorIfExists - throw an exception if data already exists (default)
//   SaveMode.Ignore        - silently skip the write if data already exists
169
```
170
171
**Usage Examples:**
172
173
```scala
174
val df = spark.table("employees")
175
176
// Basic save operations
177
df.write
178
.mode(SaveMode.Overwrite)
179
.parquet("output/employees.parquet")
180
181
df.write
182
.mode("append")
183
.option("header", "true")
184
.csv("output/employees.csv")
185
186
// Partitioned output
187
df.write
188
.mode(SaveMode.Overwrite)
189
.partitionBy("department", "year")
190
.parquet("output/employees_partitioned")
191
192
// Bucketed output
193
df.write
194
.mode(SaveMode.Overwrite)
195
.bucketBy(10, "employee_id")
196
.sortBy("salary")
197
.saveAsTable("bucketed_employees")
198
199
// JDBC output
200
df.write
201
.mode(SaveMode.Overwrite)
202
.format("jdbc")
203
.option("url", "jdbc:postgresql://localhost:5432/mydb")
204
.option("dbtable", "employees")
205
.option("user", "username")
206
.option("password", "password")
207
.save()
208
209
// Custom format with options
210
df.write
211
.mode(SaveMode.Append)
212
.format("delta")
213
.option("mergeSchema", "true")
214
.save("path/to/delta-table")
215
```
216
217
### Common Data Source Options
218
219
Configuration options for built-in data sources.
220
221
```scala { .api }
222
// CSV Options
223
object CsvOptions {
224
val DELIMITER = "delimiter" // Field delimiter (default: ",")
225
val QUOTE = "quote" // Quote character (default: "\"")
226
val ESCAPE = "escape" // Escape character (default: "\")
227
val HEADER = "header" // First line is header (default: "false")
228
val INFER_SCHEMA = "inferSchema" // Infer schema from data (default: "false")
229
val NULL_VALUE = "nullValue" // String representation of null (default: "")
230
val DATE_FORMAT = "dateFormat" // Date format (default: "yyyy-MM-dd")
231
val TIMESTAMP_FORMAT = "timestampFormat" // Timestamp format
232
val MAX_COLUMNS = "maxColumns" // Maximum number of columns
233
val MAX_CHARS_PER_COLUMN = "maxCharsPerColumn" // Max chars per column
234
val ENCODING = "encoding" // Character encoding (default: "UTF-8")
235
val COMMENT = "comment" // Comment character
236
val MODE = "mode" // Parse mode: PERMISSIVE, DROPMALFORMED, FAILFAST
237
}
238
239
// JSON Options
240
object JsonOptions {
241
val ALLOW_COMMENTS = "allowComments" // Allow comments (default: "false")
242
val ALLOW_UNQUOTED_FIELD_NAMES = "allowUnquotedFieldNames" // Allow unquoted field names
243
val ALLOW_SINGLE_QUOTES = "allowSingleQuotes" // Allow single quotes
244
val ALLOW_NUMERIC_LEADING_ZEROS = "allowNumericLeadingZeros" // Allow leading zeros
245
val ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER = "allowBackslashEscapingAnyCharacter"
246
val MULTILINE = "multiline" // Parse multiline JSON (default: "false")
247
val DATE_FORMAT = "dateFormat" // Date format
248
val TIMESTAMP_FORMAT = "timestampFormat" // Timestamp format
249
val PRIMITIVE_AS_STRING = "primitivesAsString" // Parse primitives as strings
250
}
251
252
// Parquet Options
253
object ParquetOptions {
254
val MERGE_SCHEMA = "mergeSchema" // Merge schemas from multiple files
255
val COMPRESSION = "compression" // Compression codec: none, snappy, gzip, lzo
256
val DICTIONARY_ENCODING = "dictionaryEncoding" // NOTE(review): not a documented reader/writer option — dictionary encoding is normally set via the Hadoop conf "parquet.enable.dictionary"; verify before relying on it
257
}
258
259
// JDBC Options
260
object JdbcOptions {
261
val DRIVER = "driver" // JDBC driver class name
262
val USER = "user" // Username
263
val PASSWORD = "password" // Password
264
val FETCH_SIZE = "fetchsize" // JDBC fetch size
265
val BATCH_SIZE = "batchsize" // JDBC batch size for inserts
266
val ISOLATION_LEVEL = "isolationLevel" // Transaction isolation level
267
val NUM_PARTITIONS = "numPartitions" // Number of partitions for parallel reads
268
val PARTITION_COLUMN = "partitionColumn" // Column for partitioning
269
val LOWER_BOUND = "lowerBound" // Lower bound for partitioning
270
val UPPER_BOUND = "upperBound" // Upper bound for partitioning
271
val QUERY_TIMEOUT = "queryTimeout" // Query timeout in seconds
272
val CREATE_TABLE_OPTIONS = "createTableOptions" // Options for CREATE TABLE
273
val CREATE_TABLE_COLUMN_TYPES = "createTableColumnTypes" // Column types for CREATE TABLE
274
val CUSTOM_SCHEMA = "customSchema" // Custom schema for reading
275
}
276
```
277
278
### Built-in Data Sources
279
280
Support for various data formats and storage systems.
281
282
```scala { .api }
283
// File-based sources
284
val CSV_SOURCE = "csv"
285
val JSON_SOURCE = "json"
286
val PARQUET_SOURCE = "parquet"
287
val ORC_SOURCE = "orc"
288
val TEXT_SOURCE = "text"
289
val AVRO_SOURCE = "avro" // Requires spark-avro package
290
291
// Database sources
292
val JDBC_SOURCE = "jdbc"
293
294
// Big data sources
295
val HIVE_SOURCE = "hive"
296
val DELTA_SOURCE = "delta" // Requires Delta Lake
297
val ICEBERG_SOURCE = "iceberg" // Requires Apache Iceberg
298
299
// Cloud object stores are NOT format() names: they are accessed through
// filesystem URI schemes in the path (each requires its Hadoop connector):
//   "s3a://bucket/path"                                    // Amazon S3 (hadoop-aws)
//   "wasbs://container@account.blob.core.windows.net/path" // Azure Blob (hadoop-azure)
//   "gs://bucket/path"                                     // Google Cloud Storage (gcs-connector)
303
304
// Streaming sources
305
val KAFKA_SOURCE = "kafka"
306
val SOCKET_SOURCE = "socket"
307
val RATE_SOURCE = "rate" // For testing
308
```
309
310
### Advanced I/O Patterns
311
312
Common patterns for data ingestion and output.
313
314
**Multi-format reading:**
315
316
```scala
317
// Read from multiple formats
318
val jsonDf = spark.read.json("data/*.json")
319
val csvDf = spark.read.option("header", "true").csv("data/*.csv")
320
val combined = jsonDf.union(csvDf) // union is positional: both sides need identical column order and types; use unionByName to align by column name
321
322
// Schema evolution with Parquet
323
val df1 = spark.read.parquet("data/year=2021")
324
val df2 = spark.read.parquet("data/year=2022")
325
val merged = df1.union(df2)
326
```
327
328
**Optimized writes:**
329
330
```scala
331
// Optimize partition size
332
df.repartition(200).write
333
.mode(SaveMode.Overwrite)
334
.parquet("output/data")
335
336
// Coalesce for fewer files
337
df.coalesce(10).write
338
.mode(SaveMode.Overwrite)
339
.json("output/data")
340
341
// Dynamic partitioning
342
df.write
343
.mode(SaveMode.Overwrite)
344
.partitionBy("year", "month")
345
.option("maxRecordsPerFile", "100000")
346
.parquet("output/partitioned_data")
347
```
348
349
**Error handling:**
350
351
```scala
352
// Handle malformed records
353
val df = spark.read
354
.option("mode", "PERMISSIVE") // keeps malformed rows so "_corrupt_record" is populated; alternatives: "DROPMALFORMED", "FAILFAST"
355
.option("columnNameOfCorruptRecord", "_corrupt_record")
356
.json("data/potentially_bad.json")
357
358
// Validate data after read
359
// Note: from Spark 2.3 on, filtering that references only the internal
// corrupt-record column raises an AnalysisException unless the DataFrame
// is cached first (df.cache())
val validDf = df.filter(col("_corrupt_record").isNull)
val corruptDf = df.filter(col("_corrupt_record").isNotNull)
361
```
362
363
**Custom data sources:**
364
365
```scala
366
// Register custom format
367
spark.sql("CREATE TABLE custom_table USING org.example.CustomDataSource OPTIONS (path 'data/custom')")
368
369
// Use programmatically
370
val df = spark.read
371
.format("org.example.CustomDataSource")
372
.option("customOption", "value")
373
.load("data/custom")
374
```