# Spark Streaming Kinesis ASL Assembly

Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams. Provides fault-tolerant, scalable stream processing with automatic checkpointing, shard management, and configurable parallelism through the Kinesis Client Library (KCL).

## Package Information

- **Package Name**: spark-streaming-kinesis-asl-assembly_2.11
- **Package Type**: maven
- **Language**: Scala/Java
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId>
  <version>1.6.2</version>
</dependency>
```

## Core Imports

### Scala API

```scala
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record
```

### Java API

```java
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.model.Record;
```

## Basic Usage

### Scala Example

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Create a Kinesis stream of raw byte arrays
val kinesisStream = KinesisUtils.createStream(
  ssc,
  "myKinesisApp",
  "myStreamName",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2
)

// Process the stream: decode each record payload as a UTF-8 string
kinesisStream.map(bytes => new String(bytes, StandardCharsets.UTF_8)).print()

ssc.start()
ssc.awaitTermination()
```

### Java Example

```java
import java.nio.charset.StandardCharsets;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.storage.StorageLevel;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

// Create a Kinesis stream of raw byte arrays
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
    jssc,
    "myKinesisApp",
    "myStreamName",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2()
);

// Process the stream: decode each record payload as a UTF-8 string
kinesisStream.map(bytes -> new String(bytes, StandardCharsets.UTF_8)).print();

jssc.start();
jssc.awaitTermination();
```

## Architecture

The Spark Streaming Kinesis integration is built around several key components:

- **KinesisUtils**: Main entry point providing factory methods for creating Kinesis input streams
- **Kinesis Client Library (KCL) Integration**: Uses AWS KCL for reliable stream consumption and checkpointing
- **Fault Tolerance**: Sequence number-based recovery allowing streams to recover from failures using stored metadata
- **Automatic Checkpointing**: DynamoDB-based checkpoint coordination for tracking stream progress
- **Multi-Shard Support**: Automatic parallelization across Kinesis shards with configurable processing
- **Credential Management**: Support for both default AWS credential providers and explicit credential specification

## Capabilities

### Stream Creation

Core functionality for creating Kinesis input streams with various configuration options including custom message handlers, credential specifications, and both Scala and Java APIs.

```scala { .api }
object KinesisUtils {
  // Generic stream creation with custom message handler
  def createStream[T: ClassTag](
      ssc: StreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: InitialPositionInStream,
      checkpointInterval: Duration,
      storageLevel: StorageLevel,
      messageHandler: Record => T
  ): ReceiverInputDStream[T]

  // Default byte array stream creation
  def createStream(
      ssc: StreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: InitialPositionInStream,
      checkpointInterval: Duration,
      storageLevel: StorageLevel
  ): ReceiverInputDStream[Array[Byte]]
}
```

[Stream Creation](./stream-creation.md)
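
The `messageHandler` parameter controls how each Kinesis `Record` becomes a stream element; the overload without a handler simply copies the record's payload into a byte array. A minimal sketch of that decoding step, written directly against the record's `ByteBuffer` payload (the helper names `payloadBytes` and `decodeUtf8` are illustrative, not part of the library):

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Mirrors what the default byte-array stream does with each record:
// copy the payload buffer into a fresh Array[Byte]. Duplicating the
// buffer leaves the original read position untouched.
def payloadBytes(data: ByteBuffer): Array[Byte] = {
  val bytes = new Array[Byte](data.remaining())
  data.duplicate().get(bytes)
  bytes
}

// A custom messageHandler would typically decode instead of copying,
// e.g. record.getData() (a ByteBuffer) straight to a UTF-8 string.
def decodeUtf8(data: ByteBuffer): String =
  new String(payloadBytes(data), StandardCharsets.UTF_8)
```

Keeping the handler a small pure function matters because it is serialized and run on the receiver for every record.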

### Credential Management

Authentication and credential handling for AWS Kinesis access, supporting both default credential providers and explicit credential specification.

```scala { .api }
// Stream creation with explicit AWS credentials
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[T]
```

[Credential Management](./credential-management.md)
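
When explicit keys are supplied they are wrapped in a serializable holder, because the receiver (and its credentials) must be shipped to executor JVMs, and most `AWSCredentials` implementations are not `Serializable`. A sketch of the pattern as a standalone case class (shown here without extending the AWS SDK interface, so the class name is illustrative):

```scala
// A Scala case class is Serializable by default, so instances of this
// wrapper can travel with the receiver task to executors, carrying the
// explicitly supplied keys.
case class SerializableCredentialsSketch(accessKeyId: String, secretKey: String) {
  // Same accessor shape as com.amazonaws.auth.AWSCredentials
  def getAWSAccessKeyId: String = accessKeyId
  def getAWSSecretKey: String = secretKey
}
```

If no keys are given, the integration falls back to the default AWS credential provider chain resolved on each executor.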

### Java API Support

Complete Java API compatibility with functional interfaces and Java-friendly method signatures for integration with Java applications.

```java { .api }
// Java API for generic stream creation
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
```

[Java API](./java-api.md)

### Fault Tolerance & Recovery

Built-in fault tolerance using Kinesis sequence numbers: each stored block of received data records the range of sequence numbers it covers per shard, so after a failure the same slice of the stream can be re-read from Kinesis instead of being lost.

```scala { .api }
// Sequence number range for fault tolerance
case class SequenceNumberRange(
    streamName: String,
    shardId: String,
    fromSeqNumber: String,
    toSeqNumber: String
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
}
```

[Fault Tolerance](./fault-tolerance.md)
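
The recovery metadata is easy to picture: for each stored block, the receiver keeps the first and last sequence number it saw per shard. A sketch of that bookkeeping under the types above (the `rangesFor` helper is illustrative, not a library function):

```scala
// Per-shard range of Kinesis sequence numbers covered by one stored block.
case class SequenceNumberRange(streamName: String, shardId: String,
                               fromSeqNumber: String, toSeqNumber: String)

// Collapse the in-order sequence numbers seen per shard into one range
// per shard; shards that contributed no records produce no range.
def rangesFor(streamName: String,
              seenPerShard: Map[String, Seq[String]]): Seq[SequenceNumberRange] =
  seenPerShard.toSeq.collect { case (shardId, seqs) if seqs.nonEmpty =>
    SequenceNumberRange(streamName, shardId, seqs.head, seqs.last)
  }
```

On recovery, each stored range identifies exactly which records to re-fetch from Kinesis for the corresponding shard.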

## Core Types

```scala { .api }
// AWS credential wrapper for serialization
case class SerializableAWSCredentials(
    accessKeyId: String,
    secretKey: String
) extends AWSCredentials {
  def getAWSAccessKeyId: String
  def getAWSSecretKey: String
}

// Message handler function type
type MessageHandler[T] = Record => T

// Java function interface for message handling
import org.apache.spark.api.java.function.{Function => JFunction}
```

## Key Parameters

- **kinesisAppName**: Unique identifier for the Kinesis application, used by the KCL for DynamoDB coordination
- **streamName**: Name of the Kinesis stream to consume from
- **endpointUrl**: AWS Kinesis service endpoint (e.g., "https://kinesis.us-east-1.amazonaws.com")
- **regionName**: AWS region name for DynamoDB and CloudWatch operations
- **initialPositionInStream**: Starting position when no checkpoint exists (LATEST or TRIM_HORIZON)
- **checkpointInterval**: Frequency of checkpointing to DynamoDB
- **storageLevel**: Spark storage level for received data (recommended: MEMORY_AND_DISK_2)
- **messageHandler**: Function to transform Kinesis Record objects to the desired output type