Spec Registry
Help your agents use open-source better. Learn more.
Find usage specs for your project’s dependencies
- Author
- tessl
- Last updated
- Spec files
maven-apache-spark
Describes: maven/org.apache.spark/spark-parent
- Description
- Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
- Author
- tessl
- Last updated
data-sources.md docs/
1# Data Sources23Spark provides extensive support for reading and writing data from various sources including local filesystems, HDFS, object stores, and databases. This document covers all the data source APIs available in Spark 1.0.0.45## Text Files67### Reading Text Files89**textFile**: Read text files line by line10```scala { .api }11def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]12```1314```scala15// Local filesystem16val localFile = sc.textFile("file:///path/to/local/file.txt")1718// HDFS19val hdfsFile = sc.textFile("hdfs://namenode:port/path/to/file.txt")2021// Multiple files with wildcards22val multipleFiles = sc.textFile("hdfs://path/to/files/*.txt")2324// Specify minimum partitions25val partitionedFile = sc.textFile("hdfs://path/to/large/file.txt", 8)2627// Compressed files (automatically detected)28val gzipFile = sc.textFile("hdfs://path/to/file.txt.gz")29```3031**wholeTextFiles**: Read entire files as key-value pairs32```scala { .api }33def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]34```3536```scala37// Read directory of files - returns (filename, file_content) pairs38val filesRDD = sc.wholeTextFiles("hdfs://path/to/directory/")3940filesRDD.foreach { case (filename, content) =>41println(s"File: $filename")42println(s"Size: ${content.length} characters")43println(s"Lines: ${content.count(_ == '\n') + 1}")44}4546// Process each file separately47val processedFiles = filesRDD.mapValues { content =>48content.split("\n").filter(_.nonEmpty).length49}50```5152### Saving Text Files5354**saveAsTextFile**: Save RDD as text files55```scala { .api }56def saveAsTextFile(path: String): Unit57def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit58```5960```scala61val data = sc.parallelize(Array("line1", "line2", "line3"))6263// Save as text files64data.saveAsTextFile("hdfs://path/to/output")6566// Save with compression67import org.apache.hadoop.io.compress.GzipCodec68data.saveAsTextFile("hdfs://path/to/compressed-output", classOf[GzipCodec])6970// Other compression codecs71import org.apache.hadoop.io.compress.{BZip2Codec, DefaultCodec, SnappyCodec}72data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])73data.saveAsTextFile("output-snappy", classOf[SnappyCodec])74```7576## Object Files7778Spark's native binary format using Java serialization.7980### Reading Object Files8182**objectFile**: Load RDD of objects83```scala { .api }84def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]85```8687```scala88// Save and load custom objects89case class Person(name: String, age: Int)9091val people = sc.parallelize(Array(92Person("Alice", 25),93Person("Bob", 30),94Person("Charlie", 35)95))9697// Save as object file98people.saveAsObjectFile("hdfs://path/to/people")99100// Load back101val loadedPeople: RDD[Person] = sc.objectFile[Person]("hdfs://path/to/people")102```103104### Saving Object Files105106**saveAsObjectFile**: Save RDD as serialized objects107```scala { .api }108def saveAsObjectFile(path: String): Unit109```110111```scala112val complexData = sc.parallelize(Array(113Map("name" -> "Alice", "scores" -> List(85, 92, 78)),114Map("name" -> "Bob", "scores" -> List(91, 87, 94))115))116117complexData.saveAsObjectFile("hdfs://path/to/complex-data")118```119120## Hadoop Files121122Spark integrates with Hadoop's input and output formats for reading various file types.123124### Sequence Files125126**Reading SequenceFiles**:127```scala { .api }128def sequenceFile[K, V](129path: String,130keyClass: Class[K],131valueClass: Class[V],132minPartitions: Int = defaultMinPartitions133): RDD[(K, V)]134```135136```scala137import org.apache.hadoop.io.{IntWritable, Text}138139val seqFile = sc.sequenceFile[IntWritable, Text](140"hdfs://path/to/sequencefile",141classOf[IntWritable],142classOf[Text]143)144145// Convert Writable types to Scala types146val converted = seqFile.map { case (key, value) =>147(key.get(), value.toString)148}149```150151**Saving SequenceFiles**:152```scala { .api }153def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit154```155156```scala157// For RDDs of types that can be converted to Writable158val pairs = sc.parallelize(Array((1, "apple"), (2, "banana"), (3, "orange")))159160// Convert to Writable types161val writablePairs = pairs.map { case (k, v) =>162(new IntWritable(k), new Text(v))163}164165writablePairs.saveAsSequenceFile("hdfs://path/to/output")166167// With compression168writablePairs.saveAsSequenceFile("hdfs://path/to/compressed", Some(classOf[GzipCodec]))169```170171### Generic Hadoop Files172173**hadoopFile**: Read files with custom InputFormat174```scala { .api }175def hadoopFile[K, V](176path: String,177inputFormatClass: Class[_ <: InputFormat[K, V]],178keyClass: Class[K],179valueClass: Class[V],180minPartitions: Int = defaultMinPartitions181): RDD[(K, V)]182```183184```scala185import org.apache.hadoop.mapred.{TextInputFormat, FileInputFormat}186import org.apache.hadoop.io.{LongWritable, Text}187188// Read with TextInputFormat (returns line number, line content)189val textWithLineNumbers = sc.hadoopFile[LongWritable, Text](190"hdfs://path/to/file",191classOf[TextInputFormat],192classOf[LongWritable],193classOf[Text]194)195196// Custom InputFormat example197class CustomInputFormat extends InputFormat[Text, Text] {198// Implementation here199}200201val customFile = sc.hadoopFile[Text, Text](202"hdfs://path/to/custom/format",203classOf[CustomInputFormat],204classOf[Text],205classOf[Text]206)207```208209**newAPIHadoopFile**: Use new Hadoop API210```scala { .api }211def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](212path: String,213fClass: Class[F],214kClass: Class[K],215vClass: Class[V],216conf: Configuration = hadoopConfiguration217): RDD[(K, V)]218```219220```scala221import org.apache.hadoop.mapreduce.lib.input.TextInputFormat222import org.apache.hadoop.io.{LongWritable, Text}223224val newAPIFile = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](225"hdfs://path/to/file",226classOf[TextInputFormat],227classOf[LongWritable],228classOf[Text]229)230```231232### Hadoop Configuration233234Access and modify Hadoop configuration:235236```scala { .api }237def hadoopConfiguration: Configuration238```239240```scala241val hadoopConf = sc.hadoopConfiguration242243// Configure S3 access244hadoopConf.set("fs.s3a.access.key", "your-access-key")245hadoopConf.set("fs.s3a.secret.key", "your-secret-key")246hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com")247248// Configure compression249hadoopConf.set("mapreduce.map.output.compress", "true")250hadoopConf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")251252// Now read from S3253val s3Data = sc.textFile("s3a://bucket-name/path/to/file")254```255256## Saving with Hadoop Formats257258### Save as Hadoop Files259260**saveAsHadoopFile**: Save with old Hadoop API261```scala { .api }262def saveAsHadoopFile[F <: OutputFormat[K, V]](263path: String,264keyClass: Class[_],265valueClass: Class[_],266outputFormatClass: Class[F],267codec: Class[_ <: CompressionCodec]268): Unit269270// Simplified versions271def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit272def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit273```274275```scala276import org.apache.hadoop.mapred.{TextOutputFormat, SequenceFileOutputFormat}277import org.apache.hadoop.io.{IntWritable, Text}278279val pairs = sc.parallelize(Array((1, "apple"), (2, "banana")))280val writablePairs = pairs.map { case (k, v) => (new IntWritable(k), new Text(v)) }281282// Save as text with custom format283writablePairs.saveAsHadoopFile[TextOutputFormat[IntWritable, Text]](284"hdfs://path/to/output",285classOf[IntWritable],286classOf[Text],287classOf[TextOutputFormat[IntWritable, Text]]288)289```290291**saveAsNewAPIHadoopFile**: Save with new Hadoop API292```scala { .api }293def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](294path: String,295keyClass: Class[_],296valueClass: Class[_],297outputFormatClass: Class[F],298conf: Configuration = context.hadoopConfiguration299): Unit300```301302**saveAsHadoopDataset**: Save using JobConf303```scala { .api }304def saveAsHadoopDataset(conf: JobConf): Unit305```306307```scala308import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}309310val jobConf = new JobConf()311jobConf.setOutputFormat(classOf[TextOutputFormat[IntWritable, Text]])312jobConf.setOutputKeyClass(classOf[IntWritable])313jobConf.setOutputValueClass(classOf[Text])314jobConf.setOutputPath(new Path("hdfs://path/to/output"))315316writablePairs.saveAsHadoopDataset(jobConf)317```318319## Database Connectivity320321### JDBC Data Sources322323While Spark 1.0.0 doesn't have built-in JDBC DataFrames, you can read from databases using custom input formats:324325```scala326import java.sql.{Connection, DriverManager, ResultSet}327328// Custom function to read from database329def readFromDatabase(url: String, query: String): RDD[String] = {330sc.parallelize(Seq(query)).mapPartitions { queries =>331val connection = DriverManager.getConnection(url)332val statement = connection.createStatement()333334queries.flatMap { query =>335val resultSet = statement.executeQuery(query)336val results = scala.collection.mutable.ListBuffer[String]()337338while (resultSet.next()) {339// Extract data from ResultSet340results += resultSet.getString(1) // Assuming single column341}342343resultSet.close()344statement.close()345connection.close()346results347}348}349}350351val dbData = readFromDatabase("jdbc:mysql://localhost:3306/mydb", "SELECT * FROM users")352```353354### Custom Data Sources355356Create custom data sources by implementing InputFormat:357358```scala359import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}360361class CustomInputFormat extends InputFormat[LongWritable, Text] {362def getSplits(job: JobConf, numSplits: Int): Array[InputSplit] = {363// Create input splits364Array[InputSplit]()365}366367def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, Text] = {368// Return record reader369null370}371}372373// Use custom format374val customData = sc.hadoopFile[LongWritable, Text](375"path",376classOf[CustomInputFormat],377classOf[LongWritable],378classOf[Text]379)380```381382## Cloud Storage383384### Amazon S3385386```scala387// Configure S3 access388val hadoopConf = sc.hadoopConfiguration389hadoopConf.set("fs.s3a.access.key", "ACCESS_KEY")390hadoopConf.set("fs.s3a.secret.key", "SECRET_KEY")391392// Read from S3393val s3Data = sc.textFile("s3a://my-bucket/path/to/data.txt")394395// Write to S3396data.saveAsTextFile("s3a://my-bucket/output/")397```398399### Azure Blob Storage400401```scala402// Configure Azure access403hadoopConf.set("fs.azure.account.key.mystorageaccount.blob.core.windows.net", "ACCOUNT_KEY")404405// Read from Azure406val azureData = sc.textFile("wasb://container@mystorageaccount.blob.core.windows.net/path/to/file")407```408409## File Formats and Compression410411### Supported Compression Codecs412413```scala { .api }414import org.apache.hadoop.io.compress.{415GzipCodec, // .gz files416BZip2Codec, // .bz2 files417SnappyCodec, // .snappy files418LzopCodec, // .lzo files419DefaultCodec // Default compression420}421```422423### Reading Compressed Files424425Spark automatically detects compression based on file extension:426427```scala428// Automatically decompressed429val gzipData = sc.textFile("hdfs://path/to/file.txt.gz")430val bzip2Data = sc.textFile("hdfs://path/to/file.txt.bz2")431val snappyData = sc.textFile("hdfs://path/to/file.txt.snappy")432433// Mixed compression in directory434val mixedData = sc.textFile("hdfs://path/to/directory/*") // Handles multiple formats435```436437### Writing Compressed Files438439```scala440val data = sc.parallelize(Array("line1", "line2", "line3"))441442// Save with different compression443data.saveAsTextFile("output-gzip", classOf[GzipCodec])444data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])445data.saveAsTextFile("output-snappy", classOf[SnappyCodec])446```447448## Performance Considerations449450### Partitioning451452```scala453// Control number of partitions when reading454val data = sc.textFile("large-file.txt", minPartitions = 100)455456// Repartition after reading if needed457val repartitioned = data.repartition(50)458```459460### File Size Optimization461462```scala463// For small files, use wholeTextFiles and then repartition464val smallFiles = sc.wholeTextFiles("hdfs://path/to/small-files/")465.values // Extract just the content466.repartition(10) // Reduce number of partitions467```468469### Caching Frequently Accessed Data470471```scala472val frequentlyUsed = sc.textFile("hdfs://path/to/data")473.filter(_.contains("important"))474.cache() // Cache in memory475476// Multiple actions on cached data477val count1 = frequentlyUsed.count()478val count2 = frequentlyUsed.filter(_.length > 100).count()479```480481## Error Handling and Validation482483```scala484// Validate file existence before reading485import org.apache.hadoop.fs.{FileSystem, Path}486487val fs = FileSystem.get(sc.hadoopConfiguration)488val path = new Path("hdfs://path/to/file")489490if (fs.exists(path)) {491val data = sc.textFile(path.toString)492// Process data493} else {494println(s"File not found: $path")495}496497// Handle malformed data498val safeData = sc.textFile("data.txt").mapPartitions { lines =>499lines.flatMap { line =>500try {501Some(processLine(line))502} catch {503case e: Exception =>504println(s"Error processing line: $line, Error: ${e.getMessage}")505None506}507}508}509```510511This comprehensive coverage of data sources provides the foundation for reading and writing data in various formats and storage systems with Apache Spark.