0
# Lookup Cache Assertions
1
2
Validation utilities for Flink's lookup cache functionality with key-value relationship testing. These assertions enable comprehensive testing of cache behavior, key presence, and data consistency in lookup join scenarios.
3
4
## LookupCache Assertions
5
6
Comprehensive assertions for validating the state and contents of Flink's LookupCache implementations.
7
8
```java { .api }
9
public class LookupCacheAssert extends AbstractAssert<LookupCacheAssert, LookupCache> {
10
public LookupCacheAssert(LookupCache actual);
11
12
// Size validation
13
public LookupCacheAssert hasSize(int size);
14
15
// Key presence validation
16
public LookupCacheAssert containsKey(RowData keyRow);
17
public LookupCacheAssert containsKey(Object... keyFields);
18
public LookupCacheAssert doesNotContainKey(RowData keyRow);
19
public LookupCacheAssert doesNotContainKey(Object... keyFields);
20
21
// Content validation
22
public LookupCacheAssert containsExactlyEntriesOf(Map<RowData, Collection<RowData>> entries);
23
public LookupCacheAssert contains(RowData keyRow, Collection<RowData> valueRows);
24
public LookupCacheAssert contains(RowData keyRow, RowData... valueRows);
25
}
26
```
27
28
### Factory Method
29
30
```java { .api }
31
// From LookupCacheAssert class
32
public static LookupCacheAssert assertThat(LookupCache actual);
33
```
34
35
### Usage Examples
36
37
#### Basic Cache Validation
38
39
```java
40
import static org.apache.flink.table.test.lookup.cache.LookupCacheAssert.assertThat;
41
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
42
import org.apache.flink.table.data.RowData;
43
import org.apache.flink.table.data.GenericRowData;
44
45
LookupCache cache = /* ... initialized cache */;
46
47
// Basic size validation
48
assertThat(cache)
49
.hasSize(0); // Empty cache
50
51
// After populating cache
52
cache.put(keyRow, valueRows);
53
assertThat(cache)
54
.hasSize(1);
55
```
56
57
#### Key Presence Testing
58
59
```java
60
import org.apache.flink.table.data.StringData;
61
62
RowData keyRow = GenericRowData.of(1, StringData.fromString("user1"));
63
RowData missingKeyRow = GenericRowData.of(999, StringData.fromString("missing"));
64
65
// Key presence validation
66
assertThat(cache)
67
.containsKey(keyRow)
68
.doesNotContainKey(missingKeyRow);
69
70
// Using individual key fields
71
assertThat(cache)
72
.containsKey(1, "user1")
73
.doesNotContainKey(999, "missing");
74
```
75
76
#### Content Validation
77
78
```java
79
import java.util.*;
80
81
RowData key1 = GenericRowData.of(1, StringData.fromString("dept1"));
82
RowData key2 = GenericRowData.of(2, StringData.fromString("dept2"));
83
84
Collection<RowData> values1 = Arrays.asList(
85
GenericRowData.of(101, StringData.fromString("Alice")),
86
GenericRowData.of(102, StringData.fromString("Bob"))
87
);
88
89
Collection<RowData> values2 = Arrays.asList(
90
GenericRowData.of(201, StringData.fromString("Charlie"))
91
);
92
93
// Single key-value validation
94
assertThat(cache)
95
.contains(key1, values1)
96
.contains(key2, values2);
97
98
// Using varargs for values
99
assertThat(cache)
100
.contains(key1,
101
GenericRowData.of(101, StringData.fromString("Alice")),
102
GenericRowData.of(102, StringData.fromString("Bob"))
103
);
104
```
105
106
#### Exact Content Matching
107
108
```java
109
Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
110
expectedEntries.put(key1, values1);
111
expectedEntries.put(key2, values2);
112
113
// Validate exact cache contents
114
assertThat(cache)
115
.containsExactlyEntriesOf(expectedEntries);
116
```
117
118
#### Lookup Join Testing Scenarios
119
120
```java
121
// Test lookup join cache behavior
122
@Test
123
void testLookupJoinCaching() {
124
LookupCache cache = createLookupCache();
125
126
// Initial state - empty cache
127
assertThat(cache)
128
.hasSize(0);
129
130
// After first lookup - cache populated
131
performLookup(cache, lookupKey1);
132
assertThat(cache)
133
.hasSize(1)
134
.containsKey(lookupKey1);
135
136
// After duplicate lookup - cache hit, no size change
137
performLookup(cache, lookupKey1);
138
assertThat(cache)
139
.hasSize(1)
140
.containsKey(lookupKey1);
141
142
// After different lookup - cache expanded
143
performLookup(cache, lookupKey2);
144
assertThat(cache)
145
.hasSize(2)
146
.containsKey(lookupKey1)
147
.containsKey(lookupKey2);
148
}
149
```
150
151
#### Cache Eviction Testing
152
153
```java
154
@Test
155
void testCacheEviction() {
156
LookupCache cache = createLimitedSizeCache(maxSize = 2);
157
158
// Fill cache to capacity
159
performLookup(cache, key1);
160
performLookup(cache, key2);
161
assertThat(cache)
162
.hasSize(2)
163
.containsKey(key1)
164
.containsKey(key2);
165
166
// Trigger eviction with new key
167
performLookup(cache, key3);
168
assertThat(cache)
169
.hasSize(2) // Size limit maintained
170
.containsKey(key3); // New key present
171
172
// Verify eviction occurred (LRU behavior)
173
assertThat(cache)
174
.doesNotContainKey(key1); // Oldest key evicted
175
}
176
```
177
178
#### Multi-Value Lookup Testing
179
180
```java
181
@Test
182
void testMultiValueLookup() {
183
// For one-to-many relationships
184
RowData departmentKey = GenericRowData.of(StringData.fromString("Engineering"));
185
186
Collection<RowData> employees = Arrays.asList(
187
GenericRowData.of(1, StringData.fromString("Alice")),
188
GenericRowData.of(2, StringData.fromString("Bob")),
189
GenericRowData.of(3, StringData.fromString("Charlie"))
190
);
191
192
cache.put(departmentKey, employees);
193
194
assertThat(cache)
195
.hasSize(1)
196
.contains(departmentKey, employees);
197
198
// Verify all employees are cached for the department
199
assertThat(cache)
200
.contains(departmentKey,
201
GenericRowData.of(1, StringData.fromString("Alice")),
202
GenericRowData.of(2, StringData.fromString("Bob")),
203
GenericRowData.of(3, StringData.fromString("Charlie"))
204
);
205
}
206
```
207
208
#### Cache State Transitions
209
210
```java
211
@Test
212
void testCacheStateTransitions() {
213
LookupCache cache = createLookupCache();
214
215
// Initial empty state
216
assertThat(cache).hasSize(0);
217
218
// After population
219
populateCache(cache);
220
int populatedSize = cache.size();
221
assertThat(cache).hasSize(populatedSize);
222
223
// After partial clear/invalidation
224
invalidateKeys(cache, keysToInvalidate);
225
assertThat(cache)
226
.hasSize(populatedSize - keysToInvalidate.size())
227
.satisfies(updatedCache -> {
228
for (RowData invalidatedKey : keysToInvalidate) {
229
assertThat(updatedCache).doesNotContainKey(invalidatedKey);
230
}
231
});
232
233
// After complete clear
234
cache.clear();
235
assertThat(cache).hasSize(0);
236
}
237
```
238
239
#### Complex Key Structures
240
241
```java
242
@Test
243
void testComplexKeyStructures() {
244
// Multi-field composite keys
245
RowData compositeKey = GenericRowData.of(
246
1, // id
247
StringData.fromString("department"), // type
248
TimestampData.fromEpochMillis(System.currentTimeMillis()) // timestamp
249
);
250
251
Collection<RowData> values = Arrays.asList(
252
GenericRowData.of(StringData.fromString("result1")),
253
GenericRowData.of(StringData.fromString("result2"))
254
);
255
256
cache.put(compositeKey, values);
257
258
// Test with exact composite key
259
assertThat(cache)
260
.containsKey(compositeKey)
261
.contains(compositeKey, values);
262
263
// Test with field-by-field key construction
264
assertThat(cache)
265
.containsKey(1, "department", compositeKey.getTimestamp(2, 3));
266
}
267
```
268
269
#### Performance and Consistency Testing
270
271
```java
272
@Test
273
void testCacheConsistency() {
274
LookupCache cache = createConcurrentCache();
275
276
// Populate with known data
277
Map<RowData, Collection<RowData>> testData = generateTestData();
278
testData.forEach(cache::put);
279
280
// Validate complete consistency
281
assertThat(cache)
282
.hasSize(testData.size())
283
.containsExactlyEntriesOf(testData);
284
285
// Test individual entries
286
testData.forEach((key, values) -> {
287
assertThat(cache)
288
.containsKey(key)
289
.contains(key, values);
290
});
291
}
292
```
293
294
## Integration with Table Testing
295
296
Lookup cache assertions integrate seamlessly with other table testing utilities:
297
298
```java
299
@Test
300
void testLookupJoinWithTableAssertions() {
301
// Setup lookup cache
302
LookupCache cache = setupLookupCache();
303
304
// Execute lookup join
305
Table result = executeJoinWithLookup(sourceTable, cache);
306
307
// Validate cache state
308
assertThat(cache)
309
.hasSize(expectedCacheEntries)
310
.containsKey(expectedLookupKey);
311
312
// Validate join results
313
List<Row> resultRows = collectResults(result);
314
assertThatRows(resultRows)
315
.hasSize(expectedResultCount)
316
.allSatisfy(row ->
317
assertThat(row).hasArity(expectedJoinedArity)
318
);
319
}