Input/Output Operations

File I/O operations for reading from and writing to various data sources including text files, sequence files, and Hadoop-compatible formats.

Capabilities

Input Operations (SparkContext)

Methods for creating RDDs from various data sources.

// Text file operations
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

// Binary file operations  
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration): RDD[Array[Byte]]

// Object file operations
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

// Hadoop InputFormat operations
def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration): RDD[(K, V)]

// Sequence file operations
def sequenceFile[K, V](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

// Hadoop RDD operations
def hadoopRDD[K, V](
  conf: JobConf,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
  conf: Configuration = hadoopConfiguration,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V]): RDD[(K, V)]

Output Operations (RDD)

Methods for saving RDDs to various output formats.

// Basic output operations
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
def saveAsObjectFile(path: String): Unit

// Hadoop output operations (available on pair RDDs)
def saveAsHadoopFile[F <: OutputFormat[K, V]](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V],
  outputFormatClass: Class[F],
  codec: Option[Class[_ <: CompressionCodec]] = None): Unit

def saveAsHadoopFile[F <: OutputFormat[K, V]](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V],
  outputFormatClass: Class[F],
  conf: JobConf,
  codec: Option[Class[_ <: CompressionCodec]] = None): Unit

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V],
  outputFormatClass: Class[F],
  conf: Configuration = self.context.hadoopConfiguration): Unit

// Sequence file output
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.compress.GzipCodec

val sc = new SparkContext(new SparkConf().setAppName("IO Examples").setMaster("local[*]"))

// Text file input
val textLines = sc.textFile("hdfs://input/data.txt")
val multipleFiles = sc.textFile("hdfs://input/*.txt") // Wildcard support
val localFile = sc.textFile("file:///local/path/data.txt") // Local filesystem

// Whole text files (useful for small files)
val wholeFiles = sc.wholeTextFiles("hdfs://input/small-files/")
// Returns RDD[(filename, content)]

// Binary files
val binaryData = sc.binaryFiles("hdfs://input/images/")
// Returns RDD[(filename, PortableDataStream)]

// Process binary data
val imageSizes = binaryData.map { case (filename, stream) =>
  val bytes = stream.toArray()
  (filename, bytes.length)
}

// Object files (for Spark-serialized objects)
val numbers = sc.parallelize(1 to 1000)
numbers.saveAsObjectFile("hdfs://output/numbers")
val loadedNumbers = sc.objectFile[Int]("hdfs://output/numbers")

// Sequence files
val keyValueData = sc.parallelize(Array(("key1", "value1"), ("key2", "value2")))
keyValueData.saveAsSequenceFile("hdfs://output/sequence")
val loadedSequence = sc.sequenceFile[String, String]("hdfs://output/sequence")

// Hadoop InputFormat (old mapred API); the key and value types are inferred
// from the class arguments
val hadoopData = sc.hadoopFile(
  "hdfs://input/hadoop-format",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
).map { case (key, value) => (key.get(), value.toString) }

// Text output with compression
textLines
  .filter(_.nonEmpty)
  .saveAsTextFile("hdfs://output/filtered", classOf[GzipCodec])

Specialized Input Formats

Spark ships internal input formats that back the convenience methods above. They are private[spark], so use them through the corresponding SparkContext methods rather than instantiating them directly.

// Backs sc.wholeTextFiles: reads each file as a single (path, content) record
class WholeTextFileInputFormat

// Backs sc.binaryFiles: reads each file as a (path, PortableDataStream) record
class StreamInputFormat

// Backs sc.binaryRecords: splits files into fixed-length binary records;
// the record length is configured by sc.binaryRecords(path, recordLength)
class FixedLengthBinaryInputFormat extends FileInputFormat[LongWritable, BytesWritable]

// PortableDataStream (public): a serializable handle to a file's contents,
// returned by sc.binaryFiles
class PortableDataStream extends Serializable {
  def open(): DataInputStream   // open a stream over the file's bytes
  def toArray(): Array[Byte]    // read the entire file into memory
  def getPath(): String         // fully qualified path of the underlying file
}
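
toArray() materializes an entire file in memory; for larger binaries you can stream through open() instead. A minimal sketch (the path and the 16-byte header are placeholders):

// Stream each binary file instead of materializing it with toArray()
val headers = sc.binaryFiles("hdfs://input/large-binaries/")
  .map { case (filename, stream) =>
    val in = stream.open()        // DataInputStream over the file's bytes
    try {
      val header = new Array[Byte](16)
      in.readFully(header)        // read only the leading header bytes
      (filename, header)
    } finally {
      in.close()
    }
  }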

Advanced I/O Examples:

// Fixed-length binary records: sc.binaryRecords configures
// FixedLengthBinaryInputFormat under the hood
val binaryRecords = sc.binaryRecords("hdfs://input/binary-data", 1024) // 1KB records
val recordSizes = binaryRecords.map(_.length)

// Process many small text files efficiently: sc.wholeTextFiles uses
// WholeTextFileInputFormat under the hood
val smallTextFiles = sc.wholeTextFiles("hdfs://input/logs/")
  .map { case (filename, content) =>
    val lines = content.split("\n")
    val errors = lines.count(_.contains("ERROR"))
    (filename, errors)
  }

Database I/O

JDBC support for reading from relational databases.

/**
 * RDD for reading data from JDBC sources
 */
class JdbcRDD[T: ClassTag](
  sc: SparkContext,
  getConnection: () => Connection,
  sql: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  mapRow: ResultSet => T) extends RDD[T]

// mapRow defaults to JdbcRDD.resultSetToObjectArray, which returns each row
// as an Array[Object]

Database Usage Examples:

import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}

// Database connection function
def createConnection(): Connection = {
  Class.forName("com.mysql.jdbc.Driver")
  DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/mydb",
    "username",
    "password"
  )
}

// Create JDBC RDD
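// The query must contain exactly two '?' placeholders; Spark binds each
// partition's lower and upper bound to them at runtime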
val jdbcRDD = new JdbcRDD(
  sc,
  createConnection,
  "SELECT id, name, age FROM users WHERE id >= ? AND id <= ?",
  lowerBound = 1,
  upperBound = 1000000,
  numPartitions = 4,
  mapRow = { resultSet =>
    val id = resultSet.getLong("id")
    val name = resultSet.getString("name") 
    val age = resultSet.getInt("age")
    (id, name, age)
  }
)

// Process database data
val adultUsers = jdbcRDD.filter(_._3 >= 18)
val userCount = adultUsers.count()

Advanced I/O Patterns

Multi-Source Data Loading

// Load and combine data from multiple sources
// (parseLogLine, extractIdFromFilename and processBinaryStream are user-defined
//  helpers; a minimal sketch follows after this example)
def loadMultiSourceData(sc: SparkContext): RDD[(String, Map[String, Any])] = {
  // Text logs
  val logs = sc.textFile("hdfs://logs/*.log")
    .map(parseLogLine)
    .map(record => (record.id, Map("type" -> "log", "data" -> record)))
  
  // CSV files  
  val csvData = sc.textFile("hdfs://csv/*.csv")
    .map(_.split(","))
    .filter(_.length >= 3)
    .map(fields => (fields(0), Map("type" -> "csv", "data" -> fields)))
  
  // Binary data
  val binaryData = sc.binaryFiles("hdfs://binary/*")
    .map { case (filename, stream) =>
      val id = extractIdFromFilename(filename)
      val data = processBinaryStream(stream)
      (id, Map("type" -> "binary", "data" -> data))
    }
  
  // Combine all sources
  logs.union(csvData).union(binaryData)
}

// Usage
val combinedData = loadMultiSourceData(sc)
val groupedByType = combinedData.groupBy(_._2("type").toString)
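
The helpers referenced above are not part of Spark. A minimal sketch, assuming pipe-delimited log lines and an ID encoded in each binary file name (both assumptions, not part of the original example):

// Hypothetical record type and helpers for the multi-source example above
case class LogRecord(id: String, level: String, message: String)

def parseLogLine(line: String): LogRecord = {
  val Array(id, level, message) = line.split('|')
  LogRecord(id, level, message)
}

def extractIdFromFilename(filename: String): String =
  filename.split('/').last.takeWhile(_ != '.')

def processBinaryStream(stream: org.apache.spark.input.PortableDataStream): Array[Byte] =
  stream.toArray()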

Incremental Data Processing

import java.text.SimpleDateFormat
import java.util.Date

// Process data incrementally based on modification time
def processIncrementalData(sc: SparkContext, lastProcessedTime: Long): RDD[ProcessedRecord] = {
  val files = sc.wholeTextFiles("hdfs://incremental-data/*")
  
  // Filter files modified after last processed time
  val newFiles = files.filter { case (filename, content) =>
    val modTime = getFileModificationTime(filename)
    modTime > lastProcessedTime
  }
  
  // Process new files
  val processedData = newFiles.flatMap { case (filename, content) =>
    content.split("\n")
      .filter(_.nonEmpty)
      .map(parseRecord)
      .filter(_.isValid)
  }
  
  processedData
}

// Checkpoint processed time
def updateProcessedTime(): Long = {
  val currentTime = System.currentTimeMillis()
  // Save to persistent storage (HDFS, database, etc.)
  saveProcessedTime(currentTime)
  currentTime
}
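
getFileModificationTime, parseRecord, ProcessedRecord and saveProcessedTime above are user-defined. For the modification-time lookup, one option (an assumption, not the only way) is the Hadoop FileSystem API; note that, because it is called inside an RDD filter, it runs on the executors:

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: look up a file's modification time via the Hadoop FileSystem API
def getFileModificationTime(filename: String): Long = {
  val path = new Path(filename)
  val fs = path.getFileSystem(new org.apache.hadoop.conf.Configuration())
  fs.getFileStatus(path).getModificationTime
}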

Streaming-Style File Processing

// Process files as they arrive (simulation of streaming)
def processFilesAsStream(sc: SparkContext, inputDir: String, outputDir: String): Unit = {
  var processedFiles = Set.empty[String]
  
  while (true) {
    // List current files
    val currentFiles = listHDFSFiles(inputDir)
    val newFiles = currentFiles -- processedFiles
    
    if (newFiles.nonEmpty) {
      println(s"Processing ${newFiles.size} new files")
      
      // Process new files
      val newData = sc.textFile(newFiles.mkString(","))
      val processed = newData
        .filter(_.nonEmpty)
        .map(processLine)
        .filter(_.isSuccess)
      
      // Save results with timestamp
      val timestamp = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date())
      processed.saveAsTextFile(s"$outputDir/batch_$timestamp")
      
      // Update processed files set
      processedFiles ++= newFiles
    }
    
    // Wait before checking again
    Thread.sleep(30000) // 30 seconds
  }
}
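
listHDFSFiles and processLine are likewise user-defined (processLine would return something with an isSuccess flag, e.g. scala.util.Try). A minimal directory-listing sketch using the Hadoop FileSystem API and the Path import above:

// Hypothetical helper: list the files currently present in a directory
def listHDFSFiles(dir: String): Set[String] = {
  val path = new Path(dir)
  val fs = path.getFileSystem(sc.hadoopConfiguration)
  fs.listStatus(path)
    .filter(_.isFile)
    .map(_.getPath.toString)
    .toSet
}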

Data Format Conversion Pipeline

// Convert between different data formats
class DataFormatConverter(sc: SparkContext) {
  
  def csvToParquet(inputPath: String, outputPath: String): Unit = {
    val csvData = sc.textFile(inputPath)
      .map(_.split(","))
      .filter(_.length >= 3)
      .map(fields => (fields(0), s"${fields(1)},${fields(2).toDouble}")) // (key, value) pairs
    
    // Would typically use Spark SQL for Parquet; saving as a sequence file here
    // to stay within the RDD API
    csvData.saveAsSequenceFile(outputPath + "_sequence")
  }
  
  def textToAvro(inputPath: String, outputPath: String): Unit = {
    val textData = sc.textFile(inputPath)
      .map(parseTextRecord)
      .filter(_.isValid)
    
    // Convert to Avro format (simplified)
    val avroData = textData.map(recordToAvroBytes)
    avroData.saveAsObjectFile(outputPath)
  }
  
  def jsonToSequenceFile(inputPath: String, outputPath: String): Unit = {
    val jsonData = sc.textFile(inputPath)
      .map(parseJSON)
      .filter(_.isDefined)
      .map(_.get)
      .map(json => (json.getString("id"), json.toString))
    
    jsonData.saveAsSequenceFile(outputPath)
  }
}

// Usage
val converter = new DataFormatConverter(sc)
converter.csvToParquet("hdfs://input/data.csv", "hdfs://output/data.parquet")

Custom OutputFormat Example

import org.apache.hadoop.mapred.{OutputFormat, RecordWriter, JobConf}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.util.Progressable

// Custom output format for special requirements
class CustomOutputFormat extends OutputFormat[String, String] {
  
  override def getRecordWriter(
    fs: FileSystem,
    job: JobConf,
    name: String,
    progress: Progressable): RecordWriter[String, String] = {
    
    new CustomRecordWriter(fs, job, name)
  }
  
  override def checkOutputSpecs(fs: FileSystem, job: JobConf): Unit = {
    // Validation logic
  }
}

class CustomRecordWriter(fs: FileSystem, job: JobConf, name: String) 
  extends RecordWriter[String, String] {
  
  private val outputStream = fs.create(new Path(name))
  
  override def write(key: String, value: String): Unit = {
    val record = s"$key|$value\n"
    outputStream.writeBytes(record)
  }
  
  override def close(reporter: org.apache.hadoop.mapred.Reporter): Unit = {
    outputStream.close()
  }
}

// Usage
val customData = sc.parallelize(Array(("key1", "value1"), ("key2", "value2")))
customData.saveAsHadoopFile(
  "hdfs://output/custom",
  classOf[String],
  classOf[String],
  classOf[CustomOutputFormat]
)

Performance Optimization

Efficient File Reading

// Optimize partition count for file reading
def optimizeFileReading(sc: SparkContext, path: String): RDD[String] = {
  // Get file size information
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val fileStatus = fs.listStatus(new Path(path))
  val totalSize = fileStatus.map(_.getLen).sum
  
  // Calculate optimal partitions (64MB per partition)
  val blockSize = 64 * 1024 * 1024 // 64MB
  val optimalPartitions = math.max(1, totalSize / blockSize).toInt
  
  sc.textFile(path, minPartitions = optimalPartitions)
}

// Coalesce small files
def coalescedOutput[T](rdd: RDD[T], outputPath: String, targetPartitions: Int): Unit = {
  val currentPartitions = rdd.getNumPartitions
  
  if (currentPartitions > targetPartitions) {
    rdd.coalesce(targetPartitions).saveAsTextFile(outputPath)
  } else {
    rdd.saveAsTextFile(outputPath)
  }
}
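
A hedged usage sketch of the two helpers above (paths and the partition target are illustrative):

// Read with a size-derived partition count, then write back as fewer, larger files
val tuned = optimizeFileReading(sc, "hdfs://input/large-dataset")
coalescedOutput(tuned.filter(_.nonEmpty), "hdfs://output/coalesced", targetPartitions = 8)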

Compression Strategy

import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, GzipCodec, SnappyCodec}

// Choose compression based on data characteristics
def saveWithOptimalCompression[T](rdd: RDD[T], outputPath: String, dataType: String): Unit = {
  val codec: Class[_ <: CompressionCodec] = dataType match {
    case "logs" => classOf[GzipCodec] // high compression for archival data
    case "intermediate" => classOf[SnappyCodec] // fast compression for temporary data
    case "streaming" => classOf[BZip2Codec] // splittable compression
    case _ => classOf[GzipCodec] // default
  }
  
  rdd.saveAsTextFile(outputPath, codec)
}

// Monitor compression ratios
def analyzeCompressionRatio(originalPath: String, compressedPath: String): Double = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  
  val originalSize = fs.listStatus(new Path(originalPath))
    .map(_.getLen).sum
  val compressedSize = fs.listStatus(new Path(compressedPath))
    .map(_.getLen).sum
  
  val ratio = compressedSize.toDouble / originalSize
  println(s"Compression ratio: ${(1 - ratio) * 100}%")
  ratio
}

Best Practices

File Organization

// Organize output by date partitions
def saveWithDatePartitioning[T](rdd: RDD[T], basePath: String): Unit = {
  val today = new SimpleDateFormat("yyyy/MM/dd").format(new Date())
  val outputPath = s"$basePath/$today"
  
  rdd.saveAsTextFile(outputPath)
}

// Partition output by key (hash partitioning into 10 buckets)
import org.apache.spark.HashPartitioner

def savePartitionedByKey(rdd: RDD[(String, String)], outputPath: String): Unit = {
  rdd.partitionBy(new HashPartitioner(10))
    .mapPartitionsWithIndex { (index, partition) =>
      partition.map(record => s"partition_$index: $record")
    }
    .saveAsTextFile(outputPath)
}
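
For example, reusing the keyValueData RDD from the sequence-file example earlier (output paths are placeholders):

// Write today's batch under a yyyy/MM/dd prefix, plus a hash-partitioned copy by key
saveWithDatePartitioning(keyValueData, "hdfs://output/daily")
savePartitionedByKey(keyValueData, "hdfs://output/by-key")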

Error Handling in I/O

// Robust file processing with error handling
def robustFileProcessing(sc: SparkContext, inputPath: String): RDD[ProcessedRecord] = {
  sc.textFile(inputPath)
    .mapPartitionsWithIndex { (partitionId, lines) =>
      lines.zipWithIndex.flatMap { case (line, lineNumber) =>
        try {
          Some(parseRecord(line))
        } catch {
          case e: Exception =>
            logError(s"Partition $partitionId, Line $lineNumber: ${e.getMessage}")
            None
        }
      }
    }
    // the flatMap over Option already drops records that failed to parse
}

// Validate output before final save
def validateAndSave[T](rdd: RDD[T], outputPath: String, validator: T => Boolean): Unit = {
  rdd.cache() // counted and filtered below, so avoid recomputing the input twice
  val validatedRDD = rdd.filter(validator)
  val originalCount = rdd.count()
  val validCount = validatedRDD.count()
  
  if (validCount < originalCount * 0.9) { // Less than 90% valid
    throw new RuntimeException(s"Too many invalid records: $validCount/$originalCount valid")
  }
  
  validatedRDD.saveAsTextFile(outputPath)
  println(s"Saved $validCount valid records out of $originalCount total")
}
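
For example, chaining the two helpers (ProcessedRecord and isValid are the same illustrative types used above):

// Hypothetical usage: parse robustly, then persist only if at least 90% of records validate
val processed = robustFileProcessing(sc, "hdfs://input/raw-events")
validateAndSave(processed, "hdfs://output/validated-events", (r: ProcessedRecord) => r.isValid)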