# Spark Programs

Spark programs in CDAP provide distributed data processing capabilities using Apache Spark, supporting both batch and streaming workloads with integrated access to CDAP datasets and services.

## Core Spark Interfaces

### Spark

```java { .api }
@Beta
public interface Spark {
  void configure(SparkConfigurer configurer);
}
```

Base interface for Spark programs. Spark programs are executed using Apache Spark's distributed computing framework.

### AbstractSpark

```java { .api }
public abstract class AbstractSpark implements Spark {
  public abstract void configure(SparkConfigurer configurer);
}
```

Base implementation class for Spark programs, providing the configuration framework.

## Spark Configuration

### SparkConfigurer

```java { .api }
public interface SparkConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  void setMainClass(Class<?> mainClass);
  void setMainClassName(String mainClassName);
  void setDriverResources(Resources resources);
  void setExecutorResources(Resources resources);
  void setClientResources(Resources resources);
}
```

Interface for configuring Spark programs, including the main class and resource allocation for the client, driver, and executors.
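
As a sketch of how these methods fit together, a `configure` method can reference the Spark main class by name and size the client, driver, and executor containers explicitly; the program name, class name, and memory values below are illustrative:

```java
public class LogParserSpark extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("LogParser");
    configurer.setDescription("Parses raw log files with Spark");

    // Reference the main class by fully qualified name instead of a Class object,
    // useful when the class is not on the classpath at configuration time.
    configurer.setMainClassName("com.example.spark.LogParserMain");

    // Container memory in MB: the client that submits the job, the driver, and each executor
    configurer.setClientResources(new Resources(512));
    configurer.setDriverResources(new Resources(1024));
    configurer.setExecutorResources(new Resources(2048));
  }
}
```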

### SparkSpecification

```java { .api }
public class SparkSpecification implements ProgramSpecification {
  public String getName();
  public String getDescription();
  public String getClassName();
  public String getMainClassName();
  public Map<String, String> getProperties();
  public Resources getDriverResources();
  public Resources getExecutorResources();
  public Resources getClientResources();
  public Set<String> getDatasets();
}
```

Complete specification of a Spark program, including its main class, properties, resource requirements, and the datasets it uses.
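
Program code usually consumes a `SparkSpecification` rather than building one. As an illustrative sketch (assuming `Resources` exposes `getMemoryMB()`; how the specification is obtained is outside the scope of this page), its key fields can be summarized like this:

```java
// Illustrative helper: format the key fields of a SparkSpecification for logging.
static String describe(SparkSpecification spec) {
  return String.format(
    "%s (%s): main=%s, driver=%d MB, executors=%d MB, datasets=%s",
    spec.getName(),
    spec.getDescription(),
    spec.getMainClassName(),
    spec.getDriverResources().getMemoryMB(),
    spec.getExecutorResources().getMemoryMB(),
    spec.getDatasets());
}
```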

## Spark Context

### SparkClientContext

```java { .api }
public interface SparkClientContext extends RuntimeContext, DatasetContext {
  Map<String, String> getRuntimeArguments();
  Map<String, String> getSparkConf();

  PluginContext getPluginContext();
  ServiceDiscoverer getServiceDiscoverer();
  Metrics getMetrics();
  Admin getAdmin();

  void localize(String fileName, URI uri);
  void localize(String fileName, URI uri, boolean archive);
}
```

Client-side context available to Spark programs, providing access to runtime arguments, Spark configuration, datasets, plugins, metrics, and other CDAP services.
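
The `localize` methods make additional files available to the Spark driver and executors. A minimal sketch, assuming the context is handed in by a client-side hook (the file names, URIs, and argument names are hypothetical):

```java
public final class SparkClientSetup {

  // Called with a SparkClientContext obtained from the CDAP runtime.
  public static void prepare(SparkClientContext context) {
    // Make a lookup file available to the containers under the name "lookup.txt"
    context.localize("lookup.txt", URI.create("hdfs:///data/reference/lookup.txt"));

    // Localize an archive; passing true causes it to be expanded on the containers
    context.localize("geo.zip", URI.create("hdfs:///data/reference/geo.zip"), true);

    // Runtime arguments are exposed as a map (getSparkConf() works the same way
    // for the Spark configuration)
    String threshold = context.getRuntimeArguments().getOrDefault("threshold", "10");

    // Emit a client-side metric
    context.getMetrics().gauge("setup.threshold", Long.parseLong(threshold));
  }
}
```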

## Usage Examples

### Basic Spark Program

```java
public class WordCountSpark extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("WordCountSpark");
    configurer.setDescription("Counts words using Spark");
    configurer.setMainClass(WordCountSparkMain.class);

    // Configure resources
    configurer.setDriverResources(new Resources(1024));   // 1 GB for the driver
    configurer.setExecutorResources(new Resources(2048)); // 2 GB per executor

    // Declare the datasets used by this program
    configurer.useDataset("textFiles");
    configurer.useDataset("wordCounts");
  }

  public static class WordCountSparkMain {
    public static void main(String[] args) throws Exception {
      // Shown for brevity: in a real program the SparkClientContext is provided by
      // the CDAP runtime rather than instantiated directly.
      SparkClientContext context = new SparkClientContext();
      JavaSparkContext jsc = new JavaSparkContext();

      // Read input data from CDAP datasets
      FileSet textFiles = context.getDataset("textFiles");
      KeyValueTable wordCounts = context.getDataset("wordCounts");

      // Spark processing
      JavaRDD<String> lines = jsc.textFile(textFiles.getBaseLocation().toURI().getPath());
      JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
      JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
      JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

      // Collect the counts in the driver and write them back to the CDAP dataset;
      // dataset instances cannot be used inside executor closures.
      Map<String, Integer> result = counts.collectAsMap();
      for (Map.Entry<String, Integer> entry : result.entrySet()) {
        wordCounts.write(entry.getKey(), String.valueOf(entry.getValue()));
      }

      context.getMetrics().count("words.processed", result.size());
      jsc.close();
    }
  }
}
```

### Spark with Dataset Integration

```java
public class CustomerAnalyticsSpark extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("CustomerAnalytics");
    configurer.setMainClass(CustomerAnalyticsMain.class);
    configurer.useDataset("customers");
    configurer.useDataset("analytics");
  }

  public static class CustomerAnalyticsMain {
    public static void main(String[] args) throws Exception {
      // Provided by the CDAP runtime in a real program (see the note in the first example)
      SparkClientContext context = new SparkClientContext();
      JavaSparkContext jsc = new JavaSparkContext();
      SQLContext sqlContext = new SQLContext(jsc);

      // Access CDAP datasets
      ObjectStore<Customer> customers = context.getDataset("customers");
      ObjectStore<Analytics> analytics = context.getDataset("analytics");

      // Load customer data into Spark
      List<Customer> customerList = loadCustomers(customers);
      JavaRDD<Customer> customerRDD = jsc.parallelize(customerList);

      // Convert to a DataFrame for SQL operations
      Dataset<Row> customerDF = sqlContext.createDataFrame(customerRDD, Customer.class);
      customerDF.createOrReplaceTempView("customers");

      // Perform analytics using Spark SQL
      Dataset<Row> regionAnalytics = sqlContext.sql(
        "SELECT region, COUNT(*) as customerCount, AVG(purchaseAmount) as avgPurchase " +
        "FROM customers GROUP BY region"
      );

      // Collect the aggregated rows in the driver and save them back to CDAP
      for (Row row : regionAnalytics.collectAsList()) {
        Analytics result = new Analytics(
          row.getString(0), // region
          row.getLong(1),   // customerCount
          row.getDouble(2)  // avgPurchase
        );
        analytics.write(result.getRegion(), result);
      }

      context.getMetrics().gauge("customers.analyzed", customerList.size());
      jsc.close();
    }

    private static List<Customer> loadCustomers(ObjectStore<Customer> store) {
      // Read all customers from the ObjectStore (implementation omitted)
      List<Customer> customers = new ArrayList<>();
      return customers;
    }
  }
}
```

### Spark Streaming Program

```java
public class RealTimeProcessingSpark extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("RealTimeProcessor");
    configurer.setMainClass(StreamingMain.class);
    configurer.useDataset("streamData");
    configurer.useDataset("processedEvents");

    // Set properties for streaming
    configurer.setProperties(ImmutableMap.of(
      "spark.streaming.batchDuration", "10",
      "spark.streaming.checkpoint.directory", "/tmp/spark-checkpoint"
    ));
  }

  public static class StreamingMain {
    public static void main(String[] args) throws Exception {
      // Provided by the CDAP runtime in a real program (see the note in the first example)
      SparkClientContext context = new SparkClientContext();
      JavaStreamingContext jssc = new JavaStreamingContext(
        new SparkConf(), Durations.seconds(10));

      ObjectStore<Event> processedEvents = context.getDataset("processedEvents");

      // Create the input stream (implementation omitted)
      JavaDStream<String> lines = createInputStream(jssc, context);

      // Process streaming data
      JavaDStream<Event> events = lines.map(line -> parseEvent(line));
      JavaDStream<Event> filteredEvents = events.filter(event -> event.isValid());

      // Save processed events to the CDAP dataset, one micro-batch at a time. Each
      // (small) batch is collected to the driver because dataset instances cannot be
      // used inside executor closures.
      filteredEvents.foreachRDD(rdd -> {
        List<Event> batch = rdd.collect();
        for (Event event : batch) {
          processedEvents.write(event.getId(), event);
        }
        context.getMetrics().count("events.processed", batch.size());
      });

      jssc.start();
      jssc.awaitTermination();
    }

    private static JavaDStream<String> createInputStream(JavaStreamingContext jssc,
                                                         SparkClientContext context) {
      // Create a stream backed by the "streamData" dataset (implementation omitted)
      return null;
    }

    private static Event parseEvent(String line) {
      // Parse an event from its string representation (implementation omitted)
      return new Event();
    }
  }
}
```

### Spark with Plugin Integration

```java
public class PluginSparkProgram extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("PluginProcessor");
    configurer.setMainClass(PluginSparkMain.class);

    // Register a transformation plugin under the id "transformer"
    configurer.usePlugin("transform", "dataTransform", "transformer",
      PluginProperties.builder()
        .add("operation", "normalize")
        .add("fields", "name,email,phone")
        .build());
  }

  public static class PluginSparkMain {
    public static void main(String[] args) throws Exception {
      // Provided by the CDAP runtime in a real program (see the note in the first example)
      SparkClientContext context = new SparkClientContext();
      JavaSparkContext jsc = new JavaSparkContext();

      // Instantiate the plugin registered in configure()
      DataTransformer transformer = context.getPluginContext()
        .newPluginInstance("transformer");

      // Process data using the plugin; the plugin instance is captured by the Spark
      // closure, so it must be serializable.
      JavaRDD<Record> inputData = loadInputData(jsc, context);
      JavaRDD<Record> transformedData = inputData.map(record ->
        transformer.transform(record));

      // Save results
      saveResults(transformedData, context);

      context.getMetrics().count("records.transformed", (int) transformedData.count());
      jsc.close();
    }

    private static JavaRDD<Record> loadInputData(JavaSparkContext jsc,
                                                 SparkClientContext context) {
      // Load input records from CDAP datasets (implementation omitted)
      return null;
    }

    private static void saveResults(JavaRDD<Record> data, SparkClientContext context) {
      // Write transformed records back to CDAP datasets (implementation omitted)
    }
  }
}
```

### Spark ML Program

```java
public class MachineLearningAnalytics extends AbstractSpark {

  @Override
  public void configure(SparkConfigurer configurer) {
    configurer.setName("MLAnalytics");
    configurer.setMainClass(MLMain.class);
    configurer.useDataset("trainingData");
    configurer.useDataset("models");
    configurer.useDataset("predictions");

    // Allocate more resources for the ML workload
    configurer.setDriverResources(new Resources(4096, 2));   // 4 GB, 2 cores
    configurer.setExecutorResources(new Resources(8192, 4)); // 8 GB, 4 cores
  }

  public static class MLMain {
    public static void main(String[] args) throws Exception {
      // Provided by the CDAP runtime in a real program (see the note in the first example)
      SparkClientContext context = new SparkClientContext();
      JavaSparkContext jsc = new JavaSparkContext();
      SQLContext sqlContext = new SQLContext(jsc);

      // Load training data
      ObjectStore<TrainingRecord> trainingData = context.getDataset("trainingData");
      List<TrainingRecord> records = loadTrainingData(trainingData);

      Dataset<Row> training = sqlContext.createDataFrame(records, TrainingRecord.class);

      // Assemble the feature columns into a single vector column
      VectorAssembler assembler = new VectorAssembler()
        .setInputCols(new String[]{"feature1", "feature2", "feature3"})
        .setOutputCol("features");

      Dataset<Row> featuresDF = assembler.transform(training);

      // Train a logistic regression model
      LogisticRegression lr = new LogisticRegression()
        .setFeaturesCol("features")
        .setLabelCol("label")
        .setMaxIter(100)
        .setRegParam(0.01);

      LogisticRegressionModel model = lr.fit(featuresDF);

      // Save the model
      ObjectStore<MLModel> models = context.getDataset("models");
      models.write("customer_classifier", new MLModel(model.coefficients()));

      // Score the data with the trained model
      Dataset<Row> predictions = model.transform(featuresDF);

      // Save predictions. Rows are collected to the driver because dataset instances
      // cannot be used inside executor closures; the "probability" column is a vector
      // whose entry at index 1 is the probability of the positive class.
      ObjectStore<Prediction> predictionStore = context.getDataset("predictions");
      for (Row row : predictions.collectAsList()) {
        Vector probability = row.getAs("probability");
        Prediction pred = new Prediction(
          row.<String>getAs("id"),
          row.<Double>getAs("prediction"),
          probability.apply(1)
        );
        predictionStore.write(pred.getId(), pred);
      }

      // Metrics gauges take long values, so report accuracy as a percentage
      context.getMetrics().gauge("model.accuracy", (long) (calculateAccuracy(predictions) * 100));
      jsc.close();
    }

    private static List<TrainingRecord> loadTrainingData(ObjectStore<TrainingRecord> store) {
      // Read training records from the dataset (implementation omitted)
      return new ArrayList<>();
    }

    private static double calculateAccuracy(Dataset<Row> predictions) {
      // Compare the "prediction" and "label" columns (implementation omitted)
      return 0.95;
    }
  }
}
```


Spark programs in CDAP provide distributed processing for large-scale data analytics, machine learning, stream processing, and complex data transformations, while remaining integrated with CDAP's dataset and service ecosystem.