# Scala Testing Support

Scala-specific testing components, including API completeness validation, migration test types, and utility functions for testing Flink's Scala API integration and compatibility.

## Capabilities

### ScalaAPICompletenessTestBase

Base class for testing Scala API completeness, ensuring that all Java API features are properly exposed through their Scala wrappers.

```scala { .api }
/**
 * Base class for testing Scala API completeness
 * Validates that Scala API provides equivalent functionality to Java API
 */
abstract class ScalaAPICompletenessTestBase {

  /**
   * Test that all Java API methods have Scala equivalents
   * Validates API completeness through reflection-based comparison
   */
  def testAPICompleteness(): Unit

  /**
   * Test Scala-specific functionality and syntax
   * Validates Scala-idiomatic API patterns and implicit conversions
   */
  def testScalaSpecificFeatures(): Unit

  /**
   * Validate type safety in Scala API
   * Ensures proper type inference and compile-time type checking
   */
  def testTypeSafety(): Unit
}
```

**Usage Example:**

```scala
class DataSetAPICompletenessTest extends ScalaAPICompletenessTestBase {

  @Test
  def testDataSetOperations(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Test that all common operations are available
    val dataset = env.fromElements(1, 2, 3, 4, 5)

    // Scala-specific syntax should work
    val mapped = dataset.map(_ * 2)
    val filtered = dataset.filter(_ > 2)
    val reduced = dataset.reduce(_ + _)

    // Verify operations compile and execute
    assertNotNull("Mapped dataset should not be null", mapped)
    assertNotNull("Filtered dataset should not be null", filtered)
    assertNotNull("Reduced dataset should not be null", reduced)
  }

  @Test
  def testImplicitConversions(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Test implicit conversions work properly
    val tuples = env.fromElements((1, "a"), (2, "b"), (3, "c"))
    val keyed = tuples.groupBy(0) // Should work with numeric index
    val keyedByFunction = tuples.groupBy(_._1) // Should work with function

    assertNotNull("Keyed by index should work", keyed)
    assertNotNull("Keyed by function should work", keyedByFunction)
  }

  // The abstract checks declared by the base class must be implemented; elided here
  override def testAPICompleteness(): Unit = ()
  override def testScalaSpecificFeatures(): Unit = ()
  override def testTypeSafety(): Unit = ()
}
```
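
For reference, the reflection-based comparison that `testAPICompleteness` performs can be sketched as below. This is only an illustration: the comparison logic and the `excluded` set are assumptions, not the base class's actual algorithm.

```scala
import org.junit.Assert.assertTrue

object CompletenessCheckSketch {

  // Illustrative placeholder: Java-only methods that intentionally have no Scala counterpart
  private val excluded = Set("getType", "getExecutionEnvironment")

  /** Compare the public method names of a Java API class and its Scala wrapper. */
  def assertScalaCoversJava(javaClass: Class[_], scalaClass: Class[_]): Unit = {
    val javaMethods = javaClass.getMethods.map(_.getName).toSet
    val scalaMethods = scalaClass.getMethods.map(_.getName).toSet

    // Every public Java method should be mirrored (or explicitly excluded) on the Scala side
    val missing = javaMethods -- scalaMethods -- excluded
    assertTrue(s"Methods missing from the Scala API: ${missing.mkString(", ")}", missing.isEmpty)
  }
}

// Inside a ScalaAPICompletenessTestBase subclass, for example:
//   override def testAPICompleteness(): Unit =
//     CompletenessCheckSketch.assertScalaCoversJava(
//       Class.forName("org.apache.flink.api.java.DataSet"),
//       Class.forName("org.apache.flink.api.scala.DataSet"))
```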

### MigrationTestTypes

Scala case classes and enums for testing serialization and migration of Scala-specific types across Flink versions.

```scala { .api }
/**
 * Scala types for migration testing
 * Provides case classes and enums to test Scala serialization compatibility
 */
object MigrationTestTypes {

  /**
   * Simple case class for basic migration testing
   * @param a String field for testing string serialization
   * @param b Long field for testing numeric serialization
   */
  case class CustomCaseClass(a: String, b: Long)

  /**
   * Nested case class for complex migration testing
   * @param a Long field for testing numeric serialization
   * @param nested Nested case class for testing complex object serialization
   */
  case class CustomCaseClassWithNesting(a: Long, nested: CustomCaseClass)

  /**
   * Enumeration for testing enum serialization migration
   * Tests that Scala enums maintain compatibility across versions
   */
  object CustomEnum extends Enumeration {
    type CustomEnum = Value
    val ONE, TWO, THREE = Value
  }

  /**
   * Case class with Option types for testing Option serialization
   * @param required Required string field
   * @param optional Optional string field
   * @param optionalNumber Optional numeric field
   */
  case class CustomCaseClassWithOption(
    required: String,
    optional: Option[String],
    optionalNumber: Option[Long]
  )

  /**
   * Case class with collection types for testing collection serialization
   * @param id Identifier field
   * @param items List of string items
   * @param metadata Map of metadata key-value pairs
   */
  case class CustomCaseClassWithCollections(
    id: Long,
    items: List[String],
    metadata: Map[String, String]
  )
}
```

**Usage Example:**

```scala
import MigrationTestTypes._

class ScalaMigrationTest extends SavepointMigrationTestBase {

  @Test
  def testCaseClassMigration(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(DEFAULT_PARALLELISM)

    // Create test data with case classes
    val testData = Seq(
      CustomCaseClass("hello", 1L),
      CustomCaseClass("world", 2L),
      CustomCaseClass("flink", 3L)
    )

    env.fromCollection(testData)
      .keyBy(_.b)
      .map(item => CustomCaseClassWithNesting(item.b * 10, item))
      .addSink(new AccumulatorCountingSink[CustomCaseClassWithNesting])

    executeAndSavepoint(env, "scala-case-class-savepoint",
      Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, testData.size))
  }

  @Test
  def testEnumMigration(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(DEFAULT_PARALLELISM)

    env.fromElements(CustomEnum.ONE, CustomEnum.TWO, CustomEnum.THREE)
      .map(_.toString)
      .addSink(new AccumulatorCountingSink[String])

    executeAndSavepoint(env, "scala-enum-savepoint",
      Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 3))
  }
}
```
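
The same pattern extends to the Option and collection types defined in `MigrationTestTypes`. The sketch below is an additional test method for the `ScalaMigrationTest` class above; the savepoint name and the field mapping are illustrative placeholders, not part of the documented API.

```scala
@Test
def testOptionAndCollectionMigration(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(DEFAULT_PARALLELISM)

  val testData = Seq(
    CustomCaseClassWithOption("a", Some("x"), Some(1L)),
    CustomCaseClassWithOption("b", None, None)
  )

  // Exercise Option fields and collection fields in one pipeline
  env.fromCollection(testData)
    .map(item => CustomCaseClassWithCollections(
      id = item.optionalNumber.getOrElse(0L),
      items = item.optional.toList,
      metadata = Map("required" -> item.required)))
    .addSink(new AccumulatorCountingSink[CustomCaseClassWithCollections])

  executeAndSavepoint(env, "scala-option-collection-savepoint",
    Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, testData.size))
}
```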

177

178

### Scala Utility Functions

179

180

Utility functions and implicit conversions specific to Scala testing scenarios.

181

182

```scala { .api }
/**
 * Scala-specific testing utilities
 * Provides helpers for common Scala testing patterns
 */
object ScalaTestUtils {

  /**
   * Create test DataSet from Scala collections
   * @param env ExecutionEnvironment
   * @param data Scala collection to convert
   * @return DataSet containing the collection elements
   */
  def createTestDataSet[T: ClassTag : TypeInformation](env: ExecutionEnvironment, data: Seq[T]): DataSet[T] = {
    // fromCollection needs implicit ClassTag/TypeInformation evidence for T
    env.fromCollection(data)
  }

  /**
   * Create test DataStream from Scala collections
   * @param env StreamExecutionEnvironment
   * @param data Scala collection to convert
   * @return DataStream containing the collection elements
   */
  def createTestDataStream[T: TypeInformation](env: StreamExecutionEnvironment, data: Seq[T]): DataStream[T] = {
    env.fromCollection(data)
  }

  /**
   * Assert that two sequences contain the same elements (order independent)
   * @param expected Expected sequence
   * @param actual Actual sequence
   */
  def assertSameElements[T](expected: Seq[T], actual: Seq[T]): Unit = {
    assertEquals("Sequences should have same size", expected.size, actual.size)
    assertTrue("Sequences should contain same elements",
      expected.toSet == actual.toSet)
  }

  /**
   * Execute Scala DataSet and collect results
   * @param dataset DataSet to execute
   * @return Collected results as Scala List
   */
  def executeAndCollect[T](dataset: DataSet[T]): List[T] = {
    // The Scala DataSet API already returns a Scala Seq from collect()
    dataset.collect().toList
  }

  /**
   * Execute Scala DataStream and collect results (for testing)
   * @param stream DataStream to execute
   * @param maxElements Maximum number of elements to collect
   * @return Collected results as Scala List
   */
  def executeAndCollectStream[T](stream: DataStream[T], maxElements: Int): List[T] = {
    val resultSink = new TestListResultSink[T]()
    stream.addSink(resultSink)

    val env = stream.getExecutionEnvironment
    TestUtils.tryExecute(env, "Scala Stream Test")

    import scala.collection.JavaConverters._
    resultSink.getResult.asScala.take(maxElements).toList
  }
}
```

**Usage Example:**

```scala
import ScalaTestUtils._

class ScalaUtilityTest extends StreamFaultToleranceTestBase {

  // Keep a handle on the sink so postSubmit() can read its results
  private val resultSink = new TestListResultSink[Int]()

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val testData = Seq(1, 2, 3, 4, 5)
    val stream = createTestDataStream(env, testData)

    stream.map(_ * 2)
      .filter(_ > 4)
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    import scala.collection.JavaConverters._
    val scalaResults = resultSink.getResult.asScala.toList

    assertSameElements(List(6, 8, 10), scalaResults)
  }
}
```
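
The batch-oriented helpers can be used the same way. A minimal sketch, assuming `import org.apache.flink.api.scala._` is in scope for the implicit `TypeInformation`, and using the JUnit 4 annotations used throughout these examples:

```scala
import org.junit.Test

import ScalaTestUtils._

class DataSetUtilityTest {

  @Test
  def testExecuteAndCollect(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataset = createTestDataSet(env, Seq("a", "bb", "ccc"))

    // Run the batch job and bring the results back as a Scala List
    val lengths = executeAndCollect(dataset.map(_.length))

    assertSameElements(List(1, 2, 3), lengths)
  }
}
```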

## Scala-Specific Testing Patterns

### Case Class Serialization Testing

Testing that Scala case classes serialize correctly:

```scala
case class Person(name: String, age: Int, city: String)

class CaseClassSerializationTest extends StreamFaultToleranceTestBase {

  // Sink is kept as a field so postSubmit() can inspect the results
  private val resultSink = new TestListResultSink[Person]()

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val people = Seq(
      Person("Alice", 25, "Berlin"),
      Person("Bob", 30, "Munich"),
      Person("Charlie", 35, "Hamburg")
    )

    env.fromCollection(people)
      .keyBy(_.city)
      .map(person => person.copy(age = person.age + 1))
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    val results = resultSink.getResult
    assertEquals("Should have 3 people", 3, results.size())

    import scala.collection.JavaConverters._
    val ages = results.asScala.map(_.age).toSet
    assertEquals("All ages should be incremented", Set(26, 31, 36), ages)
  }
}
```

### Option Type Testing

Testing Scala Option types in Flink operations:

```scala
case class OptionalData(id: Int, value: Option[String], count: Option[Long])

class OptionTypeTest extends StreamFaultToleranceTestBase {

  private val resultSink = new TestListResultSink[OptionalData]()

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val data = Seq(
      OptionalData(1, Some("hello"), Some(10L)),
      OptionalData(2, None, Some(20L)),
      OptionalData(3, Some("world"), None)
    )

    env.fromCollection(data)
      .filter(_.value.isDefined) // Filter out None values
      .map(item => item.copy(count = item.count.map(_ * 2))) // Transform Option
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    val results = resultSink.getResult
    assertEquals("Should have 2 items with defined values", 2, results.size())

    import scala.collection.JavaConverters._
    results.asScala.foreach { item =>
      assertTrue("Value should be defined", item.value.isDefined)
    }
  }
}
```

### Collection Type Testing

Testing Scala collection types with Flink:

```scala
case class CollectionData(id: Int, items: List[String], frequencies: Map[String, Int])

class CollectionTypeTest extends StreamFaultToleranceTestBase {

  private val resultSink = new TestListResultSink[(String, Int)]()

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val data = Seq(
      CollectionData(1, List("a", "b", "c"), Map("a" -> 1, "b" -> 2)),
      CollectionData(2, List("d", "e"), Map("d" -> 3, "e" -> 1)),
      CollectionData(3, List("f"), Map("f" -> 5))
    )

    env.fromCollection(data)
      .flatMap(item => item.items.map(str => (str, item.frequencies.getOrElse(str, 0))))
      .keyBy(_._1)
      .sum(1)
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    val results = resultSink.getResult
    assertTrue("Should have processed items", results.size() > 0)

    import scala.collection.JavaConverters._
    val resultMap = results.asScala.map(t => t._1 -> t._2).toMap

    // Verify expected frequencies
    assertEquals("Frequency for 'a' should be 1", 1, resultMap("a"))
    assertEquals("Frequency for 'b' should be 2", 2, resultMap("b"))
  }
}
```

### Implicit Conversion Testing

Testing that Scala implicit conversions work properly:

```scala
class ImplicitConversionTest extends StreamFaultToleranceTestBase {

  private val resultSink = new TestListResultSink[(Int, String)]()

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val tuples = Seq((1, "a"), (2, "b"), (3, "c"), (1, "d"), (2, "e"))

    env.fromCollection(tuples)
      .keyBy(0) // Should use implicit conversion to KeySelector
      .reduce((t1, t2) => (t1._1, t1._2 + "," + t2._2)) // Concatenate strings
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    val results = resultSink.getResult
    // A rolling reduce emits one record per input element, so all 5 inputs produce output
    assertEquals("Should have 5 reduce results", 5, results.size())

    import scala.collection.JavaConverters._
    val resultMap = results.asScala.map(t => t._1 -> t._2).toMap

    assertTrue("Key 1 should have concatenated values",
      resultMap(1).contains("a") && resultMap(1).contains("d"))
    assertTrue("Key 2 should have concatenated values",
      resultMap(2).contains("b") && resultMap(2).contains("e"))
  }
}
```

## Advanced Scala Features

### Higher-Order Functions

Testing Scala higher-order functions with Flink:

```scala
class HigherOrderFunctionTest extends StreamFaultToleranceTestBase {

  private val resultSink = new TestListResultSink[Int]()

  def createTransformFunction(multiplier: Int): Int => Int = _ * multiplier

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val transform = createTransformFunction(3)

    env.fromElements(1, 2, 3, 4, 5)
      .map(transform) // Use higher-order function
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    import scala.collection.JavaConverters._
    val expected = List(3, 6, 9, 12, 15)

    assertSameElements(expected, resultSink.getResult.asScala.toList)
  }
}
```

### Pattern Matching

Testing Scala pattern matching in Flink operations:

```scala
sealed trait Event
case class UserEvent(userId: Int, action: String) extends Event
case class SystemEvent(level: String, message: String) extends Event

class PatternMatchingTest extends StreamFaultToleranceTestBase {

  private val resultSink = new TestListResultSink[String]()

  override def testProgram(env: StreamExecutionEnvironment): Unit = {
    val events = Seq[Event](
      UserEvent(1, "login"),
      SystemEvent("INFO", "system started"),
      UserEvent(2, "logout"),
      SystemEvent("ERROR", "connection failed")
    )

    env.fromCollection(events)
      .map {
        case UserEvent(id, action) => s"User $id did $action"
        case SystemEvent(level, msg) => s"[$level] $msg"
      }
      .addSink(resultSink)
  }

  override def postSubmit(): Unit = {
    val results = resultSink.getResult
    assertEquals("Should have 4 processed events", 4, results.size())

    import scala.collection.JavaConverters._
    val resultStrings = results.asScala.toSet

    assertTrue("Should contain user event",
      resultStrings.exists(_.contains("User 1 did login")))
    assertTrue("Should contain system event",
      resultStrings.exists(_.contains("[INFO] system started")))
  }
}
```