# Spark Streaming Kinesis ASL

Spark Streaming Kinesis ASL integrates Apache Spark Streaming with Amazon Kinesis Data Streams, enabling real-time processing of Kinesis data. It provides a Kinesis receiver implementation that handles AWS credentials, checkpointing, metrics collection, and fault tolerance, with configurable initial positions, automatic scaling based on Kinesis shard count, and efficient processing through Spark's distributed computing capabilities.

## Package Information

- **Package Name**: spark-streaming-kinesis-asl_2.13
- **Package Type**: Maven
- **Language**: Scala/Java
- **Group ID**: org.apache.spark
- **Version**: 3.5.6
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.13</artifactId>
  <version>3.5.6</version>
</dependency>
```
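
For sbt builds, the equivalent dependency uses the same coordinates as the Maven snippet above (the `%%` operator appends the Scala binary version suffix, here `_2.13`):

```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "3.5.6"
```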

## Core Imports

**Scala:**
```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, SparkAWSCredentials}
import org.apache.spark.streaming.StreamingContext
```

**Java:**
```java
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import scala.reflect.ClassTag$;
```

**Python:**
```python
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, MetricsLevel
from pyspark.storagelevel import StorageLevel
```

## Basic Usage

**Scala:**
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))

val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

kinesisStream.foreachRDD { rdd =>
  val records = rdd.collect()
  records.foreach(record => println(new String(record)))
}

ssc.start()
ssc.awaitTermination()
```

**Java:**
```java
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import scala.reflect.ClassTag$;

SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));

JavaDStream<byte[]> kinesisStream = JavaDStream.fromDStream(
    KinesisInputDStream.builder()
        .streamingContext(jssc)
        .streamName("my-kinesis-stream")
        .checkpointAppName("my-spark-app")
        .regionName("us-east-1")
        .initialPosition(new KinesisInitialPositions.Latest())
        .build(),
    ClassTag$.MODULE$.apply(byte[].class)
);

kinesisStream.foreachRDD(rdd -> {
    List<byte[]> records = rdd.collect();
    records.forEach(record -> System.out.println(new String(record)));
});

jssc.start();
jssc.awaitTermination();
```

**Python:**
```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

conf = SparkConf().setAppName("PythonKinesisExample")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)

kinesisStream = KinesisUtils.createStream(
    ssc,
    kinesisAppName="my-spark-app",
    streamName="my-kinesis-stream",
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=10
)

def process_rdd(rdd):
    records = rdd.collect()
    for record in records:
        if record:
            print(record)

kinesisStream.foreachRDD(process_rdd)

ssc.start()
ssc.awaitTermination()
```

## Architecture

The Spark Streaming Kinesis ASL integration is built around several key components:

- **KinesisInputDStream**: The main DStream class that extends Spark's ReceiverInputDStream to handle Kinesis data
- **KinesisReceiver**: Uses Amazon's Kinesis Client Library (KCL) Worker to process data from multiple shards
- **Builder Pattern**: Fluent API for configuring Kinesis DStreams with sensible defaults
- **Fault Tolerance**: KinesisBackedBlockRDD enables recovery from failures using Kinesis sequence numbers
- **Checkpointing**: DynamoDB-based state management for tracking processing progress

## Capabilities

### Stream Creation and Configuration

Create and configure Kinesis DStreams with the builder pattern for both Scala and Java APIs.

```scala { .api }
object KinesisInputDStream {
  def builder: Builder

  class Builder {
    def streamingContext(ssc: StreamingContext): Builder
    def streamingContext(jssc: JavaStreamingContext): Builder
    def streamName(streamName: String): Builder
    def checkpointAppName(appName: String): Builder
    def endpointUrl(url: String): Builder
    def regionName(regionName: String): Builder
    def initialPosition(initialPosition: KinesisInitialPosition): Builder
    @deprecated("use initialPosition(initialPosition: KinesisInitialPosition)", "2.3.0")
    def initialPositionInStream(initialPosition: InitialPositionInStream): Builder
    def checkpointInterval(interval: Duration): Builder
    def storageLevel(storageLevel: StorageLevel): Builder
    def kinesisCredentials(credentials: SparkAWSCredentials): Builder
    def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
    def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
    def metricsLevel(metricsLevel: MetricsLevel): Builder
    def metricsEnabledDimensions(dimensions: Set[String]): Builder
    def build(): KinesisInputDStream[Array[Byte]]
    def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
  }
}
```

### Initial Position Configuration

Configure where in the Kinesis stream to start reading data from: the latest records, the oldest available records, or a specific timestamp.

```java { .api }
// Interface for initial position
interface KinesisInitialPosition {
    InitialPositionInStream getPosition();
}

// Position implementations
class KinesisInitialPositions {
    static class Latest implements KinesisInitialPosition
    static class TrimHorizon implements KinesisInitialPosition
    static class AtTimestamp implements KinesisInitialPosition {
        public AtTimestamp(Date timestamp)
        public Date getTimestamp()
    }
}
```

### AWS Credentials Management

Handle AWS authentication through multiple credential providers, including basic keys, STS assume-role, and the default AWS credential chain.

```scala { .api }
sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(
    stsRoleArn: String,
    stsSessionName: String,
    stsExternalId: Option[String] = None,
    longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials

object SparkAWSCredentials {
  def builder: Builder

  class Builder {
    def basicCredentials(accessKeyId: String, secretKey: String): Builder
    def stsCredentials(roleArn: String, sessionName: String): Builder
    def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder
    def build(): SparkAWSCredentials
  }
}
```

### Python API

Python integration through the KinesisUtils class, providing a simpler interface for Kinesis streaming.

```python { .api }
class KinesisUtils:
    @staticmethod
    def createStream(
        ssc: StreamingContext,
        kinesisAppName: str,
        streamName: str,
        endpointUrl: str,
        regionName: str,
        initialPositionInStream: int,
        checkpointInterval: int,
        metricsLevel: int = MetricsLevel.DETAILED,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId: Optional[str] = None,
        awsSecretKey: Optional[str] = None,
        decoder: Callable[[Optional[bytes]], T] = utf8_decoder,
        stsAssumeRoleArn: Optional[str] = None,
        stsSessionName: Optional[str] = None,
        stsExternalId: Optional[str] = None
    ) -> DStream[T]
```

```python { .api }
class InitialPositionInStream:
    """Constants for initial position in Kinesis stream"""
    LATEST = 0
    TRIM_HORIZON = 1

class MetricsLevel:
    """Constants for CloudWatch metrics level"""
    DETAILED = 0
    SUMMARY = 1
    NONE = 2

def utf8_decoder(s: Optional[bytes]) -> Optional[str]:
    """Default decoder function for Kinesis records"""
    if s is None:
        return None
    return s.decode("utf-8")
```
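
The `decoder` parameter accepts any callable that maps a raw record payload to a value. As a minimal sketch, a JSON decoder that mirrors `utf8_decoder`'s handling of `None` might look like this (`json_decoder` is an illustrative helper, not part of the package):

```python
import json
from typing import Any, Optional


def json_decoder(s: Optional[bytes]) -> Optional[Any]:
    """Decode a Kinesis record payload as UTF-8 JSON; pass None through,
    matching the None-handling convention of utf8_decoder."""
    if s is None:
        return None
    return json.loads(s.decode("utf-8"))


# Would then be supplied as:
#   KinesisUtils.createStream(..., decoder=json_decoder)
```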

### Advanced Configuration

Configure retry behavior, timeout settings, and CloudWatch metrics for production deployments.

```scala { .api }
// Kinesis read configuration
case class KinesisReadConfigurations(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations

  val RETRY_MAX_ATTEMPTS_KEY: String = "spark.streaming.kinesis.retry.maxAttempts"
  val RETRY_WAIT_TIME_KEY: String = "spark.streaming.kinesis.retry.waitTime"
  val DEFAULT_MAX_RETRIES: Int = 3
  val DEFAULT_RETRY_WAIT_TIME: String = "100ms"
  val DEFAULT_RETRY_TIMEOUT: Long = 10000
}
```
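
These settings bound how often and how long a failed Kinesis read is retried. The overall retry semantics can be sketched in plain Python; this is an illustrative model of a bounded retry loop, not the library's actual implementation, and `fetch_with_retries` and its parameters are hypothetical names:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def fetch_with_retries(fetch: Callable[[], T],
                       max_retries: int = 3,
                       retry_wait_ms: int = 100) -> T:
    """Call fetch(), retrying up to max_retries additional times with a
    fixed wait between attempts; re-raise the last error once exhausted."""
    attempts = 0
    while True:
        try:
            return fetch()
        except Exception:
            attempts += 1
            if attempts > max_retries:
                raise
            time.sleep(retry_wait_ms / 1000.0)
```

The defaults above mirror `DEFAULT_MAX_RETRIES = 3` and `DEFAULT_RETRY_WAIT_TIME = "100ms"` from the configuration object.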

## Types

```scala { .api }
// Core DStream type
class KinesisInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPosition: KinesisInitialPosition,
    checkpointAppName: String,
    checkpointInterval: Duration,
    _storageLevel: StorageLevel,
    messageHandler: Record => T,
    kinesisCreds: SparkAWSCredentials,
    dynamoDBCreds: Option[SparkAWSCredentials],
    cloudWatchCreds: Option[SparkAWSCredentials],
    metricsLevel: MetricsLevel,
    metricsEnabledDimensions: Set[String]
) extends ReceiverInputDStream[T]

// Message handler function type
type MessageHandler[T] = Record => T

// Default constants
object KinesisInputDStream {
  val DEFAULT_KINESIS_ENDPOINT_URL: String = "https://kinesis.us-east-1.amazonaws.com"
  val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
  val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest()
  val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
  val DEFAULT_METRICS_LEVEL: MetricsLevel = KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL
  val DEFAULT_METRICS_ENABLED_DIMENSIONS: Set[String] = KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
}
```