Apache Spark Streaming integration with Amazon Kinesis for real-time data processing
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-12@2.4.00
# Spark Streaming Kinesis ASL

Apache Spark Streaming integration with Amazon Kinesis Data Streams for real-time processing of streaming data at massive scale. It provides a receiver-based approach using the Amazon Kinesis Client Library (KCL) to create input DStreams with built-in load balancing, fault tolerance, and checkpointing capabilities.

## Package Information

- **Package Name**: spark-streaming-kinesis-asl_2.12
- **Package Type**: maven
- **Language**: Scala (with Java support)
- **Group ID**: org.apache.spark
- **Installation**: Add to `pom.xml` or `build.sbt`; note that this module is distributed under the Amazon Software License

**Maven dependency:**

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
  <version>2.4.8</version>
</dependency>
```

**SBT dependency:**

```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.8"
```

## Core Imports

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
```

Legacy imports (deprecated):

```scala
import org.apache.spark.streaming.kinesis.KinesisUtils
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._

// Create streaming context
val sparkConf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Create Kinesis input stream using the Builder pattern (recommended)
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPosition(new Latest())
  .checkpointAppName("my-spark-app")
  .checkpointInterval(Duration(30000))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()

// Process the stream; each record is delivered as Array[Byte]
kinesisStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // collect() pulls all data to the driver; fine for a demo, avoid at scale
    rdd.collect().foreach { record =>
      val data = new String(record, "UTF-8")
      println(s"Received: $data")
    }
  }
}

// Start the streaming context
ssc.start()
ssc.awaitTermination()
```

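
Since each record arrives as a raw `Array[Byte]`, the decoding step above can be factored into a small pure helper. This is an illustrative sketch (the function name and the drop-empty-payloads policy are not part of the library):

```scala
import java.nio.charset.StandardCharsets

// Decode raw Kinesis record payloads to UTF-8 strings,
// skipping empty payloads (an illustrative policy, not part of the API)
def decodeRecords(records: Seq[Array[Byte]]): Seq[String] =
  records
    .filter(_.nonEmpty)
    .map(bytes => new String(bytes, StandardCharsets.UTF_8))

println(decodeRecords(Seq("hello".getBytes(StandardCharsets.UTF_8), Array[Byte]())))
```

Keeping the decode logic in a plain function like this makes it testable without a running StreamingContext.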
## Architecture

The Spark Kinesis ASL integration consists of several key components:

- **KinesisInputDStream**: Core DStream implementation that creates Kinesis-backed RDDs
- **KinesisReceiver**: Receiver implementation using the Amazon KCL for consuming data
- **Builder pattern**: Modern configuration API for flexible stream creation
- **AWS credentials**: Pluggable credential providers supporting various authentication methods
- **Checkpointing**: Automatic state management through DynamoDB for fault tolerance
- **Recovery**: Sequence-number-based recovery providing at-least-once processing after failures

## Capabilities

### Stream Creation

Primary interface for creating Kinesis input streams with full configuration options using the modern Builder pattern.

```scala { .api }
object KinesisInputDStream {
  def builder: Builder

  class Builder {
    def streamingContext(ssc: StreamingContext): Builder
    def streamName(streamName: String): Builder
    def checkpointAppName(appName: String): Builder
    def build(): KinesisInputDStream[Array[Byte]]
    def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
  }
}
```

[Stream Creation](./stream-creation.md)
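
`buildWithMessageHandler` transforms each KCL `Record` at receive time, which is useful for keeping metadata such as the partition key. A hedged sketch: the stream name, app name, and region are illustrative, and `ssc` is assumed to be an existing `StreamingContext`:

```scala
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

// Pair each payload with its partition key instead of keeping raw bytes only
val keyedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPosition(new Latest())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .buildWithMessageHandler { record: Record =>
    (record.getPartitionKey, new String(record.getData.array(), StandardCharsets.UTF_8))
  }
```

The resulting DStream is typed `KinesisInputDStream[(String, String)]`, so downstream operators can key by partition without re-parsing bytes.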

### AWS Credentials Management

Flexible credential providers supporting default AWS credentials, basic key/secret authentication, and STS role assumption.

```scala { .api }
sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

object SparkAWSCredentials {
  def builder: Builder

  class Builder {
    def basicCredentials(accessKeyId: String, secretKey: String): Builder
    def stsCredentials(roleArn: String, sessionName: String): Builder
    def build(): SparkAWSCredentials
  }
}

case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(stsRoleArn: String, stsSessionName: String, stsExternalId: Option[String]) extends SparkAWSCredentials
```

[AWS Credentials](./aws-credentials.md)
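
For example, reading a stream through an assumed IAM role (the role ARN and session name below are illustrative, and `kinesisCredentials` is the Spark 2.x builder method for attaching credentials):

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Build STS role-assumption credentials (values are illustrative)
val roleCreds = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/kinesis-reader", "spark-kinesis-session")
  .build()

// Attach to the stream builder:
//   KinesisInputDStream.builder
//     ...
//     .kinesisCredentials(roleCreds)
//     .build()
```

When no credentials are supplied, the stream falls back to `DefaultCredentials`, i.e. the AWS SDK's default provider chain (environment variables, instance profile, etc.).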

### Initial Stream Positioning

Configuration for specifying where to start reading from Kinesis streams, supporting latest, trim horizon, and timestamp-based positioning.

```scala { .api }
// Java classes for initial position specification
class Latest extends KinesisInitialPosition
class TrimHorizon extends KinesisInitialPosition
class AtTimestamp(timestamp: java.util.Date) extends KinesisInitialPosition {
  def getTimestamp(): java.util.Date
}
```

[Stream Positioning](./stream-positioning.md)
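
For instance, to replay retained data rather than tail the latest records (the one-hour offset below is illustrative):

```scala
import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.{AtTimestamp, TrimHorizon}

// Replay everything still retained in the stream
val fromStart = new TrimHorizon()

// Or start from a specific instant, here one hour ago
val oneHourAgo = new AtTimestamp(new Date(System.currentTimeMillis() - 60 * 60 * 1000L))

// Pass either value to KinesisInputDStream.builder.initialPosition(...)
```

Note that on restart, a position checkpointed in DynamoDB by the KCL takes precedence over the configured initial position.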

### Configuration Management

Advanced configuration options for controlling retry behavior, timeouts, and performance tuning.

```scala { .api }
case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long,
  retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations

  val RETRY_MAX_ATTEMPTS_KEY: String
  val RETRY_WAIT_TIME_KEY: String
  val DEFAULT_MAX_RETRIES: Int
  val DEFAULT_RETRY_WAIT_TIME: String
  val DEFAULT_RETRY_TIMEOUT: Long
}
```

[Configuration](./configuration.md)
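
The retry settings can also be supplied through Spark configuration, which `KinesisReadConfigurations(ssc)` reads at stream creation. A sketch, assuming the Spark 2.4 key names `spark.streaming.kinesis.retry.maxAttempts` and `spark.streaming.kinesis.retry.waitTime` (values below are illustrative):

```scala
import org.apache.spark.SparkConf

// Tune Kinesis read retries via SparkConf
val conf = new SparkConf()
  .setAppName("KinesisRetryTuning")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")
```
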

### Testing Utilities

Utilities for testing Kinesis integration including stream creation, data generation, and cleanup operations.

```scala { .api }
class KinesisTestUtils(streamShardCount: Int = 2) {
  def streamName: String
  def createStream(): Unit
  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]]
  def deleteStream(): Unit
}

object KinesisTestUtils {
  val shouldRunTests: Boolean
  val endpointUrl: String
  def isAWSCredentialsPresent: Boolean
}
```

[Testing Utilities](./testing.md)
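
A typical guarded test flow looks like the sketch below; `shouldRunTests` gates on AWS credentials being available so suites skip cleanly otherwise (the shard count and data are illustrative):

```scala
import org.apache.spark.streaming.kinesis.KinesisTestUtils

// Only touch real AWS when integration tests are enabled
if (KinesisTestUtils.shouldRunTests) {
  val testUtils = new KinesisTestUtils(streamShardCount = 2)
  testUtils.createStream()
  try {
    // Push integers and get back the shard-to-records mapping
    val shardIdToData = testUtils.pushData(1 to 10, aggregate = false)
    println(s"Wrote to ${shardIdToData.size} shard(s)")
  } finally {
    // Always clean up the billable test stream
    testUtils.deleteStream()
  }
}
```
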

## Error Handling

The library can throw the following exceptions:

- `IllegalArgumentException` - for invalid configuration parameters (region names, credentials, etc.)
- `UnsupportedOperationException` - for unsupported initial position types in the legacy APIs
- AWS SDK exceptions - for authentication failures, network issues, and service errors
- KCL exceptions - for Kinesis-specific operational errors during stream processing

### Legacy KinesisUtils (Deprecated)

Legacy factory methods for creating Kinesis streams (deprecated since Spark 2.2.0, still functional).

```scala { .api }
@deprecated("Use KinesisInputDStream.builder instead", "2.2.0")
object KinesisUtils {
  def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
  ): ReceiverInputDStream[T]

  def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
  ): ReceiverInputDStream[Array[Byte]]
}
```

## Migration Notes

The legacy `KinesisUtils` factory methods are deprecated since Spark 2.2.0. Use `KinesisInputDStream.builder` for new development:

```scala
// Deprecated (still functional)
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, ...)

// Recommended
KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName(streamName)
  .checkpointAppName(appName)
  .endpointUrl(endpointUrl)
  .regionName(regionName)
  .build()
```