AWS Credentials Management

The AWS credentials system provides flexible authentication for accessing Kinesis, DynamoDB, and CloudWatch services. It supports multiple authentication methods, including the default AWS credential provider chain, basic access keys, and STS assume role for cross-account access.

Core API

SparkAWSCredentials Interface

The base interface for all credential providers.

sealed trait SparkAWSCredentials extends Serializable {
  def provider: AWSCredentialsProvider
}
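
The returned provider is a plain AWSCredentialsProvider, so it can be handed to any AWS SDK for Java v1 client builder. A minimal sketch (assuming the SDK's AmazonKinesisClientBuilder is on the classpath; the region and client usage here are illustrative only, not how the connector wires its clients internally):

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import org.apache.spark.streaming.kinesis.DefaultCredentials

// Illustration only: the provider from any SparkAWSCredentials implementation
// works with a standard AWS SDK v1 client builder.
val kinesisClient = AmazonKinesisClientBuilder.standard()
  .withCredentials(DefaultCredentials.provider)
  .withRegion("us-west-2")
  .build()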

Credential Implementations

// Uses AWS default credential provider chain
case object DefaultCredentials extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

// Uses basic AWS access key and secret key
case class BasicCredentials(
  awsAccessKeyId: String, 
  awsSecretKey: String
) extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

// Uses STS assume role for temporary credentials
case class STSCredentials(
  stsRoleArn: String,
  stsSessionName: String, 
  stsExternalId: Option[String] = None,
  longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

Builder API

object SparkAWSCredentials {
  def builder: SparkAWSCredentials.Builder
}

class Builder {
  def basicCredentials(accessKeyId: String, secretKey: String): Builder
  def stsCredentials(roleArn: String, sessionName: String): Builder
  def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder
  def build(): SparkAWSCredentials
}

Default Credentials

Uses the AWS default credential provider chain, which checks credentials in this order:

  1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  2. Java system properties (aws.accessKeyId, aws.secretKey)
  3. Credential profiles file (~/.aws/credentials)
  4. Amazon ECS container credentials
  5. Instance profile credentials (EC2/ECS)

Example of selecting the default chain explicitly:

import org.apache.spark.streaming.kinesis.{DefaultCredentials, KinesisInputDStream}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(DefaultCredentials)  // Explicit, but this is the default
  .build()

Or using the builder:

val credentials = SparkAWSCredentials.builder.build()  // Creates DefaultCredentials

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()

Basic Credentials

Uses explicit AWS access key ID and secret access key. Warning: These credentials will be saved in DStream checkpoints, so ensure your checkpoint directory is secure.

import org.apache.spark.streaming.kinesis.BasicCredentials

val credentials = BasicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream") 
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()

Using the builder pattern (recommended):

val credentials = SparkAWSCredentials.builder
  .basicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .build()

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()

Error Handling

BasicCredentials will fall back to the default provider chain if the provided credentials are invalid:

// If credentials are null or invalid, falls back to DefaultCredentials
val credentials = BasicCredentials(null, "invalid")
// This will log a warning and use DefaultCredentials instead
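
If you prefer to fail fast rather than rely on this silent fallback, one option is to resolve the credentials eagerly before building the stream. A minimal sketch (getCredentials is the standard AWS SDK v1 call on the returned provider and throws if no credentials can be resolved; the environment variable names are placeholders):

import org.apache.spark.streaming.kinesis.BasicCredentials

// Resolve credentials up front so misconfiguration surfaces before the job starts.
val credentials = BasicCredentials(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY"))
val resolved = credentials.provider.getCredentials  // throws if neither these keys nor the default chain resolve
println(s"Resolved access key: ${resolved.getAWSAccessKeyId.take(4)}****")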

STS Assume Role Credentials

Uses AWS Security Token Service (STS) to assume an IAM role for temporary credentials. This is useful for cross-account access or enhanced security.

Basic STS Usage

import org.apache.spark.streaming.kinesis.STSCredentials

val credentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole",
  stsSessionName = "spark-kinesis-session"
)

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration  
  .build()

STS with External ID

For roles that require an external ID for additional security:

val credentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/CrossAccountRole",
  stsSessionName = "spark-session", 
  stsExternalId = Some("unique-external-id")
)

STS with Custom Long-Lived Credentials

Specify which long-lived credentials are used to assume the role:

val longLivedCreds = BasicCredentials("access-key", "secret-key")

val stsCredentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/KinesisRole",
  stsSessionName = "my-session",
  longLivedCreds = longLivedCreds
)

Using the Builder Pattern

// Basic STS
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisRole", "my-session")
  .build()

// STS with external ID  
val credentialsWithExternalId = SparkAWSCredentials.builder
  .basicCredentials("access-key", "secret-key")  // Long-lived credentials
  .stsCredentials("arn:aws:iam::123456789012:role/CrossAccountRole", "session", "external-id")
  .build()

Service-Specific Credentials

You can configure different credentials for different AWS services:

val kinesisCredentials = SparkAWSCredentials.builder
  .basicCredentials("kinesis-access-key", "kinesis-secret")
  .build()

val dynamoCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/DynamoRole", "dynamo-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(kinesisCredentials)        // For Kinesis API calls
  .dynamoDBCredentials(dynamoCredentials)       // For DynamoDB checkpointing
  .cloudWatchCredentials(DefaultCredentials)    // For CloudWatch metrics
  .build()

If you don't specify dynamoDBCredentials or cloudWatchCredentials, they will default to the same credentials as kinesisCredentials.
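
For example, a minimal sketch where only the Kinesis credentials are configured and the other two services inherit them (the role ARN and stream name are placeholders):

// Only kinesisCredentials is set; DynamoDB checkpointing and CloudWatch
// metrics reuse the same credentials by default.
val sharedCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisAccessRole", "shared-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(sharedCredentials)  // also used for DynamoDB and CloudWatch
  .build()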

Best Practices

Security Recommendations

  1. Use IAM roles when possible: Prefer STS assume role or instance profiles over hardcoded keys (see the sketch after this list)
  2. Secure checkpoint directories: Basic credentials are stored in checkpoints
  3. Rotate credentials regularly: Use temporary credentials when possible
  4. Use least privilege: Grant only the minimum required permissions
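
For instance, a minimal sketch of the first recommendation: let the instance profile (via the default chain) supply the long-lived credentials and assume a narrowly scoped role for stream access (the role ARN is a placeholder):

import org.apache.spark.streaming.kinesis.{DefaultCredentials, STSCredentials}

// Long-lived credentials come from the instance profile via the default chain
// (DefaultCredentials is already the default for longLivedCreds; shown explicitly
// for clarity). Only the assumed role's temporary credentials reach Kinesis.
val leastPrivilegeCreds = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/KinesisReadOnlyRole",
  stsSessionName = "spark-least-privilege",
  longLivedCreds = DefaultCredentials
)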

Required IAM Permissions

For Kinesis:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator", 
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/your-stream-name"
    }
  ]
}

For DynamoDB (checkpointing):

{
  "Version": "2012-10-17", 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem", 
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/your-checkpoint-app-name"
    }
  ]
}

For CloudWatch (metrics):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow", 
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": "*"
    }
  ]
}

Example: Production Configuration

// Production setup with assume role
val productionCredentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/SparkKinesisRole",
    sessionName = s"spark-kinesis-${java.util.UUID.randomUUID()}"
  )
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("production-data-stream")
  .checkpointAppName("production-spark-consumer")
  .regionName("us-west-2")
  .kinesisCredentials(productionCredentials)
  .dynamoDBCredentials(productionCredentials)
  .cloudWatchCredentials(productionCredentials)
  .build()