or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcasting-accumulators.md index.md java-api.md rdd-operations.md spark-context.md storage-persistence.md

java-api.mddocs/

0

# Java API

1

2

The Spark Java API provides Java-friendly wrappers around Spark's core functionality, using Java collections and functional interfaces to enable seamless integration with Java applications.

3

4

## JavaSparkContext

5

6

The Java-friendly version of SparkContext that returns JavaRDDs and works with Java collections.

7

8

### Constructors

9

10

```java { .api }

11

public class JavaSparkContext implements Closeable {

12

public JavaSparkContext()

13

public JavaSparkContext(SparkConf conf)

14

public JavaSparkContext(String master, String appName)

15

public JavaSparkContext(String master, String appName, SparkConf conf)

16

public JavaSparkContext(String master, String appName, String sparkHome, String jarFile)

17

public JavaSparkContext(String master, String appName, String sparkHome, String[] jars)

18

}

19

```

20

21

### Context Access

22

23

```java { .api }

24

public class JavaSparkContext {

25

public SparkContext sc()

26

public void close()

27

}

28

```

29

30

### Java RDD Creation

31

32

```java { .api }

33

public class JavaSparkContext {

34

public <T> JavaRDD<T> parallelize(List<T> list)

35

public <T> JavaRDD<T> parallelize(List<T> list, int numSlices)

36

37

public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list)

38

public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list, int numSlices)

39

40

public JavaDoubleRDD parallelizeDoubles(List<Double> list)

41

public JavaDoubleRDD parallelizeDoubles(List<Double> list, int numSlices)

42

43

public <T> JavaRDD<T> emptyRDD()

44

public <T> JavaRDD<T> union(JavaRDD<T> first, JavaRDD<T>... rest)

45

public <T> JavaRDD<T> union(List<JavaRDD<T>> rdds)

46

}

47

```

48

49

### File I/O (Java-friendly)

50

51

```java { .api }

52

public class JavaSparkContext {

53

public JavaRDD<String> textFile(String path)

54

public JavaRDD<String> textFile(String path, int minPartitions)

55

56

public JavaPairRDD<String, String> wholeTextFiles(String path)

57

public JavaPairRDD<String, String> wholeTextFiles(String path, int minPartitions)

58

59

public JavaPairRDD<String, PortableDataStream> binaryFiles(String path)

60

public JavaPairRDD<String, PortableDataStream> binaryFiles(String path, int minPartitions)

61

62

public JavaRDD<byte[]> binaryRecords(String path, int recordLength)

63

64

public <T> JavaRDD<T> objectFile(String path)

65

public <T> JavaRDD<T> objectFile(String path, int minPartitions)

66

}

67

```

68

69

### Hadoop Integration (Java)

70

71

```java { .api }

72

public class JavaSparkContext {

73

public <K, V> JavaPairRDD<K, V> hadoopRDD(

74

JobConf conf,

75

Class<? extends InputFormat<K, V>> inputFormatClass,

76

Class<K> keyClass,

77

Class<V> valueClass

78

)

79

80

public <K, V> JavaPairRDD<K, V> hadoopRDD(

81

JobConf conf,

82

Class<? extends InputFormat<K, V>> inputFormatClass,

83

Class<K> keyClass,

84

Class<V> valueClass,

85

int minPartitions

86

)

87

88

public <K, V> JavaPairRDD<K, V> hadoopFile(

89

String path,

90

Class<? extends InputFormat<K, V>> inputFormatClass,

91

Class<K> keyClass,

92

Class<V> valueClass

93

)

94

95

public <K, V> JavaPairRDD<K, V> hadoopFile(

96

String path,

97

Class<? extends InputFormat<K, V>> inputFormatClass,

98

Class<K> keyClass,

99

Class<V> valueClass,

100

int minPartitions

101

)

102

103

public <K, V, F extends NewInputFormat<K, V>> JavaPairRDD<K, V> newAPIHadoopFile(

104

String path,

105

Class<F> fClass,

106

Class<K> kClass,

107

Class<V> vClass

108

)

109

110

public <K, V, F extends NewInputFormat<K, V>> JavaPairRDD<K, V> newAPIHadoopRDD(

111

Configuration conf,

112

Class<F> fClass,

113

Class<K> kClass,

114

Class<V> vClass

115

)

116

117

public <K, V> JavaPairRDD<K, V> sequenceFile(

118

String path,

119

Class<K> keyClass,

120

Class<V> valueClass

121

)

122

123

public <K, V> JavaPairRDD<K, V> sequenceFile(

124

String path,

125

Class<K> keyClass,

126

Class<V> valueClass,

127

int minPartitions

128

)

129

}

130

```

131

132

### Shared Variables (Java)

133

134

```java { .api }

135

public class JavaSparkContext {

136

public <T> Broadcast<T> broadcast(T value)

137

138

public LongAccumulator longAccumulator()

139

public LongAccumulator longAccumulator(String name)

140

141

public DoubleAccumulator doubleAccumulator()

142

public DoubleAccumulator doubleAccumulator(String name)

143

144

public <T> CollectionAccumulator<T> collectionAccumulator()

145

}

146

```

147

148

## JavaRDD

149

150

Java-friendly wrapper around RDD that provides Java-compatible method signatures.

151

152

### Transformations (Java-friendly)

153

154

```java { .api }

155

public class JavaRDD<T> {

156

public <R> JavaRDD<R> map(Function<T, R> f)

157

public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f)

158

public JavaRDD<T> filter(Function<T, Boolean> f)

159

160

public JavaRDD<T> distinct()

161

public JavaRDD<T> distinct(int numPartitions)

162

163

public JavaRDD<T> sample(boolean withReplacement, double fraction)

164

public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed)

165

166

public JavaRDD<T>[] randomSplit(double[] weights)

167

public JavaRDD<T>[] randomSplit(double[] weights, long seed)

168

169

public JavaRDD<T> union(JavaRDD<T> other)

170

public JavaRDD<T> intersection(JavaRDD<T> other)

171

public JavaRDD<T> subtract(JavaRDD<T> other)

172

173

public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other)

174

}

175

```

176

177

### Partition Operations (Java)

178

179

```java { .api }

180

public class JavaRDD<T> {

181

public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f)

182

public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f, boolean preservesPartitioning)

183

184

public <U> JavaRDD<U> mapPartitionsWithIndex(

185

Function2<Integer, Iterator<T>, Iterator<U>> f,

186

boolean preservesPartitioning

187

)

188

189

public void foreachPartition(VoidFunction<Iterator<T>> f)

190

public JavaRDD<List<T>> glom()

191

}

192

```

193

194

### Grouping & Sorting (Java)

195

196

```java { .api }

197

public class JavaRDD<T> {

198

public <K> JavaPairRDD<K, Iterable<T>> groupBy(Function<T, K> f)

199

public <K> JavaPairRDD<K, Iterable<T>> groupBy(Function<T, K> f, int numPartitions)

200

201

public <S> JavaRDD<T> sortBy(Function<T, S> f, boolean ascending, int numPartitions)

202

}

203

```

204

205

### Pairing Operations (Java)

206

207

```java { .api }

208

public class JavaRDD<T> {

209

public <U> JavaPairRDD<T, U> zip(JavaRDD<U> other)

210

public JavaPairRDD<T, Long> zipWithIndex()

211

public JavaPairRDD<T, Long> zipWithUniqueId()

212

public <K> JavaPairRDD<K, T> keyBy(Function<T, K> f)

213

}

214

```

215

216

### Actions (Java)

217

218

```java { .api }

219

public class JavaRDD<T> {

220

public List<T> collect()

221

public <U> List<U> collect(Function<T, U> f)

222

223

public T reduce(Function2<T, T, T> f)

224

public T fold(T zeroValue, Function2<T, T, T> f)

225

public <U> U aggregate(U zeroValue, Function2<U, T, U> seqFunc, Function2<U, U, U> combFunc)

226

227

public long count()

228

public Map<T, Long> countByValue()

229

230

public List<T> take(int num)

231

public List<T> takeSample(boolean withReplacement, int num)

232

public List<T> takeSample(boolean withReplacement, int num, long seed)

233

234

public List<T> takeOrdered(int num)

235

public List<T> takeOrdered(int num, Comparator<T> comp)

236

237

public List<T> top(int num)

238

public List<T> top(int num, Comparator<T> comp)

239

240

public T first()

241

public boolean isEmpty()

242

243

public void foreach(VoidFunction<T> f)

244

}

245

```

246

247

### Persistence (Java)

248

249

```java { .api }

250

public class JavaRDD<T> {

251

public JavaRDD<T> cache()

252

public JavaRDD<T> persist(StorageLevel newLevel)

253

public JavaRDD<T> unpersist()

254

public JavaRDD<T> unpersist(boolean blocking)

255

}

256

```

257

258

### Repartitioning (Java)

259

260

```java { .api }

261

public class JavaRDD<T> {

262

public JavaRDD<T> repartition(int numPartitions)

263

public JavaRDD<T> coalesce(int numPartitions)

264

public JavaRDD<T> coalesce(int numPartitions, boolean shuffle)

265

}

266

```

267

268

### Output (Java)

269

270

```java { .api }

271

public class JavaRDD<T> {

272

public void saveAsTextFile(String path)

273

public void saveAsObjectFile(String path)

274

}

275

```

276

277

### Conversion & Access

278

279

```java { .api }

280

public class JavaRDD<T> {

281

public RDD<T> rdd()

282

public Object[] toArray()

283

}

284

```

285

286

## JavaPairRDD

287

288

Java-friendly version of key-value pair RDD with operations specific to (K, V) pairs.

289

290

### Key-Value Transformations (Java)

291

292

```java { .api }

293

public class JavaPairRDD<K, V> {

294

public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f)

295

public <U> JavaPairRDD<K, U> flatMapValues(Function<V, Iterable<U>> f)

296

public <K2, V2> JavaPairRDD<K2, V2> mapToPair(PairFunction<Tuple2<K, V>, K2, V2> f)

297

298

public JavaRDD<K> keys()

299

public JavaRDD<V> values()

300

}

301

```

302

303

### Grouping Operations (Java)

304

305

```java { .api }

306

public class JavaPairRDD<K, V> {

307

public JavaPairRDD<K, Iterable<V>> groupByKey()

308

public JavaPairRDD<K, Iterable<V>> groupByKey(int numPartitions)

309

public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner)

310

311

public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func)

312

public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions)

313

public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func)

314

315

public Map<K, V> reduceByKeyLocally(Function2<V, V, V> func)

316

public Map<K, Long> countByKey()

317

318

public <U> JavaPairRDD<K, U> aggregateByKey(

319

U zeroValue,

320

Function2<U, V, U> seqFunc,

321

Function2<U, U, U> combFunc

322

)

323

public <U> JavaPairRDD<K, U> aggregateByKey(

324

U zeroValue,

325

Partitioner partitioner,

326

Function2<U, V, U> seqFunc,

327

Function2<U, U, U> combFunc

328

)

329

public <U> JavaPairRDD<K, U> aggregateByKey(

330

U zeroValue,

331

int numPartitions,

332

Function2<U, V, U> seqFunc,

333

Function2<U, U, U> combFunc

334

)

335

336

public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func)

337

public JavaPairRDD<K, V> foldByKey(V zeroValue, int numPartitions, Function2<V, V, V> func)

338

public JavaPairRDD<K, V> foldByKey(V zeroValue, Partitioner partitioner, Function2<V, V, V> func)

339

340

public <C> JavaPairRDD<K, C> combineByKey(

341

Function<V, C> createCombiner,

342

Function2<C, V, C> mergeValue,

343

Function2<C, C, C> mergeCombiners

344

)

345

public <C> JavaPairRDD<K, C> combineByKey(

346

Function<V, C> createCombiner,

347

Function2<C, V, C> mergeValue,

348

Function2<C, C, C> mergeCombiners,

349

int numPartitions

350

)

351

public <C> JavaPairRDD<K, C> combineByKey(

352

Function<V, C> createCombiner,

353

Function2<C, V, C> mergeValue,

354

Function2<C, C, C> mergeCombiners,

355

Partitioner partitioner

356

)

357

}

358

```

359

360

### Join Operations (Java)

361

362

```java { .api }

363

public class JavaPairRDD<K, V> {

364

public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other)

365

public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, int numPartitions)

366

public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner)

367

368

public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)

369

public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, int numPartitions)

370

public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner)

371

372

public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other)

373

public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, int numPartitions)

374

public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner)

375

376

public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other)

377

public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, int numPartitions)

378

public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner)

379

380

public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other)

381

public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, int numPartitions)

382

public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, Partitioner partitioner)

383

384

public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>> cogroup(

385

JavaPairRDD<K, W1> other1,

386

JavaPairRDD<K, W2> other2

387

)

388

389

public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>> cogroup(

390

JavaPairRDD<K, W1> other1,

391

JavaPairRDD<K, W2> other2,

392

JavaPairRDD<K, W3> other3

393

)

394

}

395

```

396

397

### Set Operations (Java)

398

399

```java { .api }

400

public class JavaPairRDD<K, V> {

401

public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other)

402

public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, int numPartitions)

403

public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, Partitioner partitioner)

404

}

405

```

406

407

### Sorting Operations (Java)

408

409

```java { .api }

410

public class JavaPairRDD<K, V> {

411

public JavaPairRDD<K, V> sortByKey()

412

public JavaPairRDD<K, V> sortByKey(boolean ascending)

413

public JavaPairRDD<K, V> sortByKey(boolean ascending, int numPartitions)

414

415

public JavaPairRDD<K, V> sortByKey(Comparator<K> comp)

416

public JavaPairRDD<K, V> sortByKey(Comparator<K> comp, boolean ascending)

417

public JavaPairRDD<K, V> sortByKey(Comparator<K> comp, boolean ascending, int numPartitions)

418

}

419

```

420

421

### Partitioning Operations (Java)

422

423

```java { .api }

424

public class JavaPairRDD<K, V> {

425

public JavaPairRDD<K, V> partitionBy(Partitioner partitioner)

426

public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(Partitioner partitioner)

427

public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(

428

Partitioner partitioner,

429

Comparator<K> comp

430

)

431

}

432

```

433

434

### Lookup Operations (Java)

435

436

```java { .api }

437

public class JavaPairRDD<K, V> {

438

public List<V> lookup(K key)

439

public Map<K, V> collectAsMap()

440

}

441

```

442

443

### Output Operations (Java)

444

445

```java { .api }

446

public class JavaPairRDD<K, V> {

447

public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(

448

String path,

449

Class<?> keyClass,

450

Class<?> valueClass,

451

Class<F> outputFormatClass

452

)

453

454

public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(

455

String path,

456

Class<?> keyClass,

457

Class<?> valueClass,

458

Class<F> outputFormatClass,

459

JobConf conf

460

)

461

462

public void saveAsHadoopDataset(JobConf conf)

463

464

public <F extends NewOutputFormat<?, ?>> void saveAsNewAPIHadoopFile(

465

String path,

466

Class<?> keyClass,

467

Class<?> valueClass,

468

Class<F> outputFormatClass

469

)

470

471

public <F extends NewOutputFormat<?, ?>> void saveAsNewAPIHadoopFile(

472

String path,

473

Class<?> keyClass,

474

Class<?> valueClass,

475

Class<F> outputFormatClass,

476

Configuration conf

477

)

478

479

public void saveAsNewAPIHadoopDataset(Configuration conf)

480

}

481

```

482

483

### Conversion

484

485

```java { .api }

486

public class JavaPairRDD<K, V> {

487

public RDD<Tuple2<K, V>> rdd()

488

public JavaRDD<Tuple2<K, V>> toJavaRDD()

489

}

490

```

491

492

## JavaDoubleRDD

493

494

Specialized RDD for double values with numeric operations.

495

496

```java { .api }

497

public class JavaDoubleRDD {

498

public double sum()

499

public StatCounter stats()

500

public double mean()

501

public double variance()

502

public double stdev()

503

public double sampleStdev()

504

public double sampleVariance()

505

506

public long[] histogram(double[] buckets)

507

public Tuple2<double[], long[]> histogram(int buckets)

508

}

509

```

510

511

## Function Interfaces

512

513

The Java API uses functional interfaces from the `org.apache.spark.api.java.function` package.

514

515

### Basic Function Types

516

517

```java { .api }

518

// Single argument function

519

@FunctionalInterface

520

public interface Function<T, R> extends Serializable {

521

R call(T t) throws Exception;

522

}

523

524

// Two argument function

525

@FunctionalInterface

526

public interface Function2<T1, T2, R> extends Serializable {

527

R call(T1 t1, T2 t2) throws Exception;

528

}

529

530

// Void function (for actions)

531

@FunctionalInterface

532

public interface VoidFunction<T> extends Serializable {

533

void call(T t) throws Exception;

534

}

535

536

// Flat map function (returns Iterator)

537

@FunctionalInterface

538

public interface FlatMapFunction<T, R> extends Serializable {

539

Iterator<R> call(T t) throws Exception;

540

}

541

542

// Pair function (returns Tuple2)

543

@FunctionalInterface

544

public interface PairFunction<T, K, V> extends Serializable {

545

Tuple2<K, V> call(T t) throws Exception;

546

}

547

```

548

549

### Specialized Function Types

550

551

```java { .api }

552

// Double function

553

@FunctionalInterface

554

public interface DoubleFunction<T> extends Serializable {

555

double call(T t) throws Exception;

556

}

557

558

// Pair flat map function

559

@FunctionalInterface

560

public interface PairFlatMapFunction<T, K, V> extends Serializable {

561

Iterator<Tuple2<K, V>> call(T t) throws Exception;

562

}

563

564

// Double flat map function

565

@FunctionalInterface

566

public interface DoubleFlatMapFunction<T> extends Serializable {

567

Iterator<Double> call(T t) throws Exception;

568

}

569

```

570

571

## Usage Examples

572

573

### Basic Java API Usage

574

575

```java

576

import org.apache.spark.SparkConf;

577

import org.apache.spark.api.java.JavaSparkContext;

578

import org.apache.spark.api.java.JavaRDD;

579

import java.util.Arrays;

580

import java.util.List;

581

582

public class SparkJavaExample {

583

public static void main(String[] args) {

584

SparkConf conf = new SparkConf()

585

.setAppName("Java Spark Example")

586

.setMaster("local[*]");

587

588

try (JavaSparkContext sc = new JavaSparkContext(conf)) {

589

// Create RDD from Java collection

590

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);

591

JavaRDD<Integer> rdd = sc.parallelize(data);

592

593

// Transform and collect

594

JavaRDD<Integer> filtered = rdd.filter(x -> x % 2 == 0);

595

JavaRDD<Integer> doubled = filtered.map(x -> x * 2);

596

597

List<Integer> result = doubled.collect();

598

System.out.println(result); // [4, 8]

599

}

600

}

601

}

602

```

603

604

### Working with Key-Value Pairs

605

606

```java

607

import org.apache.spark.api.java.JavaPairRDD;

608

import scala.Tuple2;

609

import java.util.Arrays;

610

import java.util.List;

611

import java.util.Map;

612

613

// Create pair RDD from tuples

614

List<Tuple2<String, Integer>> pairs = Arrays.asList(

615

new Tuple2<>("apple", 1),

616

new Tuple2<>("banana", 2),

617

new Tuple2<>("apple", 3)

618

);

619

620

JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(pairs);

621

622

// Group by key

623

JavaPairRDD<String, Iterable<Integer>> grouped = pairRDD.groupByKey();

624

625

// Reduce by key

626

JavaPairRDD<String, Integer> totals = pairRDD.reduceByKey(Integer::sum);

627

628

// Collect as map

629

Map<String, Integer> resultMap = totals.collectAsMap();

630

```

631

632

### Lambda Expressions (Java 8+)

633

634

```java

635

// Using lambda expressions for cleaner code

636

JavaRDD<String> lines = sc.textFile("input.txt");

637

638

JavaRDD<String> words = lines.flatMap(line ->

639

Arrays.asList(line.split(" ")).iterator()

640

);

641

642

JavaPairRDD<String, Integer> wordCounts = words

643

.mapToPair(word -> new Tuple2<>(word, 1))

644

.reduceByKey(Integer::sum);

645

646

// Sort by count (descending)

647

JavaPairRDD<String, Integer> sorted = wordCounts

648

.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1))

649

.sortByKey(false)

650

.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));

651

652

List<Tuple2<String, Integer>> top10 = sorted.take(10);

653

```

654

655

### Advanced Aggregations

656

657

```java

658

import org.apache.spark.api.java.function.Function2;

659

660

// Custom aggregation with aggregateByKey

661

JavaPairRDD<String, Integer> sales = sc.parallelizePairs(Arrays.asList(

662

new Tuple2<>("store1", 100),

663

new Tuple2<>("store2", 200),

664

new Tuple2<>("store1", 150)

665

));

666

667

// Calculate sum and count for each store

668

class SalesSummary implements Serializable {

669

public int sum;

670

public int count;

671

672

public SalesSummary(int sum, int count) {

673

this.sum = sum;

674

this.count = count;

675

}

676

677

public double average() {

678

return (double) sum / count;

679

}

680

}

681

682

JavaPairRDD<String, SalesSummary> summaries = sales.aggregateByKey(

683

new SalesSummary(0, 0),

684

685

// Sequence function

686

(summary, sale) -> new SalesSummary(

687

summary.sum + sale,

688

summary.count + 1

689

),

690

691

// Combiner function

692

(sum1, sum2) -> new SalesSummary(

693

sum1.sum + sum2.sum,

694

sum1.count + sum2.count

695

)

696

);

697

```

698

699

### Working with Files

700

701

```java

702

// Read text file

703

JavaRDD<String> textFile = sc.textFile("hdfs://path/to/file");

704

705

// Read whole text files (returns filename and content)

706

JavaPairRDD<String, String> wholeFiles = sc.wholeTextFiles("hdfs://path/to/dir");

707

708

// Process each file

709

JavaPairRDD<String, Integer> lineCounts = wholeFiles.mapToPair(file ->

710

new Tuple2<>(file._1, file._2.split("\n").length)

711

);

712

713

// Save results

714

lineCounts.saveAsTextFile("hdfs://path/to/output");

715

```

716

717

### Exception Handling

718

719

```java

720

// Functions can throw exceptions - Spark wraps them, and the job fails with a SparkException once an action runs

721

JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 0, 4, 5));

722

723

JavaRDD<Double> inverses = numbers.map(x -> {

724

if (x == 0) {

725

throw new IllegalArgumentException("Cannot divide by zero");

726

}

727

return 1.0 / x;

728

});

729

730

// Filter out problematic values instead

731

JavaRDD<Double> safeInverses = numbers

732

.filter(x -> x != 0)

733

.map(x -> 1.0 / x);

734

```

735

736

## Important Notes

737

738

- **All Function interfaces are from `org.apache.spark.api.java.function` package**

739

- **Functions must be Serializable** - avoid capturing non-serializable objects

740

- **Use `Tuple2`, `Tuple3`, etc. from Scala** for pair operations

741

- **Use `Optional` for nullable values** in joins and outer operations (Spark's `org.apache.spark.api.java.Optional`, not `java.util.Optional`)

742

- **JavaSparkContext implements Closeable** - use try-with-resources for automatic cleanup

743

- **Java collections are used throughout** - List, Map, Iterator instead of Scala collections

744

- **Lambda expressions (Java 8+) provide cleaner syntax** than anonymous inner classes

745

- **Method references can be used** where appropriate (e.g., `Integer::sum`)

746

- **All operations maintain the same semantics** as the Scala API but with Java-friendly types