Internal helper utilities for integrating Kinesis streams with PySpark. These utilities provide Python-friendly interfaces to the Scala/Java APIs and are primarily used by the PySpark Kinesis integration.
Internal helper class that wraps Kinesis stream creation for Python compatibility.
```scala
private class KinesisUtilsPythonHelper {
  def createStream(
      jssc: JavaStreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: Int,
      checkpointInterval: Duration,
      metricsLevel: Int,
      storageLevel: StorageLevel,
      awsAccessKeyId: String,
      awsSecretKey: String,
      stsAssumeRoleArn: String,
      stsSessionName: String,
      stsExternalId: String): JavaReceiverInputDStream[Array[Byte]]
}
```

The Python helper maps the integer values to the appropriate enums and configurations:
```scala
initialPositionInStream match {
  case 0 => InitialPositionInStream.LATEST
  case 1 => InitialPositionInStream.TRIM_HORIZON
  case _ => throw new IllegalArgumentException(
    s"Illegal InitialPositionInStream: $initialPositionInStream")
}

metricsLevel match {
  case 0 => MetricsLevel.DETAILED
  case 1 => MetricsLevel.SUMMARY
  case 2 => MetricsLevel.NONE
  case _ => MetricsLevel.DETAILED // Default fallback
}
```

The helper validates authentication parameter combinations:
- `awsAccessKeyId` and `awsSecretKey` must be provided together or both be null
- `stsAssumeRoleArn`, `stsSessionName`, and `stsExternalId` must be provided together or all be null

The helper follows this internal pattern:
```scala
val builder = KinesisInputDStream.builder
  .streamingContext(jssc)
  .checkpointAppName(kinesisAppName)
  .streamName(streamName)
  .endpointUrl(endpointUrl)
  .regionName(regionName)
  .initialPosition(mappedInitialPosition)
  .checkpointInterval(checkpointInterval)
  .metricsLevel(mappedMetricsLevel)
  .storageLevel(storageLevel)

// Configure authentication based on the provided parameters
if (stsCredentialsProvided) {
  val kinesisCredsProvider = STSCredentials(
    stsAssumeRoleArn, stsSessionName, Option(stsExternalId),
    BasicCredentials(awsAccessKeyId, awsSecretKey))
  builder.kinesisCredentials(kinesisCredsProvider)
    .buildWithMessageHandler(KinesisInputDStream.defaultMessageHandler)
} else if (basicCredentialsProvided) {
  builder.kinesisCredentials(BasicCredentials(awsAccessKeyId, awsSecretKey))
    .build()
} else {
  builder.build() // Falls back to DefaultCredentials
}
```

The helper validates parameter combinations and throws descriptive errors when they are inconsistent.
Note: This class is marked private and is intended only for internal PySpark integration. Direct usage is not recommended or supported in application code. Use the standard KinesisInputDStream.builder API instead.
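For application code, direct use of the builder API might look like the following minimal sketch. The app name, stream name, region, endpoint, and batch interval are placeholders, and it assumes a Spark version where `KinesisInitialPositions` is available:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Placeholder configuration; replace with your own values.
val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .checkpointAppName("myKinesisApp")
  .streamName("myStream")
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build() // Uses the default AWS credentials provider chain
```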