or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-transformations.mdcore-streaming.mdindex.mdinput-sources.mdjava-api.md

java-api.mddocs/

# Java API Integration

Java-friendly wrappers that expose Spark Streaming functionality to Java applications, with support for lambda expressions, Java collections, and idiomatic Java patterns.

## Capabilities

### JavaStreamingContext

Java-friendly version of `StreamingContext` that uses native Java types and collections.

```java { .api }
/**
 * Java-friendly wrapper for StreamingContext
 */
public class JavaStreamingContext {
    /** Create from JavaSparkContext and batch duration */
    public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);

    /** Create from SparkConf and batch duration */
    public JavaStreamingContext(SparkConf conf, Duration batchDuration);

    /** Create with master URL, app name, and batch duration */
    public JavaStreamingContext(String master, String appName, Duration batchDuration);

    /** Restore from checkpoint */
    public JavaStreamingContext(String path);

    /** Get underlying StreamingContext */
    public StreamingContext ssc();

    /** Get underlying JavaSparkContext */
    public JavaSparkContext sparkContext();

    /** Start the streaming computation */
    public void start();

    /** Stop the streaming computation (also stops the underlying SparkContext) */
    public void stop();

    /** Stop, optionally stopping the underlying SparkContext as well */
    public void stop(boolean stopSparkContext);

    /** Stop; if stopGracefully, first wait for all received data to be processed */
    public void stop(boolean stopSparkContext, boolean stopGracefully);

    /** Wait for termination */
    public void awaitTermination();

    /** Wait up to timeout milliseconds; returns true if stopped, false if the timeout elapsed */
    public boolean awaitTerminationOrTimeout(long timeout);

    /** Set checkpoint directory */
    public void checkpoint(String directory);

    /** Set remember duration */
    public void remember(Duration duration);

    /** Add streaming listener */
    public void addStreamingListener(StreamingListener streamingListener);

    /** Remove streaming listener */
    public void removeStreamingListener(StreamingListener streamingListener);
}
```

**Static Factory Methods:**

```java { .api }
public class JavaStreamingContext {
    /** Get currently active JavaStreamingContext */
    public static Optional<JavaStreamingContext> getActive();

    /** Get active context or create new one */
    public static JavaStreamingContext getActiveOrCreate(Function0<JavaStreamingContext> creatingFunc);

    /** Recover from checkpoint data if present; otherwise create a new context with creatingFunc */
    public static JavaStreamingContext getOrCreate(
        String checkpointPath,
        Function0<JavaStreamingContext> creatingFunc
    );

    /** Same as above, reading the checkpoint with the given Hadoop configuration */
    public static JavaStreamingContext getOrCreate(
        String checkpointPath,
        Function0<JavaStreamingContext> creatingFunc,
        Configuration hadoopConf
    );
}
```

**Usage Examples:**

```java
import org.apache.spark.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;

// Create JavaStreamingContext
SparkConf conf = new SparkConf().setAppName("JavaStreamingApp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// Create with checkpoint recovery (the lambda is only invoked when no checkpoint exists)
JavaStreamingContext jssc2 = JavaStreamingContext.getOrCreate("/path/to/checkpoint", () -> {
    SparkConf recoveryConf = new SparkConf().setAppName("RecoverableApp");
    return new JavaStreamingContext(recoveryConf, Durations.seconds(1));
});

// Configure and start
jssc.checkpoint("/path/to/checkpoint");
jssc.start();
jssc.awaitTermination();
```

### Input Stream Creation

Java-friendly methods for creating input streams from various sources.

```java { .api }
public class JavaStreamingContext {
    /** Create text file stream */
    public JavaDStream<String> textFileStream(String directory);

    /** Create socket text stream */
    public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);

    /** Create socket text stream with storage level */
    public JavaReceiverInputDStream<String> socketTextStream(
        String hostname,
        int port,
        StorageLevel storageLevel
    );

    /** Create file stream with Hadoop InputFormat */
    public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
        String directory,
        Class<K> keyClass,
        Class<V> valueClass,
        Class<F> inputFormatClass
    );

    /** Create file stream with a path filter, new-files-only flag, and Hadoop configuration */
    public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
        String directory,
        Class<K> keyClass,
        Class<V> valueClass,
        Class<F> inputFormatClass,
        Function<Path, Boolean> filter,
        boolean newFilesOnly,
        Configuration conf
    );

    /** Create queue stream */
    public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue);

    /** Create queue stream; if oneAtATime, consume one RDD from the queue per batch */
    public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime);

    /** Create queue stream with default RDD */
    public <T> JavaDStream<T> queueStream(
        Queue<JavaRDD<T>> queue,
        boolean oneAtATime,
        JavaRDD<T> defaultRDD
    );

    /** Create receiver stream */
    public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver);

    /** Union multiple streams */
    public <T> JavaDStream<T> union(JavaDStream<T> first, List<JavaDStream<T>> rest);
}
```

**Usage Examples:**

```java
// Text file stream
JavaDStream<String> lines = jssc.textFileStream("/path/to/files");

// Socket stream
JavaReceiverInputDStream<String> socketLines = jssc.socketTextStream("localhost", 9999);

// File stream with Hadoop InputFormat
JavaPairInputDStream<LongWritable, Text> fileStream = jssc.fileStream(
    "/path/to/files",
    LongWritable.class,
    Text.class,
    TextInputFormat.class
);

// Queue stream
Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
JavaRDD<Integer> rdd1 = jssc.sparkContext().parallelize(Arrays.asList(1, 2, 3));
rddQueue.add(rdd1);
JavaDStream<Integer> queueStream = jssc.queueStream(rddQueue);

// Union streams
JavaDStream<String> stream1 = jssc.textFileStream("/path1");
JavaDStream<String> stream2 = jssc.textFileStream("/path2");
JavaDStream<String> combined = jssc.union(stream1, Arrays.asList(stream2));
```

### JavaDStream

Java wrapper for `DStream` with lambda expression support and Java-friendly transformations.

```java { .api }
/**
 * Java-friendly wrapper for DStream
 */
public class JavaDStream<T> {
    /** Get underlying Scala DStream */
    public DStream<T> dstream();

    /** Cache RDDs in memory */
    public JavaDStream<T> cache();

    /** Persist with default storage level */
    public JavaDStream<T> persist();

    /** Persist with specific storage level */
    public JavaDStream<T> persist(StorageLevel storageLevel);

    /** Enable checkpointing */
    public JavaDStream<T> checkpoint(Duration interval);

    /** Get the StreamingContext associated with this stream */
    public StreamingContext context();

    /** Print the first 10 elements of each batch */
    public void print();

    /** Print the first num elements of each batch */
    public void print(int num);
}
```

**Basic Transformations:**

```java { .api }
public class JavaDStream<T> {
    /** Map transformation with Java function */
    public <U> JavaDStream<U> map(Function<T, U> f);

    /** FlatMap transformation */
    public <U> JavaDStream<U> flatMap(FlatMapFunction<T, U> f);

    /** Filter transformation */
    public JavaDStream<T> filter(Function<T, Boolean> f);

    /** Map partitions */
    public <U> JavaDStream<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f);

    /** Coalesce the elements of each partition into a list */
    public JavaDStream<List<T>> glom();

    /** Repartition RDDs */
    public JavaDStream<T> repartition(int numPartitions);

    /** Union with another stream */
    public JavaDStream<T> union(JavaDStream<T> other);

    /** Count the elements in each batch */
    public JavaDStream<Long> count();

    /** Count occurrences of each value within each batch */
    public JavaPairDStream<T, Long> countByValue();

    /** Reduce the elements of each batch to a single value */
    public JavaDStream<T> reduce(Function2<T, T, T> f);
}
```

**Advanced Transformations:**

```java { .api }
public class JavaDStream<T> {
    /** Transform using RDD operations */
    public <U> JavaDStream<U> transform(Function<JavaRDD<T>, JavaRDD<U>> transformFunc);

    /** Transform with time access */
    public <U> JavaDStream<U> transform(Function2<JavaRDD<T>, Time, JavaRDD<U>> transformFunc);

    /** Transform with another DStream */
    public <U, V> JavaDStream<V> transformWith(
        JavaDStream<U> other,
        Function2<JavaRDD<T>, JavaRDD<U>, JavaRDD<V>> transformFunc
    );

    /** Transform with another DStream and time access */
    public <U, V> JavaDStream<V> transformWith(
        JavaDStream<U> other,
        Function3<JavaRDD<T>, JavaRDD<U>, Time, JavaRDD<V>> transformFunc
    );
}
```

**Window Operations:**

```java { .api }
public class JavaDStream<T> {
    /** Create windowed stream */
    public JavaDStream<T> window(Duration windowDuration);

    /** Create windowed stream with slide duration */
    public JavaDStream<T> window(Duration windowDuration, Duration slideDuration);

    /** Reduce over window */
    public JavaDStream<T> reduceByWindow(
        Function2<T, T, T> reduceFunc,
        Duration windowDuration,
        Duration slideDuration
    );

    /** Incremental reduce over window (invReduceFunc removes values leaving the window) */
    public JavaDStream<T> reduceByWindow(
        Function2<T, T, T> reduceFunc,
        Function2<T, T, T> invReduceFunc,
        Duration windowDuration,
        Duration slideDuration
    );

    /** Count elements in window */
    public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration);

    /** Count values in window */
    public JavaPairDStream<T, Long> countByValueAndWindow(
        Duration windowDuration,
        Duration slideDuration
    );
}
```

**Output Operations:**

```java { .api }
public class JavaDStream<T> {
    /** Apply function to each RDD */
    public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);

    /** Apply function to each RDD with time */
    public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> foreachFunc);

    /** Save as object files */
    public void saveAsObjectFiles(String prefix, String suffix);

    /** Save as text files */
    public void saveAsTextFiles(String prefix, String suffix);
}
```

**Usage Examples:**

```java
import org.apache.spark.api.java.function.*;

JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Basic transformations with lambda expressions
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaDStream<String> filteredWords = words.filter(word -> word.length() > 2);
JavaDStream<String> upperWords = words.map(String::toUpperCase);

// Window operations
JavaDStream<String> windowedWords = words.window(Durations.seconds(30), Durations.seconds(10));
JavaDStream<String> reducedWindow = words.reduceByWindow(
    (s1, s2) -> s1 + " " + s2,
    Durations.seconds(30),
    Durations.seconds(10)
);

// Output operations
words.foreachRDD(rdd -> {
    System.out.println("Batch size: " + rdd.count());
    rdd.take(10).forEach(System.out::println);
});

// Custom transformations
JavaDStream<Integer> wordLengths = words.transform(rdd -> {
    return rdd.map(String::length).filter(len -> len > 0);
});
```

### JavaPairDStream

Java wrapper for pair DStreams with key-value operations.

```java { .api }
/**
 * Java-friendly wrapper for pair DStreams
 */
public class JavaPairDStream<K, V> {
    /** Get underlying Scala DStream */
    public DStream<Tuple2<K, V>> dstream();

    /** Cache RDDs */
    public JavaPairDStream<K, V> cache();

    /** Persist with storage level */
    public JavaPairDStream<K, V> persist(StorageLevel storageLevel);

    /** Enable checkpointing */
    public JavaPairDStream<K, V> checkpoint(Duration interval);

    /** Convert to regular JavaDStream */
    public JavaDStream<Tuple2<K, V>> toJavaDStream();
}
```

**Key-Value Transformations:**

```java { .api }
public class JavaPairDStream<K, V> {
    /** Group values by key */
    public JavaPairDStream<K, Iterable<V>> groupByKey();

    /** Group by key with partitions */
    public JavaPairDStream<K, Iterable<V>> groupByKey(int numPartitions);

    /** Group by key with partitioner */
    public JavaPairDStream<K, Iterable<V>> groupByKey(Partitioner partitioner);

    /** Reduce values by key */
    public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func);

    /** Reduce by key with partitions */
    public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions);

    /** Reduce by key with partitioner */
    public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, Partitioner partitioner);

    /** Combine by key */
    public <C> JavaPairDStream<K, C> combineByKey(
        Function<V, C> createCombiner,
        Function2<C, V, C> mergeValue,
        Function2<C, C, C> mergeCombiner
    );

    /** Map values only */
    public <U> JavaPairDStream<K, U> mapValues(Function<V, U> f);

    /** FlatMap values only */
    public <U> JavaPairDStream<K, U> flatMapValues(FlatMapFunction<V, U> f);
}
```

**Join Operations:**

```java { .api }
public class JavaPairDStream<K, V> {
    /** Inner join */
    public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other);

    /** Left outer join */
    public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other);

    /** Right outer join */
    public <W> JavaPairDStream<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairDStream<K, W> other);

    /** Full outer join */
    public <W> JavaPairDStream<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairDStream<K, W> other);

    /** Cogroup operation */
    public <W> JavaPairDStream<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairDStream<K, W> other);
}
```

**Windowed Operations:**

```java { .api }
public class JavaPairDStream<K, V> {
    /** Group by key over window */
    public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration);

    /** Group by key with slide duration */
    public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(
        Duration windowDuration,
        Duration slideDuration
    );

    /** Reduce by key over window */
    public JavaPairDStream<K, V> reduceByKeyAndWindow(
        Function2<V, V, V> func,
        Duration windowDuration,
        Duration slideDuration
    );

    /** Incremental reduce by key over window (invReduceFunc removes values leaving the window) */
    public JavaPairDStream<K, V> reduceByKeyAndWindow(
        Function2<V, V, V> reduceFunc,
        Function2<V, V, V> invReduceFunc,
        Duration windowDuration,
        Duration slideDuration
    );
}
```

**Stateful Operations:**

```java { .api }
public class JavaPairDStream<K, V> {
    /** Update state by key */
    public <S> JavaPairDStream<K, S> updateStateByKey(
        Function2<List<V>, Optional<S>, Optional<S>> updateFunc
    );

    /** Update state with partitioner */
    public <S> JavaPairDStream<K, S> updateStateByKey(
        Function2<List<V>, Optional<S>, Optional<S>> updateFunc,
        Partitioner partitioner
    );

    /** Update state with partition count */
    public <S> JavaPairDStream<K, S> updateStateByKey(
        Function2<List<V>, Optional<S>, Optional<S>> updateFunc,
        int numPartitions
    );

    /** Map with state */
    public <StateType, MappedType> JavaMapWithStateDStream<K, V, StateType, MappedType> mapWithState(
        StateSpec<K, V, StateType, MappedType> spec
    );
}
```

**Usage Examples:**

```java
// Create pair stream from words
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));

// Word count
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);

// Group and count
JavaPairDStream<String, Iterable<Integer>> grouped = pairs.groupByKey();

// Windowed word count
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
    Integer::sum,
    Durations.minutes(1),
    Durations.seconds(10)
);

// Stateful word count
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
    (values, state) -> {
        int sum = values.stream().mapToInt(Integer::intValue).sum();
        return Optional.of(sum + state.orElse(0));
    }
);

// Join operations
JavaPairDStream<String, Double> scores = otherScoreStream; // another pair stream, defined elsewhere
JavaPairDStream<String, Tuple2<Integer, Double>> joined = wordCounts.join(scores);

// Output
wordCounts.foreachRDD(rdd -> {
    Map<String, Integer> wordCountMap = rdd.collectAsMap();
    wordCountMap.forEach((word, count) -> {
        System.out.println(word + ": " + count);
    });
});
```

### Java Function Interfaces

Functional interfaces from `org.apache.spark.api.java.function`, for use with lambda expressions and method references.

```java { .api }
// Basic function interfaces
@FunctionalInterface
public interface Function<T, R> extends Serializable {
    R call(T t) throws Exception;
}

@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
    R call(T1 t1, T2 t2) throws Exception;
}

@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
    void call(T t) throws Exception;
}

@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
    void call(T1 t1, T2 t2) throws Exception;
}

@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
    Iterator<R> call(T t) throws Exception;
}

@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
    Tuple2<K, V> call(T t) throws Exception;
}

@FunctionalInterface
public interface Function3<T1, T2, T3, R> extends Serializable {
    R call(T1 t1, T2 t2, T3 t3) throws Exception;
}

@FunctionalInterface
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
    Iterator<R> call(T1 t1, T2 t2) throws Exception;
}

@FunctionalInterface
public interface PairFlatMapFunction<T, K, V> extends Serializable {
    Iterator<Tuple2<K, V>> call(T t) throws Exception;
}

@FunctionalInterface
public interface DoubleFlatMapFunction<T> extends Serializable {
    Iterator<Double> call(T t) throws Exception;
}

@FunctionalInterface
public interface DoubleFunction<T> extends Serializable {
    double call(T t) throws Exception;
}
```

**Usage Examples:**

```java
// Lambda expressions
JavaDStream<String> filtered = lines.filter(line -> line.length() > 0);
JavaDStream<Integer> lengths = lines.map(String::length);
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));

// Method references
JavaDStream<String> upper = lines.map(String::toUpperCase);
JavaDStream<Integer> wordCounts = pairs.values().reduce(Integer::sum);

// Anonymous inner classes
JavaDStream<String[]> splitLines = lines.map(new Function<String, String[]>() {
    @Override
    public String[] call(String line) {
        return line.split("\\s+");
    }
});
```