# AWS Credentials Configuration

Flexible credential management supporting the default provider chain, basic credentials, and STS assume-role patterns for accessing the Kinesis, DynamoDB, and CloudWatch services.
## Credential Types

```scala { .api }
package org.apache.spark.streaming.kinesis

sealed trait SparkAWSCredentials extends Serializable {
  def provider: AWSCredentialsProvider
}

case object DefaultCredentials extends SparkAWSCredentials

case class BasicCredentials(
  awsAccessKeyId: String,
  awsSecretKey: String
) extends SparkAWSCredentials

case class STSCredentials(
  stsRoleArn: String,
  stsSessionName: String,
  stsExternalId: Option[String] = None,
  longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials
```
## Builder API

```scala { .api }
object SparkAWSCredentials {
  def builder: Builder
}

class Builder {
  def basicCredentials(accessKeyId: String, secretKey: String): Builder
  def stsCredentials(roleArn: String, sessionName: String): Builder
  def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder
  def build(): SparkAWSCredentials
}
```
## Default Credentials

Uses the default AWS credentials provider chain, which checks credentials in this order:

1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
2. Java system properties (`aws.accessKeyId`, `aws.secretKey`)
3. Credential profiles file (`~/.aws/credentials`)
4. EC2 instance profile credentials

```scala
import org.apache.spark.streaming.kinesis.{DefaultCredentials, KinesisInputDStream}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(DefaultCredentials)
  .build()
```

**Use Case:** Recommended for production environments with proper IAM roles and policies.
## Basic Credentials

Use an explicit AWS access key ID and secret access key.

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

val credentials = SparkAWSCredentials.builder
  .basicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()
```

**Security Warning:** Basic credentials are saved in DStream checkpoints if checkpointing is enabled. Ensure your checkpoint directory is secure.
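Rather than embedding key literals in source, one option is to read them from the environment at startup. A minimal sketch (the variable names `MY_APP_AWS_ACCESS_KEY_ID` and `MY_APP_AWS_SECRET_ACCESS_KEY` are placeholders chosen for this example, not names the library reads):

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Hypothetical helper: fail fast if a required variable is missing.
def requiredEnv(name: String): String =
  sys.env.getOrElse(name, sys.error(s"environment variable $name is not set"))

// Keys come from the driver's environment instead of source control.
val credentials = SparkAWSCredentials.builder
  .basicCredentials(
    requiredEnv("MY_APP_AWS_ACCESS_KEY_ID"),
    requiredEnv("MY_APP_AWS_SECRET_ACCESS_KEY"))
  .build()
```

Note that this only keeps the keys out of source control; the resulting credentials are still serialized into checkpoints as described above.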
## STS Assume Role Credentials

Use AWS Security Token Service (STS) to assume an IAM role and obtain temporary credentials.

### Basic STS Usage

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

val credentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole",
    sessionName = "spark-kinesis-session"
  )
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()
```
### STS with External ID

Use an external ID for additional security when crossing account boundaries.

```scala
val credentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/CrossAccountKinesisRole",
    sessionName = "spark-kinesis-session",
    externalId = "unique-external-identifier"
  )
  .build()
```
### STS with Basic Credentials

Combine STS with basic credentials: the basic credentials become the long-lived credentials used for the initial authentication when assuming the role.

```scala
val credentials = SparkAWSCredentials.builder
  .basicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole",
    sessionName = "spark-kinesis-session"
  )
  .build()
```
## Service-Specific Credentials

Configure different credentials for different AWS services.

```scala
val kinesisCredentials = SparkAWSCredentials.builder
  .basicCredentials("kinesis-access-key", "kinesis-secret-key")
  .build()

val dynamoCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/DynamoDBRole", "dynamo-session")
  .build()

val cloudWatchCredentials = SparkAWSCredentials.builder
  .basicCredentials("cloudwatch-access-key", "cloudwatch-secret-key")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(kinesisCredentials)
  .dynamoDBCredentials(dynamoCredentials)
  .cloudWatchCredentials(cloudWatchCredentials)
  .build()
```

**Default Behavior:** If not specified, DynamoDB and CloudWatch credentials default to the same credentials used for Kinesis.
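As a sketch of that default: supplying only the Kinesis credentials means the same provider is reused for the DynamoDB lease table and CloudWatch metrics (the role name here is a placeholder).

```scala
// Only Kinesis credentials are set explicitly.
val sharedCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/StreamingRole", "shared-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(sharedCredentials)
  // No dynamoDBCredentials/cloudWatchCredentials calls:
  // both services fall back to sharedCredentials.
  .build()
```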
## Java API Usage

```java
import org.apache.spark.streaming.kinesis.DefaultCredentials$;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;

// Default credentials (a Scala case object is accessed from Java via MODULE$)
SparkAWSCredentials defaultCreds = DefaultCredentials$.MODULE$;

// Basic credentials
SparkAWSCredentials basicCreds = SparkAWSCredentials.builder()
    .basicCredentials("access-key", "secret-key")
    .build();

// STS credentials
SparkAWSCredentials stsCreds = SparkAWSCredentials.builder()
    .stsCredentials("arn:aws:iam::123456789012:role/MyRole", "my-session")
    .build();

// Use with the stream builder
KinesisInputDStream<byte[]> stream = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .kinesisCredentials(basicCreds)
    .build();
```
## Error Handling

### Invalid Credentials

```scala
import org.apache.spark.streaming.kinesis.BasicCredentials

// This will fall back to DefaultCredentials if invalid
val credentials = BasicCredentials(null, "secret-key") // Invalid: null access key
```

When `BasicCredentials` cannot construct valid credentials, it logs a warning and falls back to `DefaultCredentials`.
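Because that fallback is silent apart from a log line, it can be worth resolving the provider eagerly at startup so bad keys surface immediately rather than at the first Kinesis call. A sketch, assuming `provider` returns an AWS SDK v1 `AWSCredentialsProvider`:

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

val credentials = SparkAWSCredentials.builder
  .basicCredentials(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY"))
  .build()

// Force credential resolution now; getCredentials throws if no provider
// in the chain can supply keys.
val resolved = credentials.provider.getCredentials()
require(resolved.getAWSAccessKeyId.nonEmpty, "resolved an empty access key")
```

This only checks that some credentials can be resolved locally; it does not verify them against AWS.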
### STS Validation

```scala
// Role ARN and session name must both be non-null
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/MyRole", null) // Invalid: missing session name
  .build() // Throws IllegalArgumentException
```
## Security Best Practices

### Use IAM Roles

Prefer IAM roles over hardcoded credentials:

```scala
// Good: uses the IAM instance profile or task role
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  // No explicit credentials - uses the default provider chain
  .build()
```
### Least Privilege Principle

Create IAM policies with minimal required permissions:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/my-app"
    }
  ]
}
```
### Secure Checkpoints

When using basic credentials, ensure checkpoint directories are secure:

```scala
// Ensure the checkpoint location has restricted access
ssc.checkpoint("s3://secure-bucket/checkpoints/")
```
### Credential Rotation

Use STS credentials for automatic credential rotation:

```scala
val credentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/KinesisRole",
    sessionName = "spark-session-" + System.currentTimeMillis()
  )
  .build()
```