# Utility Classes

Additional utility classes for configuration redaction and error handling.

## Capabilities

### KafkaRedactionUtil

Object providing utilities for redacting sensitive configuration parameters in logging and debugging output.

```scala { .api }
object KafkaRedactionUtil extends Logging {

  /**
   * Redacts sensitive parameters in configuration sequences
   * Applies redaction patterns to sensitive keys like passwords, tokens, and credentials
   * Special handling for SASL JAAS configuration parameters
   *
   * @param params Sequence of key-value pairs to redact
   * @return Sequence of redacted key-value pairs with sensitive values replaced
   */
  def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]

  /**
   * Redacts JAAS configuration passwords specifically
   * Replaces password values in JAAS parameter strings with redaction placeholder
   *
   * @param param JAAS parameter string potentially containing passwords
   * @return JAAS parameter string with passwords redacted
   */
  def redactJaasParam(param: String): String
}
```

**Usage Examples:**

```scala
import org.apache.spark.kafka010.KafkaRedactionUtil

// Redact sensitive configuration parameters
val sensitiveParams = Seq(
  ("bootstrap.servers", "kafka1:9092,kafka2:9092"),
  ("sasl.jaas.config", "ScramLoginModule required username=\"user\" password=\"secret123\";"),
  ("ssl.keystore.password", "keystore-password"),
  ("security.protocol", "SASL_SSL")
)

val redactedParams = KafkaRedactionUtil.redactParams(sensitiveParams)
redactedParams.foreach { case (key, value) =>
  println(s"$key = $value")
}
// Output (assuming Spark's default redaction pattern and placeholder text):
// bootstrap.servers = kafka1:9092,kafka2:9092
// sasl.jaas.config = ScramLoginModule required username="user" password="*********(redacted)";
// ssl.keystore.password = *********(redacted)
// security.protocol = SASL_SSL

// Redact JAAS parameters specifically
val jaasConfig = """ScramLoginModule required
  |username="myuser"
  |password="mypassword123";""".stripMargin.replace("\n", " ")

val redactedJaas = KafkaRedactionUtil.redactJaasParam(jaasConfig)
println(redactedJaas)
// Output: ScramLoginModule required username="myuser" password="*********(redacted)";
```

### Parameter Redaction

Redacts sensitive parameters in configuration sequences using Spark's built-in redaction patterns.

```scala { .api }
/**
 * Redacts sensitive parameters in configuration sequences
 * Uses Spark's SECRET_REDACTION_PATTERN configuration to identify sensitive keys
 * Applies special handling for SASL JAAS configuration parameters
 *
 * @param params Sequence of key-value pairs where values may contain sensitive information
 * @return Sequence of key-value pairs with sensitive values replaced by redaction placeholder
 */
def redactParams(params: Seq[(String, Object)]): Seq[(String, String)]
```
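
The key-based redaction described above can be illustrated without a Spark dependency. The following is a minimal sketch, not the library's implementation: `RedactionSketch`, `redactByKey`, the `Placeholder` text, and the `SensitiveKey` pattern are all hypothetical names and values chosen for illustration.

```scala
import scala.util.matching.Regex

object RedactionSketch {
  // Hypothetical placeholder; the real text is defined by Spark, not here.
  val Placeholder = "<redacted>"

  // Illustrative pattern only; Spark reads its pattern from configuration.
  val SensitiveKey: Regex = "(?i)secret|password|token".r

  /** Replaces the value of every pair whose key matches `pattern`. */
  def redactByKey(
      params: Seq[(String, Object)],
      pattern: Regex = SensitiveKey): Seq[(String, String)] =
    params.map { case (key, value) =>
      if (value == null) (key, null: String)                       // keep nulls as-is
      else if (pattern.findFirstIn(key).isDefined) (key, Placeholder)
      else (key, value.toString)                                   // non-sensitive: stringify
    }
}
```

The decision is made on the key rather than the value, so a value is hidden only when its key looks sensitive. Values such as JAAS strings, which embed a secret under a key that may not match the pattern, need the separate handling described in the next section.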

### JAAS Parameter Redaction

Specifically redacts password fields in JAAS configuration strings.

```scala { .api }
/**
 * Redacts JAAS configuration passwords specifically
 * Uses regex pattern matching to find and replace password values in JAAS parameter strings
 * Preserves the structure of JAAS configuration while hiding sensitive password values
 *
 * @param param JAAS parameter string potentially containing password="value" patterns
 * @return JAAS parameter string with password values replaced by redaction placeholder
 */
def redactJaasParam(param: String): String
```
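
To make the regex-based replacement concrete, here is a small standalone sketch. It is an assumption about how such a replacement could be written, not the library's code: `JaasRedactionSketch`, `redactPassword`, the `Placeholder` text, and the exact regex are hypothetical.

```scala
import scala.util.matching.Regex

object JaasRedactionSketch {
  // Hypothetical placeholder text used only in this sketch.
  val Placeholder = "<redacted>"

  // Matches password="..." where the quoted value contains no double quote.
  private val PasswordField: Regex = "password=\"[^\"]*\"".r

  /** Returns `jaas` with every password="..." value replaced by the placeholder. */
  def redactPassword(jaas: String): String =
    if (jaas == null || jaas.isEmpty) jaas
    else PasswordField.replaceAllIn(
      jaas,
      // quoteReplacement guards against '$' or '\' in the replacement text
      Regex.quoteReplacement("password=\"" + Placeholder + "\""))
}
```

Only the quoted password value changes; the login module name, the `username` field, and the trailing semicolon are left untouched, which keeps the redacted string readable in logs.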

## Error Handling

### KafkaTokenProviderExceptions

Object providing factory methods for creating standardized exceptions related to Kafka token provider operations.

```scala { .api }
object KafkaTokenProviderExceptions {

  /**
   * Creates exception for missing Kafka configuration options
   * Generates standardized SparkException with appropriate error class and message
   *
   * @param option Name of the missing Kafka configuration option
   * @return SparkException with error class MISSING_KAFKA_OPTION
   */
  def missingKafkaOption(option: String): SparkException
}
```

**Usage Examples:**

```scala
import org.apache.spark.kafka010.KafkaTokenProviderExceptions
import org.apache.kafka.clients.CommonClientConfigs

// Validate required configuration options.
// kafkaParams is assumed to be a Map[String, String] of Kafka client settings
// already in scope, so get(...) returns an Option.
val bootstrapServers = kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)

if (bootstrapServers.isEmpty) {
  throw KafkaTokenProviderExceptions.missingKafkaOption(
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
  )
}

// This creates a SparkException with:
// - Error class: "MISSING_KAFKA_OPTION"
// - Message: contextual error message about the missing option
// - Parameters: Map containing the missing option name
```

### Missing Option Exception

Creates standardized exceptions for missing Kafka configuration options.

```scala { .api }
/**
 * Creates exception for missing Kafka configuration options
 * Generates a SparkException with:
 * - Error class: "MISSING_KAFKA_OPTION"
 * - Descriptive error message loaded from error conditions JSON
 * - Message parameters including the missing option name
 *
 * @param option Name of the missing Kafka configuration option (e.g., "bootstrap.servers")
 * @return SparkException configured with appropriate error class and parameters
 */
def missingKafkaOption(option: String): SparkException
```
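
The factory pattern described above, an exception that carries an error class plus message parameters, can be sketched in plain Scala. Everything here is hypothetical and stands in for the Spark types: `ConfigError`, `ConfigErrors`, `missingOption`, `requireOption`, and the `"MISSING_OPTION"` error class are illustrative names, and the message format is an assumption.

```scala
// Hypothetical stand-in for an exception carrying an error class and parameters.
final class ConfigError(val errorClass: String, val parameters: Map[String, String])
  extends Exception(
    s"[$errorClass] " + parameters.map { case (k, v) => s"$k=$v" }.mkString(", "))

object ConfigErrors {
  /** Factory: builds the standardized error for a missing option. */
  def missingOption(option: String): ConfigError =
    new ConfigError("MISSING_OPTION", Map("option" -> option))

  /** Returns the value for `key`, or throws the standardized error. */
  def requireOption(params: Map[String, String], key: String): String =
    params.getOrElse(key, throw missingOption(key))
}
```

Routing every missing-option failure through one factory keeps the error class and parameter names consistent, so callers can match on `errorClass` instead of parsing message text.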

## Security and Logging

These utility classes are designed to support secure logging practices:

- **Redaction**: Prevents sensitive information from appearing in logs or debug output
- **Standardized Errors**: Provides consistent error handling and messaging
- **Configuration Safety**: Ensures safe handling of authentication parameters

**Security Considerations:**

```scala
// Always use redaction utilities when logging configuration
val configToLog = KafkaRedactionUtil.redactParams(kafkaConfig.toSeq)
logger.info(s"Kafka configuration: $configToLog")

// Never log raw JAAS configurations - always redact passwords
val safeJaasConfig = KafkaRedactionUtil.redactJaasParam(jaasConfig)
logger.debug(s"JAAS configuration: $safeJaasConfig")
```