
# Stream Creation

The primary capability for creating Kinesis input streams in Spark Streaming applications. `KinesisUtils` provides multiple overloaded methods to accommodate different use cases and type requirements.

## Scala API

### Generic Stream Creation with Custom Message Handler

Create a stream that transforms Kinesis Records to a custom type using a message handler function.

```scala { .api }
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
): ReceiverInputDStream[T]
```

**Parameters:**

- `ssc` - StreamingContext object
- `kinesisAppName` - Kinesis application name used by the KCL for DynamoDB coordination
- `streamName` - Kinesis stream name
- `endpointUrl` - Kinesis service URL (e.g., "https://kinesis.us-east-1.amazonaws.com")
- `regionName` - AWS region name for DynamoDB and CloudWatch
- `initialPositionInStream` - Starting position: TRIM_HORIZON or LATEST
- `checkpointInterval` - Checkpoint frequency for fault tolerance
- `storageLevel` - Storage level for received objects (recommended: MEMORY_AND_DISK_2)
- `messageHandler` - Function to transform a Record into type T

**Usage Example:**

```scala
import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.model.Record
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Custom message handler for JSON data (decode explicitly as UTF-8
// rather than relying on the platform default charset)
def jsonMessageHandler(record: Record): JValue = {
  val data = new String(record.getData.array(), StandardCharsets.UTF_8)
  parse(data)
}

val jsonStream = KinesisUtils.createStream[JValue](
  ssc,
  "json-processor-app",
  "json-events",
  "https://kinesis.us-west-2.amazonaws.com",
  "us-west-2",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  jsonMessageHandler
)
```

### Default Byte Array Stream Creation

Create a stream that returns raw byte arrays using the default message handler.

```scala { .api }
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```

**Usage Example:**

```scala
import java.nio.charset.StandardCharsets

val byteStream = KinesisUtils.createStream(
  ssc,
  "data-processor",
  "raw-data-stream",
  "https://kinesis.eu-west-1.amazonaws.com",
  "eu-west-1",
  InitialPositionInStream.TRIM_HORIZON,
  Seconds(60),
  StorageLevel.MEMORY_AND_DISK_2
)

// Convert bytes to strings, decoding explicitly as UTF-8
val stringStream = byteStream.map(new String(_, StandardCharsets.UTF_8))
```

### Deprecated Stream Creation (Legacy)

```scala { .api }
@deprecated("use other forms of createStream", "1.4.0")
def createStream(
    ssc: StreamingContext,
    streamName: String,
    endpointUrl: String,
    checkpointInterval: Duration,
    initialPositionInStream: InitialPositionInStream,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```

109

110

This method uses the SparkConf app name as the Kinesis application name and extracts the region from the endpoint URL.

111

112

## Java API

### Generic Stream Creation with Function Interface

```java { .api }
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
```

130

131

**Usage Example:**

132

133

```java
import java.nio.charset.StandardCharsets;

import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;

// Define message handler (decode explicitly as UTF-8)
Function<Record, String> messageHandler = new Function<Record, String>() {
    @Override
    public String call(Record record) throws Exception {
        return new String(record.getData().array(), StandardCharsets.UTF_8);
    }
};

// Create stream
JavaReceiverInputDStream<String> stringStream = KinesisUtils.createStream(
    jssc,
    "java-kinesis-app",
    "text-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    messageHandler,
    String.class
);
```
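Since `Function<Record, T>` has a single abstract method, the handler can also be written as a Java 8 lambda. The sketch below is dependency-free: `Handler` and `FakeRecord` are stand-in types (assumptions, not the Spark or AWS classes) used only to show the decode step compiling and running on its own:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HandlerSketch {
    // Stand-in for Spark's single-method Function<I, O> interface
    interface Handler<I, O> { O call(I in) throws Exception; }

    // Stand-in for the AWS SDK Record class: just wraps a ByteBuffer payload
    static class FakeRecord {
        private final ByteBuffer data;
        FakeRecord(ByteBuffer data) { this.data = data; }
        ByteBuffer getData() { return data; }
    }

    public static void main(String[] args) throws Exception {
        // The lambda body is what a real Function<Record, String> would contain;
        // decode() respects the buffer's position and limit, unlike array()
        Handler<FakeRecord, String> handler =
                r -> StandardCharsets.UTF_8.decode(r.getData().duplicate()).toString();

        FakeRecord rec = new FakeRecord(
                ByteBuffer.wrap("hello kinesis".getBytes(StandardCharsets.UTF_8)));
        System.out.println(handler.call(rec)); // prints "hello kinesis"
    }
}
```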

### Default Byte Array Stream Creation (Java)

```java { .api }
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel
);
```

**Usage Example:**

```java
import java.nio.charset.StandardCharsets;

JavaReceiverInputDStream<byte[]> byteStream = KinesisUtils.createStream(
    jssc,
    "java-byte-processor",
    "binary-data-stream",
    "https://kinesis.ap-southeast-1.amazonaws.com",
    "ap-southeast-1",
    InitialPositionInStream.TRIM_HORIZON,
    Durations.seconds(45),
    StorageLevel.MEMORY_AND_DISK_2()
);

// Convert to strings, decoding explicitly as UTF-8
JavaDStream<String> stringStream = byteStream.map(
    bytes -> new String(bytes, StandardCharsets.UTF_8)
);
```

## Configuration Options

### Initial Position in Stream

```scala { .api }
// From the AWS KCL
enum InitialPositionInStream {
  LATEST,       // Start from the most recent record
  TRIM_HORIZON  // Start from the oldest record still within the retention period
}
```

- **LATEST**: Start processing from the most recent records in the stream
- **TRIM_HORIZON**: Start from the oldest available records (Kinesis retains data for 24 hours by default; retention can be extended)

### Storage Level Recommendations


```scala { .api }
import org.apache.spark.storage.StorageLevel

// Recommended storage levels
StorageLevel.MEMORY_AND_DISK_2  // Replicated in memory and disk (recommended)
StorageLevel.MEMORY_AND_DISK    // Memory with disk fallback
StorageLevel.MEMORY_ONLY_2      // Memory only, with replication
```

**MEMORY_AND_DISK_2** is recommended for fault tolerance as it provides both memory performance and disk persistence with replication.

### Checkpoint Intervals

Choose checkpoint intervals based on your application requirements:

- **Short intervals (10-30 seconds)**: Lower data loss risk, higher DynamoDB costs
- **Medium intervals (30-120 seconds)**: Balanced approach for most applications
- **Long intervals (2-5 minutes)**: Lower costs, higher potential data loss on failure
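To make the DynamoDB side of this trade-off concrete, here is a back-of-the-envelope sketch. The shard count of 4 and the one-write-per-checkpoint-per-shard model are illustrative assumptions; real KCL write volume also includes lease renewals:

```java
public class CheckpointCost {
    public static void main(String[] args) {
        // Each worker checkpoints roughly once per interval per shard,
        // so DynamoDB write volume scales inversely with the interval.
        int shards = 4; // assumed shard count, for illustration only
        int[] intervalsSeconds = {10, 60, 300};
        for (int interval : intervalsSeconds) {
            int writesPerHour = shards * (3600 / interval);
            System.out.println(interval + "s interval -> " + writesPerHour
                    + " checkpoint writes/hour across " + shards + " shards");
        }
    }
}
```

Under these assumptions, a 10-second interval produces 1440 checkpoint writes per hour versus 48 for a 5-minute interval, which is the cost difference the bullets above describe.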

## Error Handling

Common errors and their solutions:

**IllegalArgumentException**: Invalid region name

```scala
// Ensure the region name is valid before creating the stream
val validRegions = Seq("us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1")
require(validRegions.contains(regionName), s"Unknown region: $regionName")
```

**AWS Authentication Errors**: Ensure proper AWS credentials are configured

- Use DefaultAWSCredentialsProviderChain for automatic credential discovery
- Or provide explicit credentials using the credential management methods

**DynamoDB Access Errors**: Ensure the application has proper permissions for DynamoDB table creation and access for checkpointing.