or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

caching-persistence.mdcore-rdd.mddata-sources.mdgraphx.mdindex.mdjava-api.mdkey-value-operations.mdmllib.mdpython-api.mdspark-context.mdsql.mdstreaming.md

java-api.mddocs/

0

# Java API

1

2

Spark provides comprehensive Java APIs that mirror the Scala functionality while providing Java-friendly interfaces. The Java API includes JavaRDD, JavaPairRDD, and JavaDoubleRDD classes that offer type-safe operations for Java developers.

3

4

## JavaSparkContext

5

6

The Java-friendly version of SparkContext.

7

8

### JavaSparkContext Class

9

10

```java { .api }

11

public class JavaSparkContext {

12

// Constructors

13

public JavaSparkContext()

14

public JavaSparkContext(SparkConf conf)

15

public JavaSparkContext(String master, String appName)

16

public JavaSparkContext(String master, String appName, SparkConf conf)

17

public JavaSparkContext(String master, String appName, String sparkHome, String jarFile)

18

public JavaSparkContext(String master, String appName, String sparkHome, String[] jars)

19

public JavaSparkContext(String master, String appName, String sparkHome, String[] jars, Map<String, String> environment)

20

21

// Core properties

22

public SparkContext sc()

23

public String master()

24

public String appName()

25

public Boolean isLocal()

26

public Integer defaultParallelism()

27

public Integer defaultMinPartitions()

28

}

29

```

30

31

### Creating JavaSparkContext

32

33

```java

34

import org.apache.spark.SparkConf;

35

import org.apache.spark.api.java.JavaSparkContext;

36

37

// Basic creation with SparkConf

38

SparkConf conf = new SparkConf()

39

.setAppName("Java Spark App")

40

.setMaster("local[*]");

41

42

JavaSparkContext jsc = new JavaSparkContext(conf);

43

44

// Alternative constructors

45

JavaSparkContext jsc2 = new JavaSparkContext("local[*]", "My Java App");

46

47

// With all parameters

48

String[] jars = {"myapp.jar", "dependencies.jar"};

49

Map<String, String> env = new HashMap<>();

50

env.put("SPARK_ENV", "production");

51

52

JavaSparkContext jsc3 = new JavaSparkContext(

53

"local[*]", // master

54

"My Java App", // app name

55

"/path/to/spark", // spark home

56

jars, // jar files

57

env // environment

58

);

59

```

60

61

## JavaRDD

62

63

Java-friendly wrapper for RDD operations.

64

65

### JavaRDD Class

66

67

```java { .api }

68

public class JavaRDD<T> extends AbstractJavaRDDLike<T, JavaRDD<T>> {

69

// Transformations

70

public <U> JavaRDD<U> map(Function<T, U> f)

71

public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f)

72

public JavaRDD<T> filter(Function<T, Boolean> f)

73

public JavaRDD<T> distinct()

74

public JavaRDD<T> distinct(int numPartitions)

75

public JavaRDD<T> sample(boolean withReplacement, double fraction)

76

public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed)

77

public JavaRDD<T> union(JavaRDD<T> other)

78

public JavaRDD<T> intersection(JavaRDD<T> other)

79

public JavaRDD<T> subtract(JavaRDD<T> other)

80

public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other)

81

82

// Partition operations

83

public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f)

84

public <U> JavaRDD<U> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<U>> f, boolean preservesPartitioning)

85

public JavaRDD<T> coalesce(int numPartitions)

86

public JavaRDD<T> coalesce(int numPartitions, boolean shuffle)

87

public JavaRDD<T> repartition(int numPartitions)

88

89

// Actions

90

public List<T> collect()

91

public long count()

92

public T first()

93

public List<T> take(int num)

94

public List<T> takeSample(boolean withReplacement, int num)

95

public List<T> takeSample(boolean withReplacement, int num, long seed)

96

public T reduce(Function2<T, T, T> f)

97

public T fold(T zeroValue, Function2<T, T, T> func)

98

public <U> U aggregate(U zeroValue, Function2<U, T, U> seqFunc, Function2<U, U, U> combFunc)

99

public void foreach(VoidFunction<T> f)

100

public void foreachPartition(VoidFunction<Iterator<T>> f)

101

102

// Persistence

103

public JavaRDD<T> cache()

104

public JavaRDD<T> persist(StorageLevel newLevel)

105

public JavaRDD<T> unpersist()

106

public JavaRDD<T> unpersist(boolean blocking)

107

}

108

```

109

110

### Creating and Using JavaRDD

111

112

```java

113

import org.apache.spark.api.java.JavaRDD;

114

import org.apache.spark.api.java.function.Function;

115

import org.apache.spark.api.java.function.Function2;

116

import org.apache.spark.api.java.function.FlatMapFunction;

117

import java.util.Arrays;

118

import java.util.List;

119

120

// Create JavaRDD from collection

121

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);

122

JavaRDD<Integer> javaRDD = jsc.parallelize(data);

123

124

// Map transformation

125

JavaRDD<Integer> doubled = javaRDD.map(new Function<Integer, Integer>() {

126

public Integer call(Integer x) {

127

return x * 2;

128

}

129

});

130

131

// Using lambda expressions (Java 8+)

132

JavaRDD<Integer> doubled2 = javaRDD.map(x -> x * 2);

133

134

// FlatMap transformation

135

JavaRDD<String> lines = jsc.textFile("input.txt");

136

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

137

public Iterable<String> call(String line) {

138

return Arrays.asList(line.split(" "));

139

}

140

});

141

142

// With lambda

143

JavaRDD<String> words2 = lines.flatMap(line -> Arrays.asList(line.split(" ")));

144

145

// Filter transformation

146

JavaRDD<Integer> evens = javaRDD.filter(new Function<Integer, Boolean>() {

147

public Boolean call(Integer x) {

148

return x % 2 == 0;

149

}

150

});

151

152

// With lambda

153

JavaRDD<Integer> evens2 = javaRDD.filter(x -> x % 2 == 0);

154

```

155

156

### Actions on JavaRDD

157

158

```java

159

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);

160

JavaRDD<Integer> rdd = jsc.parallelize(data);

161

162

// Collect all elements

163

List<Integer> result = rdd.collect();

164

165

// Count elements

166

long count = rdd.count();

167

168

// Get first element

169

Integer first = rdd.first();

170

171

// Take first n elements

172

List<Integer> firstThree = rdd.take(3);

173

174

// Reduce with function

175

Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {

176

public Integer call(Integer a, Integer b) {

177

return a + b;

178

}

179

});

180

181

// With lambda

182

Integer sum2 = rdd.reduce((a, b) -> a + b);

183

184

// Fold with zero value

185

Integer foldResult = rdd.fold(0, (a, b) -> a + b);

186

187

// Aggregate with different types

188

class Stats implements Serializable {

189

public int sum;

190

public int count;

191

192

public Stats(int sum, int count) {

193

this.sum = sum;

194

this.count = count;

195

}

196

}

197

198

Stats stats = rdd.aggregate(

199

new Stats(0, 0), // Zero value

200

new Function2<Stats, Integer, Stats>() { // Seq function

201

public Stats call(Stats s, Integer x) {

202

return new Stats(s.sum + x, s.count + 1);

203

}

204

},

205

new Function2<Stats, Stats, Stats>() { // Combine function

206

public Stats call(Stats s1, Stats s2) {

207

return new Stats(s1.sum + s2.sum, s1.count + s2.count);

208

}

209

}

210

);

211

```

212

213

## JavaPairRDD

214

215

Java wrapper for key-value pair RDDs.

216

217

### JavaPairRDD Class

218

219

```java { .api }

220

public class JavaPairRDD<K, V> extends AbstractJavaRDDLike<Tuple2<K, V>, JavaPairRDD<K, V>> {

221

// Key-Value operations

222

public JavaRDD<K> keys()

223

public JavaRDD<V> values()

224

public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f)

225

public <U> JavaPairRDD<K, U> flatMapValues(Function<V, Iterable<U>> f)

226

227

// Aggregations

228

public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func)

229

public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func)

230

public JavaPairRDD<K, Iterable<V>> groupByKey()

231

public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner)

232

public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, Partitioner partitioner)

233

public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Partitioner partitioner, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc)

234

public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func)

235

236

// Joins

237

public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other)

238

public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner)

239

public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)

240

public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other)

241

public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other)

242

public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other)

243

244

// Actions

245

public Map<K, V> collectAsMap()

246

public Map<K, Long> countByKey()

247

public List<V> lookup(K key)

248

249

// Save operations

250

public void saveAsHadoopFile(String path, Class<?> keyClass, Class<?> valueClass, Class<? extends OutputFormat> outputFormatClass)

251

public void saveAsNewAPIHadoopFile(String path, Class<?> keyClass, Class<?> valueClass, Class<? extends org.apache.hadoop.mapreduce.OutputFormat> outputFormatClass)

252

}

253

```

254

255

### Creating and Using JavaPairRDD

256

257

```java

258

import org.apache.spark.api.java.JavaPairRDD;

259

import org.apache.spark.api.java.function.PairFunction;

260

import scala.Tuple2;

261

import java.util.Arrays;

262

import java.util.List;

263

264

// Create JavaPairRDD from tuples

265

List<Tuple2<String, Integer>> pairs = Arrays.asList(

266

new Tuple2<>("apple", 5),

267

new Tuple2<>("banana", 3),

268

new Tuple2<>("apple", 2),

269

new Tuple2<>("orange", 1)

270

);

271

272

JavaPairRDD<String, Integer> pairRDD = jsc.parallelizePairs(pairs);

273

274

// Create from JavaRDD using mapToPair

275

JavaRDD<String> lines = jsc.textFile("input.txt");

276

JavaPairRDD<String, Integer> wordCounts = lines

277

.flatMap(line -> Arrays.asList(line.split(" ")))

278

.mapToPair(new PairFunction<String, String, Integer>() {

279

public Tuple2<String, Integer> call(String word) {

280

return new Tuple2<>(word, 1);

281

}

282

});

283

284

// With lambda

285

JavaPairRDD<String, Integer> wordCounts2 = lines

286

.flatMap(line -> Arrays.asList(line.split(" ")))

287

.mapToPair(word -> new Tuple2<>(word, 1));

288

```

289

290

### Key-Value Transformations

291

292

```java

293

JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(

294

new Tuple2<>("a", 1),

295

new Tuple2<>("b", 2),

296

new Tuple2<>("a", 3)

297

));

298

299

// Get keys and values

300

JavaRDD<String> keys = pairs.keys();

301

JavaRDD<Integer> values = pairs.values();

302

303

// Transform values while preserving keys

304

JavaPairRDD<String, Integer> doubled = pairs.mapValues(x -> x * 2);

305

306

// FlatMap values

307

JavaPairRDD<String, Character> chars = pairs.flatMapValues(value -> {
    // Note: Arrays.asList(char[]) would yield a single-element List<char[]>,
    // so build the List<Character> explicitly.
    List<Character> result = new ArrayList<>();
    for (char c : String.valueOf(value).toCharArray()) {
        result.add(c);
    }
    return result;
});

310

311

// Reduce by key

312

JavaPairRDD<String, Integer> sums = pairs.reduceByKey((a, b) -> a + b);

313

314

// Group by key

315

JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();

316

317

// Aggregate by key

318

JavaPairRDD<String, Integer> aggregated = pairs.aggregateByKey(

319

0, // Zero value

320

(acc, value) -> acc + value, // Seq function

321

(acc1, acc2) -> acc1 + acc2 // Combine function

322

);

323

```

324

325

### Join Operations

326

327

```java

328

JavaPairRDD<String, String> names = jsc.parallelizePairs(Arrays.asList(

329

new Tuple2<>("1", "Alice"),

330

new Tuple2<>("2", "Bob"),

331

new Tuple2<>("3", "Charlie")

332

));

333

334

JavaPairRDD<String, Integer> ages = jsc.parallelizePairs(Arrays.asList(

335

new Tuple2<>("1", 25),

336

new Tuple2<>("2", 30),

337

new Tuple2<>("4", 35)

338

));

339

340

// Inner join

341

JavaPairRDD<String, Tuple2<String, Integer>> joined = names.join(ages);

342

// Result: [("1", ("Alice", 25)), ("2", ("Bob", 30))]

343

344

// Left outer join

345

JavaPairRDD<String, Tuple2<String, Optional<Integer>>> leftJoined = names.leftOuterJoin(ages);

346

// Result: [("1", ("Alice", Optional.of(25))), ("2", ("Bob", Optional.of(30))), ("3", ("Charlie", Optional.absent()))]

347

348

// Full outer join

349

JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> fullJoined = names.fullOuterJoin(ages);

350

351

// Cogroup

352

JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = names.cogroup(ages);

353

```

354

355

### Actions on JavaPairRDD

356

357

```java

358

JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(

359

new Tuple2<>("apple", 5),

360

new Tuple2<>("banana", 3),

361

new Tuple2<>("apple", 2)

362

));

363

364

// Collect as Map (assumes unique keys)

365

Map<String, Integer> map = pairs.collectAsMap();

366

367

// Count by key

368

Map<String, Long> counts = pairs.countByKey();

369

370

// Lookup values for a key

371

List<Integer> appleValues = pairs.lookup("apple"); // [5, 2]

372

373

// Count all elements

374

long totalCount = pairs.count();

375

```

376

377

## JavaDoubleRDD

378

379

Specialized RDD for double values with statistical operations.

380

381

### JavaDoubleRDD Class

382

383

```java { .api }

384

public class JavaDoubleRDD extends AbstractJavaRDDLike<Double, JavaDoubleRDD> {

385

// Statistical operations

386

public double mean()

387

public double sum()

388

public StatCounter stats()

389

public double variance()

390

public double sampleVariance()

391

public double stdev()

392

public double sampleStdev()

393

public long[] histogram(double[] buckets)

394

public Tuple2<double[], long[]> histogram(int bucketCount)

395

}

396

```

397

398

### Using JavaDoubleRDD

399

400

```java

401

import org.apache.spark.api.java.JavaDoubleRDD;

402

import org.apache.spark.util.StatCounter;

403

404

// Create JavaDoubleRDD

405

List<Double> numbers = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0);

406

JavaDoubleRDD doubleRDD = jsc.parallelizeDoubles(numbers);

407

408

// Convert from JavaRDD<Double>

409

JavaRDD<Double> rdd = jsc.parallelize(numbers);

410

JavaDoubleRDD doubleRDD2 = rdd.mapToDouble(x -> x);

411

412

// Statistical operations

413

double mean = doubleRDD.mean();

414

double sum = doubleRDD.sum();

415

double variance = doubleRDD.variance();

416

double stdev = doubleRDD.stdev();

417

418

// Get detailed statistics

419

StatCounter stats = doubleRDD.stats();

420

System.out.println("Count: " + stats.count());

421

System.out.println("Mean: " + stats.mean());

422

System.out.println("Stdev: " + stats.stdev());

423

System.out.println("Max: " + stats.max());

424

System.out.println("Min: " + stats.min());

425

426

// Histogram

427

double[] buckets = {0.0, 2.0, 4.0, 6.0};

428

long[] histogram = doubleRDD.histogram(buckets);

429

430

// Or with automatic bucketing

431

Tuple2<double[], long[]> autoHistogram = doubleRDD.histogram(4);

432

```

433

434

## Function Interfaces

435

436

Java API uses function interfaces for type-safe transformations.

437

438

### Function Interfaces

439

440

```java { .api }

441

// Single argument function

442

public interface Function<T, R> extends Serializable {

443

R call(T t) throws Exception;

444

}

445

446

// Two argument function

447

public interface Function2<T1, T2, R> extends Serializable {

448

R call(T1 t1, T2 t2) throws Exception;

449

}

450

451

// Void function (for actions)

452

public interface VoidFunction<T> extends Serializable {

453

void call(T t) throws Exception;

454

}

455

456

// FlatMap function

457

public interface FlatMapFunction<T, R> extends Serializable {

458

Iterable<R> call(T t) throws Exception;

459

}

460

461

// Pair function (for creating key-value pairs)

462

public interface PairFunction<T, K, V> extends Serializable {

463

Tuple2<K, V> call(T t) throws Exception;

464

}

465

466

// PairFlatMap function

467

public interface PairFlatMapFunction<T, K, V> extends Serializable {

468

Iterable<Tuple2<K, V>> call(T t) throws Exception;

469

}

470

```

471

472

### Function Usage Examples

473

474

```java

475

import org.apache.spark.api.java.function.*;

476

477

// Anonymous inner class

478

JavaRDD<Integer> doubled = rdd.map(new Function<Integer, Integer>() {

479

public Integer call(Integer x) {

480

return x * 2;

481

}

482

});

483

484

// Lambda expression (Java 8+)

485

JavaRDD<Integer> doubled2 = rdd.map(x -> x * 2);

486

487

// Method reference (Java 8+)

488

JavaRDD<String> strings = rdd.map(Object::toString);

489

490

// Complex transformation with PairFunction

491

JavaPairRDD<String, Integer> pairs = words.mapToPair(

492

new PairFunction<String, String, Integer>() {

493

public Tuple2<String, Integer> call(String word) {

494

return new Tuple2<>(word.toLowerCase(), word.length());

495

}

496

}

497

);

498

499

// FlatMap example

500

JavaRDD<String> words = lines.flatMap(

501

new FlatMapFunction<String, String>() {

502

public Iterable<String> call(String line) {

503

return Arrays.asList(line.split("\\s+"));

504

}

505

}

506

);

507

508

// Void function for actions

509

rdd.foreach(new VoidFunction<Integer>() {

510

public void call(Integer x) {

511

System.out.println(x);

512

}

513

});

514

```

515

516

## Shared Variables in Java

517

518

### Broadcast Variables

519

520

```java

521

import org.apache.spark.broadcast.Broadcast;

522

import java.util.Map;

523

import java.util.HashMap;

524

525

// Create broadcast variable

526

Map<String, Integer> lookupTable = new HashMap<>();

527

lookupTable.put("apple", 1);

528

lookupTable.put("banana", 2);

529

lookupTable.put("orange", 3);

530

531

Broadcast<Map<String, Integer>> broadcastTable = jsc.broadcast(lookupTable);

532

533

// Use in transformations

534

JavaRDD<String> fruits = jsc.parallelize(Arrays.asList("apple", "banana", "apple"));

535

JavaRDD<Integer> codes = fruits.map(fruit ->

536

broadcastTable.value().getOrDefault(fruit, 0)

537

);

538

539

// Clean up

540

broadcastTable.unpersist();

541

```

542

543

### Accumulators

544

545

```java

546

import org.apache.spark.Accumulator;

547

548

// Create accumulator

549

Accumulator<Integer> errorCount = jsc.accumulator(0);

550

551

// Use in transformations

552

JavaRDD<String> lines = jsc.textFile("input.txt");

553

JavaRDD<String> validLines = lines.filter(line -> {

554

if (line.trim().isEmpty()) {

555

errorCount.add(1);

556

return false;

557

}

558

return true;

559

});

560

561

// Trigger action to update accumulator

562

validLines.count();

563

564

// Get accumulator value

565

System.out.println("Error count: " + errorCount.value());

566

567

// Custom accumulator types

568

Accumulator<Double> doubleAcc = jsc.accumulator(0.0);

569

```

570

571

## Complete Example

572

573

```java

574

import org.apache.spark.SparkConf;

575

import org.apache.spark.api.java.JavaSparkContext;

576

import org.apache.spark.api.java.JavaRDD;

577

import org.apache.spark.api.java.JavaPairRDD;

578

import org.apache.spark.api.java.function.*;

579

import scala.Tuple2;

580

import java.util.Arrays;

581

import java.util.Map;

582

583

public class SparkWordCount {

584

public static void main(String[] args) {

585

// Create Spark context

586

SparkConf conf = new SparkConf()

587

.setAppName("Java Word Count")

588

.setMaster("local[*]");

589

590

JavaSparkContext jsc = new JavaSparkContext(conf);

591

592

try {

593

// Read input file

594

JavaRDD<String> lines = jsc.textFile("input.txt");

595

596

// Split lines into words

597

JavaRDD<String> words = lines.flatMap(line ->

598

Arrays.asList(line.toLowerCase().split("\\s+"))

599

);

600

601

// Filter out empty words

602

JavaRDD<String> validWords = words.filter(word -> !word.trim().isEmpty());

603

604

// Create word-count pairs

605

JavaPairRDD<String, Integer> wordPairs = validWords.mapToPair(

606

word -> new Tuple2<>(word, 1)

607

);

608

609

// Sum counts by key

610

JavaPairRDD<String, Integer> wordCounts = wordPairs.reduceByKey(

611

(a, b) -> a + b

612

);

613

614

// Sort by count descending

615

JavaPairRDD<String, Integer> sortedCounts = wordCounts.mapToPair(

616

pair -> new Tuple2<>(pair._2(), pair._1()) // Swap to (count, word)

617

).sortByKey(false).mapToPair(

618

pair -> new Tuple2<>(pair._2(), pair._1()) // Swap back to (word, count)

619

);

620

621

// Collect and print results

622

Map<String, Integer> results = sortedCounts.collectAsMap();

623

624

System.out.println("Word Count Results:");

625

results.entrySet().stream()

626

.sorted(Map.Entry.<String, Integer>comparingByValue().reversed())

627

.limit(10)

628

.forEach(entry ->

629

System.out.println(entry.getKey() + ": " + entry.getValue())

630

);

631

632

// Save results

633

sortedCounts.saveAsTextFile("output");

634

635

} finally {

636

// Stop Spark context

637

jsc.stop();

638

}

639

}

640

}

641

```

642

643

## Maven Dependencies

644

645

```xml

646

<dependencies>

647

<dependency>

648

<groupId>org.apache.spark</groupId>

649

<artifactId>spark-core_2.10</artifactId>

650

<version>1.0.0</version>

651

</dependency>

652

</dependencies>

653

```

654

655

This comprehensive guide covers the complete Java API for Apache Spark, enabling Java developers to build scalable data processing applications with type safety and familiar Java patterns.