
# Stream Creation

Core functionality for creating Kinesis input streams using the modern Builder pattern with comprehensive configuration options.

## Capabilities

### KinesisInputDStream Builder

Creates a fluent builder for configuring and instantiating Kinesis input streams.

```scala { .api }
object KinesisInputDStream {
  /**
   * Creates a new Builder instance for constructing KinesisInputDStream instances
   * @return Builder instance for configuration
   */
  def builder: Builder
}
```

### Builder Configuration

Fluent builder class providing comprehensive configuration options for Kinesis streams.

```scala { .api }
class Builder {
  /** Sets the StreamingContext (required) */
  def streamingContext(ssc: StreamingContext): Builder

  /** Sets the Java StreamingContext (required) */
  def streamingContext(jssc: JavaStreamingContext): Builder

  /** Sets the Kinesis stream name (required) */
  def streamName(streamName: String): Builder

  /** Sets the KCL application name for checkpointing (required) */
  def checkpointAppName(appName: String): Builder

  /** Sets the AWS Kinesis endpoint URL (optional, defaults to the us-east-1 endpoint) */
  def endpointUrl(url: String): Builder

  /** Sets the AWS region name (optional, defaults to us-east-1) */
  def regionName(regionName: String): Builder

  /** Sets the initial stream position (optional, defaults to Latest) */
  def initialPosition(initialPosition: KinesisInitialPosition): Builder

  /** Sets the initial stream position using the legacy enum (deprecated since 2.3.0) */
  @deprecated("use initialPosition(initialPosition: KinesisInitialPosition)", "2.3.0")
  def initialPositionInStream(initialPosition: InitialPositionInStream): Builder

  /** Sets the checkpoint interval (optional, defaults to the batch duration) */
  def checkpointInterval(interval: Duration): Builder

  /** Sets the storage level for received data (optional, defaults to MEMORY_AND_DISK_2) */
  def storageLevel(storageLevel: StorageLevel): Builder

  /** Sets AWS credentials for Kinesis access (optional, defaults to DefaultCredentials) */
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder

  /** Sets AWS credentials for DynamoDB access (optional, uses Kinesis credentials if not set) */
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder

  /** Sets AWS credentials for CloudWatch access (optional, uses Kinesis credentials if not set) */
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
}
```

### Stream Building

Methods to create the final KinesisInputDStream instances with configured parameters.

```scala { .api }
class Builder {
  /**
   * Builds a KinesisInputDStream with the default message handler, which returns Array[Byte]
   * @return Configured KinesisInputDStream instance
   */
  def build(): KinesisInputDStream[Array[Byte]]

  /**
   * Builds a KinesisInputDStream with a custom message handler
   * @param handler Function transforming a Kinesis Record into the desired type
   * @return Configured KinesisInputDStream instance with the custom element type
   */
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
```

**Usage Examples:**

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._
import com.amazonaws.services.kinesis.model.Record

// Basic stream creation with the default byte-array handler
// (assumes an existing StreamingContext named ssc)
val basicStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .build()

// Advanced stream creation with custom configuration
val advancedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPosition(new TrimHorizon())
  .checkpointAppName("my-spark-app")
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
  .build()

// Stream with a custom message handler for JSON processing
case class MyEvent(id: String, timestamp: Long, data: String)

val jsonStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("json-events-stream")
  .checkpointAppName("json-processor")
  .buildWithMessageHandler { record: Record =>
    val json = new String(record.getData().array(), "UTF-8")
    // Parse JSON to the case class (assuming a JSON parsing library)
    parseJson[MyEvent](json)
  }
```

### Default Values

The builder uses these default values for optional parameters:

```scala { .api }
object KinesisInputDStream {
  private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String = "https://kinesis.us-east-1.amazonaws.com"
  private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
  private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest()
  private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
}
```

### Message Handler Function

Custom message handlers transform Kinesis Record objects into desired types:

```scala { .api }
/**
 * Default message handler that extracts the byte array from a Kinesis Record
 * @param record Kinesis Record containing data and metadata
 * @return Byte array of record data
 */
def defaultMessageHandler(record: Record): Array[Byte]
```

**Custom Handler Examples:**

```scala
import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets

// String handler
val stringHandler: Record => String = { record =>
  new String(record.getData().array(), StandardCharsets.UTF_8)
}

// Handler with metadata extraction
case class KinesisEvent(data: String, partitionKey: String, sequenceNumber: String)

val eventHandler: Record => KinesisEvent = { record =>
  KinesisEvent(
    data = new String(record.getData().array(), StandardCharsets.UTF_8),
    partitionKey = record.getPartitionKey(),
    sequenceNumber = record.getSequenceNumber()
  )
}

// Error-handling wrapper
val safeHandler: Record => Option[String] = { record =>
  try {
    Some(new String(record.getData().array(), StandardCharsets.UTF_8))
  } catch {
    case _: Exception => None
  }
}
```

### Required Parameters

These parameters must be provided, or `build()` throws `IllegalArgumentException`:

- `streamingContext` - Either a Scala or Java StreamingContext
- `streamName` - Name of the Kinesis stream to consume
- `checkpointAppName` - Application name for KCL checkpointing
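The fail-fast behavior can be sketched as follows. This is an illustrative snippet, not part of the library: the local `StreamingContext` and the `"required-params-demo"` / `"demo-app"` names are made up for the example, and `build()` is expected to fail because `streamName` is never set.

```scala
import scala.util.Try
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// Illustrative local context; any existing StreamingContext behaves the same.
val conf = new SparkConf().setMaster("local[2]").setAppName("required-params-demo")
val ssc = new StreamingContext(conf, Seconds(1))

// streamName is never set, so build() should fail immediately
// with IllegalArgumentException rather than at stream start.
val result = Try {
  KinesisInputDStream.builder
    .streamingContext(ssc)
    .checkpointAppName("demo-app")
    .build()
}

println(result.isFailure)
```

Failing at `build()` time is preferable to failing after the streaming job has started, since the misconfiguration surfaces before any receivers are scheduled.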

### Parameter Validation

The builder validates parameters during construction:

- Region names are validated against the AWS region list
- Endpoint URLs must be valid HTTP/HTTPS URLs
- Checkpoint intervals must be positive durations
- Storage levels must be valid Spark storage level constants
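Because these checks surface as exceptions at construction time, callers can wrap `build()` defensively. A minimal sketch, assuming validation failures are reported as `IllegalArgumentException`; the `tryBuildStream` helper and the `"validation-demo"` names are hypothetical, introduced only for this example:

```scala
import scala.util.{Failure, Success, Try}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

val conf = new SparkConf().setMaster("local[2]").setAppName("validation-demo")
val ssc = new StreamingContext(conf, Seconds(1))

// Hypothetical helper: attempts to build a stream, returning None
// on configuration errors instead of letting them propagate.
def tryBuildStream(streamName: String, region: String): Option[KinesisInputDStream[Array[Byte]]] =
  Try {
    KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName(streamName)
      .regionName(region)
      .checkpointAppName("validation-demo")
      .build()
  } match {
    case Success(s) => Some(s)
    case Failure(e: IllegalArgumentException) =>
      println(s"Invalid configuration: ${e.getMessage}")
      None
    case Failure(e) => throw e
  }

// A well-formed configuration builds successfully
// (the stream is only constructed here, not started).
val stream = tryBuildStream("demo-stream", "us-east-1")
```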