Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library
Primary factory methods for creating Kinesis input streams with various configuration options including custom message handlers, explicit AWS credentials, and different storage levels.
Creates a Kinesis input stream using default AWS credential discovery and a byte-array message handler.
```scala
/**
 * Create an input stream that pulls messages from a Kinesis stream using the KCL.
 * Uses DefaultAWSCredentialsProviderChain for AWS authentication.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream (TRIM_HORIZON or LATEST)
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects (MEMORY_AND_DISK_2 recommended)
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```

Usage Example:
```scala
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds

val stream = KinesisUtils.createStream(
  ssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000), // Spark Streaming's Duration factory; Duration.milliseconds does not exist
  StorageLevel.MEMORY_AND_DISK_2
)
```

Creates a Kinesis input stream with explicitly provided AWS credentials.
```scala
/**
 * Create an input stream with explicit AWS credentials.
 * Note: Credentials will be saved in DStream checkpoints if checkpointing is enabled.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param awsAccessKeyId AWS AccessKeyId (if null, uses DefaultAWSCredentialsProviderChain)
 * @param awsSecretKey AWS SecretKey (if null, uses DefaultAWSCredentialsProviderChain)
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[Array[Byte]]
```

Usage Example:
```scala
val stream = KinesisUtils.createStream(
  ssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  "AKIAIOSFODNN7EXAMPLE", // AWS's documented example credential pair
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```

Creates a typed Kinesis input stream with a custom message handler function.
```scala
/**
 * Create an input stream with a custom message handler for type-safe data processing.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @return ReceiverInputDStream[T] containing processed data
 */
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
): ReceiverInputDStream[T]
```

Usage Example:
```scala
import com.amazonaws.services.kinesis.model.Record
import spray.json._

case class MyEvent(id: String, timestamp: Long, data: String)

// spray-json needs an implicit format in scope for convertTo[MyEvent]
object MyEventProtocol extends DefaultJsonProtocol {
  implicit val myEventFormat: RootJsonFormat[MyEvent] = jsonFormat3(MyEvent)
}
import MyEventProtocol._

// Custom message handler to parse JSON events
def parseMyEvent(record: Record): MyEvent = {
  val data = new String(record.getData.array())
  data.parseJson.convertTo[MyEvent]
}

val stream = KinesisUtils.createStream[MyEvent](
  ssc,
  "MySparkKinesisApp",
  "my-events-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  parseMyEvent
)
```

Creates a typed Kinesis input stream with both custom message handler and explicit AWS credentials.
```scala
/**
 * Create an input stream with custom message handler and explicit AWS credentials.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @param awsAccessKeyId AWS AccessKeyId
 * @param awsSecretKey AWS SecretKey
 * @return ReceiverInputDStream[T] containing processed data
 */
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[T]
```

Simplified stream creation method (deprecated since version 1.4.0).
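No usage example is given above for the handler-plus-credentials overload; the following sketch combines the two earlier examples, assuming the hypothetical parseMyEvent handler defined previously and AWS's documented example credential pair:

```scala
// Sketch: parseMyEvent is the hypothetical JSON handler from the earlier
// example; the key pair below is AWS's published example, not a real credential.
val stream = KinesisUtils.createStream[MyEvent](
  ssc,
  "MySparkKinesisApp",
  "my-events-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  parseMyEvent,               // messageHandler comes before the credentials
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```

As with the credentials-only overload, note that explicit keys are persisted in DStream checkpoints when checkpointing is enabled.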
```scala
/**
 * Create an input stream using app name from SparkConf and region from endpoint.
 * @deprecated use other forms of createStream
 *
 * @param ssc StreamingContext object
 * @param streamName Kinesis stream name
 * @param endpointUrl Endpoint url of Kinesis service
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param initialPositionInStream Starting position in stream
 * @param storageLevel Storage level for received objects
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
@deprecated("use other forms of createStream", "1.4.0")
def createStream(
    ssc: StreamingContext,
    streamName: String,
    endpointUrl: String,
    checkpointInterval: Duration,
    initialPositionInStream: InitialPositionInStream,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```

Notes:
- StorageLevel.MEMORY_AND_DISK_2 is recommended for fault tolerance.
- InitialPositionInStream.LATEST: start from the most recent records.
- InitialPositionInStream.TRIM_HORIZON: start from the oldest available records (up to 24 hours).

Common error scenarios and handling:
- IllegalArgumentException

Install with Tessl CLI:

```shell
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10
```
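As a hedged sketch of handling the IllegalArgumentException scenario listed above (it can surface, for example, when a region cannot be resolved from the endpoint; exact messages vary by version), stream creation can be wrapped in scala.util.Try so configuration mistakes fail fast:

```scala
import scala.util.{Failure, Success, Try}

// Wrap createStream so bad configuration (unresolvable endpoint/region,
// null stream name) produces a readable failure at setup time rather
// than surfacing later inside the receiver.
Try {
  KinesisUtils.createStream(
    ssc,
    "MySparkKinesisApp",
    "my-kinesis-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Milliseconds(2000),
    StorageLevel.MEMORY_AND_DISK_2
  )
} match {
  case Success(stream) =>
    () // proceed to attach transformations and output operations
  case Failure(e: IllegalArgumentException) =>
    sys.error(s"Invalid Kinesis configuration: ${e.getMessage}")
  case Failure(e) =>
    throw e // unrelated failures propagate unchanged
}
```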