# Java API

Java-friendly wrappers for Apache Spark Streaming that provide full feature parity with the Scala APIs, using the Java function interfaces from `org.apache.spark.api.java.function`.

## JavaStreamingContext

### Creation

Create JavaStreamingContext with configuration:

```java { .api }
public JavaStreamingContext(SparkConf conf, Duration batchDuration)
public JavaStreamingContext(String master, String appName, Duration batchDuration)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String jarFile)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars, Map<String, String> environment)
```

Create with existing JavaSparkContext:

```java { .api }
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration)
```

Create from checkpoint:

```java { .api }
public JavaStreamingContext(String path)
```

Example context creation:

```java
SparkConf conf = new SparkConf()
    .setAppName("JavaWordCount")
    .setMaster("local[2]");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
```

### Lifecycle Management

Start and stop operations:

```java { .api }
public void start()
public void stop()
public void stop(boolean stopSparkContext)
public void stop(boolean stopSparkContext, boolean stopGracefully)
public void awaitTermination() throws InterruptedException
public boolean awaitTerminationOrTimeout(long timeout) throws InterruptedException
```

Context state and configuration:

```java { .api }
public StreamingContextState getState()
public JavaSparkContext sparkContext()
public void checkpoint(String directory)
public void remember(Duration duration)
```
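
A minimal lifecycle sketch (the checkpoint directory and timeout values are illustrative):

```java
jssc.checkpoint("/tmp/checkpoints");        // required for stateful operations
jssc.remember(Durations.minutes(5));        // keep generated RDDs around a bit longer

jssc.start();
System.out.println("State: " + jssc.getState());   // ACTIVE once started

// Wait up to 60 seconds; if still running, stop gracefully and stop the SparkContext too
if (!jssc.awaitTerminationOrTimeout(60000)) {
    jssc.stop(true, true);
}
```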

## Input Sources

### Socket Streams

Text socket stream:

```java { .api }
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port)
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port, StorageLevel storageLevel)
```

Custom socket stream:

```java { .api }
public <T> JavaReceiverInputDStream<T> socketStream(
    String hostname,
    int port,
    Function<InputStream, Iterable<T>> converter,
    StorageLevel storageLevel
)
```

Example socket streams:

```java
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Custom converter
JavaReceiverInputDStream<Integer> numbers = jssc.socketStream(
    "localhost", 8080,
    inputStream -> {
        List<Integer> result = new ArrayList<>();
        // Custom parsing logic
        return result;
    },
    StorageLevel.MEMORY_AND_DISK_SER()
);
```

### File Streams

Text file stream:

```java { .api }
public JavaDStream<String> textFileStream(String directory)
```

Generic file stream:

```java { .api }
public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
    String directory,
    Class<K> kClass,
    Class<V> vClass,
    Class<F> fClass
)

public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
    String directory,
    Class<K> kClass,
    Class<V> vClass,
    Class<F> fClass,
    Function<Path, Boolean> filter,
    boolean newFilesOnly
)
```

Binary records stream:

```java { .api }
public JavaDStream<byte[]> binaryRecordsStream(String directory, int recordLength)
```

Example file streams:

```java
JavaDStream<String> fileStream = jssc.textFileStream("/data/input");

// Hadoop file stream
JavaPairInputDStream<LongWritable, Text> hadoopStream = jssc.fileStream(
    "/data/input",
    LongWritable.class,
    Text.class,
    TextInputFormat.class
);
```
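
`binaryRecordsStream` reads fixed-length binary records from newly arriving files; a minimal sketch (the directory and 512-byte record length are illustrative):

```java
// Each element is one 512-byte record read from files appearing in the directory
JavaDStream<byte[]> records = jssc.binaryRecordsStream("/data/binary", 512);
```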

### Queue and Receiver Streams

Queue stream:

```java { .api }
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue)
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime)
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime, JavaRDD<T> defaultRDD)
```

Receiver stream:

```java { .api }
public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver)
```

Example queue stream:

```java
Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
JavaDStream<String> queueStream = jssc.queueStream(rddQueue);

// Add RDDs to queue
rddQueue.add(jssc.sparkContext().parallelize(Arrays.asList("hello", "world")));
```
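
For `receiverStream`, you supply a subclass of `org.apache.spark.streaming.receiver.Receiver`. A minimal sketch, assuming a hypothetical `ConstantReceiver` that just emits a fixed record until stopped:

```java
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

// Hypothetical receiver: pushes the same record repeatedly until the stream is stopped
class ConstantReceiver extends Receiver<String> {
    public ConstantReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        // onStart() must not block: push data from a separate thread via store()
        new Thread(() -> {
            while (!isStopped()) {
                store("tick");
            }
        }).start();
    }

    @Override
    public void onStop() {
        // Nothing to clean up; the thread above exits once isStopped() returns true
    }
}

JavaReceiverInputDStream<String> ticks = jssc.receiverStream(new ConstantReceiver());
```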

## JavaDStream Transformations

### Basic Transformations

Map operations:

```java { .api }
public <R> JavaDStream<R> map(Function<T, R> f)
public <R> JavaDStream<R> mapPartitions(FlatMapFunction<Iterator<T>, R> f)
public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f)
```

Filter and utility operations:

```java { .api }
public JavaDStream<T> filter(Function<T, Boolean> f)
public JavaDStream<List<T>> glom()
public JavaDStream<T> cache()
public JavaDStream<T> persist(StorageLevel level)
public JavaDStream<T> repartition(int numPartitions)
```

Example basic transformations:

```java
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

JavaDStream<Integer> lengths = lines.map(String::length);
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaDStream<String> nonEmpty = lines.filter(line -> !line.isEmpty());
```

### Aggregation Operations

Reduce and count operations:

```java { .api }
public JavaDStream<T> reduce(Function2<T, T, T> f)
public JavaDStream<Long> count()
public JavaPairDStream<T, Long> countByValue()
public JavaPairDStream<T, Long> countByValue(int numPartitions)
```

Example aggregations:

```java
JavaDStream<Integer> numbers = lines.map(Integer::parseInt);

JavaDStream<Integer> sum = numbers.reduce(Integer::sum);
JavaDStream<Long> count = numbers.count();
JavaPairDStream<Integer, Long> histogram = numbers.countByValue();
```

### Window Operations

Basic windowing:

```java { .api }
public JavaDStream<T> window(Duration windowDuration)
public JavaDStream<T> window(Duration windowDuration, Duration slideDuration)
```

Windowed reductions:

```java { .api }
public JavaDStream<T> reduceByWindow(Function2<T, T, T> reduceFunc, Duration windowDuration, Duration slideDuration)
public JavaDStream<T> reduceByWindow(Function2<T, T, T> reduceFunc, Function2<T, T, T> invReduceFunc, Duration windowDuration, Duration slideDuration)
public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration)
```

Example windowing:

```java
JavaDStream<String> windowedLines = lines.window(Durations.seconds(30), Durations.seconds(10));
JavaDStream<Integer> windowSum = numbers.reduceByWindow(
    Integer::sum,
    Durations.minutes(1),
    Durations.seconds(10)
);
```
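
`countByWindow` counts the records in each window; it uses an inverse reduce internally, so checkpointing must be enabled. A one-line sketch:

```java
// Record count per 1-minute window, emitted every 10 seconds (requires jssc.checkpoint(...))
JavaDStream<Long> windowCounts = lines.countByWindow(Durations.minutes(1), Durations.seconds(10));
```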

### Transform Operations

RDD-level transformations:

```java { .api }
public <R> JavaDStream<R> transform(Function<JavaRDD<T>, JavaRDD<R>> transformFunc)
public <R> JavaDStream<R> transform(Function2<JavaRDD<T>, Time, JavaRDD<R>> transformFunc)
public <U, R> JavaDStream<R> transformWith(JavaDStream<U> other, Function3<JavaRDD<T>, JavaRDD<U>, Time, JavaRDD<R>> transformFunc)
```

Example transforms:

```java
JavaDStream<String> processed = lines.transform(rdd -> {
    return rdd.filter(line -> !line.isEmpty())
              .map(String::toUpperCase);
});

JavaDStream<String> timestamped = lines.transform((rdd, time) -> {
    return rdd.map(line -> time.milliseconds() + ": " + line);
});
```
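
`transformWith` combines two streams at the RDD level. A minimal sketch, assuming `otherLines` is another `JavaDStream<String>` defined elsewhere:

```java
// Union the RDDs of the two streams for each batch interval
JavaDStream<String> merged = lines.transformWith(
    otherLines,
    (rdd1, rdd2, time) -> rdd1.union(rdd2)
);
```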

## JavaPairDStream Operations

### Pair Creation

Create pair DStream:

```java { .api }
public <K2, V2> JavaPairDStream<K2, V2> mapToPair(PairFunction<T, K2, V2> f)
public <K2, V2> JavaPairDStream<K2, V2> flatMapToPair(PairFlatMapFunction<T, K2, V2> f)
```

Example pair creation:

```java
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
```

### Key-Value Transformations

Value transformations:

```java { .api }
public <W> JavaPairDStream<K, W> mapValues(Function<V, W> f) // On JavaPairDStream<K, V>
public <W> JavaPairDStream<K, W> flatMapValues(Function<V, Iterable<W>> f)
```

Grouping and reduction:

```java { .api }
public JavaPairDStream<K, Iterable<V>> groupByKey()
public JavaPairDStream<K, Iterable<V>> groupByKey(int numPartitions)
public JavaPairDStream<K, Iterable<V>> groupByKey(Partitioner partitioner)
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func)
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions)
```

Example key-value operations:

```java
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);
JavaPairDStream<String, String> upperValues = pairs.mapValues(String::valueOf).mapValues(String::toUpperCase);
```

### Windowed Key-Value Operations

Windowed grouping and reduction:

```java { .api }
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration)
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration, Duration slideDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> func, Duration windowDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> func, Duration windowDuration, Duration slideDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> reduceFunc, Function2<V, V, V> invReduceFunc, Duration windowDuration, Duration slideDuration)
```

Example windowed operations:

```java
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
    Integer::sum,
    Durations.minutes(5),
    Durations.seconds(30)
);
```
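
The inverse-reduce variant updates each window incrementally by subtracting the values that slide out; it requires checkpointing to be enabled. A minimal sketch:

```java
// Incremental window counts: add counts entering the window, subtract counts leaving it.
// Requires jssc.checkpoint(...) to have been set.
JavaPairDStream<String, Integer> incrementalCounts = pairs.reduceByKeyAndWindow(
    Integer::sum,          // add new values
    (a, b) -> a - b,       // inverse function removes old values
    Durations.minutes(5),
    Durations.seconds(30)
);
```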

### Join Operations

Join DStreams:

```java { .api }
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other, int numPartitions)
public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairDStream<K, W> other)
```

Example joins:

```java
JavaPairDStream<String, String> stream1 = lines1.mapToPair(line -> new Tuple2<>(line.split(",")[0], line.split(",")[1]));
JavaPairDStream<String, String> stream2 = lines2.mapToPair(line -> new Tuple2<>(line.split(",")[0], line.split(",")[1]));

JavaPairDStream<String, Tuple2<String, String>> joined = stream1.join(stream2);
```
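
Outer joins wrap the side that may be missing in `Optional` (Spark's `org.apache.spark.api.java.Optional`); a short sketch building on the streams above:

```java
// Keys that appear only in stream1 get an empty Optional on the right-hand side
JavaPairDStream<String, Tuple2<String, Optional<String>>> leftJoined = stream1.leftOuterJoin(stream2);
```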

## Stateful Operations

### UpdateStateByKey

Update state by key:

```java { .api }
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc) // On JavaPairDStream<K, V>
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc, int numPartitions)
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc, Partitioner partitioner, JavaPairRDD<K, S> initialRDD)
```

Example updateStateByKey (`state` is Spark's `org.apache.spark.api.java.Optional`; checkpointing must be enabled):

```java
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
    (values, state) -> {
        int currentCount = values.stream().mapToInt(Integer::intValue).sum();
        int newCount = state.orElse(0) + currentCount;
        return Optional.of(newCount);
    }
);
```

### MapWithState

State specification and mapping:

```java { .api }
// StateSpec factory methods for Java
public static <K, V, S, T> StateSpec<K, V, S, T> function(Function3<K, Optional<V>, State<S>, T> mappingFunction)
public static <K, V, S, T> StateSpec<K, V, S, T> function(Function4<Time, K, Optional<V>, State<S>, Optional<T>> mappingFunction)
```

StateSpec configuration:

```java { .api }
public StateSpec<K, V, S, T> initialState(JavaPairRDD<K, S> rdd)
public StateSpec<K, V, S, T> numPartitions(int numPartitions)
public StateSpec<K, V, S, T> partitioner(Partitioner partitioner)
public StateSpec<K, V, S, T> timeout(Duration idleDuration)
```

Example mapWithState:

```java
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    (word, one, state) -> {
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
        state.update(sum);
        return output;
    };

StateSpec<String, Integer, Integer, Tuple2<String, Integer>> stateSpec = StateSpec.function(mappingFunc);

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDStream =
    pairs.mapWithState(stateSpec);
```
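
The spec can also be tuned before it is passed to `mapWithState`; a sketch building on the example above (the initial pair, partition count, and timeout are illustrative):

```java
// Seed the state, bound the number of partitions, and expire keys idle for 30 minutes
StateSpec<String, Integer, Integer, Tuple2<String, Integer>> tunedSpec =
    StateSpec.function(mappingFunc)
        .initialState(JavaPairRDD.fromJavaRDD(
            jssc.sparkContext().parallelize(Arrays.asList(new Tuple2<>("hello", 0)))))
        .numPartitions(4)
        .timeout(Durations.minutes(30));
```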

## Output Operations

### Basic Output

Print and foreachRDD operations:

```java { .api }
public void print()
public void print(int num)
public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc)
public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> foreachFunc)
```

### File Output

Save operations:

```java { .api }
public void saveAsTextFiles(String prefix, String suffix) // On JavaDStream
public void saveAsObjectFiles(String prefix, String suffix)
public void saveAsHadoopFiles(String prefix, String suffix, Class<?> keyClass, Class<?> valueClass, Class<? extends OutputFormat> outputFormatClass) // On JavaPairDStream
public void saveAsNewAPIHadoopFiles(String prefix, String suffix, Class<?> keyClass, Class<?> valueClass, Class<? extends NewOutputFormat> outputFormatClass)
```

Example output operations:

```java
wordCounts.print();

wordCounts.foreachRDD(rdd -> {
    System.out.println("Batch size: " + rdd.count());
    rdd.collect().forEach(System.out::println);
});

lines.saveAsTextFiles("output", "txt");
```
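
Inside `foreachRDD`, it is common to open one connection per partition rather than per record. A sketch, where `ExternalSink` stands in for your own client code:

```java
wordCounts.foreachRDD(rdd -> {
    rdd.foreachPartition(records -> {
        ExternalSink sink = ExternalSink.connect();   // hypothetical sink/connection
        while (records.hasNext()) {
            Tuple2<String, Integer> record = records.next();
            sink.write(record._1() + "=" + record._2());
        }
        sink.close();
    });
});
```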

## Event Listeners

### JavaStreamingListener

Java streaming listener interface:

```java { .api }
public abstract class JavaStreamingListener {
    public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}
    public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}
    public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {}
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
    public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {}
    public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {}
    public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
    public void onReceiverError(StreamingListenerReceiverError receiverError) {}
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
}
```

Add/remove listeners:

```java { .api }
public void addStreamingListener(StreamingListener streamingListener)
public void removeStreamingListener(StreamingListener streamingListener)
```

Example listener:

```java
jssc.addStreamingListener(new JavaStreamingListener() {
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        BatchInfo info = batchCompleted.batchInfo();
        System.out.println("Batch completed: " + info.batchTime() +
            " Processing time: " + info.processingDelay());
    }
});
```

## Duration Utilities

Duration creation:

```java { .api }
public class Durations {
    public static Duration milliseconds(long milliseconds)
    public static Duration seconds(long seconds)
    public static Duration minutes(long minutes)
}
```

Example duration usage:

```java
Duration batchInterval = Durations.seconds(5);
Duration windowSize = Durations.minutes(10);
Duration slideInterval = Durations.seconds(30);
```

## Complete Java Example

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

import java.util.Arrays;

public class JavaWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
            .setAppName("JavaWordCount")
            .setMaster("local[2]");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        jssc.checkpoint("checkpoint");

        // Create input stream
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Transform and count words
        JavaDStream<String> words = lines.flatMap(line ->
            Arrays.asList(line.split(" ")).iterator());

        JavaPairDStream<String, Integer> pairs = words.mapToPair(word ->
            new Tuple2<>(word, 1));

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);

        // Running count across batches (state is Spark's Optional, not java.util.Optional)
        JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
            (values, state) -> {
                int currentCount = values.stream().mapToInt(Integer::intValue).sum();
                int newCount = state.orElse(0) + currentCount;
                return Optional.of(newCount);
            }
        );

        wordCounts.print();
        runningCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```