
# Spark Streaming Kinesis ASL

Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library (KCL). This library enables Spark Streaming applications to consume data from Amazon Kinesis streams with automatic load balancing, fault tolerance, and checkpointing.

## Package Information

- **Package Name**: spark-streaming-kinesis-asl_2.10
- **Package Type**: Maven
- **Language**: Scala/Java
- **Installation**: add the dependency to your Maven `pom.xml` or SBT `build.sbt`

### Maven Dependency

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```

### SBT Dependency

```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.6.3"
```

## Core Imports

### Scala

```scala
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}
```

### Java

```java
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
```

## Basic Usage

### Scala Example

```scala
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}

// Create the Spark Streaming context with a 2-second batch interval
val ssc = new StreamingContext(sparkContext, new Duration(2000))

// Create the Kinesis input stream
val kinesisStream = KinesisUtils.createStream(
  ssc,
  "MyKinesisApp",                            // KCL application name
  "MyKinesisStream",                         // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com", // Endpoint URL
  "us-east-1",                               // Region name
  InitialPositionInStream.LATEST,            // Starting position
  new Duration(2000),                        // Checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2             // Storage level
)

// Process the stream: print each record's payload as a string
kinesisStream.foreachRDD { rdd =>
  rdd.foreach { byteArray =>
    println(new String(byteArray))
  }
}

ssc.start()
ssc.awaitTermination()
```

### Java Example

```java
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

// Create the Java streaming context with a 2-second batch interval
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

// Create the Kinesis input stream
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
    jssc,
    "MyKinesisApp",                            // KCL application name
    "MyKinesisStream",                         // Kinesis stream name
    "https://kinesis.us-east-1.amazonaws.com", // Endpoint URL
    "us-east-1",                               // Region name
    InitialPositionInStream.LATEST,            // Starting position
    new Duration(2000),                        // Checkpoint interval
    StorageLevel.MEMORY_AND_DISK_2()           // Storage level
);

// Process the stream: print each record's payload as a string
kinesisStream.foreachRDD(rdd -> {
    rdd.foreach(byteArray -> {
        System.out.println(new String(byteArray));
    });
});

jssc.start();
jssc.awaitTermination();
```

## Architecture

The Spark Streaming Kinesis ASL integration is built around several key components:

- **KinesisUtils**: Factory object providing static methods to create Kinesis input streams
- **KinesisInputDStream**: Kinesis-specific implementation of ReceiverInputDStream with enhanced fault tolerance
- **KinesisReceiver**: Custom Spark Streaming Receiver that runs a Kinesis Client Library (KCL) Worker
- **KinesisBackedBlockRDD**: Specialized RDD that can re-read data from Kinesis using sequence number ranges
- **Checkpointing System**: Automatic checkpointing of processed sequence numbers to DynamoDB for fault tolerance and recovery
- **Multi-shard Support**: Automatic distribution and load balancing across multiple Kinesis shards
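
The checkpoint interval passed to `createStream` controls how often the receiver commits its progress to DynamoDB. The sketch below models that time-based gating under stated assumptions: the `CheckpointTimer` class is hypothetical, not part of the library, and stands in for the bookkeeping the receiver performs around the KCL checkpointer.

```java
// Hypothetical model of interval-based checkpoint gating: commit progress
// only once checkpointInterval milliseconds have elapsed since the last commit.
public class CheckpointTimer {
    private final long intervalMs;
    private long nextDeadlineMs;

    public CheckpointTimer(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.nextDeadlineMs = nowMs + intervalMs;
    }

    // Returns true when a checkpoint is due, and schedules the next deadline.
    public boolean shouldCheckpoint(long nowMs) {
        if (nowMs < nextDeadlineMs) {
            return false;
        }
        nextDeadlineMs = nowMs + intervalMs;
        return true;
    }

    public static void main(String[] args) {
        CheckpointTimer timer = new CheckpointTimer(2000, 0);
        System.out.println(timer.shouldCheckpoint(1000)); // false: interval not yet elapsed
        System.out.println(timer.shouldCheckpoint(2000)); // true: 2000 ms have passed
        System.out.println(timer.shouldCheckpoint(2500)); // false: next deadline is 4000 ms
    }
}
```

A shorter interval means less reprocessing after a failure but more DynamoDB writes; the examples above use the batch interval (2000 ms) for both.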

## Capabilities

### Stream Creation

Primary factory methods for creating Kinesis input streams, with configuration options including custom message handlers, explicit AWS credentials, and different storage levels.

```scala { .api }
// Basic stream creation with the default byte-array message handler
def createStream(
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]

// Stream creation with a custom message handler
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T
): ReceiverInputDStream[T]
```

[Stream Creation](./stream-creation.md)

### Data Processing and Message Handling

Advanced message processing, including custom message handlers, type-safe transformations, and access to Kinesis Record metadata such as partition keys and sequence numbers.

```scala { .api }
// Custom message handler function type
type MessageHandler[T] = com.amazonaws.services.kinesis.model.Record => T

// Default message handler for byte arrays
def defaultMessageHandler(record: Record): Array[Byte]
```

[Data Processing](./data-processing.md)
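
The default handler copies the record's payload out of the `ByteBuffer` returned by `Record.getData()` into a fresh byte array. A dependency-free sketch of that copy step follows; the `payloadToBytes` helper is illustrative, not a library method.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadCopy {
    // Copy the remaining bytes of a record payload into a fresh array,
    // mirroring what the default message handler does with Record.getData().
    static byte[] payloadToBytes(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("hello kinesis".getBytes(StandardCharsets.UTF_8));
        byte[] copied = payloadToBytes(payload);
        System.out.println(new String(copied, StandardCharsets.UTF_8)); // prints "hello kinesis"
    }
}
```

Copying matters because the buffer backing a record may be reused once the handler returns; holding a reference to the raw `ByteBuffer` would not be safe.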

### Java API Integration

Complete Java API support with type-safe bindings, function interfaces, and seamless integration with Java streaming contexts and data processing pipelines.

```java { .api }
// Java API stream creation
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel
);

// Java API with a custom message handler
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
```

[Java API](./java-api.md)

### AWS Authentication and Configuration

Flexible AWS authentication options, including the default credential provider chain, explicitly specified credentials, and IAM role integration for secure access to Kinesis streams.

```scala { .api }
// Stream creation with explicit AWS credentials
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T,
  awsAccessKeyId: String,
  awsSecretKey: String
): ReceiverInputDStream[T]
```

[AWS Configuration](./aws-configuration.md)
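
The overloads above imply a simple resolution order: explicit keys are used when supplied, otherwise the receiver falls back to the AWS default credential provider chain (environment variables, system properties, instance-profile roles). The sketch below illustrates that fallback; `resolveCredentialSource` and its return labels are illustrative only, not library API.

```java
public class CredentialResolution {
    // Illustrative fallback: explicit keys win when both are present;
    // otherwise defer to the default provider chain the KCL worker consults.
    static String resolveCredentialSource(String accessKeyId, String secretKey) {
        if (accessKeyId != null && secretKey != null) {
            return "explicit-keys";
        }
        return "default-provider-chain";
    }

    public static void main(String[] args) {
        System.out.println(resolveCredentialSource("myAccessKeyId", "mySecretKey")); // explicit-keys
        System.out.println(resolveCredentialSource(null, null));                     // default-provider-chain
    }
}
```

Prefer the provider chain (IAM roles in particular) over hard-coded keys; explicit credentials must also be serialized to executors, which is why the library wraps them in a serializable holder (see Types below).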

## Types

```scala { .api }
// AWS credentials wrapper that is safe to serialize to executors
case class SerializableAWSCredentials(
  accessKeyId: String,
  secretKey: String
) extends AWSCredentials

// Sequence number range for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String
)

// Collection of sequence number ranges
case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
}

// External AWS KCL types (from com.amazonaws.services.kinesis.clientlibrary.lib.worker)
// InitialPositionInStream enum values:
// - InitialPositionInStream.LATEST: start from the most recent records
// - InitialPositionInStream.TRIM_HORIZON: start from the oldest available records (default stream retention is 24 hours)
```