# AWS Credentials Management

The AWS credentials system provides flexible authentication for accessing Kinesis, DynamoDB, and CloudWatch services. It supports multiple authentication methods including default provider chains, basic access keys, and STS assume role for cross-account access.

## Core API

### SparkAWSCredentials Interface

The base interface for all credential providers.

```scala { .api }
sealed trait SparkAWSCredentials extends Serializable {
  def provider: AWSCredentialsProvider
}
```
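
Every concrete credentials object ultimately yields an AWS SDK v1 `AWSCredentialsProvider` through `provider`, so it can also be handed to any SDK v1 client directly. A minimal sketch of consuming it outside Spark (assuming the AWS SDK v1 is on the classpath and the default chain can resolve credentials):

```scala
import com.amazonaws.auth.AWSCredentialsProvider
import org.apache.spark.streaming.kinesis.DefaultCredentials

// The provider resolves credentials lazily via the default chain.
val provider: AWSCredentialsProvider = DefaultCredentials.provider
val creds = provider.getCredentials
println(creds.getAWSAccessKeyId.take(4) + "...") // avoid printing the full key
```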

### Credential Implementations

```scala { .api }
// Uses AWS default credential provider chain
case object DefaultCredentials extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

// Uses basic AWS access key and secret key
case class BasicCredentials(
    awsAccessKeyId: String,
    awsSecretKey: String
) extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

// Uses STS assume role for temporary credentials
case class STSCredentials(
    stsRoleArn: String,
    stsSessionName: String,
    stsExternalId: Option[String] = None,
    longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}
```

### Builder API

```scala { .api }
object SparkAWSCredentials {
  def builder: SparkAWSCredentials.Builder
}

class Builder {
  def basicCredentials(accessKeyId: String, secretKey: String): Builder
  def stsCredentials(roleArn: String, sessionName: String): Builder
  def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder
  def build(): SparkAWSCredentials
}
```

## Default Credentials

Uses the AWS default credential provider chain, which checks for credentials in this order:

1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
2. Java system properties (`aws.accessKeyId`, `aws.secretKey`)
3. Credential profiles file (`~/.aws/credentials`)
4. Amazon ECS container credentials
5. Instance profile credentials (EC2)

```scala
import org.apache.spark.streaming.kinesis.{DefaultCredentials, KinesisInputDStream}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(DefaultCredentials) // Explicit, but this is the default
  .build()
```

Or using the builder:

```scala
val credentials = SparkAWSCredentials.builder.build() // Creates DefaultCredentials

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()
```

## Basic Credentials

Uses an explicit AWS access key ID and secret access key. **Warning**: these credentials are saved in DStream checkpoints, so ensure your checkpoint directory is secure.

```scala
import org.apache.spark.streaming.kinesis.BasicCredentials

val credentials = BasicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(credentials)
  .build()
```

Using the builder pattern (recommended):

```scala
val credentials = SparkAWSCredentials.builder
  .basicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .build()

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()
```

### Error Handling

BasicCredentials will fall back to the default provider chain if the provided credentials are invalid:

```scala
// If credentials are null or invalid, falls back to DefaultCredentials
val credentials = BasicCredentials(null, "invalid")
// This will log a warning and use DefaultCredentials instead
```

## STS Assume Role Credentials

Uses the AWS Security Token Service (STS) to assume an IAM role for temporary credentials. This is useful for cross-account access or enhanced security.

### Basic STS Usage

```scala
import org.apache.spark.streaming.kinesis.STSCredentials

val credentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/KinesisAccessRole",
  stsSessionName = "spark-kinesis-session"
)

val stream = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  // ... other configuration
  .build()
```

### STS with External ID

For roles that require an external ID for additional security:

```scala
val credentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/CrossAccountRole",
  stsSessionName = "spark-session",
  stsExternalId = Some("unique-external-id")
)
```

### STS with Custom Long-Lived Credentials

Specify different long-lived credentials for assuming the role:

```scala
val longLivedCreds = BasicCredentials("access-key", "secret-key")

val stsCredentials = STSCredentials(
  stsRoleArn = "arn:aws:iam::123456789012:role/KinesisRole",
  stsSessionName = "my-session",
  longLivedCreds = longLivedCreds
)
```

### Using the Builder Pattern

```scala
// Basic STS
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisRole", "my-session")
  .build()

// STS with external ID
val credentialsWithExternalId = SparkAWSCredentials.builder
  .basicCredentials("access-key", "secret-key") // Long-lived credentials
  .stsCredentials("arn:aws:iam::123456789012:role/CrossAccountRole", "session", "external-id")
  .build()
```

## Service-Specific Credentials

You can configure different credentials for different AWS services:

```scala
val kinesisCredentials = SparkAWSCredentials.builder
  .basicCredentials("kinesis-access-key", "kinesis-secret")
  .build()

val dynamoCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/DynamoRole", "dynamo-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .kinesisCredentials(kinesisCredentials)    // For Kinesis API calls
  .dynamoDBCredentials(dynamoCredentials)    // For DynamoDB checkpointing
  .cloudWatchCredentials(DefaultCredentials) // For CloudWatch metrics
  .build()
```

If you don't specify `dynamoDBCredentials` or `cloudWatchCredentials`, they will default to the same credentials as `kinesisCredentials`.

## Best Practices

### Security Recommendations

1. **Use IAM roles when possible**: prefer STS assume role or instance profiles over hardcoded keys
2. **Secure checkpoint directories**: basic credentials are stored in checkpoints
3. **Rotate credentials regularly**: use temporary credentials when possible
4. **Use least privilege**: grant only the minimum required permissions
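
One way to follow the first two recommendations without hardcoding keys is to resolve them from the environment and otherwise fall back to the default chain. A sketch (the `MY_APP_*` variable names are hypothetical, not part of the API):

```scala
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Build credentials from environment variables when present,
// otherwise fall back to the default provider chain.
val credentials =
  (sys.env.get("MY_APP_ACCESS_KEY"), sys.env.get("MY_APP_SECRET_KEY")) match {
    case (Some(id), Some(secret)) =>
      SparkAWSCredentials.builder.basicCredentials(id, secret).build()
    case _ =>
      SparkAWSCredentials.builder.build() // DefaultCredentials
  }
```

Because nothing is hardcoded, the same jar can run in development (explicit keys) and production (instance profile) unchanged, and no secret ends up in checkpoints unless the environment supplies one.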

### Required IAM Permissions

#### For Kinesis:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/your-stream-name"
    }
  ]
}
```

#### For DynamoDB (checkpointing):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/your-checkpoint-app-name"
    }
  ]
}
```

#### For CloudWatch (metrics):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": "*"
    }
  ]
}
```

### Example: Production Configuration

```scala
// Production setup with assume role
val productionCredentials = SparkAWSCredentials.builder
  .stsCredentials(
    roleArn = "arn:aws:iam::123456789012:role/SparkKinesisRole",
    sessionName = s"spark-kinesis-${java.util.UUID.randomUUID()}"
  )
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("production-data-stream")
  .checkpointAppName("production-spark-consumer")
  .regionName("us-west-2")
  .kinesisCredentials(productionCredentials)
  .dynamoDBCredentials(productionCredentials)
  .cloudWatchCredentials(productionCredentials)
  .build()
```