or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-streams.md, event-monitoring.md, index.md, input-sources.md, java-api.md, key-value-ops.md, state-management.md, streaming-context.md, window-ops.md

docs/java-api.md

0

# Java API

1

2

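The Java API for Spark Streaming provides Java-friendly wrappers around the Scala implementation. Transformations take Spark's own functional interfaces from `org.apache.spark.api.java.function` (`Function`, `Function2`, `FlatMapFunction`, `VoidFunction`, …) rather than `java.util.function` types. These interfaces are `Serializable`, so Spark can ship them to executors, and they can be implemented with Java 8 lambdas or method references. All core Spark Streaming functionality is available through the Java API with full generic type safety.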

3

4

## Capabilities

5

6

### Java Streaming Context

7

8

Entry point for Java-based Spark Streaming applications.

9

10

```java { .api }

11

/**

12

* Create JavaStreamingContext from SparkConf

13

* @param conf - Spark configuration

14

* @param batchDuration - Time interval for batching streaming data

15

*/

16

public JavaStreamingContext(SparkConf conf, Duration batchDuration);

17

18

/**

19

* Create JavaStreamingContext from JavaSparkContext

20

* @param sparkContext - Existing JavaSparkContext instance

21

* @param batchDuration - Time interval for batching streaming data

22

*/

23

public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);

24

25

/**

26

* Create JavaStreamingContext with master and app name

27

* @param master - Cluster URL to connect to

28

* @param appName - Name for your application

29

* @param batchDuration - Time interval for batching streaming data

30

*/

31

public JavaStreamingContext(String master, String appName, Duration batchDuration);

32

33

// Lifecycle management

34

public void start();

35

public void stop();

36

public void stop(boolean stopSparkContext);

37

public void awaitTermination() throws InterruptedException;

38

public boolean awaitTerminationOrTimeout(long timeout) throws InterruptedException;

39

40

// Configuration

41

public void checkpoint(String directory);

42

public void remember(Duration duration);

43

```

44

45

**Usage Examples:**

46

47

```java

48

import org.apache.spark.SparkConf;

49

import org.apache.spark.streaming.api.java.JavaStreamingContext;

50

import org.apache.spark.streaming.Durations;

51

52

// Create streaming context

53

SparkConf conf = new SparkConf().setAppName("JavaStreamingApp").setMaster("local[2]");

54

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

55

56

// Set checkpoint

57

// "hdfs:///..." (three slashes) resolves against fs.defaultFS; "hdfs://checkpoint" would treat "checkpoint" as a NameNode host
jssc.checkpoint("hdfs:///user/spark/checkpoint");

58

59

// Start processing

60

jssc.start();

61

jssc.awaitTermination();

62

```

63

64

### Java Input Streams

65

66

Java-friendly methods for creating input streams from various sources.

67

68

```java { .api }

69

/**

70

* Create text input stream from TCP socket

71

* @param hostname - Hostname to connect to

72

* @param port - Port number to connect to

73

* @returns JavaReceiverInputDStream of strings

74

*/

75

public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);

76

77

/**

78

* Create text input stream with storage level

79

* @param hostname - Hostname to connect to

80

* @param port - Port number to connect to

81

* @param storageLevel - Storage level for received data

82

* @returns JavaReceiverInputDStream of strings

83

*/

84

public JavaReceiverInputDStream<String> socketTextStream(

85

String hostname,

86

int port,

87

StorageLevel storageLevel

88

);

89

90

/**

91

* Create input stream from text files in directory

92

* @param directory - Directory path to monitor

93

* @returns JavaDStream of strings

94

*/

95

public JavaDStream<String> textFileStream(String directory);

96

97

/**

98

* Create input stream from queue of JavaRDDs

99

* @param queue - Queue containing JavaRDDs to process

100

* @returns JavaInputDStream from queue

101

*/

102

public <T> JavaInputDStream<T> queueStream(Queue<JavaRDD<T>> queue);

103

104

/**

105

* Create input stream from custom receiver

106

* @param receiver - Custom receiver implementation

107

* @returns JavaReceiverInputDStream from receiver

108

*/

109

public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver);

110

```

111

112

### JavaDStream Operations

113

114

Core DStream operations, taking Spark's serializable functional interfaces (`org.apache.spark.api.java.function`), which can be written as Java 8 lambdas.

115

116

```java { .api }

117

/**

118

* Transform each element using a function

119

* @param f - Function to apply to each element

120

* @returns New JavaDStream with transformed elements

121

*/

122

public <R> JavaDStream<R> map(Function<T, R> f);

123

124

/**

125

* Transform each element to multiple elements

126

* @param f - Function returning an Iterator over the output elements for each input element (FlatMapFunction returns an Iterator since Spark 2.0)

127

* @returns New JavaDStream with flattened results

128

*/

129

public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f);

130

131

/**

132

* Filter elements based on predicate

133

* @param f - Predicate function returning boolean

134

* @returns New JavaDStream with filtered elements

135

*/

136

public JavaDStream<T> filter(Function<T, Boolean> f);

137

138

/**

139

* Union with another JavaDStream

140

* @param other - JavaDStream to union with

141

* @returns Combined JavaDStream

142

*/

143

public JavaDStream<T> union(JavaDStream<T> other);

144

145

/**

146

* Repartition the stream

147

* @param numPartitions - Number of partitions for output

148

* @returns Repartitioned JavaDStream

149

*/

150

public JavaDStream<T> repartition(int numPartitions);

151

152

/**

153

* Transform each RDD using custom function

154

* @param f - Function to transform JavaRDD

155

* @returns New JavaDStream with transformed RDDs

156

*/

157

public <R> JavaDStream<R> transform(Function<JavaRDD<T>, JavaRDD<R>> f);

158

159

/**

160

* Transform each RDD with time information

161

* @param f - Function receiving JavaRDD and Time

162

* @returns New JavaDStream with transformed RDDs

163

*/

164

public <R> JavaDStream<R> transform(Function2<JavaRDD<T>, Time, JavaRDD<R>> f);

165

```

166

167

### Window Operations in Java

168

169

Java-friendly window operations with Duration objects.

170

171

```java { .api }

172

/**

173

* Create windowed stream

174

* @param windowDuration - Width of the window

175

* @returns JavaDStream containing windowed data

176

*/

177

public JavaDStream<T> window(Duration windowDuration);

178

179

/**

180

* Create windowed stream with slide duration

181

* @param windowDuration - Width of the window

182

* @param slideDuration - Sliding interval

183

* @returns JavaDStream containing windowed data

184

*/

185

public JavaDStream<T> window(Duration windowDuration, Duration slideDuration);

186

187

/**

188

* Count elements over sliding window

189

* @param windowDuration - Width of the window

190

* @param slideDuration - Sliding interval

191

* @returns JavaDStream of counts

192

*/

193

public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration);

194

195

/**

196

* Reduce elements over sliding window

197

* @param reduceFunc - Function to combine elements

198

* @param windowDuration - Width of the window

199

* @param slideDuration - Sliding interval

200

* @returns JavaDStream with reduced results

201

*/

202

public JavaDStream<T> reduceByWindow(

203

Function2<T, T, T> reduceFunc,

204

Duration windowDuration,

205

Duration slideDuration

206

);

207

```

208

209

### Output Operations in Java

210

211

Java-friendly output operations for processing and saving data.

212

213

```java { .api }

214

/**

215

* Apply function to each RDD

216

* @param f - Function to apply to each JavaRDD

217

*/

218

public void foreachRDD(VoidFunction<JavaRDD<T>> f);

219

220

/**

221

* Apply function to each RDD with time information

222

* @param f - Function receiving JavaRDD and Time

223

*/

224

public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> f);

225

226

/**

227

* Print first 10 elements of each RDD

228

*/

229

public void print();

230

231

/**

232

* Print first num elements of each RDD

233

* @param num - Number of elements to print

234

*/

235

public void print(int num);

236

237

/**

238

* Save as text files with prefix

239

* @param prefix - Prefix for output file names

240

*/

241

public void saveAsTextFiles(String prefix);

242

243

/**

244

* Save as text files with prefix and suffix

245

* @param prefix - Prefix for output file names

246

* @param suffix - Suffix for output file names

247

*/

248

public void saveAsTextFiles(String prefix, String suffix);

249

```

250

251

**Usage Examples:**

252

253

```java

254

import org.apache.spark.streaming.api.java.JavaDStream;

255

import org.apache.spark.api.java.function.Function;

256

import org.apache.spark.api.java.function.VoidFunction;

257

258

JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

259

260

// Basic transformations

261

JavaDStream<Integer> lengths = lines.map(String::length);

262

JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

263

JavaDStream<String> filtered = lines.filter(line -> line.length() > 0);

264

265

// Window operations

266

JavaDStream<String> windowed = lines.window(Durations.seconds(30), Durations.seconds(10));

267

JavaDStream<Long> counts = lines.countByWindow(Durations.minutes(1), Durations.seconds(30));

268

269

// Output operations

270

words.foreachRDD(rdd -> {

271

long count = rdd.count();

272

System.out.println("Words in this batch: " + count);

273

});

274

275

lines.print(20);

276

```

277

278

### JavaPairDStream Operations

279

280

Operations for key-value pair streams in Java.

281

282

```java { .api }

283

/**

284

* Group values by key

285

* @returns JavaPairDStream of (key, iterable of values)

286

*/

287

public JavaPairDStream<K, Iterable<V>> groupByKey();

288

289

/**

290

* Reduce values by key

291

* @param func - Function to combine values

292

* @returns JavaPairDStream of (key, reduced value)

293

*/

294

public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func);

295

296

/**

297

* Combine values by key using combiner functions

298

* @param createCombiner - Function to create initial combiner

299

* @param mergeValue - Function to merge value into combiner

300

* @param mergeCombiner - Function to merge combiners

301

* @returns JavaPairDStream of (key, combined value)

302

*/

303

public <C> JavaPairDStream<K, C> combineByKey(

304

Function<V, C> createCombiner,

305

Function2<C, V, C> mergeValue,

306

Function2<C, C, C> mergeCombiner

307

);

308

309

/**

310

* Transform values while keeping keys

311

* @param f - Function to transform values

312

* @returns JavaPairDStream with transformed values

313

*/

314

public <U> JavaPairDStream<K, U> mapValues(Function<V, U> f);

315

316

/**

317

* Join with another JavaPairDStream

318

* @param other - JavaPairDStream to join with

319

* @returns JavaPairDStream of (key, (leftValue, rightValue))

320

*/

321

public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other);

322

323

/**

324

* Left outer join with another JavaPairDStream

325

* @param other - JavaPairDStream to join with

326

* @returns JavaPairDStream of (key, (leftValue, Optional[rightValue]))

327

*/

328

public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other);

329

```

330

331

### Java State Management

332

333

Stateful operations using Java functional interfaces.

334

335

```java { .api }

336

/**

337

* Update state by key using Java function

338

* @param updateFunc - Function to update state

339

* @returns JavaPairDStream of (key, state)

340

*/

341

public <S> JavaPairDStream<K, S> updateStateByKey(

342

Function2<List<V>, Optional<S>, Optional<S>> updateFunc

343

);

344

345

/**

346

* Update state by key with custom partitioner

347

* @param updateFunc - Function to update state

348

* @param partitioner - Custom partitioner

349

* @returns JavaPairDStream of (key, state)

350

*/

351

public <S> JavaPairDStream<K, S> updateStateByKey(

352

Function2<List<V>, Optional<S>, Optional<S>> updateFunc,

353

Partitioner partitioner

354

);

355

356

/**

357

* Map with state using StateSpec (experimental)

358

* @param spec - StateSpec configuration

359

* @returns JavaMapWithStateDStream

360

*/

361

public <StateType, MappedType> JavaMapWithStateDStream<K, V, StateType, MappedType> mapWithState(

362

StateSpec<K, V, StateType, MappedType> spec

363

);

364

```

365

366

**Usage Examples:**

367

368

```java

369

import org.apache.spark.streaming.api.java.JavaPairDStream;

370

import org.apache.spark.api.java.Optional;

371

import scala.Tuple2;

372

373

// Create pair stream

374

JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

375

JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));

376

377

// Aggregations

378

JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((a, b) -> a + b);

379

JavaPairDStream<String, Iterable<Integer>> grouped = pairs.groupByKey();

380

381

// State management

382

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey((values, state) -> {

383

int currentCount = state.or(0);

384

int newCount = currentCount + values.stream().mapToInt(Integer::intValue).sum();

385

return newCount == 0 ? Optional.empty() : Optional.of(newCount);

386

});

387

388

// Window operations on pairs

389

JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(

390

(a, b) -> a + b,

391

Durations.seconds(30),

392

Durations.seconds(10)

393

);

394

```

395

396

### Java Listeners

397

398

Java-friendly streaming listeners for monitoring applications.

399

400

```java { .api }

401

/**

402

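* Listener callbacks for monitoring a running streaming application.
* NOTE(review): in Spark's source, JavaStreamingListener appears to be package-private (private[streaming]),
* and JavaStreamingContext.addStreamingListener accepts org.apache.spark.streaming.scheduler.StreamingListener.
* Java code typically implements StreamingListener directly; confirm against your Spark version.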

403

*/

404

public abstract class JavaStreamingListener {

405

// Override methods you need to handle

406

public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}

407

public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}

408

public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}

409

public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}

410

public void onReceiverError(StreamingListenerReceiverError receiverError) {}

411

public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}

412

}

413

414

// Add listener to streaming context

415

jssc.addStreamingListener(new MyJavaStreamingListener());

416

```

417

418

### Duration Helper Class

419

420

Java-friendly duration creation utilities.

421

422

```java { .api }

423

/**

424

* Utility class for creating Duration objects

425

*/

426

public class Durations {

427

public static Duration milliseconds(long milliseconds);

428

public static Duration seconds(long seconds);

429

public static Duration minutes(long minutes);

430

public static Duration hours(long hours);

431

}

432

```

433

434

## Complete Java Example

435

436

```java

437

import org.apache.spark.SparkConf;

438

import org.apache.spark.streaming.api.java.*;

439

import org.apache.spark.streaming.Durations;

440

import org.apache.spark.api.java.function.*;

441

import org.apache.spark.api.java.Optional;

442

import scala.Tuple2;

443

import java.util.Arrays;

444

import java.util.List;

445

446

public class JavaWordCount {

447

public static void main(String[] args) throws InterruptedException {

448

// Create streaming context

449

SparkConf conf = new SparkConf()

450

.setAppName("JavaWordCount")

451

.setMaster("local[2]");

452

453

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

454

jssc.checkpoint("checkpoint");

455

456

// Create input stream

457

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

458

459

// Process data

460

JavaDStream<String> words = lines.flatMap(line ->

461

Arrays.asList(line.split(" ")).iterator()

462

);

463

464

JavaPairDStream<String, Integer> pairs = words.mapToPair(word ->

465

new Tuple2<>(word, 1)

466

);

467

468

// Running word count with state

469

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(

470

(values, state) -> {

471

int sum = values.stream().mapToInt(Integer::intValue).sum();

472

return Optional.of(state.or(0) + sum);

473

}

474

);

475

476

// Windowed word count

477

JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(

478

(a, b) -> a + b, // Reduce function

479

(a, b) -> a - b, // Inverse reduce function

480

Durations.seconds(30), // Window duration

481

Durations.seconds(10) // Slide duration

482

);

483

484

// Output results

485

runningCounts.print();

486

windowedCounts.print();

487

488

// Advanced output with custom processing

489

runningCounts.foreachRDD(rdd -> {

490

List<Tuple2<String, Integer>> topWords = rdd.top(10,

491

(tuple1, tuple2) -> tuple1._2().compareTo(tuple2._2())

492

);

493

494

System.out.println("Top 10 words:");

495

topWords.forEach(tuple ->

496

System.out.println(tuple._1() + ": " + tuple._2())

497

);

498

});

499

500

// Start processing

501

jssc.start();

502

jssc.awaitTermination();

503

}

504

}

505

```

506

507

## Java API Type Conversions

508

509

Converting between Java and Scala types when needed:

510

511

```java

512

// Convert JavaRDD to RDD when calling Scala APIs

513

JavaRDD<String> javaRDD = /* ... */;

514

RDD<String> scalaRDD = javaRDD.rdd();

515

516

// Convert JavaDStream to DStream

517

JavaDStream<String> javaDStream = /* ... */;

518

DStream<String> scalaDStream = javaDStream.dstream();

519

520

// Working with Options

521

import org.apache.spark.api.java.Optional;

522

523

Optional<Integer> javaOptional = Optional.of(42);

524

Option<Integer> scalaOption = Optional.toScala(javaOptional);

525

```