
# Spark Streaming Kinesis ASL


This package integrates Apache Spark Streaming with Amazon Kinesis Data Streams for real-time processing of streaming data at massive scale. It provides a receiver-based approach using the Amazon Kinesis Client Library (KCL) to create input DStreams with built-in load balancing, fault tolerance, and checkpointing capabilities.


## Package Information


- **Package Name**: spark-streaming-kinesis-asl_2.12
- **Package Type**: Maven
- **Language**: Scala (with Java support)
- **Group ID**: org.apache.spark
- **Installation**: Add to `pom.xml` or `build.sbt`; distributed under Amazon Software License terms

**Maven dependency:**

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
  <version>2.4.8</version>
</dependency>
```


**SBT dependency:**

```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.8"
```


## Core Imports


```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
```


Legacy imports (deprecated):

```scala
import org.apache.spark.streaming.kinesis.KinesisUtils
```


## Basic Usage

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._
import org.apache.spark.streaming.{Seconds, Duration}
import org.apache.spark.storage.StorageLevel

// Create streaming context with a 10-second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Create Kinesis input stream using the Builder pattern (recommended)
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPosition(new Latest())
  .checkpointAppName("my-spark-app")
  .checkpointInterval(Duration(30000))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()

// Process the stream; each record arrives as Array[Byte]
kinesisStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.collect().foreach { record =>
      val data = new String(record, "UTF-8")
      println(s"Received: $data")
    }
  }
}

// Start the streaming context and block until termination
ssc.start()
ssc.awaitTermination()
```


## Architecture


The Spark Kinesis ASL integration consists of several key components:


- **KinesisInputDStream**: Core DStream implementation that creates Kinesis-backed RDDs
- **KinesisReceiver**: Receiver implementation that consumes data through the Amazon KCL
- **Builder Pattern**: Modern configuration API for flexible stream creation
- **AWS Credentials**: Pluggable credential providers supporting various authentication methods
- **Checkpointing**: Automatic state management through DynamoDB for fault tolerance
- **Recovery**: Sequence-number-based recovery mechanism providing at-least-once processing semantics


## Capabilities


### Stream Creation


Primary interface for creating Kinesis input streams with full configuration options using the modern Builder pattern.


```scala { .api }
object KinesisInputDStream {
  def builder: Builder

  class Builder {
    def streamingContext(ssc: StreamingContext): Builder
    def streamName(streamName: String): Builder
    def checkpointAppName(appName: String): Builder
    def build(): KinesisInputDStream[Array[Byte]]
    def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
  }
}
```
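For access to per-record metadata such as the partition key, `buildWithMessageHandler` accepts a function from the KCL `Record` to any serializable type. A minimal sketch, reusing the `ssc` and stream settings from the Basic Usage example above:

```scala
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._

// Extract (partitionKey, payload) pairs instead of raw byte arrays
val keyedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPosition(new Latest())
  .checkpointAppName("my-spark-app")
  .buildWithMessageHandler { record: Record =>
    // Record#getData returns a ByteBuffer positioned at the payload
    val bytes = new Array[Byte](record.getData.remaining())
    record.getData.get(bytes)
    (record.getPartitionKey, new String(bytes, "UTF-8"))
  }
```

Because the handler runs inside the receiver, it should be cheap and side-effect free; heavier processing belongs in the downstream DStream transformations.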


[Stream Creation](./stream-creation.md)


### AWS Credentials Management


Flexible credential providers supporting default AWS credentials, basic key/secret authentication, and STS role assumption.


```scala { .api }
sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

object SparkAWSCredentials {
  def builder: Builder

  class Builder {
    def basicCredentials(accessKeyId: String, secretKey: String): Builder
    def stsCredentials(roleArn: String, sessionName: String): Builder
    def build(): SparkAWSCredentials
  }
}

case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(stsRoleArn: String, stsSessionName: String, stsExternalId: Option[String]) extends SparkAWSCredentials
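The credentials builder composes with the stream builder. A hedged sketch, assuming the stream builder exposes a `kinesisCredentials` setter (not shown in the API excerpt above) and using an illustrative role ARN:

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

// Assume an IAM role via STS; the default provider chain supplies the
// long-lived credentials used to call AssumeRole
val roleCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/my-kinesis-role", "my-spark-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .regionName("us-west-2")
  .checkpointAppName("my-spark-app")
  .kinesisCredentials(roleCredentials)  // assumed setter name
  .build()
```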


[AWS Credentials](./aws-credentials.md)


### Initial Stream Positioning


Configuration for specifying where to start reading from Kinesis streams, supporting latest, trim horizon, and timestamp-based positioning.


```scala { .api }
// Java classes for initial position specification
class Latest extends KinesisInitialPosition
class TrimHorizon extends KinesisInitialPosition
class AtTimestamp extends KinesisInitialPosition {
  AtTimestamp(timestamp: java.util.Date)  // constructor
  def getTimestamp(): java.util.Date
}
```
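To replay records from a point in time, pass an `AtTimestamp` instance to the builder's `initialPosition` setter. A sketch with an illustrative one-hour lookback, reusing the `ssc` from the Basic Usage example:

```scala
import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp

// Start reading from records written after this instant
val oneHourAgo = new Date(System.currentTimeMillis() - 60 * 60 * 1000)

val replayStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .regionName("us-west-2")
  .checkpointAppName("my-spark-app")
  .initialPosition(new AtTimestamp(oneHourAgo))
  .build()
```

Note that the initial position only applies the first time an application runs; once a DynamoDB checkpoint table exists for the `checkpointAppName`, reading resumes from the checkpointed sequence numbers.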


[Stream Positioning](./stream-positioning.md)


### Configuration Management


Advanced configuration options for controlling retry behavior, timeouts, and performance tuning.


```scala { .api }
case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long,
  retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations

  val RETRY_MAX_ATTEMPTS_KEY: String
  val RETRY_WAIT_TIME_KEY: String
  val DEFAULT_MAX_RETRIES: Int
  val DEFAULT_RETRY_WAIT_TIME: String
  val DEFAULT_RETRY_TIMEOUT: Long
}
```
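The `apply(ssc)` overload reads retry settings from the Spark configuration under the keys exposed as `RETRY_MAX_ATTEMPTS_KEY` and `RETRY_WAIT_TIME_KEY`. A sketch assuming the conventional `spark.streaming.kinesis.retry.*` key names (prefer referencing the constants directly where possible):

```scala
import org.apache.spark.SparkConf

// Tune Kinesis read retries through the Spark configuration
// (literal key names are an assumption; see the KEY constants above)
val sparkConf = new SparkConf()
  .setAppName("kinesis-app")
  .set("spark.streaming.kinesis.retry.maxRetries", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")
```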


[Configuration](./configuration.md)


### Testing Utilities


Utilities for testing Kinesis integration including stream creation, data generation, and cleanup operations.


```scala { .api }
class KinesisTestUtils(streamShardCount: Int = 2) {
  def streamName: String
  def createStream(): Unit
  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]]
  def deleteStream(): Unit
}

object KinesisTestUtils {
  val shouldRunTests: Boolean
  val endpointUrl: String
  def isAWSCredentialsPresent: Boolean
}
```
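A typical test round-trip with these utilities creates a stream, pushes data, and tears the stream down. A sketch, guarded by `shouldRunTests` since the utilities call live AWS services and thus need real credentials:

```scala
val testUtils = new KinesisTestUtils(streamShardCount = 2)
if (KinesisTestUtils.shouldRunTests) {
  try {
    testUtils.createStream()
    // Push integers and inspect which shard received each value
    val shardToData = testUtils.pushData(1 to 10, aggregate = false)
    shardToData.foreach { case (shardId, records) =>
      println(s"$shardId received ${records.size} records")
    }
  } finally {
    testUtils.deleteStream()  // always clean up the test stream
  }
}
```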


[Testing Utilities](./testing.md)


## Error Handling


The library can throw the following exceptions:

- `IllegalArgumentException` - For invalid configuration parameters (region names, credentials, etc.)
- `UnsupportedOperationException` - For unsupported initial position types in legacy APIs
- AWS SDK exceptions - For authentication failures, network issues, and service errors
- KCL exceptions - For Kinesis-specific operational errors during stream processing
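Configuration errors tend to surface at build time, while AWS SDK and KCL errors surface once the receiver starts. A minimal sketch of guarding the build step, assuming invalid builder parameters fail fast with `IllegalArgumentException`:

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._

// Invalid parameters (e.g. an unrecognized region) fail at build()
try {
  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName("my-kinesis-stream")
    .regionName("not-a-region")  // illustrative bad value
    .initialPosition(new Latest())
    .checkpointAppName("my-spark-app")
    .build()
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid Kinesis configuration: ${e.getMessage}")
}
```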


### Legacy KinesisUtils (Deprecated)


Legacy factory methods for creating Kinesis streams (deprecated since Spark 2.2.0, still functional).


```scala { .api }
@deprecated("Use KinesisInputDStream.builder instead", "2.2.0")
object KinesisUtils {
  def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
  ): ReceiverInputDStream[T]

  def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
  ): ReceiverInputDStream[Array[Byte]]
}
```


## Migration Notes


The legacy `KinesisUtils` factory methods are deprecated since Spark 2.2.0. Use `KinesisInputDStream.builder` for new development:


```scala
// Deprecated (still functional)
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, ...)

// Recommended
KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName(streamName)
  .checkpointAppName(appName)
  .endpointUrl(endpointUrl)
  .regionName(regionName)
  .build()
```