
# Credential Management

AWS credential handling for secure access to Kinesis streams, DynamoDB checkpointing, and CloudWatch metrics. Supports both automatic credential discovery and explicit credential specification.

## Credential Options

### Default Credential Provider Chain

The recommended approach uses the AWS `DefaultAWSCredentialsProviderChain`, which automatically discovers credentials from the following sources, in order:

1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
2. Java system properties (`aws.accessKeyId`, `aws.secretKey`)
3. The credential profiles file (`~/.aws/credentials`)
4. Instance profile credentials (on EC2 instances)
5. Container credentials (in ECS containers)

All stream creation methods without explicit credentials use this automatic discovery.
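The first two steps of that discovery order can be sketched in plain Java. This is purely an illustration of the lookup order, not the AWS SDK's actual implementation; `resolveAccessKeyId` is a hypothetical helper:

```java
import java.util.Optional;

public class CredentialLookupSketch {
    // Illustrative sketch of steps 1-2 of the chain: each source is tried
    // in turn and the first non-null hit wins.
    public static Optional<String> resolveAccessKeyId() {
        String fromEnv = System.getenv("AWS_ACCESS_KEY_ID");     // step 1: environment
        if (fromEnv != null) {
            return Optional.of(fromEnv);
        }
        String fromProp = System.getProperty("aws.accessKeyId"); // step 2: system property
        return Optional.ofNullable(fromProp);
    }

    public static void main(String[] args) {
        // With neither source set, the lookup comes back empty.
        System.out.println(resolveAccessKeyId().orElse("not found"));
    }
}
```

The real chain continues through the profiles file and instance/container metadata, but the principle is the same: earlier sources shadow later ones.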


### Explicit Credential Specification


For applications that require explicit control over credentials, every `createStream` method has an overload that accepts AWS credentials directly.


**Security Note**: Explicit credentials are stored in DStream checkpoints. Ensure checkpoint directories are properly secured.


## Scala API with Credentials


### Generic Stream with Credentials

```scala { .api }
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[T]
```

**Usage Example:**

```scala
// Note: the access key and secret below are AWS's documented example values
val credentialedStream = KinesisUtils.createStream[String](
  ssc,
  "secure-app",
  "private-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  record => new String(record.getData.array()),
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```


### Default Byte Array Stream with Credentials

```scala { .api }
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[Array[Byte]]
```

**Usage Example:**

```scala
val secureByteStream = KinesisUtils.createStream(
  ssc,
  "secure-byte-processor",
  "encrypted-data-stream",
  "https://kinesis.us-west-2.amazonaws.com",
  "us-west-2",
  InitialPositionInStream.TRIM_HORIZON,
  Seconds(60),
  StorageLevel.MEMORY_AND_DISK_2,
  sys.env("AWS_ACCESS_KEY_ID"),
  sys.env("AWS_SECRET_ACCESS_KEY")
)
```


## Java API with Credentials


### Generic Stream with Credentials

```java { .api }
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass,
    String awsAccessKeyId,
    String awsSecretKey
);
```

**Usage Example:**

```java
import org.apache.spark.api.java.function.Function;

Function<Record, String> handler = record ->
    new String(record.getData().array());

JavaReceiverInputDStream<String> secureStream = KinesisUtils.createStream(
    jssc,
    "java-secure-app",
    "confidential-stream",
    "https://kinesis.eu-west-1.amazonaws.com",
    "eu-west-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    handler,
    String.class,
    System.getenv("AWS_ACCESS_KEY_ID"),
    System.getenv("AWS_SECRET_ACCESS_KEY")
);
```


### Default Byte Array Stream with Credentials

```java { .api }
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    String awsAccessKeyId,
    String awsSecretKey
);
```

**Usage Example:**

```java
JavaReceiverInputDStream<byte[]> secureByteStream = KinesisUtils.createStream(
    jssc,
    "java-secure-bytes",
    "secure-binary-stream",
    "https://kinesis.ap-northeast-1.amazonaws.com",
    "ap-northeast-1",
    InitialPositionInStream.TRIM_HORIZON,
    Durations.seconds(45),
    StorageLevel.MEMORY_AND_DISK_2(),
    System.getProperty("aws.accessKeyId"),
    System.getProperty("aws.secretKey")
);
```


## SerializableAWSCredentials


Internal credential wrapper for secure serialization in distributed environments.

```scala { .api }
case class SerializableAWSCredentials(
    accessKeyId: String,
    secretKey: String
) extends AWSCredentials {
  override def getAWSAccessKeyId: String = accessKeyId
  override def getAWSSecretKey: String = secretKey
}
```


This class wraps AWS credentials for safe serialization when distributing stream processing tasks across Spark workers.
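A minimal plain-Java analogue of this pattern (the `SerializableCreds` class below is hypothetical, not the actual Spark implementation) shows the serialize/deserialize round-trip that credentials must survive when tasks are shipped to workers:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableCredsDemo {
    // Hypothetical stand-in for SerializableAWSCredentials: a plain
    // Serializable holder for the two key strings.
    static class SerializableCreds implements Serializable {
        final String accessKeyId;
        final String secretKey;
        SerializableCreds(String accessKeyId, String secretKey) {
            this.accessKeyId = accessKeyId;
            this.secretKey = secretKey;
        }
    }

    // Serialize and immediately deserialize, as happens (conceptually)
    // when a task closure is sent to a Spark worker.
    static SerializableCreds roundTrip(SerializableCreds creds) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(creds);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SerializableCreds) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableCreds copy = roundTrip(
            new SerializableCreds("AKIAIOSFODNN7EXAMPLE", "example-secret"));
        System.out.println(copy.accessKeyId);
    }
}
```

A non-serializable credentials object would fail at task-serialization time; wrapping the raw strings in a `Serializable` case class is what makes distribution safe.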


## Best Practices


### Credential Security


1. **Use Environment Variables**: Store credentials in environment variables rather than hardcoding

```scala
val accessKey = sys.env.getOrElse("AWS_ACCESS_KEY_ID",
  throw new IllegalArgumentException("AWS_ACCESS_KEY_ID not set"))
val secretKey = sys.env.getOrElse("AWS_SECRET_ACCESS_KEY",
  throw new IllegalArgumentException("AWS_SECRET_ACCESS_KEY not set"))
```


2. **Secure Checkpoint Directories**: When using explicit credentials, ensure checkpoint directories have proper access controls

```scala
// Set a secured checkpoint directory
ssc.checkpoint("hdfs://secure-cluster/checkpoints/kinesis-app")
```


3. **Use IAM Roles**: For production deployments, prefer IAM roles over explicit credentials

```scala
// Preferred: rely on automatic credential discovery (e.g. an IAM role)
val stream = KinesisUtils.createStream(ssc, ...) // No explicit credentials
```


### Credential Rotation


For applications using explicit credentials:


1. **Monitor Expiration**: Implement credential monitoring and rotation


2. **Graceful Updates**: Plan for application restarts when credentials change


3. **Fallback Mechanisms**: Consider implementing credential fallback chains
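A fallback chain can be sketched as a list of credential sources tried in order, taking the first that yields a value. The helper below is purely illustrative; none of these names come from the Spark or AWS APIs:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class FallbackChainSketch {
    // Try each source in order; return the first non-empty value.
    public static String firstAvailable(List<Supplier<String>> sources) {
        for (Supplier<String> source : sources) {
            String value = source.get();
            if (value != null && !value.isEmpty()) {
                return value;
            }
        }
        throw new IllegalStateException("No credential source available");
    }

    public static void main(String[] args) {
        String accessKey = firstAvailable(Arrays.asList(
            () -> System.getenv("AWS_ACCESS_KEY_ID"),    // primary source
            () -> System.getProperty("aws.accessKeyId"), // secondary source
            () -> "AKIAIOSFODNN7EXAMPLE"                 // last-resort placeholder
        ));
        System.out.println(accessKey);
    }
}
```

In practice the last-resort entry would be a secrets-manager lookup rather than a literal, and a production setup would usually delegate all of this to the default provider chain instead.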


### Cross-Region Access


When accessing a Kinesis stream in a different region from your Spark cluster:

```scala
// Ensure the credentials have cross-region permissions
val crossRegionStream = KinesisUtils.createStream(
  ssc,
  "cross-region-app",
  "remote-stream",
  "https://kinesis.eu-central-1.amazonaws.com", // Different region
  "eu-central-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  explicitAccessKey,
  explicitSecretKey
)
```


## Required AWS Permissions


Ensure credentials have the following minimum permissions:


### Kinesis Permissions

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListStreams"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/your-stream-name"
    }
  ]
}
```


### DynamoDB Permissions (for checkpointing)

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/your-kinesis-app-name"
    }
  ]
}
```


### CloudWatch Permissions (for metrics)

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": "*"
    }
  ]
}
```