Python Integration

Internal helper utilities for integrating Kinesis streams with PySpark. These utilities provide Python-friendly interfaces to the Scala/Java APIs and are primarily used by the PySpark Kinesis integration.

Core API

KinesisUtilsPythonHelper

Internal helper class that wraps Kinesis stream creation for Python compatibility.

private class KinesisUtilsPythonHelper {
  def createStream(
    jssc: JavaStreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: Int,
    checkpointInterval: Duration,
    metricsLevel: Int,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String,
    stsAssumeRoleArn: String,
    stsSessionName: String,
    stsExternalId: String
  ): JavaReceiverInputDStream[Array[Byte]]
}

Parameter Mappings

The Python helper maps integer values to appropriate enums and configurations:

Initial Position Mapping

initialPositionInStream match {
  case 0 => InitialPositionInStream.LATEST
  case 1 => InitialPositionInStream.TRIM_HORIZON
  case _ => throw new IllegalArgumentException(
    "Illegal InitialPositionInStream. Please use " +
      "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
}
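The same mapping can be sketched from the Python side. This is a minimal illustrative sketch, not the real PySpark code; the names `LATEST`, `TRIM_HORIZON`, and `map_initial_position` are hypothetical, and the actual conversion happens in the Scala helper:

```python
# Hypothetical sketch of the initial-position mapping done by the helper.
# Constants mirror the integer values PySpark passes across Py4J.
LATEST = 0
TRIM_HORIZON = 1

def map_initial_position(initial_position: int) -> str:
    """Map the integer sent from Python to the JVM-side enum name."""
    if initial_position == LATEST:
        return "InitialPositionInStream.LATEST"
    if initial_position == TRIM_HORIZON:
        return "InitialPositionInStream.TRIM_HORIZON"
    # Any other value is rejected, matching the Scala helper's error.
    raise ValueError(
        "Illegal InitialPositionInStream. Please use "
        "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON"
    )
```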

Metrics Level Mapping

metricsLevel match {
  case 0 => MetricsLevel.DETAILED
  case 1 => MetricsLevel.SUMMARY  
  case 2 => MetricsLevel.NONE
  case _ => MetricsLevel.DETAILED  // Default fallback
}

Authentication Validation

The helper validates authentication parameter combinations:

  1. Basic credentials: Both awsAccessKeyId and awsSecretKey must be provided together or both null
  2. STS credentials: All STS parameters (stsAssumeRoleArn, stsSessionName, stsExternalId) must be provided together or all null
  3. Mixed authentication: STS credentials can be combined with basic credentials for long-lived authentication
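Rules 1 and 2 can be sketched as a small validation function. This is an illustrative Python sketch of the combination checks, assuming a hypothetical `validate_credentials` helper; the real validation runs in the Scala class:

```python
def validate_credentials(aws_access_key_id, aws_secret_key,
                         sts_assume_role_arn, sts_session_name,
                         sts_external_id):
    """Reject partial credential combinations (sketch of the helper's checks)."""
    basic = (aws_access_key_id, aws_secret_key)
    sts = (sts_assume_role_arn, sts_session_name, sts_external_id)
    # Rule 1: basic credentials must be provided together or not at all.
    if any(v is not None for v in basic) and not all(v is not None for v in basic):
        raise ValueError(
            "awsAccessKeyId and awsSecretKey must both be set or both be null")
    # Rule 2: STS parameters must be provided together or not at all.
    if any(v is not None for v in sts) and not all(v is not None for v in sts):
        raise ValueError(
            "stsAssumeRoleArn, stsSessionName, and stsExternalId "
            "must all be defined or all be null")
```

Per rule 3, passing both groups together is valid: the basic credentials serve as the long-lived credentials used to assume the STS role.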

Usage Pattern

The helper follows this internal pattern:

val builder = KinesisInputDStream.builder
  .streamingContext(jssc)
  .checkpointAppName(kinesisAppName)
  .streamName(streamName)
  .endpointUrl(endpointUrl)
  .regionName(regionName)
  .initialPosition(mappedInitialPosition)
  .checkpointInterval(checkpointInterval)
  .metricsLevel(mappedMetricsLevel)
  .storageLevel(storageLevel)

// Configure authentication based on provided parameters
if (stsCredentialsProvided) {
  val kinesisCredsProvider = STSCredentials(
    stsAssumeRoleArn, stsSessionName, Option(stsExternalId),
    BasicCredentials(awsAccessKeyId, awsSecretKey)
  )
  builder.kinesisCredentials(kinesisCredsProvider)
    .buildWithMessageHandler(KinesisInputDStream.defaultMessageHandler)
} else if (basicCredentialsProvided) {
  builder.kinesisCredentials(BasicCredentials(awsAccessKeyId, awsSecretKey))
    .build()
} else {
  builder.build()  // Uses DefaultCredentials
}

Error Handling

The helper validates parameter combinations and throws descriptive errors:

  • Partial basic credentials: "awsAccessKeyId is set but awsSecretKey is null" or vice versa
  • Partial STS credentials: "stsAssumeRoleArn, stsSessionName, and stsExternalId must all be defined or all be null"
  • Invalid initial position: "Illegal InitialPositionInStream. Please use InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON"

Internal Use Only

Note: This class is marked private and is intended only for internal PySpark integration. Direct usage is not recommended or supported in application code. Use the standard KinesisInputDStream.builder API instead.