# Spark Programs

Spark programs in CDAP provide distributed data processing capabilities using Apache Spark, supporting both batch and streaming workloads with integrated access to CDAP datasets and services.
## Core Spark Interfaces

### Spark

```java { .api }
@Beta
public interface Spark {
  void configure(SparkConfigurer configurer);
}
```

Base interface for Spark programs, which are executed on Apache Spark's distributed computing framework.
### AbstractSpark

```java { .api }
public abstract class AbstractSpark implements Spark {
  public abstract void configure(SparkConfigurer configurer);
}
```

Base implementation class for Spark programs, providing the configuration framework.
## Spark Configuration

### SparkConfigurer

```java { .api }
public interface SparkConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  void setMainClass(Class<?> mainClass);
  void setMainClassName(String mainClassName);
  void setDriverResources(Resources resources);
  void setExecutorResources(Resources resources);
  void setClientResources(Resources resources);
}
```

Interface for configuring Spark programs, covering main class specification and driver, executor, and client resource allocation.
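As a minimal sketch of how these methods fit together, the following `configure()` implementation sets the main class by name and allocates memory for all three roles; the class name and memory values are illustrative assumptions rather than values taken from the CDAP documentation.

```java
public class ResourceTunedSpark extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("ResourceTunedSpark");
    configurer.setDescription("Illustrates resource configuration");

    // The main class can be set either by Class reference or by fully qualified name.
    configurer.setMainClassName("com.example.spark.ResourceTunedSparkMain"); // hypothetical class

    // Memory in MB for the driver, each executor, and the client process.
    configurer.setDriverResources(new Resources(2048));
    configurer.setExecutorResources(new Resources(4096));
    configurer.setClientResources(new Resources(1024));
  }
}
```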
### SparkSpecification

```java { .api }
public class SparkSpecification implements ProgramSpecification {
  public String getName();
  public String getDescription();
  public String getClassName();
  public String getMainClassName();
  public Map<String, String> getProperties();
  public Resources getDriverResources();
  public Resources getExecutorResources();
  public Resources getClientResources();
  public Set<String> getDatasets();
}
```

Complete specification of a Spark program, including the main class and resource requirements.
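For illustration, a specification obtained from the platform could be inspected with a small helper like the sketch below; it uses only the getters listed above and assumes `Resources` exposes `getMemoryMB()` and `getVirtualCores()`.

```java
// Hypothetical inspection helper; `spec` is assumed to have been obtained
// from the platform (for example when examining a deployed program).
void describeSpark(SparkSpecification spec) {
  System.out.printf("Spark program %s runs main class %s%n",
                    spec.getName(), spec.getMainClassName());

  Resources executors = spec.getExecutorResources();
  if (executors != null) {
    System.out.printf("Executor resources: %d MB, %d cores%n",
                      executors.getMemoryMB(), executors.getVirtualCores());
  }

  System.out.println("Datasets used: " + spec.getDatasets());
}
```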
## Spark Context

### SparkClientContext

```java { .api }
public interface SparkClientContext extends RuntimeContext, DatasetContext {
  Map<String, String> getRuntimeArguments();
  Map<String, String> getSparkConf();

  PluginContext getPluginContext();
  ServiceDiscoverer getServiceDiscoverer();
  Metrics getMetrics();
  Admin getAdmin();

  void localize(String fileName, URI uri);
  void localize(String fileName, URI uri, boolean archive);
}
```

Client-side context available to Spark programs, providing access to configuration, datasets, and CDAP services.
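Because the context is handed to the program by the CDAP runtime rather than constructed directly, the sketch below simply assumes a `SparkClientContext` instance named `context` has been supplied; it exercises only the methods declared above, and the argument names, file names, and URIs are illustrative.

```java
// Minimal sketch, assuming `context` was provided by the CDAP runtime.
void prepareRun(SparkClientContext context) {
  // Read a runtime argument passed when the program was started.
  String input = context.getRuntimeArguments().getOrDefault("input.path", "/data/input");
  System.out.println("Reading input from " + input);

  // Make an extra file available to the Spark driver and executors.
  context.localize("lookup.json", URI.create("hdfs:///reference/lookup.json"));

  // Localize an archive; the `true` flag marks it as an archive
  // (assumed to be expanded on the nodes).
  context.localize("geoip.zip", URI.create("hdfs:///reference/geoip.zip"), true);

  // Record a custom metric for this run.
  context.getMetrics().count("files.localized", 2);
}
```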
## Usage Examples

### Basic Spark Program

```java
public class WordCountSpark extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("WordCountSpark");
    configurer.setDescription("Counts words using Spark");
    configurer.setMainClass(WordCountSparkMain.class);

    // Configure resources
    configurer.setDriverResources(new Resources(1024));   // 1GB for the driver
    configurer.setExecutorResources(new Resources(2048)); // 2GB per executor

    // Use datasets
    configurer.useDataset("textFiles");
    configurer.useDataset("wordCounts");
  }

  public static class WordCountSparkMain {
    public static void main(String[] args) throws Exception {
      // SparkClientContext is an interface supplied by the CDAP runtime and cannot
      // be created with `new`; getProvidedContext() is a hypothetical placeholder
      // for however the runtime hands the context to the program.
      SparkClientContext context = getProvidedContext();
      JavaSparkContext jsc = new JavaSparkContext();

      // Read input and output datasets from CDAP
      FileSet textFiles = context.getDataset("textFiles");
      KeyValueTable wordCounts = context.getDataset("wordCounts");

      // Spark processing
      JavaRDD<String> lines = jsc.textFile(textFiles.getBaseLocation().toURI().getPath());
      JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
      JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
      JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

      // Collect the counts on the driver and write them back to the CDAP dataset;
      // the dataset is not captured in executor closures, which must be serializable.
      Map<String, Integer> results = counts.collectAsMap();
      for (Map.Entry<String, Integer> entry : results.entrySet()) {
        wordCounts.write(entry.getKey(), String.valueOf(entry.getValue()));
      }

      context.getMetrics().count("words.processed", results.size());
      jsc.close();
    }
  }
}
```
### Spark with Dataset Integration

```java
public class CustomerAnalyticsSpark extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("CustomerAnalytics");
    configurer.setMainClass(CustomerAnalyticsMain.class);
    configurer.useDataset("customers");
    configurer.useDataset("analytics");
  }

  public static class CustomerAnalyticsMain {
    public static void main(String[] args) throws Exception {
      // Context supplied by the CDAP runtime; placeholder accessor as in the first example.
      SparkClientContext context = getProvidedContext();
      JavaSparkContext jsc = new JavaSparkContext();
      SQLContext sqlContext = new SQLContext(jsc);

      // Access CDAP datasets
      ObjectStore<Customer> customers = context.getDataset("customers");
      ObjectStore<Analytics> analytics = context.getDataset("analytics");

      // Load customer data into Spark
      List<Customer> customerList = loadCustomers(customers);
      JavaRDD<Customer> customerRDD = jsc.parallelize(customerList);

      // Convert to a DataFrame for SQL operations
      Dataset<Row> customerDF = sqlContext.createDataFrame(customerRDD, Customer.class);
      customerDF.createOrReplaceTempView("customers");

      // Perform analytics using Spark SQL
      Dataset<Row> regionAnalytics = sqlContext.sql(
          "SELECT region, COUNT(*) as customerCount, AVG(purchaseAmount) as avgPurchase " +
          "FROM customers GROUP BY region"
      );

      // Collect the aggregated rows on the driver and save them back to CDAP;
      // the ObjectStore is not captured in executor closures.
      for (Row row : regionAnalytics.collectAsList()) {
        Analytics result = new Analytics(
            row.getString(0),  // region
            row.getLong(1),    // customerCount
            row.getDouble(2)   // avgPurchase
        );
        analytics.write(result.getRegion(), result);
      }

      context.getMetrics().gauge("customers.analyzed", customerList.size());
      jsc.close();
    }

    private static List<Customer> loadCustomers(ObjectStore<Customer> store) {
      // Load customers from the dataset (implementation omitted)
      List<Customer> customers = new ArrayList<>();
      // Implementation would scan the ObjectStore
      return customers;
    }
  }
}
```
### Spark Streaming Program

```java
public class RealTimeProcessingSpark extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("RealTimeProcessor");
    configurer.setMainClass(StreamingMain.class);
    configurer.useDataset("streamData");
    configurer.useDataset("processedEvents");

    // Set properties for streaming
    configurer.setProperties(ImmutableMap.of(
        "spark.streaming.batchDuration", "10",
        "spark.streaming.checkpoint.directory", "/tmp/spark-checkpoint"
    ));
  }

  public static class StreamingMain {
    public static void main(String[] args) throws Exception {
      // Context supplied by the CDAP runtime; placeholder accessor as in the first example.
      SparkClientContext context = getProvidedContext();
      JavaStreamingContext jssc = new JavaStreamingContext(
          new SparkConf(), Durations.seconds(10));

      ObjectStore<Event> processedEvents = context.getDataset("processedEvents");

      // Create an input stream from the CDAP dataset
      JavaDStream<String> lines = createInputStream(jssc, context);

      // Process the streaming data
      JavaDStream<Event> events = lines.map(line -> parseEvent(line));
      JavaDStream<Event> filteredEvents = events.filter(event -> event.isValid());

      // Save processed events to the CDAP dataset; foreachRDD runs on the driver
      // for each batch, so the events are collected before being written.
      filteredEvents.foreachRDD(rdd -> {
        List<Event> batch = rdd.collect();
        for (Event event : batch) {
          processedEvents.write(event.getId(), event);
        }
        context.getMetrics().count("events.processed", batch.size());
      });

      jssc.start();
      jssc.awaitTermination();
    }

    private static JavaDStream<String> createInputStream(JavaStreamingContext jssc,
                                                         SparkClientContext context) {
      // Implementation would create a stream from the CDAP dataset
      return null;
    }

    private static Event parseEvent(String line) {
      // Parse an event from the input line
      return new Event();
    }
  }
}
```
### Spark with Plugin Integration

```java
public class PluginSparkProgram extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("PluginProcessor");
    configurer.setMainClass(PluginSparkMain.class);

    // Use a transformation plugin
    configurer.usePlugin("transform", "dataTransform", "transformer",
                         PluginProperties.builder()
                             .add("operation", "normalize")
                             .add("fields", "name,email,phone")
                             .build());
  }

  public static class PluginSparkMain {
    public static void main(String[] args) throws Exception {
      // Context supplied by the CDAP runtime; placeholder accessor as in the first example.
      SparkClientContext context = getProvidedContext();
      JavaSparkContext jsc = new JavaSparkContext();

      // Instantiate the plugin registered as "transformer" in configure()
      DataTransformer transformer = context.getPluginContext()
          .newPluginInstance("transformer");

      // Process data using the plugin; the transformer is assumed to be
      // serializable since the map closure runs on the executors.
      JavaRDD<Record> inputData = loadInputData(jsc, context);
      JavaRDD<Record> transformedData = inputData.map(record ->
          transformer.transform(record));

      // Save results
      saveResults(transformedData, context);

      context.getMetrics().count("records.transformed", (int) transformedData.count());
      jsc.close();
    }

    private static JavaRDD<Record> loadInputData(JavaSparkContext jsc,
                                                 SparkClientContext context) {
      // Load data from CDAP datasets (implementation omitted)
      return null;
    }

    private static void saveResults(JavaRDD<Record> data, SparkClientContext context) {
      // Save to CDAP datasets (implementation omitted)
    }
  }
}
```
304
305
### Spark ML Program
306
307
```java
308
public class MachineLearningAnalytics extends AbstractSpark {
309
310
@Override
311
public void configure(SparkConfigurer configurer) {
312
configurer.setName("MLAnalytics");
313
configurer.setMainClass(MLMain.class);
314
configurer.useDataset("trainingData");
315
configurer.useDataset("models");
316
configurer.useDataset("predictions");
317
318
// Allocate more resources for ML workload
319
configurer.setDriverResources(new Resources(4096, 2)); // 4GB, 2 cores
320
configurer.setExecutorResources(new Resources(8192, 4)); // 8GB, 4 cores
321
}
322
323
public static class MLMain {
324
public static void main(String[] args) throws Exception {
325
SparkClientContext context = new SparkClientContext();
326
JavaSparkContext jsc = new JavaSparkContext();
327
SQLContext sqlContext = new SQLContext(jsc);
328
329
// Load training data
330
ObjectStore<TrainingRecord> trainingData = context.getDataset("trainingData");
331
List<TrainingRecord> records = loadTrainingData(trainingData);
332
333
Dataset<Row> training = sqlContext.createDataFrame(records, TrainingRecord.class);
334
335
// Prepare features
336
VectorAssembler assembler = new VectorAssembler()
337
.setInputCols(new String[]{"feature1", "feature2", "feature3"})
338
.setOutputCol("features");
339
340
Dataset<Row> featuresDF = assembler.transform(training);
341
342
// Train model
343
LogisticRegression lr = new LogisticRegression()
344
.setFeaturesCol("features")
345
.setLabelCol("label")
346
.setMaxIter(100)
347
.setRegParam(0.01);
348
349
LogisticRegressionModel model = lr.fit(featuresDF);
350
351
// Save model
352
ObjectStore<MLModel> models = context.getDataset("models");
353
models.write("customer_classifier", new MLModel(model.coefficients()));
354
355
// Make predictions on test data
356
Dataset<Row> predictions = model.transform(featuresDF);
357
358
// Save predictions
359
ObjectStore<Prediction> predictionStore = context.getDataset("predictions");
360
predictions.toJavaRDD().foreach(row -> {
361
Prediction pred = new Prediction(
362
row.getString("id"),
363
row.getDouble("prediction"),
364
row.getDouble("probability")
365
);
366
predictionStore.write(pred.getId(), pred);
367
});
368
369
context.getMetrics().gauge("model.accuracy", calculateAccuracy(predictions));
370
jsc.close();
371
}
372
373
private static List<TrainingRecord> loadTrainingData(ObjectStore<TrainingRecord> store) {
374
// Load training data from dataset
375
return new ArrayList<>();
376
}
377
378
private static double calculateAccuracy(Dataset<Row> predictions) {
379
// Calculate model accuracy
380
return 0.95;
381
}
382
}
383
}
384
```
385
386
Spark programs in CDAP provide powerful distributed processing capabilities for large-scale data analytics, machine learning, stream processing, and complex data transformations while maintaining integration with CDAP's dataset and service ecosystem.