# Validation Utilities

Utilities for POJO serialization verification, JAR packaging validation, resource discovery, and parameter property handling. Together they cover the serialization, packaging, and configuration aspects of Flink application development and deployment.

## Capabilities

### POJO Serialization Testing

Utilities for validating that classes are serialized as POJOs by Flink's type system. Flink's POJO serializer is significantly faster than its generic Kryo fallback, so these checks help guard serialization performance.

```java { .api }
@PublicEvolving
public class PojoTestUtils {
    public static <T> void assertSerializedAsPojo(Class<T> clazz);
    public static <T> void assertSerializedAsPojoWithoutKryo(Class<T> clazz);
}
```

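The two assertions differ in strictness: `assertSerializedAsPojo` verifies that the class itself is serialized by Flink's POJO serializer, while `assertSerializedAsPojoWithoutKryo` additionally verifies that none of the (possibly nested) fields fall back to Kryo.
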
#### Usage Example

```java
import org.apache.flink.types.PojoTestUtils;
import org.junit.jupiter.api.Test;

// Test that a class is serialized as a POJO
@Test
void testUserRecordIsPojo() {
    PojoTestUtils.assertSerializedAsPojo(UserRecord.class);
}

// Test POJO serialization without any Kryo fallback
@Test
void testStrictPojoSerialization() {
    PojoTestUtils.assertSerializedAsPojoWithoutKryo(CustomerData.class);
}

// Example POJO class
public static class UserRecord {
    public String name;
    public int age;
    public String email;

    public UserRecord() {} // Required public no-argument constructor

    public UserRecord(String name, int age, String email) {
        this.name = name;
        this.age = age;
        this.email = email;
    }

    // Public fields already satisfy Flink's POJO rules; getters and setters
    // are the alternative for non-public fields.
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
}
```

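For contrast, here is a minimal sketch of a class that would fail the assertion. The name `NotAPojo` is illustrative; the point is that Flink's POJO rules require a public no-argument constructor, which this class lacks:

```java
// Hypothetical counterexample: no public no-argument constructor, so Flink
// cannot treat this class as a POJO and assertSerializedAsPojo(NotAPojo.class)
// is expected to fail.
public static class NotAPojo {
    public String id;

    public NotAPojo(String id) { // the only constructor takes an argument
        this.id = id;
    }
}
```
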
### JAR Packaging Validation

Utilities for verifying JAR file contents and structure, ensuring proper packaging of Flink applications and their dependencies.

```java { .api }
public class PackagingTestUtils {
    public static void assertJarContainsOnlyFilesMatching(
            Path jarPath, Collection<String> allowedPaths) throws IOException;
    public static void assertJarContainsServiceEntry(
            Path jarPath, Class<?> service) throws IOException;
}
```

#### Usage Example

```java
import org.apache.flink.packaging.PackagingTestUtils;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;

@Test
void testJarPackaging() throws IOException {
    Path jarPath = Paths.get("target/my-flink-app.jar");

    // Verify the JAR contains only allowed files
    Collection<String> allowedPaths = Arrays.asList(
            "com/mycompany/flink/.*",
            "META-INF/.*",
            "org/apache/flink/.*");

    PackagingTestUtils.assertJarContainsOnlyFilesMatching(jarPath, allowedPaths);

    // Verify the JAR contains the required service entry
    PackagingTestUtils.assertJarContainsServiceEntry(
            jarPath, org.apache.flink.table.factories.TableFactory.class);
}

@Test
void testConnectorJarStructure() throws IOException {
    Path connectorJar = Paths.get("target/my-connector.jar");

    // Validate the connector JAR structure
    Collection<String> connectorPaths = Arrays.asList(
            "com/mycompany/connector/.*",
            "META-INF/services/.*",
            "META-INF/MANIFEST.MF");

    PackagingTestUtils.assertJarContainsOnlyFilesMatching(connectorJar, connectorPaths);

    // Verify the service provider configuration
    PackagingTestUtils.assertJarContainsServiceEntry(
            connectorJar, org.apache.flink.table.factories.DynamicTableFactory.class);
}
```

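Service entries follow the standard `java.util.ServiceLoader` layout, so the `DynamicTableFactory` check above corresponds to a JAR entry named `META-INF/services/org.apache.flink.table.factories.DynamicTableFactory`.
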
### Resource Discovery

Utilities for locating test resources via regex patterns, enabling flexible resource lookup in test environments.

```java { .api }
public class ResourceTestUtils {
    public static Path getResource(String resourceNameRegex) throws IOException;
}
```

#### Usage Example

```java
import org.apache.avro.Schema;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.*;

@Test
void testResourceDiscovery() throws IOException {
    // Find the configuration file using a regex
    Path configFile = ResourceTestUtils.getResource(".*application\\.properties");
    assertTrue(Files.exists(configFile));

    // Find test data files
    Path testDataFile = ResourceTestUtils.getResource(".*test-data\\.csv");
    assertTrue(Files.exists(testDataFile));

    // Use the discovered resource in the test
    Properties config = new Properties();
    try (InputStream is = Files.newInputStream(configFile)) {
        config.load(is);
    }

    // Verify the configuration
    assertNotNull(config.getProperty("flink.parallelism"));
}

@Test
void testSchemaResourceLoading() throws IOException {
    // Find an Avro schema file
    Path schemaFile = ResourceTestUtils.getResource(".*\\.avsc");

    // Load and validate the schema
    Schema schema = new Schema.Parser().parse(Files.newInputStream(schemaFile));
    assertNotNull(schema);
    assertEquals(Schema.Type.RECORD, schema.getType());
}
```

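Because `getResource` returns a `Path`, it combines naturally with the packaging assertions when a JAR's exact file name (such as a version suffix) is not fixed. A minimal sketch, using a hypothetical `flink-connector-custom` artifact name:

```java
import org.apache.flink.packaging.PackagingTestUtils;
import org.apache.flink.test.resources.ResourceTestUtils;

import java.nio.file.Path;
import java.util.Arrays;

// Locate the built connector JAR without hard-coding its version suffix
// (the artifact name is hypothetical).
Path connectorJar = ResourceTestUtils.getResource(".*flink-connector-custom.*\\.jar");
PackagingTestUtils.assertJarContainsOnlyFilesMatching(
        connectorJar, Arrays.asList("com/mycompany/connector/.*", "META-INF/.*"));
```
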
### Parameter Property Management

System property-based parameter management with type conversion and default value support for flexible test configuration.

```java { .api }
public class ParameterProperty<V> {
    public ParameterProperty(String propertyName, Function<String, V> converter);

    public String getPropertyName();
    public Optional<V> get();
    public V get(V defaultValue);
}
```

#### Usage Example

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.parameters.ParameterProperty;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.function.Function;

// Define parameter properties with type conversion
ParameterProperty<Integer> parallelismProperty = new ParameterProperty<>(
        "test.parallelism", Integer::parseInt);

ParameterProperty<String> jobNameProperty = new ParameterProperty<>(
        "test.job.name", Function.identity());

ParameterProperty<Boolean> enableCheckpointingProperty = new ParameterProperty<>(
        "test.checkpointing.enabled", Boolean::parseBoolean);

@Test
void testParameterConfiguration() {
    // Resolve properties, falling back to defaults
    int parallelism = parallelismProperty.get(4);
    String jobName = jobNameProperty.get("DefaultTestJob");
    boolean checkpointingEnabled = enableCheckpointingProperty.get(false);

    // Configure the test environment
    StreamExecutionEnvironment env = getTestEnvironment();
    env.setParallelism(parallelism);

    if (checkpointingEnabled) {
        env.enableCheckpointing(1000);
    }

    // Create a job with the configured parameters
    env.fromElements(1, 2, 3, 4, 5)
            .map(x -> x * parallelism)
            .print(jobName);
}

@Test
void testOptionalParameters() {
    // Check whether the parameter was provided at all
    Optional<Integer> maxParallelism = parallelismProperty.get();

    if (maxParallelism.isPresent()) {
        // Use the provided value
        configureWithMaxParallelism(maxParallelism.get());
    } else {
        // Fall back to automatic configuration
        configureAutomatically();
    }
}
```

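Since values are resolved from JVM system properties (per the class description above), a test run can override them externally, e.g. with `-Dtest.parallelism=8`. A minimal round-trip sketch under that assumption:

```java
// Simulate an externally supplied parameter; in real runs this would be
// passed on the JVM command line, e.g. -Dtest.parallelism=8.
System.setProperty("test.parallelism", "8");

ParameterProperty<Integer> parallelism =
        new ParameterProperty<>("test.parallelism", Integer::parseInt);

Optional<Integer> value = parallelism.get(); // Optional.of(8)
int resolved = parallelism.get(4);           // 8; the default applies only when unset
```
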
#### Advanced Parameter Usage

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Custom type conversion; Duration.parse expects ISO-8601 durations, e.g. "PT30S"
ParameterProperty<Duration> timeoutProperty = new ParameterProperty<>(
        "test.timeout", durationString -> Duration.parse(durationString));

ParameterProperty<List<String>> topicsProperty = new ParameterProperty<>(
        "test.kafka.topics", topicsString -> Arrays.asList(topicsString.split(",")));

// Configuration class pattern
public class TestConfiguration {
    private static final ParameterProperty<String> KAFKA_BOOTSTRAP_SERVERS =
            new ParameterProperty<>("test.kafka.bootstrap.servers", Function.identity());

    private static final ParameterProperty<Integer> KAFKA_PARTITIONS =
            new ParameterProperty<>("test.kafka.partitions", Integer::parseInt);

    public String getKafkaBootstrapServers() {
        return KAFKA_BOOTSTRAP_SERVERS.get("localhost:9092");
    }

    public int getKafkaPartitions() {
        return KAFKA_PARTITIONS.get(1);
    }
}
```

## Usage Patterns

### Comprehensive Application Validation

A complete validation suite for a Flink application, covering serialization, packaging, resource availability, and parameter configuration.

```java
@Test
void testApplicationValidation() throws IOException {
    // 1. Validate POJO serialization
    PojoTestUtils.assertSerializedAsPojo(OrderRecord.class);
    PojoTestUtils.assertSerializedAsPojo(CustomerRecord.class);
    PojoTestUtils.assertSerializedAsPojo(ProductRecord.class);

    // 2. Validate JAR packaging
    Path applicationJar = Paths.get("target/order-processing-app.jar");
    Collection<String> allowedPaths = Arrays.asList(
            "com/mycompany/orders/.*",
            "META-INF/.*",
            "org/apache/flink/.*");
    PackagingTestUtils.assertJarContainsOnlyFilesMatching(applicationJar, allowedPaths);

    // 3. Validate resource availability
    Path configFile = ResourceTestUtils.getResource(".*application\\.conf");
    assertTrue(Files.exists(configFile));

    // 4. Validate parameter configuration
    TestConfiguration config = new TestConfiguration();
    assertNotNull(config.getKafkaBootstrapServers());
    assertTrue(config.getKafkaPartitions() > 0);
}
```

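Note that packaging checks like this assume the application JAR has already been built, so they are typically run in a build phase that follows packaging (for example, Maven's `integration-test` phase) rather than as plain unit tests.
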
### CI/CD Pipeline Validation

Validation utilities designed for continuous integration and deployment pipelines.

```java
@Test
void testCiCdValidation() throws IOException {
    // Environment-specific parameter validation
    ParameterProperty<String> environmentProperty = new ParameterProperty<>(
            "deployment.environment", Function.identity());

    String environment = environmentProperty.get("test");

    // Validate based on the environment
    switch (environment) {
        case "production":
            validateProductionConfiguration();
            break;
        case "staging":
            validateStagingConfiguration();
            break;
        default:
            validateTestConfiguration();
    }

    // JAR validation for deployment
    Path deploymentJar = ResourceTestUtils.getResource(".*-deployment\\.jar");
    validateDeploymentJarStructure(deploymentJar);
}

private void validateProductionConfiguration() {
    // Production-specific validations
    ParameterProperty<Boolean> debugEnabled = new ParameterProperty<>(
            "flink.debug.enabled", Boolean::parseBoolean);

    assertFalse(debugEnabled.get(false), "Debug should be disabled in production");
}
```

### Type System Validation

Ensuring optimal serialization performance through POJO validation.

```java
@Test
void testSerializationPerformance() {
    // Test that critical data types are POJOs for optimal performance
    PojoTestUtils.assertSerializedAsPojo(Event.class);
    PojoTestUtils.assertSerializedAsPojo(Measurement.class);
    PojoTestUtils.assertSerializedAsPojo(Alert.class);

    // Ensure no Kryo fallback for the performance-critical type
    PojoTestUtils.assertSerializedAsPojoWithoutKryo(HighFrequencyEvent.class);
}

// Performance-critical event class
public static class HighFrequencyEvent {
    public long timestamp;
    public String sensorId;
    public double value;
    public String status;

    // POJO requirements: a public no-argument constructor plus public fields
    // (or getters and setters for non-public fields)
    public HighFrequencyEvent() {}

    // Constructor, getters, and setters...
}
```