tessl/maven-apache-spark

Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R


docs/data-sources.md

Data Sources

Spark provides extensive support for reading and writing data from a variety of sources, including local filesystems, HDFS, object stores, and databases. This document covers the data source APIs available in Spark 1.0.0.

Text Files

Reading Text Files

textFile: Read text files line by line

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
// Local filesystem
val localFile = sc.textFile("file:///path/to/local/file.txt")

// HDFS
val hdfsFile = sc.textFile("hdfs://namenode:port/path/to/file.txt")

// Multiple files with wildcards
val multipleFiles = sc.textFile("hdfs://path/to/files/*.txt")

// Specify minimum partitions
val partitionedFile = sc.textFile("hdfs://path/to/large/file.txt", 8)

// Compressed files (automatically detected)
val gzipFile = sc.textFile("hdfs://path/to/file.txt.gz")

wholeTextFiles: Read entire files as key-value pairs

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
// Read directory of files - returns (filename, file_content) pairs
val filesRDD = sc.wholeTextFiles("hdfs://path/to/directory/")

filesRDD.foreach { case (filename, content) =>
  println(s"File: $filename")
  println(s"Size: ${content.length} characters")
  println(s"Lines: ${content.count(_ == '\n') + 1}")
}

// Process each file separately
val processedFiles = filesRDD.mapValues { content =>
  content.split("\n").filter(_.nonEmpty).length
}

Saving Text Files

saveAsTextFile: Save RDD as text files

def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
val data = sc.parallelize(Array("line1", "line2", "line3"))

// Save as text files
data.saveAsTextFile("hdfs://path/to/output")

// Save with compression
import org.apache.hadoop.io.compress.GzipCodec
data.saveAsTextFile("hdfs://path/to/compressed-output", classOf[GzipCodec])

// Other compression codecs
import org.apache.hadoop.io.compress.{BZip2Codec, DefaultCodec, SnappyCodec}
data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])
data.saveAsTextFile("output-snappy", classOf[SnappyCodec])

Object Files

Spark's native binary format using Java serialization.
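Because object files are plain Java serialization, every record's class must be on the classpath (and serializable) when the file is read back. The sketch below reproduces the per-record round-trip locally, without Spark, to show what saveAsObjectFile and objectFile do under the hood:

```scala
import java.io._

// Case classes are Serializable by default
case class Person(name: String, age: Int)

// Serialize one record to bytes, as saveAsObjectFile does per element
val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(Person("Alice", 25))
out.close()

// Deserialize, as objectFile does when loading the RDD back
val objIn = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val restored = objIn.readObject().asInstanceOf[Person]
objIn.close()

println(restored)  // Person(Alice,25)
```

If the class definition changes between writing and reading, deserialization fails, which is why object files are best suited to transient intermediate data rather than long-term storage.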

Reading Object Files

objectFile: Load RDD of objects

def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
// Save and load custom objects
case class Person(name: String, age: Int)

val people = sc.parallelize(Array(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
))

// Save as object file
people.saveAsObjectFile("hdfs://path/to/people")

// Load back
val loadedPeople: RDD[Person] = sc.objectFile[Person]("hdfs://path/to/people")

Saving Object Files

saveAsObjectFile: Save RDD as serialized objects

def saveAsObjectFile(path: String): Unit
val complexData = sc.parallelize(Array(
  Map("name" -> "Alice", "scores" -> List(85, 92, 78)),
  Map("name" -> "Bob", "scores" -> List(91, 87, 94))
))

complexData.saveAsObjectFile("hdfs://path/to/complex-data")

Hadoop Files

Spark integrates with Hadoop's input and output formats for reading various file types.

Sequence Files

Reading SequenceFiles:

def sequenceFile[K, V](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
import org.apache.hadoop.io.{IntWritable, Text}

val seqFile = sc.sequenceFile[IntWritable, Text](
  "hdfs://path/to/sequencefile",
  classOf[IntWritable],
  classOf[Text]
)

// Convert Writable types to Scala types
val converted = seqFile.map { case (key, value) =>
  (key.get(), value.toString)
}

Saving SequenceFiles:

def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
// For RDDs of types that can be converted to Writable
val pairs = sc.parallelize(Array((1, "apple"), (2, "banana"), (3, "orange")))

// Convert to Writable types
val writablePairs = pairs.map { case (k, v) =>
  (new IntWritable(k), new Text(v))
}

writablePairs.saveAsSequenceFile("hdfs://path/to/output")

// With compression
writablePairs.saveAsSequenceFile("hdfs://path/to/compressed", Some(classOf[GzipCodec]))

Generic Hadoop Files

hadoopFile: Read files with custom InputFormat

def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// Read with TextInputFormat (returns byte offset, line content)
val textWithLineNumbers = sc.hadoopFile[LongWritable, Text](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)

// Custom InputFormat example
class CustomInputFormat extends InputFormat[Text, Text] {
  // Implementation here
}

val customFile = sc.hadoopFile[Text, Text](
  "hdfs://path/to/custom/format",
  classOf[CustomInputFormat],
  classOf[Text],
  classOf[Text]
)

newAPIHadoopFile: Use new Hadoop API

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration
): RDD[(K, V)]
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val newAPIFile = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://path/to/file",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)

Hadoop Configuration

Access and modify Hadoop configuration:

def hadoopConfiguration: Configuration
val hadoopConf = sc.hadoopConfiguration

// Configure S3 access
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")
hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com")

// Configure compression
hadoopConf.set("mapreduce.map.output.compress", "true")
hadoopConf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")

// Now read from S3
val s3Data = sc.textFile("s3a://bucket-name/path/to/file")

Saving with Hadoop Formats

Save as Hadoop Files

saveAsHadoopFile: Save with old Hadoop API

def saveAsHadoopFile(
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[_ <: OutputFormat[_, _]],
  codec: Class[_ <: CompressionCodec]
): Unit

// Simplified versions
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.{IntWritable, Text}

val pairs = sc.parallelize(Array((1, "apple"), (2, "banana")))
val writablePairs = pairs.map { case (k, v) => (new IntWritable(k), new Text(v)) }

// Save as text with custom format
writablePairs.saveAsHadoopFile(
  "hdfs://path/to/output",
  classOf[IntWritable],
  classOf[Text],
  classOf[TextOutputFormat[IntWritable, Text]]
)

saveAsNewAPIHadoopFile: Save with new Hadoop API

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  conf: Configuration = context.hadoopConfiguration
): Unit
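The new-API save mirrors the old-API example above. A minimal sketch (the output path is a placeholder), using the new org.apache.hadoop.mapreduce output format:

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

val pairs = sc.parallelize(Array((1, "apple"), (2, "banana")))
val writablePairs = pairs.map { case (k, v) => (new IntWritable(k), new Text(v)) }

// Save using the new-API TextOutputFormat
writablePairs.saveAsNewAPIHadoopFile(
  "hdfs://path/to/new-api-output",
  classOf[IntWritable],
  classOf[Text],
  classOf[TextOutputFormat[IntWritable, Text]]
)
```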

saveAsHadoopDataset: Save using JobConf

def saveAsHadoopDataset(conf: JobConf): Unit
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[IntWritable, Text]])
jobConf.setOutputKeyClass(classOf[IntWritable])
jobConf.setOutputValueClass(classOf[Text])
// JobConf has no setOutputPath; the output path is set via FileOutputFormat
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output"))

writablePairs.saveAsHadoopDataset(jobConf)

Database Connectivity

JDBC Data Sources

Spark 1.0.0 has no built-in high-level JDBC data source, but you can read from databases using plain JDBC inside a Spark job:

import java.sql.{Connection, DriverManager, ResultSet}

// Custom function to read from database
// Custom function to read from a database; the connection is opened
// and closed on the executor, and results are fully materialized
// before the connection is released
def readFromDatabase(url: String, query: String): RDD[String] = {
  sc.parallelize(Seq(query), 1).mapPartitions { queries =>
    if (queries.isEmpty) Iterator.empty
    else {
      val connection = DriverManager.getConnection(url)
      val statement = connection.createStatement()
      val results = scala.collection.mutable.ListBuffer[String]()
      try {
        queries.foreach { q =>
          val resultSet = statement.executeQuery(q)
          while (resultSet.next()) {
            results += resultSet.getString(1) // assumes a single string column
          }
          resultSet.close()
        }
      } finally {
        statement.close()
        connection.close()
      }
      results.iterator
    }
  }
}

val dbData = readFromDatabase("jdbc:mysql://localhost:3306/mydb", "SELECT * FROM users")

Custom Data Sources

Create custom data sources by implementing InputFormat:

import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}

class CustomInputFormat extends InputFormat[LongWritable, Text] {
  def getSplits(job: JobConf, numSplits: Int): Array[InputSplit] = {
    // Create input splits
    Array[InputSplit]()
  }
  
  def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, Text] = {
    // Return record reader
    null
  }
}

// Use custom format
val customData = sc.hadoopFile[LongWritable, Text](
  "path",
  classOf[CustomInputFormat],
  classOf[LongWritable],
  classOf[Text]
)

Cloud Storage

Amazon S3

// Configure S3 access
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "ACCESS_KEY")
hadoopConf.set("fs.s3a.secret.key", "SECRET_KEY")

// Read from S3
val s3Data = sc.textFile("s3a://my-bucket/path/to/data.txt")

// Write to S3
data.saveAsTextFile("s3a://my-bucket/output/")

Azure Blob Storage

// Configure Azure access
hadoopConf.set("fs.azure.account.key.mystorageaccount.blob.core.windows.net", "ACCOUNT_KEY")

// Read from Azure
val azureData = sc.textFile("wasb://container@mystorageaccount.blob.core.windows.net/path/to/file")

File Formats and Compression

Supported Compression Codecs

import org.apache.hadoop.io.compress.{
  GzipCodec,        // .gz files
  BZip2Codec,       // .bz2 files
  SnappyCodec,      // .snappy files
  DefaultCodec      // .deflate files (default compression)
}

// LZO support (.lzo files) requires the external hadoop-lzo package
// (com.hadoop.compression.lzo.LzopCodec); it is not part of
// org.apache.hadoop.io.compress

Reading Compressed Files

Spark automatically detects compression based on file extension. Note that gzip and Snappy files are not splittable, so each such file is read as a single partition; bzip2 files are splittable and can be read in parallel.

// Automatically decompressed
val gzipData = sc.textFile("hdfs://path/to/file.txt.gz")
val bzip2Data = sc.textFile("hdfs://path/to/file.txt.bz2")
val snappyData = sc.textFile("hdfs://path/to/file.txt.snappy")

// Mixed compression in directory
val mixedData = sc.textFile("hdfs://path/to/directory/*")  // Handles multiple formats
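Codec selection keys off the file extension, but the bytes on disk are ordinary compressed streams. The sketch below uses java.util.zip directly (no Spark) to round-trip the gzip encoding that sc.textFile transparently decompresses for .gz paths:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Write gzip bytes the way a .gz text file is stored
val raw = "line1\nline2\nline3"
val compressed = new ByteArrayOutputStream()
val gzOut = new GZIPOutputStream(compressed)
gzOut.write(raw.getBytes("UTF-8"))
gzOut.close()

// Reading transparently decompresses, as sc.textFile does for .gz files
val gzIn = new GZIPInputStream(new ByteArrayInputStream(compressed.toByteArray))
val restored = scala.io.Source.fromInputStream(gzIn, "UTF-8").mkString

println(restored == raw)  // true
```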

Writing Compressed Files

val data = sc.parallelize(Array("line1", "line2", "line3"))

// Save with different compression
data.saveAsTextFile("output-gzip", classOf[GzipCodec])
data.saveAsTextFile("output-bzip2", classOf[BZip2Codec])
data.saveAsTextFile("output-snappy", classOf[SnappyCodec])

Performance Considerations

Partitioning

// Control number of partitions when reading
val data = sc.textFile("large-file.txt", minPartitions = 100)

// Repartition after reading if needed
val repartitioned = data.repartition(50)

File Size Optimization

// For small files, use wholeTextFiles and then repartition
val smallFiles = sc.wholeTextFiles("hdfs://path/to/small-files/")
  .values  // Extract just the content
  .repartition(10)  // Reduce number of partitions

Caching Frequently Accessed Data

val frequentlyUsed = sc.textFile("hdfs://path/to/data")
  .filter(_.contains("important"))
  .cache()  // Cache in memory

// Multiple actions on cached data
val count1 = frequentlyUsed.count()
val count2 = frequentlyUsed.filter(_.length > 100).count()

Error Handling and Validation

// Validate file existence before reading
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val path = new Path("hdfs://path/to/file")

if (fs.exists(path)) {
  val data = sc.textFile(path.toString)
  // Process data
} else {
  println(s"File not found: $path")
}

// Handle malformed data
val safeData = sc.textFile("data.txt").mapPartitions { lines =>
  lines.flatMap { line =>
    try {
      Some(processLine(line)) // processLine: your application's parsing function
    } catch {
      case e: Exception => 
        println(s"Error processing line: $line, Error: ${e.getMessage}")
        None
    }
  }
}
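The try/catch-to-Option pattern above can be exercised without a cluster. This sketch uses a hypothetical safeParse helper (the Int-parsing logic is illustrative only) to show the per-line behavior that flatMap applies inside mapPartitions:

```scala
// Hypothetical per-line parser: malformed lines become None and are
// silently dropped by flatMap, exactly as in the RDD example above
def safeParse(line: String): Option[Int] =
  try {
    Some(line.trim.toInt)
  } catch {
    case _: NumberFormatException => None
  }

val lines = Seq("42", "not-a-number", " 7 ")
val parsed = lines.flatMap(safeParse)

println(parsed)  // List(42, 7)
```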

This comprehensive coverage of data sources provides the foundation for reading and writing data in various formats and storage systems with Apache Spark.
