or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdmetrics-testing.mdminicluster-management.mdresult-verification.mdspecialized-connectors.mdtest-data-sources.mdtest-environments.mdvalidation-utilities.md

validation-utilities.mddocs/

0

# Validation Utilities

1

2

Utilities for POJO serialization verification, JAR packaging validation, resource discovery, and parameter property handling to ensure comprehensive testing coverage. These utilities provide validation capabilities for various aspects of Flink application development and deployment.

3

4

## Capabilities

5

6

### POJO Serialization Testing

7

8

Utilities for validating that classes are properly serialized as POJOs by Flink's type system, ensuring optimal serialization performance.

9

10

```java { .api }

11

@PublicEvolving

12

public class PojoTestUtils {

13

public static <T> void assertSerializedAsPojo(Class<T> clazz);

14

public static <T> void assertSerializedAsPojoWithoutKryo(Class<T> clazz);

15

}

16

```

17

18

#### Usage Example

19

20

```java

21

import org.apache.flink.types.PojoTestUtils;

22

23

// Test that a class is serialized as POJO

24

@Test

25

void testUserRecordIsPojo() {

26

PojoTestUtils.assertSerializedAsPojo(UserRecord.class);

27

}

28

29

// Test POJO serialization without Kryo fallback

30

@Test

31

void testStrictPojoSerialization() {

32

PojoTestUtils.assertSerializedAsPojoWithoutKryo(CustomerData.class);

33

}

34

35

// Example POJO class

36

public static class UserRecord {

37

public String name;

38

public int age;

39

public String email;

40

41

public UserRecord() {} // Required default constructor

42

43

public UserRecord(String name, int age, String email) {

44

this.name = name;

45

this.age = age;

46

this.email = email;

47

}

48

49

// Getters and setters for POJO compliance

50

public String getName() { return name; }

51

public void setName(String name) { this.name = name; }

52

public int getAge() { return age; }

53

public void setAge(int age) { this.age = age; }

54

public String getEmail() { return email; }

55

public void setEmail(String email) { this.email = email; }

56

}

57

```

58

59

### JAR Packaging Validation

60

61

Utilities for verifying JAR file contents and structure, ensuring proper packaging for Flink applications and dependencies.

62

63

```java { .api }

64

public class PackagingTestUtils {

65

public static void assertJarContainsOnlyFilesMatching(

66

Path jarPath, Collection<String> allowedPaths) throws IOException;

67

public static void assertJarContainsServiceEntry(

68

Path jarPath, Class<?> service) throws IOException;

69

}

70

```

71

72

#### Usage Example

73

74

```java

75

import org.apache.flink.packaging.PackagingTestUtils;

76

import java.nio.file.Path;

77

import java.nio.file.Paths;

78

import java.util.Arrays;

79

80

@Test

81

void testJarPackaging() throws IOException {

82

Path jarPath = Paths.get("target/my-flink-app.jar");

83

84

// Verify JAR contains only allowed files

85

Collection<String> allowedPaths = Arrays.asList(

86

"com/mycompany/flink/.*",

87

"META-INF/.*",

88

"org/apache/flink/.*"

89

);

90

91

PackagingTestUtils.assertJarContainsOnlyFilesMatching(jarPath, allowedPaths);

92

93

// Verify JAR contains required service entries

94

PackagingTestUtils.assertJarContainsServiceEntry(

95

jarPath, org.apache.flink.table.factories.TableFactory.class);

96

}

97

98

@Test

99

void testConnectorJarStructure() throws IOException {

100

Path connectorJar = Paths.get("target/my-connector.jar");

101

102

// Validate connector JAR structure

103

Collection<String> connectorPaths = Arrays.asList(

104

"com/mycompany/connector/.*",

105

"META-INF/services/.*",

106

"META-INF/MANIFEST.MF"

107

);

108

109

PackagingTestUtils.assertJarContainsOnlyFilesMatching(connectorJar, connectorPaths);

110

111

// Verify service provider configuration

112

PackagingTestUtils.assertJarContainsServiceEntry(

113

connectorJar, org.apache.flink.table.factories.DynamicTableFactory.class);

114

}

115

```

116

117

### Resource Discovery

118

119

Utilities for finding and managing test resources using regex patterns, enabling flexible resource location in test environments.

120

121

```java { .api }

122

public class ResourceTestUtils {

123

public static Path getResource(String resourceNameRegex) throws IOException;

124

}

125

```

126

127

#### Usage Example

128

129

```java

130

import org.apache.flink.test.resources.ResourceTestUtils;

131

import java.nio.file.Path;

132

133

@Test

134

void testResourceDiscovery() throws IOException {

135

// Find configuration file using regex

136

Path configFile = ResourceTestUtils.getResource(".*application\\.properties");

137

assertTrue(Files.exists(configFile));

138

139

// Find test data files

140

Path testDataFile = ResourceTestUtils.getResource(".*test-data\\.csv");

141

assertTrue(Files.exists(testDataFile));

142

143

// Use discovered resources in test

144

Properties config = new Properties();

145

try (InputStream is = Files.newInputStream(configFile)) {

146

config.load(is);

147

}

148

149

// Verify configuration

150

assertNotNull(config.getProperty("flink.parallelism"));

151

}

152

153

@Test

154

void testSchemaResourceLoading() throws IOException {

155

// Find Avro schema files

156

Path schemaFile = ResourceTestUtils.getResource(".*\\.avsc");

157

158

// Load and validate schema

159

Schema schema = new Schema.Parser().parse(Files.newInputStream(schemaFile));

160

assertNotNull(schema);

161

assertEquals(Schema.Type.RECORD, schema.getType());

162

}

163

```

164

165

### Parameter Property Management

166

167

System property-based parameter management with type conversion and default value support for flexible test configuration.

168

169

```java { .api }

170

public class ParameterProperty<V> {

171

public ParameterProperty(String propertyName, Function<String, V> converter);

172

173

public String getPropertyName();

174

public Optional<V> get();

175

public V get(V defaultValue);

176

}

177

```

178

179

#### Usage Example

180

181

```java

182

import org.apache.flink.test.parameters.ParameterProperty;

183

import java.util.function.Function;

184

185

// Define parameter properties with type conversion

186

ParameterProperty<Integer> parallelismProperty = new ParameterProperty<>(

187

"test.parallelism", Integer::parseInt);

188

189

ParameterProperty<String> jobNameProperty = new ParameterProperty<>(

190

"test.job.name", Function.identity());

191

192

ParameterProperty<Boolean> enableCheckpointingProperty = new ParameterProperty<>(

193

"test.checkpointing.enabled", Boolean::parseBoolean);

194

195

@Test

196

void testParameterConfiguration() {

197

// Use properties with defaults

198

int parallelism = parallelismProperty.get(4);

199

String jobName = jobNameProperty.get("DefaultTestJob");

200

boolean checkpointingEnabled = enableCheckpointingProperty.get(false);

201

202

// Configure test environment

203

StreamExecutionEnvironment env = getTestEnvironment();

204

env.setParallelism(parallelism);

205

206

if (checkpointingEnabled) {

207

env.enableCheckpointing(1000);

208

}

209

210

// Create job with configured parameters

211

env.fromElements(1, 2, 3, 4, 5)

212

.map(x -> x * parallelism)

213

.print(jobName);

214

}

215

216

@Test

217

void testOptionalParameters() {

218

// Check if parameters are provided

219

Optional<Integer> maxParallelism = parallelismProperty.get();

220

221

if (maxParallelism.isPresent()) {

222

// Use provided value

223

configureWithMaxParallelism(maxParallelism.get());

224

} else {

225

// Use automatic configuration

226

configureAutomatically();

227

}

228

}

229

```

230

231

#### Advanced Parameter Usage

232

233

```java

234

// Custom type conversion

235

ParameterProperty<Duration> timeoutProperty = new ParameterProperty<>(

236

"test.timeout", durationString -> Duration.parse(durationString));

237

238

ParameterProperty<List<String>> topicsProperty = new ParameterProperty<>(

239

"test.kafka.topics", topicsString -> Arrays.asList(topicsString.split(",")));

240

241

// Configuration class pattern

242

public class TestConfiguration {

243

private static final ParameterProperty<String> KAFKA_BOOTSTRAP_SERVERS =

244

new ParameterProperty<>("test.kafka.bootstrap.servers", Function.identity());

245

246

private static final ParameterProperty<Integer> KAFKA_PARTITIONS =

247

new ParameterProperty<>("test.kafka.partitions", Integer::parseInt);

248

249

public String getKafkaBootstrapServers() {

250

return KAFKA_BOOTSTRAP_SERVERS.get("localhost:9092");

251

}

252

253

public int getKafkaPartitions() {

254

return KAFKA_PARTITIONS.get(1);

255

}

256

}

257

```

258

259

## Usage Patterns

260

261

### Comprehensive Application Validation

262

263

Complete validation suite for a Flink application including serialization, packaging, and configuration.

264

265

```java

266

@Test

267

void testApplicationValidation() throws IOException {

268

// 1. Validate POJO serialization

269

PojoTestUtils.assertSerializedAsPojo(OrderRecord.class);

270

PojoTestUtils.assertSerializedAsPojo(CustomerRecord.class);

271

PojoTestUtils.assertSerializedAsPojo(ProductRecord.class);

272

273

// 2. Validate JAR packaging

274

Path applicationJar = Paths.get("target/order-processing-app.jar");

275

Collection<String> allowedPaths = Arrays.asList(

276

"com/mycompany/orders/.*",

277

"META-INF/.*",

278

"org/apache/flink/.*"

279

);

280

PackagingTestUtils.assertJarContainsOnlyFilesMatching(applicationJar, allowedPaths);

281

282

// 3. Validate resource availability

283

Path configFile = ResourceTestUtils.getResource(".*application\\.conf");

284

assertTrue(Files.exists(configFile));

285

286

// 4. Validate parameter configuration

287

TestConfiguration config = new TestConfiguration();

288

assertNotNull(config.getKafkaBootstrapServers());

289

assertTrue(config.getKafkaPartitions() > 0);

290

}

291

```

292

293

### CI/CD Pipeline Validation

294

295

Validation utilities designed for continuous integration and deployment pipelines.

296

297

```java

298

@Test

299

void testCiCdValidation() throws IOException {

300

// Environment-specific parameter validation

301

ParameterProperty<String> environmentProperty = new ParameterProperty<>(

302

"deployment.environment", Function.identity());

303

304

String environment = environmentProperty.get("test");

305

306

// Validate based on environment

307

switch (environment) {

308

case "production":

309

validateProductionConfiguration();

310

break;

311

case "staging":

312

validateStagingConfiguration();

313

break;

314

default:

315

validateTestConfiguration();

316

}

317

318

// JAR validation for deployment

319

Path deploymentJar = ResourceTestUtils.getResource(".*-deployment\\.jar");

320

validateDeploymentJarStructure(deploymentJar);

321

}

322

323

private void validateProductionConfiguration() throws IOException {

324

// Production-specific validations

325

ParameterProperty<Boolean> debugEnabled = new ParameterProperty<>(

326

"flink.debug.enabled", Boolean::parseBoolean);

327

328

assertFalse(debugEnabled.get(false), "Debug should be disabled in production");

329

}

330

```

331

332

### Type System Validation

333

334

Ensuring optimal serialization performance through POJO validation.

335

336

```java

337

@Test

338

void testSerializationPerformance() {

339

// Test that critical data types are POJOs for optimal performance

340

PojoTestUtils.assertSerializedAsPojo(Event.class);

341

PojoTestUtils.assertSerializedAsPojo(Measurement.class);

342

PojoTestUtils.assertSerializedAsPojo(Alert.class);

343

344

// Ensure no Kryo fallback for performance-critical types

345

PojoTestUtils.assertSerializedAsPojoWithoutKryo(HighFrequencyEvent.class);

346

}

347

348

// Performance-critical event class

349

public static class HighFrequencyEvent {

350

public long timestamp;

351

public String sensorId;

352

public double value;

353

public String status;

354

355

// POJO requirements: default constructor and getters/setters

356

public HighFrequencyEvent() {}

357

358

// Constructor, getters, and setters...

359

}

360

```