# Python Integration

Internal helper utilities for integrating Kinesis streams with PySpark. These utilities provide Python-friendly interfaces to the Scala/Java APIs and are primarily used by the PySpark Kinesis integration.

## Core API

### KinesisUtilsPythonHelper

Internal helper class that wraps Kinesis stream creation for Python compatibility.

```scala { .api }
private class KinesisUtilsPythonHelper {
  def createStream(
      jssc: JavaStreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: Int,
      checkpointInterval: Duration,
      metricsLevel: Int,
      storageLevel: StorageLevel,
      awsAccessKeyId: String,
      awsSecretKey: String,
      stsAssumeRoleArn: String,
      stsSessionName: String,
      stsExternalId: String
  ): JavaReceiverInputDStream[Array[Byte]]
}
```

## Parameter Mappings

The Python helper maps integer values to the appropriate enums and configurations:

### Initial Position Mapping

```scala
initialPositionInStream match {
  case 0 => InitialPositionInStream.LATEST
  case 1 => InitialPositionInStream.TRIM_HORIZON
  case _ => throw new IllegalArgumentException(
    "Illegal InitialPositionInStream. Please use InitialPositionInStream.LATEST " +
    "or InitialPositionInStream.TRIM_HORIZON")
}
```
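The mapping can be mirrored as a small standalone Python function. This is an illustrative sketch, not part of the actual API: the function name `map_initial_position` and the returned strings are assumptions for demonstration.

```python
def map_initial_position(initial_position_in_stream: int) -> str:
    """Mirror the Scala mapping: 0 -> LATEST, 1 -> TRIM_HORIZON."""
    positions = {0: "LATEST", 1: "TRIM_HORIZON"}
    if initial_position_in_stream not in positions:
        # Any other integer is rejected, matching the Scala helper's behavior
        raise ValueError(
            "Illegal InitialPositionInStream. Please use "
            "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
    return positions[initial_position_in_stream]
```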

### Metrics Level Mapping

```scala
metricsLevel match {
  case 0 => MetricsLevel.DETAILED
  case 1 => MetricsLevel.SUMMARY
  case 2 => MetricsLevel.NONE
  case _ => MetricsLevel.DETAILED // default fallback
}
```
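Note that unlike the initial-position mapping, an unrecognized metrics level does not raise; it silently falls back to `DETAILED`. A standalone Python sketch of the same logic (function name and string values are illustrative):

```python
def map_metrics_level(metrics_level: int) -> str:
    """Mirror the Scala mapping; unknown values fall back to DETAILED."""
    levels = {0: "DETAILED", 1: "SUMMARY", 2: "NONE"}
    # dict.get with a default reproduces the `case _` fallback branch
    return levels.get(metrics_level, "DETAILED")
```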

## Authentication Validation

The helper validates authentication parameter combinations:

1. **Basic credentials**: `awsAccessKeyId` and `awsSecretKey` must both be provided or both be null
2. **STS credentials**: the STS parameters (`stsAssumeRoleArn`, `stsSessionName`, `stsExternalId`) must all be provided together or all be null
3. **Combined authentication**: STS credentials can be combined with basic credentials, which then serve as the long-lived credentials used to assume the role
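The both-or-neither and all-or-none checks above can be sketched as a standalone Python function. This is a hypothetical re-implementation for illustration; the function name and error messages are assumptions, not the helper's actual code.

```python
def validate_credentials(aws_access_key_id, aws_secret_key,
                         sts_assume_role_arn, sts_session_name, sts_external_id):
    """Validate the credential parameter combinations described above."""
    # Basic credentials: both set or both null
    if (aws_access_key_id is None) != (aws_secret_key is None):
        raise ValueError(
            "awsAccessKeyId and awsSecretKey must both be set or both be null")

    # STS credentials: all set or all null
    sts_flags = [p is not None
                 for p in (sts_assume_role_arn, sts_session_name, sts_external_id)]
    if any(sts_flags) and not all(sts_flags):
        raise ValueError(
            "stsAssumeRoleArn, stsSessionName, and stsExternalId "
            "must all be defined or all be null")
```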

## Usage Pattern

The helper follows this internal pattern:

```scala
val builder = KinesisInputDStream.builder
  .streamingContext(jssc)
  .checkpointAppName(kinesisAppName)
  .streamName(streamName)
  .endpointUrl(endpointUrl)
  .regionName(regionName)
  .initialPosition(mappedInitialPosition)
  .checkpointInterval(checkpointInterval)
  .metricsLevel(mappedMetricsLevel)
  .storageLevel(storageLevel)

// Configure authentication based on the provided parameters
if (stsCredentialsProvided) {
  val kinesisCredsProvider = STSCredentials(
    stsAssumeRoleArn, stsSessionName, Option(stsExternalId),
    BasicCredentials(awsAccessKeyId, awsSecretKey)
  )
  builder.kinesisCredentials(kinesisCredsProvider)
    .buildWithMessageHandler(KinesisInputDStream.defaultMessageHandler)
} else if (basicCredentialsProvided) {
  builder.kinesisCredentials(BasicCredentials(awsAccessKeyId, awsSecretKey))
    .build()
} else {
  builder.build() // uses DefaultCredentials
}
```
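The branching above amounts to selecting a credentials provider from whichever parameters are set: STS takes precedence, then basic credentials, then the default chain. A minimal Python sketch of that selection, assuming illustrative string names in place of the real Scala case classes:

```python
def select_credentials_provider(aws_access_key_id=None, aws_secret_key=None,
                                sts_assume_role_arn=None):
    """Pick a provider as the pattern above does: STS first, then basic, then default."""
    if sts_assume_role_arn is not None:
        # STS credentials wrap the long-lived basic credentials
        return "STSCredentials(longLived=BasicCredentials)"
    if aws_access_key_id is not None and aws_secret_key is not None:
        return "BasicCredentials"
    return "DefaultCredentials"
```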

## Error Handling

The helper validates parameter combinations and throws descriptive errors:

- **Partial basic credentials**: "awsAccessKeyId is set but awsSecretKey is null" (or vice versa)
- **Partial STS credentials**: "stsAssumeRoleArn, stsSessionName, and stsExternalId must all be defined or all be null"
- **Invalid initial position**: "Illegal InitialPositionInStream. Please use InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON"

## Internal Use Only

**Note**: This class is marked `private` and is intended only for internal PySpark integration. Direct use in application code is neither recommended nor supported; use the standard `KinesisInputDStream.builder` API instead.