or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-rdd.md · direct-streaming.md · index.md · java-api.md · offset-management.md · receiver-streaming.md

java-api.mddocs/

0

# Java API

1

2

Java API for the Apache Spark Streaming Kafka (0.8) integration. It provides type-safe Java wrappers around the Scala `KafkaUtils` functionality using familiar Java programming patterns.

3

4

## Capabilities

5

6

### Java Direct Streaming

7

8

Java-friendly API for creating direct (receiver-less) streams, which read each batch's offset ranges straight from the Kafka brokers.

9

10

```java { .api }

11

/**

12

* Create an input stream that directly pulls messages from Kafka Brokers (Java API).

13

*

14

* @param jssc JavaStreamingContext object

15

* @param keyClass Class of the keys in the Kafka records

16

* @param valueClass Class of the values in the Kafka records

17

* @param keyDecoderClass Class of the key decoder

18

* @param valueDecoderClass Class type of the value decoder

19

* @param kafkaParams Kafka configuration parameters

20

* @param topics Names of the topics to consume

21

* @tparam K type of Kafka message key

22

* @tparam V type of Kafka message value

23

* @tparam KD type of Kafka message key decoder

24

* @tparam VD type of Kafka message value decoder

25

* @return DStream of (Kafka message key, Kafka message value)

26

*/

27

public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>

28

JavaPairInputDStream<K, V> createDirectStream(

29

JavaStreamingContext jssc,

30

Class<K> keyClass,

31

Class<V> valueClass,

32

Class<KD> keyDecoderClass,

33

Class<VD> valueDecoderClass,

34

Map<String, String> kafkaParams,

35

Set<String> topics

36

)

37

```

38

39

**Usage Example:**

40

41

```java

42

import org.apache.spark.streaming.kafka.KafkaUtils;

43

import org.apache.spark.streaming.api.java.*;

44

import kafka.serializer.StringDecoder;

45

import java.util.*;

46

47

// Setup Kafka parameters

48

Map<String, String> kafkaParams = new HashMap<>();

49

kafkaParams.put("metadata.broker.list", "localhost:9092");

50

kafkaParams.put("auto.offset.reset", "largest");

51

52

Set<String> topics = Collections.singleton("user-events");

53

54

// Create direct stream

55

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(

56

jssc,

57

String.class,

58

String.class,

59

StringDecoder.class,

60

StringDecoder.class,

61

kafkaParams,

62

topics

63

);

64

65

// Process messages

66

stream.foreachRDD(rdd -> {

67

OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

68

69

rdd.foreach(record -> {

70

System.out.println("Key: " + record._1 + ", Value: " + record._2);

71

});

72

73

// Print offset information

74

for (OffsetRange o : offsetRanges) {

75

System.out.println(o.topic() + " " + o.partition() +

76

" " + o.fromOffset() + " " + o.untilOffset());

77

}

78

});

79

```

80

81

### Java Direct Streaming with Custom Message Handler

82

83

Advanced Java API with custom message transformation and explicit offset control.

84

85

```java { .api }

86

/**

87

* Create an input stream with custom message handler (Java API).

88

*

89

* @param jssc JavaStreamingContext object

90

* @param keyClass Class of the keys in the Kafka records

91

* @param valueClass Class of the values in the Kafka records

92

* @param keyDecoderClass Class of the key decoder

93

* @param valueDecoderClass Class of the value decoder

94

* @param recordClass Class of the records in DStream

95

* @param kafkaParams Kafka configuration parameters

96

* @param fromOffsets Per-topic/partition Kafka offsets defining starting point

97

* @param messageHandler Function for translating each message and metadata

98

* @tparam K type of Kafka message key

99

* @tparam V type of Kafka message value

100

* @tparam KD type of Kafka message key decoder

101

* @tparam VD type of Kafka message value decoder

102

* @tparam R type returned by messageHandler

103

* @return DStream of R

104

*/

105

public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>, R>

106

JavaInputDStream<R> createDirectStream(

107

JavaStreamingContext jssc,

108

Class<K> keyClass,

109

Class<V> valueClass,

110

Class<KD> keyDecoderClass,

111

Class<VD> valueDecoderClass,

112

Class<R> recordClass,

113

Map<String, String> kafkaParams,

114

Map<TopicAndPartition, Long> fromOffsets,

115

Function<MessageAndMetadata<K, V>, R> messageHandler

116

)

117

```

118

119

**Usage Example:**

120

121

```java

122

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Assumes the jssc and kafkaParams from the previous example.

// Define custom message handler (anonymous-class form)
Function<MessageAndMetadata<String, String>, String> messageHandler =
    new Function<MessageAndMetadata<String, String>, String>() {
        @Override
        public String call(MessageAndMetadata<String, String> mmd) {
            return String.format("%s:%d:%d -> %s:%s",
                mmd.topic(), mmd.partition(), mmd.offset(),
                mmd.key(), mmd.message());
        }
    };

// Or, equivalently, using a lambda (Java 8+). Either handler can be passed below.
Function<MessageAndMetadata<String, String>, String> lambdaHandler =
    mmd -> String.format("%s:%d:%d -> %s:%s",
        mmd.topic(), mmd.partition(), mmd.offset(),
        mmd.key(), mmd.message());

// Define starting offsets (inclusive). Only the partitions listed here are consumed.
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("events", 0), 1000L);
fromOffsets.put(new TopicAndPartition("events", 1), 2000L);

// Create stream with custom handler
JavaInputDStream<String> customStream = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    String.class,
    kafkaParams,
    fromOffsets,
    lambdaHandler
);

customStream.print();

162

```

163

164

### Java Receiver-based Streaming

165

166

Java API for the traditional receiver-based stream, which consumes through the Kafka high-level consumer and stores consumed offsets in Zookeeper.

167

168

```java { .api }

169

/**

170

* Create an input stream that pulls messages from Kafka Brokers using receivers (Java API).

171

*

172

* @param jssc JavaStreamingContext object

173

* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)

174

* @param groupId The group id for this consumer

175

* @param topics Map of (topic_name -> numPartitions) to consume

176

* @return DStream of (Kafka message key, Kafka message value)

177

*/

178

public static JavaPairReceiverInputDStream<String, String> createStream(

179

JavaStreamingContext jssc,

180

String zkQuorum,

181

String groupId,

182

Map<String, Integer> topics

183

)

184

185

/**

186

* Create receiver-based stream with custom storage level.

187

*/

188

public static JavaPairReceiverInputDStream<String, String> createStream(

189

JavaStreamingContext jssc,

190

String zkQuorum,

191

String groupId,

192

Map<String, Integer> topics,

193

StorageLevel storageLevel

194

)

195

```

196

197

**Usage Example:**

198

199

```java

200

import org.apache.spark.storage.StorageLevel;

201

202

Map<String, Integer> topics = new HashMap<>();

203

topics.put("user-events", 1);

204

topics.put("system-logs", 2);

205

206

JavaPairReceiverInputDStream<String, String> receiverStream = KafkaUtils.createStream(

207

jssc,

208

"localhost:2181",

209

"my-consumer-group",

210

topics,

211

StorageLevel.MEMORY_AND_DISK_SER_2()

212

);

213

214

receiverStream.foreachRDD(rdd -> {

215

System.out.println("Batch size: " + rdd.count());

216

rdd.foreach(record -> {

217

System.out.println("Received: " + record._1 + " -> " + record._2);

218

});

219

});

220

```

221

222

### Java Batch RDD Creation

223

224

Java API for creating RDDs from Kafka with precise offset control.

225

226

```java { .api }

227

/**

228

* Create a RDD from Kafka using offset ranges (Java API).

229

*

230

* @param jsc JavaSparkContext object

231

* @param keyClass type of Kafka message key

232

* @param valueClass type of Kafka message value

233

* @param keyDecoderClass type of Kafka message key decoder

234

* @param valueDecoderClass type of Kafka message value decoder

235

* @param kafkaParams Kafka configuration parameters

236

* @param offsetRanges Each OffsetRange corresponds to a range of offsets

237

* @tparam K type of Kafka message key

238

* @tparam V type of Kafka message value

239

* @tparam KD type of Kafka message key decoder

240

* @tparam VD type of Kafka message value decoder

241

* @return RDD of (Kafka message key, Kafka message value)

242

*/

243

public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>

244

JavaPairRDD<K, V> createRDD(

245

JavaSparkContext jsc,

246

Class<K> keyClass,

247

Class<V> valueClass,

248

Class<KD> keyDecoderClass,

249

Class<VD> valueDecoderClass,

250

Map<String, String> kafkaParams,

251

OffsetRange[] offsetRanges

252

)

253

```

254

255

**Usage Example:**

256

257

```java

258

import org.apache.spark.streaming.kafka.OffsetRange;

259

260

// Create offset ranges

261

OffsetRange[] offsetRanges = {

262

OffsetRange.create("events", 0, 1000, 2000),

263

OffsetRange.create("events", 1, 500, 1500),

264

OffsetRange.create("logs", 0, 100, 200)

265

};

266

267

// Create RDD

268

JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(

269

jsc,

270

String.class,

271

String.class,

272

StringDecoder.class,

273

StringDecoder.class,

274

kafkaParams,

275

offsetRanges

276

);

277

278

// Process data

279

System.out.println("Total messages: " + rdd.count());

280

rdd.foreach(record -> {

281

System.out.println("Key: " + record._1 + ", Value: " + record._2);

282

});

283

```

284

285

## Java Type System Integration

286

287

### Generic Type Support

288

289

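The Java API is fully generic: the stream's key and value types follow from the classes and decoders you pass in. Custom decoders need a public constructor taking `kafka.utils.VerifiableProperties`, because Spark instantiates them reflectively: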

290

291

```java

292

// Custom key/value types

293

public class UserEvent {

294

public String userId;

295

public String eventType;

296

public long timestamp;

297

}

298

299

public class CustomDecoder implements Decoder<UserEvent> {

300

@Override

301

public UserEvent fromBytes(byte[] bytes) {

302

// Custom deserialization logic

303

return parseUserEvent(bytes);

304

}

305

}

306

307

// Type-safe stream creation

308

JavaPairInputDStream<String, UserEvent> typedStream = KafkaUtils.createDirectStream(

309

jssc,

310

String.class,

311

UserEvent.class,

312

StringDecoder.class,

313

CustomDecoder.class,

314

kafkaParams,

315

topics

316

);

317

```

318

319

### Working with HasOffsetRanges

320

321

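Accessing offset information from Java RDDs. The cast to `HasOffsetRanges` only works on the RDD produced directly by the direct stream, before any transformation: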

322

323

```java

324

import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

stream.foreachRDD(batch -> {
    // The cast only succeeds on the RDD handed to the first method called on the
    // direct stream; RDDs produced by map/filter/etc. do not implement HasOffsetRanges.
    HasOffsetRanges withOffsets = (HasOffsetRanges) batch.rdd();

    for (OffsetRange range : withOffsets.offsetRanges()) {
        System.out.printf("Topic: %s, Partition: %d, Range: [%d, %d)%n",
            range.topic(), range.partition(), range.fromOffset(), range.untilOffset());
    }

    // Process the data
    batch.foreach(record -> processRecord(record));
});

343

```

344

345

### Java Collections Integration

346

347

Working with Java collections for configuration:

348

349

```java

350

import java.util.*;

351

352

// Kafka parameters using Java Maps

353

Map<String, String> kafkaParams = new HashMap<String, String>() {{

354

put("metadata.broker.list", "localhost:9092");

355

put("auto.offset.reset", "largest");

356

put("group.id", "my-consumer-group");

357

}};

358

359

// Topics using Java Sets

360

Set<String> topics = new HashSet<String>() {{

361

add("user-events");

362

add("system-logs");

363

}};

364

365

// Topic partitions using Java Maps

366

Map<String, Integer> topicPartitions = new HashMap<String, Integer>() {{

367

put("user-events", 2);

368

put("system-logs", 1);

369

}};

370

```

371

372

## Lambda Expression Support (Java 8+)

373

374

### Stream Processing with Lambdas

375

376

```java

377

// Direct stream with lambda processing

378

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(/*...*/);

379

380

stream

381

.filter(record -> record._1 != null && !record._1.isEmpty())

382

.map(record -> record._1.toUpperCase() + ":" + record._2)

383

.foreachRDD(rdd -> {

384

rdd.foreach(System.out::println);

385

});

386

```

387

388

### Custom Message Handlers with Lambdas

389

390

```java

391

// Lambda message handler

392

Function<MessageAndMetadata<String, String>, ProcessedMessage> handler =

393

mmd -> new ProcessedMessage(

394

mmd.topic(),

395

mmd.partition(),

396

mmd.offset(),

397

mmd.key(),

398

mmd.message(),

399

System.currentTimeMillis()

400

);

401

402

JavaInputDStream<ProcessedMessage> processedStream = KafkaUtils.createDirectStream(

403

jssc,

404

String.class,

405

String.class,

406

StringDecoder.class,

407

StringDecoder.class,

408

ProcessedMessage.class,

409

kafkaParams,

410

fromOffsets,

411

handler

412

);

413

```

414

415

## Exception Handling

416

417

### Java Exception Patterns

418

419

```java

420

try {
    JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topics
    );

    stream.foreachRDD(rdd -> {
        try {
            rdd.foreach(record -> {
                processRecord(record._1, record._2);
            });
        } catch (Exception e) {
            // An exception here means the whole batch job failed. Log it and
            // rethrow: swallowing it lets the stream move on to the next offsets
            // and silently drop this batch's data. Handle recoverable per-record
            // errors inside the foreach closure instead.
            System.err.println("Error processing RDD: " + e.getMessage());
            throw e;
        }
    });

} catch (Exception e) {
    // Typically a SparkException when partition metadata or starting offsets
    // cannot be fetched from the brokers. The cause is preserved below.
    System.err.println("Error creating Kafka stream: " + e.getMessage());
    throw new RuntimeException("Failed to initialize Kafka streaming", e);
}

442

```

443

444

### Handling Serialization Issues

445

446

```java

447

// Custom error-handling decoder

448

public class SafeStringDecoder implements Decoder<String> {

449

private final StringDecoder delegate = new StringDecoder();

450

451

@Override

452

public String fromBytes(byte[] bytes) {

453

try {

454

return delegate.fromBytes(bytes);

455

} catch (Exception e) {

456

System.err.println("Failed to decode message: " + e.getMessage());

457

return "<DECODE_ERROR>";

458

}

459

}

460

}

461

```

462

463

## Integration Patterns

464

465

### Spring Framework Integration

466

467

```java

468

@Component

469

public class KafkaStreamingService {

470

471

@Autowired

472

private JavaStreamingContext streamingContext;

473

474

@Value("${kafka.brokers}")

475

private String brokers;

476

477

@PostConstruct

478

public void initializeStreams() {

479

Map<String, String> kafkaParams = new HashMap<>();

480

kafkaParams.put("metadata.broker.list", brokers);

481

482

Set<String> topics = Set.of("events");

483

484

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(

485

streamingContext,

486

String.class, String.class,

487

StringDecoder.class, StringDecoder.class,

488

kafkaParams, topics

489

);

490

491

stream.foreachRDD(this::processRDD);

492

}

493

494

private void processRDD(JavaPairRDD<String, String> rdd) {

495

rdd.foreach(record -> {

496

// Business logic here

497

handleMessage(record._1, record._2);

498

});

499

}

500

}

501

```