Apache Spark Kinesis Integration providing real-time stream processing of data from Amazon Kinesis Data Streams
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl_2-13@4.0.0
# Apache Spark Kinesis Integration

Apache Spark Kinesis Integration enables real-time stream processing of data from Amazon Kinesis Data Streams using Spark Streaming. It provides a receiver-based approach using the Amazon Kinesis Client Library (KCL) to consume data from Kinesis streams and transform it into Spark DStreams for distributed processing.

## Package Information

- **Package Name**: spark-streaming-kinesis-asl_2.13
- **Package Type**: maven
- **Group ID**: org.apache.spark
- **Language**: Scala
- **Installation**: Add the Maven dependency:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

For SBT:

```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "4.0.0"
```
## Core Imports

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.{SparkAWSCredentials, DefaultCredentials, BasicCredentials, STSCredentials}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.model.Record
```
## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import java.nio.charset.StandardCharsets

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Create a Kinesis DStream using the builder pattern
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

// Process the stream; each element is the raw record payload as Array[Byte]
kinesisStream.foreachRDD { rdd =>
  rdd.foreach { bytes =>
    val data = new String(bytes, StandardCharsets.UTF_8)
    println(s"Received: $data")
  }
}

ssc.start()
ssc.awaitTermination()
```
## Architecture

The Spark Kinesis Integration is built around several key components:

- **KinesisInputDStream**: The main DStream implementation that creates Kinesis receivers
- **KinesisReceiver**: Manages KCL workers for consuming data from Kinesis shards
- **Builder Pattern**: Fluent API for configuring Kinesis streams with required and optional parameters
- **Credentials Management**: Flexible authentication supporting the AWS default chain, basic credentials, and STS assume role
- **Fault Tolerance**: Uses KinesisBackedBlockRDD for recovery from Kinesis data when local blocks are lost
- **Checkpointing**: Automatic state management through DynamoDB for tracking consumption progress
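The checkpointing and storage behavior described above is configured on the same builder used for stream creation. A minimal sketch (stream name, region, app name, and intervals are placeholder values):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

val ssc = new StreamingContext(new SparkConf().setAppName("KinesisTuned"), Seconds(10))

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")               // placeholder stream name
  .checkpointAppName("my-app")                   // also names the DynamoDB checkpoint table
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .checkpointInterval(Seconds(60))               // how often the KCL checkpoints to DynamoDB
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)  // replicate received blocks for fault tolerance
  .build()
```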
## Capabilities

### Stream Creation

Core functionality for creating Kinesis DStreams with comprehensive configuration options including authentication, checkpointing, and performance tuning.

```scala { .api }
object KinesisInputDStream {
  def builder: KinesisInputDStream.Builder
}

class Builder {
  def streamingContext(ssc: StreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
```
[Stream Creation](./stream-creation.md)
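For typed streams, `buildWithMessageHandler` maps each Kinesis `Record` before it is stored, instead of yielding raw byte arrays. A sketch that extracts the partition key and the payload as a string (stream name, region, and app name are placeholders, and an existing `StreamingContext` named `ssc` is assumed):

```scala
import java.nio.charset.StandardCharsets
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import com.amazonaws.services.kinesis.model.Record

// Yields a DStream[(String, String)] of (partitionKey, payload)
val typedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .buildWithMessageHandler { record: Record =>
    // Copy the payload out of the record's ByteBuffer
    val buf = record.getData
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)
    (record.getPartitionKey, new String(bytes, StandardCharsets.UTF_8))
  }
```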
### AWS Credentials Management

Authentication system supporting multiple credential providers including the default AWS chain, basic access keys, and STS assume role for cross-account access.

```scala { .api }
sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

object SparkAWSCredentials {
  def builder: SparkAWSCredentials.Builder
}

case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(stsRoleArn: String, stsSessionName: String, stsExternalId: Option[String], longLivedCreds: SparkAWSCredentials) extends SparkAWSCredentials
```
[AWS Credentials](./aws-credentials.md)
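Credentials are typically assembled with the `SparkAWSCredentials` builder and handed to the stream builder. A sketch of STS assume role layered on the default provider chain (the role ARN and session name are placeholder values):

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Assume a role via STS; the long-lived credentials that sign the STS
// call come from the default AWS provider chain
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisReadRole", "kinesis-session")
  .build()

// Passed to the stream builder via .kinesisCredentials(credentials)
```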

### Initial Position Configuration

Configuration system for specifying where to start reading from Kinesis streams, supporting the latest records, the oldest available records, or a specific timestamp.

```scala { .api }
trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

object KinesisInitialPositions {
  class Latest extends KinesisInitialPosition
  class TrimHorizon extends KinesisInitialPosition
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition
}
```
[Initial Position](./initial-position.md)
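For example, to replay records added during the last hour, an `AtTimestamp` position can be built from a `java.util.Date` (the one-hour offset is illustrative):

```scala
import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

// Start reading from records added roughly one hour ago
val oneHourAgo = new Date(System.currentTimeMillis() - 60 * 60 * 1000)
val position = new KinesisInitialPositions.AtTimestamp(oneHourAgo)

// Used with the stream builder: .initialPosition(position)
```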

### Configuration and Performance Tuning

Advanced configuration options for retry logic, timeouts, storage levels, metrics, and checkpointing intervals to optimize performance and reliability.

```scala { .api }
case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long,
  retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations
}
```
[Configuration](./configuration.md)
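The `apply(ssc)` variant derives its values from the `SparkConf` backing the `StreamingContext`. A sketch, assuming the `spark.streaming.kinesis.retry.*` configuration keys (the key names are an assumption and should be checked against your Spark version's documentation):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

val conf = new SparkConf()
  .setAppName("KinesisTuning")
  // Assumed keys controlling Kinesis read retries
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "500ms")

val ssc = new StreamingContext(conf, Seconds(10))

// Picks up the settings above; the retry timeout defaults to the batch interval
val readConfigs = KinesisReadConfigurations(ssc)
```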

### Python Integration

Internal utilities for PySpark integration providing Python-friendly interfaces to the Scala/Java APIs.

```scala { .api }
private class KinesisUtilsPythonHelper {
  def createStream(
      jssc: JavaStreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: Int,
      checkpointInterval: Duration,
      metricsLevel: Int,
      storageLevel: StorageLevel,
      awsAccessKeyId: String,
      awsSecretKey: String,
      stsAssumeRoleArn: String,
      stsSessionName: String,
      stsExternalId: String): JavaReceiverInputDStream[Array[Byte]]
}
```
[Python Integration](./python-integration.md)

## Types

```scala { .api }
// Core DStream type
class KinesisInputDStream[T: ClassTag] extends ReceiverInputDStream[T]

// Sequence number tracking for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String,
  recordCount: Int
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty: Boolean
  def nonEmpty: Boolean
}

// Fault-tolerant RDD implementation
class KinesisBackedBlockRDD[T: ClassTag](
  sc: SparkContext,
  regionName: String,
  endpointUrl: String,
  blockIds: Array[BlockId],
  seqNumRanges: Array[SequenceNumberRanges],
  isBlockIdValid: Array[Boolean],
  messageHandler: Record => T,
  kinesisCreds: SparkAWSCredentials,
  kinesisReadConfigs: KinesisReadConfigurations
) extends BlockRDD[T]
```