AWS Credentials Configuration

Flexible credential management supporting default provider chains, basic credentials, and STS assume role patterns for accessing Kinesis, DynamoDB, and CloudWatch services.

Credential Types

package org.apache.spark.streaming.kinesis

sealed trait SparkAWSCredentials extends Serializable {
  def provider: AWSCredentialsProvider
}

case object DefaultCredentials extends SparkAWSCredentials

case class BasicCredentials(
  awsAccessKeyId: String,
  awsSecretKey: String
) extends SparkAWSCredentials

case class STSCredentials(
  stsRoleArn: String,
  stsSessionName: String,
  stsExternalId: Option[String] = None,
  longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials
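
Because SparkAWSCredentials is sealed, the compiler can check matches over it exhaustively. A small illustrative sketch (the describe helper is hypothetical, not part of the API):

def describe(creds: SparkAWSCredentials): String = creds match {
  case DefaultCredentials => "default provider chain"
  case BasicCredentials(accessKeyId, _) => s"basic credentials for $accessKeyId"
  case STSCredentials(roleArn, session, _, _) => s"assume role $roleArn as session $session"
}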

Builder API

object SparkAWSCredentials {
  def builder: Builder
}

class Builder {
  def basicCredentials(accessKeyId: String, secretKey: String): Builder
  def stsCredentials(roleArn: String, sessionName: String): Builder
  def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder
  def build(): SparkAWSCredentials
}
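
The builder starts from the default provider chain; a minimal sketch, assuming that build() with no prior configuration yields DefaultCredentials (consistent with the fallback behavior described under Error Handling below):

import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// No basicCredentials or stsCredentials calls: assumed to yield DefaultCredentials
val defaults = SparkAWSCredentials.builder.build()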

Default Credentials

Uses the default AWS credentials provider chain, which checks credentials in this order:

  1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  2. Java system properties (aws.accessKeyId, aws.secretKey)
  3. Credential profiles file (~/.aws/credentials)
  4. EC2 instance profile credentials

import org.apache.spark.streaming.kinesis.{DefaultCredentials, KinesisInputDStream}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(DefaultCredentials)
  .build()

Use Case: Recommended for production environments with proper IAM roles and policies.

Basic Credentials

Use an explicit AWS access key ID and secret access key.

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

val credentials = SparkAWSCredentials.builder
  .basicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()

Security Warning: Basic credentials will be saved in DStream checkpoints if checkpointing is enabled. Ensure your checkpoint directory is secure.
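
One way to keep keys out of source control is to resolve them at runtime. An illustrative sketch that reads hypothetical environment variables (the variable names here are placeholders, not part of any API):

// MY_APP_ACCESS_KEY_ID and MY_APP_SECRET_KEY are hypothetical names set at submit time
val credentials = SparkAWSCredentials.builder
  .basicCredentials(sys.env("MY_APP_ACCESS_KEY_ID"), sys.env("MY_APP_SECRET_KEY"))
  .build()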

STS Assume Role Credentials

Use AWS Security Token Service (STS) to assume an IAM role for temporary credentials.

Basic STS Usage

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

val credentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole",
    sessionName = "spark-kinesis-session"
  )
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()

STS with External ID

Use an external ID for additional security when assuming a role across account boundaries.

val credentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/CrossAccountKinesisRole",
    sessionName = "spark-kinesis-session",
    externalId = "unique-external-identifier"
  )
  .build()

STS with Basic Credentials

Provide basic credentials as the long-lived credentials used to authenticate the initial STS AssumeRole call.

val credentials = SparkAWSCredentials.builder
  .basicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole", 
    sessionName = "spark-kinesis-session"
  )
  .build()
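
Equivalently, the case class shown under Credential Types can be constructed directly; a sketch combining an external ID with basic long-lived credentials:

import org.apache.spark.streaming.kinesis.{BasicCredentials, STSCredentials}

val credentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole",
  stsSessionName = "spark-kinesis-session",
  stsExternalId = Some("unique-external-identifier"),
  longLivedCreds = BasicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
)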

Service-Specific Credentials

Configure different credentials for different AWS services.

val kinesisCredentials = SparkAWSCredentials.builder
  .basicCredentials("kinesis-access-key", "kinesis-secret-key") 
  .build()

val dynamoCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/DynamoDBRole", "dynamo-session")
  .build()

val cloudWatchCredentials = SparkAWSCredentials.builder
  .basicCredentials("cloudwatch-access-key", "cloudwatch-secret-key")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(kinesisCredentials)
  .dynamoDBCredentials(dynamoCredentials)
  .cloudWatchCredentials(cloudWatchCredentials)
  .build()

Default Behavior: If not specified, DynamoDB and CloudWatch credentials default to the same credentials used for Kinesis.
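
So a builder call that only sets kinesisCredentials applies those credentials to all three services:

// DynamoDB and CloudWatch reuse kinesisCredentials because neither is set explicitly
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(kinesisCredentials)
  .build()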

Java API Usage

import org.apache.spark.streaming.kinesis.DefaultCredentials$;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;

// Default credentials (the Scala case object is exposed to Java as DefaultCredentials$.MODULE$)
SparkAWSCredentials defaultCreds = DefaultCredentials$.MODULE$;

// Basic credentials  
SparkAWSCredentials basicCreds = SparkAWSCredentials.builder()
    .basicCredentials("access-key", "secret-key")
    .build();

// STS credentials
SparkAWSCredentials stsCreds = SparkAWSCredentials.builder()
    .stsCredentials("arn:aws:iam::123456789012:role/MyRole", "my-session")
    .build();

// Use with stream builder
KinesisInputDStream<byte[]> stream = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .kinesisCredentials(basicCreds)
    .build();

Error Handling

Invalid Credentials

import org.apache.spark.streaming.kinesis.BasicCredentials

// This will fall back to DefaultCredentials if invalid
val credentials = BasicCredentials(null, "secret-key") // Invalid: null access key

When BasicCredentials cannot construct valid credentials, it logs a warning and falls back to DefaultCredentials.
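
To surface credential problems at startup rather than when the stream runs, you can resolve the provider eagerly. A sketch, assuming the provider member declared under Credential Types is accessible to application code:

import org.apache.spark.streaming.kinesis.BasicCredentials

val credentials = BasicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")

// Forces credential resolution now; throws if no usable credentials can be produced
credentials.provider.getCredentials()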

STS Validation

// All STS parameters must be provided together or all must be null
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/MyRole", null) // Invalid: missing session name
  .build() // Throws IllegalArgumentException
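
Since build() validates eagerly, misconfiguration can be caught and reported with a Try; a minimal sketch:

import scala.util.{Failure, Success, Try}

val credentials = Try {
  SparkAWSCredentials.builder
    .stsCredentials("arn:aws:iam::123456789012:role/MyRole", null)
    .build()
}

credentials match {
  case Success(creds) => // safe to build the stream
  case Failure(e: IllegalArgumentException) => // misconfigured STS parameters
  case Failure(e) => throw e
}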

Security Best Practices

Use IAM Roles

Prefer IAM roles over hardcoded credentials:

// Good: Uses IAM instance profile or task role
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  // No explicit credentials - uses default provider chain
  .build()

Least Privilege Principle

Create IAM policies with minimal required permissions:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator", 
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/my-app"
    }
  ]
}

Metrics Note: The Kinesis Client Library publishes metrics to CloudWatch by default, so the role may also need cloudwatch:PutMetricData.

Secure Checkpoints

When using basic credentials, ensure checkpoint directories are secure:

// Ensure checkpoint directory has restricted access
ssc.checkpoint("s3://secure-bucket/checkpoints/")

Credential Rotation

Use STS credentials for automatic credential rotation:

val credentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/KinesisRole",
    sessionName = "spark-session-" + System.currentTimeMillis()
  )
  .build()