or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aws-credentials.md · configuration.md · index.md · stream-creation.md · stream-positioning.md · testing.md

docs/testing.md

0

# Testing Utilities

1

2

Comprehensive utilities for testing Kinesis integration including stream management, data generation, and cleanup operations for development and testing environments.

3

4

## Capabilities

5

6

### KinesisTestUtils Class

7

8

Primary testing utility class for managing Kinesis streams and test data during development and testing.

9

10

```scala { .api }

11

/**

12

* Utilities for testing Kinesis integration with configurable shard count

13

* @param streamShardCount Number of shards to create in test streams (default: 2)

14

*/

15

class KinesisTestUtils(streamShardCount: Int = 2) {

16

/** Gets the generated test stream name */

17

def streamName: String

18

19

/** Creates a Kinesis stream for testing */

20

def createStream(): Unit

21

22

/**

23

* Gets information about stream shards

24

* @return Sequence of AWS Shard objects containing shard metadata

25

*/

26

def getShards(): Seq[Shard]

27

28

/**

29

* Splits a shard for testing shard rebalancing scenarios

30

* @param shardId ID of shard to split

31

*/

32

def splitShard(shardId: String): Unit

33

34

/**

35

* Merges two adjacent shards for testing shard consolidation

36

* @param shardToMerge ID of first shard to merge

37

* @param adjacentShardToMerge ID of adjacent shard to merge with

38

*/

39

def mergeShard(shardToMerge: String, adjacentShardToMerge: String): Unit

40

41

/**

42

* Pushes test data to the stream

43

* @param testData Sequence of integers to send as test records

44

* @param aggregate Whether to use KPL aggregation

45

* @return Map of shard ID to sequence of (data, sequenceNumber) pairs

46

*/

47

def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]]

48

49

/**

50

* Java-friendly version of pushData without aggregation

51

* @param testData Java List of integers to send

52

*/

53

def pushData(testData: java.util.List[Int]): Unit

54

55

/** Deletes the test stream and waits for deletion to complete */

56

def deleteStream(): Unit

57

58

/**

59

* Deletes DynamoDB checkpoint table for cleanup

60

* @param tableName Name of DynamoDB table to delete

61

*/

62

def deleteDynamoDBTable(tableName: String): Unit

63

}

64

```

65

66

**Usage Examples:**

67

68

```scala

69

import org.apache.spark.streaming.kinesis.KinesisTestUtils

70

import org.apache.spark.streaming.kinesis.KinesisInputDStream

71

import org.apache.spark.streaming.kinesis.KinesisInitialPositions._

72

73

// Create test utilities with default 2 shards

74

val testUtils = new KinesisTestUtils()

75

76

// Create test stream

77

testUtils.createStream()

78

val streamName = testUtils.streamName

79

80

// Send test data

81

val testData = Seq(1, 2, 3, 4, 5)

82

val dataMap = testUtils.pushData(testData, aggregate = false)

83

84

// Create Kinesis input stream for testing

85

val stream = KinesisInputDStream.builder

86

.streamingContext(ssc)

87

.streamName(streamName)

88

.checkpointAppName("test-app")

89

.initialPosition(new TrimHorizon())

90

.build()

91

92

// Process test stream

93

stream.foreachRDD { rdd =>

94

if (!rdd.isEmpty()) {

95

val records = rdd.collect()

96

println(s"Received ${records.length} records")

97

}

98

}

99

100

// Cleanup after testing

101

testUtils.deleteStream()

102

testUtils.deleteDynamoDBTable("test-app")

103

```

104

105

### KinesisTestUtils Object

106

107

Companion object providing configuration and utility methods for test environment setup.

108

109

```scala { .api }

110

object KinesisTestUtils {

111

/** Environment variable name for enabling Kinesis tests */

112

val envVarNameForEnablingTests: String = "ENABLE_KINESIS_TESTS"

113

114

/** Environment variable name for custom Kinesis endpoint URL */

115

val endVarNameForEndpoint: String = "KINESIS_TEST_ENDPOINT_URL"

116

117

/** Default endpoint URL for testing */

118

val defaultEndpointUrl: String = "https://kinesis.us-west-2.amazonaws.com"

119

120

/** Whether Kinesis tests should run based on environment configuration */

121

val shouldRunTests: Boolean

122

123

/** Endpoint URL for tests (from environment or default) */

124

val endpointUrl: String

125

126

/**

127

* Extracts AWS region name from endpoint URL

128

* @param endpoint Kinesis endpoint URL

129

* @return AWS region name extracted from URL

130

*/

131

def getRegionNameByEndpoint(endpoint: String): String

132

133

/** Checks if AWS credentials are available for testing */

134

def isAWSCredentialsPresent: Boolean

135

136

/** Gets AWS credentials for testing (throws exception if not available) */

137

def getAWSCredentials(): AWSCredentials

138

}

139

```

140

141

**Usage Examples:**

142

143

```scala

144

import org.apache.spark.streaming.kinesis.KinesisTestUtils

145

146

// Check if tests should run

147

if (KinesisTestUtils.shouldRunTests) {

148

println("Kinesis tests are enabled")

149

println(s"Using endpoint: ${KinesisTestUtils.endpointUrl}")

150

151

// Extract region for configuration

152

val region = KinesisTestUtils.getRegionNameByEndpoint(KinesisTestUtils.endpointUrl)

153

println(s"Using region: $region")

154

155

// Verify credentials are available

156

if (KinesisTestUtils.isAWSCredentialsPresent) {

157

val credentials = KinesisTestUtils.getAWSCredentials()

158

println("AWS credentials found")

159

}

160

}

161

```

162

163

### Test Data Generation

164

165

Interface and implementation for generating test data with different patterns.

166

167

```scala { .api }

168

/**

169

* Interface for generating test data to send to Kinesis streams

170

*/

171

trait KinesisDataGenerator {

172

/**

173

* Sends data to specified Kinesis stream

174

* @param streamName Name of target Kinesis stream

175

* @param data Sequence of integers to send

176

* @return Map of shard ID to sequence of (data, sequenceNumber) pairs

177

*/

178

def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]]

179

}

180

181

/**

182

* Simple implementation of KinesisDataGenerator

183

* @param client Configured AmazonKinesisClient for data sending

184

*/

185

class SimpleDataGenerator(client: AmazonKinesisClient) extends KinesisDataGenerator {

186

def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]]

187

}

188

```

189

190

**Usage Examples:**

191

192

```scala

193

import org.apache.spark.streaming.kinesis.{KinesisTestUtils, SimpleDataGenerator}

194

import com.amazonaws.services.kinesis.AmazonKinesisClient

195

196

// Create custom data generator

197

val kinesisClient = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())

198

kinesisClient.setEndpoint(KinesisTestUtils.endpointUrl)

199

200

val dataGenerator = new SimpleDataGenerator(kinesisClient)

201

202

// Generate test data patterns

203

val sequentialData = (1 to 100).toSeq

204

val randomData = util.Random.shuffle(1 to 50)

205

val repeatingData = Seq.fill(20)(42)

206

207

// Send different data patterns

208

val sequentialResults = dataGenerator.sendData("test-stream", sequentialData)

209

val randomResults = dataGenerator.sendData("test-stream", randomData)

210

val repeatingResults = dataGenerator.sendData("test-stream", repeatingData)

211

```

212

213

### Test Environment Setup

214

215

**Environment Variable Configuration:**

216

217

```bash

218

# Enable Kinesis tests

219

export ENABLE_KINESIS_TESTS=true

220

221

# Use custom endpoint (optional)

222

export KINESIS_TEST_ENDPOINT_URL=https://kinesis.us-east-1.amazonaws.com

223

224

# AWS credentials (required)

225

export AWS_ACCESS_KEY_ID=your-access-key

226

export AWS_SECRET_ACCESS_KEY=your-secret-key

227

export AWS_DEFAULT_REGION=us-west-2

228

```

229

230

**Test Configuration Example:**

231

232

```scala

233

import org.apache.spark.streaming.kinesis.KinesisTestUtils

234

235

class KinesisIntegrationTest extends FunSuite {

236

237

test("Kinesis stream processing") {

238

assume(KinesisTestUtils.shouldRunTests, "Kinesis tests not enabled")

239

assume(KinesisTestUtils.isAWSCredentialsPresent, "AWS credentials not found")

240

241

val testUtils = new KinesisTestUtils(streamShardCount = 4)

242

243

try {

244

// Setup

245

testUtils.createStream()

246

247

// Test data

248

val testData = (1 to 100).toSeq

249

val dataMap = testUtils.pushData(testData, aggregate = true)

250

251

// Verify data distribution across shards

252

assert(dataMap.size <= 4) // At most 4 shards

253

assert(dataMap.values.flatten.size == 100) // All data sent

254

255

// Additional test logic...

256

257

} finally {

258

// Cleanup

259

testUtils.deleteStream()

260

testUtils.deleteDynamoDBTable("test-app")

261

}

262

}

263

}

264

```

265

266

### Advanced Testing Scenarios

267

268

**Shard Management Testing:**

269

270

```scala

271

import org.apache.spark.streaming.kinesis.KinesisTestUtils

272

273

val testUtils = new KinesisTestUtils(streamShardCount = 2)

274

testUtils.createStream()

275

276

// Get initial shard configuration

277

val initialShards = testUtils.getShards()

278

println(s"Initial shards: ${initialShards.map(_.getShardId)}")

279

280

// Test shard splitting

281

val shardToSplit = initialShards.head.getShardId

282

testUtils.splitShard(shardToSplit)

283

284

// Verify shard split

285

val afterSplit = testUtils.getShards()

286

assert(afterSplit.size > initialShards.size)

287

288

// Test shard merging (requires adjacent shards)

289

val adjacentShards = afterSplit.filter(_.getParentShardId == shardToSplit)

290

if (adjacentShards.size >= 2) {

291

testUtils.mergeShard(adjacentShards(0).getShardId, adjacentShards(1).getShardId)

292

}

293

```

294

295

**Data Aggregation Testing:**

296

297

```scala

298

import org.apache.spark.streaming.kinesis.KinesisTestUtils

299

300

val testUtils = new KinesisTestUtils()

301

testUtils.createStream()

302

303

// Test with aggregation (uses KPL)

304

val aggregatedData = (1 to 1000).toSeq

305

val aggregatedResults = testUtils.pushData(aggregatedData, aggregate = true)

306

307

// Test without aggregation (individual records)

308

val individualData = (1 to 100).toSeq

309

val individualResults = testUtils.pushData(individualData, aggregate = false)

310

311

// Compare record counts and distribution

312

println(s"Aggregated: ${aggregatedResults.values.flatten.size} records")

313

println(s"Individual: ${individualResults.values.flatten.size} records")

314

```

315

316

### Test Performance Considerations

317

318

**Stream Creation Time:**

319

- Kinesis stream creation takes 60-90 seconds

320

- Consider using pre-created streams for faster test execution

321

- Use stream pooling for test suites

322

323

**Data Propagation Delay:**

324

- Allow 1-2 seconds for data to propagate after `pushData()`

325

- Use polling or delays in tests to wait for data availability

326

327

**Cleanup Importance:**

328

- Always cleanup streams and DynamoDB tables to avoid AWS charges

329

- Use try-finally blocks or test fixtures for guaranteed cleanup

330

- Monitor AWS console for orphaned resources

331

332

**Resource Limits:**

333

- AWS accounts have limits on concurrent streams and DynamoDB tables

334

- Consider sequential test execution for resource-intensive scenarios

335

- Use descriptive names to identify test resources

336

337

### Integration with Testing Frameworks

338

339

**ScalaTest Integration:**

340

341

```scala

342

import org.scalatest.{BeforeAndAfterEach, FunSuite}

343

import org.apache.spark.streaming.kinesis.KinesisTestUtils

344

345

class KinesisStreamingTest extends FunSuite with BeforeAndAfterEach {

346

var testUtils: KinesisTestUtils = _

347

348

override def beforeEach(): Unit = {

349

assume(KinesisTestUtils.shouldRunTests)

350

testUtils = new KinesisTestUtils()

351

testUtils.createStream()

352

}

353

354

override def afterEach(): Unit = {

355

if (testUtils != null) {

356

testUtils.deleteStream()

357

testUtils.deleteDynamoDBTable("test-app")

358

}

359

}

360

361

test("stream processing works correctly") {

362

val testData = Seq(1, 2, 3, 4, 5)

363

testUtils.pushData(testData, aggregate = false)

364

365

// Test stream processing logic

366

// ...

367

}

368

}

369

```