tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-assembly

Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams with fault-tolerant checkpointing and automatic shard management

# Credential Management

AWS credential handling for secure access to Kinesis streams, DynamoDB checkpointing, and CloudWatch metrics. Supports both automatic credential discovery and explicit credential specification.

## Credential Options

### Default Credential Provider Chain

The recommended approach uses the AWS `DefaultAWSCredentialsProviderChain`, which automatically discovers credentials from the following sources, in order:

  1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
  2. Java system properties (`aws.accessKeyId`, `aws.secretKey`)
  3. Credential profiles file (`~/.aws/credentials`)
  4. Container credentials (for ECS tasks)
  5. Instance profile credentials (for EC2 instances)

All stream creation methods without explicit credentials use this automatic discovery.
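
The discovery order above can be sketched as a chain of optional lookups. This is an illustrative simplification, not the SDK's actual implementation; the real chain also consults the profile file, container metadata, and instance metadata:

```scala
// Illustrative sketch of credential discovery: try each source in turn
// and use the first that yields a complete key pair.
object CredentialDiscovery {
  type Creds = (String, String) // (accessKeyId, secretKey)

  // Source 1: environment variables
  def fromEnv(): Option[Creds] =
    for {
      id  <- sys.env.get("AWS_ACCESS_KEY_ID")
      key <- sys.env.get("AWS_SECRET_ACCESS_KEY")
    } yield (id, key)

  // Source 2: Java system properties
  def fromSystemProperties(): Option[Creds] =
    for {
      id  <- Option(System.getProperty("aws.accessKeyId"))
      key <- Option(System.getProperty("aws.secretKey"))
    } yield (id, key)

  // Profile-file, container, and instance-profile lookups omitted here.
  def resolve(): Option[Creds] =
    fromEnv().orElse(fromSystemProperties())
}
```

In practice you never run this logic yourself: creating a stream without explicit credentials lets the KCL worker execute the real chain internally.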

### Explicit Credential Specification

For applications that require explicit control over credentials, every `createStream` method has an overload that accepts an AWS access key and secret key directly.

**Security Note:** Explicit credentials are stored in DStream checkpoints. Ensure checkpoint directories are properly secured.

## Scala API with Credentials

### Generic Stream with Credentials

```scala
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T,
  awsAccessKeyId: String,
  awsSecretKey: String
): ReceiverInputDStream[T]
```

**Usage Example:**

```scala
val credentialedStream = KinesisUtils.createStream[String](
  ssc,
  "secure-app",
  "private-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  record => new String(record.getData.array()),
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```

### Default Byte Array Stream with Credentials

```scala
def createStream(
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  awsAccessKeyId: String,
  awsSecretKey: String
): ReceiverInputDStream[Array[Byte]]
```

**Usage Example:**

```scala
val secureByteStream = KinesisUtils.createStream(
  ssc,
  "secure-byte-processor",
  "encrypted-data-stream",
  "https://kinesis.us-west-2.amazonaws.com",
  "us-west-2",
  InitialPositionInStream.TRIM_HORIZON,
  Seconds(60),
  StorageLevel.MEMORY_AND_DISK_2,
  sys.env("AWS_ACCESS_KEY_ID"),
  sys.env("AWS_SECRET_ACCESS_KEY")
)
```

## Java API with Credentials

### Generic Stream with Credentials

```java
public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel,
  Function<Record, T> messageHandler,
  Class<T> recordClass,
  String awsAccessKeyId,
  String awsSecretKey
);
```

**Usage Example:**

```java
import org.apache.spark.api.java.function.Function;

Function<Record, String> handler = record ->
    new String(record.getData().array());

JavaReceiverInputDStream<String> secureStream = KinesisUtils.createStream(
    jssc,
    "java-secure-app",
    "confidential-stream",
    "https://kinesis.eu-west-1.amazonaws.com",
    "eu-west-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    handler,
    String.class,
    System.getenv("AWS_ACCESS_KEY_ID"),
    System.getenv("AWS_SECRET_ACCESS_KEY")
);
```

### Default Byte Array Stream with Credentials

```java
public static JavaReceiverInputDStream<byte[]> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel,
  String awsAccessKeyId,
  String awsSecretKey
);
```

**Usage Example:**

```java
JavaReceiverInputDStream<byte[]> secureByteStream = KinesisUtils.createStream(
    jssc,
    "java-secure-bytes",
    "secure-binary-stream",
    "https://kinesis.ap-northeast-1.amazonaws.com",
    "ap-northeast-1",
    InitialPositionInStream.TRIM_HORIZON,
    Durations.seconds(45),
    StorageLevel.MEMORY_AND_DISK_2(),
    System.getProperty("aws.accessKeyId"),
    System.getProperty("aws.secretKey")
);
```

## SerializableAWSCredentials

Internal credential wrapper for secure serialization in distributed environments.

```scala
case class SerializableAWSCredentials(
  accessKeyId: String,
  secretKey: String
) extends AWSCredentials {
  override def getAWSAccessKeyId: String = accessKeyId
  override def getAWSSecretKey: String = secretKey
}
```

This class wraps AWS credentials for safe serialization when distributing stream processing tasks across Spark workers.
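
A minimal, self-contained sketch of how the wrapper behaves. The `AWSCredentials` trait below is a stand-in for `com.amazonaws.auth.AWSCredentials` so the snippet compiles without the SDK on the classpath:

```scala
// Stand-in for com.amazonaws.auth.AWSCredentials (assumption: the real
// interface exposes exactly these two accessors).
trait AWSCredentials {
  def getAWSAccessKeyId: String
  def getAWSSecretKey: String
}

// Case classes extend java.io.Serializable by default, which is what
// allows Spark to ship the wrapper to worker nodes inside the receiver.
case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
    extends AWSCredentials {
  override def getAWSAccessKeyId: String = accessKeyId
  override def getAWSSecretKey: String = secretKey
}

val creds = SerializableAWSCredentials("AKIAIOSFODNN7EXAMPLE", "example-secret")
```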

## Best Practices

### Credential Security

1. **Use Environment Variables**: Store credentials in environment variables rather than hardcoding them:

```scala
val accessKey = sys.env.getOrElse("AWS_ACCESS_KEY_ID",
  throw new IllegalArgumentException("AWS_ACCESS_KEY_ID not set"))
val secretKey = sys.env.getOrElse("AWS_SECRET_ACCESS_KEY",
  throw new IllegalArgumentException("AWS_SECRET_ACCESS_KEY not set"))
```

2. **Secure Checkpoint Directories**: When using explicit credentials, ensure checkpoint directories have proper access controls:

```scala
// Set secure checkpoint directory
ssc.checkpoint("hdfs://secure-cluster/checkpoints/kinesis-app")
```

3. **Use IAM Roles**: For production deployments, prefer IAM roles over explicit credentials:

```scala
// Prefer this approach - uses automatic credential discovery
val stream = KinesisUtils.createStream(ssc, ...) // No explicit credentials
```

### Credential Rotation

For applications using explicit credentials:

  1. **Monitor Expiration**: Implement credential monitoring and rotation
  2. **Graceful Updates**: Plan for application restarts when credentials change
  3. **Fallback Mechanisms**: Consider implementing credential fallback chains
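
A sketch of the refresh idea behind these points. `RefreshingCredentials` is a hypothetical helper, not part of this library: it re-reads its source on every call, so externally rotated values are picked up by new lookups (the receiver itself still captures credentials at creation time, hence the restarts noted above):

```scala
// Hypothetical refreshable lookup backed by system properties; a production
// deployment would read from a secrets manager or instance metadata instead.
class RefreshingCredentials(idProp: String, secretProp: String) {
  // Re-read the source on each call so rotated values are observed.
  def current(): Option[(String, String)] =
    for {
      id  <- Option(System.getProperty(idProp))
      key <- Option(System.getProperty(secretProp))
    } yield (id, key)
}
```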

### Cross-Region Access

When accessing a Kinesis stream in a different region than your Spark cluster:

```scala
// Ensure credentials have cross-region permissions
val crossRegionStream = KinesisUtils.createStream(
  ssc,
  "cross-region-app",
  "remote-stream",
  "https://kinesis.eu-central-1.amazonaws.com", // Different region
  "eu-central-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  explicitAccessKey,
  explicitSecretKey
)
```

## Required AWS Permissions

Ensure credentials have the following minimum permissions:

### Kinesis Permissions

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListStreams"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/your-stream-name"
    }
  ]
}
```

### DynamoDB Permissions (for checkpointing)

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/your-kinesis-app-name"
    }
  ]
}
```

### CloudWatch Permissions (for metrics)

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": "*"
    }
  ]
}
```
