Context and Configuration

SparkContext

The main entry point for Spark functionality. SparkContext represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables.

class SparkContext(config: SparkConf) {
  // RDD Creation
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  
  // Hadoop Integration
  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
    path: String,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V],
    conf: Configuration = hadoopConfiguration
  ): RDD[(K, V)]
  
  def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
  ): RDD[(K, V)]
  
  // Shared Variables  
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  def longAccumulator(): LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator(): DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T](): CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]
  
  // Configuration and Properties
  def getConf: SparkConf
  def master: String
  def appName: String
  def applicationId: String
  def defaultParallelism: Int
  val startTime: Long
  def version: String
  
  // File and Jar Management
  def addFile(path: String): Unit
  def addFile(path: String, recursive: Boolean): Unit
  def addJar(path: String): Unit
  
  // Job Management
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
  def clearJobGroup(): Unit
  def cancelJobGroup(groupId: String): Unit
  
  // Additional RDD Creation
  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
  def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T]
  
  // Lifecycle Management
  def stop(): Unit
  def statusTracker: SparkStatusTracker
}

Usage Examples

Creating a SparkContext:

import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf()
  .setAppName("My Application")
  .setMaster("local[4]") // Use 4 local threads

val sc = new SparkContext(conf)

// Use SparkContext...

sc.stop() // Always stop when done

Reading data:

// From local collection
val numbers = sc.parallelize(1 to 1000, 8) // 8 partitions

// From text file  
val lines = sc.textFile("hdfs://path/to/file.txt")

// Multiple text files as (filename, content) pairs
val files = sc.wholeTextFiles("hdfs://path/to/directory/")
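
Shared variables (a minimal sketch of the broadcast and accumulator calls listed above; the lookup table and counter name are illustrative, and lines is the RDD from the previous example):

// Broadcast a small read-only lookup table to every executor
val countryCodes = sc.broadcast(Map("US" -> "United States", "DE" -> "Germany"))

// Named accumulator, visible in the web UI
val unknownCodes = sc.longAccumulator("unknown country codes")

val resolved = lines.map { line =>
  val code = line.trim
  countryCodes.value.getOrElse(code, {
    unknownCodes.add(1) // updates inside transformations may be re-applied if a task is retried
    "unknown"
  })
}

resolved.count()            // run an action so the accumulator is actually updated
println(unknownCodes.value) // read the total back on the driver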

SparkConf

Configuration object for Spark applications. Setter methods return the SparkConf itself, so configuration calls can be chained.

class SparkConf(loadDefaults: Boolean = true) {
  // Core Configuration
  def set(key: String, value: String): SparkConf
  def setMaster(master: String): SparkConf
  def setAppName(name: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  def setExecutorEnv(variable: String, value: String): SparkConf
  
  // Retrieval
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getAll: Array[(String, String)]
  def contains(key: String): Boolean
  def getOption(key: String): Option[String]
  
  // Spark-specific Settings
  def setSparkHome(home: String): SparkConf
  def setExecutorMemory(mem: String): SparkConf
  def setAll(settings: Traversable[(String, String)]): SparkConf
  
  // Management
  def clone(): SparkConf
  def remove(key: String): SparkConf
}

Configuration Examples

Basic configuration:

val conf = new SparkConf()
  .setAppName("Data Processing Job")
  .setMaster("spark://master:7077")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "4")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Loading from properties file:

// With loadDefaults = true (the default), SparkConf picks up any spark.* Java
// system properties; spark-submit populates these from spark-defaults.conf
val conf = new SparkConf()
  .setAppName("My App")

Resource configuration:

val conf = new SparkConf()
  .setAppName("Resource Heavy Job")
  .set("spark.executor.memory", "8g")
  .set("spark.executor.cores", "8")
  .set("spark.executor.instances", "10")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.shuffle.partitions", "400")

Common Configuration Properties

Execution Properties

  • spark.master - Master URL (local[*], spark://host:port, yarn, etc.)
  • spark.app.name - Application name
  • spark.executor.memory - Executor memory (e.g., "4g", "2048m")
  • spark.executor.cores - CPU cores per executor
  • spark.executor.instances - Number of executors (for YARN/K8s)
  • spark.default.parallelism - Default number of partitions
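
The convenience setters setMaster and setAppName write the first two keys (spark.master, spark.app.name); the remaining properties are set with set. A short sketch with illustrative values:

val conf = new SparkConf()
  .setMaster("yarn")                       // same as .set("spark.master", "yarn")
  .setAppName("etl-job")                   // same as .set("spark.app.name", "etl-job")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "4")
  .set("spark.executor.instances", "10")
  .set("spark.default.parallelism", "200")

// Keys can be read back for verification
assert(conf.get("spark.master") == "yarn")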

Serialization Properties

  • spark.serializer - Serializer class (Java or Kryo)
  • spark.kryo.registrator - Kryo registrator class
  • spark.kryo.classesToRegister - Classes to register with Kryo
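
A sketch of enabling Kryo through these keys; com.example.MyRecord and com.example.MyKey are placeholders for application classes. Registration is optional, but unregistered classes cause Kryo to write the full class name alongside each serialized object:

val conf = new SparkConf()
  .setAppName("Kryo Example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Comma-separated class names to register with Kryo
  .set("spark.kryo.classesToRegister", "com.example.MyRecord,com.example.MyKey")

// SparkConf also offers a typed alternative:
// conf.registerKryoClasses(Array(classOf[MyRecord], classOf[MyKey]))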

Storage Properties

  • spark.storage.level - Default storage level for RDDs
  • spark.storage.memoryFraction - Fraction of JVM heap for storage (legacy; superseded by the unified memory settings spark.memory.fraction / spark.memory.storageFraction)
  • spark.storage.safetyFraction - Safety fraction for storage memory (legacy)

Shuffle Properties

  • spark.shuffle.service.enabled - Enable external shuffle service
  • spark.shuffle.compress - Compress shuffle output
  • spark.shuffle.spill.compress - Compress spilled data
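
A sketch applying these shuffle keys on a SparkConf; note that spark.shuffle.service.enabled also requires the external shuffle service process to be running on each node, which this configuration alone does not provide:

val conf = new SparkConf()
  .setAppName("Shuffle Tuning")
  .set("spark.shuffle.compress", "true")         // compress map output files
  .set("spark.shuffle.spill.compress", "true")   // compress data spilled to disk during shuffles
  .set("spark.shuffle.service.enabled", "true")  // serve shuffle files from the external service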