Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams with fault-tolerant checkpointing and automatic shard management
—
AWS credential handling for secure access to Kinesis streams, DynamoDB checkpointing, and CloudWatch metrics. Supports both automatic credential discovery and explicit credential specification.
The recommended approach uses AWS DefaultAWSCredentialsProviderChain, which automatically discovers credentials from (in order): environment variables (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY), Java system properties (aws.accessKeyId / aws.secretKey), the shared credential profiles file (~/.aws/credentials), and container/EC2 instance profile credentials.
All stream creation methods without explicit credentials use this automatic discovery.
For applications requiring specific credential control, all createStream methods have overloaded versions accepting explicit AWS credentials.
Security Note: Explicit credentials are stored in DStream checkpoints. Ensure checkpoint directories are properly secured.
def createStream[T: ClassTag](
ssc: StreamingContext,
kinesisAppName: String,
streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
checkpointInterval: Duration,
storageLevel: StorageLevel,
messageHandler: Record => T,
awsAccessKeyId: String,
awsSecretKey: String
): ReceiverInputDStream[T]

Usage Example:
val credentialedStream = KinesisUtils.createStream[String](
ssc,
"secure-app",
"private-stream",
"https://kinesis.us-east-1.amazonaws.com",
"us-east-1",
InitialPositionInStream.LATEST,
Seconds(30),
StorageLevel.MEMORY_AND_DISK_2,
record => new String(record.getData.array()),
"AKIAIOSFODNN7EXAMPLE",
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)

def createStream(
ssc: StreamingContext,
kinesisAppName: String,
streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
checkpointInterval: Duration,
storageLevel: StorageLevel,
awsAccessKeyId: String,
awsSecretKey: String
): ReceiverInputDStream[Array[Byte]]

Usage Example:
val secureByteStream = KinesisUtils.createStream(
ssc,
"secure-byte-processor",
"encrypted-data-stream",
"https://kinesis.us-west-2.amazonaws.com",
"us-west-2",
InitialPositionInStream.TRIM_HORIZON,
Seconds(60),
StorageLevel.MEMORY_AND_DISK_2,
sys.env("AWS_ACCESS_KEY_ID"),
sys.env("AWS_SECRET_ACCESS_KEY")
)

public static <T> JavaReceiverInputDStream<T> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel,
Function<Record, T> messageHandler,
Class<T> recordClass,
String awsAccessKeyId,
String awsSecretKey
);

Usage Example:
import org.apache.spark.api.java.function.Function;
Function<Record, String> handler = record ->
new String(record.getData().array());
JavaReceiverInputDStream<String> secureStream = KinesisUtils.createStream(
jssc,
"java-secure-app",
"confidential-stream",
"https://kinesis.eu-west-1.amazonaws.com",
"eu-west-1",
InitialPositionInStream.LATEST,
Durations.seconds(30),
StorageLevel.MEMORY_AND_DISK_2(),
handler,
String.class,
System.getenv("AWS_ACCESS_KEY_ID"),
System.getenv("AWS_SECRET_ACCESS_KEY")
);

public static JavaReceiverInputDStream<byte[]> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel,
String awsAccessKeyId,
String awsSecretKey
);

Usage Example:
JavaReceiverInputDStream<byte[]> secureByteStream = KinesisUtils.createStream(
jssc,
"java-secure-bytes",
"secure-binary-stream",
"https://kinesis.ap-northeast-1.amazonaws.com",
"ap-northeast-1",
InitialPositionInStream.TRIM_HORIZON,
Durations.seconds(45),
StorageLevel.MEMORY_AND_DISK_2(),
System.getProperty("aws.accessKeyId"),
System.getProperty("aws.secretKey")
);

Internal credential wrapper for secure serialization in distributed environments.
case class SerializableAWSCredentials(
accessKeyId: String,
secretKey: String
) extends AWSCredentials {
override def getAWSAccessKeyId: String = accessKeyId
override def getAWSSecretKey: String = secretKey
}

This class wraps AWS credentials for safe serialization when distributing stream processing tasks across Spark workers.
val accessKey = sys.env.getOrElse("AWS_ACCESS_KEY_ID",
throw new IllegalArgumentException("AWS_ACCESS_KEY_ID not set"))
val secretKey = sys.env.getOrElse("AWS_SECRET_ACCESS_KEY",
  throw new IllegalArgumentException("AWS_SECRET_ACCESS_KEY not set"))

// Set secure checkpoint directory
ssc.checkpoint("hdfs://secure-cluster/checkpoints/kinesis-app")

// Prefer this approach - uses automatic credential discovery
val stream = KinesisUtils.createStream(ssc, ...) // No explicit credentials

For applications using explicit credentials:
When accessing Kinesis streams in different regions than your Spark cluster:
// Ensure credentials have cross-region permissions
val crossRegionStream = KinesisUtils.createStream(
ssc,
"cross-region-app",
"remote-stream",
"https://kinesis.eu-central-1.amazonaws.com", // Different region
"eu-central-1",
InitialPositionInStream.LATEST,
Seconds(30),
StorageLevel.MEMORY_AND_DISK_2,
explicitAccessKey,
explicitSecretKey
)

Ensure credentials have the following minimum permissions.

Kinesis stream access:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:ListStreams"
],
"Resource": "arn:aws:kinesis:*:*:stream/your-stream-name"
}
]
}

DynamoDB checkpoint table access:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:DescribeTable",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:Scan"
],
"Resource": "arn:aws:dynamodb:*:*:table/your-kinesis-app-name"
}
]
}

CloudWatch metrics access:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": "*"
}
]
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-assembly