# AWS Credentials

Flexible credential providers supporting default AWS credentials, basic key/secret authentication, and STS role assumption for secure Kinesis access.

## Capabilities

### SparkAWSCredentials Trait

Base trait for all AWS credential providers used throughout the Kinesis integration.

```scala { .api }
/**
 * Serializable interface providing AWS credentials for Kinesis Client Library
 */
sealed trait SparkAWSCredentials extends Serializable {
  /** Returns AWSCredentialsProvider for KCL authentication */
  def provider: AWSCredentialsProvider
}
```

### Default Credentials

Uses the AWS DefaultAWSCredentialsProviderChain for automatic credential discovery.

```scala { .api }
/**
 * Uses DefaultAWSCredentialsProviderChain for authentication
 * Checks environment variables, system properties, credential files, and IAM roles
 */
case object DefaultCredentials extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kinesis.DefaultCredentials

// Use default credential chain (recommended for production)
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(DefaultCredentials)
  .build()
```

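The first-match-wins semantics of the default chain can be sketched in plain Scala, with no AWS SDK involved. The source names and stub lookups below are purely illustrative, not the SDK's actual classes:

```scala
// Illustrative sketch of the default chain's resolution order: each
// candidate source is tried in turn and the first one that yields a
// credential wins. All names and stub values here are hypothetical.
case class CredentialSource(name: String, lookup: () => Option[String])

def resolveFirst(chain: List[CredentialSource]): Option[(String, String)] =
  chain.view
    .flatMap(src => src.lookup().map(key => (src.name, key)))
    .headOption

val chain = List(
  CredentialSource("environment", () => None),                       // no env var set in this sketch
  CredentialSource("system-properties", () => Some("key-from-sysprops")),
  CredentialSource("profile-file", () => Some("key-from-profile")),  // never reached
  CredentialSource("iam-role", () => None)
)

val resolved = resolveFirst(chain) // first source that yields credentials wins
```

Later sources are never consulted once an earlier one succeeds, which is why an unexpected environment variable can silently shadow an intended IAM role.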
### Basic Credentials

Static AWS access key and secret key authentication.

```scala { .api }
/**
 * Static AWS keypair credentials with fallback to default chain
 * @param awsAccessKeyId AWS access key ID
 * @param awsSecretKey AWS secret access key
 */
case class BasicCredentials(
    awsAccessKeyId: String,
    awsSecretKey: String) extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kinesis.BasicCredentials

// Use static credentials (not recommended for production)
val credentials = BasicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()
```

### STS Role Credentials

AWS Security Token Service (STS) role assumption for cross-account or temporary access.

```scala { .api }
/**
 * STS role assumption credentials for cross-account access
 * @param stsRoleArn ARN of IAM role to assume
 * @param stsSessionName Session name for STS session
 * @param stsExternalId Optional external ID for role trust policy validation
 * @param longLivedCreds Base credentials for STS authentication
 */
case class STSCredentials(
    stsRoleArn: String,
    stsSessionName: String,
    stsExternalId: Option[String] = None,
    longLivedCreds: SparkAWSCredentials = DefaultCredentials) extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.kinesis.{STSCredentials, BasicCredentials, DefaultCredentials}

// STS with default credentials for role assumption
val stsCredentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/CrossAccountKinesisRole",
  stsSessionName = "spark-kinesis-session"
)

// STS with basic credentials and external ID
val stsWithExternalId = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/CrossAccountKinesisRole",
  stsSessionName = "spark-kinesis-session",
  stsExternalId = Some("unique-external-id"),
  longLivedCreds = BasicCredentials("ACCESS_KEY", "SECRET_KEY")
)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("cross-account-stream")
  .checkpointAppName("cross-account-app")
  .kinesisCredentials(stsCredentials)
  .build()
```

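The external-ID check performed by the role's trust policy can be modeled in a few lines of plain Scala. This is a toy model of the semantics only; the real evaluation happens inside AWS STS:

```scala
// Toy model of external-ID validation in an IAM role trust policy
// (illustrative only). If the policy requires an external ID, the caller
// must present the same value; if the policy requires none, an otherwise
// matching caller may assume the role.
def mayAssumeRole(requiredExternalId: Option[String],
                  presentedExternalId: Option[String]): Boolean =
  requiredExternalId.forall(required => presentedExternalId.contains(required))

mayAssumeRole(Some("unique-external-id"), Some("unique-external-id")) // true
mayAssumeRole(Some("unique-external-id"), None)                       // false
mayAssumeRole(None, Some("anything"))                                 // true
```

This is why `stsExternalId` must match the `sts:ExternalId` condition configured on the role: a caller that omits or mismatches it is denied even with otherwise valid long-lived credentials.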
### Credentials Builder

Fluent builder for constructing SparkAWSCredentials instances with validation.

```scala { .api }
object SparkAWSCredentials {
  /** Creates new credentials builder */
  def builder: Builder

  class Builder {
    /** Configure basic AWS keypair credentials */
    def basicCredentials(accessKeyId: String, secretKey: String): Builder

    /** Configure STS role assumption credentials */
    def stsCredentials(roleArn: String, sessionName: String): Builder

    /** Configure STS credentials with external ID validation */
    def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder

    /** Build the configured credentials instance */
    def build(): SparkAWSCredentials
  }
}
```

**Builder Usage Examples:**

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Basic credentials via builder
val basicCreds = SparkAWSCredentials.builder
  .basicCredentials("ACCESS_KEY", "SECRET_KEY")
  .build()

// STS credentials via builder
val stsCreds = SparkAWSCredentials.builder
  .basicCredentials("ACCESS_KEY", "SECRET_KEY") // Long-lived creds
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisRole", "spark-session")
  .build()

// STS with external ID via builder
val stsWithExternalId = SparkAWSCredentials.builder
  .stsCredentials(
    "arn:aws:iam::123456789012:role/KinesisRole",
    "spark-session",
    "external-id-123")
  .build()
```

### Multi-Service Credentials

Different credential providers can be used for different AWS services (Kinesis, DynamoDB, CloudWatch).

**Usage Example:**

```scala
import org.apache.spark.streaming.kinesis.{SparkAWSCredentials, BasicCredentials, STSCredentials}

// Use different credentials for different services
val kinesisCredentials = BasicCredentials("KINESIS_ACCESS_KEY", "KINESIS_SECRET_KEY")
val dynamoDbCredentials = STSCredentials(
  "arn:aws:iam::123456789012:role/DynamoDBRole",
  "dynamodb-session"
)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(kinesisCredentials)    // For Kinesis access
  .dynamoDBCredentials(dynamoDbCredentials)  // For checkpoint storage
  .cloudWatchCredentials(kinesisCredentials) // For metrics publishing
  .build()
```

### Security Best Practices

1. **Avoid hardcoded credentials** - Use `DefaultCredentials` with IAM roles when possible
2. **Use STS for cross-account access** - Implement role assumption for security isolation
3. **Secure checkpoint storage** - Ensure DynamoDB credentials have minimal required permissions
4. **External ID validation** - Use external IDs in STS trust policies for additional security
5. **Credential rotation** - STS credentials refresh automatically; basic credentials require manual rotation

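As a rough illustration of the first two points, the credential strategy can be derived from configuration rather than hardcoded in source. The config keys and strategy labels below are made up for the sketch:

```scala
// Sketch: pick a credential strategy from configuration instead of
// embedding keys in code. Config keys and labels are illustrative.
def chooseStrategy(conf: Map[String, String]): String =
  (conf.get("roleArn"), conf.get("accessKeyId")) match {
    case (Some(_), _)    => "sts"     // prefer role assumption when an ARN is configured
    case (None, Some(_)) => "basic"   // fall back to static keys only if provided
    case _               => "default" // otherwise rely on the default chain
  }

chooseStrategy(Map("roleArn" -> "arn:aws:iam::123456789012:role/KinesisRole")) // "sts"
chooseStrategy(Map("accessKeyId" -> "SOME_KEY"))                               // "basic"
chooseStrategy(Map.empty)                                                      // "default"
```

The mapping from strategy label to `DefaultCredentials`, `BasicCredentials`, or `STSCredentials` then lives in one place, keeping secrets out of the codebase.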
### Error Handling

Credential providers can fail with:

- `IllegalArgumentException` - For null or invalid credential parameters
- `AmazonClientException` - For AWS authentication failures
- `AmazonServiceException` - For STS service errors during role assumption
- `SecurityException` - For insufficient permissions on assumed roles

**Error Handling Example:**

```scala
import com.amazonaws.AmazonServiceException
import org.apache.spark.streaming.kinesis.STSCredentials

try {
  val credentials = STSCredentials(
    "arn:aws:iam::123456789012:role/NonExistentRole",
    "test-session")

  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .kinesisCredentials(credentials)
    .build()
} catch {
  case e: AmazonServiceException =>
    println(s"AWS service error: ${e.getMessage}")
    // Handle credential/permission errors
  case e: IllegalArgumentException =>
    println(s"Invalid credential configuration: ${e.getMessage}")
    // Handle configuration errors
}
```