# Apache Spark Streaming Kinesis Connector

Apache Spark Streaming integration with Amazon Kinesis for real-time processing of streaming data at massive scale. This connector provides a Kinesis receiver that creates input DStreams using the Amazon Kinesis Client Library (KCL), enabling load-balancing, fault-tolerance, and checkpointing capabilities through Workers, Checkpoints, and Shard Leases.

## Package Information

- **Package Name**: spark-streaming-kinesis-asl_2.12
- **Package Type**: maven
- **Language**: Scala
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
```

## Core Imports

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import com.amazonaws.services.kinesis.model.Record
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import scala.reflect.ClassTag
```

For Java:

```java
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
```

For Python:

```python
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

val sparkConf = new SparkConf().setAppName("KinesisWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create Kinesis DStream using the builder pattern
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("myKinesisStream")
  .checkpointAppName("myKinesisApp")
  .regionName("us-east-1")
  .initialPosition(new Latest())
  .build()

// Count words in each batch and print the result
kinesisStream.map(new String(_))
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .print()

ssc.start()
ssc.awaitTermination()
```

## Architecture

The Spark Kinesis connector uses the following key components:

- **KinesisInputDStream**: The main DStream implementation that creates Kinesis receivers
- **KinesisReceiver**: Receives data from Kinesis shards using the KCL
- **KinesisBackedBlockRDD**: Enables fault-tolerant recovery by re-reading data from Kinesis
- **SparkAWSCredentials**: Provides flexible AWS credential management
- **Checkpointing**: Uses DynamoDB for KCL state and Spark checkpointing for stream processing state
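
As a hedged sketch of how these components surface in configuration (stream and application names are hypothetical): the checkpoint interval controls how often the KCL writes shard progress to its DynamoDB lease table, and the metrics level controls CloudWatch reporting.

```scala
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

val ssc = new StreamingContext(new SparkConf().setAppName("KinesisTuned"), Seconds(5))

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("myKinesisStream")
  .checkpointAppName("myKinesisApp")  // the KCL stores leases/checkpoints in a DynamoDB table of this name
  .regionName("us-east-1")
  .initialPosition(new Latest())
  .checkpointInterval(Seconds(10))    // how often the KCL checkpoints to DynamoDB (10 s = 10000 ms)
  .metricsLevel(MetricsLevel.SUMMARY) // reduce per-shard CloudWatch metric volume
  .build()
```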

## Capabilities

### Stream Creation

Create Kinesis input streams with comprehensive configuration options including credentials, initial positions, and metrics.

```scala { .api }
object KinesisInputDStream {
  def builder: Builder
  def defaultMessageHandler: Record => Array[Byte]
}

class Builder {
  def streamingContext(ssc: StreamingContext): Builder
  def streamingContext(jssc: JavaStreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
  def initialPosition(initialPosition: KinesisInitialPosition): Builder
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
```

[Stream Creation and Configuration](./stream-creation.md)
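
As a usage sketch of `buildWithMessageHandler`, the handler below decodes each record's payload into a UTF-8 string, so the resulting DStream is typed `DStream[String]` instead of `DStream[Array[Byte]]`; the stream name, application name, and region are hypothetical:

```scala
import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon

val ssc = new StreamingContext(new SparkConf().setAppName("KinesisStrings"), Seconds(10))

// Decode each record's payload (a ByteBuffer) as UTF-8 instead of keeping raw bytes
def recordToString(record: Record): String =
  StandardCharsets.UTF_8.decode(record.getData).toString

val stringStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("events")                  // hypothetical stream name
  .checkpointAppName("events-consumer")  // also names the KCL's DynamoDB table
  .regionName("us-west-2")
  .initialPosition(new TrimHorizon())    // start from the oldest retained records
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .buildWithMessageHandler(recordToString)
```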

### Initial Position Management

Configure where to start reading from Kinesis streams with support for latest, earliest, and timestamp-based positioning.

```scala { .api }
object KinesisInitialPositions {
  class Latest() extends KinesisInitialPosition
  class TrimHorizon() extends KinesisInitialPosition
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition
}
```

[Initial Position Configuration](./initial-positions.md)
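
For illustration, one way to choose among the three positions; this is a sketch, and the timestamp here is arbitrary:

```scala
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.streaming.kinesis.KinesisInitialPositions.{AtTimestamp, Latest, TrimHorizon}

// Resume from a fixed point in time, e.g. to reprocess everything since a deploy
val since: Date = new SimpleDateFormat("yyyy-MM-dd HH:mm").parse("2024-01-15 02:00")

val position = new AtTimestamp(since) // records added at or after `since`
// val position = new Latest()        // only records arriving after startup
// val position = new TrimHorizon()   // oldest records still retained by Kinesis

// Passed to the stream builder via .initialPosition(position)
```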

### AWS Credentials Management

Flexible credential management supporting default provider chains, basic credentials, and STS assume-role patterns.

```scala { .api }
object SparkAWSCredentials {
  def builder: Builder
}

class Builder {
  def basicCredentials(accessKeyId: String, secretKey: String): Builder
  def stsCredentials(roleArn: String, sessionName: String): Builder
  def build(): SparkAWSCredentials
}
```

[AWS Credentials Configuration](./aws-credentials.md)
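
A sketch of wiring STS credentials into the stream builder; the role ARN and session name below are hypothetical:

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Assume a cross-account role for reading Kinesis; the long-lived credentials
// used to call STS come from the default provider chain
val kinesisCreds = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/kinesis-reader", "spark-kinesis") // hypothetical ARN
  .build()

// DynamoDB (KCL checkpoints) and CloudWatch (metrics) can use separate credentials:
//   KinesisInputDStream.builder
//     .kinesisCredentials(kinesisCreds)
//     .dynamoDBCredentials(SparkAWSCredentials.builder.build()) // default chain
//     ...
```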

### Python API

Python interface for creating Kinesis streams with simplified parameter handling.

```python { .api }
class KinesisUtils:
    @staticmethod
    def createStream(
        ssc: StreamingContext,
        kinesisAppName: str,
        streamName: str,
        endpointUrl: str,
        regionName: str,
        initialPositionInStream: int,
        checkpointInterval: int,
        **kwargs
    ) -> DStream
```

[Python API Usage](./python-api.md)

## Types

```scala { .api }
// Core interface types
trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

sealed trait SparkAWSCredentials extends Serializable {
  def provider: AWSCredentialsProvider
}

// Concrete credential implementations
case object DefaultCredentials extends SparkAWSCredentials

case class BasicCredentials(
  awsAccessKeyId: String,
  awsSecretKey: String
) extends SparkAWSCredentials

case class STSCredentials(
  stsRoleArn: String,
  stsSessionName: String,
  stsExternalId: Option[String] = None,
  longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials
```