# AWS Configuration

Flexible AWS authentication options including default credential provider chains, explicit credential specification, and IAM role integration for secure access to Kinesis streams.

## Capabilities

### Default Credential Provider Chain

When no credentials are passed, the AWS SDK's `DefaultAWSCredentialsProviderChain` discovers them automatically. The chain is evaluated on the workers that run the Kinesis receivers, so the credentials must be available there, not only on the driver.

```scala { .api }
// Credentials discovered automatically in this order:
// 1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
// 2. Java system properties (aws.accessKeyId, aws.secretKey)
// 3. Credential profiles file (~/.aws/credentials)
// 4. Amazon EC2 instance profile credentials
// (Simplified; newer SDK versions add further providers, e.g. ECS container credentials.)
```
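
The lookup order above behaves like a short-circuiting chain: each provider is asked in turn, and the first one that yields a complete key pair wins. The following is a simplified model of that idea in plain Scala that covers only the first two sources. `ChainSketch`, `Creds`, and the other names are hypothetical, and this is not the AWS SDK's implementation.

```scala
// Simplified model of a credential provider chain; NOT the AWS SDK implementation.
// Only the first two sources (environment variables, system properties) are modeled.
object ChainSketch {
  final case class Creds(accessKeyId: String, secretKey: String)

  // A provider inspects one source and may or may not find credentials.
  type Provider = () => Option[Creds]

  // Source 1: environment variables.
  def fromEnv(env: Map[String, String]): Provider = () =>
    for {
      id  <- env.get("AWS_ACCESS_KEY_ID")
      key <- env.get("AWS_SECRET_ACCESS_KEY")
    } yield Creds(id, key)

  // Source 2: Java system properties.
  def fromProps(props: Map[String, String]): Provider = () =>
    for {
      id  <- props.get("aws.accessKeyId")
      key <- props.get("aws.secretKey")
    } yield Creds(id, key)

  // Ask providers in order and stop at the first hit; fail if none finds credentials.
  def resolve(chain: Seq[Provider]): Creds =
    chain.iterator
      .map(provider => provider())
      .collectFirst { case Some(creds) => creds }
      .getOrElse(throw new IllegalStateException("Unable to load AWS credentials from any provider"))

  def main(args: Array[String]): Unit = {
    val chain = Seq(fromEnv(sys.env), fromProps(sys.props.toMap))
    try println(s"Resolved access key ID: ${resolve(chain).accessKeyId}")
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}
```

Because the iterator is lazy, later providers are never consulted once an earlier one succeeds, which is why a stale environment variable can shadow a correct profile file.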

**Usage (Scala):**

```scala
val stream = KinesisUtils.createStream(
  ssc, appName, streamName, endpointUrl, regionName,
  initialPosition, checkpointInterval, storageLevel
)
// No explicit credentials - uses DefaultAWSCredentialsProviderChain
```

**Usage (Java):**

```java
JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
    jssc, appName, streamName, endpointUrl, regionName,
    initialPosition, checkpointInterval, storageLevel
);
// No explicit credentials - uses DefaultAWSCredentialsProviderChain
```

### Explicit Credential Configuration

Passes an AWS access key ID and secret key directly to the stream creation methods instead of relying on the default provider chain.

```scala { .api }
import com.amazonaws.auth.AWSCredentials

/**
 * SerializableAWSCredentials wrapper for explicit credential specification.
 * Implements the AWSCredentials interface with serialization support.
 */
case class SerializableAWSCredentials(
  accessKeyId: String,
  secretKey: String
) extends AWSCredentials {
  def getAWSAccessKeyId: String = accessKeyId
  def getAWSSecretKey: String = secretKey
}
```

**Important Security Note:** Explicit credentials are saved in DStream checkpoints if checkpointing is enabled. Ensure checkpoint directories are properly secured.
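
The reason for this warning is that the credential wrapper is a serializable case class. Java serialization writes its fields, including the secret key, as plain data, and DStream metadata checkpoints are produced by serializing the stream graph. The sketch below uses a hypothetical stand-in class (`DemoCredentials`) rather than the library's own type to show that the secret is readable in the serialized bytes. Spark may also compress checkpoint data, but compression is not encryption.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets

object CheckpointLeakSketch {
  // Hypothetical stand-in for SerializableAWSCredentials; case classes are java.io.Serializable.
  final case class DemoCredentials(accessKeyId: String, secretKey: String)

  // Java-serialize an object into a byte array.
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // True if the secret key can be read as plain text in the serialized bytes.
  // ISO-8859-1 maps each byte to one char, so ASCII strings show up unchanged.
  def secretVisible(creds: DemoCredentials): Boolean =
    new String(serialize(creds), StandardCharsets.ISO_8859_1).contains(creds.secretKey)

  def main(args: Array[String]): Unit = {
    val creds = DemoCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
    println(s"Secret key visible in serialized form: ${secretVisible(creds)}")
  }
}
```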

### Explicit Credentials (Scala)

```scala
// Placeholder keys from the AWS documentation; load real keys from a secure source
val stream = KinesisUtils.createStream(
  ssc, appName, streamName, endpointUrl, regionName,
  initialPosition, checkpointInterval, storageLevel,
  awsAccessKeyId = "AKIAIOSFODNN7EXAMPLE",
  awsSecretKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```

### Explicit Credentials (Java)

```java
// Placeholder keys from the AWS documentation; load real keys from a secure source
JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
    jssc, appName, streamName, endpointUrl, regionName,
    initialPosition, checkpointInterval, storageLevel,
    "AKIAIOSFODNN7EXAMPLE",                     // awsAccessKeyId
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"  // awsSecretKey
);
```

## AWS Authentication Methods

### Environment Variables

Set AWS credentials as environment variables (recommended for development). On a cluster, the variables must be visible to the executors that run the Kinesis receivers, for example via `spark.executorEnv.AWS_ACCESS_KEY_ID` and `spark.executorEnv.AWS_SECRET_ACCESS_KEY`. `AWS_DEFAULT_REGION` is used by tools such as the AWS CLI; the stream itself uses the `regionName` passed to `createStream`.

```bash
export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
export AWS_DEFAULT_REGION=us-east-1
```

### AWS Credentials File

Create a `~/.aws/credentials` file on every host that runs a Kinesis receiver:

```ini
[default]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

[production]
aws_access_key_id = AKIAI44QH8DHBEXAMPLE
aws_secret_access_key = je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY
```

Select a specific profile:

```bash
export AWS_PROFILE=production
```
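
To make the profile mechanism concrete, here is a minimal sketch of how a named profile can be picked out of a credentials file like the one above. It is a deliberately small INI reader (section headers, `key = value` lines, comments) written for illustration, with hypothetical names (`ProfileSketch`, `parse`, `select`). It assumes, as this section does, that `AWS_PROFILE` selects the profile and that `default` is used otherwise; it is not the SDK's parser.

```scala
import scala.collection.mutable

object ProfileSketch {
  type Profiles = Map[String, Map[String, String]]

  // Minimal INI reader: [section] headers, key = value lines, blank lines,
  // and lines starting with '#' or ';' as comments. Not a full INI parser.
  def parse(text: String): Profiles = {
    val profiles = mutable.LinkedHashMap.empty[String, Map[String, String]]
    var current: Option[String] = None
    for (raw <- text.split("\\r?\\n")) {
      val line = raw.trim
      if (line.startsWith("[") && line.endsWith("]")) {
        val name = line.substring(1, line.length - 1).trim
        current = Some(name)
        if (!profiles.contains(name)) profiles.update(name, Map.empty)
      } else if (line.nonEmpty && !line.startsWith("#") && !line.startsWith(";")) {
        line.split("=", 2) match {
          case Array(key, value) =>
            current.foreach(p => profiles.update(p, profiles(p) + (key.trim -> value.trim)))
          case _ => () // ignore lines without '=' in this sketch
        }
      }
    }
    profiles.toMap
  }

  // Choose the profile named by AWS_PROFILE, or "default", and return its key pair.
  def select(profiles: Profiles, env: Map[String, String]): Option[(String, String)] = {
    val name = env.getOrElse("AWS_PROFILE", "default")
    for {
      profile <- profiles.get(name)
      id      <- profile.get("aws_access_key_id")
      secret  <- profile.get("aws_secret_access_key")
    } yield (id, secret)
  }

  def main(args: Array[String]): Unit = {
    val sample =
      """[default]
        |aws_access_key_id = AKIAIOSFODNN7EXAMPLE
        |aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
        |
        |[production]
        |aws_access_key_id = AKIAI44QH8DHBEXAMPLE
        |aws_secret_access_key = je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY
        |""".stripMargin
    // Print only the access key ID of the selected profile, never the secret.
    println(select(parse(sample), sys.env).map(_._1).getOrElse("profile not found"))
  }
}
```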

### IAM Roles for EC2 Instances

When running on EC2, attach an IAM role (through an instance profile) that grants the Kinesis, DynamoDB, and CloudWatch permissions used by the receivers:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListStreams"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/MyKinesisApp"
    },
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": "*"
    }
  ]
}
```
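
### Java System Properties

Set credentials as JVM system properties. On a cluster, the properties must also reach the executor JVMs that run the Kinesis receivers (for example through `spark.executor.extraJavaOptions`), because that is where the default chain is evaluated.

```bash
java -Daws.accessKeyId=AKIAIOSFODNN7EXAMPLE \
  -Daws.secretKey=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY \
  -jar my-spark-app.jar
```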

## Required AWS Permissions

Each block below is a single IAM policy statement. Replace the uppercase placeholders and combine the statements in the `Statement` array of one policy, as in the EC2 role example.

### Kinesis Permissions

```json
{
  "Effect": "Allow",
  "Action": [
    "kinesis:DescribeStream",
    "kinesis:GetShardIterator",
    "kinesis:GetRecords",
    "kinesis:ListStreams"
  ],
  "Resource": "arn:aws:kinesis:REGION:ACCOUNT:stream/STREAM_NAME"
}
```

### DynamoDB Permissions (for KCL Checkpointing)

The KCL stores its checkpoints in a DynamoDB table named after the application name passed to `createStream`.

```json
{
  "Effect": "Allow",
  "Action": [
    "dynamodb:CreateTable",
    "dynamodb:DescribeTable",
    "dynamodb:GetItem",
    "dynamodb:PutItem",
    "dynamodb:UpdateItem",
    "dynamodb:DeleteItem",
    "dynamodb:Scan",
    "dynamodb:Query"
  ],
  "Resource": "arn:aws:dynamodb:REGION:ACCOUNT:table/KCL_APPLICATION_NAME"
}
```

### CloudWatch Permissions (Optional, for Metrics)

```json
{
  "Effect": "Allow",
  "Action": [
    "cloudwatch:PutMetricData"
  ],
  "Resource": "*"
}
```
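
The uppercase placeholders in these statements follow the standard ARN layout. As a small illustration, the hypothetical helpers below fill them in. Because the DynamoDB resource is the KCL checkpoint table, which is named after the application name, the same string must be used in `createStream` and in the policy. The sketch assumes the standard `aws` partition.

```scala
object PolicyArnSketch {
  // Assumes the standard "aws" partition (other partitions use a different prefix).
  def kinesisStreamArn(region: String, account: String, streamName: String): String =
    s"arn:aws:kinesis:$region:$account:stream/$streamName"

  // The KCL checkpoint table is named after the KCL application name.
  def kclTableArn(region: String, account: String, kinesisAppName: String): String =
    s"arn:aws:dynamodb:$region:$account:table/$kinesisAppName"

  def main(args: Array[String]): Unit = {
    val (region, account, appName) = ("us-east-1", "123456789012", "MyKinesisApp")
    println(kinesisStreamArn(region, account, "my-stream"))
    println(kclTableArn(region, account, appName))
  }
}
```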

## Regional Configuration

### Endpoint URLs by Region

Common Kinesis endpoint URLs. Use the endpoint for the same region you pass as `regionName`:

```scala
// Kinesis endpoint URLs keyed by region name
val kinesisEndpoints = Map(
  "us-east-1"      -> "https://kinesis.us-east-1.amazonaws.com",     // US East (N. Virginia)
  "us-west-2"      -> "https://kinesis.us-west-2.amazonaws.com",     // US West (Oregon)
  "eu-west-1"      -> "https://kinesis.eu-west-1.amazonaws.com",     // Europe (Ireland)
  "ap-northeast-1" -> "https://kinesis.ap-northeast-1.amazonaws.com" // Asia Pacific (Tokyo)
)

val regionName = "us-east-1"
val endpointUrl = kinesisEndpoints(regionName)
```
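
All of the endpoints above follow one pattern, `https://kinesis.<region>.amazonaws.com`, so `endpointUrl` and `regionName` can be derived from a single value to keep them in sync. A minimal sketch, assuming that pattern (it holds for the regions listed here; other AWS partitions use different domains), with hypothetical helper names:

```scala
object EndpointSketch {
  // Matches endpoint URLs of the shape shown above and captures the region.
  private val EndpointPattern = """https://kinesis\.([a-z0-9-]+)\.amazonaws\.com""".r

  // Build the Kinesis endpoint URL for a region, following the pattern shown above.
  def kinesisEndpoint(regionName: String): String =
    s"https://kinesis.$regionName.amazonaws.com"

  // Recover the region from an endpoint URL of that shape, if it matches.
  def regionOf(endpointUrl: String): Option[String] = endpointUrl match {
    case EndpointPattern(region) => Some(region)
    case _                       => None
  }

  def main(args: Array[String]): Unit = {
    val regionName = "eu-west-1"
    val endpointUrl = kinesisEndpoint(regionName)
    // Sanity check: both values passed to createStream refer to the same region.
    assert(regionOf(endpointUrl).contains(regionName))
    println(endpointUrl)
  }
}
```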

### Region Name Validation

The library validates `regionName` against known AWS regions:

```scala { .api }
/**
 * Validates region name against known AWS regions.
 * Throws IllegalArgumentException for invalid regions.
 */
private def validateRegion(regionName: String): String
```

**Valid region names include:**
- `us-east-1`, `us-west-1`, `us-west-2`
- `eu-west-1`, `eu-central-1`
- `ap-southeast-1`, `ap-southeast-2`, `ap-northeast-1`
- And other valid AWS regions
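
A minimal sketch of the validation contract described above: return the name if it is valid, otherwise throw `IllegalArgumentException`. The hard-coded set is a hypothetical stand-in limited to the regions listed here. The library checks against its own list of known regions, which this sketch does not reproduce.

```scala
object RegionValidationSketch {
  // Hypothetical subset of regions, for illustration only.
  private val knownRegions = Set(
    "us-east-1", "us-west-1", "us-west-2",
    "eu-west-1", "eu-central-1",
    "ap-southeast-1", "ap-southeast-2", "ap-northeast-1"
  )

  // Return the region name unchanged if known, otherwise fail fast.
  def validateRegion(regionName: String): String =
    if (knownRegions.contains(regionName)) regionName
    else throw new IllegalArgumentException(s"Region name '$regionName' is not valid")

  def main(args: Array[String]): Unit = {
    println(validateRegion("us-west-2"))
    try validateRegion("us-weast-1")
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

Failing at stream creation time presumably surfaces a typo such as `us-weast-1` on the driver, rather than as connection errors later on the receivers.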

## Security Best Practices

### Credential Management
- **Never hardcode credentials** in source code
- Use IAM roles when running on EC2 instances
- Rotate access keys regularly
- Use least-privilege IAM policies
- Enable AWS CloudTrail for audit logging

### Network Security
- Use interface VPC endpoints (AWS PrivateLink) for Kinesis when possible
- Configure security groups to restrict access
- Use SSL/TLS for all communications (enabled by default)

### Checkpoint Security
- Restrict access to Spark checkpoint directories, which can contain explicitly supplied credentials
- Consider encrypting checkpoint data for sensitive applications
- Use separate AWS accounts for different environments
- Monitor access to the KCL checkpoint table in DynamoDB for anomalies

## Configuration Examples

### Development Environment

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.kinesis.KinesisUtils

// Development: credentials come from environment variables via the default chain
// (assumes an existing StreamingContext named ssc)
val stream = KinesisUtils.createStream(
  ssc, "dev-kinesis-app", "dev-stream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2
)
```

### Production Environment with IAM Roles

```scala
// Production: credentials come from the EC2 instance role via the default chain
// (same imports as the development example; assumes an existing StreamingContext ssc)
val stream = KinesisUtils.createStream(
  ssc, "prod-kinesis-app", "production-stream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.TRIM_HORIZON,
  Milliseconds(5000),
  StorageLevel.MEMORY_AND_DISK_2
)
```

### Cross-Account Access

```scala
// Access a Kinesis stream in a different AWS account using an access key pair
// issued in that account (placeholder keys shown)
val crossAccountAccessKeyId = "AKIAI44QH8DHBEXAMPLE"
val crossAccountSecretKey = "je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY"

val stream = KinesisUtils.createStream(
  ssc, "cross-account-app", "shared-stream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  crossAccountAccessKeyId,
  crossAccountSecretKey
)
```

## Troubleshooting Authentication Issues

### Common Error Messages

**"Unable to load AWS credentials"**
- Check each source in the DefaultAWSCredentialsProviderChain order
- Verify that the environment variables or credentials file are present on the executors, not only on the driver
- Ensure an IAM role is attached to the instance (on EC2)

**"Access Denied" on Kinesis operations**
- Verify IAM permissions for Kinesis actions
- Check resource ARNs in policy statements
- Ensure the region in the policy ARNs matches the configured `regionName`

**"Access Denied" on DynamoDB**
- Verify DynamoDB permissions for the KCL application name
- Check that the table name in the policy matches the KCL application name (the application name passed to `createStream`)
- Ensure region consistency between Kinesis and DynamoDB
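
Several of these checks come down to the region embedded in policy ARNs (`arn:partition:service:region:account-id:resource`) matching the configured `regionName`. A small hypothetical helper for that comparison might look like this. It only understands ARNs with an explicit region, so wildcard resources such as the CloudWatch `"*"` should not be passed in.

```scala
object RegionConsistencySketch {
  // Extract the region field from an ARN of the form arn:partition:service:region:account:resource.
  def arnRegion(arn: String): Option[String] = arn.split(":", 6) match {
    case Array("arn", _, _, region, _, _) if region.nonEmpty => Some(region)
    case _                                                   => None
  }

  // Return the ARNs whose region differs from (or cannot be read as) the configured region.
  def mismatches(regionName: String, policyArns: Seq[String]): Seq[String] =
    policyArns.filterNot(arn => arnRegion(arn).contains(regionName))

  def main(args: Array[String]): Unit = {
    val arns = Seq(
      "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
      "arn:aws:dynamodb:us-west-2:123456789012:table/MyKinesisApp"
    )
    mismatches("us-east-1", arns).foreach(arn => println(s"Region mismatch: $arn"))
  }
}
```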
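
### Debugging Tips

1. **Enable AWS SDK logging:**
   ```scala
   // Log AWS SDK and KCL classes (package com.amazonaws) at DEBUG level
   org.apache.log4j.Logger.getLogger("com.amazonaws").setLevel(org.apache.log4j.Level.DEBUG)
   ```
   This affects only the JVM it runs in; for the receivers, set `log4j.logger.com.amazonaws=DEBUG` in the `log4j.properties` used by the executors.

2. **Check the credential provider chain:**
   ```scala
   import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

   val provider = new DefaultAWSCredentialsProviderChain()
   // Throws an exception if no provider in the chain finds credentials
   val credentials = provider.getCredentials()
   println(s"Resolved access key ID: ${credentials.getAWSAccessKeyId}") // never print the secret key
   ```
   Run this on the hosts that execute the receivers.

3. **Test permissions separately:**
   - Use the AWS CLI to test Kinesis access: `aws kinesis describe-stream --stream-name my-stream`
   - Test DynamoDB access: `aws dynamodb describe-table --table-name MyKinesisApp`

4. **Check the Spark executor logs** for detailed error messages from the KCL worker threads, which run inside the Kinesis receivers.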