Test utilities for Apache Flink's Table API and SQL ecosystem enabling robust testing of table operations and data transformations.
—
Validation utilities for Flink's lookup cache functionality with key-value relationship testing. These assertions enable comprehensive testing of cache behavior, key presence, and data consistency in lookup join scenarios.
Comprehensive assertions for validating the state and contents of Flink's LookupCache implementations.
public class LookupCacheAssert extends AbstractAssert<LookupCacheAssert, LookupCache> {
public LookupCacheAssert(LookupCache actual);
// Size validation
public LookupCacheAssert hasSize(int size);
// Key presence validation
public LookupCacheAssert containsKey(RowData keyRow);
public LookupCacheAssert containsKey(Object... keyFields);
public LookupCacheAssert doesNotContainKey(RowData keyRow);
public LookupCacheAssert doesNotContainKey(Object... keyFields);
// Content validation
public LookupCacheAssert containsExactlyEntriesOf(Map<RowData, Collection<RowData>> entries);
public LookupCacheAssert contains(RowData keyRow, Collection<RowData> valueRows);
public LookupCacheAssert contains(RowData keyRow, RowData... valueRows);
}// From LookupCacheAssert class
public static LookupCacheAssert assertThat(LookupCache actual);import static org.apache.flink.table.test.lookup.cache.LookupCacheAssert.assertThat;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.GenericRowData;
LookupCache cache = /* ... initialized cache */;
// Basic size validation
assertThat(cache)
.hasSize(0); // Empty cache
// After populating cache
cache.put(keyRow, valueRows);
assertThat(cache)
.hasSize(1);import org.apache.flink.table.data.StringData;
RowData keyRow = GenericRowData.of(1, StringData.fromString("user1"));
RowData missingKeyRow = GenericRowData.of(999, StringData.fromString("missing"));
// Key presence validation
assertThat(cache)
.containsKey(keyRow)
.doesNotContainKey(missingKeyRow);
// Using individual key fields
assertThat(cache)
.containsKey(1, "user1")
.doesNotContainKey(999, "missing");import java.util.*;
RowData key1 = GenericRowData.of(1, StringData.fromString("dept1"));
RowData key2 = GenericRowData.of(2, StringData.fromString("dept2"));
Collection<RowData> values1 = Arrays.asList(
GenericRowData.of(101, StringData.fromString("Alice")),
GenericRowData.of(102, StringData.fromString("Bob"))
);
Collection<RowData> values2 = Arrays.asList(
GenericRowData.of(201, StringData.fromString("Charlie"))
);
// Single key-value validation
assertThat(cache)
.contains(key1, values1)
.contains(key2, values2);
// Using varargs for values
assertThat(cache)
.contains(key1,
GenericRowData.of(101, StringData.fromString("Alice")),
GenericRowData.of(102, StringData.fromString("Bob"))
);Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
expectedEntries.put(key1, values1);
expectedEntries.put(key2, values2);
// Validate exact cache contents
assertThat(cache)
.containsExactlyEntriesOf(expectedEntries);// Test lookup join cache behavior
@Test
void testLookupJoinCaching() {
LookupCache cache = createLookupCache();
// Initial state - empty cache
assertThat(cache)
.hasSize(0);
// After first lookup - cache populated
performLookup(cache, lookupKey1);
assertThat(cache)
.hasSize(1)
.containsKey(lookupKey1);
// After duplicate lookup - cache hit, no size change
performLookup(cache, lookupKey1);
assertThat(cache)
.hasSize(1)
.containsKey(lookupKey1);
// After different lookup - cache expanded
performLookup(cache, lookupKey2);
assertThat(cache)
.hasSize(2)
.containsKey(lookupKey1)
.containsKey(lookupKey2);
}@Test
void testCacheEviction() {
LookupCache cache = createLimitedSizeCache(maxSize = 2);
// Fill cache to capacity
performLookup(cache, key1);
performLookup(cache, key2);
assertThat(cache)
.hasSize(2)
.containsKey(key1)
.containsKey(key2);
// Trigger eviction with new key
performLookup(cache, key3);
assertThat(cache)
.hasSize(2) // Size limit maintained
.containsKey(key3); // New key present
// Verify eviction occurred (LRU behavior)
assertThat(cache)
.doesNotContainKey(key1); // Oldest key evicted
}@Test
void testMultiValueLookup() {
// For one-to-many relationships
RowData departmentKey = GenericRowData.of(StringData.fromString("Engineering"));
Collection<RowData> employees = Arrays.asList(
GenericRowData.of(1, StringData.fromString("Alice")),
GenericRowData.of(2, StringData.fromString("Bob")),
GenericRowData.of(3, StringData.fromString("Charlie"))
);
cache.put(departmentKey, employees);
assertThat(cache)
.hasSize(1)
.contains(departmentKey, employees);
// Verify all employees are cached for the department
assertThat(cache)
.contains(departmentKey,
GenericRowData.of(1, StringData.fromString("Alice")),
GenericRowData.of(2, StringData.fromString("Bob")),
GenericRowData.of(3, StringData.fromString("Charlie"))
);
}@Test
void testCacheStateTransitions() {
LookupCache cache = createLookupCache();
// Initial empty state
assertThat(cache).hasSize(0);
// After population
populateCache(cache);
int populatedSize = cache.size();
assertThat(cache).hasSize(populatedSize);
// After partial clear/invalidation
invalidateKeys(cache, keysToInvalidate);
assertThat(cache)
.hasSize(populatedSize - keysToInvalidate.size())
.satisfies(updatedCache -> {
for (RowData invalidatedKey : keysToInvalidate) {
assertThat(updatedCache).doesNotContainKey(invalidatedKey);
}
});
// After complete clear
cache.clear();
assertThat(cache).hasSize(0);
}@Test
void testComplexKeyStructures() {
// Multi-field composite keys
RowData compositeKey = GenericRowData.of(
1, // id
StringData.fromString("department"), // type
TimestampData.fromEpochMillis(System.currentTimeMillis()) // timestamp
);
Collection<RowData> values = Arrays.asList(
GenericRowData.of(StringData.fromString("result1")),
GenericRowData.of(StringData.fromString("result2"))
);
cache.put(compositeKey, values);
// Test with exact composite key
assertThat(cache)
.containsKey(compositeKey)
.contains(compositeKey, values);
// Test with field-by-field key construction
assertThat(cache)
.containsKey(1, "department", compositeKey.getTimestamp(2, 3));
}@Test
void testCacheConsistency() {
LookupCache cache = createConcurrentCache();
// Populate with known data
Map<RowData, Collection<RowData>> testData = generateTestData();
testData.forEach(cache::put);
// Validate complete consistency
assertThat(cache)
.hasSize(testData.size())
.containsExactlyEntriesOf(testData);
// Test individual entries
testData.forEach((key, values) -> {
assertThat(cache)
.containsKey(key)
.contains(key, values);
});
}Lookup cache assertions integrate seamlessly with other table testing utilities:
@Test
void testLookupJoinWithTableAssertions() {
// Setup lookup cache
LookupCache cache = setupLookupCache();
// Execute lookup join
Table result = executeJoinWithLookup(sourceTable, cache);
// Validate cache state
assertThat(cache)
.hasSize(expectedCacheEntries)
.containsKey(expectedLookupKey);
// Validate join results
List<Row> resultRows = collectResults(result);
assertThatRows(resultRows)
.hasSize(expectedResultCount)
.allSatisfy(row ->
assertThat(row).hasArity(expectedJoinedArity)
);
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-test-utils