tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10

Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library

docs/aws-configuration.md

AWS Configuration

Flexible AWS authentication options including default credential provider chains, explicit credential specification, and IAM role integration for secure access to Kinesis streams.

Capabilities

Default Credential Provider Chain

Uses AWS DefaultAWSCredentialsProviderChain for automatic credential discovery.

// Credentials discovered automatically in this order:
// 1. Environment Variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
// 2. Java System Properties (aws.accessKeyId, aws.secretKey)  
// 3. Credential profiles file (~/.aws/credentials)
// 4. Amazon EC2 Instance profile credentials

Usage (Scala):

val stream = KinesisUtils.createStream(
  ssc, appName, streamName, endpointUrl, regionName,
  initialPosition, checkpointInterval, storageLevel
)
// No explicit credentials - uses DefaultAWSCredentialsProviderChain

Usage (Java):

JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
  jssc, appName, streamName, endpointUrl, regionName,
  initialPosition, checkpointInterval, storageLevel
);
// No explicit credentials - uses DefaultAWSCredentialsProviderChain

Explicit Credential Configuration

Provides AWS credentials directly to the stream creation methods.

/**
 * SerializableAWSCredentials wrapper for explicit credential specification.
 * Implements AWSCredentials interface with serialization support.
 */
case class SerializableAWSCredentials(
  accessKeyId: String, 
  secretKey: String
) extends AWSCredentials {
  def getAWSAccessKeyId: String = accessKeyId
  def getAWSSecretKey: String = secretKey
}

Important Security Note: Explicit credentials are saved in DStream checkpoints if checkpointing is enabled. Ensure checkpoint directories are properly secured.
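When explicit credentials are combined with checkpointing, lock down the checkpoint location. A minimal sketch for a local checkpoint directory (the path is illustrative; `stat -c` is GNU/Linux syntax):

```shell
# Restrict a local checkpoint directory to the owning user only
mkdir -p /tmp/spark-checkpoints
chmod 700 /tmp/spark-checkpoints
stat -c '%a' /tmp/spark-checkpoints   # prints 700
```

For HDFS checkpoint directories, the equivalent is `hdfs dfs -chmod 700` on the directory.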

Explicit Credentials (Scala)

val stream = KinesisUtils.createStream(
  ssc, appName, streamName, endpointUrl, regionName,
  initialPosition, checkpointInterval, storageLevel,
  awsAccessKeyId = "AKIAIOSFODNN7EXAMPLE",
  awsSecretKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)

Explicit Credentials (Java)

JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
  jssc, appName, streamName, endpointUrl, regionName,
  initialPosition, checkpointInterval, storageLevel,
  "AKIAIOSFODNN7EXAMPLE",                    // awsAccessKeyId
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" // awsSecretKey
);

AWS Authentication Methods

Environment Variables

Set AWS credentials as environment variables (recommended for development):

export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
export AWS_DEFAULT_REGION=us-east-1

AWS Credentials File

Create ~/.aws/credentials file:

[default]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

[production]
aws_access_key_id = AKIAI44QH8DHBEXAMPLE
aws_secret_access_key = je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY

Use specific profile:

export AWS_PROFILE=production

IAM Roles for EC2 Instances

When running on EC2, attach an IAM role with appropriate Kinesis permissions:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator", 
        "kinesis:GetRecords",
        "kinesis:ListStreams"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/MyKinesisApp"
    },
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": "*"
    }
  ]
}
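One way to put a policy like this in place is to save it to a file, sanity-check the JSON, and attach it to the instance's role with `aws iam put-role-policy`. The role and policy names below are illustrative, and the attach step is left commented because it requires live IAM credentials:

```shell
# Write an abbreviated policy document and sanity-check it as JSON
cat > kinesis-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kinesis:DescribeStream", "kinesis:GetShardIterator",
                 "kinesis:GetRecords", "kinesis:ListStreams"],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
    }
  ]
}
EOF
python3 -m json.tool kinesis-policy.json > /dev/null && echo "policy is valid JSON"

# Attach the inline policy to the instance role:
# aws iam put-role-policy --role-name MySparkStreamingRole \
#   --policy-name SparkKinesisAccess --policy-document file://kinesis-policy.json
```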

Java System Properties

Set credentials as JVM system properties:

java -Daws.accessKeyId=AKIAIOSFODNN7EXAMPLE \
     -Daws.secretKey=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY \
     -jar my-spark-app.jar
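The property names above (`aws.accessKeyId`, `aws.secretKey`) are the ones the v1 AWS SDK's SystemPropertiesCredentialsProvider reads. A quick check that they actually reached the JVM (the class name is illustrative):

```java
// Print the credential system properties the AWS SDK v1 looks for
public class AwsPropsCheck {
    public static void main(String[] args) {
        System.out.println("aws.accessKeyId=" +
            System.getProperty("aws.accessKeyId", "<unset>"));
        // Never print the secret itself, only whether it is present
        System.out.println("aws.secretKey set: " +
            (System.getProperty("aws.secretKey") != null));
    }
}
```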

Required AWS Permissions

Kinesis Permissions

{
  "Effect": "Allow",
  "Action": [
    "kinesis:DescribeStream",
    "kinesis:GetShardIterator",
    "kinesis:GetRecords", 
    "kinesis:ListStreams"
  ],
  "Resource": "arn:aws:kinesis:REGION:ACCOUNT:stream/STREAM_NAME"
}

DynamoDB Permissions (for KCL Checkpointing)

{
  "Effect": "Allow", 
  "Action": [
    "dynamodb:CreateTable",
    "dynamodb:DescribeTable",
    "dynamodb:GetItem",
    "dynamodb:PutItem", 
    "dynamodb:UpdateItem",
    "dynamodb:DeleteItem",
    "dynamodb:Scan",
    "dynamodb:Query"
  ],
  "Resource": "arn:aws:dynamodb:REGION:ACCOUNT:table/KCL_APPLICATION_NAME"
}

CloudWatch Permissions (Optional, for Metrics)

{
  "Effect": "Allow",
  "Action": [
    "cloudwatch:PutMetricData"
  ],
  "Resource": "*"
}

Regional Configuration

Endpoint URLs by Region

Common Kinesis endpoint URLs:

// US East (N. Virginia)
val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"

// US West (Oregon)  
val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"

// Europe (Ireland)
val endpointUrl = "https://kinesis.eu-west-1.amazonaws.com"

// Asia Pacific (Tokyo)
val endpointUrl = "https://kinesis.ap-northeast-1.amazonaws.com"

Region Name Validation

The library validates region names against AWS regions:

/**
 * Validates region name against known AWS regions.
 * Throws IllegalArgumentException for invalid regions.
 */
private def validateRegion(regionName: String): String

Valid region names include:

  • us-east-1, us-west-1, us-west-2
  • eu-west-1, eu-central-1
  • ap-southeast-1, ap-southeast-2, ap-northeast-1
  • And other valid AWS regions
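The behaviour of validateRegion can be illustrated with a stand-in like the following (the real library resolves region names through the AWS SDK; the class name and hard-coded region set here are illustrative):

```java
import java.util.Set;

// Stand-in for the library's region validation: return the name if known,
// otherwise throw IllegalArgumentException
public class RegionCheck {
    private static final Set<String> KNOWN_REGIONS = Set.of(
        "us-east-1", "us-west-1", "us-west-2",
        "eu-west-1", "eu-central-1",
        "ap-southeast-1", "ap-southeast-2", "ap-northeast-1");

    static String validateRegion(String regionName) {
        if (!KNOWN_REGIONS.contains(regionName)) {
            throw new IllegalArgumentException("Invalid region: " + regionName);
        }
        return regionName;
    }

    public static void main(String[] args) {
        System.out.println(validateRegion("eu-west-1")); // prints eu-west-1
        try {
            validateRegion("mars-central-1");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints Invalid region: mars-central-1
        }
    }
}
```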

Security Best Practices

Credential Management

  • Never hardcode credentials in source code
  • Use IAM roles when running on EC2 instances
  • Rotate access keys regularly
  • Use least-privilege IAM policies
  • Enable AWS CloudTrail for audit logging

Network Security

  • Use VPC endpoints for Kinesis when possible
  • Configure security groups to restrict access
  • Use SSL/TLS for all communications (enabled by default)
  • Consider using PrivateLink for additional security

Checkpoint Security

  • Secure checkpoint directories with appropriate file permissions
  • Consider encrypting checkpoint data for sensitive applications
  • Use separate AWS accounts for different environments
  • Monitor DynamoDB access patterns for anomalies

Configuration Examples

Development Environment

// Use environment variables for development
val stream = KinesisUtils.createStream(
  ssc, "dev-kinesis-app", "dev-stream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST, 
  Duration.milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2
)

Production Environment with IAM Roles

// Production setup using IAM roles on EC2
val stream = KinesisUtils.createStream(
  ssc, "prod-kinesis-app", "production-stream", 
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.TRIM_HORIZON,
  Duration.milliseconds(5000), 
  StorageLevel.MEMORY_AND_DISK_2
)

Cross-Account Access

// Access Kinesis stream in different AWS account
val crossAccountCredentials = SerializableAWSCredentials(
  "AKIAI44QH8DHBEXAMPLE",
  "je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY"
)

val stream = KinesisUtils.createStream(
  ssc, "cross-account-app", "shared-stream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1", 
  InitialPositionInStream.LATEST,
  Duration.milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  defaultMessageHandler,
  crossAccountCredentials.getAWSAccessKeyId,
  crossAccountCredentials.getAWSSecretKey
)

Troubleshooting Authentication Issues

Common Error Messages

"Unable to load AWS credentials"

  • Check DefaultAWSCredentialsProviderChain order
  • Verify environment variables or credentials file
  • Ensure IAM role is attached (for EC2)

"Access Denied" on Kinesis operations

  • Verify IAM permissions for Kinesis actions
  • Check resource ARNs in policy statements
  • Ensure region matches between policy and configuration

"Access Denied" on DynamoDB

  • Verify DynamoDB permissions for KCL application name
  • Check that table name matches KCL application name
  • Ensure region consistency between Kinesis and DynamoDB

Debugging Tips

  1. Enable AWS SDK default metrics (for request-level detail, also set the com.amazonaws logger to DEBUG in your log4j configuration):
System.setProperty("com.amazonaws.sdk.enableDefaultMetrics", "true")
  2. Check the credential provider chain:
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
val provider = new DefaultAWSCredentialsProviderChain()
println(provider.getCredentials()) // Throws an exception if no credentials are found
  3. Test permissions separately:
  • Use the AWS CLI to test Kinesis access: aws kinesis describe-stream --stream-name my-stream
  • Test DynamoDB access: aws dynamodb list-tables
  4. Monitor CloudWatch logs for detailed error messages from the KCL worker threads

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10
