Comprehensive testing utilities for Apache Flink stream and batch processing applications
—
Test connectors for specific use cases including upsert operations testing with configurable serialization schemas and output file management. These connectors provide specialized functionality for testing complex data processing scenarios.
Sink specifically designed for testing upsert (insert/update) operations with configurable key-value serialization and file-based output verification.
@PublicEvolving
public class UpsertTestSink<IN> implements Sink<IN> {
public static <IN> UpsertTestSinkBuilder<IN> builder();
}

Builder pattern implementation for configuring UpsertTestSink with customizable serialization schemas and output options.
public class UpsertTestSinkBuilder<IN> {
public UpsertTestSinkBuilder<IN> setOutputFile(File outputFile);
public UpsertTestSinkBuilder<IN> setKeySerializationSchema(SerializationSchema<IN> keySerializationSchema);
public UpsertTestSinkBuilder<IN> setValueSerializationSchema(SerializationSchema<IN> valueSerializationSchema);
public UpsertTestSink<IN> build();
}

import org.apache.flink.connector.upserttest.sink.UpsertTestSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.io.File;
// Create upsert test sink with file output
File outputFile = new File("/tmp/upsert-test-output.txt");
UpsertTestSink<String> sink = UpsertTestSink.<String>builder()
.setOutputFile(outputFile)
.setKeySerializationSchema(new SimpleStringSchema())
.setValueSerializationSchema(new SimpleStringSchema())
.build();
// Use in streaming job
env.fromElements("key1:value1", "key2:value2", "key1:updated_value1")
.map(new KeyValueParser())
.sinkTo(sink);

import org.apache.flink.api.common.serialization.SerializationSchema;
// Custom serialization schema for keys
SerializationSchema<MyRecord> keySerializer = new SerializationSchema<MyRecord>() {
@Override
public byte[] serialize(MyRecord record) {
return record.getKey().getBytes();
}
};
// Custom serialization schema for values
SerializationSchema<MyRecord> valueSerializer = new SerializationSchema<MyRecord>() {
@Override
public byte[] serialize(MyRecord record) {
return record.toJson().getBytes();
}
};
// Build sink with custom serializers
UpsertTestSink<MyRecord> customSink = UpsertTestSink.<MyRecord>builder()
.setOutputFile(new File("/tmp/custom-upsert-output.json"))
.setKeySerializationSchema(keySerializer)
.setValueSerializationSchema(valueSerializer)
.build();

Internal writer implementation for the UpsertTestSink that handles the actual upsert logic and file operations.
public class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {
// Internal implementation for upsert operations
}

Utility functions for managing upsert test output files and performing file-based verifications.
public class UpsertTestFileUtil {
// File management utilities for upsert testing
}

Utility class for handling immutable byte array operations in upsert testing scenarios.
public class ImmutableByteArrayWrapper {
// Immutable byte array handling
}

Table API integration for upsert testing with dynamic table sink functionality.
public class UpsertTestDynamicTableSink implements DynamicTableSink {
// Dynamic table sink for upsert operations
}

Factory for creating upsert test dynamic table sinks with proper configuration handling.
public class UpsertTestDynamicTableSinkFactory implements DynamicTableSinkFactory {
// Factory for creating upsert test table sinks
}

Configuration options for the upsert test connector when used with the Table API.
public class UpsertTestConnectorOptions {
// Configuration options for upsert test connector
}

-- Create upsert test table in SQL
CREATE TABLE upsert_test_sink (
id STRING,
name STRING,
value BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-test',
'output-file' = '/tmp/table-upsert-output.txt'
);
-- Insert data with upserts
INSERT INTO upsert_test_sink VALUES
('1', 'Alice', 100),
('2', 'Bob', 200),
('1', 'Alice Updated', 150);

Testing simple key-value upsert operations with string data.
@Test
void testBasicUpsertOperations() throws Exception {
File outputFile = tempDir.resolve("upsert-output.txt").toFile();
UpsertTestSink<Tuple2<String, String>> sink =
UpsertTestSink.<Tuple2<String, String>>builder()
.setOutputFile(outputFile)
.setKeySerializationSchema(new SimpleStringSchema())
.setValueSerializationSchema(new SimpleStringSchema())
.build();
// Create test data with upserts
env.fromElements(
Tuple2.of("key1", "value1"),
Tuple2.of("key2", "value2"),
Tuple2.of("key1", "updated_value1"), // Upsert
Tuple2.of("key3", "value3")
).sinkTo(sink);
env.execute("Upsert Test");
// Verify output file contains final state
List<String> lines = Files.readAllLines(outputFile.toPath());
assertTrue(lines.contains("key1:updated_value1"));
assertTrue(lines.contains("key2:value2"));
assertTrue(lines.contains("key3:value3"));
assertEquals(3, lines.size()); // Only final states
}

Testing upsert operations with complex objects and custom serialization.
@Test
void testComplexObjectUpserts() throws Exception {
File outputFile = tempDir.resolve("complex-upsert-output.json").toFile();
// Custom serialization for complex objects
SerializationSchema<UserRecord> keySchema = record -> record.getUserId().getBytes();
SerializationSchema<UserRecord> valueSchema = record -> {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsBytes(record);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
};
UpsertTestSink<UserRecord> sink = UpsertTestSink.<UserRecord>builder()
.setOutputFile(outputFile)
.setKeySerializationSchema(keySchema)
.setValueSerializationSchema(valueSchema)
.build();
// Test data with user record updates
env.fromElements(
new UserRecord("user1", "Alice", "alice@example.com"),
new UserRecord("user2", "Bob", "bob@example.com"),
new UserRecord("user1", "Alice Smith", "alice.smith@example.com") // Update
).sinkTo(sink);
env.execute("Complex Upsert Test");
// Verify JSON output
List<String> jsonLines = Files.readAllLines(outputFile.toPath());
assertEquals(2, jsonLines.size());
// Parse and verify updated record
ObjectMapper mapper = new ObjectMapper();
UserRecord updatedUser = mapper.readValue(
jsonLines.stream()
.filter(line -> line.contains("Alice Smith"))
.findFirst()
.orElseThrow(),
UserRecord.class
);
assertEquals("Alice Smith", updatedUser.getName());
assertEquals("alice.smith@example.com", updatedUser.getEmail());
}

Using the upsert test connector with Flink's Table API for SQL-based testing.
@Test
void testTableApiUpsertIntegration() throws Exception {
StreamExecutionEnvironment env = getTestEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
File outputFile = tempDir.resolve("table-upsert-output.txt").toFile();
// Create upsert test table
String createTableDDL = String.format(
"CREATE TABLE upsert_sink (" +
" id STRING," +
" name STRING," +
" score INT," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'upsert-test'," +
" 'output-file' = '%s'" +
")",
outputFile.getAbsolutePath()
);
tableEnv.executeSql(createTableDDL);
// Insert data with upserts
tableEnv.executeSql(
"INSERT INTO upsert_sink VALUES " +
"('1', 'Alice', 95), " +
"('2', 'Bob', 87), " +
"('1', 'Alice', 98)" // Upsert - score update
);
// Verify results
assertTrue(outputFile.exists());
List<String> lines = Files.readAllLines(outputFile.toPath());
// Should contain final state only
assertEquals(2, lines.size());
assertTrue(lines.stream().anyMatch(line -> line.contains("Alice") && line.contains("98")));
assertTrue(lines.stream().anyMatch(line -> line.contains("Bob") && line.contains("87")));
}

Common patterns for verifying upsert test results.
@Test
void testUpsertResultVerification() throws Exception {
File outputFile = tempDir.resolve("verification-output.txt").toFile();
// Run upsert test job
runUpsertTestJob(outputFile);
// Verification approaches
// 1. Line count verification
List<String> lines = Files.readAllLines(outputFile.toPath());
assertEquals(expectedFinalRecordCount, lines.size());
// 2. Content verification
assertTrue(lines.contains("expected_final_record"));
assertFalse(lines.contains("overwritten_record"));
// 3. Structured verification
Map<String, String> finalState = lines.stream()
.map(line -> line.split(":"))
.collect(Collectors.toMap(parts -> parts[0], parts -> parts[1]));
assertEquals("expected_value", finalState.get("test_key"));
// 4. Using TestBaseUtils for comparison
String expectedOutput = "key1:final_value1\nkey2:final_value2";
TestBaseUtils.compareResultsByLinesInMemory(
expectedOutput, outputFile.getAbsolutePath());
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-test-utils