# Utility Classes

Additional utility classes for configuration redaction and error handling.

## Capabilities

### KafkaRedactionUtil

Object providing utilities for redacting sensitive configuration parameters in logging and debugging output.

```scala { .api }
object KafkaRedactionUtil extends Logging {

  /**
   * Redacts sensitive parameters in configuration sequences.
   * Applies redaction patterns to sensitive keys like passwords, tokens, and credentials.
   * Special handling for SASL JAAS configuration parameters.
   *
   * @param params Sequence of key-value pairs to redact
   * @return Sequence of redacted key-value pairs with sensitive values replaced
   */
  def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]

  /**
   * Redacts JAAS configuration passwords specifically.
   * Replaces password values in JAAS parameter strings with the redaction placeholder.
   *
   * @param param JAAS parameter string potentially containing passwords
   * @return JAAS parameter string with passwords redacted
   */
  def redactJaasParam(param: String): String
}
```

**Usage Examples:**

```scala
import org.apache.spark.kafka010.KafkaRedactionUtil

// Redact sensitive configuration parameters
val sensitiveParams = Seq(
  ("bootstrap.servers", "kafka1:9092,kafka2:9092"),
  ("sasl.jaas.config", "ScramLoginModule required username=\"user\" password=\"secret123\";"),
  ("ssl.keystore.password", "keystore-password"),
  ("security.protocol", "SASL_SSL")
)

val redactedParams = KafkaRedactionUtil.redactParams(sensitiveParams)
redactedParams.foreach { case (key, value) =>
  println(s"$key = $value")
}
// Output (Spark's redaction placeholder text is "*********(redacted)"):
// bootstrap.servers = kafka1:9092,kafka2:9092
// sasl.jaas.config = ScramLoginModule required username="user" password="*********(redacted)";
// ssl.keystore.password = *********(redacted)
// security.protocol = SASL_SSL

// Redact JAAS parameters specifically
val jaasConfig = """ScramLoginModule required
  |username="myuser"
  |password="mypassword123";""".stripMargin.replace("\n", " ")

val redactedJaas = KafkaRedactionUtil.redactJaasParam(jaasConfig)
println(redactedJaas)
// Output: ScramLoginModule required username="myuser" password="*********(redacted)";
```

### Parameter Redaction

Redacts sensitive parameters in configuration sequences using Spark's built-in redaction patterns.

```scala { .api }
/**
 * Redacts sensitive parameters in configuration sequences.
 * Uses Spark's SECRET_REDACTION_PATTERN configuration to identify sensitive keys.
 * Applies special handling for SASL JAAS configuration parameters.
 *
 * @param params Sequence of key-value pairs where values may contain sensitive information
 * @return Sequence of key-value pairs with sensitive values replaced by redaction placeholder
 */
def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
```
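The key-based redaction described above can be sketched without Spark on the classpath. This is a minimal illustration under stated assumptions, not Spark's implementation: `ParamRedactionSketch`, its `redact` method, the `sensitiveKey` regex, and the placeholder text are all hypothetical names and values chosen for the example, and the SASL JAAS special case is left out.

```scala
import scala.util.matching.Regex

// Hypothetical stand-in for key-based redaction; not part of Spark.
object ParamRedactionSketch {

  /**
   * Replaces the value of every pair whose key matches `sensitiveKey`.
   * Null values are passed through unchanged; all other values are stringified.
   */
  def redact(
      params: Seq[(String, Object)],
      sensitiveKey: Regex,
      placeholder: String): Seq[(String, String)] =
    params.map {
      case (key, null) => (key, null)
      case (key, _) if sensitiveKey.findFirstIn(key).isDefined => (key, placeholder)
      case (key, value) => (key, value.toString)
    }
}
```

Calling `ParamRedactionSketch.redact(params, "(?i)secret|password|token".r, "***")` would replace the value of a key such as `ssl.keystore.password` while leaving `security.protocol` untouched. The regex here is an assumed example; in Spark the pattern comes from configuration.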

### JAAS Parameter Redaction

Specifically redacts password fields in JAAS configuration strings.

```scala { .api }
/**
 * Redacts JAAS configuration passwords specifically.
 * Uses regex pattern matching to find and replace password values in JAAS parameter strings.
 * Preserves the structure of JAAS configuration while hiding sensitive password values.
 *
 * @param param JAAS parameter string potentially containing password="value" patterns
 * @return JAAS parameter string with password values replaced by redaction placeholder
 */
def redactJaasParam(param: String): String
```
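To make the regex-based approach concrete, here is a small standalone sketch of replacing `password="value"` fields in a JAAS string. It is an assumption-laden illustration rather than the library's code: `JaasRedactionSketch`, `redactJaas`, the exact pattern, and the caller-supplied `placeholder` are hypothetical, and the pattern stops at the next double quote, which may differ from how the real utility matches.

```scala
import scala.util.matching.Regex

// Hypothetical stand-in for JAAS password redaction; not part of Spark.
object JaasRedactionSketch {

  // Matches password="..." up to the next double quote (assumed pattern).
  private val PasswordField: Regex = "password=\"[^\"]*\"".r

  /** Returns `param` with each password value replaced by `placeholder`. */
  def redactJaas(param: String, placeholder: String): String =
    if (param == null || param.isEmpty) {
      param // nothing to redact
    } else {
      // quoteReplacement keeps '$' and '\' in the placeholder literal
      val replacement = Regex.quoteReplacement("password=\"" + placeholder + "\"")
      PasswordField.replaceAllIn(param, replacement)
    }
}
```

Only the quoted password value changes, so the module name, flags, and other options such as `username` stay readable in logs.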

## Error Handling

### KafkaTokenProviderExceptions

Object providing factory methods for creating standardized exceptions related to Kafka token provider operations.

```scala { .api }
object KafkaTokenProviderExceptions {

  /**
   * Creates exception for missing Kafka configuration options.
   * Generates standardized SparkException with appropriate error class and message.
   *
   * @param option Name of the missing Kafka configuration option
   * @return SparkException with error class MISSING_KAFKA_OPTION
   */
  def missingKafkaOption(option: String): SparkException
}
```

**Usage Examples:**

```scala
import org.apache.spark.kafka010.KafkaTokenProviderExceptions
import org.apache.kafka.clients.CommonClientConfigs

// Hypothetical sample parameters; in practice these come from the caller's configuration
val kafkaParams: Map[String, String] = Map("security.protocol" -> "SASL_SSL")

// Validate required configuration options (Map.get returns an Option)
val bootstrapServers = kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)

if (bootstrapServers.isEmpty) {
  throw KafkaTokenProviderExceptions.missingKafkaOption(
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
  )
}

// This creates a SparkException with:
// - Error class: "MISSING_KAFKA_OPTION"
// - Message: contextual error message about the missing option
// - Parameters: Map containing the missing option name
```

### Missing Option Exception

Creates standardized exceptions for missing Kafka configuration options.

```scala { .api }
/**
 * Creates exception for missing Kafka configuration options.
 * Generates a SparkException with:
 * - Error class: "MISSING_KAFKA_OPTION"
 * - Descriptive error message loaded from error conditions JSON
 * - Message parameters including the missing option name
 *
 * @param option Name of the missing Kafka configuration option (e.g., "bootstrap.servers")
 * @return SparkException configured with appropriate error class and parameters
 */
def missingKafkaOption(option: String): SparkException
```
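The pattern of an exception carrying an error class plus message parameters can be sketched in isolation. The following is a minimal, Spark-free illustration under stated assumptions: `MissingOptionError`, `OptionValidationSketch`, and `requireOption` are hypothetical names, the message format is invented for the example, and `MissingOptionError` only stands in for `SparkException`.

```scala
// Hypothetical stand-in for SparkException: carries an error class and parameters.
final class MissingOptionError(
    val errorClass: String,
    val messageParameters: Map[String, String])
  extends Exception(
    s"[$errorClass] " + messageParameters.map { case (k, v) => s"$k=$v" }.mkString(", "))

object OptionValidationSketch {

  // Factory mirroring the shape of missingKafkaOption (assumed parameter key "option").
  def missingOption(option: String): MissingOptionError =
    new MissingOptionError("MISSING_KAFKA_OPTION", Map("option" -> option))

  /** Returns the value for `key`, or throws the standardized error if it is absent. */
  def requireOption(params: Map[String, String], key: String): String =
    params.getOrElse(key, throw missingOption(key))
}
```

Centralizing construction in a factory method means every call site reports a missing option with the same error class, so callers can match on `errorClass` instead of parsing message text.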

## Security and Logging

These utility classes are designed to support secure logging practices:

- **Redaction**: Prevents sensitive information from appearing in logs or debug output
- **Standardized Errors**: Provides consistent error handling and messaging
- **Configuration Safety**: Ensures safe handling of authentication parameters

**Security Considerations:**

```scala
// Assumes kafkaConfig is a Map[String, Object], jaasConfig is a String,
// and logger is an SLF4J-style logger already in scope.

// Always use redaction utilities when logging configuration
val configToLog = KafkaRedactionUtil.redactParams(kafkaConfig.toSeq)
logger.info(s"Kafka configuration: $configToLog")

// Never log raw JAAS configurations - always redact passwords
val safeJaasConfig = KafkaRedactionUtil.redactJaasParam(jaasConfig)
logger.debug(s"JAAS configuration: $safeJaasConfig")
```