# Credential Management

AWS credential handling for secure access to Kinesis streams, DynamoDB checkpointing, and CloudWatch metrics. Both automatic credential discovery and explicit credential specification are supported.

## Credential Options

### Default Credential Provider Chain

The recommended approach uses the AWS `DefaultAWSCredentialsProviderChain`, which automatically discovers credentials from:

1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
2. Java system properties (`aws.accessKeyId`, `aws.secretKey`)
3. The credential profiles file (`~/.aws/credentials`)
4. Instance profile credentials (for EC2 instances)
5. Container credentials (for ECS containers)

All stream creation methods without explicit credentials use this automatic discovery.
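
The lookup order above can be sketched as a plain fallback chain. This is an illustrative stand-in, not the actual SDK class; the real `DefaultAWSCredentialsProviderChain` also consults the profile file, instance profile, and container sources:

```java
// Illustrative sketch of the provider chain's precedence for the access key.
public class CredentialChainSketch {

    // Return the first access key found, mimicking the chain's lookup order.
    public static String resolveAccessKey() {
        String key = System.getenv("AWS_ACCESS_KEY_ID");     // 1. environment variable
        if (key == null) {
            key = System.getProperty("aws.accessKeyId");     // 2. Java system property
        }
        // 3-5. profile file, EC2 instance profile, and ECS container
        // credentials would be consulted here by the real chain.
        if (key == null) {
            throw new IllegalStateException("No AWS access key found in any source");
        }
        return key;
    }

    public static void main(String[] args) {
        System.setProperty("aws.accessKeyId", "AKIA-EXAMPLE-ONLY");
        System.out.println("Resolved key: " + resolveAccessKey());
    }
}
```

Each source is only consulted if every source before it came up empty, which is why an environment variable silently wins over a profile file entry.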

### Explicit Credential Specification

For applications requiring specific credential control, all `createStream` methods have overloaded versions that accept explicit AWS credentials.

**Security Note**: Explicit credentials are stored in DStream checkpoints. Ensure checkpoint directories are properly secured.

## Scala API with Credentials

### Generic Stream with Credentials

```scala { .api }
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[T]
```

**Usage Example:**

```scala
val credentialedStream = KinesisUtils.createStream[String](
  ssc,
  "secure-app",
  "private-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  record => new String(record.getData.array()),
  "AKIAIOSFODNN7EXAMPLE", // AWS documentation example keys; never hardcode real ones
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```

### Default Byte Array Stream with Credentials

```scala { .api }
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[Array[Byte]]
```

**Usage Example:**

```scala
val secureByteStream = KinesisUtils.createStream(
  ssc,
  "secure-byte-processor",
  "encrypted-data-stream",
  "https://kinesis.us-west-2.amazonaws.com",
  "us-west-2",
  InitialPositionInStream.TRIM_HORIZON,
  Seconds(60),
  StorageLevel.MEMORY_AND_DISK_2,
  sys.env("AWS_ACCESS_KEY_ID"),
  sys.env("AWS_SECRET_ACCESS_KEY")
)
```

## Java API with Credentials

### Generic Stream with Credentials

```java { .api }
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass,
    String awsAccessKeyId,
    String awsSecretKey
);
```

**Usage Example:**

```java
import org.apache.spark.api.java.function.Function;

Function<Record, String> handler = record ->
    new String(record.getData().array());

JavaReceiverInputDStream<String> secureStream = KinesisUtils.createStream(
    jssc,
    "java-secure-app",
    "confidential-stream",
    "https://kinesis.eu-west-1.amazonaws.com",
    "eu-west-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    handler,
    String.class,
    System.getenv("AWS_ACCESS_KEY_ID"),
    System.getenv("AWS_SECRET_ACCESS_KEY")
);
```

### Default Byte Array Stream with Credentials

```java { .api }
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    String awsAccessKeyId,
    String awsSecretKey
);
```

**Usage Example:**

```java
JavaReceiverInputDStream<byte[]> secureByteStream = KinesisUtils.createStream(
    jssc,
    "java-secure-bytes",
    "secure-binary-stream",
    "https://kinesis.ap-northeast-1.amazonaws.com",
    "ap-northeast-1",
    InitialPositionInStream.TRIM_HORIZON,
    Durations.seconds(45),
    StorageLevel.MEMORY_AND_DISK_2(),
    System.getProperty("aws.accessKeyId"),
    System.getProperty("aws.secretKey")
);
```

## SerializableAWSCredentials

Internal credential wrapper for secure serialization in distributed environments.

```scala { .api }
case class SerializableAWSCredentials(
    accessKeyId: String,
    secretKey: String
) extends AWSCredentials {
  override def getAWSAccessKeyId: String = accessKeyId
  override def getAWSSecretKey: String = secretKey
}
```

This class wraps AWS credentials for safe serialization when distributing stream processing tasks across Spark workers.
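
A minimal Java analogue makes the requirement concrete: any credential holder that travels inside a Spark task closure must survive a Java serialization round trip. The class name and `roundTrip` helper below are hypothetical, for illustration only:

```java
import java.io.*;

// Hypothetical analogue of SerializableAWSCredentials: a plain, immutable
// holder that implements java.io.Serializable so it can ship to worker JVMs.
public class SerializableCreds implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String accessKeyId;
    private final String secretKey;

    public SerializableCreds(String accessKeyId, String secretKey) {
        this.accessKeyId = accessKeyId;
        this.secretKey = secretKey;
    }

    public String getAWSAccessKeyId() { return accessKeyId; }
    public String getAWSSecretKey() { return secretKey; }

    // Round-trip through Java serialization, as Spark effectively does when
    // it distributes closures; a non-serializable holder would fail here.
    public static SerializableCreds roundTrip(SerializableCreds in) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(in);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (SerializableCreds) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        SerializableCreds copy = roundTrip(new SerializableCreds("id", "secret"));
        System.out.println("Round-tripped access key: " + copy.getAWSAccessKeyId());
    }
}
```

This is also why the raw AWS SDK credential objects cannot be used directly in closures: they do not implement `Serializable`, so a wrapper of this shape is needed.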

## Best Practices

### Credential Security

1. **Use Environment Variables**: Store credentials in environment variables rather than hardcoding them:

   ```scala
   val accessKey = sys.env.getOrElse("AWS_ACCESS_KEY_ID",
     throw new IllegalArgumentException("AWS_ACCESS_KEY_ID not set"))
   val secretKey = sys.env.getOrElse("AWS_SECRET_ACCESS_KEY",
     throw new IllegalArgumentException("AWS_SECRET_ACCESS_KEY not set"))
   ```

2. **Secure Checkpoint Directories**: When using explicit credentials, ensure checkpoint directories have proper access controls:

   ```scala
   // Set a secure checkpoint directory
   ssc.checkpoint("hdfs://secure-cluster/checkpoints/kinesis-app")
   ```

3. **Use IAM Roles**: For production deployments, prefer IAM roles over explicit credentials:

   ```scala
   // Preferred approach: uses automatic credential discovery
   val stream = KinesisUtils.createStream(ssc, ...) // No explicit credentials
   ```

### Credential Rotation

For applications using explicit credentials:

1. **Monitor Expiration**: Implement credential monitoring and rotation
2. **Graceful Updates**: Plan for application restarts when credentials change
3. **Fallback Mechanisms**: Consider implementing credential fallback chains
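
A fallback chain of the kind point 3 suggests can be sketched as follows. This is an illustrative sketch only; the source names and their ordering are application choices, not part of this API:

```java
import java.util.*;
import java.util.function.Supplier;

// Illustrative credential fallback chain: try each supplier in order and
// use the first one that yields a non-empty value.
public class CredentialFallback {

    public static Optional<String> firstAvailable(List<Supplier<String>> sources) {
        for (Supplier<String> source : sources) {
            String value = source.get();
            if (value != null && !value.isEmpty()) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Order: environment variable, then system property, then a placeholder.
        List<Supplier<String>> sources = Arrays.asList(
            () -> System.getenv("AWS_ACCESS_KEY_ID"),
            () -> System.getProperty("aws.accessKeyId"),
            () -> "AKIA-FALLBACK-EXAMPLE" // last-resort placeholder, not for production
        );
        System.out.println("Using key: " + firstAvailable(sources).orElseThrow(
            () -> new IllegalStateException("no credentials available")));
    }
}
```

Because the chain is just an ordered list of suppliers, rotating credentials amounts to changing what the earliest available source returns and restarting the application.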

### Cross-Region Access

When accessing Kinesis streams in a different region than your Spark cluster:

```scala
// Ensure the credentials have cross-region permissions
val crossRegionStream = KinesisUtils.createStream(
  ssc,
  "cross-region-app",
  "remote-stream",
  "https://kinesis.eu-central-1.amazonaws.com", // Different region
  "eu-central-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  explicitAccessKey,
  explicitSecretKey
)
```

## Required AWS Permissions

Ensure credentials have the following minimum permissions:

### Kinesis Permissions

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListStreams"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/your-stream-name"
    }
  ]
}
```

### DynamoDB Permissions (for checkpointing)

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/your-kinesis-app-name"
    }
  ]
}
```

### CloudWatch Permissions (for metrics)

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": "*"
    }
  ]
}
```