Core Spark 1.x integration component for the Cask Data Application Platform (CDAP), providing runtime services and an execution context for CDAP applications.
—
Dynamic Scala compilation and interpretation capabilities that enable runtime code generation, interactive development, and flexible application behavior modification within Spark applications running on the CDAP platform.
Scala compiler implementation that provides dynamic compilation capabilities for runtime code generation and flexible application behavior.
/**
 * Default compiler for dynamic Scala code compilation in Spark.
 * Enables runtime compilation of Scala code within CDAP Spark applications.
 *
 * NOTE(review): several methods both return Option/collection results and document
 * @throws CompilationException — confirm which failure mode callers should rely on.
 */
class DefaultSparkCompiler extends SparkCompiler {
  /**
   * Compiles Scala code and returns the resulting class.
   * @param code Scala source code to compile
   * @return Option containing the compiled class, or None if compilation fails
   * @throws CompilationException if compilation errors occur
   */
  def compile(code: String): Option[Class[_]]
  /**
   * Compiles Scala code under a specific fully qualified class name.
   * @param className Fully qualified name for the compiled class
   * @param code Scala source code to compile
   * @return Option containing the compiled class, or None if compilation fails
   * @throws CompilationException if compilation errors occur
   */
  def compileClass(className: String, code: String): Option[Class[_]]
  /**
   * Compiles multiple Scala sources in a single pass.
   * @param sources Map of fully qualified class names to source code
   * @return Map of class names to compiled classes
   * @throws CompilationException if compilation errors occur
   */
  def compileClasses(sources: Map[String, String]): Map[String, Class[_]]
  /**
   * Sets the compiler class path used to resolve dependencies of compiled code.
   * @param classpath Array of classpath entries (jar files or directories)
   */
  def setClassPath(classpath: Array[String]): Unit
  /**
   * Gets the current compiler settings.
   * @return CompilerSettings containing current configuration
   */
  def getSettings: CompilerSettings
  /**
   * Validates Scala code syntax without producing compiled classes.
   * @param code Scala source code to validate
   * @return ValidationResult containing validation status and errors
   */
  def validate(code: String): ValidationResult
}

Scala interpreter implementation that provides interactive code execution and REPL-like capabilities within Spark applications.
/**
 * Default interpreter for dynamic Scala code execution in Spark.
 * Provides REPL-like capabilities for interactive development.
 */
class DefaultSparkInterpreter extends SparkInterpreter {
  /**
   * Interprets and executes Scala code for its side effects.
   * @param code Scala code to interpret and execute
   * @throws InterpretationException if interpretation or execution fails
   */
  def interpret(code: String): Unit
  /**
   * Interprets a Scala expression and returns its value.
   * @param code Scala expression to evaluate
   * @tparam T Expected return type
   * @return Result of the expression evaluation
   * @throws InterpretationException if interpretation fails
   */
  def interpretAndReturn[T](code: String): T
  /**
   * Binds a variable into the interpreter context, making it visible to
   * subsequently interpreted code.
   * @param name Variable name
   * @param value Variable value
   * @tparam T Type of the value
   */
  def bind[T](name: String, value: T): Unit
  /**
   * Binds a variable with an explicitly supplied type signature.
   * @param name Variable name
   * @param tpe Type information as a Scala type string
   * @param value Variable value
   */
  def bind(name: String, tpe: String, value: Any): Unit
  /**
   * Resets the interpreter state, clearing all bindings and compiled code.
   */
  def reset(): Unit
  /**
   * Gets a bound variable's value.
   * @param name Variable name
   * @tparam T Expected type
   * @return Variable value
   * @throws NoSuchElementException if variable not found
   */
  def get[T](name: String): T
  /**
   * Checks whether a variable is bound.
   * @param name Variable name
   * @return true if variable exists in interpreter context
   */
  def isBound(name: String): Boolean
  /**
   * Gets all bound variable names.
   * @return Set of bound variable names
   */
  def getBoundNames: Set[String]
}

Base interface defining the contract for Spark compiler implementations.
/**
 * Interface for Spark compiler implementations.
 * Defines the contract for dynamic Scala compilation.
 */
trait SparkCompiler {
  /**
   * Compiles Scala source code.
   * @param code Source code to compile
   * @return Option containing compiled class or None if compilation fails
   */
  def compile(code: String): Option[Class[_]]
  /**
   * Compiles code under a specific fully qualified class name.
   * @param className Target class name
   * @param code Source code to compile
   * @return Option containing compiled class or None if compilation fails
   */
  def compileClass(className: String, code: String): Option[Class[_]]
  /**
   * Gets compiler configuration settings.
   * @return Current compiler settings
   */
  def getSettings: CompilerSettings
}

Base interface defining the contract for Spark interpreter implementations.
/**
 * Interface for Spark interpreter implementations.
 * Defines the contract for interactive Scala code execution.
 */
trait SparkInterpreter {
  /**
   * Interprets and executes Scala code for its side effects.
   * @param code Code to interpret
   */
  def interpret(code: String): Unit
  /**
   * Binds a variable into the interpreter context.
   * @param name Variable name
   * @param value Variable value
   */
  def bind(name: String, value: Any): Unit
  /**
   * Resets interpreter state, discarding all bindings.
   */
  def reset(): Unit
}

Dynamic Compilation:
import co.cask.cdap.app.runtime.spark.dynamic.DefaultSparkCompiler

// Create compiler instance
val compiler = new DefaultSparkCompiler()

// Set up the classpath the compiler resolves against
compiler.setClassPath(Array(
  "/path/to/spark-core.jar",
  "/path/to/cdap-api.jar"
))

// Compile a simple class from source held in a string
val code = """
class DynamicTransform {
  def transform(input: String): String = {
    input.toUpperCase.reverse
  }
}
"""
compiler.compile(code) match {
  case Some(clazz) =>
    // Instantiate reflectively; getDeclaredConstructor().newInstance() replaces
    // Class.newInstance(), which is deprecated and swallows checked exceptions.
    val instance = clazz.getDeclaredConstructor().newInstance()
    val method = clazz.getMethod("transform", classOf[String])
    val result = method.invoke(instance, "hello world")
    println(s"Result: $result") // Result: DLROW OLLEH
  case None =>
    println("Compilation failed")
}

// Compile with a specific fully qualified class name
val transformerCode = """
package com.example
class CustomTransformer(multiplier: Int) {
  def process(values: Array[Int]): Array[Int] = {
    values.map(_ * multiplier)
  }
}
"""
compiler.compileClass("com.example.CustomTransformer", transformerCode) match {
  case Some(clazz) =>
    // Constructor takes an Int; box it for the reflective varargs call
    val constructor = clazz.getConstructor(classOf[Int])
    val transformer = constructor.newInstance(Int.box(2))
    // Use transformer...
  case None =>
    println("Compilation failed")
}

Interactive Interpretation:
import co.cask.cdap.app.runtime.spark.dynamic.DefaultSparkInterpreter

// Create interpreter instance
val interpreter = new DefaultSparkInterpreter()

// Bind variables into the interpreter context
interpreter.bind("sparkContext", sparkContext)
interpreter.bind("inputData", Array(1, 2, 3, 4, 5))

// Execute code interactively
interpreter.interpret("val doubled = inputData.map(_ * 2)")
// Fixed: ', ' was an invalid multi-character char literal — mkString takes a String
interpreter.interpret("println(s\"Doubled: ${doubled.mkString(\", \")}\")")

// Get results back out of the interpreter
val result = interpreter.interpretAndReturn[Array[Int]]("doubled")
println(s"Result array: ${result.mkString(", ")}")

// Check bindings before reading them
if (interpreter.isBound("doubled")) {
  val boundValue = interpreter.get[Array[Int]]("doubled")
  println(s"Bound value: ${boundValue.mkString(", ")}")
}

// Reset interpreter, discarding all bindings
interpreter.reset()

Spark Integration Example:
import org.apache.spark.SparkContext
import co.cask.cdap.app.runtime.spark.dynamic.{DefaultSparkCompiler, DefaultSparkInterpreter}

/**
 * Applies a dynamically compiled transformation to each element of the input.
 * @param sc Spark context used to parallelize the data
 * @param data input strings to transform
 * @param transformCode Scala expression over a String named `input`
 * @return transformed elements, collected back to the driver
 */
def processWithDynamicCode(sc: SparkContext,
                           data: Array[String],
                           transformCode: String): Array[String] = {
  val compiler = new DefaultSparkCompiler()
  // Wrap the user-supplied expression in a class exposing process(String)
  val wrapperCode = s"""
class DynamicProcessor {
  def process(input: String): String = {
    $transformCode
  }
}
"""
  compiler.compile(wrapperCode) match {
    case Some(clazz) =>
      val rdd = sc.parallelize(data)
      val transformedRDD = rdd.map { item =>
        // NOTE(review): reflective instantiation per element is costly, and this
        // assumes the compiled class is resolvable on executor classloaders —
        // confirm before using outside local mode.
        val processor = clazz.getDeclaredConstructor().newInstance()
        val method = clazz.getMethod("process", classOf[String])
        method.invoke(processor, item).asInstanceOf[String]
      }
      transformedRDD.collect()
    case None =>
      throw new RuntimeException("Failed to compile transformation code")
  }
}

// Usage
val inputData = Array("apple", "banana", "cherry")
val transformCode = "input.toUpperCase + \"_PROCESSED\""
val results = processWithDynamicCode(sparkContext, inputData, transformCode)
// Results: Array("APPLE_PROCESSED", "BANANA_PROCESSED", "CHERRY_PROCESSED")

REPL-like Development:
import co.cask.cdap.app.runtime.spark.dynamic.DefaultSparkInterpreter

/**
 * Minimal REPL-like development environment wrapping a SparkInterpreter,
 * pre-seeded with Spark imports and a bound SparkContext (`sc`).
 */
class SparkREPL(sparkContext: SparkContext) {
  private val interpreter = new DefaultSparkInterpreter()
  // Initialize with common imports and bindings
  interpreter.interpret("import org.apache.spark.SparkContext")
  interpreter.interpret("import org.apache.spark.rdd.RDD")
  interpreter.bind("sc", sparkContext)

  /** Interprets a snippet, printing (not rethrowing) any failure. */
  def execute(code: String): Unit = {
    try {
      interpreter.interpret(code)
    } catch {
      case e: Exception => println(s"Error: ${e.getMessage}")
    }
  }

  /** Binds a named value into the interpreter context. */
  def bindData(name: String, data: Any): Unit = {
    interpreter.bind(name, data)
  }

  /** Evaluates an expression and returns its value as T. */
  def getResult[T](expression: String): T = {
    interpreter.interpretAndReturn[T](expression)
  }
}

// Usage
val repl = new SparkREPL(sparkContext)
// Bind data
repl.bindData("numbers", Array(1, 2, 3, 4, 5))
// Execute transformations
repl.execute("val rdd = sc.parallelize(numbers)")
repl.execute("val squares = rdd.map(x => x * x)")
repl.execute("val sum = squares.reduce(_ + _)")
// Get results
val finalSum = repl.getResult[Int]("sum")
println(s"Sum of squares: $finalSum") // Sum of squares: 55
/**
* Compiler configuration settings consumed by SparkCompiler implementations.
*/
case class CompilerSettings(
/**
* Classpath entries (jars or directories) used to resolve dependencies.
* NOTE(review): Array uses reference equality, so this case class's
* equals/hashCode will not compare classpath contents — confirm before
* comparing instances or using them as map keys.
*/
classpath: Array[String],
/**
* Output directory where compiled class files are written
*/
outputDirectory: String,
/**
* Scala compiler options as option-name -> value pairs
*/
options: Map[String, String],
/**
* Maximum compilation time in milliseconds.
* NOTE(review): enforcement of this timeout is not visible here — confirm semantics.
*/
maxCompilationTime: Long
)
/**
* Result of code validation: overall syntax status plus any diagnostics.
*/
case class ValidationResult(
/**
* Whether the code is syntactically valid (no errors reported)
*/
isValid: Boolean,
/**
* Compilation errors; expected empty when isValid is true
*/
errors: List[CompilationError],
/**
* Compilation warnings; may be non-empty even for valid code
*/
warnings: List[CompilationWarning]
)
/**
* Compilation error information with source position and severity.
*/
case class CompilationError(
/**
* Human-readable error message
*/
message: String,
/**
* Line number where the error occurred — presumably 1-based; TODO confirm
*/
line: Int,
/**
* Column number where the error occurred — presumably 1-based; TODO confirm
*/
column: Int,
/**
* Severity level of the error (see ErrorSeverity)
*/
severity: ErrorSeverity
)
/**
* Compilation warning information with source position.
* Warnings do not prevent compilation from succeeding.
*/
case class CompilationWarning(
/**
* Human-readable warning message
*/
message: String,
/**
* Line number where the warning occurred — presumably 1-based; TODO confirm
*/
line: Int,
/**
* Column number where the warning occurred — presumably 1-based; TODO confirm
*/
column: Int
)
/**
* Error severity levels attached to compilation diagnostics.
* Sealed so matches over severities are checked for exhaustiveness.
*/
sealed trait ErrorSeverity
object ErrorSeverity {
// Informational diagnostic only
case object Info extends ErrorSeverity
// Non-fatal issue; compilation can still succeed
case object Warning extends ErrorSeverity
// Compilation error; the affected unit fails to compile
case object Error extends ErrorSeverity
// Unrecoverable compiler failure
case object Fatal extends ErrorSeverity
}
/**
* Exception thrown when dynamic Scala compilation fails.
* @param message description of the compilation failure
* @param cause underlying cause, or null when none (Java-interop convention)
*/
class CompilationException(message: String, cause: Throwable = null) extends Exception(message, cause)
/**
 * Exception thrown during interpretation or execution of dynamic Scala code.
 * @param message description of the interpretation failure
 * @param cause underlying cause, or null when none (Java-interop convention)
 */
class InterpretationException(message: String, cause: Throwable = null) extends Exception(message, cause)

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-spark-core