# Scala Testing Support

Scala-specific testing components: API completeness validation, migration test types, and utility functions for testing Flink's Scala API integration and compatibility.

## Capabilities

### ScalaAPICompletenessTestBase

Base class for testing Scala API completeness, ensuring that all Java API features are properly exposed through the Scala wrappers.

```scala { .api }
/**
 * Base class for testing Scala API completeness
 * Validates that Scala API provides equivalent functionality to Java API
 */
abstract class ScalaAPICompletenessTestBase {

  /**
   * Test that all Java API methods have Scala equivalents
   * Validates API completeness through reflection-based comparison
   */
  def testAPICompleteness(): Unit

  /**
   * Test Scala-specific functionality and syntax
   * Validates Scala-idiomatic API patterns and implicit conversions
   */
  def testScalaSpecificFeatures(): Unit

  /**
   * Validate type safety in Scala API
   * Ensures proper type inference and compile-time type checking
   */
  def testTypeSafety(): Unit
}
```

**Usage Example:**

```scala
class DataSetAPICompletenessTest extends ScalaAPICompletenessTestBase {

  @Test
  def testDataSetOperations(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Test that all common operations are available
    val dataset = env.fromElements(1, 2, 3, 4, 5)

    // Scala-specific syntax should work
    val mapped = dataset.map(_ * 2)
    val filtered = dataset.filter(_ > 2)
    val reduced = dataset.reduce(_ + _)

    // Verify operations compile and execute
    assertNotNull("Mapped dataset should not be null", mapped)
    assertNotNull("Filtered dataset should not be null", filtered)
    assertNotNull("Reduced dataset should not be null", reduced)
  }

  @Test
  def testImplicitConversions(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Test implicit conversions work properly
    val tuples = env.fromElements((1, "a"), (2, "b"), (3, "c"))
    val keyed = tuples.groupBy(0)              // Should work with a numeric index
    val keyedByFunction = tuples.groupBy(_._1) // Should work with a key-selector function

    assertNotNull("Keyed by index should work", keyed)
    assertNotNull("Keyed by function should work", keyedByFunction)
  }
}
```

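The reflection-based comparison mentioned above can be sketched roughly as follows. This is a minimal illustration rather than the actual base-class implementation; the object name `CompletenessCheckSketch`, the helper `checkMethods`, and the exclusion list are assumptions made for the example.

```scala
/**
 * Minimal sketch of a reflection-based completeness check (illustrative only).
 * Compares the public method names of a Java API class against its Scala
 * counterpart and fails if a Java method has no Scala equivalent.
 */
object CompletenessCheckSketch {

  // Methods that intentionally need no Scala counterpart (assumed exclusion list)
  private val excludedMethods =
    Set("equals", "hashCode", "toString", "wait", "notify", "notifyAll", "getClass")

  def checkMethods(javaClass: Class[_], scalaClass: Class[_]): Unit = {
    // getMethods returns all public methods, including inherited ones
    val javaMethods  = javaClass.getMethods.map(_.getName).toSet -- excludedMethods
    val scalaMethods = scalaClass.getMethods.map(_.getName).toSet

    val missing = javaMethods -- scalaMethods
    assert(missing.isEmpty, s"Methods missing in the Scala API: ${missing.mkString(", ")}")
  }
}
```

A concrete test could then call, for example, `checkMethods(classOf[org.apache.flink.api.java.DataSet[_]], classOf[org.apache.flink.api.scala.DataSet[_]])`.
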
### MigrationTestTypes

Scala case classes and enums for testing serialization and migration of Scala-specific types across Flink versions.

```scala { .api }
/**
 * Scala types for migration testing
 * Provides case classes and enums to test Scala serialization compatibility
 */
object MigrationTestTypes {

  /**
   * Simple case class for basic migration testing
   * @param a String field for testing string serialization
   * @param b Long field for testing numeric serialization
   */
  case class CustomCaseClass(a: String, b: Long)

  /**
   * Nested case class for complex migration testing
   * @param a Long field for testing numeric serialization
   * @param nested Nested case class for testing complex object serialization
   */
  case class CustomCaseClassWithNesting(a: Long, nested: CustomCaseClass)

  /**
   * Enumeration for testing enum serialization migration
   * Tests that Scala enums maintain compatibility across versions
   */
  object CustomEnum extends Enumeration {
    type CustomEnum = Value
    val ONE, TWO, THREE = Value
  }

  /**
   * Case class with Option types for testing Option serialization
   * @param required Required string field
   * @param optional Optional string field
   * @param optionalNumber Optional numeric field
   */
  case class CustomCaseClassWithOption(
    required: String,
    optional: Option[String],
    optionalNumber: Option[Long]
  )

  /**
   * Case class with collection types for testing collection serialization
   * @param id Identifier field
   * @param items List of string items
   * @param metadata Map of metadata key-value pairs
   */
  case class CustomCaseClassWithCollections(
    id: Long,
    items: List[String],
    metadata: Map[String, String]
  )
}
```

**Usage Example:**

```scala
import MigrationTestTypes._

class ScalaMigrationTest extends SavepointMigrationTestBase {

  @Test
  def testCaseClassMigration(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(DEFAULT_PARALLELISM)

    // Create test data with case classes
    val testData = Seq(
      CustomCaseClass("hello", 1L),
      CustomCaseClass("world", 2L),
      CustomCaseClass("flink", 3L)
    )

    env.fromCollection(testData)
      .keyBy(_.b)
      .map(item => CustomCaseClassWithNesting(item.b * 10, item))
      .addSink(new AccumulatorCountingSink[CustomCaseClassWithNesting])

    executeAndSavepoint(env, "scala-case-class-savepoint",
      Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, testData.size))
  }

  @Test
  def testEnumMigration(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(DEFAULT_PARALLELISM)

    env.fromElements(CustomEnum.ONE, CustomEnum.TWO, CustomEnum.THREE)
      .map(_.toString)
      .addSink(new AccumulatorCountingSink[String])

    executeAndSavepoint(env, "scala-enum-savepoint",
      Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 3))
  }
}
```

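The tests above only cover the write side of a migration test: they produce a savepoint with the current job. Verifying compatibility also requires a restore step that resumes from a savepoint written by an earlier Flink version against the same topology. A rough sketch, assuming the test base exposes a `restoreAndExecute` counterpart to `executeAndSavepoint` and a `getResourceFilename` helper for locating the stored savepoint:

```scala
@Test
def testRestoreCaseClassSavepoint(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(DEFAULT_PARALLELISM)

  // The topology must match the job that originally wrote the savepoint
  env.fromCollection(Seq(
      CustomCaseClass("hello", 1L),
      CustomCaseClass("world", 2L),
      CustomCaseClass("flink", 3L)))
    .keyBy(_.b)
    .map(item => CustomCaseClassWithNesting(item.b * 10, item))
    .addSink(new AccumulatorCountingSink[CustomCaseClassWithNesting])

  // Resume from a savepoint written by an older Flink version (path helper assumed)
  restoreAndExecute(env, getResourceFilename("scala-case-class-savepoint"),
    Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 3))
}
```
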
### Scala Utility Functions

Utility functions and implicit conversions specific to Scala testing scenarios.

```scala { .api }
/**
 * Scala-specific testing utilities
 * Provides helpers for common Scala testing patterns
 */
object ScalaTestUtils {

  /**
   * Create test DataSet from Scala collections
   * @param env ExecutionEnvironment
   * @param data Scala collection to convert
   * @return DataSet containing the collection elements
   */
  def createTestDataSet[T: ClassTag: TypeInformation](env: ExecutionEnvironment, data: Seq[T]): DataSet[T] = {
    env.fromCollection(data)
  }

  /**
   * Create test DataStream from Scala collections
   * @param env StreamExecutionEnvironment
   * @param data Scala collection to convert
   * @return DataStream containing the collection elements
   */
  def createTestDataStream[T: TypeInformation](env: StreamExecutionEnvironment, data: Seq[T]): DataStream[T] = {
    env.fromCollection(data)
  }

  /**
   * Assert that two sequences contain the same elements (order independent)
   * @param expected Expected sequence
   * @param actual Actual sequence
   */
  def assertSameElements[T](expected: Seq[T], actual: Seq[T]): Unit = {
    assertEquals("Sequences should have same size", expected.size, actual.size)
    // Compare element counts so that duplicates are taken into account
    assertEquals("Sequences should contain same elements",
      expected.groupBy(identity).map { case (e, occ) => e -> occ.size },
      actual.groupBy(identity).map { case (e, occ) => e -> occ.size })
  }

  /**
   * Execute Scala DataSet and collect results
   * @param dataset DataSet to execute
   * @return Collected results as Scala List
   */
  def executeAndCollect[T](dataset: DataSet[T]): List[T] = {
    // The Scala DataSet API already returns a Scala Seq from collect()
    dataset.collect().toList
  }

  /**
   * Execute Scala DataStream and collect results (for testing)
   * @param stream DataStream to execute
   * @param maxElements Maximum number of elements to collect
   * @return Collected results as Scala List
   */
  def executeAndCollectStream[T](stream: DataStream[T], maxElements: Int): List[T] = {
    val resultSink = new TestListResultSink[T]()
    stream.addSink(resultSink)

    val env = stream.executionEnvironment
    TestUtils.tryExecute(env, "Scala Stream Test")

    import scala.collection.JavaConverters._
    resultSink.getResult.asScala.take(maxElements).toList
  }
}
```

**Usage Example:**

```scala
import ScalaTestUtils._

class ScalaUtilityTest extends StreamFaultToleranceTestBase {

  // Sink kept as a field so postSubmit() can read its results
  private val resultSink = new TestListResultSink[Int]

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val testData = Seq(1, 2, 3, 4, 5)
    val stream = createTestDataStream(env, testData)

    stream.map(_ * 2)
      .filter(_ > 4)
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    import scala.collection.JavaConverters._
    val scalaResults = resultSink.getResult.asScala.toList

    assertSameElements(List(6, 8, 10), scalaResults)
  }
}
```

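The stream-oriented helpers above have batch counterparts. A short, illustrative sketch of using `createTestDataSet` and `executeAndCollect` in a plain JUnit test (the class name `ScalaBatchUtilityTest` is made up for the example):

```scala
import org.apache.flink.api.scala._
import org.junit.Test
import ScalaTestUtils._

class ScalaBatchUtilityTest {

  @Test
  def testBatchHelpers(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Build a DataSet from a Scala collection and apply Scala-idiomatic transformations
    val dataset = createTestDataSet(env, Seq(1, 2, 3, 4, 5))
    val doubled = dataset.map(_ * 2).filter(_ > 4)

    // collect() triggers execution of the DataSet program, so no explicit execute() call is needed
    val results = executeAndCollect(doubled)
    assertSameElements(List(6, 8, 10), results)
  }
}
```
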
## Scala-Specific Testing Patterns

### Case Class Serialization Testing

Testing that Scala case classes serialize correctly:

```scala
case class Person(name: String, age: Int, city: String)

class CaseClassSerializationTest extends StreamFaultToleranceTestBase {

  // Sink kept as a field so postSubmit() can read its results
  private val resultSink = new TestListResultSink[Person]

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val people = Seq(
      Person("Alice", 25, "Berlin"),
      Person("Bob", 30, "Munich"),
      Person("Charlie", 35, "Hamburg")
    )

    env.fromCollection(people)
      .keyBy(_.city)
      .map(person => person.copy(age = person.age + 1))
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    val results = resultSink.getResult
    assertEquals("Should have 3 people", 3, results.size())

    import scala.collection.JavaConverters._
    val ages = results.asScala.map(_.age).toSet
    assertEquals("All ages should be incremented", Set(26, 31, 36), ages)
  }
}
```

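An end-to-end pipeline exercises serialization only implicitly. The case-class serializer can also be tested with a direct in-memory round trip; a minimal sketch using Flink's Scala type information macro and the in-memory data views (the class name `CaseClassRoundTripSketch` is made up for the example):

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer}
import org.junit.Assert.assertEquals
import org.junit.Test

class CaseClassRoundTripSketch {

  @Test
  def testPersonRoundTrip(): Unit = {
    val person = Person("Alice", 25, "Berlin")

    // Derive the case-class serializer through the Scala type information macro
    val serializer = createTypeInformation[Person].createSerializer(new ExecutionConfig)

    // Serialize into an in-memory buffer
    val out = new DataOutputSerializer(128)
    serializer.serialize(person, out)

    // Deserialize and compare with the original value
    val in = new DataInputDeserializer(out.getCopyOfBuffer)
    assertEquals(person, serializer.deserialize(in))
  }
}
```
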
### Option Type Testing

Testing Scala Option types in Flink operations:

```scala
case class OptionalData(id: Int, value: Option[String], count: Option[Long])

class OptionTypeTest extends StreamFaultToleranceTestBase {

  // Sink kept as a field so postSubmit() can read its results
  private val resultSink = new TestListResultSink[OptionalData]

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val data = Seq(
      OptionalData(1, Some("hello"), Some(10L)),
      OptionalData(2, None, Some(20L)),
      OptionalData(3, Some("world"), None)
    )

    env.fromCollection(data)
      .filter(_.value.isDefined)                             // Drop elements without a value
      .map(item => item.copy(count = item.count.map(_ * 2))) // Transform the Option field
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    val results = resultSink.getResult
    assertEquals("Should have 2 items with defined values", 2, results.size())

    import scala.collection.JavaConverters._
    results.asScala.foreach { item =>
      assertTrue("Value should be defined", item.value.isDefined)
    }
  }
}
```

### Collection Type Testing

Testing Scala collection types with Flink:

```scala
case class CollectionData(id: Int, items: List[String], frequencies: Map[String, Int])

class CollectionTypeTest extends StreamFaultToleranceTestBase {

  // Sink kept as a field so postSubmit() can read its results
  private val resultSink = new TestListResultSink[(String, Int)]

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val data = Seq(
      CollectionData(1, List("a", "b", "c"), Map("a" -> 1, "b" -> 2)),
      CollectionData(2, List("d", "e"), Map("d" -> 3, "e" -> 1)),
      CollectionData(3, List("f"), Map("f" -> 5))
    )

    env.fromCollection(data)
      .flatMap(item => item.items.map(str => (str, item.frequencies.getOrElse(str, 0))))
      .keyBy(_._1)
      .sum(1)
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    val results = resultSink.getResult
    assertTrue("Should have processed items", results.size() > 0)

    import scala.collection.JavaConverters._
    val resultMap = results.asScala.map(t => t._1 -> t._2).toMap

    // Verify expected frequencies
    assertEquals("Frequency for 'a' should be 1", 1, resultMap("a"))
    assertEquals("Frequency for 'b' should be 2", 2, resultMap("b"))
  }
}
```

### Implicit Conversion Testing

Testing that Scala implicit conversions work properly:

```scala
class ImplicitConversionTest extends StreamFaultToleranceTestBase {

  // Sink kept as a field so postSubmit() can read its results
  private val resultSink = new TestListResultSink[(Int, String)]

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val tuples = Seq((1, "a"), (2, "b"), (3, "c"), (1, "d"), (2, "e"))

    env.fromCollection(tuples)
      .keyBy(0) // Should use the implicit conversion to a KeySelector
      .reduce((t1, t2) => (t1._1, t1._2 + "," + t2._2)) // Concatenate strings per key
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    val results = resultSink.getResult
    // A rolling reduce emits one result per input element, so 5 inputs produce 5 outputs
    assertEquals("Should have one result per input element", 5, results.size())

    import scala.collection.JavaConverters._
    // toMap keeps the last (fully concatenated) result per key
    val resultMap = results.asScala.map(t => t._1 -> t._2).toMap

    assertTrue("Key 1 should have concatenated values",
      resultMap(1).contains("a") && resultMap(1).contains("d"))
    assertTrue("Key 2 should have concatenated values",
      resultMap(2).contains("b") && resultMap(2).contains("e"))
  }
}
```

## Advanced Scala Features

### Higher-Order Functions

Testing Scala higher-order functions with Flink:

```scala
import ScalaTestUtils._

class HigherOrderFunctionTest extends StreamFaultToleranceTestBase {

  // Sink kept as a field so postSubmit() can read its results
  private val resultSink = new TestListResultSink[Int]

  def createTransformFunction(multiplier: Int): Int => Int = _ * multiplier

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val transform = createTransformFunction(3)

    env.fromElements(1, 2, 3, 4, 5)
      .map(transform) // Use the higher-order function
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    import scala.collection.JavaConverters._
    val expected = List(3, 6, 9, 12, 15)

    assertSameElements(expected, resultSink.getResult.asScala.toList)
  }
}
```

### Pattern Matching

Testing Scala pattern matching in Flink operations:

```scala
sealed trait Event
case class UserEvent(userId: Int, action: String) extends Event
case class SystemEvent(level: String, message: String) extends Event

class PatternMatchingTest extends StreamFaultToleranceTestBase {

  // Sink kept as a field so postSubmit() can read its results
  private val resultSink = new TestListResultSink[String]

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val events = Seq[Event](
      UserEvent(1, "login"),
      SystemEvent("INFO", "system started"),
      UserEvent(2, "logout"),
      SystemEvent("ERROR", "connection failed")
    )

    env.fromCollection(events)
      .map {
        case UserEvent(id, action) => s"User $id did $action"
        case SystemEvent(level, msg) => s"[$level] $msg"
      }
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    val results = resultSink.getResult
    assertEquals("Should have 4 processed events", 4, results.size())

    import scala.collection.JavaConverters._
    val resultStrings = results.asScala.toSet

    assertTrue("Should contain user event",
      resultStrings.exists(_.contains("User 1 did login")))
    assertTrue("Should contain system event",
      resultStrings.exists(_.contains("[INFO] system started")))
  }
}
```