# Apache Spark Streaming Kinesis Connector

Apache Spark Streaming integration with Amazon Kinesis for real-time processing of streaming data at massive scale. This connector provides a Kinesis receiver that creates input DStreams using the Amazon Kinesis Client Library (KCL), which supplies load balancing, fault tolerance, and checkpointing through KCL Workers, Checkpoints, and Shard Leases.

## Package Information

- **Package Name**: spark-streaming-kinesis-asl_2.12
- **Package Type**: Maven
- **Language**: Scala
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
```

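If you launch applications with `spark-submit`, the connector can alternatively be resolved at submit time. This is a sketch; the coordinates mirror the Maven snippet above, and the jar name is a placeholder:

```shell
# Resolve the Kinesis connector at submit time (coordinates mirror the Maven dependency above)
spark-submit \
  --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.6 \
  my-kinesis-app.jar
```
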
## Core Imports

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import com.amazonaws.services.kinesis.model.Record
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import scala.reflect.ClassTag
```

For Java:

```java
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
```

For Python:

```python
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

val sparkConf = new SparkConf().setAppName("KinesisWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create a Kinesis DStream using the builder pattern
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("myKinesisStream")
  .checkpointAppName("myKinesisApp")
  .regionName("us-east-1")
  .initialPosition(new Latest())
  .build()

// Each record arrives as Array[Byte]; decode, split into words, and count
kinesisStream.map(bytes => new String(bytes))
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .print()

ssc.start()
ssc.awaitTermination()
```

## Architecture

The Spark Kinesis connector uses the following key components:

- **KinesisInputDStream**: The main DStream implementation that creates Kinesis receivers
- **KinesisReceiver**: Receives data from Kinesis shards using the KCL
- **KinesisBackedBlockRDD**: Enables fault-tolerant recovery by re-reading data from Kinesis
- **SparkAWSCredentials**: Provides flexible AWS credential management
- **Checkpointing**: Uses DynamoDB for KCL state and Spark checkpointing for stream-processing state

## Capabilities

### Stream Creation

Create Kinesis input streams with comprehensive configuration options, including credentials, initial positions, and metrics.

```scala { .api }
object KinesisInputDStream {
  def builder: Builder
  def defaultMessageHandler: Record => Array[Byte]
}

class Builder {
  def streamingContext(ssc: StreamingContext): Builder
  def streamingContext(jssc: JavaStreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
  def initialPosition(initialPosition: KinesisInitialPosition): Builder
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
```

[Stream Creation and Configuration](./stream-creation.md)
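As a sketch of `buildWithMessageHandler`, the hypothetical handler below decodes each record's payload to a UTF-8 string, yielding a `KinesisInputDStream[String]` instead of raw bytes. It assumes an `ssc: StreamingContext` as in the Basic Usage example; the stream, app, and region names are placeholders:

```scala
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon

// Copy each record's ByteBuffer payload out and decode it as UTF-8
def decode(record: Record): String = {
  val buf = record.getData
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

// Stream/app/region values are placeholders; ssc comes from your application
val stringStream: KinesisInputDStream[String] = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("myKinesisStream")
  .checkpointAppName("myKinesisApp")
  .regionName("us-east-1")
  .initialPosition(new TrimHorizon())
  .buildWithMessageHandler(decode)
```
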

### Initial Position Management

Configure where to start reading from Kinesis streams, with support for latest, earliest (trim horizon), and timestamp-based positioning.

```scala { .api }
object KinesisInitialPositions {
  class Latest() extends KinesisInitialPosition
  class TrimHorizon() extends KinesisInitialPosition
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition
}
```

[Initial Position Configuration](./initial-positions.md)
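For example, timestamp-based positioning could be sketched as follows (the date value is illustrative):

```scala
import java.text.SimpleDateFormat
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp

// Start reading from records added at or after this (illustrative) instant
val startAt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2024-01-01 00:00:00")
val position = new AtTimestamp(startAt)

// Then pass it to the stream builder:
// KinesisInputDStream.builder ... .initialPosition(position) ...
```
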
139
140
### AWS Credentials Management
141
142
Flexible credential management supporting default provider chains, basic credentials, and STS assume role patterns.
143
144
```scala { .api }
145
object SparkAWSCredentials {
146
def builder: Builder
147
}
148
149
class Builder {
150
def basicCredentials(accessKeyId: String, secretKey: String): Builder
151
def stsCredentials(roleArn: String, sessionName: String): Builder
152
def build(): SparkAWSCredentials
153
}
154
```
155
156
[AWS Credentials Configuration](./aws-credentials.md)
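A hedged sketch of the STS pattern: the role ARN and session name below are placeholders, and the default provider chain is assumed to supply the long-lived credentials used to assume the role:

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Placeholder role ARN and session name
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/SparkKinesisRole", "spark-kinesis-session")
  .build()

// The same credentials can then be wired into Kinesis, DynamoDB, and CloudWatch access:
// KinesisInputDStream.builder
//   .kinesisCredentials(credentials)
//   .dynamoDBCredentials(credentials)
//   .cloudWatchCredentials(credentials)
//   ...
```
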

### Python API

Python interface for creating Kinesis streams with simplified parameter handling.

```python { .api }
class KinesisUtils:
    @staticmethod
    def createStream(
        ssc: StreamingContext,
        kinesisAppName: str,
        streamName: str,
        endpointUrl: str,
        regionName: str,
        initialPositionInStream: int,
        checkpointInterval: int,
        **kwargs
    ) -> DStream
```

[Python API Usage](./python-api.md)
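A minimal Python sketch mirroring the Scala word-count example above; the app, stream, endpoint, and region values are placeholders, and a running Spark environment with AWS access is assumed:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext(appName="KinesisWordCount")
ssc = StreamingContext(sc, 2)  # 2-second batch interval

# App name, stream name, endpoint, and region are placeholders
lines = KinesisUtils.createStream(
    ssc, "myKinesisApp", "myKinesisStream",
    "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
    InitialPositionInStream.LATEST, 2)

# Split each record into words and count them per batch
lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .pprint()

ssc.start()
ssc.awaitTermination()
```
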

## Types

```scala { .api }
// Core interface types
trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

sealed trait SparkAWSCredentials extends Serializable {
  def provider: AWSCredentialsProvider
}

// Concrete credential implementations
case object DefaultCredentials extends SparkAWSCredentials

case class BasicCredentials(
  awsAccessKeyId: String,
  awsSecretKey: String
) extends SparkAWSCredentials

case class STSCredentials(
  stsRoleArn: String,
  stsSessionName: String,
  stsExternalId: Option[String] = None,
  longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials
```