or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aws-credentials.mdconfiguration.mdindex.mdstream-creation.mdstream-positioning.mdtesting.md

aws-credentials.mddocs/

0

# AWS Credentials

1

2

Flexible credential providers supporting default AWS credentials, basic key/secret authentication, and STS role assumption for secure Kinesis access.

3

4

## Capabilities

5

6

### SparkAWSCredentials Trait

7

8

Base trait for all AWS credential providers used throughout the Kinesis integration.

9

10

```scala { .api }

11

/**

12

* Serializable interface providing AWS credentials for Kinesis Client Library

13

*/

14

sealed trait SparkAWSCredentials extends Serializable {

15

/** Returns AWSCredentialsProvider for KCL authentication */

16

def provider: AWSCredentialsProvider

17

}

18

```

19

20

### Default Credentials

21

22

Uses AWS DefaultAWSCredentialsProviderChain for automatic credential discovery.

23

24

```scala { .api }

25

/**

26

* Uses DefaultAWSCredentialsProviderChain for authentication

27

* Checks environment variables, system properties, credential files, and IAM roles

28

*/

29

case object DefaultCredentials extends SparkAWSCredentials {

30

def provider: AWSCredentialsProvider

31

}

32

```

33

34

**Usage Example:**

35

36

```scala

37

import org.apache.spark.streaming.kinesis.DefaultCredentials

38

39

// Use default credential chain (recommended for production)

40

val stream = KinesisInputDStream.builder

41

.streamingContext(ssc)

42

.streamName("my-stream")

43

.checkpointAppName("my-app")

44

.kinesisCredentials(DefaultCredentials)

45

.build()

46

```

47

48

### Basic Credentials

49

50

Static AWS access key and secret key authentication.

51

52

```scala { .api }

53

/**

54

* Static AWS keypair credentials with fallback to default chain

55

* @param awsAccessKeyId AWS access key ID

56

* @param awsSecretKey AWS secret access key

57

*/

58

case class BasicCredentials(

59

awsAccessKeyId: String,

60

awsSecretKey: String

61

) extends SparkAWSCredentials {

62

def provider: AWSCredentialsProvider

63

}

64

```

65

66

**Usage Example:**

67

68

```scala

69

import org.apache.spark.streaming.kinesis.BasicCredentials

70

71

// Use static credentials (not recommended for production)

72

val credentials = BasicCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")

73

74

val stream = KinesisInputDStream.builder

75

.streamingContext(ssc)

76

.streamName("my-stream")

77

.checkpointAppName("my-app")

78

.kinesisCredentials(credentials)

79

.build()

80

```

81

82

### STS Role Credentials

83

84

AWS Security Token Service (STS) role assumption for cross-account or temporary access.

85

86

```scala { .api }

87

/**

88

* STS role assumption credentials for cross-account access

89

* @param stsRoleArn ARN of IAM role to assume

90

* @param stsSessionName Session name for STS session

91

* @param stsExternalId Optional external ID for role trust policy validation

92

* @param longLivedCreds Base credentials for STS authentication

93

*/

94

case class STSCredentials(

95

stsRoleArn: String,

96

stsSessionName: String,

97

stsExternalId: Option[String] = None,

98

longLivedCreds: SparkAWSCredentials = DefaultCredentials

99

) extends SparkAWSCredentials {

100

def provider: AWSCredentialsProvider

101

}

102

```

103

104

**Usage Examples:**

105

106

```scala

107

import org.apache.spark.streaming.kinesis.{STSCredentials, BasicCredentials, DefaultCredentials}

108

109

// STS with default credentials for role assumption

110

val stsCredentials = STSCredentials(

111

stsRoleArn = "arn:aws:iam::123456789012:role/CrossAccountKinesisRole",

112

stsSessionName = "spark-kinesis-session"

113

)

114

115

// STS with basic credentials and external ID

116

val stsWithExternalId = STSCredentials(

117

stsRoleArn = "arn:aws:iam::123456789012:role/CrossAccountKinesisRole",

118

stsSessionName = "spark-kinesis-session",

119

stsExternalId = Some("unique-external-id"),

120

longLivedCreds = BasicCredentials("ACCESS_KEY", "SECRET_KEY")

121

)

122

123

val stream = KinesisInputDStream.builder

124

.streamingContext(ssc)

125

.streamName("cross-account-stream")

126

.checkpointAppName("cross-account-app")

127

.kinesisCredentials(stsCredentials)

128

.build()

129

```

130

131

### Credentials Builder

132

133

Fluent builder for constructing SparkAWSCredentials instances with validation.

134

135

```scala { .api }

136

object SparkAWSCredentials {

137

/** Creates new credentials builder */

138

def builder: Builder

139

140

class Builder {

141

/** Configure basic AWS keypair credentials */

142

def basicCredentials(accessKeyId: String, secretKey: String): Builder

143

144

/** Configure STS role assumption credentials */

145

def stsCredentials(roleArn: String, sessionName: String): Builder

146

147

/** Configure STS credentials with external ID validation */

148

def stsCredentials(roleArn: String, sessionName: String, externalId: String): Builder

149

150

/** Build the configured credentials instance */

151

def build(): SparkAWSCredentials

152

}

153

}

154

```

155

156

**Builder Usage Examples:**

157

158

```scala

159

import org.apache.spark.streaming.kinesis.SparkAWSCredentials

160

161

// Basic credentials via builder

162

val basicCreds = SparkAWSCredentials.builder

163

.basicCredentials("ACCESS_KEY", "SECRET_KEY")

164

.build()

165

166

// STS credentials via builder

167

val stsCreds = SparkAWSCredentials.builder

168

.basicCredentials("ACCESS_KEY", "SECRET_KEY") // Long-lived creds

169

.stsCredentials("arn:aws:iam::123456789012:role/KinesisRole", "spark-session")

170

.build()

171

172

// STS with external ID via builder

173

val stsWithExternalId = SparkAWSCredentials.builder

174

.stsCredentials(

175

"arn:aws:iam::123456789012:role/KinesisRole",

176

"spark-session",

177

"external-id-123"

178

)

179

.build()

180

```

181

182

### Multi-Service Credentials

183

184

Different credential providers can be used for different AWS services (Kinesis, DynamoDB, CloudWatch).

185

186

**Usage Example:**

187

188

```scala

189

import org.apache.spark.streaming.kinesis.{SparkAWSCredentials, BasicCredentials, STSCredentials}

190

191

// Use different credentials for different services

192

val kinesisCredentials = BasicCredentials("KINESIS_ACCESS_KEY", "KINESIS_SECRET_KEY")

193

val dynamoDbCredentials = STSCredentials(

194

"arn:aws:iam::123456789012:role/DynamoDBRole",

195

"dynamodb-session"

196

)

197

198

val stream = KinesisInputDStream.builder

199

.streamingContext(ssc)

200

.streamName("my-stream")

201

.checkpointAppName("my-app")

202

.kinesisCredentials(kinesisCredentials) // For Kinesis access

203

.dynamoDBCredentials(dynamoDbCredentials) // For checkpoint storage

204

.cloudWatchCredentials(kinesisCredentials) // For metrics publishing

205

.build()

206

```

207

208

### Security Best Practices

209

210

1. **Avoid hardcoded credentials** - Use `DefaultCredentials` with IAM roles when possible

211

2. **Use STS for cross-account access** - Implement role assumption for security isolation

212

3. **Secure checkpoint storage** - Ensure DynamoDB credentials have minimal required permissions

213

4. **External ID validation** - Use external IDs in STS trust policies for additional security

214

5. **Credential rotation** - STS credentials automatically refresh; basic credentials require manual rotation

215

216

### Error Handling

217

218

Credential providers can fail with:

219

220

- `IllegalArgumentException` - For null or invalid credential parameters

221

- `AmazonClientException` - For AWS authentication failures

222

- `AmazonServiceException` - For STS service errors during role assumption

223

- `SecurityException` - For insufficient permissions on assumed roles

224

225

**Error Handling Example:**

226

227

```scala

228

import com.amazonaws.AmazonServiceException

229

import org.apache.spark.streaming.kinesis.STSCredentials

230

231

try {

232

val credentials = STSCredentials(

233

"arn:aws:iam::123456789012:role/NonExistentRole",

234

"test-session"

235

)

236

237

val stream = KinesisInputDStream.builder

238

.streamingContext(ssc)

239

.streamName("my-stream")

240

.checkpointAppName("my-app")

241

.kinesisCredentials(credentials)

242

.build()

243

244

} catch {

245

case e: AmazonServiceException =>

246

println(s"AWS service error: ${e.getMessage}")

247

// Handle credential/permission errors

248

case e: IllegalArgumentException =>

249

println(s"Invalid credential configuration: ${e.getMessage}")

250

// Handle configuration errors

251

}

252

```