# Spark Streaming Kinesis ASL Assembly

Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams. Provides fault-tolerant, scalable stream processing with automatic checkpointing, shard management, and configurable parallelism through the Kinesis Client Library (KCL).

## Package Information

- **Package Name**: spark-streaming-kinesis-asl-assembly_2.11
- **Package Type**: maven
- **Language**: Scala/Java
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId>
  <version>1.6.2</version>
</dependency>
```

## Core Imports

### Scala API

```scala
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record
```

### Java API

```java
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
```

## Basic Usage

### Scala Example

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Create a Kinesis stream of raw byte-array payloads
val kinesisStream = KinesisUtils.createStream(
  ssc,
  "myKinesisApp",
  "myStreamName",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2
)

// Process the stream: decode each payload and print a sample of each batch
kinesisStream.map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8)).print()

ssc.start()
ssc.awaitTermination()
```

### Java Example

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.storage.StorageLevel;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

// Create a Kinesis stream of raw byte-array payloads
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
    jssc,
    "myKinesisApp",
    "myStreamName",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2()
);

// Process the stream: decode each payload and print a sample of each batch
kinesisStream.map(bytes -> new String(bytes, java.nio.charset.StandardCharsets.UTF_8)).print();

jssc.start();
jssc.awaitTermination();
```

## Architecture

The Spark Streaming Kinesis integration is built around several key components:

- **KinesisUtils**: Main entry point providing factory methods for creating Kinesis input streams
- **Kinesis Client Library (KCL) Integration**: Uses the AWS KCL for reliable stream consumption and checkpointing
- **Fault Tolerance**: Sequence number-based recovery that lets streams resume after failures using stored metadata
- **Automatic Checkpointing**: DynamoDB-based checkpoint coordination for tracking stream progress
- **Multi-Shard Support**: Automatic parallelization across Kinesis shards with configurable processing
- **Credential Management**: Support for both the default AWS credential provider chain and explicitly supplied credentials
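
As a rough illustration of the multi-shard parallelization above, shards can be pictured fanning out across receivers round-robin; in practice the KCL assigns shard leases dynamically through DynamoDB, so this stdlib-only sketch (with a hypothetical `assign` helper) only conveys the idea, not the library's actual mechanism:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual sketch only: distribute shard IDs across receivers round-robin.
// The real assignment is lease-based and rebalances as workers come and go.
public class ShardFanOut {
    static Map<Integer, List<String>> assign(List<String> shards, int numReceivers) {
        Map<Integer, List<String>> byReceiver = new TreeMap<>();
        for (int i = 0; i < shards.size(); i++) {
            byReceiver.computeIfAbsent(i % numReceivers, k -> new ArrayList<>())
                      .add(shards.get(i));
        }
        return byReceiver;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList(
            "shardId-000000000000", "shardId-000000000001", "shardId-000000000002");
        // Two receivers share three shards; one receiver gets two of them.
        System.out.println(assign(shards, 2));
    }
}
```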


## Capabilities

### Stream Creation

Core functionality for creating Kinesis input streams, with configuration options including custom message handlers and credential specifications, in both Scala and Java APIs.

```scala { .api }
object KinesisUtils {
  // Generic stream creation with a custom message handler
  def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
  ): ReceiverInputDStream[T]

  // Default stream creation yielding raw byte arrays
  def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
  ): ReceiverInputDStream[Array[Byte]]
}
```

[Stream Creation](./stream-creation.md)
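
The `messageHandler` passed to the generic overload receives each Kinesis `Record`, whose `getData()` returns a `ByteBuffer`. A stdlib-only sketch of the decoding step such a handler typically performs (the `Record` class itself is omitted here; `decode` is a hypothetical helper, not part of the library):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadDecode {
    // Copy the remaining bytes out of the record's buffer, then decode with
    // an explicit charset — the core of a typical String messageHandler.
    static String decode(ByteBuffer data) {
        byte[] bytes = new byte[data.remaining()];
        data.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer payload =
            ByteBuffer.wrap("{\"event\":\"click\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(payload)); // prints {"event":"click"}
    }
}
```

Copying out of the buffer matters: the same backing buffer may be reused, so a handler should not hold a reference to it past the call.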

### Credential Management

Authentication and credential handling for AWS Kinesis access, supporting both the default credential provider chain and explicitly supplied credentials.

```scala { .api }
// Stream creation with explicit AWS credentials
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T,
  awsAccessKeyId: String,
  awsSecretKey: String
): ReceiverInputDStream[T]
```

[Credential Management](./credential-management.md)
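
Explicit credentials end up wrapped in a serializable holder (see `SerializableAWSCredentials` under Core Types) because the receiver, and anything it carries, is shipped to executors via Java serialization. A stdlib-only sketch of that requirement, with `Creds` as a hypothetical stand-in for the library class:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Credentials must survive a serialization round trip to reach executors.
class Creds implements Serializable {
    private static final long serialVersionUID = 1L;
    final String accessKeyId;
    final String secretKey;
    Creds(String accessKeyId, String secretKey) {
        this.accessKeyId = accessKeyId;
        this.secretKey = secretKey;
    }
}

public class CredsRoundTrip {
    // Serialize and immediately deserialize, as Spark effectively does when
    // sending the receiver to a worker.
    static Creds roundTrip(Creds in) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(in);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (Creds) ois.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Creds back = roundTrip(new Creds("exampleAccessKeyId", "exampleSecretKey"));
        System.out.println(back.accessKeyId); // prints exampleAccessKeyId
    }
}
```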

### Java API Support

Complete Java API compatibility, with functional interfaces and Java-friendly method signatures for integration with Java applications.

```java { .api }
// Java API for generic stream creation
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
```

[Java API](./java-api.md)

### Fault Tolerance & Recovery

Built-in fault tolerance using Kinesis sequence numbers for reliable stream processing and recovery from failures.

```scala { .api }
// Sequence number range tracked for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
}
```

[Fault Tolerance](./fault-tolerance.md)
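
Kinesis sequence numbers are large decimal strings, so any range check over metadata like the above needs numeric rather than lexicographic comparison. A stdlib sketch of that detail (`inRange` is a hypothetical helper; the library's internal recovery logic may differ):

```java
import java.math.BigInteger;

public class SeqRangeCheck {
    // Compare sequence numbers as big integers — plain string comparison
    // would order "9" after "100".
    static boolean inRange(String seq, String fromSeqNumber, String toSeqNumber) {
        BigInteger s = new BigInteger(seq);
        return s.compareTo(new BigInteger(fromSeqNumber)) >= 0
            && s.compareTo(new BigInteger(toSeqNumber)) <= 0;
    }

    public static void main(String[] args) {
        System.out.println(inRange("150", "100", "200")); // prints true
        System.out.println(inRange("9", "100", "200"));   // prints false
    }
}
```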

## Core Types

```scala { .api }
// AWS credential wrapper, serializable so it can ship with the receiver
case class SerializableAWSCredentials(
  accessKeyId: String,
  secretKey: String
) extends AWSCredentials {
  def getAWSAccessKeyId: String
  def getAWSSecretKey: String
}

// Message handler function type
type MessageHandler[T] = Record => T

// Java function interface used for message handling
import org.apache.spark.api.java.function.{Function => JFunction}
```

## Key Parameters

- **kinesisAppName**: Unique identifier for the Kinesis application, used by the KCL for DynamoDB coordination
- **streamName**: Name of the Kinesis stream to consume from
- **endpointUrl**: AWS Kinesis service endpoint (e.g., "https://kinesis.us-east-1.amazonaws.com")
- **regionName**: AWS region name for DynamoDB and CloudWatch operations
- **initialPositionInStream**: Starting position when no checkpoint exists (LATEST or TRIM_HORIZON)
- **checkpointInterval**: How often checkpoints are written to DynamoDB
- **storageLevel**: Spark storage level for received data (MEMORY_AND_DISK_2 recommended)
- **messageHandler**: Function that transforms Kinesis Record objects into the desired output type
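
For standard endpoints the region also appears inside the endpoint URL, so `endpointUrl` and `regionName` can be cross-checked before starting the stream. A small hypothetical helper sketching that check (`regionOf` is not part of the library, and nonstandard or VPC endpoints won't match this pattern):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegionCheck {
    // Matches the region segment of a standard Kinesis endpoint URL.
    private static final Pattern REGION =
        Pattern.compile("kinesis\\.([a-z0-9-]+)\\.amazonaws\\.com");

    static String regionOf(String endpointUrl) {
        Matcher m = REGION.matcher(endpointUrl);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Should agree with the regionName parameter ("us-east-1").
        System.out.println(regionOf("https://kinesis.us-east-1.amazonaws.com"));
    }
}
```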