Apache Spark Kinesis Integration providing real-time stream processing of data from Amazon Kinesis Data Streams
Internal helper utilities for integrating Kinesis streams with PySpark. These utilities provide Python-friendly wrappers around the Scala/Java APIs and are used primarily by the PySpark Kinesis integration.
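Because the Python-to-JVM bridge passes primitives more readily than JVM enum instances, the Python side encodes configuration choices as plain integers that the Scala helper decodes. A minimal pure-Python sketch of such an encoding (the class and function names here are illustrative, not the actual PySpark classes):

```python
from enum import IntEnum

# Illustrative stand-ins for the JVM-side enums; the real values live in the
# KCL's InitialPositionInStream and CloudWatch MetricsLevel classes.
class InitialPositionCode(IntEnum):
    LATEST = 0
    TRIM_HORIZON = 1

class MetricsLevelCode(IntEnum):
    DETAILED = 0
    SUMMARY = 1
    NONE = 2

def encode_initial_position(name: str) -> int:
    """Encode a position name as the integer code the JVM helper expects."""
    return int(InitialPositionCode[name])
```

The integer travels across the bridge, and the Scala helper is responsible for mapping it back onto the real enum, as shown below.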
Internal helper class that wraps Kinesis stream creation for Python compatibility.
```scala
private class KinesisUtilsPythonHelper {
  def createStream(
      jssc: JavaStreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: Int,
      checkpointInterval: Duration,
      metricsLevel: Int,
      storageLevel: StorageLevel,
      awsAccessKeyId: String,
      awsSecretKey: String,
      stsAssumeRoleArn: String,
      stsSessionName: String,
      stsExternalId: String): JavaReceiverInputDStream[Array[Byte]]
}
```

The Python helper maps the integer arguments to the appropriate enums and configurations:
```scala
initialPositionInStream match {
  case 0 => InitialPositionInStream.LATEST
  case 1 => InitialPositionInStream.TRIM_HORIZON
  case _ => throw new IllegalArgumentException(
    s"Illegal InitialPositionInStream value: $initialPositionInStream")
}
```

```scala
metricsLevel match {
  case 0 => MetricsLevel.DETAILED
  case 1 => MetricsLevel.SUMMARY
  case 2 => MetricsLevel.NONE
  case _ => MetricsLevel.DETAILED // Default fallback
}
```

The helper validates authentication parameter combinations:
- awsAccessKeyId and awsSecretKey must be provided together, or both be null.
- stsAssumeRoleArn, stsSessionName, and stsExternalId must be provided together, or all be null.

The helper then follows this internal pattern:
```scala
val builder = KinesisInputDStream.builder
  .streamingContext(jssc)
  .checkpointAppName(kinesisAppName)
  .streamName(streamName)
  .endpointUrl(endpointUrl)
  .regionName(regionName)
  .initialPosition(mappedInitialPosition)
  .checkpointInterval(checkpointInterval)
  .metricsLevel(mappedMetricsLevel)
  .storageLevel(storageLevel)

// Configure authentication based on the provided parameters
if (stsCredentialsProvided) {
  val kinesisCredsProvider = STSCredentials(
    stsAssumeRoleArn, stsSessionName, Option(stsExternalId),
    BasicCredentials(awsAccessKeyId, awsSecretKey)
  )
  builder.kinesisCredentials(kinesisCredsProvider)
    .buildWithMessageHandler(KinesisInputDStream.defaultMessageHandler)
} else if (basicCredentialsProvided) {
  builder.kinesisCredentials(BasicCredentials(awsAccessKeyId, awsSecretKey))
    .build()
} else {
  builder.build() // Uses DefaultCredentials
}
```

Invalid parameter combinations cause the helper to throw an IllegalArgumentException with a descriptive message.
Note: This class is marked private and is intended only for internal PySpark integration. Direct usage is not recommended or supported in application code. Use the standard KinesisInputDStream.builder API instead.
Install with Tessl CLI:

```shell
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-13
```