Flexible credential management for the Spark Streaming Kinesis connector, supporting the default provider chain, basic (access key / secret key) credentials, and STS assume-role patterns for accessing the Kinesis, DynamoDB, and CloudWatch services.

```scala
package org.apache.spark.streaming.kinesis

sealed trait SparkAWSCredentials extends Serializable {
  def provider: AWSCredentialsProvider
}

case object DefaultCredentials extends SparkAWSCredentials

case class BasicCredentials(
    awsAccessKeyId: String,
    awsSecretKey: String
) extends SparkAWSCredentials

case class STSCredentials(
    stsRoleArn: String,
    stsSessionName: String,
    stsExternalId: Option[String] = None,
    longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials

object SparkAWSCredentials {
  def builder: Builder
}

class Builder {
  def basicCredentials(accessKeyId: String, secretKey: String): Builder
  def stsCredentials(roleArn: String, sessionName: String): Builder
  def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder
  def build(): SparkAWSCredentials
}
```

DefaultCredentials uses the default AWS credentials provider chain, which checks for credentials in this order:

1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
2. Java system properties (aws.accessKeyId, aws.secretKey)
3. The shared credentials file (~/.aws/credentials)
4. Instance profile or task role credentials provided through the EC2/ECS metadata service

```scala
import org.apache.spark.streaming.kinesis.{DefaultCredentials, KinesisInputDStream}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(DefaultCredentials)
  .build()
```

Use Case: Recommended for production environments with proper IAM roles and policies.

Use an explicit AWS access key ID and secret access key.

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

val credentials = SparkAWSCredentials.builder
  .basicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()
```

Security Warning: Basic credentials will be saved in DStream checkpoints if checkpointing is enabled. Ensure your checkpoint directory is secure.

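One way to reduce exposure is to keep literal keys out of application source entirely. The sketch below reads the key pair from environment variables on the driver before handing it to the builder; the variable names KINESIS_ACCESS_KEY_ID and KINESIS_SECRET_KEY and the helper itself are hypothetical, not part of the connector API.

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Hypothetical helper: read the key pair from the driver's environment instead of
// hardcoding it in source, and fail fast if either variable is missing.
def credentialsFromEnv(): SparkAWSCredentials = {
  def fromEnv(name: String): String =
    sys.env.getOrElse(name, throw new IllegalStateException(s"$name is not set"))

  SparkAWSCredentials.builder
    .basicCredentials(fromEnv("KINESIS_ACCESS_KEY_ID"), fromEnv("KINESIS_SECRET_KEY"))
    .build()
}
```

This only keeps keys out of source control; the resulting BasicCredentials object is still written to DStream checkpoints, so the checkpoint location itself must still be restricted.
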
Use AWS Security Token Service (STS) to assume an IAM role for temporary credentials.
```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

val credentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole",
    sessionName = "spark-kinesis-session"
  )
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()
```

Use an external ID for additional security when crossing account boundaries.

```scala
val credentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/CrossAccountKinesisRole",
    sessionName = "spark-kinesis-session",
    externalId = "unique-external-identifier"
  )
  .build()
```

Combine STS with basic credentials when the long-lived credentials used for the initial AssumeRole call should be an explicit key pair rather than the default provider chain.

```scala
val credentials = SparkAWSCredentials.builder
  .basicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole",
    sessionName = "spark-kinesis-session"
  )
  .build()
```

Configure different credentials for different AWS services.

```scala
val kinesisCredentials = SparkAWSCredentials.builder
  .basicCredentials("kinesis-access-key", "kinesis-secret-key")
  .build()

val dynamoCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/DynamoDBRole", "dynamo-session")
  .build()

val cloudWatchCredentials = SparkAWSCredentials.builder
  .basicCredentials("cloudwatch-access-key", "cloudwatch-secret-key")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(kinesisCredentials)
  .dynamoDBCredentials(dynamoCredentials)
  .cloudWatchCredentials(cloudWatchCredentials)
  .build()
```

Default Behavior: If not specified, DynamoDB and CloudWatch credentials default to the same credentials used for Kinesis.

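As a sketch of mixing these behaviors (the role ARN and session name are placeholders): assume a role for Kinesis reads, let DynamoDB inherit those credentials by leaving it unset, and explicitly route CloudWatch metrics through the default provider chain.

```scala
import org.apache.spark.streaming.kinesis.{DefaultCredentials, KinesisInputDStream, SparkAWSCredentials}

val kinesisCreds = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisAccessRole", "spark-kinesis-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(kinesisCreds)
  // dynamoDBCredentials is not set, so DynamoDB falls back to kinesisCreds
  .cloudWatchCredentials(DefaultCredentials) // CloudWatch uses the default chain instead
  .build()
```
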
The same credential options are available from the Java API:

```java
import org.apache.spark.streaming.kinesis.DefaultCredentials$;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;

// Default credentials (the Scala case object is accessed from Java through its MODULE$ field)
SparkAWSCredentials defaultCreds = DefaultCredentials$.MODULE$;

// Basic credentials
SparkAWSCredentials basicCreds = SparkAWSCredentials.builder()
    .basicCredentials("access-key", "secret-key")
    .build();

// STS credentials
SparkAWSCredentials stsCreds = SparkAWSCredentials.builder()
    .stsCredentials("arn:aws:iam::123456789012:role/MyRole", "my-session")
    .build();

// Use with the stream builder
KinesisInputDStream<byte[]> stream = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .kinesisCredentials(basicCreds)
    .build();
```

Error handling differs between invalid basic credentials and invalid STS parameters.

```scala
import org.apache.spark.streaming.kinesis.BasicCredentials

// This will fall back to DefaultCredentials if invalid
val credentials = BasicCredentials(null, "secret-key") // Invalid: null access key
```

When BasicCredentials cannot construct valid credentials, it logs a warning and falls back to DefaultCredentials.

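If a silent fallback to the default chain is undesirable, one option is to validate the key material before handing it to the builder. The guard below is a hypothetical sketch, not part of the connector API.

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Hypothetical guard: reject null or blank keys up front so a misconfiguration fails
// immediately instead of silently proceeding with the default provider chain.
def strictBasicCredentials(accessKeyId: String, secretKey: String): SparkAWSCredentials = {
  require(accessKeyId != null && accessKeyId.trim.nonEmpty, "access key ID must be non-blank")
  require(secretKey != null && secretKey.trim.nonEmpty, "secret key must be non-blank")
  SparkAWSCredentials.builder
    .basicCredentials(accessKeyId, secretKey)
    .build()
}
```
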
```scala
// All STS parameters must be provided together or all must be null
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/MyRole", null) // Invalid: missing session name
  .build() // Throws IllegalArgumentException
```

Prefer IAM roles over hardcoded credentials:

```scala
// Good: Uses IAM instance profile or task role
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  // No explicit credentials - uses default provider chain
  .build()
```

Create IAM policies with minimal required permissions:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/my-app"
    }
  ]
}
```

When using basic credentials, ensure checkpoint directories are secure:

```scala
// Ensure checkpoint directory has restricted access
ssc.checkpoint("s3://secure-bucket/checkpoints/")
```

Use STS credentials for automatic credential rotation:

```scala
val credentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/KinesisRole",
    sessionName = "spark-session-" + System.currentTimeMillis()
  )
  .build()
```

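To confirm which identity the assumed-role credentials resolve to, one can query STS directly. This is a sketch that assumes the AWS SDK for Java v1 STS client is on the classpath and that the provider method shown in the trait above is accessible from user code.

```scala
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest

// Build an STS client from the Spark-managed credentials and print the caller ARN;
// it should reference the assumed role rather than the underlying long-lived identity.
val sts = AWSSecurityTokenServiceClientBuilder.standard()
  .withCredentials(credentials.provider)
  .build()
println(sts.getCallerIdentity(new GetCallerIdentityRequest()).getArn)
```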