tessl/maven-org-apache-flink--flink-test-utils

Comprehensive testing utilities for Apache Flink stream and batch processing applications


Specialized Test Connectors

Test connectors for specific use cases, chiefly testing upsert operations. The upsert test connector takes configurable key and value serialization schemas and writes its results to an output file, so a test can check the final per-key state a job produces rather than every intermediate record.

Capabilities

Upsert Test Sink

A sink for testing upsert (insert-or-update) operations. Each record is serialized into a key and a value; a later record with the same key replaces the earlier value, and the resulting state is written to a file that the test can verify.

@PublicEvolving
public class UpsertTestSink<IN> implements Sink<IN> {
    public static <IN> UpsertTestSinkBuilder<IN> builder();
}
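Conceptually, an upsert sink keeps only the latest value for each key. The sketch below models that behavior with a plain LinkedHashMap so the expected final state of a test can be reasoned about without Flink. UpsertModel and finalState are illustrative names, not part of flink-test-utils, and the code is not the sink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of upsert semantics; not the UpsertTestSink implementation. */
class UpsertModel {

    /** Replays key/value pairs in order; a later pair with the same key replaces the earlier value. */
    static Map<String, String> finalState(List<Map.Entry<String, String>> changes) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Map.Entry<String, String> change : changes) {
            state.put(change.getKey(), change.getValue()); // insert or update
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, String> state = finalState(List.of(
                Map.entry("key1", "value1"),
                Map.entry("key2", "value2"),
                Map.entry("key1", "updated_value1")));
        System.out.println(state); // {key1=updated_value1, key2=value2}
    }
}
```

LinkedHashMap keeps the position of the first insert for each key, so an update changes the value without reordering, which keeps printed states easy to compare.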

Upsert Test Sink Builder

Fluent builder that configures the output file and the key and value serialization schemas of an UpsertTestSink.

public class UpsertTestSinkBuilder<IN> {
    public UpsertTestSinkBuilder<IN> setOutputFile(File outputFile);
    public UpsertTestSinkBuilder<IN> setKeySerializationSchema(SerializationSchema<IN> keySerializationSchema);
    public UpsertTestSinkBuilder<IN> setValueSerializationSchema(SerializationSchema<IN> valueSerializationSchema);
    public UpsertTestSink<IN> build();
}

Basic Usage

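import org.apache.flink.connector.upserttest.sink.UpsertTestSink;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.File;
import java.nio.charset.StandardCharsets;

// Records are "key:value" strings. The key schema extracts the part before the first ':',
// so later records with the same key overwrite earlier ones; the value schema keeps the rest
SerializationSchema<String> keySchema =
    element -> element.split(":", 2)[0].getBytes(StandardCharsets.UTF_8);
SerializationSchema<String> valueSchema =
    element -> element.split(":", 2)[1].getBytes(StandardCharsets.UTF_8);

// Create upsert test sink with file output
File outputFile = new File("/tmp/upsert-test-output.txt");
UpsertTestSink<String> sink = UpsertTestSink.<String>builder()
    .setOutputFile(outputFile)
    .setKeySerializationSchema(keySchema)
    .setValueSerializationSchema(valueSchema)
    .build();

// Use in streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("key1:value1", "key2:value2", "key1:updated_value1")
   .sinkTo(sink);
env.execute("Upsert Test Sink Example");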

Advanced Usage with Custom Serialization

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.upserttest.sink.UpsertTestSink;
import java.io.File;
import java.nio.charset.StandardCharsets;

// MyRecord stands for your own type exposing getKey() and toJson()

// Custom serialization schema for keys (explicit UTF-8 instead of the platform default charset)
SerializationSchema<MyRecord> keySerializer = new SerializationSchema<MyRecord>() {
    @Override
    public byte[] serialize(MyRecord record) {
        return record.getKey().getBytes(StandardCharsets.UTF_8);
    }
};

// Custom serialization schema for values
SerializationSchema<MyRecord> valueSerializer = new SerializationSchema<MyRecord>() {
    @Override
    public byte[] serialize(MyRecord record) {
        return record.toJson().getBytes(StandardCharsets.UTF_8);
    }
};

// Build sink with custom serializers
UpsertTestSink<MyRecord> customSink = UpsertTestSink.<MyRecord>builder()
    .setOutputFile(new File("/tmp/custom-upsert-output.json"))
    .setKeySerializationSchema(keySerializer)
    .setValueSerializationSchema(valueSerializer)
    .build();

Upsert Test Sink Writer

Internal writer created by UpsertTestSink. It applies the key and value serialization schemas to each incoming record, keeps the latest value for each serialized key, and writes the records to the output file.

public class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {
    // Internal implementation for upsert operations
}

Upsert File Utilities

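Helpers for reading the sink's output back in tests. The file holds serialized key and value bytes rather than text lines, so tests read it through this class, for example with readRecords(...), which deserializes keys and values into a Map, or getNumberOfRecords(...), instead of with Files.readAllLines.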

public class UpsertTestFileUtil {
    // File management utilities for upsert testing
}
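To see why a line-based reader is the wrong tool for a file of serialized key/value bytes, the sketch below round-trips records through a simple length-prefixed layout using only java.io. The layout and the RecordFileSketch name are assumptions for illustration; they do not reproduce UpsertTestFileUtil's actual on-disk format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical length-prefixed key/value layout; NOT UpsertTestFileUtil's real format. */
class RecordFileSketch {

    /** Writes each entry as [int length][UTF-8 key][int length][UTF-8 value]. */
    static byte[] write(Map<String, String> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Map.Entry<String, String> e : records.entrySet()) {
                writeChunk(out, e.getKey());
                writeChunk(out, e.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    /** Reads entries back; a later entry for a key replaces an earlier one, as in an upsert. */
    static Map<String, String> read(byte[] data) {
        Map<String, String> records = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                String key = readChunk(in);
                records.put(key, readChunk(in));
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return records;
    }

    private static void writeChunk(DataOutputStream out, String s) throws IOException {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(b.length); // length prefix instead of a delimiter, so any byte may appear
        out.write(b);
    }

    private static String readChunk(DataInputStream in) throws IOException {
        byte[] b = new byte[in.readInt()];
        in.readFully(b);
        return new String(b, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> records = new LinkedHashMap<>();
        records.put("key1", "updated_value1");
        records.put("key2", "line one\nline two"); // a line-based reader would split this value
        System.out.println(read(write(records)).equals(records)); // true
    }
}
```

Length prefixes let keys and values contain any bytes, including ':' and newlines, which is exactly what breaks "key:value" line parsing.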

Immutable Byte Array Wrapper

Wraps a byte array as an immutable value with content-based equals and hashCode, so serialized keys and values can be used in hash-based collections (a raw byte[] compares by identity).

public class ImmutableByteArrayWrapper {
    // Immutable byte array handling
}
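Java arrays inherit identity-based equals and hashCode, so two byte[] keys with the same content are different HashMap keys and would never overwrite each other. A wrapper with content-based equality fixes that, which is presumably why the upsert test code wraps serialized bytes. ByteArrayKey below is a stdlib-only sketch of the idea, not Flink's ImmutableByteArrayWrapper.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Sketch of an immutable byte[] wrapper with content-based equality (not Flink's class). */
final class ByteArrayKey {
    private final byte[] bytes;

    ByteArrayKey(byte[] bytes) {
        this.bytes = bytes.clone(); // defensive copy: later changes to the caller's array have no effect
    }

    byte[] array() {
        return bytes.clone(); // never expose the internal array
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ByteArrayKey && Arrays.equals(bytes, ((ByteArrayKey) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }

    public static void main(String[] args) {
        Map<byte[], String> raw = new HashMap<>();
        raw.put("key1".getBytes(StandardCharsets.UTF_8), "v1");
        raw.put("key1".getBytes(StandardCharsets.UTF_8), "v2");
        System.out.println(raw.size()); // 2: same content, different array identity

        Map<ByteArrayKey, String> wrapped = new HashMap<>();
        wrapped.put(new ByteArrayKey("key1".getBytes(StandardCharsets.UTF_8)), "v1");
        wrapped.put(new ByteArrayKey("key1".getBytes(StandardCharsets.UTF_8)), "v2");
        System.out.println(wrapped.size()); // 1: the second put overwrites the first
    }
}
```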

Table API Integration

Dynamic Table Sink for Upsert Testing

DynamicTableSink that makes the upsert test sink available to the Table API and SQL.

public class UpsertTestDynamicTableSink implements DynamicTableSink {
    // Dynamic table sink for upsert operations
}

Dynamic Table Sink Factory

Factory that creates UpsertTestDynamicTableSink instances from the connector options declared in a table's WITH clause.

public class UpsertTestDynamicTableSinkFactory implements DynamicTableSinkFactory {
    // Factory for creating upsert test table sinks
}

Connector Options

Configuration options for the upsert test connector when used with Table API.

public class UpsertTestConnectorOptions {
    // Configuration options for upsert test connector
}

Table API Usage Example

-- Create upsert test table in SQL
CREATE TABLE upsert_test_sink (
    id STRING,
    name STRING,
    value BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-test',
    'output-file' = '/tmp/table-upsert-output.txt'
);

-- Insert data with upserts
INSERT INTO upsert_test_sink VALUES 
    ('1', 'Alice', 100),
    ('2', 'Bob', 200),
    ('1', 'Alice Updated', 150);

Usage Patterns

Basic Upsert Testing

Testing simple key-value upsert operations with string data.

@Test
void testBasicUpsertOperations() throws Exception {
    // tempDir (JUnit @TempDir Path) and env (StreamExecutionEnvironment) are test fixtures
    File outputFile = tempDir.resolve("upsert-output.txt").toFile();
    
    // Key = f0, value = f1. SimpleStringSchema only accepts String, so the
    // Tuple2 fields are serialized explicitly
    UpsertTestSink<Tuple2<String, String>> sink = 
        UpsertTestSink.<Tuple2<String, String>>builder()
            .setOutputFile(outputFile)
            .setKeySerializationSchema(t -> t.f0.getBytes(StandardCharsets.UTF_8))
            .setValueSerializationSchema(t -> t.f1.getBytes(StandardCharsets.UTF_8))
            .build();
    
    // Create test data with upserts
    env.fromElements(
        Tuple2.of("key1", "value1"),
        Tuple2.of("key2", "value2"),
        Tuple2.of("key1", "updated_value1"),  // Upsert
        Tuple2.of("key3", "value3")
    ).sinkTo(sink);
    
    env.execute("Upsert Test");
    
    // The file stores serialized key/value bytes; decode it back into a map
    Map<String, String> finalState = UpsertTestFileUtil.readRecords(
        outputFile, new SimpleStringSchema(), new SimpleStringSchema());
    assertEquals("updated_value1", finalState.get("key1"));
    assertEquals("value2", finalState.get("key2"));
    assertEquals("value3", finalState.get("key3"));
    assertEquals(3, finalState.size()); // Only final states
}

Complex Object Upsert Testing

Testing upsert operations with complex objects and custom serialization.

@Test
void testComplexObjectUpserts() throws Exception {
    File outputFile = tempDir.resolve("complex-upsert-output.json").toFile();
    
    // Custom serialization for complex objects
    SerializationSchema<UserRecord> keySchema = record -> record.getUserId().getBytes(StandardCharsets.UTF_8);
    SerializationSchema<UserRecord> valueSchema = record -> {
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.writeValueAsBytes(record);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    };
    
    UpsertTestSink<UserRecord> sink = UpsertTestSink.<UserRecord>builder()
        .setOutputFile(outputFile)
        .setKeySerializationSchema(keySchema)
        .setValueSerializationSchema(valueSchema)
        .build();
    
    // Test data with user record updates
    env.fromElements(
        new UserRecord("user1", "Alice", "alice@example.com"),
        new UserRecord("user2", "Bob", "bob@example.com"),
        new UserRecord("user1", "Alice Smith", "alice.smith@example.com") // Update
    ).sinkTo(sink);
    
    env.execute("Complex Upsert Test");
    
    // Decode the file: keys and values back to Strings (each value is a JSON document)
    Map<String, String> records = UpsertTestFileUtil.readRecords(
        outputFile, new SimpleStringSchema(), new SimpleStringSchema());
    assertEquals(2, records.size());
    
    // Parse and verify updated record
    ObjectMapper mapper = new ObjectMapper();
    UserRecord updatedUser = mapper.readValue(records.get("user1"), UserRecord.class);
    assertEquals("Alice Smith", updatedUser.getName());
    assertEquals("alice.smith@example.com", updatedUser.getEmail());
}

Table API Upsert Integration

Using the upsert test connector with Flink's Table API for SQL-based testing.

@Test
void testTableApiUpsertIntegration() throws Exception {
    StreamExecutionEnvironment env = getTestEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    File outputFile = tempDir.resolve("table-upsert-output.txt").toFile();
    
    // Create upsert test table
    String createTableDDL = String.format(
        "CREATE TABLE upsert_sink (" +
        "  id STRING," +
        "  name STRING," +
        "  score INT," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'upsert-test'," +
        "  'output-file' = '%s'" +
        ")",
        outputFile.getAbsolutePath()
    );
    
    tableEnv.executeSql(createTableDDL);
    
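    // Insert data with upserts; executeSql() submits the job asynchronously,
    // so await() blocks until the bounded INSERT job has finished
    tableEnv.executeSql(
        "INSERT INTO upsert_sink VALUES " +
        "('1', 'Alice', 95), " +
        "('2', 'Bob', 87), " +
        "('1', 'Alice', 98)"  // Upsert - score update
    ).await();
    
    // Verify results
    assertTrue(outputFile.exists());
    
    // Should contain final state only: one record per primary key. The file holds
    // serialized key/value bytes, so count records instead of reading text lines
    assertEquals(2, UpsertTestFileUtil.getNumberOfRecords(outputFile));
    
    // Raw content checks assume a text-based row format (for example JSON)
    String content = new String(Files.readAllBytes(outputFile.toPath()), StandardCharsets.UTF_8);
    assertTrue(content.contains("Alice") && content.contains("98"));
    assertTrue(content.contains("Bob") && content.contains("87"));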
}
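The DDL above splices the file path into a SQL string literal with String.format. If a path ever contains a single quote (for example a temp directory under a user name like O'Brien), the literal breaks. Doubling embedded quotes is the standard SQL escape for string literals. The sketch below wraps that in helpers; DdlSketch, sqlLiteral and upsertSinkDdl are hypothetical names, and the option keys are copied from the example above.

```java
/** Hypothetical helpers for building the upsert test DDL with a safely quoted path. */
class DdlSketch {

    /** Wraps s in single quotes, doubling any embedded single quote (SQL literal escaping). */
    static String sqlLiteral(String s) {
        return "'" + s.replace("'", "''") + "'";
    }

    /** Builds the example DDL; the table name is assumed to be a trusted identifier. */
    static String upsertSinkDdl(String table, String outputPath) {
        return "CREATE TABLE " + table + " ("
                + "  id STRING,"
                + "  name STRING,"
                + "  score INT,"
                + "  PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'upsert-test',"
                + "  'output-file' = " + sqlLiteral(outputPath)
                + ")";
    }

    public static void main(String[] args) {
        System.out.println(sqlLiteral("/tmp/O'Brien/out.txt")); // '/tmp/O''Brien/out.txt'
    }
}
```

In the test, tableEnv.executeSql(DdlSketch.upsertSinkDdl("upsert_sink", outputFile.getAbsolutePath())) would replace the String.format call.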

Verification Patterns

Common patterns for verifying upsert test results.

@Test
void testUpsertResultVerification() throws Exception {
    File outputFile = tempDir.resolve("verification-output.txt").toFile();
    
    // Run upsert test job (runUpsertTestJob is a placeholder for a job that
    // writes String keys and values to outputFile)
    runUpsertTestJob(outputFile);
    
    // Verification approaches
    
    // 1. Record count verification
    assertEquals(expectedFinalRecordCount, UpsertTestFileUtil.getNumberOfRecords(outputFile));
    
    // Decode the final state (the file is binary, not text lines)
    Map<String, String> finalState = UpsertTestFileUtil.readRecords(
        outputFile, new SimpleStringSchema(), new SimpleStringSchema());
    
    // 2. Content verification
    assertTrue(finalState.containsValue("expected_final_value"));
    assertFalse(finalState.containsValue("overwritten_value"));
    
    // 3. Structured verification
    assertEquals("expected_value", finalState.get("test_key"));
    
    // 4. Using TestBaseUtils for comparison
    List<String> actualLines = finalState.entrySet().stream()
        .map(e -> e.getKey() + ":" + e.getValue())
        .collect(Collectors.toList());
    TestBaseUtils.compareResultAsText(actualLines, "key1:final_value1\nkey2:final_value2");
}
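Upsert output is a final state per key, so its order carries no meaning and comparisons should ignore it. If you prefer not to depend on TestBaseUtils, a stdlib-only approach is to render each entry as key:value, sort both sides, and compare. FinalStateComparison and sortedLines are hypothetical names used only for this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper for order-independent comparison of upsert final states. */
class FinalStateComparison {

    /** Renders each entry as "key:value" and sorts, so map iteration order does not matter. */
    static List<String> sortedLines(Map<String, String> state) {
        return state.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .sorted()
                .collect(Collectors.toList());
    }

    /** Splits an expected block on newlines and sorts it the same way. */
    static List<String> sortedLines(String expected) {
        return Arrays.stream(expected.split("\n"))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> actual = Map.of("key2", "final_value2", "key1", "final_value1");
        String expected = "key1:final_value1\nkey2:final_value2";
        System.out.println(sortedLines(actual).equals(sortedLines(expected))); // true
    }
}
```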

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-test-utils
