
# Apache Spark Kinesis Integration

Apache Spark Kinesis Integration enables real-time stream processing of data from Amazon Kinesis Data Streams using Spark Streaming. It provides a receiver-based approach, built on the Amazon Kinesis Client Library (KCL), that consumes data from Kinesis streams and exposes it as Spark DStreams for distributed processing.

## Package Information

- **Package Name**: spark-streaming-kinesis-asl_2.13
- **Package Type**: Maven
- **Group ID**: org.apache.spark
- **Language**: Scala
- **Installation**: Add the Maven dependency:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

For sbt:

```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "4.0.0"
```

## Core Imports

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.{SparkAWSCredentials, DefaultCredentials, BasicCredentials, STSCredentials}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.model.Record
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Create a Kinesis DStream using the builder pattern
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

// Process the stream; each element is the raw record payload as Array[Byte]
kinesisStream.foreachRDD { rdd =>
  rdd.foreach { bytes =>
    val data = new String(bytes, StandardCharsets.UTF_8)
    println(s"Received: $data")
  }
}

ssc.start()
ssc.awaitTermination()
```

## Architecture

The Spark Kinesis integration is built around several key components:

- **KinesisInputDStream**: The main DStream implementation that creates Kinesis receivers
- **KinesisReceiver**: Manages KCL workers that consume data from Kinesis shards
- **Builder Pattern**: Fluent API for configuring Kinesis streams with required and optional parameters
- **Credentials Management**: Flexible authentication supporting the AWS default provider chain, basic credentials, and STS assume-role
- **Fault Tolerance**: Uses KinesisBackedBlockRDD to re-read data from Kinesis when locally stored blocks are lost
- **Checkpointing**: Automatic state management through DynamoDB for tracking consumption progress

## Capabilities

### Stream Creation

Core functionality for creating Kinesis DStreams, with comprehensive configuration options covering authentication, checkpointing, and performance tuning.

```scala { .api }
object KinesisInputDStream {
  def builder: KinesisInputDStream.Builder
}

class Builder {
  def streamingContext(ssc: StreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
```

[Stream Creation](./stream-creation.md)
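The `buildWithMessageHandler` variant replaces the default `Array[Byte]` payload with the result of a per-record handler. A minimal sketch, assuming record payloads are UTF-8 text; the stream name, app name, and region are placeholder values, and an existing `StreamingContext` named `ssc` is assumed:

```scala
import java.nio.charset.StandardCharsets
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import com.amazonaws.services.kinesis.model.Record

// Decode each record into (partitionKey, payload). The handler runs inside
// the receiver, so it must be serializable.
def decode(record: Record): (String, String) = {
  val bytes = new Array[Byte](record.getData.remaining())
  record.getData.get(bytes)
  (record.getPartitionKey, new String(bytes, StandardCharsets.UTF_8))
}

val typedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .buildWithMessageHandler(decode) // KinesisInputDStream[(String, String)]
```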

### AWS Credentials Management

Authentication supporting multiple credential providers: the default AWS credential chain, basic access keys, and STS assume-role for cross-account access.

```scala { .api }
sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

object SparkAWSCredentials {
  def builder: SparkAWSCredentials.Builder
}

case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(stsRoleArn: String, stsSessionName: String, stsExternalId: Option[String], longLivedCreds: SparkAWSCredentials) extends SparkAWSCredentials
```

[AWS Credentials](./aws-credentials.md)
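The case classes above compose directly. A hedged sketch of cross-account access via STS, using placeholder role and key values; in upstream Spark the result is passed to the stream builder through a `kinesisCredentials` setter, though that method name may differ by version:

```scala
import org.apache.spark.streaming.kinesis.{BasicCredentials, STSCredentials}

// Long-lived credentials used to assume the role; DefaultCredentials would
// use the standard AWS provider chain instead of explicit keys.
val baseCreds = BasicCredentials(
  awsAccessKeyId = sys.env("AWS_ACCESS_KEY_ID"),
  awsSecretKey = sys.env("AWS_SECRET_KEY"))

// Assume a role in another account; stsExternalId is optional.
val stsCreds = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/kinesis-reader", // placeholder ARN
  stsSessionName = "spark-kinesis-session",
  stsExternalId = None,
  longLivedCreds = baseCreds)
```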

### Initial Position Configuration

Configuration for specifying where to start reading a Kinesis stream: the latest records, the oldest retained records, or a specific timestamp.

```scala { .api }
trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

object KinesisInitialPositions {
  class Latest extends KinesisInitialPosition
  class TrimHorizon extends KinesisInitialPosition
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition
}
```

[Initial Position](./initial-position.md)
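For illustration, the three positions can be constructed as follows (a sketch; `AtTimestamp` takes a `java.util.Date`):

```scala
import java.text.SimpleDateFormat
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

// Start from the newest records.
val latest = new KinesisInitialPositions.Latest()

// Start from the oldest records still retained by the stream.
val oldest = new KinesisInitialPositions.TrimHorizon()

// Start from a specific point in time.
val ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2024-01-01 00:00:00")
val fromTime = new KinesisInitialPositions.AtTimestamp(ts)
```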

### Configuration and Performance Tuning

Advanced configuration options for retry behavior, timeouts, storage levels, metrics, and checkpoint intervals, used to tune performance and reliability.

```scala { .api }
case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long,
  retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations
}
```

[Configuration](./configuration.md)
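`apply(ssc)` reads its settings from the Spark configuration, so retry behavior is tuned via `SparkConf` rather than the stream builder. A sketch assuming the `spark.streaming.kinesis.retry.*` keys used by the upstream module; treat the exact key names as assumptions for your version, and note that this type may be package-private in some Spark releases, in which case the `SparkConf` settings still apply on their own:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

val conf = new SparkConf()
  .setAppName("KinesisTuned")
  // Assumed keys: max re-read attempts and wait time between attempts.
  .set("spark.streaming.kinesis.retry.maxRetries", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")

val ssc = new StreamingContext(conf, Seconds(10))
// Picks up the settings above; retryTimeoutMs defaults to the batch duration.
val readConfigs = KinesisReadConfigurations(ssc)
```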

### Python Integration

Internal utilities for PySpark integration, providing a Python-friendly bridge to the Scala/Java APIs.

```scala { .api }
private class KinesisUtilsPythonHelper {
  def createStream(
    jssc: JavaStreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: Int,
    checkpointInterval: Duration,
    metricsLevel: Int,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String,
    stsAssumeRoleArn: String,
    stsSessionName: String,
    stsExternalId: String
  ): JavaReceiverInputDStream[Array[Byte]]
}
```

[Python Integration](./python-integration.md)

## Types

```scala { .api }
// Core DStream type
class KinesisInputDStream[T: ClassTag] extends ReceiverInputDStream[T]

// Sequence number tracking for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String,
  recordCount: Int
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty: Boolean
  def nonEmpty: Boolean
}

// Fault-tolerant RDD implementation
class KinesisBackedBlockRDD[T: ClassTag](
  sc: SparkContext,
  regionName: String,
  endpointUrl: String,
  blockIds: Array[BlockId],
  seqNumRanges: Array[SequenceNumberRanges],
  isBlockIdValid: Array[Boolean],
  messageHandler: Record => T,
  kinesisCreds: SparkAWSCredentials,
  kinesisReadConfigs: KinesisReadConfigurations
) extends BlockRDD[T]
```