Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10@1.6.00
# Spark Streaming Kinesis ASL
Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library (KCL). This library enables Spark Streaming applications to consume data from Amazon Kinesis streams with automatic load balancing, fault tolerance, and checkpointing.
## Package Information

- **Package Name**: spark-streaming-kinesis-asl_2.10
- **Package Type**: maven
- **Language**: Scala/Java
- **Installation**: Add the dependency to your Maven pom.xml or SBT build.sbt

### Maven Dependency

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
```

### SBT Dependency

```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.6.3"
```

## Core Imports

### Scala

```scala
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext}
```

### Java

```java
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
```

## Basic Usage

### Scala Example

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Create Spark Streaming context with a 2-second batch interval
val ssc = new StreamingContext(sparkContext, Milliseconds(2000))

// Create Kinesis input stream
val kinesisStream = KinesisUtils.createStream(
  ssc,
  "MyKinesisApp",                            // KCL application name
  "MyKinesisStream",                         // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com", // Endpoint URL
  "us-east-1",                               // Region name
  InitialPositionInStream.LATEST,            // Starting position
  Milliseconds(2000),                        // Checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2             // Storage level
)

// Process the stream, decoding each record payload as UTF-8
kinesisStream.foreachRDD { rdd =>
  rdd.foreach { byteArray =>
    println(new String(byteArray, StandardCharsets.UTF_8))
  }
}

ssc.start()
ssc.awaitTermination()
```

### Java Example

```java
import java.nio.charset.StandardCharsets;

import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

// Create Java Streaming context with a 2-second batch interval
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

// Create Kinesis input stream
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
    jssc,
    "MyKinesisApp",                            // KCL application name
    "MyKinesisStream",                         // Kinesis stream name
    "https://kinesis.us-east-1.amazonaws.com", // Endpoint URL
    "us-east-1",                               // Region name
    InitialPositionInStream.LATEST,            // Starting position
    new Duration(2000),                        // Checkpoint interval
    StorageLevel.MEMORY_AND_DISK_2()           // Storage level
);

// Process the stream (void lambdas: no return value needed)
kinesisStream.foreachRDD(rdd -> {
    rdd.foreach(byteArray ->
        System.out.println(new String(byteArray, StandardCharsets.UTF_8))
    );
});

jssc.start();
jssc.awaitTermination();
```

## Architecture

The Spark Streaming Kinesis ASL integration is built around several key components:

- **KinesisUtils**: Factory object providing static methods to create Kinesis input streams
- **KinesisInputDStream**: Kinesis-specific implementation of ReceiverInputDStream with enhanced fault tolerance
- **KinesisReceiver**: Custom Spark Streaming Receiver that runs a Kinesis Client Library (KCL) Worker
- **KinesisBackedBlockRDD**: Specialized RDD that can re-read data from Kinesis using sequence number ranges
- **Checkpointing System**: Automatic checkpointing of processed sequence numbers to DynamoDB, providing fault tolerance with at-least-once processing semantics
- **Multi-shard Support**: Automatic distribution and load balancing across multiple Kinesis shards

## Capabilities

### Stream Creation

Primary factory methods for creating Kinesis input streams with various configuration options including custom message handlers, explicit AWS credentials, and different storage levels.

```scala { .api }
// Basic stream creation with default byte array handler
def createStream(
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]

// Stream creation with custom message handler
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T
): ReceiverInputDStream[T]
```

[Stream Creation](./stream-creation.md)

### Data Processing and Message Handling

Advanced message processing capabilities including custom message handlers, type-safe transformations, and integration with Kinesis Record metadata such as partition keys and sequence numbers.

```scala { .api }
// Custom message handler function type
type MessageHandler[T] = com.amazonaws.services.kinesis.model.Record => T

// Default message handler for byte arrays
def defaultMessageHandler(record: Record): Array[Byte]
```

[Data Processing](./data-processing.md)
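
A practical detail when writing a custom handler: `Record#getData` returns a `ByteBuffer`, and calling `.array()` on it throws for read-only buffers. Below is a plain-Java sketch of a safe payload decode that needs neither Spark nor the AWS SDK; `decodePayload` is an illustrative helper, not part of this library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadDecode {
    // Safely decode a record payload ByteBuffer to a UTF-8 string.
    // Copies the remaining bytes instead of calling .array(), which throws
    // UnsupportedOperationException for read-only buffers.
    static String decodePayload(ByteBuffer data) {
        byte[] bytes = new byte[data.remaining()];
        data.duplicate().get(bytes);   // duplicate() leaves the caller's position untouched
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A payload as Kinesis would deliver it (read-only buffer)
        ByteBuffer payload = ByteBuffer
            .wrap("hello kinesis".getBytes(StandardCharsets.UTF_8))
            .asReadOnlyBuffer();
        System.out.println(decodePayload(payload));   // prints "hello kinesis"
    }
}
```

In a real message handler, you would apply the same copy-then-decode step to `record.getData()` inside your `Record => T` function.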

### Java API Integration

Complete Java API support with type-safe bindings, function interfaces, and seamless integration with Java Streaming contexts and data processing pipelines.

```java { .api }
// Java API stream creation
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel
);

// Java API with custom message handler
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
```

[Java API](./java-api.md)

### AWS Authentication and Configuration

Flexible AWS authentication options including default credential provider chains, explicit credential specification, and IAM role integration for secure access to Kinesis streams.

```scala { .api }
// Stream creation with explicit AWS credentials
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T,
  awsAccessKeyId: String,
  awsSecretKey: String
): ReceiverInputDStream[T]
```

[AWS Configuration](./aws-configuration.md)
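
The explicit-credentials overload exists because Spark ships receiver state to executors via Java serialization, and the AWS SDK's standard credential classes are not serializable; that is why the library wraps the key pair in the `SerializableAWSCredentials` case class shown under Types. The following plain-Java sketch of that round-trip uses illustrative stand-ins (`Creds`, `roundTrip`), not library code.

```java
import java.io.*;

public class CredsRoundTrip {
    // Illustrative stand-in for SerializableAWSCredentials: implementing
    // java.io.Serializable lets Spark ship the credentials inside a closure.
    static class Creds implements Serializable {
        final String accessKeyId;
        final String secretKey;
        Creds(String accessKeyId, String secretKey) {
            this.accessKeyId = accessKeyId;
            this.secretKey = secretKey;
        }
    }

    // Serialize and deserialize, mimicking what Java serialization does
    // when Spark sends a task to an executor.
    static Creds roundTrip(Creds value) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(buf.toByteArray()))) {
                return (Creds) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Creds copy = roundTrip(new Creds("AKIAEXAMPLE", "verysecret"));
        System.out.println(copy.accessKeyId);   // prints "AKIAEXAMPLE"
    }
}
```

If no explicit credentials are passed, the receiver falls back to the default AWS credential provider chain (environment variables, instance profile, and so on).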

## Types

```scala { .api }
// AWS credentials wrapper for serialization
case class SerializableAWSCredentials(
  accessKeyId: String,
  secretKey: String
) extends AWSCredentials

// Sequence number range for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String
)

// Collection of sequence number ranges
case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
}

// External AWS KCL types (from com.amazonaws.services.kinesis.clientlibrary.lib.worker)
// InitialPositionInStream enum values:
// - InitialPositionInStream.LATEST: start from the most recent records
// - InitialPositionInStream.TRIM_HORIZON: start from the oldest records still within the stream's retention period (24 hours by default)
```
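
To make the range types concrete, here is a plain-Java stand-in (re-declared for illustration; the real classes are internal to this library) showing how a `KinesisBackedBlockRDD` partition could carry ranges so a lost block can be re-read from Kinesis.

```java
import java.util.Collections;
import java.util.List;

public class SequenceRanges {
    // Illustrative stand-in mirroring SequenceNumberRange from the Types above
    static class SequenceNumberRange {
        final String streamName, shardId, fromSeqNumber, toSeqNumber;
        SequenceNumberRange(String streamName, String shardId,
                            String fromSeqNumber, String toSeqNumber) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.fromSeqNumber = fromSeqNumber;
            this.toSeqNumber = toSeqNumber;
        }
    }

    // Stand-in for SequenceNumberRanges: emptiness tells the scheduler
    // whether there is anything to re-read for a given block.
    static class SequenceNumberRanges {
        final List<SequenceNumberRange> ranges;
        SequenceNumberRanges(List<SequenceNumberRange> ranges) { this.ranges = ranges; }
        boolean isEmpty() { return ranges.isEmpty(); }
        boolean nonEmpty() { return !ranges.isEmpty(); }
    }

    public static void main(String[] args) {
        SequenceNumberRanges ranges = new SequenceNumberRanges(Collections.singletonList(
            new SequenceNumberRange("MyKinesisStream", "shardId-000000000000",
                "49571930126", "49571930189")));
        System.out.println(ranges.nonEmpty());   // prints "true"
    }
}
```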