or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdmetrics-testing.mdminicluster-management.mdresult-verification.mdspecialized-connectors.mdtest-data-sources.mdtest-environments.mdvalidation-utilities.md

specialized-connectors.mddocs/

0

# Specialized Test Connectors

1

2

Test connectors for specific use cases including upsert operations testing with configurable serialization schemas and output file management. These connectors provide specialized functionality for testing complex data processing scenarios.

3

4

## Capabilities

5

6

### Upsert Test Sink

7

8

Sink specifically designed for testing upsert (insert/update) operations with configurable key-value serialization and file-based output verification.

9

10

```java { .api }

11

@PublicEvolving

12

public class UpsertTestSink<IN> implements Sink<IN> {

13

public static <IN> UpsertTestSinkBuilder<IN> builder();

14

}

15

```

16

17

### Upsert Test Sink Builder

18

19

Builder pattern implementation for configuring UpsertTestSink with customizable serialization schemas and output options.

20

21

```java { .api }

22

public class UpsertTestSinkBuilder<IN> {

23

public UpsertTestSinkBuilder<IN> setOutputFile(File outputFile);

24

public UpsertTestSinkBuilder<IN> setKeySerializationSchema(SerializationSchema<IN> keySerializationSchema);

25

public UpsertTestSinkBuilder<IN> setValueSerializationSchema(SerializationSchema<IN> valueSerializationSchema);

26

public UpsertTestSink<IN> build();

27

}

28

```

29

30

#### Basic Usage

31

32

```java

33

import org.apache.flink.connector.upserttest.sink.UpsertTestSink;

34

import org.apache.flink.api.common.serialization.SimpleStringSchema;

35

import java.io.File;

36

37

// Create upsert test sink with file output

38

File outputFile = new File("/tmp/upsert-test-output.txt");

39

UpsertTestSink<String> sink = UpsertTestSink.<String>builder()

40

.setOutputFile(outputFile)

41

.setKeySerializationSchema(new SimpleStringSchema())

42

.setValueSerializationSchema(new SimpleStringSchema())

43

.build();

44

45

// Use in streaming job

46

env.fromElements("key1:value1", "key2:value2", "key1:updated_value1")

47

.map(new KeyValueParser())

48

.sinkTo(sink);

49

```

50

51

#### Advanced Usage with Custom Serialization

52

53

```java

54

import org.apache.flink.api.common.serialization.SerializationSchema;

55

56

// Custom serialization schema for keys

57

SerializationSchema<MyRecord> keySerializer = new SerializationSchema<MyRecord>() {

58

@Override

59

public byte[] serialize(MyRecord record) {

60

return record.getKey().getBytes();

61

}

62

};

63

64

// Custom serialization schema for values

65

SerializationSchema<MyRecord> valueSerializer = new SerializationSchema<MyRecord>() {

66

@Override

67

public byte[] serialize(MyRecord record) {

68

return record.toJson().getBytes();

69

}

70

};

71

72

// Build sink with custom serializers

73

UpsertTestSink<MyRecord> customSink = UpsertTestSink.<MyRecord>builder()

74

.setOutputFile(new File("/tmp/custom-upsert-output.json"))

75

.setKeySerializationSchema(keySerializer)

76

.setValueSerializationSchema(valueSerializer)

77

.build();

78

```

79

80

### Upsert Test Sink Writer

81

82

Internal writer implementation for the UpsertTestSink that handles the actual upsert logic and file operations.

83

84

```java { .api }

85

public class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {

86

// Internal implementation for upsert operations

87

}

88

```

89

90

### Upsert File Utilities

91

92

Utility functions for managing upsert test output files and performing file-based verifications.

93

94

```java { .api }

95

public class UpsertTestFileUtil {

96

// File management utilities for upsert testing

97

}

98

```

99

100

### Immutable Byte Array Wrapper

101

102

Utility class for handling immutable byte array operations in upsert testing scenarios.

103

104

```java { .api }

105

public class ImmutableByteArrayWrapper {

106

// Immutable byte array handling

107

}

108

```

109

110

## Table API Integration

111

112

### Dynamic Table Sink for Upsert Testing

113

114

Table API integration for upsert testing with dynamic table sink functionality.

115

116

```java { .api }

117

public class UpsertTestDynamicTableSink implements DynamicTableSink {

118

// Dynamic table sink for upsert operations

119

}

120

```

121

122

### Dynamic Table Sink Factory

123

124

Factory for creating upsert test dynamic table sinks with proper configuration handling.

125

126

```java { .api }

127

public class UpsertTestDynamicTableSinkFactory implements DynamicTableSinkFactory {

128

// Factory for creating upsert test table sinks

129

}

130

```

131

132

### Connector Options

133

134

Configuration options for the upsert test connector when used with Table API.

135

136

```java { .api }

137

public class UpsertTestConnectorOptions {

138

// Configuration options for upsert test connector

139

}

140

```

141

142

#### Table API Usage Example

143

144

```sql

145

-- Create upsert test table in SQL

146

CREATE TABLE upsert_test_sink (

147

id STRING,

148

name STRING,

149

value BIGINT,

150

PRIMARY KEY (id) NOT ENFORCED

151

) WITH (

152

'connector' = 'upsert-test',

153

'output-file' = '/tmp/table-upsert-output.txt'

154

);

155

156

-- Insert data with upserts

157

INSERT INTO upsert_test_sink VALUES

158

('1', 'Alice', 100),

159

('2', 'Bob', 200),

160

('1', 'Alice Updated', 150);

161

```

162

163

## Usage Patterns

164

165

### Basic Upsert Testing

166

167

Testing simple key-value upsert operations with string data.

168

169

```java

170

@Test

171

void testBasicUpsertOperations() throws Exception {

172

File outputFile = tempDir.resolve("upsert-output.txt").toFile();

173

174

UpsertTestSink<Tuple2<String, String>> sink =

175

UpsertTestSink.<Tuple2<String, String>>builder()

176

.setOutputFile(outputFile)

177

.setKeySerializationSchema(new SimpleStringSchema())

178

.setValueSerializationSchema(new SimpleStringSchema())

179

.build();

180

181

// Create test data with upserts

182

env.fromElements(

183

Tuple2.of("key1", "value1"),

184

Tuple2.of("key2", "value2"),

185

Tuple2.of("key1", "updated_value1"), // Upsert

186

Tuple2.of("key3", "value3")

187

).sinkTo(sink);

188

189

env.execute("Upsert Test");

190

191

// Verify output file contains final state

192

List<String> lines = Files.readAllLines(outputFile.toPath());

193

assertTrue(lines.contains("key1:updated_value1"));

194

assertTrue(lines.contains("key2:value2"));

195

assertTrue(lines.contains("key3:value3"));

196

assertEquals(3, lines.size()); // Only final states

197

}

198

```

199

200

### Complex Object Upsert Testing

201

202

Testing upsert operations with complex objects and custom serialization.

203

204

```java

205

@Test

206

void testComplexObjectUpserts() throws Exception {

207

File outputFile = tempDir.resolve("complex-upsert-output.json").toFile();

208

209

// Custom serialization for complex objects

210

SerializationSchema<UserRecord> keySchema = record -> record.getUserId().getBytes();

211

SerializationSchema<UserRecord> valueSchema = record -> {

212

ObjectMapper mapper = new ObjectMapper();

213

try {

214

return mapper.writeValueAsBytes(record);

215

} catch (JsonProcessingException e) {

216

throw new RuntimeException(e);

217

}

218

};

219

220

UpsertTestSink<UserRecord> sink = UpsertTestSink.<UserRecord>builder()

221

.setOutputFile(outputFile)

222

.setKeySerializationSchema(keySchema)

223

.setValueSerializationSchema(valueSchema)

224

.build();

225

226

// Test data with user record updates

227

env.fromElements(

228

new UserRecord("user1", "Alice", "alice@example.com"),

229

new UserRecord("user2", "Bob", "bob@example.com"),

230

new UserRecord("user1", "Alice Smith", "alice.smith@example.com") // Update

231

).sinkTo(sink);

232

233

env.execute("Complex Upsert Test");

234

235

// Verify JSON output

236

List<String> jsonLines = Files.readAllLines(outputFile.toPath());

237

assertEquals(2, jsonLines.size());

238

239

// Parse and verify updated record

240

ObjectMapper mapper = new ObjectMapper();

241

UserRecord updatedUser = mapper.readValue(

242

jsonLines.stream()

243

.filter(line -> line.contains("Alice Smith"))

244

.findFirst()

245

.orElseThrow(),

246

UserRecord.class

247

);

248

assertEquals("Alice Smith", updatedUser.getName());

249

assertEquals("alice.smith@example.com", updatedUser.getEmail());

250

}

251

```

252

253

### Table API Upsert Integration

254

255

Using the upsert test connector with Flink's Table API for SQL-based testing.

256

257

```java

258

@Test

259

void testTableApiUpsertIntegration() throws Exception {

260

StreamExecutionEnvironment env = getTestEnvironment();

261

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

262

263

File outputFile = tempDir.resolve("table-upsert-output.txt").toFile();

264

265

// Create upsert test table

266

String createTableDDL = String.format(

267

"CREATE TABLE upsert_sink (" +

268

" id STRING," +

269

" name STRING," +

270

" score INT," +

271

" PRIMARY KEY (id) NOT ENFORCED" +

272

") WITH (" +

273

" 'connector' = 'upsert-test'," +

274

" 'output-file' = '%s'" +

275

")",

276

outputFile.getAbsolutePath()

277

);

278

279

tableEnv.executeSql(createTableDDL);

280

281

// Insert data with upserts

282

tableEnv.executeSql(

283

"INSERT INTO upsert_sink VALUES " +

284

"('1', 'Alice', 95), " +

285

"('2', 'Bob', 87), " +

286

"('1', 'Alice', 98)" // Upsert - score update

287

);

288

289

// Verify results

290

assertTrue(outputFile.exists());

291

List<String> lines = Files.readAllLines(outputFile.toPath());

292

293

// Should contain final state only

294

assertEquals(2, lines.size());

295

assertTrue(lines.stream().anyMatch(line -> line.contains("Alice") && line.contains("98")));

296

assertTrue(lines.stream().anyMatch(line -> line.contains("Bob") && line.contains("87")));

297

}

298

```

299

300

### Verification Patterns

301

302

Common patterns for verifying upsert test results.

303

304

```java

305

@Test

306

void testUpsertResultVerification() throws Exception {

307

File outputFile = tempDir.resolve("verification-output.txt").toFile();

308

309

// Run upsert test job

310

runUpsertTestJob(outputFile);

311

312

// Verification approaches

313

314

// 1. Line count verification

315

List<String> lines = Files.readAllLines(outputFile.toPath());

316

assertEquals(expectedFinalRecordCount, lines.size());

317

318

// 2. Content verification

319

assertTrue(lines.contains("expected_final_record"));

320

assertFalse(lines.contains("overwritten_record"));

321

322

// 3. Structured verification

323

Map<String, String> finalState = lines.stream()

324

.map(line -> line.split(":"))

325

.collect(Collectors.toMap(parts -> parts[0], parts -> parts[1]));

326

327

assertEquals("expected_value", finalState.get("test_key"));

328

329

// 4. Using TestBaseUtils for comparison

330

String expectedOutput = "key1:final_value1\nkey2:final_value2";

331

TestBaseUtils.compareResultsByLinesInMemory(

332

expectedOutput, outputFile.getAbsolutePath());

333

}

334

```