# AWS Credentials Management

The AWS credentials system provides flexible authentication for accessing Kinesis, DynamoDB, and CloudWatch services. It supports multiple authentication methods including default provider chains, basic access keys, and STS assume role for cross-account access.

## Core API

### SparkAWSCredentials Interface

The base interface for all credential providers.

```scala { .api }
sealed trait SparkAWSCredentials extends Serializable {
  def provider: AWSCredentialsProvider
}
```

### Credential Implementations

```scala { .api }
// Uses AWS default credential provider chain
case object DefaultCredentials extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

// Uses basic AWS access key and secret key
case class BasicCredentials(
    awsAccessKeyId: String,
    awsSecretKey: String) extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

// Uses STS assume role for temporary credentials
case class STSCredentials(
    stsRoleArn: String,
    stsSessionName: String,
    stsExternalId: Option[String] = None,
    longLivedCreds: SparkAWSCredentials = DefaultCredentials) extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}
```

### Builder API

```scala { .api }
object SparkAWSCredentials {
  def builder: SparkAWSCredentials.Builder
}

class Builder {
  def basicCredentials(accessKeyId: String, secretKey: String): Builder
  def stsCredentials(roleArn: String, sessionName: String): Builder
  def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder
  def build(): SparkAWSCredentials
}
```

## Default Credentials

Uses the AWS default credential provider chain, which checks for credentials in this order:

1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
2. Java system properties (`aws.accessKeyId`, `aws.secretKey`)
3. Credential profiles file (`~/.aws/credentials`)
4. Amazon ECS container credentials
5. EC2 instance profile credentials
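
The same chain can be exercised directly through the AWS SDK v1 that the Kinesis connector builds on. This is a minimal sketch, assuming `aws-java-sdk-core` is on the classpath and at least one of the sources above is configured:

```scala
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

// Walks the sources in the order listed above; throws if none yields credentials
val chain = DefaultAWSCredentialsProviderChain.getInstance()
val creds = chain.getCredentials()
println(s"Resolved access key: ${creds.getAWSAccessKeyId.take(4)}...")
```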

```scala
import org.apache.spark.streaming.kinesis.{DefaultCredentials, KinesisInputDStream}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(DefaultCredentials) // Explicit, but this is the default
  .build()
```

Or using the builder:

```scala
val credentials = SparkAWSCredentials.builder.build() // Creates DefaultCredentials

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()
```

## Basic Credentials

Uses an explicit AWS access key ID and secret access key. **Warning**: these credentials are saved in DStream checkpoints, so make sure your checkpoint directory is secure.

```scala
import org.apache.spark.streaming.kinesis.BasicCredentials

val credentials = BasicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()
```

Using the builder pattern (recommended):

```scala
val credentials = SparkAWSCredentials.builder
  .basicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .build()

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()
```

### Error Handling

BasicCredentials falls back to the default provider chain if the provided credentials are invalid:

```scala
// If credentials are null or invalid, falls back to DefaultCredentials
val credentials = BasicCredentials(null, "invalid")
// This will log a warning and use DefaultCredentials instead
```
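
If you prefer to fail fast instead of relying on that fallback, you can pick the credential type up front. The `chooseCredentials` helper below is hypothetical, not part of the API; it is a sketch assuming the keys arrive via environment variables:

```scala
import org.apache.spark.streaming.kinesis.{BasicCredentials, DefaultCredentials, SparkAWSCredentials}

// Hypothetical helper: use BasicCredentials only when both keys are present and
// non-empty; otherwise fall back to the default provider chain explicitly
def chooseCredentials(accessKeyId: Option[String], secretKey: Option[String]): SparkAWSCredentials =
  (accessKeyId, secretKey) match {
    case (Some(id), Some(key)) if id.nonEmpty && key.nonEmpty => BasicCredentials(id, key)
    case _                                                    => DefaultCredentials
  }

val credentials =
  chooseCredentials(sys.env.get("AWS_ACCESS_KEY_ID"), sys.env.get("AWS_SECRET_ACCESS_KEY"))
```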

## STS Assume Role Credentials

Uses AWS Security Token Service (STS) to assume an IAM role for temporary credentials. This is useful for cross-account access or enhanced security.

### Basic STS Usage

```scala
import org.apache.spark.streaming.kinesis.STSCredentials

val credentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole",
  stsSessionName = "spark-kinesis-session"
)

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()
```

### STS with External ID

For roles that require an external ID for additional security:

```scala
val credentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/CrossAccountRole",
  stsSessionName = "spark-session",
  stsExternalId = Some("unique-external-id")
)
```

### STS with Custom Long-Lived Credentials

Specify different long-lived credentials to use when assuming the role:

```scala
val longLivedCreds = BasicCredentials("access-key", "secret-key")

val stsCredentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/KinesisRole",
  stsSessionName = "my-session",
  longLivedCreds = longLivedCreds
)
```
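
Because `longLivedCreds` accepts any `SparkAWSCredentials`, role chaining is possible in principle. The sketch below assumes the intermediate role is permitted to assume the target role; the role ARNs are illustrative:

```scala
// First hop: assume an intermediate role using the default provider chain
val intermediate = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/IntermediateRole",
  stsSessionName = "hop-1"
)

// Second hop: assume the target role with the intermediate role's temporary credentials
val target = STSCredentials(
  stsRoleArn = "arn:aws:iam::210987654321:role/TargetKinesisRole",
  stsSessionName = "hop-2",
  longLivedCreds = intermediate
)
```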

### Using the Builder Pattern

```scala
// Basic STS
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisRole", "my-session")
  .build()

// STS with external ID
val credentialsWithExternalId = SparkAWSCredentials.builder
  .basicCredentials("access-key", "secret-key") // Long-lived credentials
  .stsCredentials("arn:aws:iam::123456789012:role/CrossAccountRole", "session", "external-id")
  .build()
```

## Service-Specific Credentials

You can configure different credentials for different AWS services:

```scala
val kinesisCredentials = SparkAWSCredentials.builder
  .basicCredentials("kinesis-access-key", "kinesis-secret")
  .build()

val dynamoCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/DynamoRole", "dynamo-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(kinesisCredentials)    // For Kinesis API calls
  .dynamoDBCredentials(dynamoCredentials)    // For DynamoDB checkpointing
  .cloudWatchCredentials(DefaultCredentials) // For CloudWatch metrics
  .build()
```

If you don't specify `dynamoDBCredentials` or `cloudWatchCredentials`, they will default to the same credentials as `kinesisCredentials`.

## Best Practices

### Security Recommendations

1. **Use IAM roles when possible**: Prefer STS assume role or instance profiles over hardcoded keys
2. **Secure checkpoint directories**: Basic credentials are stored in checkpoints
3. **Rotate credentials regularly**: Use temporary credentials when possible
4. **Use least privilege**: Grant only the minimum required permissions

### Required IAM Permissions

#### For Kinesis:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/your-stream-name"
    }
  ]
}
```

#### For DynamoDB (checkpointing):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/your-checkpoint-app-name"
    }
  ]
}
```

#### For CloudWatch (metrics):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": "*"
    }
  ]
}
```

### Example: Production Configuration

```scala
// Production setup with assume role
val productionCredentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/SparkKinesisRole",
    sessionName = s"spark-kinesis-${java.util.UUID.randomUUID()}"
  )
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("production-data-stream")
  .checkpointAppName("production-spark-consumer")
  .regionName("us-west-2")
  .kinesisCredentials(productionCredentials)
  .dynamoDBCredentials(productionCredentials)
  .cloudWatchCredentials(productionCredentials)
  .build()
```