The AWS credentials system provides flexible authentication for accessing the Kinesis, DynamoDB, and CloudWatch services. It supports multiple authentication methods: the AWS default provider chain, basic access keys, and STS assume role for cross-account access.
The base interface for all credential providers.
sealed trait SparkAWSCredentials extends Serializable {
  def provider: AWSCredentialsProvider
}

// Uses the AWS default credential provider chain
case object DefaultCredentials extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}
// Uses a basic AWS access key and secret key
case class BasicCredentials(
  awsAccessKeyId: String,
  awsSecretKey: String
) extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}
// Uses STS assume role for temporary credentials
case class STSCredentials(
  stsRoleArn: String,
  stsSessionName: String,
  stsExternalId: Option[String] = None,
  longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

object SparkAWSCredentials {
  // Returns a new Builder for constructing SparkAWSCredentials instances
  def builder: SparkAWSCredentials.Builder

  class Builder {
    def basicCredentials(accessKeyId: String, secretKey: String): Builder
    def stsCredentials(roleArn: String, sessionName: String): Builder
    def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder
    def build(): SparkAWSCredentials
  }
}

Uses the AWS default credential provider chain, which checks credentials in this order:
1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
2. Java system properties (aws.accessKeyId, aws.secretKey)
3. The shared credentials file (~/.aws/credentials)

import org.apache.spark.streaming.kinesis.{DefaultCredentials, KinesisInputDStream}
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(DefaultCredentials) // Explicit, but this is the default
  .build()

Or using the builder:
val credentials = SparkAWSCredentials.builder.build() // Creates DefaultCredentials
val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()

Uses an explicit AWS access key ID and secret access key. Warning: these credentials are saved in DStream checkpoints, so ensure your checkpoint directory is secure.
import org.apache.spark.streaming.kinesis.BasicCredentials

val credentials = BasicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()

Using the builder pattern (recommended):
val credentials = SparkAWSCredentials.builder
  .basicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .build()

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()

BasicCredentials will fall back to the default provider chain if the provided credentials are invalid:
// If credentials are null or invalid, falls back to DefaultCredentials
val credentials = BasicCredentials(null, "invalid")
// This will log a warning and use DefaultCredentials instead

Uses AWS Security Token Service (STS) to assume an IAM role for temporary credentials. This is useful for cross-account access or enhanced security.
import org.apache.spark.streaming.kinesis.STSCredentials

val credentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole",
  stsSessionName = "spark-kinesis-session"
)

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()

For roles that require an external ID for additional security:
val credentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/CrossAccountRole",
  stsSessionName = "spark-session",
  stsExternalId = Some("unique-external-id")
)

Specify different long-lived credentials for assuming the role:
val longLivedCreds = BasicCredentials("access-key", "secret-key")
val stsCredentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/KinesisRole",
  stsSessionName = "my-session",
  longLivedCreds = longLivedCreds
)

Equivalent configurations using the builder:

// Basic STS
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisRole", "my-session")
  .build()

// STS with external ID
val credentialsWithExternalId = SparkAWSCredentials.builder
  .basicCredentials("access-key", "secret-key") // Long-lived credentials
  .stsCredentials("arn:aws:iam::123456789012:role/CrossAccountRole", "session", "external-id")
  .build()

You can configure different credentials for different AWS services:
val kinesisCredentials = SparkAWSCredentials.builder
  .basicCredentials("kinesis-access-key", "kinesis-secret")
  .build()

val dynamoCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/DynamoRole", "dynamo-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(kinesisCredentials)     // For Kinesis API calls
  .dynamoDBCredentials(dynamoCredentials)     // For DynamoDB checkpointing
  .cloudWatchCredentials(DefaultCredentials)  // For CloudWatch metrics
  .build()

If you don't specify dynamoDBCredentials or cloudWatchCredentials, they will default to the same credentials as kinesisCredentials.
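For example, when only kinesisCredentials is set, those same credentials are reused for checkpointing and metrics. A minimal sketch (the role name and session name here are placeholders, not part of the API):

// Hypothetical role; only kinesisCredentials is set below
val sharedCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/SharedRole", "shared-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(sharedCredentials) // Also used for DynamoDB checkpointing and CloudWatch metrics
  .build()

Whichever credentials each service ends up using must carry sufficient IAM permissions. Minimum permissions for reading the Kinesis stream itself: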
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/your-stream-name"
    }
  ]
}

For the DynamoDB table used for checkpointing (named after checkpointAppName):

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:DescribeTable",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem"
],
"Resource": "arn:aws:dynamodb:*:*:table/your-checkpoint-app-name"
}
]
}{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": "*"
}
]
}// Production setup with assume role
val productionCredentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/SparkKinesisRole",
    sessionName = s"spark-kinesis-${java.util.UUID.randomUUID()}"
  )
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("production-data-stream")
  .checkpointAppName("production-spark-consumer")
  .regionName("us-west-2")
  .kinesisCredentials(productionCredentials)
  .dynamoDBCredentials(productionCredentials)
  .cloudWatchCredentials(productionCredentials)
  .build()
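When using STSCredentials, the role being assumed must also trust the calling identity. A minimal trust-policy sketch, assuming a hypothetical caller account 111122223333 and the external-id condition from the earlier example (both values are placeholders):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::111122223333:root" },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": { "sts:ExternalId": "unique-external-id" }
      }
    }
  ]
}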