# Data I/O and Persistence

Spark Core provides comprehensive capabilities for reading data from various sources, writing results to different formats, and managing RDD persistence across memory and disk.

## Capabilities

### Text File Operations

Read and write text-based data formats.

```scala { .api }
// Reading text files
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

// Writing text files (available on all RDDs)
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
```

**Usage Examples:**
```scala
// Read single or multiple text files
val lines = sc.textFile("hdfs://data/input.txt")
val multipleFiles = sc.textFile("hdfs://data/logs/*.log")

// Read entire small files as key-value pairs (path, content)
val smallFiles = sc.wholeTextFiles("hdfs://data/documents/")
// Result: RDD[(String, String)] where the key is the file path and the value is the full file content

// Write RDD to text files
val processed = lines.filter(_.contains("ERROR")).map(_.toUpperCase)
processed.saveAsTextFile("hdfs://data/output")

// Write with compression
processed.saveAsTextFile("hdfs://data/output-compressed",
  classOf[org.apache.hadoop.io.compress.GzipCodec])
```
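
Note that `textFile` also reads compressed input transparently when a matching Hadoop codec is available; a minimal sketch, assuming gzipped logs already exist at the path shown:

```scala
// The codec is inferred from the file extension (e.g. .gz); no extra configuration needed.
// Note: gzip is not splittable, so each .gz file becomes a single partition.
val compressedLogs = sc.textFile("hdfs://data/logs/*.gz")
```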

### Hadoop InputFormat Support

Read data using Hadoop InputFormat classes for integration with the Hadoop ecosystem.

```scala { .api }
// Old Hadoop API (mapred package)
def hadoopRDD[K, V](
  conf: JobConf,
  inputFormat: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]

def hadoopFile[K, V](
  path: String,
  inputFormat: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]

// New Hadoop API (mapreduce package)
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
  conf: Configuration,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V]
): RDD[(K, V)]

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration
): RDD[(K, V)]
```

**Usage Examples:**
```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.conf.Configuration

// Using old Hadoop API (the type parameters are inferred from the class arguments)
val hadoopRDD = sc.hadoopFile(
  "hdfs://data/input",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
).map { case (offset, text) => text.toString }

// Using new Hadoop API with configuration
val conf = new Configuration()
conf.set("mapreduce.input.fileinputformat.split.maxsize", "134217728") // 128MB
val newApiRDD = sc.newAPIHadoopFile[LongWritable, Text, NewTextInputFormat](
  "hdfs://data/input",
  classOf[NewTextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf
)
```
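
The old-API variants can also start from an explicit `JobConf`, which is useful when input paths and job settings are assembled programmatically. A minimal sketch, assuming the same imports as above (the path is illustrative):

```scala
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

// Build a JobConf carrying the input path, then hand it to hadoopRDD
val jobConf = new JobConf()
FileInputFormat.setInputPaths(jobConf, "hdfs://data/input")
val fromConf = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text]).map(_._2.toString)
```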

### Sequence File Operations

Read and write Hadoop SequenceFiles for efficient binary data storage.

```scala { .api }
// Reading SequenceFiles
def sequenceFile[K, V](path: String,
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

// Writing SequenceFiles (available on RDD[(K, V)])
def saveAsSequenceFile(path: String): Unit // Available via implicit conversion

// Hadoop OutputFormat operations
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[F],
    codec: Class[_ <: CompressionCodec] = null): Unit

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[F],
    conf: Configuration = new Configuration): Unit
```

**Usage Examples:**
```scala
import org.apache.hadoop.io.{IntWritable, Text}

// Read SequenceFile
val seqData = sc.sequenceFile[IntWritable, Text]("hdfs://data/sequence")
val converted = seqData.map { case (key, value) =>
  (key.get(), value.toString)
}

// Write as SequenceFile (requires key-value RDD)
val keyValueData = sc.parallelize(Seq((1, "first"), (2, "second"), (3, "third")))
val writableData = keyValueData.map { case (k, v) =>
  (new IntWritable(k), new Text(v))
}
writableData.saveAsSequenceFile("hdfs://data/output-sequence")
```
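
The API listing above also includes the generic Hadoop output paths. A short sketch of `saveAsNewAPIHadoopFile`, reusing `writableData` from the previous example (the output path is illustrative):

```scala
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

// Write the same pairs through the new Hadoop OutputFormat API
writableData.saveAsNewAPIHadoopFile(
  "hdfs://data/output-newapi",
  classOf[IntWritable],
  classOf[Text],
  classOf[SequenceFileOutputFormat[IntWritable, Text]]
)
```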

### Object File Operations

Serialize and deserialize Scala objects using Java serialization.

```scala { .api }
// Reading object files
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

// Writing object files (available on all RDDs)
def saveAsObjectFile(path: String): Unit
```

**Usage Examples:**
```scala
// Serialize complex objects (the class must be java.io.Serializable; case classes are)
case class Person(name: String, age: Int, email: String)
val people = sc.parallelize(Seq(
  Person("Alice", 25, "alice@example.com"),
  Person("Bob", 30, "bob@example.com")
))

// Save as object file
people.saveAsObjectFile("hdfs://data/people-objects")

// Read back as objects
val loadedPeople = sc.objectFile[Person]("hdfs://data/people-objects")
```

### RDD Persistence

Control RDD caching and storage across memory and disk.

```scala { .api }
// Persistence methods
def persist(): RDD[T] // Default: MEMORY_ONLY
def persist(newLevel: StorageLevel): RDD[T]
def cache(): RDD[T] // Alias for persist(MEMORY_ONLY)
def unpersist(blocking: Boolean = false): RDD[T]

// Storage level inquiry
def getStorageLevel: StorageLevel
def isCheckpointed: Boolean
```
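
`getStorageLevel` is handy for confirming what was actually assigned; a minimal sketch, assuming `org.apache.spark.storage.StorageLevel` is imported:

```scala
val inspected = sc.parallelize(1 to 10).cache()
// Compare against the predefined levels rather than parsing toString output
println(inspected.getStorageLevel == StorageLevel.MEMORY_ONLY) // true
```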

**Storage Levels:**
```scala { .api }
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel // Replicated
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel // Replicated
  val MEMORY_ONLY_SER: StorageLevel // Serialized
  val MEMORY_ONLY_SER_2: StorageLevel // Serialized + Replicated
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel // Replicated
  val MEMORY_AND_DISK_SER: StorageLevel // Serialized
  val MEMORY_AND_DISK_SER_2: StorageLevel // Serialized + Replicated
  val OFF_HEAP: StorageLevel
}
```
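
Beyond the predefined constants, a level can be assembled from its flags via the `StorageLevel` factory; a sketch (this combination is equivalent to `MEMORY_AND_DISK_SER_2`):

```scala
import org.apache.spark.storage.StorageLevel

// disk + memory, serialized, replicated twice
val customLevel = StorageLevel(useDisk = true, useMemory = true,
  useOffHeap = false, deserialized = false, replication = 2)
```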

**Usage Examples:**
```scala
val expensiveRDD = sc.textFile("hdfs://large-dataset")
  .map(parseComplexRecord)
  .filter(isValid)

// Cache in memory for reuse
expensiveRDD.cache()

// Use multiple times - only computed once
val count = expensiveRDD.count()
val sample = expensiveRDD.take(10)
val stats = expensiveRDD.map(_.value).stats()

// Different storage levels for different use cases.
// Note: a storage level can only be set once per RDD;
// call unpersist() before assigning a different one.
val largeRDD = sc.parallelize(1 to 1000000)

// Memory only with replication for fault tolerance
largeRDD.persist(StorageLevel.MEMORY_ONLY_2)
largeRDD.unpersist()

// Memory and disk for large datasets that don't fit in memory
largeRDD.persist(StorageLevel.MEMORY_AND_DISK)
largeRDD.unpersist()

// Serialized storage to save memory (CPU overhead for serialization)
largeRDD.persist(StorageLevel.MEMORY_ONLY_SER)

// Clean up when done
expensiveRDD.unpersist()
largeRDD.unpersist()
```

### Checkpointing

Save RDD data to reliable storage and truncate the lineage graph, bounding recovery cost for long lineage chains.

```scala { .api }
// Checkpointing operations
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
def localCheckpoint(): RDD[T]

// SparkContext checkpoint configuration
def setCheckpointDir(directory: String): Unit
```

**Usage Examples:**
```scala
// Set checkpoint directory (usually HDFS)
sc.setCheckpointDir("hdfs://checkpoints")

val iterativeRDD = sc.parallelize(1 to 100)
var current = iterativeRDD

// In iterative algorithms, checkpoint periodically to truncate lineage
for (i <- 1 to 10) {
  current = current.map(iterativeTransformation)

  if (i % 3 == 0) {
    current.checkpoint() // Mark for checkpointing (must precede the first action on this RDD)
    current.count() // Trigger the job that materializes the checkpoint
  }
}

// Local checkpointing (faster but less fault-tolerant)
val tempRDD = someComplexComputation()
tempRDD.localCheckpoint() // Store in executor storage only
tempRDD.count() // Trigger checkpoint
```
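
After the triggering action completes, the checkpoint state can be inspected; a minimal sketch using `current` from above:

```scala
// True once the checkpoint has been materialized
println(current.isCheckpointed)
// Some("hdfs://checkpoints/...") for a reliable checkpoint, None otherwise
println(current.getCheckpointFile)
```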

### Database Connectivity

Read from JDBC data sources using JdbcRDD.

```scala { .api }
class JdbcRDD[T: ClassTag](
  sc: SparkContext,
  getConnection: () => Connection,
  sql: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  mapRow: (ResultSet) => T
) extends RDD[T]
```

**Usage Example:**
```scala
import java.sql.{Connection, DriverManager, ResultSet}

def createConnection(): Connection = {
  Class.forName("org.postgresql.Driver")
  DriverManager.getConnection(
    "jdbc:postgresql://localhost:5432/mydb",
    "username",
    "password"
  )
}

// The query must embed exactly two '?' placeholders, which JdbcRDD binds
// to the lower/upper bounds of each partition's id range
val jdbcRDD = new JdbcRDD(
  sc,
  createConnection,
  "SELECT id, name, age FROM users WHERE id >= ? AND id <= ?",
  lowerBound = 1,
  upperBound = 1000,
  numPartitions = 4,
  mapRow = { resultSet =>
    (resultSet.getInt("id"), resultSet.getString("name"), resultSet.getInt("age"))
  }
)

val users = jdbcRDD.collect()
```
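
`JdbcRDD` only reads. For writing results back, a common pattern is `foreachPartition` with one connection per partition; a sketch, assuming a hypothetical `users_out` table with the same columns:

```scala
jdbcRDD.foreachPartition { rows =>
  // One connection and prepared statement per partition, closed when done
  val conn = createConnection()
  try {
    val stmt = conn.prepareStatement(
      "INSERT INTO users_out (id, name, age) VALUES (?, ?, ?)")
    rows.foreach { case (id, name, age) =>
      stmt.setInt(1, id)
      stmt.setString(2, name)
      stmt.setInt(3, age)
      stmt.executeUpdate()
    }
  } finally {
    conn.close()
  }
}
```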

### Custom InputFormat

Create RDDs from custom Hadoop InputFormat implementations.

```scala { .api }
// For custom formats implementing the InputFormat interface
def hadoopRDD[K, V](
  conf: JobConf,
  inputFormat: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V]
): RDD[(K, V)]
```

**Usage Example:**
```scala
import org.apache.hadoop.io.{BytesWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// Custom InputFormat for reading binary files.
// BinaryFileRecordReader is a user-supplied RecordReader (implementation not shown).
class BinaryFileInputFormat extends FileInputFormat[Text, BytesWritable] {
  override def createRecordReader(split: InputSplit,
      context: TaskAttemptContext): RecordReader[Text, BytesWritable] = {
    new BinaryFileRecordReader()
  }
}

// Use custom format
val binaryFiles = sc.newAPIHadoopFile[Text, BytesWritable, BinaryFileInputFormat](
  "hdfs://binary-data/",
  classOf[BinaryFileInputFormat],
  classOf[Text],
  classOf[BytesWritable]
).map { case (filename, bytes) =>
  (filename.toString, bytes.copyBytes()) // copyBytes() trims the backing array to the valid length
}
```

## Performance Considerations

### Storage Level Selection

```scala
// Choose appropriate storage level based on usage pattern

// Single use: No persistence
val oneTimeUse = data.filter(condition)
// Don't persist - just compute when needed

// Multiple actions on same RDD: Memory
val reusedRDD = data.map(expensiveFunction).cache()

// Large dataset, multiple uses: Memory + Disk
val largeReused = bigData.persist(StorageLevel.MEMORY_AND_DISK)

// Memory constrained: Serialized
val memoryConstrained = data.persist(StorageLevel.MEMORY_ONLY_SER)

// Critical data: Replicated
val critical = data.persist(StorageLevel.MEMORY_AND_DISK_2)
```

### Partitioning for I/O

```scala
// Control parallelism for I/O operations
val data = sc.textFile("hdfs://data", minPartitions = 100) // More parallelism

// Coalesce before writing to reduce output files
data.coalesce(10).saveAsTextFile("hdfs://output") // 10 output files instead of 100
```
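
`coalesce` avoids a shuffle, so it can leave partitions unevenly sized. When the data is skewed, a full `repartition` (which does shuffle) may be worth the extra cost before writing; a sketch:

```scala
// Shuffle to evenly sized partitions before writing skewed data
data.repartition(50).saveAsTextFile("hdfs://output-balanced")
```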

## Types

```scala { .api }
// Storage configuration
class StorageLevel private (
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int
) extends Externalizable

// JDBC connectivity
class JdbcRDD[T: ClassTag](
  sc: SparkContext,
  getConnection: () => Connection,
  sql: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  mapRow: (ResultSet) => T
) extends RDD[T]

// Hadoop integration types
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
```
```