or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.md, context-configuration.md, index.md, java-api.md, key-value-operations.md, rdd-operations.md, status-monitoring.md, storage-persistence.md, task-context.md

docs/java-api.md

# Java API

Spark's Java API provides Java-friendly wrappers around the core Scala RDD API. It uses Java collections and functional interfaces so that it integrates cleanly with Java applications while preserving type safety and performance.

## Core Java Classes

### JavaSparkContext

The main entry point for Java applications using Spark.

```java { .api }

11

public class JavaSparkContext {

12

// Constructors

13

public JavaSparkContext();

14

public JavaSparkContext(String master, String appName);

15

public JavaSparkContext(String master, String appName, String sparkHome, String[] jars);

16

public JavaSparkContext(SparkConf conf);

17

public JavaSparkContext(SparkContext sc);

18

19

// RDD Creation

20

public <T> JavaRDD<T> parallelize(List<T> list);

21

public <T> JavaRDD<T> parallelize(List<T> list, int numSlices);

22

public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list);

23

public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list, int numSlices);

24

public JavaRDD<Long> range(long start, long end);

25

public JavaRDD<Long> range(long start, long end, long step);

26

public JavaRDD<Long> range(long start, long end, long step, int numSlices);

27

28

// File Input

29

public JavaRDD<String> textFile(String path);

30

public JavaRDD<String> textFile(String path, int minPartitions);

31

public JavaPairRDD<String, String> wholeTextFiles(String path);

32

public JavaPairRDD<String, String> wholeTextFiles(String path, int minPartitions);

33

public JavaPairRDD<String, PortableDataStream> binaryFiles(String path);

34

public JavaPairRDD<String, PortableDataStream> binaryFiles(String path, int minPartitions);

35

36

// Hadoop Integration

37

public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V> hadoopRDD(

38

JobConf conf, Class<F> inputFormatClass, Class<K> keyClass, Class<V> valueClass);

39

public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V> hadoopRDD(

40

JobConf conf, Class<F> inputFormatClass, Class<K> keyClass, Class<V> valueClass, int minPartitions);

41

public <K, V, F extends NewInputFormat<K, V>> JavaPairRDD<K, V> newAPIHadoopRDD(

42

Configuration conf, Class<F> fClass, Class<K> kClass, Class<V> vClass);

43

44

// Broadcast and Accumulators

45

public <T> Broadcast<T> broadcast(T value);

46

public LongAccumulator longAccumulator();

47

public LongAccumulator longAccumulator(String name);

48

public DoubleAccumulator doubleAccumulator();

49

public DoubleAccumulator doubleAccumulator(String name);

50

public <T> CollectionAccumulator<T> collectionAccumulator();

51

public <T> CollectionAccumulator<T> collectionAccumulator(String name);

52

53

// Application Control

54

public void stop();

55

public void addFile(String path);

56

public void addJar(String path);

57

public void setLogLevel(String logLevel);

58

public void setJobGroup(String groupId, String description);

59

public void setJobGroup(String groupId, String description, boolean interruptOnCancel);

60

public void clearJobGroup();

61

public void setLocalProperty(String key, String value);

62

public String getLocalProperty(String key);

63

64

// Properties

65

public SparkConf getConf();

66

public String master();

67

public String appName();

68

public List<String> jars();

69

public long startTime();

70

public String version();

71

public int defaultParallelism();

72

public SparkStatusTracker statusTracker();

73

}

74

```

### JavaRDD

Java wrapper for `RDD` that provides type-safe transformations and actions.

```java { .api }

81

public class JavaRDD<T> {

82

// Transformations

83

public <R> JavaRDD<R> map(Function<T, R> f);

84

public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f);

85

public JavaRDD<T> filter(Function<T, Boolean> f);

86

public JavaRDD<T> distinct();

87

public JavaRDD<T> distinct(int numPartitions);

88

public JavaRDD<T> union(JavaRDD<T> other);

89

public JavaRDD<T> intersection(JavaRDD<T> other);

90

public JavaRDD<T> subtract(JavaRDD<T> other);

91

public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other);

92

public <U> JavaPairRDD<T, U> zip(JavaRDD<U> other);

93

public JavaPairRDD<T, Long> zipWithIndex();

94

public JavaPairRDD<T, Long> zipWithUniqueId();

95

public JavaRDD<T> sample(boolean withReplacement, double fraction);

96

public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed);

97

public JavaRDD<T>[] randomSplit(double[] weights);

98

public JavaRDD<T>[] randomSplit(double[] weights, long seed);

99

public JavaRDD<T> repartition(int numPartitions);

100

public JavaRDD<T> coalesce(int numPartitions);

101

public JavaRDD<T> coalesce(int numPartitions, boolean shuffle);

102

public <S> JavaRDD<T> sortBy(Function<T, S> f, boolean ascending, int numPartitions);

103

public JavaRDD<T[]> glom();

104

public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f);

105

public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f, boolean preservesPartitioning);

106

public <U> JavaRDD<U> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<U>> f, boolean preservesPartitioning);

107

public JavaRDD<String> pipe(String command);

108

public JavaRDD<String> pipe(List<String> command);

109

110

// Actions

111

public List<T> collect();

112

public Iterator<T> toLocalIterator();

113

public long count();

114

public PartialResult<BoundedDouble> countApprox(long timeout);

115

public PartialResult<BoundedDouble> countApprox(long timeout, double confidence);

116

public long countApproxDistinct();

117

public long countApproxDistinct(double relativeSD);

118

public Map<T, Long> countByValue();

119

public T first();

120

public boolean isEmpty();

121

public List<T> take(int num);

122

public List<T> takeOrdered(int num);

123

public List<T> takeOrdered(int num, Comparator<T> comp);

124

public List<T> takeSample(boolean withReplacement, int num);

125

public List<T> takeSample(boolean withReplacement, int num, long seed);

126

public List<T> top(int num);

127

public List<T> top(int num, Comparator<T> comp);

128

public T reduce(Function2<T, T, T> f);

129

public T fold(T zeroValue, Function2<T, T, T> f);

130

public <U> U aggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp);

131

public <U> U treeAggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp);

132

public <U> U treeAggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp, int depth);

133

public void foreach(VoidFunction<T> f);

134

public void foreachPartition(VoidFunction<Iterator<T>> f);

135

136

// Persistence

137

public JavaRDD<T> persist(StorageLevel newLevel);

138

public JavaRDD<T> cache();

139

public JavaRDD<T> unpersist();

140

public JavaRDD<T> unpersist(boolean blocking);

141

public void checkpoint();

142

143

// Conversion

144

public <K, V> JavaPairRDD<K, V> mapToPair(PairFunction<T, K, V> f);

145

public JavaDoubleRDD mapToDouble(DoubleFunction<T> f);

146

public JavaDoubleRDD flatMapToDouble(DoubleFlatMapFunction<T> f);

147

public <K, V> JavaPairRDD<K, V> flatMapToPair(PairFlatMapFunction<T, K, V> f);

148

149

// Information

150

public int getNumPartitions();

151

public StorageLevel getStorageLevel();

152

public boolean isCheckpointed();

153

public String name();

154

public JavaRDD<T> setName(String name);

155

public String toDebugString();

156

public SparkContext context();

157

}

158

```

### JavaPairRDD

Java wrapper for key-value RDDs, whose elements are `scala.Tuple2<K, V>` pairs.

```java { .api }

165

public class JavaPairRDD<K, V> {

166

// Basic Operations

167

public JavaRDD<K> keys();

168

public JavaRDD<V> values();

169

public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f);

170

public <U> JavaPairRDD<K, U> flatMapValues(FlatMapFunction<V, U> f);

171

public JavaPairRDD<V, K> swap();

172

173

// Grouping

174

public JavaPairRDD<K, Iterable<V>> groupByKey();

175

public JavaPairRDD<K, Iterable<V>> groupByKey(int numPartitions);

176

public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner);

177

178

// Reduction

179

public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func);

180

public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions);

181

public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func);

182

public Map<K, V> reduceByKeyLocally(Function2<V, V, V> func);

183

184

public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func);

185

public JavaPairRDD<K, V> foldByKey(V zeroValue, int numPartitions, Function2<V, V, V> func);

186

public JavaPairRDD<K, V> foldByKey(V zeroValue, Partitioner partitioner, Function2<V, V, V> func);

187

188

public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);

189

public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, int numPartitions, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);

190

public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Partitioner partitioner, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);

191

192

public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners);

193

public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, int numPartitions);

194

public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, Partitioner partitioner);

195

196

// Joins

197

public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other);

198

public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, int numPartitions);

199

public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner);

200

201

public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other);

202

public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, int numPartitions);

203

public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);

204

205

public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other);

206

public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, int numPartitions);

207

public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);

208

209

public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other);

210

public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, int numPartitions);

211

public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);

212

213

// Cogroup

214

public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other);

215

public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, int numPartitions);

216

public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, Partitioner partitioner);

217

public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2);

218

public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, Partitioner partitioner);

219

public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, JavaPairRDD<K, W3> other3);

220

public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, JavaPairRDD<K, W3> other3, Partitioner partitioner);

221

222

// Sorting and Partitioning

223

public JavaPairRDD<K, V> sortByKey();

224

public JavaPairRDD<K, V> sortByKey(boolean ascending);

225

public JavaPairRDD<K, V> sortByKey(boolean ascending, int numPartitions);

226

public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(Partitioner partitioner);

227

public JavaPairRDD<K, V> partitionBy(Partitioner partitioner);

228

229

// Collection Operations

230

public Map<K, V> collectAsMap();

231

public Map<K, Long> countByKey();

232

public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(long timeout);

233

public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(long timeout, double confidence);

234

public List<V> lookup(K key);

235

236

// Subtraction

237

public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other);

238

public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, int numPartitions);

239

public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, Partitioner partitioner);

240

241

// Conversion and Utility

242

public JavaRDD<Tuple2<K, V>> rdd();

243

public JavaDoubleRDD values(); // When V extends Number

244

245

// Standard RDD operations (inherited)

246

public List<Tuple2<K, V>> collect();

247

public long count();

248

public JavaPairRDD<K, V> persist(StorageLevel newLevel);

249

public JavaPairRDD<K, V> cache();

250

public void foreach(VoidFunction<Tuple2<K, V>> f);

251

// ... other inherited operations

252

}

253

```

### JavaDoubleRDD

Specialized RDD of `Double` values that adds statistical operations.

```java { .api }

260

public class JavaDoubleRDD {

261

// Statistical Operations

262

public StatCounter stats();

263

public double mean();

264

public double sum();

265

public double variance();

266

public double sampleVariance();

267

public double stdev();

268

public double sampleStdev();

269

public Tuple2<double[], long[]> histogram(double[] buckets);

270

public Tuple2<double[], long[]> histogram(int bucketCount);

271

272

// Standard RDD Operations

273

public JavaDoubleRDD map(DoubleFunction<Double> f);

274

public JavaDoubleRDD filter(Function<Double, Boolean> f);

275

public JavaDoubleRDD union(JavaDoubleRDD other);

276

public JavaDoubleRDD distinct();

277

public JavaDoubleRDD sample(boolean withReplacement, double fraction);

278

public JavaDoubleRDD cache();

279

public JavaDoubleRDD persist(StorageLevel newLevel);

280

281

// Collection Operations

282

public List<Double> collect();

283

public double[] collectArray();

284

public long count();

285

public double first();

286

public List<Double> take(int num);

287

public void foreach(VoidFunction<Double> f);

288

}

289

```

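## Function Interfaces

All function interfaces live in `org.apache.spark.api.java.function`, extend `Serializable`, and are annotated with `@FunctionalInterface`, so they can be implemented with lambdas or method references. Their `call` methods are declared `throws Exception`, so implementations may throw checked exceptions.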

```java { .api }

296

@FunctionalInterface

297

public interface Function<T1, R> extends Serializable {

298

R call(T1 v1) throws Exception;

299

}

300

301

@FunctionalInterface

302

public interface Function2<T1, T2, R> extends Serializable {

303

R call(T1 v1, T2 v2) throws Exception;

304

}

305

306

@FunctionalInterface

307

public interface Function3<T1, T2, T3, R> extends Serializable {

308

R call(T1 v1, T2 v2, T3 v3) throws Exception;

309

}

310

311

@FunctionalInterface

312

public interface Function4<T1, T2, T3, T4, R> extends Serializable {

313

R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;

314

}

315

316

@FunctionalInterface

317

public interface VoidFunction<T> extends Serializable {

318

void call(T t) throws Exception;

319

}

320

321

@FunctionalInterface

322

public interface VoidFunction2<T1, T2> extends Serializable {

323

void call(T1 v1, T2 v2) throws Exception;

324

}

325

326

@FunctionalInterface

327

public interface PairFunction<T, K, V> extends Serializable {

328

Tuple2<K, V> call(T t) throws Exception;

329

}

330

331

@FunctionalInterface

332

public interface FlatMapFunction<T, R> extends Serializable {

333

Iterator<R> call(T t) throws Exception;

334

}

335

336

@FunctionalInterface

337

public interface PairFlatMapFunction<T, K, V> extends Serializable {

338

Iterator<Tuple2<K, V>> call(T t) throws Exception;

339

}

340

341

@FunctionalInterface

342

public interface DoubleFunction<T> extends Serializable {

343

double call(T t) throws Exception;

344

}

345

346

@FunctionalInterface

347

public interface DoubleFlatMapFunction<T> extends Serializable {

348

Iterator<Double> call(T t) throws Exception;

349

}

350

351

@FunctionalInterface

352

public interface FilterFunction<T> extends Serializable {

353

boolean call(T t) throws Exception;

354

}

355

356

@FunctionalInterface

357

public interface ForeachFunction<T> extends Serializable {

358

void call(T t) throws Exception;

359

}

360

361

@FunctionalInterface

362

public interface ForeachPartitionFunction<T> extends Serializable {

363

void call(Iterator<T> t) throws Exception;

364

}

365

366

@FunctionalInterface

367

public interface ReduceFunction<T> extends Serializable {

368

T call(T v1, T v2) throws Exception;

369

}

370

371

@FunctionalInterface

372

public interface MapFunction<T, R> extends Serializable {

373

R call(T value) throws Exception;

374

}

375

376

@FunctionalInterface

377

public interface MapPartitionsFunction<T, R> extends Serializable {

378

Iterator<R> call(Iterator<T> input) throws Exception;

379

}

380

381

@FunctionalInterface

382

public interface CoGroupFunction<K, V, W, R> extends Serializable {

383

Iterator<R> call(Tuple2<K, Tuple2<Iterable<V>, Iterable<W>>> t) throws Exception;

384

}

385

386

@FunctionalInterface

387

public interface FlatMapFunction2<T1, T2, R> extends Serializable {

388

Iterator<R> call(T1 t1, T2 t2) throws Exception;

389

}

390

391

@FunctionalInterface

392

public interface MapGroupsFunction<K, V, R> extends Serializable {

393

R call(K key, Iterator<V> values) throws Exception;

394

}

395

396

@FunctionalInterface

397

public interface FlatMapGroupsFunction<K, V, R> extends Serializable {

398

Iterator<R> call(K key, Iterator<V> values) throws Exception;

399

}

400

```

## Usage Examples

### Basic Application Structure

```java

407

import org.apache.spark.api.java.JavaSparkContext;

408

import org.apache.spark.api.java.JavaRDD;

409

import org.apache.spark.api.java.JavaPairRDD;

410

import org.apache.spark.SparkConf;

411

import org.apache.spark.api.java.function.*;

412

import scala.Tuple2;

413

import java.util.*;

414

415

public class SparkJavaExample {

416

public static void main(String[] args) {

417

// Configure Spark

418

SparkConf conf = new SparkConf()

419

.setAppName("Java Spark Example")

420

.setMaster("local[*]");

421

422

// Create Spark context

423

JavaSparkContext jsc = new JavaSparkContext(conf);

424

425

try {

426

// Your Spark code here

427

processData(jsc);

428

} finally {

429

// Always stop the context

430

jsc.stop();

431

}

432

}

433

434

private static void processData(JavaSparkContext jsc) {

435

// Create RDD from collection

436

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

437

JavaRDD<Integer> numbersRDD = jsc.parallelize(numbers);

438

439

// Transform and collect

440

List<Integer> evenNumbers = numbersRDD

441

.filter(n -> n % 2 == 0)

442

.collect();

443

444

System.out.println("Even numbers: " + evenNumbers);

445

}

446

}

447

```

### Word Count Example

```java

452

public class WordCount {

453

public static void main(String[] args) {

454

SparkConf conf = new SparkConf().setAppName("Word Count").setMaster("local[*]");

455

JavaSparkContext jsc = new JavaSparkContext(conf);

456

457

try {

458

// Read text file

459

JavaRDD<String> lines = jsc.textFile("input.txt");

460

461

// Split lines into words and count

462

JavaPairRDD<String, Integer> wordCounts = lines

463

.flatMap(line -> Arrays.asList(line.split(" ")).iterator())

464

.filter(word -> !word.isEmpty())

465

.mapToPair(word -> new Tuple2<>(word.toLowerCase(), 1))

466

.reduceByKey((a, b) -> a + b);

467

468

// Sort by count descending

469

JavaPairRDD<String, Integer> sortedCounts = wordCounts

470

.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)) // Swap key-value

471

.sortByKey(false) // Sort descending

472

.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)); // Swap back

473

474

// Collect and print results

475

List<Tuple2<String, Integer>> results = sortedCounts.take(10);

476

for (Tuple2<String, Integer> result : results) {

477

System.out.println(result._1 + ": " + result._2);

478

}

479

480

} finally {

481

jsc.stop();

482

}

483

}

484

}

485

```

### Complex Aggregation Example

```java

490

public class SalesAnalysis {

491

public static class SaleRecord implements Serializable {

492

public final String product;

493

public final String region;

494

public final double amount;

495

public final int quantity;

496

497

public SaleRecord(String product, String region, double amount, int quantity) {

498

this.product = product;

499

this.region = region;

500

this.amount = amount;

501

this.quantity = quantity;

502

}

503

}

504

505

public static class SalesStats implements Serializable {

506

public final int count;

507

public final double totalAmount;

508

public final int totalQuantity;

509

510

public SalesStats(int count, double totalAmount, int totalQuantity) {

511

this.count = count;

512

this.totalAmount = totalAmount;

513

this.totalQuantity = totalQuantity;

514

}

515

516

public SalesStats combine(SalesStats other) {

517

return new SalesStats(

518

this.count + other.count,

519

this.totalAmount + other.totalAmount,

520

this.totalQuantity + other.totalQuantity

521

);

522

}

523

524

public double averageAmount() {

525

return totalAmount / count;

526

}

527

}

528

529

public static void main(String[] args) {

530

SparkConf conf = new SparkConf().setAppName("Sales Analysis").setMaster("local[*]");

531

JavaSparkContext jsc = new JavaSparkContext(conf);

532

533

try {

534

// Create sample sales data

535

List<SaleRecord> salesData = Arrays.asList(

536

new SaleRecord("Laptop", "North", 999.99, 1),

537

new SaleRecord("Mouse", "North", 29.99, 3),

538

new SaleRecord("Laptop", "South", 899.99, 2),

539

new SaleRecord("Keyboard", "North", 79.99, 1),

540

new SaleRecord("Mouse", "South", 24.99, 5)

541

);

542

543

JavaRDD<SaleRecord> salesRDD = jsc.parallelize(salesData);

544

545

// Aggregate sales by product

546

JavaPairRDD<String, SalesStats> productStats = salesRDD

547

.mapToPair(sale -> new Tuple2<>(sale.product,

548

new SalesStats(1, sale.amount * sale.quantity, sale.quantity)))

549

.reduceByKey((stats1, stats2) -> stats1.combine(stats2));

550

551

// Collect and display results

552

Map<String, SalesStats> results = productStats.collectAsMap();

553

results.forEach((product, stats) -> {

554

System.out.printf("%s: Total Sales=%.2f, Average=%.2f, Units=%d%n",

555

product, stats.totalAmount, stats.averageAmount(), stats.totalQuantity);

556

});

557

558

} finally {

559

jsc.stop();

560

}

561

}

562

}

563

```

### Join Operations Example

```java

568

public class CustomerOrderAnalysis {

569

public static void main(String[] args) {

570

SparkConf conf = new SparkConf().setAppName("Customer Order Analysis").setMaster("local[*]");

571

JavaSparkContext jsc = new JavaSparkContext(conf);

572

573

try {

574

// Customer data

575

List<Tuple2<Integer, String>> customerData = Arrays.asList(

576

new Tuple2<>(1, "Alice Johnson"),

577

new Tuple2<>(2, "Bob Smith"),

578

new Tuple2<>(3, "Charlie Brown"),

579

new Tuple2<>(4, "Diana Prince")

580

);

581

JavaPairRDD<Integer, String> customers = jsc.parallelizePairs(customerData);

582

583

// Order data

584

List<Tuple2<Integer, Double>> orderData = Arrays.asList(

585

new Tuple2<>(1, 100.50),

586

new Tuple2<>(1, 75.25),

587

new Tuple2<>(2, 200.00),

588

new Tuple2<>(3, 50.75),

589

new Tuple2<>(1, 125.00),

590

new Tuple2<>(5, 300.00) // Customer 5 doesn't exist

591

);

592

JavaPairRDD<Integer, Double> orders = jsc.parallelizePairs(orderData);

593

594

// Inner join - customers with orders

595

JavaPairRDD<Integer, Tuple2<String, Double>> customerOrders = customers.join(orders);

596

System.out.println("Customer Orders (Inner Join):");

597

customerOrders.collect().forEach(entry -> {

598

int customerId = entry._1;

599

String customerName = entry._2._1;

600

double orderAmount = entry._2._2;

601

System.out.printf("Customer %d (%s): $%.2f%n", customerId, customerName, orderAmount);

602

});

603

604

// Left outer join - all customers, orders if they exist

605

JavaPairRDD<Integer, Tuple2<String, Optional<Double>>> allCustomers =

606

customers.leftOuterJoin(orders);

607

608

// Aggregate total orders per customer

609

JavaPairRDD<Integer, Tuple2<String, Double>> customerTotals = allCustomers

610

.mapValues(tuple -> {

611

String name = tuple._1;

612

double total = tuple._2.isPresent() ? tuple._2.get() : 0.0;

613

return new Tuple2<>(name, total);

614

})

615

.reduceByKey((tuple1, tuple2) -> new Tuple2<>(tuple1._1, tuple1._2 + tuple2._2));

616

617

System.out.println("\nCustomer Totals:");

618

customerTotals.collect().forEach(entry -> {

619

int customerId = entry._1;

620

String customerName = entry._2._1;

621

double totalAmount = entry._2._2;

622

System.out.printf("Customer %d (%s): Total $%.2f%n", customerId, customerName, totalAmount);

623

});

624

625

} finally {

626

jsc.stop();

627

}

628

}

629

}

630

```

### Using Broadcast Variables and Accumulators

```java

635

public class BroadcastAccumulatorExample {

636

public static void main(String[] args) {

637

SparkConf conf = new SparkConf().setAppName("Broadcast Accumulator Example").setMaster("local[*]");

638

JavaSparkContext jsc = new JavaSparkContext(conf);

639

640

try {

641

// Create lookup table to broadcast

642

Map<String, String> categoryLookup = new HashMap<>();

643

categoryLookup.put("TECH", "Technology");

644

categoryLookup.put("BOOK", "Books");

645

categoryLookup.put("HOME", "Home & Garden");

646

647

org.apache.spark.broadcast.Broadcast<Map<String, String>> broadcastLookup =

648

jsc.broadcast(categoryLookup);

649

650

// Create accumulators for metrics

651

org.apache.spark.util.LongAccumulator processedCount = jsc.sc().longAccumulator("Processed Items");

652

org.apache.spark.util.LongAccumulator errorCount = jsc.sc().longAccumulator("Error Count");

653

654

// Sample data

655

List<String> products = Arrays.asList(

656

"TECH:Laptop:999.99",

657

"BOOK:Java Programming:49.99",

658

"HOME:Garden Hose:29.99",

659

"INVALID:Bad Data",

660

"TECH:Smartphone:699.99"

661

);

662

663

JavaRDD<String> productsRDD = jsc.parallelize(products);

664

665

// Process data using broadcast and accumulators

666

JavaRDD<String> processedProducts = productsRDD.map(product -> {

667

try {

668

String[] parts = product.split(":");

669

if (parts.length >= 3) {

670

String categoryCode = parts[0];

671

String productName = parts[1];

672

String price = parts[2];

673

674

String categoryName = broadcastLookup.value().getOrDefault(categoryCode, "Unknown");

675

processedCount.add(1);

676

677

return String.format("%s - %s: %s", categoryName, productName, price);

678

} else {

679

errorCount.add(1);

680

return "ERROR: Invalid product format - " + product;

681

}

682

} catch (Exception e) {

683

errorCount.add(1);

684

return "ERROR: Processing failed - " + product;

685

}

686

});

687

688

// Trigger computation

689

List<String> results = processedProducts.collect();

690

691

// Display results

692

System.out.println("Processed Products:");

693

results.forEach(System.out::println);

694

695

// Display metrics

696

System.out.println("\nMetrics:");

697

System.out.println("Processed items: " + processedCount.value());

698

System.out.println("Error count: " + errorCount.value());

699

700

} finally {

701

jsc.stop();

702

}

703

}

704

}

705

```

## Best Practices for Java API

### Lambda Expressions vs Anonymous Classes

```java

712

// Prefer lambda expressions (Java 8+)

713

JavaRDD<String> upperCase = textRDD.map(s -> s.toUpperCase());

714

715

// Instead of anonymous classes

716

JavaRDD<String> upperCaseOld = textRDD.map(new Function<String, String>() {

717

@Override

718

public String call(String s) {

719

return s.toUpperCase();

720

}

721

});

722

```

### Serialization Considerations

```java

import java.io.Serializable;
import java.util.Locale;

// Ensure all objects used in transformations are Serializable: Spark serializes the
// functions passed to transformations, together with everything they capture, in
// order to ship them to executors.
public class ProcessingUtils implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String prefix;

    public ProcessingUtils(String prefix) {
        this.prefix = prefix;
    }

    /** Returns {@code prefix + ": " + input} with {@code input} upper-cased. */
    public String process(String input) {
        // Locale.ROOT: executors may run with different default locales, which would
        // otherwise make the output depend on which node processed the record.
        return prefix + ": " + input.toUpperCase(Locale.ROOT);
    }
}

739

740

// Use in transformations

741

ProcessingUtils utils = new ProcessingUtils("PROCESSED");

742

JavaRDD<String> processed = inputRDD.map(utils::process);

743

```

### Memory Management

```java

748

// Persist expensive computations

749

JavaRDD<ComplexObject> expensiveRDD = inputRDD

750

.map(this::expensiveTransformation)

751

.filter(obj -> obj.isValid())

752

.persist(StorageLevel.MEMORY_AND_DISK_SER());

753

754

// Use the persisted RDD multiple times

755

long count = expensiveRDD.count();

756

List<ComplexObject> sample = expensiveRDD.take(10);

757

758

// Clean up when done

759

expensiveRDD.unpersist();

760

```