0
# Specialized Test Connectors
1
2
Test connectors for specific use cases including upsert operations testing with configurable serialization schemas and output file management. These connectors provide specialized functionality for testing complex data processing scenarios.
3
4
## Capabilities
5
6
### Upsert Test Sink
7
8
Sink specifically designed for testing upsert (insert/update) operations with configurable key-value serialization and file-based output verification.
9
10
```java { .api }
11
@PublicEvolving
12
public class UpsertTestSink<IN> implements Sink<IN> {
13
public static <IN> UpsertTestSinkBuilder<IN> builder();
14
}
15
```
16
17
### Upsert Test Sink Builder
18
19
Builder pattern implementation for configuring UpsertTestSink with customizable serialization schemas and output options.
20
21
```java { .api }
22
public class UpsertTestSinkBuilder<IN> {
23
public UpsertTestSinkBuilder<IN> setOutputFile(File outputFile);
24
public UpsertTestSinkBuilder<IN> setKeySerializationSchema(SerializationSchema<IN> keySerializationSchema);
25
public UpsertTestSinkBuilder<IN> setValueSerializationSchema(SerializationSchema<IN> valueSerializationSchema);
26
public UpsertTestSink<IN> build();
27
}
28
```
29
30
#### Basic Usage
31
32
```java
33
import org.apache.flink.connector.upserttest.sink.UpsertTestSink;
34
import org.apache.flink.api.common.serialization.SimpleStringSchema;
35
import java.io.File;
36
37
// Create upsert test sink with file output
38
File outputFile = new File("/tmp/upsert-test-output.txt");
39
UpsertTestSink<String> sink = UpsertTestSink.<String>builder()
40
.setOutputFile(outputFile)
41
.setKeySerializationSchema(new SimpleStringSchema())
42
.setValueSerializationSchema(new SimpleStringSchema())
43
.build();
44
45
// Use in streaming job
46
env.fromElements("key1:value1", "key2:value2", "key1:updated_value1")
47
.map(new KeyValueParser())
48
.sinkTo(sink);
49
```
50
51
#### Advanced Usage with Custom Serialization
52
53
```java
54
import org.apache.flink.api.common.serialization.SerializationSchema;
55
56
// Custom serialization schema for keys
57
SerializationSchema<MyRecord> keySerializer = new SerializationSchema<MyRecord>() {
58
@Override
59
public byte[] serialize(MyRecord record) {
60
return record.getKey().getBytes();
61
}
62
};
63
64
// Custom serialization schema for values
65
SerializationSchema<MyRecord> valueSerializer = new SerializationSchema<MyRecord>() {
66
@Override
67
public byte[] serialize(MyRecord record) {
68
return record.toJson().getBytes();
69
}
70
};
71
72
// Build sink with custom serializers
73
UpsertTestSink<MyRecord> customSink = UpsertTestSink.<MyRecord>builder()
74
.setOutputFile(new File("/tmp/custom-upsert-output.json"))
75
.setKeySerializationSchema(keySerializer)
76
.setValueSerializationSchema(valueSerializer)
77
.build();
78
```
79
80
### Upsert Test Sink Writer
81
82
Internal writer implementation for the UpsertTestSink that handles the actual upsert logic and file operations.
83
84
```java { .api }
85
public class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {
86
// Internal implementation for upsert operations
87
}
88
```
89
90
### Upsert File Utilities
91
92
Utility functions for managing upsert test output files and performing file-based verifications.
93
94
```java { .api }
95
public class UpsertTestFileUtil {
96
// File management utilities for upsert testing
97
}
98
```
99
100
### Immutable Byte Array Wrapper
101
102
Utility class for handling immutable byte array operations in upsert testing scenarios.
103
104
```java { .api }
105
public class ImmutableByteArrayWrapper {
106
// Immutable byte array handling
107
}
108
```
109
110
## Table API Integration
111
112
### Dynamic Table Sink for Upsert Testing
113
114
Table API integration for upsert testing with dynamic table sink functionality.
115
116
```java { .api }
117
public class UpsertTestDynamicTableSink implements DynamicTableSink {
118
// Dynamic table sink for upsert operations
119
}
120
```
121
122
### Dynamic Table Sink Factory
123
124
Factory for creating upsert test dynamic table sinks with proper configuration handling.
125
126
```java { .api }
127
public class UpsertTestDynamicTableSinkFactory implements DynamicTableSinkFactory {
128
// Factory for creating upsert test table sinks
129
}
130
```
131
132
### Connector Options
133
134
Configuration options for the upsert test connector when used with Table API.
135
136
```java { .api }
137
public class UpsertTestConnectorOptions {
138
// Configuration options for upsert test connector
139
}
140
```
141
142
#### Table API Usage Example
143
144
```sql
145
-- Create upsert test table in SQL
146
CREATE TABLE upsert_test_sink (
147
id STRING,
148
name STRING,
149
value BIGINT,
150
PRIMARY KEY (id) NOT ENFORCED
151
) WITH (
152
'connector' = 'upsert-test',
153
'output-file' = '/tmp/table-upsert-output.txt'
154
);
155
156
-- Insert data with upserts
157
INSERT INTO upsert_test_sink VALUES
158
('1', 'Alice', 100),
159
('2', 'Bob', 200),
160
('1', 'Alice Updated', 150);
161
```
162
163
## Usage Patterns
164
165
### Basic Upsert Testing
166
167
Testing simple key-value upsert operations with string data.
168
169
```java
170
@Test
171
void testBasicUpsertOperations() throws Exception {
172
File outputFile = tempDir.resolve("upsert-output.txt").toFile();
173
174
UpsertTestSink<Tuple2<String, String>> sink =
175
UpsertTestSink.<Tuple2<String, String>>builder()
176
.setOutputFile(outputFile)
177
.setKeySerializationSchema(new SimpleStringSchema())
178
.setValueSerializationSchema(new SimpleStringSchema())
179
.build();
180
181
// Create test data with upserts
182
env.fromElements(
183
Tuple2.of("key1", "value1"),
184
Tuple2.of("key2", "value2"),
185
Tuple2.of("key1", "updated_value1"), // Upsert
186
Tuple2.of("key3", "value3")
187
).sinkTo(sink);
188
189
env.execute("Upsert Test");
190
191
// Verify output file contains final state
192
List<String> lines = Files.readAllLines(outputFile.toPath());
193
assertTrue(lines.contains("key1:updated_value1"));
194
assertTrue(lines.contains("key2:value2"));
195
assertTrue(lines.contains("key3:value3"));
196
assertEquals(3, lines.size()); // Only final states
197
}
198
```
199
200
### Complex Object Upsert Testing
201
202
Testing upsert operations with complex objects and custom serialization.
203
204
```java
205
@Test
206
void testComplexObjectUpserts() throws Exception {
207
File outputFile = tempDir.resolve("complex-upsert-output.json").toFile();
208
209
// Custom serialization for complex objects
210
SerializationSchema<UserRecord> keySchema = record -> record.getUserId().getBytes();
211
SerializationSchema<UserRecord> valueSchema = record -> {
212
ObjectMapper mapper = new ObjectMapper();
213
try {
214
return mapper.writeValueAsBytes(record);
215
} catch (JsonProcessingException e) {
216
throw new RuntimeException(e);
217
}
218
};
219
220
UpsertTestSink<UserRecord> sink = UpsertTestSink.<UserRecord>builder()
221
.setOutputFile(outputFile)
222
.setKeySerializationSchema(keySchema)
223
.setValueSerializationSchema(valueSchema)
224
.build();
225
226
// Test data with user record updates
227
env.fromElements(
228
new UserRecord("user1", "Alice", "alice@example.com"),
229
new UserRecord("user2", "Bob", "bob@example.com"),
230
new UserRecord("user1", "Alice Smith", "alice.smith@example.com") // Update
231
).sinkTo(sink);
232
233
env.execute("Complex Upsert Test");
234
235
// Verify JSON output
236
List<String> jsonLines = Files.readAllLines(outputFile.toPath());
237
assertEquals(2, jsonLines.size());
238
239
// Parse and verify updated record
240
ObjectMapper mapper = new ObjectMapper();
241
UserRecord updatedUser = mapper.readValue(
242
jsonLines.stream()
243
.filter(line -> line.contains("Alice Smith"))
244
.findFirst()
245
.orElseThrow(),
246
UserRecord.class
247
);
248
assertEquals("Alice Smith", updatedUser.getName());
249
assertEquals("alice.smith@example.com", updatedUser.getEmail());
250
}
251
```
252
253
### Table API Upsert Integration
254
255
Using the upsert test connector with Flink's Table API for SQL-based testing.
256
257
```java
258
@Test
259
void testTableApiUpsertIntegration() throws Exception {
260
StreamExecutionEnvironment env = getTestEnvironment();
261
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
262
263
File outputFile = tempDir.resolve("table-upsert-output.txt").toFile();
264
265
// Create upsert test table
266
String createTableDDL = String.format(
267
"CREATE TABLE upsert_sink (" +
268
" id STRING," +
269
" name STRING," +
270
" score INT," +
271
" PRIMARY KEY (id) NOT ENFORCED" +
272
") WITH (" +
273
" 'connector' = 'upsert-test'," +
274
" 'output-file' = '%s'" +
275
")",
276
outputFile.getAbsolutePath()
277
);
278
279
tableEnv.executeSql(createTableDDL);
280
281
// Insert data with upserts
282
tableEnv.executeSql(
283
"INSERT INTO upsert_sink VALUES " +
284
"('1', 'Alice', 95), " +
285
"('2', 'Bob', 87), " +
286
"('1', 'Alice', 98)" // Upsert - score update
287
);
288
289
// Verify results
290
assertTrue(outputFile.exists());
291
List<String> lines = Files.readAllLines(outputFile.toPath());
292
293
// Should contain final state only
294
assertEquals(2, lines.size());
295
assertTrue(lines.stream().anyMatch(line -> line.contains("Alice") && line.contains("98")));
296
assertTrue(lines.stream().anyMatch(line -> line.contains("Bob") && line.contains("87")));
297
}
298
```
299
300
### Verification Patterns
301
302
Common patterns for verifying upsert test results.
303
304
```java
305
@Test
306
void testUpsertResultVerification() throws Exception {
307
File outputFile = tempDir.resolve("verification-output.txt").toFile();
308
309
// Run upsert test job
310
runUpsertTestJob(outputFile);
311
312
// Verification approaches
313
314
// 1. Line count verification
315
List<String> lines = Files.readAllLines(outputFile.toPath());
316
assertEquals(expectedFinalRecordCount, lines.size());
317
318
// 2. Content verification
319
assertTrue(lines.contains("expected_final_record"));
320
assertFalse(lines.contains("overwritten_record"));
321
322
// 3. Structured verification
323
Map<String, String> finalState = lines.stream()
324
.map(line -> line.split(":"))
325
.collect(Collectors.toMap(parts -> parts[0], parts -> parts[1]));
326
327
assertEquals("expected_value", finalState.get("test_key"));
328
329
// 4. Using TestBaseUtils for comparison
330
String expectedOutput = "key1:final_value1\nkey2:final_value2";
331
TestBaseUtils.compareResultsByLinesInMemory(
332
expectedOutput, outputFile.getAbsolutePath());
333
}
334
```