# Stream Creation

The core functionality for creating Kinesis DStreams using the builder pattern. This provides a fluent API for configuring all aspects of Kinesis stream consumption, including required parameters, optional configurations, and message handling.

## Core API

### KinesisInputDStream.Builder

The builder provides a fluent interface for configuring Kinesis streams with comprehensive parameter validation and sensible defaults.

```scala { .api }
object KinesisInputDStream {
  def builder: KinesisInputDStream.Builder

  // Default message handler that extracts byte arrays from Kinesis records
  private[kinesis] def defaultMessageHandler(record: Record): Array[Byte]
}

class Builder {
  // Required configuration methods
  def streamingContext(ssc: StreamingContext): Builder
  def streamingContext(jssc: JavaStreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder

  // Optional configuration methods
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
  def initialPosition(initialPosition: KinesisInitialPosition): Builder
  def initialPositionInStream(initialPosition: InitialPositionInStream): Builder // Deprecated in 2.3.0
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(dimensions: Set[String]): Builder

  // Build methods
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
```

## Required Parameters

### StreamingContext

The Spark StreamingContext that will manage the DStream lifecycle.

```scala
val builder = KinesisInputDStream.builder
  .streamingContext(ssc) // Required
```

For the Java API:

```java
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;

KinesisInputDStream.Builder builder = KinesisInputDStream.builder()
    .streamingContext(jssc); // JavaStreamingContext
```

### Stream Name

The name of the Kinesis stream to consume from.

```scala
val builder = KinesisInputDStream.builder
  .streamName("my-kinesis-stream") // Required
```

### Checkpoint Application Name

The KCL application name used for DynamoDB checkpointing. This must be unique per stream and consumer application.

```scala
val builder = KinesisInputDStream.builder
  .checkpointAppName("my-unique-app-name") // Required
```

## Optional Configuration

### AWS Region and Endpoint

Configure the AWS region and Kinesis endpoint URL. Defaults to `us-east-1` and the standard Kinesis endpoint.

```scala
val builder = KinesisInputDStream.builder
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
```

### Initial Position

Specify where to start reading from the stream. See [Initial Position](./initial-position.md) for details.

```scala
val builder = KinesisInputDStream.builder
  .initialPosition(new KinesisInitialPositions.Latest())
```

### Checkpoint Interval

How frequently to checkpoint progress to DynamoDB. Defaults to the streaming batch duration.

```scala
import org.apache.spark.streaming.Seconds

val builder = KinesisInputDStream.builder
  .checkpointInterval(Seconds(30))
```

### Storage Level

Storage level for cached blocks. Defaults to `MEMORY_AND_DISK_2`.

```scala
import org.apache.spark.storage.StorageLevel

val builder = KinesisInputDStream.builder
  .storageLevel(StorageLevel.MEMORY_ONLY_2)
```

### AWS Credentials

Configure authentication for Kinesis, DynamoDB, and CloudWatch. See [AWS Credentials](./aws-credentials.md) for details.

```scala
val credentials = SparkAWSCredentials.builder
  .basicCredentials("access-key", "secret-key")
  .build()

val builder = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  .dynamoDBCredentials(credentials) // Optional, defaults to Kinesis credentials
  .cloudWatchCredentials(credentials) // Optional, defaults to Kinesis credentials
```

### CloudWatch Metrics

Configure the CloudWatch metrics collection level and dimensions.

```scala
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val builder = KinesisInputDStream.builder
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
```

## Building the DStream

### Default Message Handler

Creates a DStream of byte arrays using the default message handler:

```scala
val kinesisStream: KinesisInputDStream[Array[Byte]] = builder.build()
```

### Custom Message Handler

Creates a DStream with a custom message transformation function:

```scala
import com.amazonaws.services.kinesis.model.Record

// Custom handler that extracts JSON strings
val jsonHandler: Record => String = record => {
  val bytes = new Array[Byte](record.getData().remaining())
  record.getData().get(bytes)
  new String(bytes, "UTF-8")
}

val kinesisStream: KinesisInputDStream[String] = builder
  .buildWithMessageHandler(jsonHandler)
```
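
The handler's byte-extraction step depends only on `java.nio.ByteBuffer`, so it can be sanity-checked outside Spark. A minimal sketch (the `RecordPayload.decode` helper name is illustrative, not part of the Kinesis API):

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object RecordPayload {
  // Drain a payload ByteBuffer into a String, mirroring the handler above
  def decode(data: ByteBuffer): String = {
    val bytes = new Array[Byte](data.remaining())
    data.get(bytes) // advances the buffer's position past the payload
    new String(bytes, StandardCharsets.UTF_8)
  }
}

object PayloadDecodeDemo extends App {
  val payload = ByteBuffer.wrap("""{"id":1}""".getBytes(StandardCharsets.UTF_8))
  println(RecordPayload.decode(payload)) // prints {"id":1}
}
```

Note that `ByteBuffer.get` is stateful: calling the handler twice on the same record would find an empty buffer the second time, which is why the handler allocates and drains in one pass.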

## Complete Example

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream, SparkAWSCredentials}
import com.amazonaws.services.kinesis.model.Record

val sparkConf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Configure credentials
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisRole", "MySession")
  .build()

// Create the stream
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-data-stream")
  .checkpointAppName("spark-kinesis-consumer")
  .regionName("us-west-2")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .kinesisCredentials(credentials)
  .buildWithMessageHandler { record: Record =>
    val data = new Array[Byte](record.getData().remaining())
    record.getData().get(data)
    new String(data, "UTF-8")
  }

// Process the stream
kinesisStream.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}

ssc.start()
ssc.awaitTermination()
```

## Java API Example

Complete example using the Java API:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;

public class JavaKinesisExample {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Configure credentials
        SparkAWSCredentials credentials = SparkAWSCredentials.builder()
            .basicCredentials("access-key", "secret-key")
            .build();

        // Create the stream; build() returns the Scala KinesisInputDStream
        KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
            .streamingContext(jssc)
            .streamName("my-data-stream")
            .checkpointAppName("java-kinesis-consumer")
            .regionName("us-west-2")
            .initialPosition(new KinesisInitialPositions.Latest())
            .checkpointInterval(Durations.seconds(30))
            .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
            .kinesisCredentials(credentials)
            .build();

        // Wrap the Scala DStream for the Java-friendly API
        JavaDStream<byte[]> javaStream = JavaDStream.fromDStream(
            kinesisStream, scala.reflect.ClassTag$.MODULE$.apply(byte[].class));

        // Process the stream
        javaStream.foreachRDD(rdd ->
            rdd.foreach(bytes -> System.out.println("Received: " + new String(bytes, "UTF-8"))));

        jssc.start();
        jssc.awaitTermination();
    }
}
```

## Error Handling

The builder validates required parameters and throws `IllegalArgumentException` for missing required values:

```scala
// This will throw IllegalArgumentException at build time
val invalidStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  // Missing streamName and checkpointAppName
  .build() // Throws exception
```

Common validation errors:

- Missing required parameters (`streamingContext`, `streamName`, `checkpointAppName`)
- Invalid AWS credentials configuration
- Invalid checkpoint interval or storage level settings
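
Because validation happens at `build()` time, callers that assemble configuration dynamically can surface the error explicitly rather than letting it propagate. A minimal sketch, assuming `ssc` is an already-initialized `StreamingContext`:

```scala
import scala.util.{Failure, Success, Try}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// Wrap build() so a misconfigured builder produces a readable error
Try {
  KinesisInputDStream.builder
    .streamingContext(ssc) // streamName and checkpointAppName intentionally omitted
    .build()
} match {
  case Success(stream) =>
    // stream is fully validated and ready to use
  case Failure(e: IllegalArgumentException) =>
    System.err.println(s"Invalid Kinesis configuration: ${e.getMessage}")
  case Failure(e) => throw e
}
```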

## Default Values

The builder provides sensible defaults for optional parameters:

- **endpointUrl**: `"https://kinesis.us-east-1.amazonaws.com"`
- **regionName**: `"us-east-1"`
- **initialPosition**: `new KinesisInitialPositions.Latest()`
- **checkpointInterval**: the streaming batch duration (`ssc.graph.batchDuration`)
- **storageLevel**: `StorageLevel.MEMORY_AND_DISK_2`
- **kinesisCredentials**: `DefaultCredentials` (AWS default provider chain)
- **dynamoDBCredentials**: same as `kinesisCredentials` if not specified
- **cloudWatchCredentials**: same as `kinesisCredentials` if not specified
- **metricsLevel**: `KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL`
- **metricsEnabledDimensions**: `KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS`
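
Because of these defaults, a working stream needs only the three required parameters; everything else falls back to the values above. A minimal sketch, assuming `ssc` is an already-initialized `StreamingContext` and the stream lives in `us-east-1`:

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// Only the required parameters; region, endpoint, initial position,
// checkpoint interval, storage level, and credentials all use the defaults above
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-data-stream")
  .checkpointAppName("my-app")
  .build()
```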