# Stream Creation

Primary factory methods for creating Kinesis input streams with various configuration options, including custom message handlers, explicit AWS credentials, and different storage levels.

## Capabilities

### Basic Stream Creation (Scala)

Creates a Kinesis input stream using default AWS credential discovery and a byte-array message handler.

```scala { .api }
/**
 * Create an input stream that pulls messages from a Kinesis stream using the KCL.
 * Uses DefaultAWSCredentialsProviderChain for AWS authentication.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream (TRIM_HORIZON or LATEST)
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects (MEMORY_AND_DISK_2 recommended)
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds

val stream = KinesisUtils.createStream(
  ssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2
)
```

### Stream Creation with Explicit Credentials (Scala)

Creates a Kinesis input stream with explicitly provided AWS credentials.

```scala { .api }
/**
 * Create an input stream with explicit AWS credentials.
 * Note: Credentials will be saved in DStream checkpoints if checkpointing is enabled.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param awsAccessKeyId AWS AccessKeyId (if null, uses DefaultAWSCredentialsProviderChain)
 * @param awsSecretKey AWS SecretKey (if null, uses DefaultAWSCredentialsProviderChain)
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[Array[Byte]]
```

**Usage Example:**

```scala
val stream = KinesisUtils.createStream(
  ssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```

### Stream Creation with Custom Message Handler (Scala)

Creates a typed Kinesis input stream with a custom message handler function.

```scala { .api }
/**
 * Create an input stream with a custom message handler for type-safe data processing.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @return ReceiverInputDStream[T] containing processed data
 */
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
): ReceiverInputDStream[T]
```

**Usage Example:**

```scala
import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.model.Record
import spray.json._
import DefaultJsonProtocol._

case class MyEvent(id: String, timestamp: Long, data: String)

// JSON format required by spray-json's convertTo
implicit val myEventFormat: RootJsonFormat[MyEvent] = jsonFormat3(MyEvent)

// Custom message handler to parse JSON events
def parseMyEvent(record: Record): MyEvent = {
  val json = new String(record.getData.array(), StandardCharsets.UTF_8)
  json.parseJson.convertTo[MyEvent]
}

val stream = KinesisUtils.createStream[MyEvent](
  ssc,
  "MySparkKinesisApp",
  "my-events-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  parseMyEvent
)
```

### Stream Creation with Custom Handler and Credentials (Scala)

Creates a typed Kinesis input stream with both a custom message handler and explicit AWS credentials.

```scala { .api }
/**
 * Create an input stream with a custom message handler and explicit AWS credentials.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @param awsAccessKeyId AWS AccessKeyId
 * @param awsSecretKey AWS SecretKey
 * @return ReceiverInputDStream[T] containing processed data
 */
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[T]
```
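
**Usage Example:**

A minimal sketch for this overload, reusing the `parseMyEvent` handler and `MyEvent` type from the previous example together with the documentation's placeholder credentials (never hardcode real keys; this merely illustrates the parameter order):

```scala
import org.apache.spark.streaming.Milliseconds

// Combines a typed handler with explicit credentials in one call.
val stream = KinesisUtils.createStream[MyEvent](
  ssc,
  "MySparkKinesisApp",
  "my-events-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  parseMyEvent,
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```

Note that, as with the two-argument credential overload above, explicit credentials end up in DStream checkpoints when checkpointing is enabled.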

### Deprecated Stream Creation (Scala)

Simplified stream creation method (deprecated since version 1.4.0).

```scala { .api }
/**
 * Create an input stream using the app name from SparkConf and the region inferred from the endpoint URL.
 * @deprecated use other forms of createStream
 *
 * @param ssc StreamingContext object
 * @param streamName Kinesis stream name
 * @param endpointUrl Endpoint URL of the Kinesis service
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param initialPositionInStream Starting position in stream
 * @param storageLevel Storage level for received objects
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
@deprecated("use other forms of createStream", "1.4.0")
def createStream(
    ssc: StreamingContext,
    streamName: String,
    endpointUrl: String,
    checkpointInterval: Duration,
    initialPositionInStream: InitialPositionInStream,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```

## Configuration Guidelines

### Application Name (kinesisAppName)

- Must be unique per Kinesis stream
- Used by KCL for DynamoDB coordination table naming
- Changing this requires deleting the associated DynamoDB table

### Checkpoint Interval

- Should typically match or be a multiple of the batch interval
- Shorter intervals provide better fault tolerance but increase DynamoDB usage
- Must be >= 1 second

### Storage Level

- `StorageLevel.MEMORY_AND_DISK_2` is recommended for fault tolerance
- Higher replication levels provide better fault tolerance
- Consider memory constraints when choosing storage levels

### Initial Position

- `InitialPositionInStream.LATEST`: Start from the most recent records
- `InitialPositionInStream.TRIM_HORIZON`: Start from the oldest available records (up to 24 hours)
- Only applies when no checkpoint exists in DynamoDB
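
Taken together, the guidelines above usually reduce to a few lines at setup time. A sketch, assuming a hypothetical `sparkConf` already exists:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical setup: a 2-second batch interval, with the Kinesis
// checkpoint interval aligned to it (one checkpoint per batch).
val batchInterval = Seconds(2)
val ssc = new StreamingContext(sparkConf, batchInterval)
val checkpointInterval = batchInterval

// Replicated memory-and-disk storage for receiver fault tolerance.
val storageLevel = StorageLevel.MEMORY_AND_DISK_2
```

A longer checkpoint interval (e.g. `batchInterval * 2`) trades some recovery precision for fewer DynamoDB writes.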

## Error Handling

Common error scenarios and handling:

- **Invalid region names**: Throws `IllegalArgumentException`
- **Authentication failures**: Runtime exceptions during stream processing
- **Network connectivity**: Automatic retries via KCL with exponential backoff
- **DynamoDB access**: Requires proper IAM permissions for lease coordination
- **CloudWatch access**: Optional but recommended for monitoring metrics
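
Of these, only configuration errors such as a bad region name surface at creation time; the rest appear later, during stream processing. A defensive construction sketch (the stream and application names are placeholders):

```scala
import scala.util.{Failure, Success, Try}

// Invalid region names throw IllegalArgumentException when the stream is
// created; credential and connectivity problems generally appear at runtime.
Try(KinesisUtils.createStream(
  ssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2
)) match {
  case Success(stream) => stream.print()  // wire up downstream processing here
  case Failure(e: IllegalArgumentException) =>
    sys.error(s"Bad stream configuration (e.g., region name): ${e.getMessage}")
  case Failure(e) => throw e
}
```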