or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

credential-management.md  fault-tolerance.md  index.md  java-api.md  stream-creation.md

docs/java-api.md

0

# Java API

1

2

Complete Java API support for Spark Streaming Kinesis integration, providing Java-friendly method signatures, functional interfaces, and seamless integration with Java applications and frameworks.

3

4

## Core Java API Methods

5

6

### Generic Type Stream Creation

7

8

Create a stream whose Kinesis records are converted to a custom type `T` by a user-supplied `Function<Record, T>` message handler.

9

10

```java { .api }

11

public static <T> JavaReceiverInputDStream<T> createStream(

12

JavaStreamingContext jssc,

13

String kinesisAppName,

14

String streamName,

15

String endpointUrl,

16

String regionName,

17

InitialPositionInStream initialPositionInStream,

18

Duration checkpointInterval,

19

StorageLevel storageLevel,

20

Function<Record, T> messageHandler,

21

Class<T> recordClass

22

);

23

```

24

25

**Parameters:**

26

- `jssc` - JavaStreamingContext object

27

- `kinesisAppName` - Kinesis application name used by the Kinesis Client Library (KCL) for coordination

28

- `streamName` - Kinesis stream name

29

- `endpointUrl` - Kinesis service endpoint URL

30

- `regionName` - AWS region name

31

- `initialPositionInStream` - Starting position (LATEST or TRIM_HORIZON)

32

- `checkpointInterval` - Interval between Kinesis checkpoints, given as a Spark `Duration` (e.g. `Durations.seconds(30)`)

33

- `storageLevel` - Spark storage level for received data

34

- `messageHandler` - Function that transforms each Kinesis `Record` into a value of type `T`

35

- `recordClass` - `Class` object for the handler's result type `T` (e.g. `JSONObject.class`)

36

37

**Usage Example:**

38

39

```java

40

import org.apache.spark.api.java.function.Function;

41

import com.amazonaws.services.kinesis.model.Record;

42

import org.json.JSONObject;

43

44

// Define message handler for JSON processing

45

Function<Record, JSONObject> jsonHandler = new Function<Record, JSONObject>() {

46

@Override

47

public JSONObject call(Record record) throws Exception {

48

String data = new String(record.getData().array());

49

return new JSONObject(data);

50

}

51

};

52

53

// Create stream

54

JavaReceiverInputDStream<JSONObject> jsonStream = KinesisUtils.createStream(

55

jssc,

56

"java-json-processor",

57

"json-events-stream",

58

"https://kinesis.us-east-1.amazonaws.com",

59

"us-east-1",

60

InitialPositionInStream.LATEST,

61

Durations.seconds(30),

62

StorageLevel.MEMORY_AND_DISK_2(),

63

jsonHandler,

64

JSONObject.class

65

);

66

67

// Process JSON stream

68

jsonStream.foreachRDD(rdd -> {

69

rdd.foreach(json -> {

70

System.out.println("Event ID: " + json.getString("eventId"));

71

System.out.println("Timestamp: " + json.getLong("timestamp"));

72

});

73

});

74

```

75

76

### Generic Type Stream with Credentials

77

78

```java { .api }

79

public static <T> JavaReceiverInputDStream<T> createStream(

80

JavaStreamingContext jssc,

81

String kinesisAppName,

82

String streamName,

83

String endpointUrl,

84

String regionName,

85

InitialPositionInStream initialPositionInStream,

86

Duration checkpointInterval,

87

StorageLevel storageLevel,

88

Function<Record, T> messageHandler,

89

Class<T> recordClass,

90

String awsAccessKeyId,

91

String awsSecretKey

92

);

93

```

94

95

**Usage Example:**

96

97

```java

98

// Custom message handler with error handling

99

Function<Record, String> safeStringHandler = new Function<Record, String>() {

100

@Override

101

public String call(Record record) throws Exception {

102

try {

103

return new String(record.getData().array(), "UTF-8");

104

} catch (Exception e) {

105

return "ERROR: " + e.getMessage();

106

}

107

}

108

};

109

110

JavaReceiverInputDStream<String> secureStream = KinesisUtils.createStream(

111

jssc,

112

"secure-java-app",

113

"secure-text-stream",

114

"https://kinesis.us-west-2.amazonaws.com",

115

"us-west-2",

116

InitialPositionInStream.TRIM_HORIZON,

117

Durations.seconds(45),

118

StorageLevel.MEMORY_AND_DISK_2(),

119

safeStringHandler,

120

String.class,

121

System.getenv("AWS_ACCESS_KEY_ID"),

122

System.getenv("AWS_SECRET_ACCESS_KEY")

123

);

124

```

125

126

### Default Byte Array Streams

127

128

Create streams returning raw byte arrays using default message handling.

129

130

```java { .api }

131

public static JavaReceiverInputDStream<byte[]> createStream(

132

JavaStreamingContext jssc,

133

String kinesisAppName,

134

String streamName,

135

String endpointUrl,

136

String regionName,

137

InitialPositionInStream initialPositionInStream,

138

Duration checkpointInterval,

139

StorageLevel storageLevel

140

);

141

```

142

143

**Usage Example:**

144

145

```java

146

// Create byte array stream

147

JavaReceiverInputDStream<byte[]> byteStream = KinesisUtils.createStream(

148

jssc,

149

"java-binary-processor",

150

"binary-data-stream",

151

"https://kinesis.ap-southeast-1.amazonaws.com",

152

"ap-southeast-1",

153

InitialPositionInStream.LATEST,

154

Durations.seconds(60),

155

StorageLevel.MEMORY_AND_DISK_2()

156

);

157

158

// Convert bytes to strings and process

159

JavaDStream<String> stringStream = byteStream.map(

160

bytes -> new String(bytes, "UTF-8")

161

);

162

163

// Filter and transform

164

JavaDStream<String> processedStream = stringStream

165

.filter(text -> text.length() > 10)

166

.map(text -> text.toUpperCase());

167

168

processedStream.print();

169

```

170

171

### Default Byte Array Stream with Credentials

172

173

```java { .api }

174

public static JavaReceiverInputDStream<byte[]> createStream(

175

JavaStreamingContext jssc,

176

String kinesisAppName,

177

String streamName,

178

String endpointUrl,

179

String regionName,

180

InitialPositionInStream initialPositionInStream,

181

Duration checkpointInterval,

182

StorageLevel storageLevel,

183

String awsAccessKeyId,

184

String awsSecretKey

185

);

186

```

187

188

### Deprecated Method (Legacy Support)

189

190

```java { .api }

191

@Deprecated

192

public static JavaReceiverInputDStream<byte[]> createStream(

193

JavaStreamingContext jssc,

194

String streamName,

195

String endpointUrl,

196

Duration checkpointInterval,

197

InitialPositionInStream initialPositionInStream,

198

StorageLevel storageLevel

199

);

200

```

201

202

## Java-Specific Patterns

203

204

### Using Lambda Expressions (Java 8+)

205

206

Modern Java applications can use lambda expressions for cleaner message handling:

207

208

```java

209

// Lambda expression for message handling

210

JavaReceiverInputDStream<String> lambdaStream = KinesisUtils.createStream(

211

jssc,

212

"lambda-app",

213

"text-stream",

214

"https://kinesis.eu-west-1.amazonaws.com",

215

"eu-west-1",

216

InitialPositionInStream.LATEST,

217

Durations.seconds(30),

218

StorageLevel.MEMORY_AND_DISK_2(),

219

record -> new String(record.getData().array()), // Lambda expression

220

String.class

221

);

222

223

// Process with lambda expressions

224

lambdaStream

225

.filter(text -> !text.isEmpty())

226

.map(String::trim)

227

.foreachRDD(rdd -> {

228

rdd.collect().forEach(System.out::println);

229

});

230

```

231

232

### Method References

233

234

```java

235

// Using method references for common transformations

236

JavaReceiverInputDStream<String> stream = KinesisUtils.createStream(

237

jssc, "app", "stream", endpoint, region,

238

InitialPositionInStream.LATEST, Durations.seconds(30),

239

StorageLevel.MEMORY_AND_DISK_2(),

240

this::parseRecord, // Method reference

241

String.class

242

);

243

244

private String parseRecord(Record record) {

245

return new String(record.getData().array());

246

}

247

```

248

249

### Exception Handling in Message Handlers

250

251

```java

252

// Robust error handling in message handlers

253

Function<Record, String> robustHandler = new Function<Record, String>() {

254

@Override

255

public String call(Record record) throws Exception {

256

try {

257

byte[] data = record.getData().array();

258

String text = new String(data, "UTF-8");

259

260

// Validate data

261

if (text.trim().isEmpty()) {

262

return null; // Marks an empty record; the handler itself does not filter, so drop nulls downstream

263

}

264

265

return text.trim();

266

} catch (Exception e) {

267

// Log error and return indicator

268

System.err.println("Error processing record: " + e.getMessage());

269

return "ERROR";

270

}

271

}

272

};

273

```

274

275

## Java Integration Examples

276

277

### Spring Framework Integration

278

279

```java

280

import org.springframework.beans.factory.annotation.Value;

281

import org.springframework.stereotype.Component;

282

283

@Component

284

public class KinesisStreamProcessor {

285

286

@Value("${kinesis.app.name}")

287

private String kinesisAppName;

288

289

@Value("${kinesis.stream.name}")

290

private String streamName;

291

292

@Value("${aws.kinesis.endpoint}")

293

private String endpointUrl;

294

295

@Value("${aws.region}")

296

private String region;

297

298

public void startProcessing(JavaStreamingContext jssc) {

299

JavaReceiverInputDStream<String> stream = KinesisUtils.createStream(

300

jssc,

301

kinesisAppName,

302

streamName,

303

endpointUrl,

304

region,

305

InitialPositionInStream.LATEST,

306

Durations.seconds(30),

307

StorageLevel.MEMORY_AND_DISK_2(),

308

this::processRecord,

309

String.class

310

);

311

312

stream.foreachRDD(this::handleBatch);

313

}

314

315

private String processRecord(Record record) {

316

return new String(record.getData().array());

317

}

318

319

private void handleBatch(JavaRDD<String> rdd) {

320

// Process batch with Spring services

321

rdd.collect().forEach(this::processMessage);

322

}

323

324

private void processMessage(String message) {

325

// Business logic here

326

System.out.println("Processing: " + message);

327

}

328

}

329

```

330

331

### Serialization Considerations

332

333

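When a message handler returns custom objects, make sure their class implements `java.io.Serializable` so Spark can store and transfer the received records: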

334

335

```java

336

import java.io.Serializable;

337

338

public class EventData implements Serializable {

339

private static final long serialVersionUID = 1L;

340

341

private String eventId;

342

private long timestamp;

343

private String payload;

344

345

// Constructors, getters, setters

346

public EventData(String eventId, long timestamp, String payload) {

347

this.eventId = eventId;

348

this.timestamp = timestamp;

349

this.payload = payload;

350

}

351

352

// Getters and setters...

353

}

354

355

// Message handler creating serializable objects

356

Function<Record, EventData> eventHandler = record -> {

357

String data = new String(record.getData().array());

358

JSONObject json = new JSONObject(data);

359

return new EventData(

360

json.getString("eventId"),

361

json.getLong("timestamp"),

362

json.getString("payload")

363

);

364

};

365

```

366

367

## Java Type System Integration

368

369

### Working with Generic Types

370

371

```java

372

// Create custom parameterized types

373

import java.lang.reflect.ParameterizedType;

374

import java.lang.reflect.Type;

375

376

public class TypeReference<T> {

377

private final Type type;

378

379

protected TypeReference() {

380

Type superClass = getClass().getGenericSuperclass();

381

this.type = ((ParameterizedType) superClass).getActualTypeArguments()[0];

382

}

383

384

public Type getType() {

385

return type;

386

}

387

}

388

389

// Usage with complex types

390

TypeReference<List<String>> typeRef = new TypeReference<List<String>>() {};

391

```

392

393

### Null Safety and Optional Integration

394

395

```java

396

import java.util.Optional;

397

398

// Message handler with Optional return

399

Function<Record, Optional<String>> optionalHandler = record -> {

400

try {

401

String data = new String(record.getData().array());

402

return data.trim().isEmpty() ? Optional.empty() : Optional.of(data);

403

} catch (Exception e) {

404

return Optional.empty();

405

}

406

};

407

408

// Filter out empty optionals

409

JavaReceiverInputDStream<Optional<String>> optionalStream = KinesisUtils.createStream(

410

jssc, "app", "stream", endpoint, region,

411

InitialPositionInStream.LATEST, Durations.seconds(30),

412

StorageLevel.MEMORY_AND_DISK_2(),

413

optionalHandler,

414

Optional.class

415

);

416

417

JavaDStream<String> filteredStream = optionalStream

418

.filter(Optional::isPresent)

419

.map(Optional::get);

420

```