Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams. Provides fault-tolerant, scalable stream processing with automatic checkpointing, shard management, and configurable parallelism through the Kinesis Client Library (KCL).
Maven dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
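For sbt builds, the same artifact can be declared with the equivalent coordinates in sbt syntax:

// Same dependency, expressed for sbt
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl-assembly_2.11" % "1.6.2"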
Scala imports:

import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record

Java imports:

import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

Scala example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))
// Create Kinesis stream
val kinesisStream = KinesisUtils.createStream(
  ssc,
  "myKinesisApp",                            // KCL application name, also used for the DynamoDB checkpoint table
  "myStreamName",                            // Kinesis stream to consume
  "https://kinesis.us-east-1.amazonaws.com", // Kinesis endpoint URL
  "us-east-1",                               // AWS region
  InitialPositionInStream.LATEST,            // start reading from the tip of the stream
  Seconds(30),                               // KCL checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2             // replicated storage for received blocks
)
// Decode each record's payload as a UTF-8 string and print it
kinesisStream.map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8)).print()
ssc.start()
ssc.awaitTermination()
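Each createStream call backs a single receiver, so for read parallelism the Spark Kinesis guide recommends creating one stream per shard and unioning them before processing. A minimal sketch, reusing ssc and the imports above and assuming the stream has two shards:

// Sketch: one receiver per shard, unioned for read parallelism (assumes 2 shards)
val numShards = 2
val kinesisStreams = (0 until numShards).map { _ =>
  KinesisUtils.createStream(
    ssc, "myKinesisApp", "myStreamName",
    "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
    InitialPositionInStream.LATEST, Seconds(30), StorageLevel.MEMORY_AND_DISK_2)
}
// Union the per-shard streams into one DStream before applying transformations
val unionedStream = ssc.union(kinesisStreams)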
Java example:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.storage.StorageLevel;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
// Create Kinesis stream
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
    jssc,
    "myKinesisApp",                            // KCL application name, also used for the DynamoDB checkpoint table
    "myStreamName",                            // Kinesis stream to consume
    "https://kinesis.us-east-1.amazonaws.com", // Kinesis endpoint URL
    "us-east-1",                               // AWS region
    InitialPositionInStream.LATEST,            // start reading from the tip of the stream
    Durations.seconds(30),                     // KCL checkpoint interval
    StorageLevel.MEMORY_AND_DISK_2()           // replicated storage for received blocks
);
// Decode each record's payload as a UTF-8 string and print it
kinesisStream.map(bytes -> new String(bytes, java.nio.charset.StandardCharsets.UTF_8)).print();
jssc.start();
jssc.awaitTermination();

The Spark Streaming Kinesis integration is built around several key components.
KinesisUtils provides the core functionality for creating Kinesis input streams, with configuration options that include custom message handlers, explicit credential specification, and both Scala and Java APIs.
object KinesisUtils {
  // Generic stream creation with a custom message handler
  def createStream[T: ClassTag](
      ssc: StreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: InitialPositionInStream,
      checkpointInterval: Duration,
      storageLevel: StorageLevel,
      messageHandler: Record => T
  ): ReceiverInputDStream[T]

  // Default stream creation returning raw byte arrays
  def createStream(
      ssc: StreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: InitialPositionInStream,
      checkpointInterval: Duration,
      storageLevel: StorageLevel
  ): ReceiverInputDStream[Array[Byte]]
}
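As an illustration of the messageHandler variant, a sketch that keeps each record's partition key alongside its payload, reusing ssc and the imports from the quick-start example; the UTF-8 encoding is an assumption about the producer:

// Sketch: custom message handler that keeps the partition key with the payload
val keyedStream = KinesisUtils.createStream(
  ssc, "myKinesisApp", "myStreamName",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST, Seconds(30), StorageLevel.MEMORY_AND_DISK_2,
  (record: Record) => {
    val payload = new Array[Byte](record.getData.remaining())
    record.getData.get(payload) // copy the bytes out of the record's ByteBuffer
    (record.getPartitionKey, new String(payload, java.nio.charset.StandardCharsets.UTF_8))
  })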
Authentication and credential handling for AWS Kinesis access supports both the default credential provider chain and explicitly supplied credentials.

// Stream creation with explicit AWS credentials
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[T]
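A sketch of the explicit-credential overload; the key values here are placeholders, and in practice they would come from configuration rather than source code:

// Sketch: explicit AWS credentials (placeholder values, not real keys)
val authedStream = KinesisUtils.createStream(
  ssc, "myKinesisApp", "myStreamName",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST, Seconds(30), StorageLevel.MEMORY_AND_DISK_2,
  (record: Record) => record.getSequenceNumber, // handler: keep only sequence numbers
  "YOUR_ACCESS_KEY_ID", "YOUR_SECRET_ACCESS_KEY")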
Complete Java API compatibility is provided through functional interfaces and Java-friendly method signatures.

// Java API for generic stream creation
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
Built-in fault tolerance tracks Kinesis sequence numbers so that processing can recover reliably after failures.

// Sequence number range for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
}
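To make the bookkeeping concrete, a sketch with purely illustrative values showing how a stored block's records map to a per-shard range:

// Sketch: ranges describing which records a stored block covers (illustrative values)
val range = SequenceNumberRange("myStreamName", "shardId-000000000000", "seq-100", "seq-200")
val ranges = SequenceNumberRanges(Seq(range))
assert(ranges.nonEmpty())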
// AWS credential wrapper for serialization
case class SerializableAWSCredentials(
  accessKeyId: String,
  secretKey: String
) extends AWSCredentials {
  def getAWSAccessKeyId: String
  def getAWSSecretKey: String
}
// Message handler function type
type MessageHandler[T] = Record => T

// Java function interface for message handling
import org.apache.spark.api.java.function.{Function => JFunction}
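Putting the helper types together, a sketch of a reusable handler matching the MessageHandler alias and a serializable credential wrapper; the key values are placeholders and the UTF-8 decode is an assumption about the payload encoding:

// Sketch: a reusable handler matching MessageHandler[String]
val utf8Handler: Record => String = record => {
  val payload = new Array[Byte](record.getData.remaining())
  record.getData.get(payload) // copy the bytes out of the record's ByteBuffer
  new String(payload, java.nio.charset.StandardCharsets.UTF_8)
}

// Credentials are wrapped so they can be serialized to the receiver on executors
val creds = SerializableAWSCredentials("YOUR_ACCESS_KEY_ID", "YOUR_SECRET_ACCESS_KEY")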