CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-test-utils

Test utilities for Apache Flink's Table API and SQL ecosystem enabling robust testing of table operations and data transformations.

Pending
Overview
Eval results
Files

lookup-cache-assertions.mddocs/

Lookup Cache Assertions

Validation utilities for Flink's lookup cache functionality with key-value relationship testing. These assertions enable comprehensive testing of cache behavior, key presence, and data consistency in lookup join scenarios.

LookupCache Assertions

Comprehensive assertions for validating the state and contents of Flink's LookupCache implementations.

public class LookupCacheAssert extends AbstractAssert<LookupCacheAssert, LookupCache> {
    public LookupCacheAssert(LookupCache actual);
    
    // Size validation
    public LookupCacheAssert hasSize(int size);
    
    // Key presence validation
    public LookupCacheAssert containsKey(RowData keyRow);
    public LookupCacheAssert containsKey(Object... keyFields);
    public LookupCacheAssert doesNotContainKey(RowData keyRow);
    public LookupCacheAssert doesNotContainKey(Object... keyFields);
    
    // Content validation
    public LookupCacheAssert containsExactlyEntriesOf(Map<RowData, Collection<RowData>> entries);
    public LookupCacheAssert contains(RowData keyRow, Collection<RowData> valueRows);
    public LookupCacheAssert contains(RowData keyRow, RowData... valueRows);
}

Factory Method

// From LookupCacheAssert class
public static LookupCacheAssert assertThat(LookupCache actual);

Usage Examples

Basic Cache Validation

import static org.apache.flink.table.test.lookup.cache.LookupCacheAssert.assertThat;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.GenericRowData;

LookupCache cache = /* ... initialized cache */;

// Basic size validation
assertThat(cache)
    .hasSize(0);  // Empty cache

// After populating cache
cache.put(keyRow, valueRows);
assertThat(cache)
    .hasSize(1);

Key Presence Testing

import org.apache.flink.table.data.StringData;

RowData keyRow = GenericRowData.of(1, StringData.fromString("user1"));
RowData missingKeyRow = GenericRowData.of(999, StringData.fromString("missing"));

// Key presence validation
assertThat(cache)
    .containsKey(keyRow)
    .doesNotContainKey(missingKeyRow);

// Using individual key fields
assertThat(cache)
    .containsKey(1, "user1")
    .doesNotContainKey(999, "missing");

Content Validation

import java.util.*;

RowData key1 = GenericRowData.of(1, StringData.fromString("dept1"));
RowData key2 = GenericRowData.of(2, StringData.fromString("dept2"));

Collection<RowData> values1 = Arrays.asList(
    GenericRowData.of(101, StringData.fromString("Alice")),
    GenericRowData.of(102, StringData.fromString("Bob"))
);

Collection<RowData> values2 = Arrays.asList(
    GenericRowData.of(201, StringData.fromString("Charlie"))
);

// Single key-value validation
assertThat(cache)
    .contains(key1, values1)
    .contains(key2, values2);

// Using varargs for values
assertThat(cache)
    .contains(key1, 
        GenericRowData.of(101, StringData.fromString("Alice")),
        GenericRowData.of(102, StringData.fromString("Bob"))
    );

Exact Content Matching

Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
expectedEntries.put(key1, values1);
expectedEntries.put(key2, values2);

// Validate exact cache contents
assertThat(cache)
    .containsExactlyEntriesOf(expectedEntries);

Lookup Join Testing Scenarios

// Test lookup join cache behavior
@Test
void testLookupJoinCaching() {
    LookupCache cache = createLookupCache();
    
    // Initial state - empty cache
    assertThat(cache)
        .hasSize(0);
    
    // After first lookup - cache populated
    performLookup(cache, lookupKey1);
    assertThat(cache)
        .hasSize(1)
        .containsKey(lookupKey1);
    
    // After duplicate lookup - cache hit, no size change
    performLookup(cache, lookupKey1);
    assertThat(cache)
        .hasSize(1)
        .containsKey(lookupKey1);
    
    // After different lookup - cache expanded
    performLookup(cache, lookupKey2);
    assertThat(cache)
        .hasSize(2)
        .containsKey(lookupKey1)
        .containsKey(lookupKey2);
}

Cache Eviction Testing

@Test
void testCacheEviction() {
    LookupCache cache = createLimitedSizeCache(maxSize = 2);
    
    // Fill cache to capacity
    performLookup(cache, key1);
    performLookup(cache, key2);
    assertThat(cache)
        .hasSize(2)
        .containsKey(key1)
        .containsKey(key2);
    
    // Trigger eviction with new key
    performLookup(cache, key3);
    assertThat(cache)
        .hasSize(2)  // Size limit maintained
        .containsKey(key3);  // New key present
    
    // Verify eviction occurred (LRU behavior)
    assertThat(cache)
        .doesNotContainKey(key1);  // Oldest key evicted
}

Multi-Value Lookup Testing

@Test
void testMultiValueLookup() {
    // For one-to-many relationships
    RowData departmentKey = GenericRowData.of(StringData.fromString("Engineering"));
    
    Collection<RowData> employees = Arrays.asList(
        GenericRowData.of(1, StringData.fromString("Alice")),
        GenericRowData.of(2, StringData.fromString("Bob")),
        GenericRowData.of(3, StringData.fromString("Charlie"))
    );
    
    cache.put(departmentKey, employees);
    
    assertThat(cache)
        .hasSize(1)
        .contains(departmentKey, employees);
    
    // Verify all employees are cached for the department
    assertThat(cache)
        .contains(departmentKey,
            GenericRowData.of(1, StringData.fromString("Alice")),
            GenericRowData.of(2, StringData.fromString("Bob")),
            GenericRowData.of(3, StringData.fromString("Charlie"))
        );
}

Cache State Transitions

@Test
void testCacheStateTransitions() {
    LookupCache cache = createLookupCache();
    
    // Initial empty state
    assertThat(cache).hasSize(0);
    
    // After population
    populateCache(cache);
    int populatedSize = cache.size();
    assertThat(cache).hasSize(populatedSize);
    
    // After partial clear/invalidation
    invalidateKeys(cache, keysToInvalidate);
    assertThat(cache)
        .hasSize(populatedSize - keysToInvalidate.size())
        .satisfies(updatedCache -> {
            for (RowData invalidatedKey : keysToInvalidate) {
                assertThat(updatedCache).doesNotContainKey(invalidatedKey);
            }
        });
    
    // After complete clear
    cache.clear();
    assertThat(cache).hasSize(0);
}

Complex Key Structures

@Test 
void testComplexKeyStructures() {
    // Multi-field composite keys
    RowData compositeKey = GenericRowData.of(
        1,                                      // id
        StringData.fromString("department"),    // type
        TimestampData.fromEpochMillis(System.currentTimeMillis())  // timestamp
    );
    
    Collection<RowData> values = Arrays.asList(
        GenericRowData.of(StringData.fromString("result1")),
        GenericRowData.of(StringData.fromString("result2"))
    );
    
    cache.put(compositeKey, values);
    
    // Test with exact composite key
    assertThat(cache)
        .containsKey(compositeKey)
        .contains(compositeKey, values);
    
    // Test with field-by-field key construction
    assertThat(cache)
        .containsKey(1, "department", compositeKey.getTimestamp(2, 3));
}

Performance and Consistency Testing

@Test
void testCacheConsistency() {
    LookupCache cache = createConcurrentCache();
    
    // Populate with known data
    Map<RowData, Collection<RowData>> testData = generateTestData();
    testData.forEach(cache::put);
    
    // Validate complete consistency
    assertThat(cache)
        .hasSize(testData.size())
        .containsExactlyEntriesOf(testData);
    
    // Test individual entries
    testData.forEach((key, values) -> {
        assertThat(cache)
            .containsKey(key)
            .contains(key, values);
    });
}

Integration with Table Testing

Lookup cache assertions integrate seamlessly with other table testing utilities:

@Test
void testLookupJoinWithTableAssertions() {
    // Setup lookup cache
    LookupCache cache = setupLookupCache();
    
    // Execute lookup join
    Table result = executeJoinWithLookup(sourceTable, cache);
    
    // Validate cache state
    assertThat(cache)
        .hasSize(expectedCacheEntries)
        .containsKey(expectedLookupKey);
    
    // Validate join results
    List<Row> resultRows = collectResults(result);
    assertThatRows(resultRows)
        .hasSize(expectedResultCount)
        .allSatisfy(row -> 
            assertThat(row).hasArity(expectedJoinedArity)
        );
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-test-utils

docs

collection-assertions.md

data-assertions.md

index.md

lookup-cache-assertions.md

type-assertions.md

tile.json