# Data Sources

Spark provides extensive support for reading and writing data from various sources, including local filesystems, HDFS, object stores, and databases. This document covers the data source APIs available in Spark 1.0.0.

## Text Files

### Reading Text Files

**textFile**: Read text files line by line
```scala { .api }
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
```

```scala
// Local filesystem
val localFile = sc.textFile("file:///path/to/local/file.txt")

// HDFS
val hdfsFile = sc.textFile("hdfs://namenode:port/path/to/file.txt")

// Multiple files with wildcards
val multipleFiles = sc.textFile("hdfs://path/to/files/*.txt")

// Specify minimum partitions
val partitionedFile = sc.textFile("hdfs://path/to/large/file.txt", 8)

// Compressed files (automatically detected)
val gzipFile = sc.textFile("hdfs://path/to/file.txt.gz")
```

**wholeTextFiles**: Read entire files as key-value pairs
```scala { .api }
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
```

```scala
// Read a directory of files - returns (filename, file_content) pairs
val filesRDD = sc.wholeTextFiles("hdfs://path/to/directory/")

filesRDD.foreach { case (filename, content) =>
  println(s"File: $filename")
  println(s"Size: ${content.length} characters")
  println(s"Lines: ${content.count(_ == '\n') + 1}")
}

// Process each file separately
val processedFiles = filesRDD.mapValues { content =>
  content.split("\n").filter(_.nonEmpty).length
}
```

### Saving Text Files

**saveAsTextFile**: Save an RDD as text files
```scala { .api }
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
```

```scala
val data = sc.parallelize(Array("line1", "line2", "line3"))

// Save as text files (the path becomes a directory of part-* files)
data.saveAsTextFile("hdfs://path/to/output")

// Save with compression
import org.apache.hadoop.io.compress.GzipCodec
data.saveAsTextFile("hdfs://path/to/compressed-output", classOf[GzipCodec])

// Other compression codecs
import org.apache.hadoop.io.compress.{BZip2Codec, DefaultCodec, SnappyCodec}
data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])
data.saveAsTextFile("output-snappy", classOf[SnappyCodec])
```

## Object Files

Object files are Spark's native binary format, written with Java serialization.

### Reading Object Files

**objectFile**: Load an RDD of serialized objects
```scala { .api }
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
```

```scala
// Save and load custom objects
case class Person(name: String, age: Int)

val people = sc.parallelize(Array(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
))

// Save as an object file
people.saveAsObjectFile("hdfs://path/to/people")

// Load it back
val loadedPeople: RDD[Person] = sc.objectFile[Person]("hdfs://path/to/people")
```

### Saving Object Files

**saveAsObjectFile**: Save an RDD as serialized objects
```scala { .api }
def saveAsObjectFile(path: String): Unit
```

```scala
val complexData = sc.parallelize(Array(
  Map("name" -> "Alice", "scores" -> List(85, 92, 78)),
  Map("name" -> "Bob", "scores" -> List(91, 87, 94))
))

complexData.saveAsObjectFile("hdfs://path/to/complex-data")
```

## Hadoop Files

Spark integrates with Hadoop's input and output formats for reading various file types.

### Sequence Files

**Reading SequenceFiles**:
```scala { .api }
def sequenceFile[K, V](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}

val seqFile = sc.sequenceFile[IntWritable, Text](
  "hdfs://path/to/sequencefile",
  classOf[IntWritable],
  classOf[Text]
)

// Convert Writable types to Scala types
val converted = seqFile.map { case (key, value) =>
  (key.get(), value.toString)
}
```

**Saving SequenceFiles**:
```scala { .api }
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.compress.GzipCodec

// For RDDs of types that can be converted to Writable
val pairs = sc.parallelize(Array((1, "apple"), (2, "banana"), (3, "orange")))

// Convert to Writable types
val writablePairs = pairs.map { case (k, v) =>
  (new IntWritable(k), new Text(v))
}

writablePairs.saveAsSequenceFile("hdfs://path/to/output")

// With compression
writablePairs.saveAsSequenceFile("hdfs://path/to/compressed", Some(classOf[GzipCodec]))
```

### Generic Hadoop Files

**hadoopFile**: Read files with a custom InputFormat
```scala { .api }
def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// Read with TextInputFormat (the key is the byte offset of each line, not its line number)
val textWithOffsets = sc.hadoopFile[LongWritable, Text](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)

// Custom InputFormat sketch (see Custom Data Sources below for a fuller skeleton)
class CustomInputFormat extends InputFormat[Text, Text] {
  // Implementation here
}

val customFile = sc.hadoopFile[Text, Text](
  "hdfs://path/to/custom/format",
  classOf[CustomInputFormat],
  classOf[Text],
  classOf[Text]
)
```

**newAPIHadoopFile**: Read files with the new (`mapreduce`) Hadoop API
```scala { .api }
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val newAPIFile = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
```

### Hadoop Configuration

Access and modify the Hadoop configuration:

```scala { .api }
def hadoopConfiguration: Configuration
```

```scala
val hadoopConf = sc.hadoopConfiguration

// Configure S3 access
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")
hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com")

// Configure compression
hadoopConf.set("mapreduce.map.output.compress", "true")
hadoopConf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")

// Now read from S3
val s3Data = sc.textFile("s3a://bucket-name/path/to/file")
```

## Saving with Hadoop Formats

### Save as Hadoop Files

**saveAsHadoopFile**: Save with the old (`mapred`) Hadoop API
```scala { .api }
def saveAsHadoopFile(
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[_ <: OutputFormat[_, _]],
  codec: Class[_ <: CompressionCodec]
): Unit

// Simplified versions
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit
```

```scala
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.{IntWritable, Text}

val pairs = sc.parallelize(Array((1, "apple"), (2, "banana")))
val writablePairs = pairs.map { case (k, v) => (new IntWritable(k), new Text(v)) }

// Save as text with an explicit output format
writablePairs.saveAsHadoopFile(
  "hdfs://path/to/output",
  classOf[IntWritable],
  classOf[Text],
  classOf[TextOutputFormat[IntWritable, Text]]
)
```

**saveAsNewAPIHadoopFile**: Save with the new (`mapreduce`) Hadoop API
```scala { .api }
def saveAsNewAPIHadoopFile(
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
  conf: Configuration = context.hadoopConfiguration
): Unit
```
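
Usage mirrors the old-API example above. A brief sketch (the output path is a placeholder):

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

val pairs = sc.parallelize(Array((1, "apple"), (2, "banana")))
val writablePairs = pairs.map { case (k, v) => (new IntWritable(k), new Text(v)) }

// Save using the new-API TextOutputFormat
writablePairs.saveAsNewAPIHadoopFile(
  "hdfs://path/to/new-api-output",
  classOf[IntWritable],
  classOf[Text],
  classOf[TextOutputFormat[IntWritable, Text]]
)
```

Note that the old- and new-API `TextOutputFormat` classes live in different packages (`mapred` vs. `mapreduce.lib.output`); importing the wrong one is a common compile error.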

**saveAsHadoopDataset**: Save using a fully configured JobConf
```scala { .api }
def saveAsHadoopDataset(conf: JobConf): Unit
```

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[IntWritable, Text]])
jobConf.setOutputKeyClass(classOf[IntWritable])
jobConf.setOutputValueClass(classOf[Text])
// JobConf has no setOutputPath; the output path is set through FileOutputFormat
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output"))

writablePairs.saveAsHadoopDataset(jobConf)
```

## Database Connectivity

### JDBC Data Sources

Spark 1.0.0 has no built-in JDBC data source, but you can pull rows out of a database inside `mapPartitions`, opening one connection per partition:

```scala
import java.sql.{Connection, DriverManager, ResultSet}

// Custom function to read from a database
def readFromDatabase(url: String, query: String): RDD[String] = {
  sc.parallelize(Seq(query)).mapPartitions { queries =>
    val connection = DriverManager.getConnection(url)
    val statement = connection.createStatement()

    queries.flatMap { query =>
      val resultSet = statement.executeQuery(query)
      val results = scala.collection.mutable.ListBuffer[String]()

      while (resultSet.next()) {
        // Extract data from the ResultSet
        results += resultSet.getString(1) // Assuming a single column
      }

      resultSet.close()
      statement.close()
      connection.close()
      results
    }
  }
}

val dbData = readFromDatabase("jdbc:mysql://localhost:3306/mydb", "SELECT * FROM users")
```

### Custom Data Sources

Create custom data sources by implementing Hadoop's `InputFormat` interface:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}

class CustomInputFormat extends InputFormat[LongWritable, Text] {
  def getSplits(job: JobConf, numSplits: Int): Array[InputSplit] = {
    // Create input splits
    Array[InputSplit]()
  }

  def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, Text] = {
    // Return a record reader
    null
  }
}

// Use the custom format
val customData = sc.hadoopFile[LongWritable, Text](
  "path",
  classOf[CustomInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
```

## Cloud Storage

### Amazon S3

Note that the `s3a://` scheme requires Hadoop 2.6 or later; on older Hadoop versions, use `s3n://` with the corresponding `fs.s3n.*` configuration keys.

```scala
// Configure S3 access
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "ACCESS_KEY")
hadoopConf.set("fs.s3a.secret.key", "SECRET_KEY")

// Read from S3
val s3Data = sc.textFile("s3a://my-bucket/path/to/data.txt")

// Write to S3
data.saveAsTextFile("s3a://my-bucket/output/")
```

### Azure Blob Storage

```scala
// Configure Azure access
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.azure.account.key.mystorageaccount.blob.core.windows.net", "ACCOUNT_KEY")

// Read from Azure
val azureData = sc.textFile("wasb://container@mystorageaccount.blob.core.windows.net/path/to/file")
```

## File Formats and Compression

### Supported Compression Codecs

```scala { .api }
import org.apache.hadoop.io.compress.{
  GzipCodec,    // .gz files
  BZip2Codec,   // .bz2 files
  SnappyCodec,  // .snappy files
  DefaultCodec  // Default (DEFLATE) compression
}
// LZO (.lzo) support requires the separate hadoop-lzo library, which provides LzopCodec
```

### Reading Compressed Files

Spark automatically detects compression based on the file extension:

```scala
// Automatically decompressed
val gzipData = sc.textFile("hdfs://path/to/file.txt.gz")
val bzip2Data = sc.textFile("hdfs://path/to/file.txt.bz2")
val snappyData = sc.textFile("hdfs://path/to/file.txt.snappy")

// Mixed compression in a directory
val mixedData = sc.textFile("hdfs://path/to/directory/*") // Handles multiple formats
```

### Writing Compressed Files

```scala
import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, SnappyCodec}

val data = sc.parallelize(Array("line1", "line2", "line3"))

// Save with different compression codecs
data.saveAsTextFile("output-gzip", classOf[GzipCodec])
data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])
data.saveAsTextFile("output-snappy", classOf[SnappyCodec])
```

## Performance Considerations

### Partitioning

```scala
// Control the number of partitions when reading
val data = sc.textFile("large-file.txt", minPartitions = 100)

// Repartition after reading if needed
val repartitioned = data.repartition(50)
```
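
To verify the effect of these settings, you can inspect the partition count via the core `RDD.partitions` API; keep in mind that `minPartitions` is only a lower bound, so the actual count may be higher:

```scala
val data = sc.textFile("large-file.txt", minPartitions = 100)
println(s"Partitions after read: ${data.partitions.length}")

val repartitioned = data.repartition(50)
println(s"Partitions after repartition: ${repartitioned.partitions.length}")
```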

### File Size Optimization

```scala
// For many small files, use wholeTextFiles and then shrink the partition count
val smallFiles = sc.wholeTextFiles("hdfs://path/to/small-files/")
  .values       // Extract just the content
  .coalesce(10) // Reduce the number of partitions without a full shuffle
```

### Caching Frequently Accessed Data

```scala
val frequentlyUsed = sc.textFile("hdfs://path/to/data")
  .filter(_.contains("important"))
  .cache() // Keep in memory across actions

// Multiple actions reuse the cached data
val count1 = frequentlyUsed.count()
val count2 = frequentlyUsed.filter(_.length > 100).count()
```

## Error Handling and Validation

```scala
// Validate file existence before reading
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val path = new Path("hdfs://path/to/file")

if (fs.exists(path)) {
  val data = sc.textFile(path.toString)
  // Process data
} else {
  println(s"File not found: $path")
}

// Handle malformed data (processLine is a user-defined parser)
val safeData = sc.textFile("data.txt").mapPartitions { lines =>
  lines.flatMap { line =>
    try {
      Some(processLine(line))
    } catch {
      case e: Exception =>
        println(s"Error processing line: $line, Error: ${e.getMessage}")
        None
    }
  }
}
```

This coverage of data sources provides the foundation for reading and writing data in various formats and storage systems with Apache Spark.