
Spark Programs

Spark programs in CDAP provide distributed data processing capabilities using Apache Spark, supporting both batch and streaming workloads with integrated access to CDAP datasets and services.

Core Spark Interfaces

Spark

@Beta
public interface Spark {
    void configure(SparkConfigurer configurer);
}

Base interface for Spark programs, which are executed on Apache Spark's distributed computing framework.

AbstractSpark

public abstract class AbstractSpark implements Spark {
    public abstract void configure(SparkConfigurer configurer);
}

Abstract base class for Spark programs. Extend this class and implement configure() to define the program's name, main class, datasets, and resources.

Spark Configuration

SparkConfigurer

public interface SparkConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
    void setMainClass(Class<?> mainClass);
    void setMainClassName(String mainClassName);
    void setDriverResources(Resources resources);
    void setExecutorResources(Resources resources);
    void setClientResources(Resources resources);
}

Interface for configuring a Spark program, including its main class and the resources allocated to the Spark driver, the executors, and the CDAP client container.
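
For example, a configure() implementation might specify the main class by name and size each component separately. A minimal sketch; the program name, class name, and sizes are illustrative:

@Override
public void configure(SparkConfigurer configurer) {
    configurer.setName("LogAnalyzer");                          // illustrative program name
    configurer.setMainClassName("com.example.LogAnalyzerMain"); // resolved at runtime
    configurer.setClientResources(new Resources(512));     // CDAP client container: 512 MB
    configurer.setDriverResources(new Resources(1024));    // Spark driver: 1 GB
    configurer.setExecutorResources(new Resources(2048));  // each executor: 2 GB
}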

SparkSpecification

public class SparkSpecification implements ProgramSpecification {
    public String getName();
    public String getDescription();
    public String getClassName();
    public String getMainClassName();
    public Map<String, String> getProperties();
    public Resources getDriverResources();
    public Resources getExecutorResources();
    public Resources getClientResources();
    public Set<String> getDatasets();
}

Complete specification of a configured Spark program, including its main class, properties, datasets, and resource requirements.
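
At runtime, the specification can be inspected through the program's context. A sketch, assuming the context exposes a getSpecification() accessor and an SLF4J logger named LOG:

private void logResources(SparkClientContext context) {
    SparkSpecification spec = context.getSpecification();
    Resources executors = spec.getExecutorResources();
    // Log the configured executor sizing for diagnostics
    LOG.info("Spark program {} runs executors with {} MB and {} cores",
             spec.getName(), executors.getMemoryMB(), executors.getVirtualCores());
}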

Spark Context

SparkClientContext

public interface SparkClientContext extends RuntimeContext, DatasetContext {
    Map<String, String> getRuntimeArguments();
    Map<String, String> getSparkConf();
    
    PluginContext getPluginContext();
    ServiceDiscoverer getServiceDiscoverer();
    Metrics getMetrics();
    Admin getAdmin();
    
    void localize(String fileName, URI uri);
    void localize(String fileName, URI uri, boolean archive);
}

Client-side context for a Spark program, available before the job is submitted. It provides access to runtime arguments, the Spark configuration, datasets, and CDAP services, and can localize additional files to the program's containers.
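
JavaSparkMain

The client context above is available before the job is submitted. Inside the job itself, the usage examples below rely on the programmatic Spark API of newer cdap-api releases, in which the main class implements JavaSparkMain and is handed a JavaSparkExecutionContext by the CDAP runtime. A summary sketch, not an exhaustive listing:

@Beta
public interface JavaSparkMain {
    void run(JavaSparkExecutionContext sec) throws Exception;
}

JavaSparkExecutionContext exposes runtime arguments, metrics, and the plugin context, along with dataset bridges: fromDataset(String) reads a dataset as a JavaPairRDD, saveAsDataset(JavaPairRDD, String) writes one back, and execute(TxRunnable) runs transactional dataset operations on the driver.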

Usage Examples

Basic Spark Program

public class WordCountSpark extends AbstractSpark {
    
    @Override
    public void configure(SparkConfigurer configurer) {
        configurer.setName("WordCountSpark");
        configurer.setDescription("Counts words using Spark");
        configurer.setMainClass(WordCountSparkMain.class);
        
        // Configure resources
        configurer.setDriverResources(new Resources(1024));     // 1GB for driver
        configurer.setExecutorResources(new Resources(2048));   // 2GB for executors
        
        // Use datasets
        configurer.useDataset("textFiles");
        configurer.useDataset("wordCounts");
    }
    
    public static class WordCountSparkMain implements JavaSparkMain {
        @Override
        public void run(JavaSparkExecutionContext sec) throws Exception {
            JavaSparkContext jsc = new JavaSparkContext();

            // Read input from the "textFiles" FileSet, assuming a text input
            // format that yields (offset, line) pairs
            JavaPairRDD<LongWritable, Text> input = sec.fromDataset("textFiles");
            JavaRDD<String> lines = input.values().map(Text::toString);

            // Spark processing
            JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
            JavaPairRDD<String, Integer> counts = words
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

            // Write results back to the "wordCounts" KeyValueTable; writes go
            // through saveAsDataset rather than executor closures, since
            // dataset instances are not serializable
            JavaPairRDD<byte[], byte[]> output = counts.mapToPair(count ->
                new Tuple2<>(Bytes.toBytes(count._1()), Bytes.toBytes(count._2())));
            sec.saveAsDataset(output, "wordCounts");

            sec.getMetrics().count("words.processed", (int) counts.count());
            jsc.stop();
        }
    }
}

Spark with Dataset Integration

public class CustomerAnalyticsSpark extends AbstractSpark {
    
    @Override
    public void configure(SparkConfigurer configurer) {
        configurer.setName("CustomerAnalytics");
        configurer.setMainClass(CustomerAnalyticsMain.class);
        configurer.useDataset("customers");
        configurer.useDataset("analytics");
    }
    
    public static class CustomerAnalyticsMain implements JavaSparkMain {
        @Override
        public void run(JavaSparkExecutionContext sec) throws Exception {
            JavaSparkContext jsc = new JavaSparkContext();
            SQLContext sqlContext = new SQLContext(jsc);

            // Load customer data from the "customers" ObjectStore as (key, value) pairs
            JavaPairRDD<byte[], Customer> customerPairs = sec.fromDataset("customers");
            JavaRDD<Customer> customerRDD = customerPairs.values();

            // Convert to DataFrame for SQL operations
            Dataset<Row> customerDF = sqlContext.createDataFrame(customerRDD, Customer.class);
            customerDF.createOrReplaceTempView("customers");

            // Perform analytics using Spark SQL
            Dataset<Row> regionAnalytics = sqlContext.sql(
                "SELECT region, COUNT(*) AS customerCount, AVG(purchaseAmount) AS avgPurchase " +
                "FROM customers GROUP BY region"
            );

            // Save results back to the "analytics" dataset, keyed by region
            JavaPairRDD<byte[], Analytics> results = regionAnalytics.toJavaRDD().mapToPair(row ->
                new Tuple2<>(Bytes.toBytes(row.getString(0)),
                             new Analytics(row.getString(0),    // region
                                           row.getLong(1),      // customerCount
                                           row.getDouble(2)))); // avgPurchase
            sec.saveAsDataset(results, "analytics");

            sec.getMetrics().gauge("customers.analyzed", customerRDD.count());
            jsc.stop();
        }
    }
}

Spark Streaming Program

public class RealTimeProcessingSpark extends AbstractSpark {
    
    @Override
    public void configure(SparkConfigurer configurer) {
        configurer.setName("RealTimeProcessor");
        configurer.setMainClass(StreamingMain.class);
        configurer.useDataset("streamData");
        configurer.useDataset("processedEvents");
        
        // Set properties for streaming
        configurer.setProperties(ImmutableMap.of(
            "spark.streaming.batchDuration", "10",
            "spark.streaming.checkpoint.directory", "/tmp/spark-checkpoint"
        ));
    }
    
    public static class StreamingMain implements JavaSparkMain {
        @Override
        public void run(JavaSparkExecutionContext sec) throws Exception {
            JavaStreamingContext jssc = new JavaStreamingContext(
                new SparkConf(), Durations.seconds(10));

            // Create the input stream (implementation omitted)
            JavaDStream<String> lines = createInputStream(jssc, sec);

            // Process streaming data
            JavaDStream<Event> events = lines.map(line -> parseEvent(line));
            JavaDStream<Event> filteredEvents = events.filter(event -> event.isValid());

            // Save each processed batch to the "processedEvents" dataset;
            // foreachRDD runs on the driver, where the execution context is usable
            filteredEvents.foreachRDD(rdd -> {
                JavaPairRDD<byte[], Event> batch = rdd.mapToPair(event ->
                    new Tuple2<>(Bytes.toBytes(event.getId()), event));
                sec.saveAsDataset(batch, "processedEvents");
                sec.getMetrics().count("events.processed", (int) rdd.count());
            });

            jssc.start();
            jssc.awaitTermination();
        }

        private static JavaDStream<String> createInputStream(JavaStreamingContext jssc,
                                                             JavaSparkExecutionContext sec) {
            // Implementation to create the stream, e.g. from a receiver or message queue
            return null;
        }

        private static Event parseEvent(String line) {
            // Parse an event from a line of input
            return new Event();
        }
    }
}

Spark with Plugin Integration

public class PluginSparkProgram extends AbstractSpark {
    
    @Override
    public void configure(SparkConfigurer configurer) {
        configurer.setName("PluginProcessor");
        configurer.setMainClass(PluginSparkMain.class);
        
        // Use transformation plugin
        configurer.usePlugin("transform", "dataTransform", "transformer",
                            PluginProperties.builder()
                                .add("operation", "normalize")
                                .add("fields", "name,email,phone")
                                .build());
    }
    
    public static class PluginSparkMain implements JavaSparkMain {
        @Override
        public void run(JavaSparkExecutionContext sec) throws Exception {
            JavaSparkContext jsc = new JavaSparkContext();

            // Instantiate the plugin registered under the "transformer" id;
            // the plugin must be serializable to be used inside Spark closures
            DataTransformer transformer = sec.getPluginContext()
                                             .newPluginInstance("transformer");

            // Process data using the plugin
            JavaRDD<Record> inputData = loadInputData(jsc, sec);
            JavaRDD<Record> transformedData = inputData.map(record -> transformer.transform(record));

            // Save results
            saveResults(transformedData, sec);

            sec.getMetrics().count("records.transformed", (int) transformedData.count());
            jsc.stop();
        }

        private static JavaRDD<Record> loadInputData(JavaSparkContext jsc,
                                                     JavaSparkExecutionContext sec) {
            // Load data from CDAP datasets
            return null;
        }

        private static void saveResults(JavaRDD<Record> data, JavaSparkExecutionContext sec) {
            // Save to CDAP datasets
        }
    }
}

Spark ML Program

public class MachineLearningAnalytics extends AbstractSpark {
    
    @Override
    public void configure(SparkConfigurer configurer) {
        configurer.setName("MLAnalytics");
        configurer.setMainClass(MLMain.class);
        configurer.useDataset("trainingData");
        configurer.useDataset("models");
        configurer.useDataset("predictions");
        
        // Allocate more resources for ML workload
        configurer.setDriverResources(new Resources(4096, 2));    // 4GB, 2 cores
        configurer.setExecutorResources(new Resources(8192, 4));  // 8GB, 4 cores
    }
    
    public static class MLMain implements JavaSparkMain {
        @Override
        public void run(JavaSparkExecutionContext sec) throws Exception {
            JavaSparkContext jsc = new JavaSparkContext();
            SQLContext sqlContext = new SQLContext(jsc);

            // Load training data from the "trainingData" ObjectStore
            JavaPairRDD<byte[], TrainingRecord> trainingPairs = sec.fromDataset("trainingData");
            Dataset<Row> training = sqlContext.createDataFrame(
                trainingPairs.values(), TrainingRecord.class);

            // Prepare features
            VectorAssembler assembler = new VectorAssembler()
                .setInputCols(new String[]{"feature1", "feature2", "feature3"})
                .setOutputCol("features");

            Dataset<Row> featuresDF = assembler.transform(training);

            // Train model
            LogisticRegression lr = new LogisticRegression()
                .setFeaturesCol("features")
                .setLabelCol("label")
                .setMaxIter(100)
                .setRegParam(0.01);

            LogisticRegressionModel model = lr.fit(featuresDF);

            // Save the model coefficients through a driver-side transaction
            sec.execute(datasetContext -> {
                ObjectStore<MLModel> models = datasetContext.getDataset("models");
                models.write("customer_classifier", new MLModel(model.coefficients()));
            });

            // Make predictions (on the training set here, for illustration)
            Dataset<Row> predictions = model.transform(featuresDF);

            // Save predictions to the "predictions" dataset, keyed by record id
            JavaPairRDD<byte[], Prediction> predictionPairs = predictions.toJavaRDD().mapToPair(row -> {
                String id = row.getAs("id");
                double predicted = row.getAs("prediction");
                // The "probability" column holds a Vector of class probabilities
                Vector probability = row.getAs("probability");
                return new Tuple2<>(Bytes.toBytes(id),
                                    new Prediction(id, predicted, probability.apply((int) predicted)));
            });
            sec.saveAsDataset(predictionPairs, "predictions");

            // Report accuracy as a percentage, since gauge values are longs
            sec.getMetrics().gauge("model.accuracy", (long) (calculateAccuracy(predictions) * 100));
            jsc.stop();
        }

        private static double calculateAccuracy(Dataset<Row> predictions) {
            // Compare the "prediction" and "label" columns to compute accuracy
            return 0.95;
        }
    }
}

Spark programs in CDAP provide powerful distributed processing capabilities for large-scale data analytics, machine learning, stream processing, and complex data transformations while maintaining integration with CDAP's dataset and service ecosystem.
