Core application programming interface for the Cask Data Application Platform (CDAP), enabling development of scalable data-processing applications on the Hadoop ecosystem.
Spark programs in CDAP provide distributed data processing capabilities using Apache Spark, supporting both batch and streaming workloads with integrated access to CDAP datasets and services.
@Beta
public interface Spark {
void configure(SparkConfigurer configurer);
}
Base interface for Spark programs. Spark programs are executed using Apache Spark's distributed computing framework.
public abstract class AbstractSpark implements Spark {
public abstract void configure(SparkConfigurer configurer);
}
Base implementation class for Spark programs, providing a configuration framework.
public interface SparkConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
void setMainClass(Class<?> mainClass);
void setMainClassName(String mainClassName);
void setDriverResources(Resources resources);
void setExecutorResources(Resources resources);
void setClientResources(Resources resources);
}
Interface for configuring Spark programs, including main-class specification and resource allocation for the client, the driver, and the executors.
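For example, a configure() implementation might look like the sketch below; LogCleanerSpark and com.example.LogCleanerMain are hypothetical names, used to illustrate the string-based main-class variant and the three independent resource settings.
public class LogCleanerSpark extends AbstractSpark {
@Override
public void configure(SparkConfigurer configurer) {
configurer.setName("LogCleaner");
configurer.setDescription("Cleans raw log records");
// Referencing the main class by name avoids a compile-time dependency on it
configurer.setMainClassName("com.example.LogCleanerMain");
// Resources for the submitting client, the driver, and each executor are set independently
configurer.setClientResources(new Resources(512));
configurer.setDriverResources(new Resources(1024));
configurer.setExecutorResources(new Resources(2048, 2)); // 2 GB, 2 cores
}
}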
public class SparkSpecification implements ProgramSpecification {
public String getName();
public String getDescription();
public String getClassName();
public String getMainClassName();
public Map<String, String> getProperties();
public Resources getDriverResources();
public Resources getExecutorResources();
public Resources getClientResources();
public Set<String> getDatasets();
}
Complete specification of a Spark program, including its main class, properties, datasets, and resource requirements.
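A sketch of how the specification's accessors fit together; getSpec() is a hypothetical accessor, since how a SparkSpecification instance is obtained depends on where the code runs.
SparkSpecification spec = getSpec(); // hypothetical accessor, for illustration
System.out.printf("Program %s (main class %s)%n", spec.getName(), spec.getMainClassName());
Resources driver = spec.getDriverResources();
if (driver != null) {
System.out.printf("Driver: %d MB, %d cores%n", driver.getMemoryMB(), driver.getVirtualCores());
}
for (String dataset : spec.getDatasets()) {
System.out.println("Uses dataset: " + dataset);
}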
public interface SparkClientContext extends RuntimeContext, DatasetContext {
Map<String, String> getRuntimeArguments();
Map<String, String> getSparkConf();
PluginContext getPluginContext();
ServiceDiscoverer getServiceDiscoverer();
Metrics getMetrics();
Admin getAdmin();
void localize(String fileName, URI uri);
void localize(String fileName, URI uri, boolean archive);
}
Client-side context available to Spark programs, providing access to configuration, datasets, and CDAP services.
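A sketch of driver-side use of the client context before job submission. It assumes the runtime passes a SparkClientContext to setup code; the setUp method name is hypothetical.
void setUp(SparkClientContext context) throws Exception {
// Make a file available to every executor under the name "dict.txt"
context.localize("dict.txt", URI.create("hdfs:///shared/dictionaries/en.txt"));
// Localize an archive; passing true requests that it be expanded on the executors
context.localize("geoip", URI.create("hdfs:///shared/geoip.zip"), true);
// Runtime arguments allow per-run tuning
String mode = context.getRuntimeArguments().getOrDefault("mode", "full");
context.getMetrics().count("setup." + mode, 1);
}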
public class WordCountSpark extends AbstractSpark {
@Override
public void configure(SparkConfigurer configurer) {
configurer.setName("WordCountSpark");
configurer.setDescription("Counts words using Spark");
configurer.setMainClass(WordCountSparkMain.class);
// Configure resources
configurer.setDriverResources(new Resources(1024)); // 1GB for driver
configurer.setExecutorResources(new Resources(2048)); // 2GB for executors
// Use datasets
configurer.useDataset("textFiles");
configurer.useDataset("wordCounts");
}
public static class WordCountSparkMain {
public static void main(String[] args) throws Exception {
// SparkClientContext is an interface and cannot be instantiated directly;
// the CDAP runtime provides it. getClientContext() is a hypothetical
// accessor used here for illustration.
SparkClientContext context = getClientContext();
JavaSparkContext jsc = new JavaSparkContext();
// Read input data from CDAP dataset
FileSet textFiles = context.getDataset("textFiles");
KeyValueTable wordCounts = context.getDataset("wordCounts");
// Spark processing
JavaRDD<String> lines = jsc.textFile(textFiles.getBaseLocation().toURI().getPath());
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
// Write results back to CDAP dataset
// Note: writing to a dataset from an executor closure assumes the dataset
// reference is usable there; production code typically writes through
// CDAP's Spark dataset integration.
counts.foreach(wordCount -> {
wordCounts.write(wordCount._1(), String.valueOf(wordCount._2()));
});
context.getMetrics().count("words.processed", (int) counts.count()); // count takes an int delta
jsc.close();
}
}
}
public class CustomerAnalyticsSpark extends AbstractSpark {
@Override
public void configure(SparkConfigurer configurer) {
configurer.setName("CustomerAnalytics");
configurer.setMainClass(CustomerAnalyticsMain.class);
configurer.useDataset("customers");
configurer.useDataset("analytics");
}
public static class CustomerAnalyticsMain {
public static void main(String[] args) throws Exception {
SparkClientContext context = getClientContext(); // provided by the CDAP runtime; hypothetical accessor for illustration
JavaSparkContext jsc = new JavaSparkContext();
SQLContext sqlContext = new SQLContext(jsc);
// Access CDAP datasets
ObjectStore<Customer> customers = context.getDataset("customers");
ObjectStore<Analytics> analytics = context.getDataset("analytics");
// Load customer data into Spark
List<Customer> customerList = loadCustomers(customers);
JavaRDD<Customer> customerRDD = jsc.parallelize(customerList);
// Convert to DataFrame for SQL operations
Dataset<Row> customerDF = sqlContext.createDataFrame(customerRDD, Customer.class);
customerDF.createOrReplaceTempView("customers");
// Perform analytics using Spark SQL
Dataset<Row> regionAnalytics = sqlContext.sql(
"SELECT region, COUNT(*) as customerCount, AVG(purchaseAmount) as avgPurchase " +
"FROM customers GROUP BY region"
);
// Save results back to CDAP
// The aggregate is small, so collect it to the driver and write from there;
// this avoids shipping the dataset reference into an executor closure
for (Row row : regionAnalytics.collectAsList()) {
Analytics result = new Analytics(
row.getString(0), // region
row.getLong(1), // customerCount
row.getDouble(2) // avgPurchase
);
analytics.write(result.getRegion(), result);
}
context.getMetrics().gauge("customers.analyzed", customerList.size());
jsc.close();
}
private static List<Customer> loadCustomers(ObjectStore<Customer> store) {
// Load customers from dataset
List<Customer> customers = new ArrayList<>();
// Implementation to read from ObjectStore
return customers;
}
}
}
public class RealTimeProcessingSpark extends AbstractSpark {
@Override
public void configure(SparkConfigurer configurer) {
configurer.setName("RealTimeProcessor");
configurer.setMainClass(StreamingMain.class);
configurer.useDataset("streamData");
configurer.useDataset("processedEvents");
// Set properties for streaming
configurer.setProperties(ImmutableMap.of(
"spark.streaming.batchDuration", "10",
"spark.streaming.checkpoint.directory", "/tmp/spark-checkpoint"
));
}
public static class StreamingMain {
public static void main(String[] args) throws Exception {
SparkClientContext context = getClientContext(); // provided by the CDAP runtime; hypothetical accessor for illustration
JavaStreamingContext jssc = new JavaStreamingContext(
new SparkConf(), Durations.seconds(10));
ObjectStore<Event> processedEvents = context.getDataset("processedEvents");
// Create input stream from CDAP dataset
JavaDStream<String> lines = createInputStream(jssc, context);
// Process streaming data
JavaDStream<Event> events = lines.map(line -> parseEvent(line));
JavaDStream<Event> filteredEvents = events.filter(event -> event.isValid());
// Save processed events to CDAP dataset
// foreachRDD runs the enclosing function on the driver once per batch
filteredEvents.foreachRDD(rdd -> {
rdd.foreach(event -> {
processedEvents.write(event.getId(), event);
});
context.getMetrics().count("events.processed", (int) rdd.count()); // count takes an int delta
});
jssc.start();
jssc.awaitTermination();
}
private static JavaDStream<String> createInputStream(JavaStreamingContext jssc,
SparkClientContext context) {
// Implementation to create stream from CDAP dataset
return null;
}
private static Event parseEvent(String line) {
// Parse event from string
return new Event();
}
}
}
public class PluginSparkProgram extends AbstractSpark {
@Override
public void configure(SparkConfigurer configurer) {
configurer.setName("PluginProcessor");
configurer.setMainClass(PluginSparkMain.class);
// Use transformation plugin
configurer.usePlugin("transform", "dataTransform", "transformer",
PluginProperties.builder()
.add("operation", "normalize")
.add("fields", "name,email,phone")
.build());
}
public static class PluginSparkMain {
public static void main(String[] args) throws Exception {
SparkClientContext context = getClientContext(); // provided by the CDAP runtime; hypothetical accessor for illustration
JavaSparkContext jsc = new JavaSparkContext();
// Get plugin instance
DataTransformer transformer = context.getPluginContext()
.newPluginInstance("transformer");
// Process data using plugin
JavaRDD<Record> inputData = loadInputData(jsc, context);
JavaRDD<Record> transformedData = inputData.map(record ->
transformer.transform(record));
// Save results
saveResults(transformedData, context);
context.getMetrics().count("records.transformed", transformedData.count());
jsc.close();
}
private static JavaRDD<Record> loadInputData(JavaSparkContext jsc,
SparkClientContext context) {
// Load data from CDAP datasets
return null;
}
private static void saveResults(JavaRDD<Record> data, SparkClientContext context) {
// Save to CDAP datasets
}
}
}
public class MachineLearningAnalytics extends AbstractSpark {
@Override
public void configure(SparkConfigurer configurer) {
configurer.setName("MLAnalytics");
configurer.setMainClass(MLMain.class);
configurer.useDataset("trainingData");
configurer.useDataset("models");
configurer.useDataset("predictions");
// Allocate more resources for ML workload
configurer.setDriverResources(new Resources(4096, 2)); // 4GB, 2 cores
configurer.setExecutorResources(new Resources(8192, 4)); // 8GB, 4 cores
}
public static class MLMain {
public static void main(String[] args) throws Exception {
SparkClientContext context = getClientContext(); // provided by the CDAP runtime; hypothetical accessor for illustration
JavaSparkContext jsc = new JavaSparkContext();
SQLContext sqlContext = new SQLContext(jsc);
// Load training data
ObjectStore<TrainingRecord> trainingData = context.getDataset("trainingData");
List<TrainingRecord> records = loadTrainingData(trainingData);
Dataset<Row> training = sqlContext.createDataFrame(records, TrainingRecord.class);
// Prepare features
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"feature1", "feature2", "feature3"})
.setOutputCol("features");
Dataset<Row> featuresDF = assembler.transform(training);
// Train model
LogisticRegression lr = new LogisticRegression()
.setFeaturesCol("features")
.setLabelCol("label")
.setMaxIter(100)
.setRegParam(0.01);
LogisticRegressionModel model = lr.fit(featuresDF);
// Save model
ObjectStore<MLModel> models = context.getDataset("models");
models.write("customer_classifier", new MLModel(model.coefficients()));
// Make predictions (on the training set here, for illustration)
Dataset<Row> predictions = model.transform(featuresDF);
// Save predictions
ObjectStore<Prediction> predictionStore = context.getDataset("predictions");
predictions.toJavaRDD().foreach(row -> {
// Row fields are read by name with getAs; the "probability" column is a
// vector, so take the positive-class probability at index 1
Vector prob = row.getAs("probability");
Prediction pred = new Prediction(
row.<String>getAs("id"), // record id
row.<Double>getAs("prediction"), // predicted label
prob.apply(1) // positive-class probability
);
predictionStore.write(pred.getId(), pred);
});
context.getMetrics().gauge("model.accuracy", calculateAccuracy(predictions));
jsc.close();
}
private static List<TrainingRecord> loadTrainingData(ObjectStore<TrainingRecord> store) {
// Load training data from dataset
return new ArrayList<>();
}
private static double calculateAccuracy(Dataset<Row> predictions) {
// Calculate model accuracy
return 0.95;
}
}
}
Spark programs in CDAP provide distributed processing for large-scale data analytics, machine learning, stream processing, and complex data transformations, while staying integrated with CDAP's dataset and service ecosystem.
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-api