# Stream Creation and Configuration

The `KinesisInputDStream.Builder` provides a fluent API for creating and configuring Kinesis input streams, with comprehensive options for performance, reliability, and monitoring.

## Builder Pattern API

```scala { .api }
object KinesisInputDStream {
  def builder: Builder
}

class Builder {
  // Required parameters
  def streamingContext(ssc: StreamingContext): Builder
  def streamingContext(jssc: JavaStreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder

  // Optional configuration
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
  def initialPosition(initialPosition: KinesisInitialPosition): Builder
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder

  // Credentials configuration
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder

  // Metrics configuration
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder

  // Build methods
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
```

## Required Parameters

### StreamingContext

Sets the Spark StreamingContext that will be used to construct the Kinesis DStream.

```scala
val builder = KinesisInputDStream.builder
  .streamingContext(ssc)
```
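
The `ssc` referenced throughout these examples is an ordinary StreamingContext. A minimal sketch of creating one, where the app name and batch interval are illustrative values, not defaults:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative app name and batch interval; choose values for your job
val conf = new SparkConf().setAppName("kinesis-example")
val ssc = new StreamingContext(conf, Seconds(2))
```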

### Stream Name

The name of the Kinesis stream to read from.

```scala
val builder = KinesisInputDStream.builder
  .streamName("my-kinesis-stream")
```

### Checkpoint Application Name

The KCL application name, used for checkpointing state to DynamoDB and for publishing CloudWatch metrics.

```scala
val builder = KinesisInputDStream.builder
  .checkpointAppName("my-spark-kinesis-app")
```

## Optional Configuration

### AWS Region and Endpoint

Configure the AWS region and Kinesis endpoint URL.

```scala
val builder = KinesisInputDStream.builder
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
```

**Defaults:**

- Region: "us-east-1"
- Endpoint: "https://kinesis.us-east-1.amazonaws.com"

### Storage Level

Configure how received data blocks are stored in memory and/or on disk.

```scala
import org.apache.spark.storage.StorageLevel

val builder = KinesisInputDStream.builder
  .storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
```

**Default:** `StorageLevel.MEMORY_AND_DISK_2`

### Checkpoint Interval

How often the KCL application state is checkpointed to DynamoDB.

```scala
import org.apache.spark.streaming.Seconds

val builder = KinesisInputDStream.builder
  .checkpointInterval(Seconds(30))
```

**Default:** Uses the Spark Streaming batch interval

## Metrics Configuration

### Metrics Level

Configure the CloudWatch metrics detail level.

```scala
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val builder = KinesisInputDStream.builder
  .metricsLevel(MetricsLevel.SUMMARY)
```

**Options:**

- `MetricsLevel.DETAILED` - All available metrics (default)
- `MetricsLevel.SUMMARY` - Summary metrics only
- `MetricsLevel.NONE` - No metrics

### Metrics Dimensions

Specify which CloudWatch metrics dimensions should be enabled.

```scala
val builder = KinesisInputDStream.builder
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
```

129

130

## Message Handling

131

132

### Default Handler

133

Creates a stream of byte arrays using the default message handler.

134

135

```scala

136

val stream: KinesisInputDStream[Array[Byte]] = KinesisInputDStream.builder

137

.streamingContext(ssc)

138

.streamName("my-stream")

139

.checkpointAppName("my-app")

140

.build()

141

```

### Custom Handler

Process Kinesis records with a custom message handler function.

```scala
import com.amazonaws.services.kinesis.model.Record

// Custom handler to extract string data from each record's ByteBuffer
val customHandler = (record: Record) => {
  val data = new Array[Byte](record.getData.remaining())
  record.getData.get(data)
  new String(data, "UTF-8")
}

val stream: KinesisInputDStream[String] = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .buildWithMessageHandler(customHandler)
```

162

163

## Java API Usage

164

165

```java

166

import org.apache.spark.streaming.api.java.JavaStreamingContext;

167

import org.apache.spark.streaming.kinesis.KinesisInputDStream;

168

169

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Duration.seconds(2));

170

171

KinesisInputDStream<byte[]> stream = KinesisInputDStream.builder()

172

.streamingContext(jssc)

173

.streamName("my-kinesis-stream")

174

.checkpointAppName("my-app")

175

.regionName("us-west-2")

176

.build();

177

```

## Complete Configuration Example

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val credentials = SparkAWSCredentials.builder
  .basicCredentials("access-key", "secret-key")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("production-events")
  .checkpointAppName("event-processor")
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .initialPosition(new Latest())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
  .kinesisCredentials(credentials)
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
  .buildWithMessageHandler { record =>
    val data = new Array[Byte](record.getData.remaining())
    record.getData.get(data)
    new String(data, "UTF-8")
  }
```
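
Once built, the stream is consumed like any other DStream. A minimal sketch, assuming `ssc` is the StreamingContext used above; the per-batch processing shown is illustrative:

```scala
// Illustrative processing: count records per batch and print a small sample
stream.foreachRDD { rdd =>
  println(s"Batch size: ${rdd.count()}")
  rdd.take(5).foreach(println)
}

// Start the streaming job and block until it is stopped
ssc.start()
ssc.awaitTermination()
```

Note that no data flows until `ssc.start()` is called, and all stream transformations must be set up before that point.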