Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library
—
Flexible AWS authentication options including default credential provider chains, explicit credential specification, and IAM role integration for secure access to Kinesis streams.
Uses AWS DefaultAWSCredentialsProviderChain for automatic credential discovery.
// Credentials discovered automatically in this order:
// 1. Environment Variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
// 2. Java System Properties (aws.accessKeyId, aws.secretKey)
// 3. Credential profiles file (~/.aws/credentials)
// 4. Amazon EC2 instance profile credentials

Usage (Scala):
val stream = KinesisUtils.createStream(
ssc, appName, streamName, endpointUrl, regionName,
initialPosition, checkpointInterval, storageLevel
)
// No explicit credentials - uses DefaultAWSCredentialsProviderChain

Usage (Java):
JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
jssc, appName, streamName, endpointUrl, regionName,
initialPosition, checkpointInterval, storageLevel
);
// No explicit credentials - uses DefaultAWSCredentialsProviderChain

Provides AWS credentials directly to the stream creation methods.
/**
* SerializableAWSCredentials wrapper for explicit credential specification.
* Implements AWSCredentials interface with serialization support.
*/
case class SerializableAWSCredentials(
accessKeyId: String,
secretKey: String
) extends AWSCredentials {
def getAWSAccessKeyId: String = accessKeyId
def getAWSSecretKey: String = secretKey
}

Important Security Note: Explicit credentials are saved in DStream checkpoints if checkpointing is enabled. Ensure checkpoint directories are properly secured.
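The warning above can be made concrete: Spark checkpoints the DStream graph with plain Java serialization, so credentials held by the receiver configuration land in the checkpoint files byte-for-byte. A minimal sketch; the `Credentials` class below is an illustrative stand-in for the wrapper above (it avoids the AWS SDK dependency), and the key values are fake:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CheckpointLeakDemo {
    // Illustrative stand-in for SerializableAWSCredentials, so the sketch
    // runs without the AWS SDK on the classpath.
    static class Credentials implements Serializable {
        final String accessKeyId;
        final String secretKey;
        Credentials(String accessKeyId, String secretKey) {
            this.accessKeyId = accessKeyId;
            this.secretKey = secretKey;
        }
    }

    // Returns true if the secret is readable in plain text from the
    // Java-serialized bytes (the same encoding checkpoint files use).
    static boolean secretVisibleAfterSerialization(String secret) throws Exception {
        Credentials creds = new Credentials("AKIAIOSFODNN7EXAMPLE", secret);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(creds);
        }
        return new String(bos.toByteArray(), "ISO-8859-1").contains(secret);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(secretVisibleAfterSerialization("not-a-real-secret")); // true
    }
}
```

Because the secret survives serialization verbatim, prefer the default provider chain or IAM roles in production, and lock down filesystem or S3 permissions on any checkpoint directory that must hold explicit credentials.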
val stream = KinesisUtils.createStream(
ssc, appName, streamName, endpointUrl, regionName,
initialPosition, checkpointInterval, storageLevel,
awsAccessKeyId = "AKIAIOSFODNN7EXAMPLE",
awsSecretKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)

JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
jssc, appName, streamName, endpointUrl, regionName,
initialPosition, checkpointInterval, storageLevel,
"AKIAIOSFODNN7EXAMPLE", // awsAccessKeyId
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" // awsSecretKey
);

Set AWS credentials as environment variables (recommended for development):
export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
export AWS_DEFAULT_REGION=us-east-1

Create a ~/.aws/credentials file:
[default]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
[production]
aws_access_key_id = AKIAI44QH8DHBEXAMPLE
aws_secret_access_key = je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY

Use a specific profile:
export AWS_PROFILE=production

When running on EC2, attach an IAM role with appropriate Kinesis permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:ListStreams"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
},
{
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:DescribeTable",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"dynamodb:Scan"
],
"Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/MyKinesisApp"
},
{
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": "*"
}
]
}

Set credentials as JVM system properties:
java -Daws.accessKeyId=AKIAIOSFODNN7EXAMPLE \
-Daws.secretKey=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY \
-jar my-spark-app.jar

Required Kinesis permissions:

{
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:ListStreams"
],
"Resource": "arn:aws:kinesis:REGION:ACCOUNT:stream/STREAM_NAME"
}

Required DynamoDB permissions (the KCL stores checkpoints and leases in a DynamoDB table named after the application):

{
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:DescribeTable",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"dynamodb:Scan",
"dynamodb:Query"
],
"Resource": "arn:aws:dynamodb:REGION:ACCOUNT:table/KCL_APPLICATION_NAME"
}

Required CloudWatch permissions (for KCL metrics):

{
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": "*"
}

Common Kinesis endpoint URLs:
// US East (N. Virginia)
val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
// US West (Oregon)
val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
// Europe (Ireland)
val endpointUrl = "https://kinesis.eu-west-1.amazonaws.com"
// Asia Pacific (Tokyo)
val endpointUrl = "https://kinesis.ap-northeast-1.amazonaws.com"

The library validates region names against AWS regions:
/**
* Validates region name against known AWS regions.
* Throws IllegalArgumentException for invalid regions.
*/
private def validateRegion(regionName: String): String

Valid region names include:
us-east-1, us-west-1, us-west-2
eu-west-1, eu-central-1
ap-southeast-1, ap-southeast-2, ap-northeast-1

// Use environment variables for development
val stream = KinesisUtils.createStream(
ssc, "dev-kinesis-app", "dev-stream",
"https://kinesis.us-east-1.amazonaws.com", "us-east-1",
InitialPositionInStream.LATEST,
Duration.milliseconds(2000),
StorageLevel.MEMORY_AND_DISK_2
)

// Production setup using IAM roles on EC2
val stream = KinesisUtils.createStream(
ssc, "prod-kinesis-app", "production-stream",
"https://kinesis.us-east-1.amazonaws.com", "us-east-1",
InitialPositionInStream.TRIM_HORIZON,
Duration.milliseconds(5000),
StorageLevel.MEMORY_AND_DISK_2
)

// Access a Kinesis stream in a different AWS account
val crossAccountCredentials = SerializableAWSCredentials(
"AKIAI44QH8DHBEXAMPLE",
"je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY"
)
val stream = KinesisUtils.createStream(
ssc, "cross-account-app", "shared-stream",
"https://kinesis.us-east-1.amazonaws.com", "us-east-1",
InitialPositionInStream.LATEST,
Duration.milliseconds(2000),
StorageLevel.MEMORY_AND_DISK_2,
defaultMessageHandler,
crossAccountCredentials.getAWSAccessKeyId,
crossAccountCredentials.getAWSSecretKey
)

Common errors:
"Unable to load AWS credentials"
"Access Denied" on Kinesis operations
"Access Denied" on DynamoDB
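When chasing the credential errors above, a useful first step is confirming which of the default-chain sources are even visible to the JVM. A small self-contained sketch (it checks only the first three stops of the chain listed earlier; instance-profile credentials would require a call to the EC2 metadata service, which is omitted here):

```java
import java.io.File;

public class CredentialSourceCheck {
    // Report the first three stops of the default credential chain:
    // environment variable, system property, shared credentials file.
    static boolean[] sourcesVisible() {
        boolean envSet  = System.getenv("AWS_ACCESS_KEY_ID") != null;
        boolean propSet = System.getProperty("aws.accessKeyId") != null;
        File profileFile = new File(System.getProperty("user.home"),
                                    ".aws" + File.separator + "credentials");
        return new boolean[] { envSet, propSet, profileFile.exists() };
    }

    public static void main(String[] args) {
        boolean[] v = sourcesVisible();
        System.out.println("1. Env var AWS_ACCESS_KEY_ID set:  " + v[0]);
        System.out.println("2. Sys prop aws.accessKeyId set:   " + v[1]);
        System.out.println("3. ~/.aws/credentials file exists: " + v[2]);
    }
}
```

If none of the three report true and the application is not running on EC2 with an instance profile, the "Unable to load AWS credentials" error is expected.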
Enable AWS SDK default metrics for debugging:

System.setProperty("com.amazonaws.sdk.enableDefaultMetrics", "true")

Verify that the default credential provider chain can resolve credentials:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
val provider = new DefaultAWSCredentialsProviderChain()
println(provider.getCredentials()) // Will throw an exception if no credentials are found

Verify access with the AWS CLI:

aws kinesis describe-stream --stream-name my-stream
aws dynamodb list-tables

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10