tessl/maven-org-apache-spark--spark-core-2-11

Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.

Context and Configuration

This module provides the primary entry points for creating and configuring Spark applications. The SparkContext serves as the main coordination point for distributed computation, while SparkConf handles all configuration parameters.

SparkConf

SparkConf manages all configuration parameters for a Spark application.

class SparkConf(loadDefaults: Boolean = true) {
  def set(key: String, value: String): SparkConf
  def setMaster(master: String): SparkConf
  def setAppName(name: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setExecutorEnv(variable: String, value: String): SparkConf
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  def setExecutorEnv(variables: Array[(String, String)]): SparkConf
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def getAll: Array[(String, String)]
  def remove(key: String): SparkConf
  def contains(key: String): Boolean
  def clone(): SparkConf
  def setSparkHome(home: String): SparkConf
  def setIfMissing(key: String, value: String): SparkConf
}

Usage Examples

import org.apache.spark.SparkConf

// Basic configuration
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[4]")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Configure executor settings
conf.set("spark.executor.memory", "4g")
    .set("spark.executor.cores", "2")
    .set("spark.executor.instances", "10")

// Set environment variables for executors
conf.setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")
    .setExecutorEnv(Seq(
      ("SPARK_LOCAL_DIRS", "/tmp/spark"),
      ("SPARK_WORKER_DIR", "/tmp/spark-worker")
    ))

// Conditional configuration
if (!conf.contains("spark.master")) {
  conf.setMaster("local[*]")
}

// Clone configuration for different contexts
val testConf = conf.clone()
  .setAppName("Test Application")
  .set("spark.sql.execution.arrow.pyspark.enabled", "false")

SparkContext

SparkContext is the main entry point for Spark functionality. Only one SparkContext should be active per JVM.

class SparkContext(config: SparkConf) {
  // RDD Creation
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
  def emptyRDD[T: ClassTag]: RDD[T]
  def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]
  
  // File Input
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
  def hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]
  
  // Broadcast and Accumulators
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  def longAccumulator(): LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator(): DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T](): CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]
  
  // Application Control
  def stop(): Unit
  def addFile(path: String): Unit
  def addFile(path: String, recursive: Boolean): Unit
  def addJar(path: String): Unit
  def clearFiles(): Unit
  def clearJars(): Unit
  
  // Configuration and Environment
  def setLogLevel(logLevel: String): Unit
  def setLocalProperty(key: String, value: String): Unit
  def getLocalProperty(key: String): String
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
  def clearJobGroup(): Unit
  def setJobDescription(value: String): Unit
  
  // Dynamic Resource Allocation
  def requestTotalExecutors(requestedTotal: Int, localityAwareTasks: Int = 0, hostToLocalTaskCount: Map[String, Int] = Map.empty): Boolean
  def requestExecutors(numAdditionalExecutors: Int): Boolean
  def killExecutors(executorIds: Seq[String]): Boolean
  def killExecutor(executorId: String): Boolean
  
  // Properties
  def master: String
  def appName: String
  def jars: Seq[String]
  def files: Seq[String]
  def startTime: Long
  def version: String
  def defaultParallelism: Int
  def defaultMinPartitions: Int
  def conf: SparkConf
  def statusTracker: SparkStatusTracker
}

Usage Examples

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

// Create configuration and context
val conf = new SparkConf()
  .setAppName("Data Processing Pipeline")
  .setMaster("spark://master:7077")

val sc = new SparkContext(conf)

// Create RDDs from various sources
val numbers = sc.parallelize(1 to 1000000, numSlices = 100)
val lines = sc.textFile("hdfs://data/input.txt", minPartitions = 50)
val keyValueData = sc.sequenceFile[String, Int]("hdfs://data/sequence")

// Set job properties for monitoring
sc.setJobGroup("data-processing", "Main data processing pipeline")
sc.setJobDescription("Processing customer transaction data")

// Add application files and JARs
sc.addFile("hdfs://shared/config.properties")
sc.addJar("s3://libs/custom-transformations.jar")

// Create broadcast variables for lookup tables
val lookupTable = Map("A" -> 1, "B" -> 2, "C" -> 3)
val broadcastLookup = sc.broadcast(lookupTable)

// Create accumulators for metrics
val errorCount = sc.longAccumulator("Processing Errors")
val processingTime = sc.doubleAccumulator("Total Processing Time")

// Dynamic resource management
if (sc.defaultParallelism < 100) {
  sc.requestExecutors(20)
}

// Process data using broadcast and accumulators
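// Note: processLine below is a placeholder for a user-defined parsing
// function; it is not part of the Spark API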
val results = lines.map { line =>
  try {
    val startTime = System.currentTimeMillis()
    val processed = processLine(line, broadcastLookup.value)
    processingTime.add(System.currentTimeMillis() - startTime)
    processed
  } catch {
    case e: Exception =>
      errorCount.add(1)
      throw e
  }
}

// Persist intermediate results
results.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Trigger computation and collect metrics
val finalCount = results.count()
println(s"Processed $finalCount records")
println(s"Error count: ${errorCount.value}")
println(s"Average processing time: ${processingTime.value / finalCount}ms")

// Clean up
sc.stop()
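
Because only one SparkContext should be active per JVM, code that may run where a context already exists can use the companion object's getOrCreate method (not listed in the class signature above) rather than constructing a new context. A minimal sketch, assuming getOrCreate as available in Spark 1.4 and later:

import org.apache.spark.{SparkConf, SparkContext}

// Returns the active SparkContext if one exists, otherwise creates one
// from the supplied configuration
val sharedSc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Shared Context").setMaster("local[*]"))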

Package Constants

Access to Spark version and build information.

// Available in org.apache.spark package object
val SPARK_VERSION: String
val SPARK_VERSION_SHORT: String
val SPARK_BRANCH: String
val SPARK_REVISION: String
val SPARK_BUILD_USER: String
val SPARK_REPO_URL: String
val SPARK_BUILD_DATE: String

Usage Example

import org.apache.spark._

println(s"Running Spark version: $SPARK_VERSION")
println(s"Built by: $SPARK_BUILD_USER on $SPARK_BUILD_DATE")
println(s"Git revision: $SPARK_REVISION")

Best Practices

Configuration Management

  • Use configuration files or environment variables for deployment-specific settings
  • Set appropriate memory and core allocations based on cluster resources
  • Enable adaptive query execution for SQL workloads
  • Configure serialization (prefer Kryo over Java serialization)
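
A hedged sketch of the serialization and resource bullets above. spark.serializer, spark.executor.memory, and spark.executor.cores are standard configuration keys; registerKryoClasses is a SparkConf method not shown in the signature listing, and MyRecord/MyKey are placeholder application classes:

import org.apache.spark.SparkConf

// Placeholder application classes, for illustration only
case class MyRecord(id: Long, payload: String)
case class MyKey(id: Long)

val tunedConf = new SparkConf()
  .setAppName("Tuned Application")
  // Kryo is generally faster and more compact than Java serialization
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes avoids writing full class names with every object
  .registerKryoClasses(Array(classOf[MyRecord], classOf[MyKey]))
  // Deployment-specific resources read from the environment, with fallbacks
  .set("spark.executor.memory", sys.env.getOrElse("EXECUTOR_MEMORY", "4g"))
  .set("spark.executor.cores", sys.env.getOrElse("EXECUTOR_CORES", "2"))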

Resource Management

  • Monitor executor resource utilization and adjust allocations
  • Use dynamic resource allocation in multi-tenant environments (see the sketch after this list)
  • Set appropriate timeouts for network operations
  • Configure storage levels based on data access patterns
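
A hedged sketch of the dynamic-allocation and timeout bullets above, using standard configuration keys; the values are placeholders to be tuned per cluster:

import org.apache.spark.SparkConf

val dynamicConf = new SparkConf()
  .setAppName("Multi-Tenant Job")
  // Scale the number of executors up and down with the workload
  .set("spark.dynamicAllocation.enabled", "true")
  // An external shuffle service must run on the workers so executors
  // can be released without losing shuffle output
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
  // Longer network timeout for congested clusters
  .set("spark.network.timeout", "300s")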

Error Handling

  • Implement proper exception handling in transformations and actions
  • Use accumulator variables to collect error metrics
  • Set up comprehensive logging and monitoring
  • Handle node failures gracefully with appropriate retry policies
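
A hedged sketch of the retry and logging bullets above; spark.task.maxFailures and spark.stage.maxConsecutiveAttempts are standard configuration keys, and the values shown are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val resilientConf = new SparkConf()
  .setAppName("Resilient Job")
  .setMaster("local[*]")
  // Task attempts allowed before the stage (and job) fails; the default is 4
  .set("spark.task.maxFailures", "8")
  // Consecutive stage attempts allowed, e.g. after repeated fetch failures
  .set("spark.stage.maxConsecutiveAttempts", "4")

val resilientSc = new SparkContext(resilientConf)
// Keep warnings and errors visible while suppressing verbose INFO logging
resilientSc.setLogLevel("WARN")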

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-core-2-11
