Spec Registry




maven-apache-spark

Description: Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Author: tessl

How to use

npx @tessl/cli registry install tessl/maven-apache-spark@1.0.0

docs/data-sources.md

# Data Sources

Spark provides extensive support for reading and writing data from various sources including local filesystems, HDFS, object stores, and databases. This document covers all the data source APIs available in Spark 1.0.0.

## Text Files

### Reading Text Files

**textFile**: Read text files line by line
```scala { .api }
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
```

```scala
// Local filesystem
val localFile = sc.textFile("file:///path/to/local/file.txt")

// HDFS
val hdfsFile = sc.textFile("hdfs://namenode:port/path/to/file.txt")

// Multiple files with wildcards
val multipleFiles = sc.textFile("hdfs://path/to/files/*.txt")

// Specify minimum partitions
val partitionedFile = sc.textFile("hdfs://path/to/large/file.txt", 8)

// Compressed files (automatically detected)
val gzipFile = sc.textFile("hdfs://path/to/file.txt.gz")
```
**wholeTextFiles**: Read entire files as key-value pairs
```scala { .api }
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
```

```scala
// Read directory of files - returns (filename, file_content) pairs
val filesRDD = sc.wholeTextFiles("hdfs://path/to/directory/")

filesRDD.foreach { case (filename, content) =>
  println(s"File: $filename")
  println(s"Size: ${content.length} characters")
  println(s"Lines: ${content.count(_ == '\n') + 1}")
}

// Process each file separately
val processedFiles = filesRDD.mapValues { content =>
  content.split("\n").filter(_.nonEmpty).length
}
```
### Saving Text Files

**saveAsTextFile**: Save RDD as text files
```scala { .api }
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
```

```scala
val data = sc.parallelize(Array("line1", "line2", "line3"))

// Save as text files
data.saveAsTextFile("hdfs://path/to/output")

// Save with compression
import org.apache.hadoop.io.compress.GzipCodec
data.saveAsTextFile("hdfs://path/to/compressed-output", classOf[GzipCodec])

// Other compression codecs
import org.apache.hadoop.io.compress.{BZip2Codec, DefaultCodec, SnappyCodec}
data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])
data.saveAsTextFile("output-snappy", classOf[SnappyCodec])
```
## Object Files

Spark's native binary format using Java serialization.

### Reading Object Files

**objectFile**: Load RDD of objects
```scala { .api }
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
```

```scala
// Save and load custom objects
case class Person(name: String, age: Int)

val people = sc.parallelize(Array(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
))

// Save as object file
people.saveAsObjectFile("hdfs://path/to/people")

// Load back
val loadedPeople: RDD[Person] = sc.objectFile[Person]("hdfs://path/to/people")
```
### Saving Object Files

**saveAsObjectFile**: Save RDD as serialized objects
```scala { .api }
def saveAsObjectFile(path: String): Unit
```

```scala
val complexData = sc.parallelize(Array(
  Map("name" -> "Alice", "scores" -> List(85, 92, 78)),
  Map("name" -> "Bob", "scores" -> List(91, 87, 94))
))

complexData.saveAsObjectFile("hdfs://path/to/complex-data")
```
## Hadoop Files

Spark integrates with Hadoop's input and output formats for reading various file types.

### Sequence Files

**Reading SequenceFiles**:
```scala { .api }
def sequenceFile[K, V](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}

val seqFile = sc.sequenceFile[IntWritable, Text](
  "hdfs://path/to/sequencefile",
  classOf[IntWritable],
  classOf[Text]
)

// Convert Writable types to Scala types
val converted = seqFile.map { case (key, value) =>
  (key.get(), value.toString)
}
```
**Saving SequenceFiles**:
```scala { .api }
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
```

```scala
// For RDDs of types that can be converted to Writable
val pairs = sc.parallelize(Array((1, "apple"), (2, "banana"), (3, "orange")))

// Convert to Writable types
val writablePairs = pairs.map { case (k, v) =>
  (new IntWritable(k), new Text(v))
}

writablePairs.saveAsSequenceFile("hdfs://path/to/output")

// With compression
writablePairs.saveAsSequenceFile("hdfs://path/to/compressed", Some(classOf[GzipCodec]))
```
### Generic Hadoop Files

**hadoopFile**: Read files with a custom InputFormat
```scala { .api }
def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.mapred.{TextInputFormat, FileInputFormat}
import org.apache.hadoop.io.{LongWritable, Text}

// Read with TextInputFormat (returns byte offset of each line, line content)
val textWithOffsets = sc.hadoopFile[LongWritable, Text](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)

// Custom InputFormat example (skeleton)
class CustomInputFormat extends InputFormat[Text, Text] {
  // Implementation here
}

val customFile = sc.hadoopFile[Text, Text](
  "hdfs://path/to/custom/format",
  classOf[CustomInputFormat],
  classOf[Text],
  classOf[Text]
)
```
**newAPIHadoopFile**: Use new Hadoop API
```scala { .api }
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val newAPIFile = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
```
### Hadoop Configuration

Access and modify Hadoop configuration:

```scala { .api }
def hadoopConfiguration: Configuration
```

```scala
val hadoopConf = sc.hadoopConfiguration

// Configure S3 access
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")
hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com")

// Configure compression
hadoopConf.set("mapreduce.map.output.compress", "true")
hadoopConf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")

// Now read from S3
val s3Data = sc.textFile("s3a://bucket-name/path/to/file")
```
## Saving with Hadoop Formats

### Save as Hadoop Files

**saveAsHadoopFile**: Save with old Hadoop API
```scala { .api }
def saveAsHadoopFile[F <: OutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  codec: Class[_ <: CompressionCodec]
): Unit

// Simplified versions
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit
```

```scala
import org.apache.hadoop.mapred.{TextOutputFormat, SequenceFileOutputFormat}
import org.apache.hadoop.io.{IntWritable, Text}

val pairs = sc.parallelize(Array((1, "apple"), (2, "banana")))
val writablePairs = pairs.map { case (k, v) => (new IntWritable(k), new Text(v)) }

// Save as text with an explicit output format class
writablePairs.saveAsHadoopFile(
  "hdfs://path/to/output",
  classOf[IntWritable],
  classOf[Text],
  classOf[TextOutputFormat[IntWritable, Text]]
)
```
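When the key and value classes can be taken from the RDD's element types, the simplified overloads listed above need only a path (and optionally a codec). A minimal sketch, reusing `writablePairs` from the example above with assumed output paths:

```scala
// Simplified overload: the OutputFormat is passed as a type parameter,
// key/value classes come from the RDD's element types
writablePairs.saveAsHadoopFile[TextOutputFormat[IntWritable, Text]]("hdfs://path/to/output-simple")

// Same call, compressed with gzip
writablePairs.saveAsHadoopFile[TextOutputFormat[IntWritable, Text]](
  "hdfs://path/to/output-simple-gz",
  classOf[GzipCodec]
)
```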
**saveAsNewAPIHadoopFile**: Save with new Hadoop API
```scala { .api }
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  conf: Configuration = context.hadoopConfiguration
): Unit
```
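A minimal usage sketch, reusing the `writablePairs` RDD from above and a hypothetical output path; note the new-API `TextOutputFormat` lives in `org.apache.hadoop.mapreduce.lib.output`:

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

// Save (IntWritable, Text) pairs as text using the new Hadoop output API
writablePairs.saveAsNewAPIHadoopFile(
  "hdfs://path/to/new-api-output",
  classOf[IntWritable],
  classOf[Text],
  classOf[NewTextOutputFormat[IntWritable, Text]]
)
```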
**saveAsHadoopDataset**: Save using JobConf
```scala { .api }
def saveAsHadoopDataset(conf: JobConf): Unit
```

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[IntWritable, Text]])
jobConf.setOutputKeyClass(classOf[IntWritable])
jobConf.setOutputValueClass(classOf[Text])
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output"))

writablePairs.saveAsHadoopDataset(jobConf)
```
## Database Connectivity

### JDBC Data Sources

While Spark 1.0.0 doesn't have built-in JDBC DataFrames, you can read from databases by opening a plain JDBC connection inside a transformation:

```scala
import java.sql.{Connection, DriverManager, ResultSet}

// Custom function to read from a database
def readFromDatabase(url: String, query: String): RDD[String] = {
  sc.parallelize(Seq(query)).mapPartitions { queries =>
    val connection = DriverManager.getConnection(url)
    val statement = connection.createStatement()

    queries.flatMap { query =>
      val resultSet = statement.executeQuery(query)
      val results = scala.collection.mutable.ListBuffer[String]()

      while (resultSet.next()) {
        // Extract data from the ResultSet
        results += resultSet.getString(1) // Assuming a single column
      }

      resultSet.close()
      statement.close()
      connection.close()
      results
    }
  }
}

val dbData = readFromDatabase("jdbc:mysql://localhost:3306/mydb", "SELECT * FROM users")
```
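Spark core also ships `org.apache.spark.rdd.JdbcRDD`, which partitions a bounded query across the cluster. A minimal sketch, where the table name, id-range bounds, and connection URL are assumptions for illustration:

```scala
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

// The SQL must contain two '?' placeholders; JdbcRDD substitutes each
// partition's lower and upper bound on the partitioning column.
val users = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb"),
  "SELECT name FROM users WHERE id >= ? AND id <= ?",
  1L,      // lowerBound (assumed id range)
  100000L, // upperBound
  4,       // numPartitions
  rs => rs.getString(1)
)
```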
### Custom Data Sources

Create custom data sources by implementing InputFormat:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}

class CustomInputFormat extends InputFormat[LongWritable, Text] {
  def getSplits(job: JobConf, numSplits: Int): Array[InputSplit] = {
    // Create input splits
    Array[InputSplit]()
  }

  def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, Text] = {
    // Return a record reader
    null
  }
}

// Use the custom format
val customData = sc.hadoopFile[LongWritable, Text](
  "path",
  classOf[CustomInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
```
## Cloud Storage

### Amazon S3

```scala
// Configure S3 access
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "ACCESS_KEY")
hadoopConf.set("fs.s3a.secret.key", "SECRET_KEY")

// Read from S3
val s3Data = sc.textFile("s3a://my-bucket/path/to/data.txt")

// Write to S3
data.saveAsTextFile("s3a://my-bucket/output/")
```

### Azure Blob Storage

```scala
// Configure Azure access
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.azure.account.key.mystorageaccount.blob.core.windows.net", "ACCOUNT_KEY")

// Read from Azure
val azureData = sc.textFile("wasb://container@mystorageaccount.blob.core.windows.net/path/to/file")
```
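Writing works the same way as for any other Hadoop-compatible filesystem; a minimal sketch with a hypothetical container and storage account:

```scala
// Write results back to Azure Blob Storage
data.saveAsTextFile("wasb://container@mystorageaccount.blob.core.windows.net/output/")
```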
## File Formats and Compression

### Supported Compression Codecs

```scala { .api }
import org.apache.hadoop.io.compress.{
  GzipCodec,   // .gz files
  BZip2Codec,  // .bz2 files
  SnappyCodec, // .snappy files
  DefaultCodec // Default (deflate) compression
}
// LZO (.lzo) support requires the external hadoop-lzo library (com.hadoop.compression.lzo.LzopCodec)
```
### Reading Compressed Files

Spark automatically detects compression based on file extension:

```scala
// Automatically decompressed
val gzipData = sc.textFile("hdfs://path/to/file.txt.gz")
val bzip2Data = sc.textFile("hdfs://path/to/file.txt.bz2")
val snappyData = sc.textFile("hdfs://path/to/file.txt.snappy")

// Mixed compression in directory
val mixedData = sc.textFile("hdfs://path/to/directory/*") // Handles multiple formats
```

### Writing Compressed Files

```scala
val data = sc.parallelize(Array("line1", "line2", "line3"))

// Save with different compression
data.saveAsTextFile("output-gzip", classOf[GzipCodec])
data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])
data.saveAsTextFile("output-snappy", classOf[SnappyCodec])
```
## Performance Considerations

### Partitioning

```scala
// Control number of partitions when reading
val data = sc.textFile("large-file.txt", minPartitions = 100)

// Repartition after reading if needed
val repartitioned = data.repartition(50)
```

### File Size Optimization

```scala
// For small files, use wholeTextFiles and then repartition
val smallFiles = sc.wholeTextFiles("hdfs://path/to/small-files/")
  .values          // Extract just the content
  .repartition(10) // Reduce number of partitions
```
### Caching Frequently Accessed Data

```scala
val frequentlyUsed = sc.textFile("hdfs://path/to/data")
  .filter(_.contains("important"))
  .cache() // Cache in memory

// Multiple actions on cached data
val count1 = frequentlyUsed.count()
val count2 = frequentlyUsed.filter(_.length > 100).count()
```
## Error Handling and Validation

```scala
// Validate file existence before reading
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val path = new Path("hdfs://path/to/file")

if (fs.exists(path)) {
  val data = sc.textFile(path.toString)
  // Process data
} else {
  println(s"File not found: $path")
}

// Handle malformed data (processLine is a placeholder for your parsing logic)
val safeData = sc.textFile("data.txt").mapPartitions { lines =>
  lines.flatMap { line =>
    try {
      Some(processLine(line))
    } catch {
      case e: Exception =>
        println(s"Error processing line: $line, Error: ${e.getMessage}")
        None
    }
  }
}
```

These data source APIs provide the foundation for reading and writing data in various formats and storage systems with Apache Spark.