# Python Integration

Internal helper utilities for integrating Kinesis streams with PySpark. They wrap the Scala/Java APIs in Python-friendly interfaces and are consumed by the PySpark Kinesis integration, not by application code.

## Core API

### KinesisUtilsPythonHelper

Internal helper class that wraps Kinesis stream creation for Python compatibility.

```scala { .api }
private class KinesisUtilsPythonHelper {
  def createStream(
      jssc: JavaStreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: Int,
      checkpointInterval: Duration,
      metricsLevel: Int,
      storageLevel: StorageLevel,
      awsAccessKeyId: String,
      awsSecretKey: String,
      stsAssumeRoleArn: String,
      stsSessionName: String,
      stsExternalId: String
  ): JavaReceiverInputDStream[Array[Byte]]
}
```

## Parameter Mappings

The Python helper maps the integer codes received from Python to the corresponding enums and configuration values:

### Initial Position Mapping

```scala
initialPositionInStream match {
  case 0 => InitialPositionInStream.LATEST
  case 1 => InitialPositionInStream.TRIM_HORIZON
  case _ => throw new IllegalArgumentException(
    "Illegal InitialPositionInStream. Please use " +
      "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
}
```

### Metrics Level Mapping

```scala
metricsLevel match {
  case 0 => MetricsLevel.DETAILED
  case 1 => MetricsLevel.SUMMARY
  case 2 => MetricsLevel.NONE
  case _ => MetricsLevel.DETAILED // default fallback
}
```
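Taken together, the two mappings amount to a pair of small lookup tables. A minimal Python sketch of the same logic (the string values and function names are illustrative stand-ins for the KCL's `InitialPositionInStream` and `MetricsLevel` constants, not part of the actual helper):

```python
# Illustrative mirror of the Scala mappings; strings stand in for enum values.
INITIAL_POSITIONS = {0: "LATEST", 1: "TRIM_HORIZON"}
METRICS_LEVELS = {0: "DETAILED", 1: "SUMMARY", 2: "NONE"}

def map_initial_position(code):
    # Unknown codes are rejected, matching the Scala helper's strict check
    if code not in INITIAL_POSITIONS:
        raise ValueError(
            "Illegal InitialPositionInStream. Please use "
            "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
    return INITIAL_POSITIONS[code]

def map_metrics_level(code):
    # Unknown codes silently fall back to DETAILED, as in the Scala match
    return METRICS_LEVELS.get(code, "DETAILED")
```

Note the asymmetry: an unknown initial position raises an error, while an unknown metrics level silently falls back to `DETAILED`.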

## Authentication Validation

The helper validates authentication parameter combinations:

1. **Basic credentials**: Both `awsAccessKeyId` and `awsSecretKey` must be provided together or both null

2. **STS credentials**: All STS parameters (`stsAssumeRoleArn`, `stsSessionName`, `stsExternalId`) must be provided together or all null

3. **Mixed authentication**: Basic credentials may be supplied alongside the STS parameters; they serve as the long-lived credentials used to assume the role
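The all-or-nothing rule can be sketched in plain Python; `validate_credentials` is a hypothetical name for illustration, not a function in the actual helper:

```python
def validate_credentials(aws_access_key_id=None, aws_secret_key=None,
                         sts_assume_role_arn=None, sts_session_name=None,
                         sts_external_id=None):
    """Hypothetical sketch of the helper's all-or-nothing parameter checks."""
    basic = (aws_access_key_id, aws_secret_key)
    sts = (sts_assume_role_arn, sts_session_name, sts_external_id)
    # Basic credentials: both set or both None
    if any(v is None for v in basic) != all(v is None for v in basic):
        raise ValueError(
            "awsAccessKeyId and awsSecretKey must both be set or both be null")
    # STS parameters: all defined or all None
    if any(v is None for v in sts) != all(v is None for v in sts):
        raise ValueError(
            "stsAssumeRoleArn, stsSessionName, and stsExternalId "
            "must all be defined or all be null")
```

The `any(...) != all(...)` comparison is true exactly when the group is partially filled in, which is the invalid case.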

## Usage Pattern

The helper follows this internal pattern:

```scala
val builder = KinesisInputDStream.builder
  .streamingContext(jssc)
  .checkpointAppName(kinesisAppName)
  .streamName(streamName)
  .endpointUrl(endpointUrl)
  .regionName(regionName)
  .initialPosition(mappedInitialPosition)
  .checkpointInterval(checkpointInterval)
  .metricsLevel(mappedMetricsLevel)
  .storageLevel(storageLevel)

// Configure authentication based on the provided parameters
if (stsCredentialsProvided) {
  val kinesisCredsProvider = STSCredentials(
    stsAssumeRoleArn, stsSessionName, Option(stsExternalId),
    BasicCredentials(awsAccessKeyId, awsSecretKey)
  )
  builder.kinesisCredentials(kinesisCredsProvider)
    .buildWithMessageHandler(KinesisInputDStream.defaultMessageHandler)
} else if (basicCredentialsProvided) {
  builder.kinesisCredentials(BasicCredentials(awsAccessKeyId, awsSecretKey))
    .build()
} else {
  builder.build() // uses DefaultCredentials
}
```
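The branch selection above reduces to a simple precedence: STS first, then basic credentials, then the default provider chain. A Python sketch of that dispatch (the function name and return labels are illustrative, not part of the helper):

```python
def choose_credentials(aws_access_key_id=None, aws_secret_key=None,
                       sts_assume_role_arn=None):
    """Illustrative dispatch mirroring the helper's credential branches."""
    if sts_assume_role_arn is not None:
        # STS role assumption, optionally backed by long-lived basic creds
        return "sts"
    if aws_access_key_id is not None and aws_secret_key is not None:
        return "basic"
    # Nothing provided: fall through to the default provider chain
    return "default"
```

Note that when both STS and basic parameters are supplied, the STS branch wins and the basic credentials are nested inside it as the long-lived credentials.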

## Error Handling

The helper validates parameter combinations and throws descriptive errors:

- **Partial basic credentials**: "awsAccessKeyId is set but awsSecretKey is null" or vice versa

- **Partial STS credentials**: "stsAssumeRoleArn, stsSessionName, and stsExternalId must all be defined or all be null"

- **Invalid initial position**: "Illegal InitialPositionInStream. Please use InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON"

## Internal Use Only

**Note**: This class is marked `private` and is intended only for internal PySpark integration. Direct usage is not recommended or supported in application code. Use the standard `KinesisInputDStream.builder` API instead.