
# Java API

Complete Java API support with type-safe bindings, function interfaces, and seamless integration with JavaStreamingContext-based data processing pipelines.

## Capabilities

### Basic Stream Creation (Java)

Creates a Kinesis input stream using default AWS credential discovery and a byte array message handler.

```java { .api }
/**
 * Create an input stream that pulls messages from a Kinesis stream using the KCL.
 * Uses DefaultAWSCredentialsProviderChain for AWS authentication.
 *
 * @param jssc JavaStreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param regionName Name of the region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in the stream (TRIM_HORIZON or LATEST)
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects (MEMORY_AND_DISK_2 recommended)
 * @return JavaReceiverInputDStream<byte[]> containing raw message data
 */
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel
);
```

**Usage Example:**

```java
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
    jssc,
    "MySparkKinesisApp",
    "my-kinesis-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2()
);
```

### Stream Creation with Explicit Credentials (Java)

Creates a Kinesis input stream with explicitly provided AWS credentials.

```java { .api }
/**
 * Create an input stream with explicit AWS credentials.
 * Note: Credentials will be saved in DStream checkpoints if checkpointing is enabled.
 *
 * @param jssc JavaStreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of the region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in the stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param awsAccessKeyId AWS AccessKeyId (if null, uses DefaultAWSCredentialsProviderChain)
 * @param awsSecretKey AWS SecretKey (if null, uses DefaultAWSCredentialsProviderChain)
 * @return JavaReceiverInputDStream<byte[]> containing raw message data
 */
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    String awsAccessKeyId,
    String awsSecretKey
);
```

**Usage Example:**

```java
JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
    jssc,
    "MySparkKinesisApp",
    "my-kinesis-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    "AKIAIOSFODNN7EXAMPLE",
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
);
```

### Stream Creation with Custom Message Handler (Java)

Creates a typed Kinesis input stream with a custom message handler function.

```java { .api }
/**
 * Create an input stream with a custom message handler for type-safe data processing.
 *
 * @param jssc JavaStreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of the region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in the stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @param recordClass Class object for type T (required because of Java type erasure)
 * @return JavaReceiverInputDStream<T> containing processed data
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
```

**Usage Example:**

```java
import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Define data class
public class MyEvent implements Serializable {
    private String id;
    private long timestamp;
    private String data;

    // Constructors, getters, setters...
    public MyEvent() {}
    public MyEvent(String id, long timestamp, String data) {
        this.id = id;
        this.timestamp = timestamp;
        this.data = data;
    }

    // Getters and setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getData() { return data; }
    public void setData(String data) { this.data = data; }
}

// Custom message handler
Function<Record, MyEvent> parseMyEvent = new Function<Record, MyEvent>() {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public MyEvent call(Record record) throws Exception {
        byte[] data = new byte[record.getData().remaining()];
        record.getData().get(data);
        String json = new String(data, StandardCharsets.UTF_8);
        return mapper.readValue(json, MyEvent.class);
    }
};

JavaReceiverInputDStream<MyEvent> stream = KinesisUtils.createStream(
    jssc,
    "MySparkKinesisApp",
    "my-events-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    parseMyEvent,
    MyEvent.class
);
```

### Stream Creation with Custom Handler and Credentials (Java)

Creates a typed Kinesis input stream with both a custom message handler and explicit AWS credentials.

```java { .api }
/**
 * Create an input stream with a custom message handler and explicit AWS credentials.
 *
 * @param jssc JavaStreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of the region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in the stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @param recordClass Class object for type T
 * @param awsAccessKeyId AWS AccessKeyId
 * @param awsSecretKey AWS SecretKey
 * @return JavaReceiverInputDStream<T> containing processed data
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass,
    String awsAccessKeyId,
    String awsSecretKey
);
```
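**Usage Example** (a sketch combining the custom-handler and explicit-credentials parameters; `jssc` and `parseMyEvent` are assumed to be defined as in the earlier examples, and the key values are placeholders):

```java
JavaReceiverInputDStream<MyEvent> stream = KinesisUtils.createStream(
    jssc,
    "MySparkKinesisApp",
    "my-events-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    parseMyEvent,
    MyEvent.class,
    "AKIAIOSFODNN7EXAMPLE",
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
);
```

As with the byte-array variant, prefer the credential-chain overloads in production; these keys end up in DStream checkpoints when checkpointing is enabled.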

### Deprecated Stream Creation (Java)

Simplified stream creation method (deprecated since version 1.4.0).

```java { .api }
/**
 * Create an input stream using the app name from SparkConf and the region inferred from the endpoint.
 * @deprecated use other forms of createStream
 *
 * @param jssc JavaStreamingContext object
 * @param streamName Kinesis stream name
 * @param endpointUrl Endpoint URL of the Kinesis service
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param initialPositionInStream Starting position in the stream
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream<byte[]> containing raw message data
 */
@Deprecated
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String streamName,
    String endpointUrl,
    Duration checkpointInterval,
    InitialPositionInStream initialPositionInStream,
    StorageLevel storageLevel
);
```

## Java Usage Examples

### Simple Text Processing

```java
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;
import java.nio.charset.StandardCharsets;

// Convert bytes to string
Function<Record, String> textConverter = new Function<Record, String>() {
    @Override
    public String call(Record record) throws Exception {
        byte[] data = new byte[record.getData().remaining()];
        record.getData().get(data);
        return new String(data, StandardCharsets.UTF_8);
    }
};

JavaReceiverInputDStream<String> textStream = KinesisUtils.createStream(
    jssc,
    "TextProcessor",
    "text-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    textConverter,
    String.class
);

// Process text messages (foreachRDD takes a VoidFunction, so the lambda returns nothing)
textStream.foreachRDD(rdd -> {
    rdd.foreach(text -> System.out.println("Received: " + text));
});
```

### JSON Processing with Error Handling

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.spark.streaming.api.java.JavaDStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

// JSON parser with error handling
Function<Record, Optional<JsonNode>> jsonParser = new Function<Record, Optional<JsonNode>>() {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Optional<JsonNode> call(Record record) throws Exception {
        try {
            byte[] data = new byte[record.getData().remaining()];
            record.getData().get(data);
            String json = new String(data, StandardCharsets.UTF_8);
            JsonNode node = mapper.readTree(json);
            return Optional.of(node);
        } catch (Exception e) {
            System.err.println("Failed to parse JSON: " + e.getMessage());
            return Optional.empty();
        }
    }
};

// Optional.class is a raw Class<Optional>, so an unchecked cast is needed
// to satisfy the generic Class<T> parameter
@SuppressWarnings("unchecked")
Class<Optional<JsonNode>> recordClass = (Class<Optional<JsonNode>>) (Class<?>) Optional.class;

JavaReceiverInputDStream<Optional<JsonNode>> jsonStream = KinesisUtils.createStream(
    jssc,
    "JsonProcessor",
    "json-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    jsonParser,
    recordClass
);

// Filter and process valid JSON
JavaDStream<JsonNode> validJson = jsonStream.flatMap(opt ->
    opt.isPresent()
        ? Arrays.asList(opt.get()).iterator()
        : Collections.<JsonNode>emptyList().iterator()
);
```

### Message Handler with Metadata Access

```java
// Data class that includes metadata
public class EnrichedMessage implements Serializable {
    private String data;
    private String partitionKey;
    private String sequenceNumber;
    private long arrivalTime;

    public EnrichedMessage(String data, String partitionKey,
                           String sequenceNumber, long arrivalTime) {
        this.data = data;
        this.partitionKey = partitionKey;
        this.sequenceNumber = sequenceNumber;
        this.arrivalTime = arrivalTime;
    }

    // Getters...
    public String getData() { return data; }
    public String getPartitionKey() { return partitionKey; }
    public String getSequenceNumber() { return sequenceNumber; }
    public long getArrivalTime() { return arrivalTime; }
}

// Message handler that captures metadata
Function<Record, EnrichedMessage> enrichedHandler = new Function<Record, EnrichedMessage>() {
    @Override
    public EnrichedMessage call(Record record) throws Exception {
        byte[] bytes = new byte[record.getData().remaining()];
        record.getData().get(bytes);
        String data = new String(bytes, StandardCharsets.UTF_8);

        return new EnrichedMessage(
            data,
            record.getPartitionKey(),
            record.getSequenceNumber(),
            record.getApproximateArrivalTimestamp().getTime()
        );
    }
};

JavaReceiverInputDStream<EnrichedMessage> enrichedStream =
    KinesisUtils.createStream(
        jssc,
        "EnrichedProcessor",
        "enriched-stream",
        "https://kinesis.us-east-1.amazonaws.com",
        "us-east-1",
        InitialPositionInStream.LATEST,
        new Duration(2000),
        StorageLevel.MEMORY_AND_DISK_2(),
        enrichedHandler,
        EnrichedMessage.class
    );
```

### Word Count Example

Complete example demonstrating Java API usage with word counting:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import scala.Tuple2;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;

public class JavaKinesisWordCount {
    private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("JavaKinesisWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // Create Kinesis stream
        JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
            jssc,
            "JavaWordCount",
            "word-stream",
            "https://kinesis.us-east-1.amazonaws.com",
            "us-east-1",
            InitialPositionInStream.LATEST,
            new Duration(2000),
            StorageLevel.MEMORY_AND_DISK_2()
        );

        // Convert bytes to strings and split into words
        JavaDStream<String> words = kinesisStream.flatMap(
            new FlatMapFunction<byte[], String>() {
                @Override
                public Iterator<String> call(byte[] line) {
                    String text = new String(line, StandardCharsets.UTF_8);
                    return Arrays.asList(WORD_SEPARATOR.split(text)).iterator();
                }
            }
        );

        // Count words
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<>(s, 1);
                }
            }
        ).reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            }
        );

        // Print results
        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```

## Java Function Interfaces

The Java API uses Spark's `Function` interface for message handlers. Its single `call` method may throw checked exceptions:

```java { .api }
// Message handlers implement this interface:
org.apache.spark.api.java.function.Function<Record, T> {
    T call(Record record) throws Exception;
}
```
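Because `Function` declares a single abstract method, handlers can also be written as lambdas on Java 8+. A minimal sketch, using a local stand-in interface with the same shape (the `Handler` type below is illustrative only, not the Spark class):

```java
import java.nio.charset.StandardCharsets;

public class LambdaHandlerSketch {
    // Local stand-in mirroring the single-abstract-method shape of
    // org.apache.spark.api.java.function.Function<A, B>
    @FunctionalInterface
    interface Handler<A, B> {
        B call(A input) throws Exception;
    }

    public static void main(String[] args) throws Exception {
        // The anonymous-class handlers in the examples above collapse to one line
        Handler<byte[], String> decode = bytes -> new String(bytes, StandardCharsets.UTF_8);
        System.out.println(decode.call("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The same lambda syntax works against the real Spark interface, since it is likewise a single-abstract-method type.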

## Best Practices for Java API

### Type Safety

- Always specify the record class parameter for type safety
- Use Optional<T> for operations that may fail
- Leverage Java generics for compile-time type checking
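The Optional pattern can be sketched in plain Java (the `parseTimestamp` helper is hypothetical, standing in for any fallible parse step inside a message handler):

```java
import java.util.Optional;

public class SafeParseSketch {
    // Hypothetical parse step: return Optional.empty() instead of throwing,
    // so downstream operators can simply filter out bad records
    static Optional<Long> parseTimestamp(String raw) {
        try {
            return Optional.of(Long.parseLong(raw.trim()));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseTimestamp(" 1700000000 "));  // Optional[1700000000]
        System.out.println(parseTimestamp("not-a-number"));  // Optional.empty
    }
}
```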

### Error Handling

- Wrap parsing operations in try-catch blocks
- Return Optional or use custom error types
- Log errors appropriately for monitoring

### Performance

- Reuse expensive objects like ObjectMapper in message handlers
- Avoid creating new objects in tight loops
- Consider using static methods for stateless operations
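The first two points can be illustrated with a plain-Java sketch: build the expensive object once per class, not once per record. The same reasoning applies to Jackson's `ObjectMapper`, which is safe to share once configured:

```java
import java.util.regex.Pattern;

public class ReusePerClass {
    // Compiled once per JVM; Pattern is immutable and safe to share, whereas
    // calling Pattern.compile for every record would allocate in a tight loop
    private static final Pattern WORD_SEPARATOR = Pattern.compile("\\s+");

    static String[] splitWords(String line) {
        return WORD_SEPARATOR.split(line.trim());
    }

    public static void main(String[] args) {
        System.out.println(splitWords("hello   kinesis world").length);  // 3
    }
}
```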

### Memory Management

- Be careful with large message payloads
- Copy data out of the record's ByteBuffer rather than holding a reference to it
- Use streaming parsers for large JSON/XML documents
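A defensive decoding sketch for the ByteBuffer point: copy the payload out of the buffer inside the handler so no reference to the record's buffer escapes. `duplicate()` is used so the original buffer's position is left untouched:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadCopy {
    // Copy bytes out of the (possibly reused) buffer and decode;
    // only the String, never the ByteBuffer, leaves the handler
    static String decode(ByteBuffer payload) {
        byte[] copy = new byte[payload.remaining()];
        payload.duplicate().get(copy);  // read via a duplicate: shared contents, independent position
        return new String(copy, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(buf));      // payload
        System.out.println(buf.remaining());  // 7 -- the original buffer is untouched
    }
}
```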