# Testing Utilities

Comprehensive utilities for testing Kinesis integration, including stream management, data generation, and cleanup operations for development and testing environments.

## Capabilities

### KinesisTestUtils Class

Primary testing utility class for managing Kinesis streams and test data during development and testing.

```scala { .api }
/**
 * Utilities for testing Kinesis integration with a configurable shard count
 * @param streamShardCount Number of shards to create in test streams (default: 2)
 */
class KinesisTestUtils(streamShardCount: Int = 2) {
  /** Gets the generated test stream name */
  def streamName: String

  /** Creates a Kinesis stream for testing */
  def createStream(): Unit

  /**
   * Gets information about stream shards
   * @return Sequence of AWS Shard objects containing shard metadata
   */
  def getShards(): Seq[Shard]

  /**
   * Splits a shard for testing shard-rebalancing scenarios
   * @param shardId ID of the shard to split
   */
  def splitShard(shardId: String): Unit

  /**
   * Merges two adjacent shards for testing shard consolidation
   * @param shardToMerge ID of the first shard to merge
   * @param adjacentShardToMerge ID of the adjacent shard to merge with
   */
  def mergeShard(shardToMerge: String, adjacentShardToMerge: String): Unit

  /**
   * Pushes test data to the stream
   * @param testData Sequence of integers to send as test records
   * @param aggregate Whether to use KPL aggregation
   * @return Map of shard ID to sequence of (data, sequenceNumber) pairs
   */
  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]]

  /**
   * Java-friendly version of pushData without aggregation
   * @param testData Java List of integers to send
   */
  def pushData(testData: java.util.List[Int]): Unit

  /** Deletes the test stream and waits for deletion to complete */
  def deleteStream(): Unit

  /**
   * Deletes a DynamoDB checkpoint table for cleanup
   * @param tableName Name of the DynamoDB table to delete
   */
  def deleteDynamoDBTable(tableName: String): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.kinesis.KinesisTestUtils
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._

// Create test utilities with the default 2 shards
val testUtils = new KinesisTestUtils()

// Create a test stream
testUtils.createStream()
val streamName = testUtils.streamName

// Send test data
val testData = Seq(1, 2, 3, 4, 5)
val dataMap = testUtils.pushData(testData, aggregate = false)

// Create a Kinesis input stream for testing
// (ssc is an existing StreamingContext)
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName(streamName)
  .checkpointAppName("test-app")
  .initialPosition(new TrimHorizon())
  .build()

// Process the test stream
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val records = rdd.collect()
    println(s"Received ${records.length} records")
  }
}

// Clean up after testing
testUtils.deleteStream()
testUtils.deleteDynamoDBTable("test-app")
```

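Records arrive from the receiver as byte arrays. Assuming `pushData` serializes each integer as its UTF-8 string form (an assumption about the test generator, not a documented guarantee), the round trip on the receiving side can be sketched as:

```scala
import java.nio.charset.StandardCharsets

// Hypothetical round trip: encode an Int the way the test generator is
// assumed to (UTF-8 string bytes), then decode it back on the receiving side.
def encode(n: Int): Array[Byte] = n.toString.getBytes(StandardCharsets.UTF_8)
def decode(bytes: Array[Byte]): Int = new String(bytes, StandardCharsets.UTF_8).toInt

val roundTripped = Seq(1, 2, 3, 4, 5).map(n => decode(encode(n)))
// roundTripped == Seq(1, 2, 3, 4, 5)
```

If the generator uses a different serialization, the `decode` step must match it; the structure of the round trip stays the same.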
### KinesisTestUtils Object

Companion object providing configuration and utility methods for test environment setup.

```scala { .api }
object KinesisTestUtils {
  /** Environment variable name for enabling Kinesis tests */
  val envVarNameForEnablingTests: String = "ENABLE_KINESIS_TESTS"

  /** Environment variable name for a custom Kinesis endpoint URL */
  val endVarNameForEndpoint: String = "KINESIS_TEST_ENDPOINT_URL"

  /** Default endpoint URL for testing */
  val defaultEndpointUrl: String = "https://kinesis.us-west-2.amazonaws.com"

  /** Whether Kinesis tests should run, based on environment configuration */
  val shouldRunTests: Boolean

  /** Endpoint URL for tests (from the environment, or the default) */
  val endpointUrl: String

  /**
   * Extracts the AWS region name from an endpoint URL
   * @param endpoint Kinesis endpoint URL
   * @return AWS region name extracted from the URL
   */
  def getRegionNameByEndpoint(endpoint: String): String

  /** Checks whether AWS credentials are available for testing */
  def isAWSCredentialsPresent: Boolean

  /** Gets AWS credentials for testing (throws an exception if none are available) */
  def getAWSCredentials(): AWSCredentials
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.kinesis.KinesisTestUtils

// Check whether tests should run
if (KinesisTestUtils.shouldRunTests) {
  println("Kinesis tests are enabled")
  println(s"Using endpoint: ${KinesisTestUtils.endpointUrl}")

  // Extract the region for configuration
  val region = KinesisTestUtils.getRegionNameByEndpoint(KinesisTestUtils.endpointUrl)
  println(s"Using region: $region")

  // Verify that credentials are available
  if (KinesisTestUtils.isAWSCredentialsPresent) {
    val credentials = KinesisTestUtils.getAWSCredentials()
    println("AWS credentials found")
  }
}
```

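The parsing inside `getRegionNameByEndpoint` is not shown here. One plausible sketch (an illustration, not the library's actual implementation) extracts the second dot-separated label of a standard `kinesis.<region>.amazonaws.com` host and falls back to a default otherwise:

```scala
import java.net.URI

// Illustrative only: pull the region out of a standard Kinesis endpoint URL
// of the form https://kinesis.<region>.amazonaws.com.
def extractRegion(endpoint: String, default: String = "us-west-2"): String = {
  val host = new URI(endpoint).getHost
  host.split('.') match {
    case Array("kinesis", region, "amazonaws", "com") => region
    case _ => default
  }
}

val region = extractRegion("https://kinesis.us-east-1.amazonaws.com")
// region == "us-east-1"
```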
### Test Data Generation

Interface and implementation for generating test data with different patterns.

```scala { .api }
/**
 * Interface for generating test data to send to Kinesis streams
 */
trait KinesisDataGenerator {
  /**
   * Sends data to the specified Kinesis stream
   * @param streamName Name of the target Kinesis stream
   * @param data Sequence of integers to send
   * @return Map of shard ID to sequence of (data, sequenceNumber) pairs
   */
  def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]]
}

/**
 * Simple implementation of KinesisDataGenerator
 * @param client Configured AmazonKinesisClient for sending data
 */
class SimpleDataGenerator(client: AmazonKinesisClient) extends KinesisDataGenerator {
  def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]]
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.kinesis.{KinesisTestUtils, SimpleDataGenerator}
import com.amazonaws.services.kinesis.AmazonKinesisClient
import scala.util.Random

// Create a custom data generator
val kinesisClient = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
kinesisClient.setEndpoint(KinesisTestUtils.endpointUrl)

val dataGenerator = new SimpleDataGenerator(kinesisClient)

// Generate test data patterns
val sequentialData = (1 to 100).toSeq
val randomData = Random.shuffle((1 to 50).toList)
val repeatingData = Seq.fill(20)(42)

// Send the different data patterns
val sequentialResults = dataGenerator.sendData("test-stream", sequentialData)
val randomResults = dataGenerator.sendData("test-stream", randomData)
val repeatingResults = dataGenerator.sendData("test-stream", repeatingData)
```

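For unit tests that must not touch AWS at all, the `KinesisDataGenerator` trait can also be implemented by an in-memory fake that records what would have been sent. A sketch (the trait is redeclared locally so the snippet stands alone; shard assignment here is a simple modulo, not Kinesis' real MD5 hash-key partitioning):

```scala
trait KinesisDataGenerator {
  def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]]
}

// In-memory fake: assigns each value to one of `shards` pseudo-shards and
// fabricates monotonically increasing sequence numbers.
class FakeDataGenerator(shards: Int = 2) extends KinesisDataGenerator {
  private var seq = 0L
  def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
    data.map { n =>
      seq += 1
      val shardId = s"shardId-${math.abs(n.hashCode) % shards}"
      (shardId, (n, seq.toString))
    }.groupBy(_._1).map { case (shard, pairs) => shard -> pairs.map(_._2) }
  }
}

val fake = new FakeDataGenerator(shards = 2)
val results = fake.sendData("test-stream", Seq(1, 2, 3, 4))
// All four values come back, spread over at most two pseudo-shards.
```

Because the fake returns the same shape as `SimpleDataGenerator`, code under test can be exercised against either implementation.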
### Test Environment Setup

**Environment Variable Configuration:**

```bash
# Enable Kinesis tests
export ENABLE_KINESIS_TESTS=true

# Use a custom endpoint (optional)
export KINESIS_TEST_ENDPOINT_URL=https://kinesis.us-east-1.amazonaws.com

# AWS credentials (required)
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
export AWS_DEFAULT_REGION=us-west-2
```

**Test Configuration Example:**

```scala
import org.apache.spark.streaming.kinesis.KinesisTestUtils
import org.scalatest.FunSuite

class KinesisIntegrationTest extends FunSuite {

  test("Kinesis stream processing") {
    assume(KinesisTestUtils.shouldRunTests, "Kinesis tests not enabled")
    assume(KinesisTestUtils.isAWSCredentialsPresent, "AWS credentials not found")

    val testUtils = new KinesisTestUtils(streamShardCount = 4)

    try {
      // Setup
      testUtils.createStream()

      // Test data
      val testData = (1 to 100).toSeq
      val dataMap = testUtils.pushData(testData, aggregate = true)

      // Verify data distribution across shards
      assert(dataMap.size <= 4)                  // at most 4 shards received data
      assert(dataMap.values.flatten.size == 100) // all records were sent

      // Additional test logic...

    } finally {
      // Cleanup
      testUtils.deleteStream()
      testUtils.deleteDynamoDBTable("test-app")
    }
  }
}
```

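The two distribution assertions above can be factored into a small reusable check. `verifyDistribution` is a hypothetical helper, not part of `KinesisTestUtils`; it works on any result map of the shape `pushData` returns:

```scala
// Hypothetical helper: checks that a pushData-style result covers exactly the
// expected values and used no more than the stream's shard count.
def verifyDistribution(
    dataMap: Map[String, Seq[(Int, String)]],
    expected: Seq[Int],
    maxShards: Int): Boolean = {
  dataMap.size <= maxShards &&
    dataMap.values.flatten.map(_._1).toSeq.sorted == expected.sorted
}

// Exercised against a synthetic result map:
val synthetic = Map(
  "shardId-0" -> Seq((1, "seq-1"), (3, "seq-3")),
  "shardId-1" -> Seq((2, "seq-2"))
)
// verifyDistribution(synthetic, Seq(1, 2, 3), maxShards = 4) == true
```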
### Advanced Testing Scenarios

**Shard Management Testing:**

```scala
import org.apache.spark.streaming.kinesis.KinesisTestUtils

val testUtils = new KinesisTestUtils(streamShardCount = 2)
testUtils.createStream()

// Get the initial shard configuration
val initialShards = testUtils.getShards()
println(s"Initial shards: ${initialShards.map(_.getShardId)}")

// Test shard splitting
val shardToSplit = initialShards.head.getShardId
testUtils.splitShard(shardToSplit)

// Verify the split produced additional shards
val afterSplit = testUtils.getShards()
assert(afterSplit.size > initialShards.size)

// Test shard merging (requires adjacent shards; the two children of a split
// are adjacent by construction)
val childShards = afterSplit.filter(_.getParentShardId == shardToSplit)
if (childShards.size >= 2) {
  testUtils.mergeShard(childShards(0).getShardId, childShards(1).getShardId)
}
```

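Kinesis shards partition the 128-bit MD5 hash key space, and a split divides a parent's hash key range in two. The midpoint arithmetic (which a `splitShard` implementation would presumably pass as the new starting hash key of the second child; this is a sketch of the model, not the utility's actual code) can be expressed with `BigInt`:

```scala
// A Kinesis shard owns an inclusive range of 128-bit hash keys.
// Splitting at the midpoint yields two child shards of (nearly) equal range.
val maxHashKey = BigInt(2).pow(128) - 1

def splitPoint(startingHashKey: BigInt, endingHashKey: BigInt): BigInt =
  startingHashKey + (endingHashKey - startingHashKey) / 2

// With 2 initial shards, the first shard covers roughly [0, maxHashKey / 2]:
val shard0End = maxHashKey / 2
val mid = splitPoint(BigInt(0), shard0End)
// mid == maxHashKey / 4
```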
**Data Aggregation Testing:**

```scala
import org.apache.spark.streaming.kinesis.KinesisTestUtils

val testUtils = new KinesisTestUtils()
testUtils.createStream()

// With aggregation (uses the KPL aggregation format)
val aggregatedData = (1 to 1000).toSeq
val aggregatedResults = testUtils.pushData(aggregatedData, aggregate = true)

// Without aggregation (one Kinesis record per value)
val individualData = (1 to 100).toSeq
val individualResults = testUtils.pushData(individualData, aggregate = false)

// Compare record counts and distribution
println(s"Aggregated: ${aggregatedResults.values.flatten.size} records")
println(s"Individual: ${individualResults.values.flatten.size} records")
```

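KPL aggregation packs multiple user records into a single Kinesis record, so the aggregated push above can emit far fewer stream records than values. The effect on record counts can be sketched with a simple batching model (the real KPL format is protobuf-based and size-driven; the batch size here is purely illustrative):

```scala
// Illustrative model of aggregation: pack user records into fixed-size
// batches, one Kinesis record per batch.
def kinesisRecordCount(userRecords: Int, recordsPerBatch: Int): Int =
  (userRecords + recordsPerBatch - 1) / recordsPerBatch // ceiling division

val withoutAggregation = kinesisRecordCount(1000, 1)   // 1000 Kinesis records
val withAggregation    = kinesisRecordCount(1000, 100) // 10 Kinesis records
```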
### Test Performance Considerations

**Stream Creation Time:**
- Kinesis stream creation typically takes 60-90 seconds
- Consider using pre-created streams for faster test execution
- Use stream pooling for large test suites

**Data Propagation Delay:**
- Allow 1-2 seconds for data to propagate after `pushData()`
- Use polling or delays in tests to wait for data availability

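Rather than a fixed sleep, a small polling helper can wait until the data is actually visible. A sketch (`eventually` is a hypothetical utility; interval and timeout values are illustrative, and ScalaTest ships a similar `Eventually` trait):

```scala
import scala.annotation.tailrec

// Polls `condition` every `intervalMs` until it holds or `timeoutMs` elapses.
@tailrec
def eventually(timeoutMs: Long, intervalMs: Long = 100)(condition: => Boolean): Boolean = {
  if (condition) true
  else if (timeoutMs <= 0) false
  else {
    Thread.sleep(intervalMs)
    eventually(timeoutMs - intervalMs, intervalMs)(condition)
  }
}

// e.g. eventually(timeoutMs = 5000) { receivedRecords.size == expectedCount }
var attempts = 0
val ok = eventually(timeoutMs = 1000, intervalMs = 10) { attempts += 1; attempts >= 3 }
// ok == true after three polls
```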
**Cleanup Importance:**
- Always clean up streams and DynamoDB tables to avoid AWS charges
- Use try-finally blocks or test fixtures to guarantee cleanup
- Monitor the AWS console for orphaned resources

**Resource Limits:**
- AWS accounts have limits on concurrent streams and DynamoDB tables
- Consider sequential test execution for resource-intensive scenarios
- Use descriptive names to identify test resources

### Integration with Testing Frameworks

**ScalaTest Integration:**

```scala
import org.scalatest.{BeforeAndAfterEach, FunSuite}
import org.apache.spark.streaming.kinesis.KinesisTestUtils

class KinesisStreamingTest extends FunSuite with BeforeAndAfterEach {
  var testUtils: KinesisTestUtils = _

  override def beforeEach(): Unit = {
    assume(KinesisTestUtils.shouldRunTests)
    testUtils = new KinesisTestUtils()
    testUtils.createStream()
  }

  override def afterEach(): Unit = {
    if (testUtils != null) {
      testUtils.deleteStream()
      testUtils.deleteDynamoDBTable("test-app")
    }
  }

  test("stream processing works correctly") {
    val testData = Seq(1, 2, 3, 4, 5)
    testUtils.pushData(testData, aggregate = false)

    // Test stream processing logic
    // ...
  }
}
```
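The `beforeEach`/`afterEach` pairing above can also be expressed as a loan fixture, which scopes setup and guaranteed cleanup to each test body. A generic sketch (`loan` and the commented `withKinesisStream` wrapper are hypothetical helpers; a stub resource keeps the snippet runnable without AWS):

```scala
import scala.collection.mutable.ListBuffer

// Generic loan pattern: acquire a resource, loan it to the body, always release.
def loan[R, T](acquire: => R)(release: R => Unit)(body: R => T): T = {
  val resource = acquire
  try body(resource)
  finally release(resource)
}

// For Kinesis this might look like:
//   def withKinesisStream[T](body: KinesisTestUtils => T): T =
//     loan { val u = new KinesisTestUtils(); u.createStream(); u } { u =>
//       u.deleteStream(); u.deleteDynamoDBTable("test-app")
//     }(body)

// Demonstrated with a stub resource: release runs even when the body throws.
val events = ListBuffer[String]()
try {
  loan { events += "setup"; "stream" } { _ => events += "cleanup" } { _ =>
    throw new RuntimeException("test failure")
  }
} catch { case _: RuntimeException => () }
// events == ListBuffer("setup", "cleanup")
```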