
tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-13

Apache Spark Kinesis Integration providing real-time stream processing of data from Amazon Kinesis Data Streams

docs/python-integration.md

Python Integration

Internal helper utilities for integrating Kinesis streams with PySpark. These utilities provide Python-friendly interfaces to the Scala/Java APIs and are primarily used by the PySpark Kinesis integration.

Core API

KinesisUtilsPythonHelper

Internal helper class that wraps Kinesis stream creation for Python compatibility.

private class KinesisUtilsPythonHelper {
  def createStream(
    jssc: JavaStreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: Int,
    checkpointInterval: Duration,
    metricsLevel: Int,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String,
    stsAssumeRoleArn: String,
    stsSessionName: String,
    stsExternalId: String
  ): JavaReceiverInputDStream[Array[Byte]]
}

Parameter Mappings

The Python helper maps integer values to appropriate enums and configurations:

Initial Position Mapping

initialPositionInStream match {
  case 0 => InitialPositionInStream.LATEST
  case 1 => InitialPositionInStream.TRIM_HORIZON
  case _ => throw new IllegalArgumentException(
    "Illegal InitialPositionInStream. Please use " +
    "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
}
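The same mapping can be sketched in standalone Python. This is an illustrative model of the helper's logic, not Spark code; the function name and the returned strings (which stand in for the enum constants) are made up for the example:

```python
def map_initial_position(initial_position_in_stream: int) -> str:
    """Mirror the helper's integer-to-enum mapping for the initial position."""
    mapping = {
        0: "LATEST",        # start from the most recent record
        1: "TRIM_HORIZON",  # start from the oldest available record
    }
    if initial_position_in_stream not in mapping:
        # Unknown codes are rejected rather than silently defaulted.
        raise ValueError(
            "Illegal InitialPositionInStream. Please use "
            "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON"
        )
    return mapping[initial_position_in_stream]
```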

Metrics Level Mapping

metricsLevel match {
  case 0 => MetricsLevel.DETAILED
  case 1 => MetricsLevel.SUMMARY  
  case 2 => MetricsLevel.NONE
  case _ => MetricsLevel.DETAILED  // Default fallback
}
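Note the asymmetry with the initial-position mapping: unknown metrics codes fall back to the default instead of raising. A standalone Python sketch of this behavior (names and returned strings are illustrative, not part of the Spark API):

```python
def map_metrics_level(metrics_level: int) -> str:
    """Mirror the helper's metrics-level mapping.

    Unknown codes fall back to DETAILED rather than raising an error.
    """
    return {0: "DETAILED", 1: "SUMMARY", 2: "NONE"}.get(metrics_level, "DETAILED")
```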

Authentication Validation

The helper validates authentication parameter combinations:

  1. Basic credentials: Both awsAccessKeyId and awsSecretKey must be provided together or both null
  2. STS credentials: All STS parameters (stsAssumeRoleArn, stsSessionName, stsExternalId) must be provided together or all null
  3. Mixed authentication: STS credentials may be layered on top of basic credentials; the basic credentials then serve as the long-lived credentials used to assume the STS role
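The first two rules can be sketched as a standalone Python validation function. This models the documented checks only; the function name and parameter spellings are illustrative, and the error messages follow the ones listed under Error Handling below:

```python
def validate_credentials(aws_access_key_id, aws_secret_key,
                         sts_assume_role_arn, sts_session_name, sts_external_id):
    """Check that credential parameters are supplied in valid combinations."""
    # Rule 1: basic credentials must be provided together or both be None.
    if (aws_access_key_id is None) != (aws_secret_key is None):
        if aws_access_key_id is not None:
            raise ValueError("awsAccessKeyId is set but awsSecretKey is null")
        raise ValueError("awsSecretKey is set but awsAccessKeyId is null")
    # Rule 2: STS parameters must all be provided together or all be None.
    sts = (sts_assume_role_arn, sts_session_name, sts_external_id)
    if any(p is not None for p in sts) and not all(p is not None for p in sts):
        raise ValueError(
            "stsAssumeRoleArn, stsSessionName, and stsExternalId "
            "must all be defined or all be null"
        )
```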

Usage Pattern

The helper follows this internal pattern:

val builder = KinesisInputDStream.builder
  .streamingContext(jssc)
  .checkpointAppName(kinesisAppName)
  .streamName(streamName)
  .endpointUrl(endpointUrl)
  .regionName(regionName)
  .initialPosition(mappedInitialPosition)
  .checkpointInterval(checkpointInterval)
  .metricsLevel(mappedMetricsLevel)
  .storageLevel(storageLevel)

// Configure authentication based on provided parameters
if (stsCredentialsProvided) {
  val kinesisCredsProvider = STSCredentials(
    stsAssumeRoleArn, stsSessionName, Option(stsExternalId),
    BasicCredentials(awsAccessKeyId, awsSecretKey)
  )
  builder.kinesisCredentials(kinesisCredsProvider)
    .buildWithMessageHandler(KinesisInputDStream.defaultMessageHandler)
} else if (basicCredentialsProvided) {
  builder.kinesisCredentials(BasicCredentials(awsAccessKeyId, awsSecretKey))
    .build()
} else {
  builder.build()  // Uses DefaultCredentials
}
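The three-way branch above can be summarized in a small standalone Python sketch that returns which credentials provider the builder would be configured with (function name and returned strings are illustrative, not Spark API):

```python
def choose_credentials(aws_access_key_id, aws_secret_key, sts_assume_role_arn):
    """Pick the credentials provider the builder would use, by precedence."""
    if sts_assume_role_arn is not None:
        # STS takes precedence; basic credentials, if given, become the
        # long-lived credentials used to assume the role.
        return "STSCredentials"
    if aws_access_key_id is not None and aws_secret_key is not None:
        return "BasicCredentials"
    # Nothing explicit supplied: fall back to the default provider chain.
    return "DefaultCredentials"
```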

Error Handling

The helper validates parameter combinations and throws descriptive errors:

  • Partial basic credentials: "awsAccessKeyId is set but awsSecretKey is null" or vice versa
  • Partial STS credentials: "stsAssumeRoleArn, stsSessionName, and stsExternalId must all be defined or all be null"
  • Invalid initial position: "Illegal InitialPositionInStream. Please use InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON"

Internal Use Only

Note: This class is marked private and is intended only for internal PySpark integration. Direct usage is not recommended or supported in application code. Use the standard KinesisInputDStream.builder API instead.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-13
