or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-tests

Comprehensive test library for the Apache Flink stream processing framework, with integration tests, test utilities, and end-to-end validation tests.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-tests@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-tests@2.1.0

0

# Apache Flink Tests

1

2

Apache Flink Tests is a comprehensive test library providing reusable testing infrastructure, utilities, and frameworks for testing Apache Flink stream processing functionality. This module is packaged as a test-jar, making its test utilities available to other Flink modules for comprehensive validation of streaming, batch processing, state management, and fault tolerance features.

3

4

## Package Information

5

6

- **Package Name**: flink-tests

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Installation**: Add as a test-jar dependency in Maven projects

10

- **Coordinates**: `org.apache.flink:flink-tests:2.1.0`

11

12

## Core Imports

13

14

```java

15

// Test utilities and base classes

16

import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;

17

import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;

18

import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;

19

import org.apache.flink.test.util.JobGraphRunningUtil;

20

21

// Test data and operators

22

import org.apache.flink.test.operators.util.CollectionDataStreams;

23

import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders;

24

```

25

26

Maven dependency (test-jar):

27

28

```xml

29

<dependency>

30

<groupId>org.apache.flink</groupId>

31

<artifactId>flink-tests</artifactId>

32

<version>2.1.0</version>

33

<type>test-jar</type>

34

<scope>test</scope>

35

</dependency>

36

```

37

38

## Basic Usage

39

40

```java

41

import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;

42

import org.apache.flink.test.operators.util.CollectionDataStreams;

43

import org.apache.flink.streaming.api.datastream.DataStreamSource;

44

import org.apache.flink.api.java.tuple.Tuple3;

45

46

// Example: Using standard test data

47

public class MyFlinkTest {

48

49

@Test

50

public void testWithStandardData() throws Exception {

51

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

52

53

DataStreamSource<Tuple3<Integer, Long, String>> testData =

54

CollectionDataStreams.get3TupleDataSet(env);

55

56

// Use test data in your Flink job

57

DataStream<Integer> result = testData

58

.map(t -> t.f0)

59

.filter(x -> x > 5);

60

61

// Add sink and execute

62

result.print();

63

env.execute("Test Job");

64

}

65

}

66

67

// Example: Testing migration scenarios

68

public class MigrationTest extends SnapshotMigrationTestBase {

69

70

@Test

71

public void testStateMigration() throws Exception {

72

// Create and execute job with state

73

JobGraph job = createJobWithOperatorState();

74

SnapshotSpec snapshot = executeAndSnapshot(job);

75

76

// Restore and validate in new version

77

JobGraph restoredJob = createUpdatedJob();

78

restoreAndExecute(restoredJob, snapshot);

79

}

80

}

81

```

82

83

## Architecture

84

85

Apache Flink Tests is organized around several key testing frameworks:

86

87

- **Test Infrastructure**: Base classes and utilities for setting up test environments

88

- **Migration Framework**: Comprehensive support for testing snapshot migration across Flink versions

89

- **Fault Tolerance Framework**: Failure injection mechanisms and recovery testing utilities

90

- **Operator Lifecycle Framework**: Complete framework for testing streaming operator behavior and lifecycle events

91

- **State Management Framework**: Extensive support for testing state backends, checkpointing, and state migration

92

- **Test Data Framework**: Standardized datasets, POJOs, and data generators for consistent testing across modules

93

94

## Capabilities

95

96

### Checkpointing and Migration Testing

97

98

Comprehensive framework for testing snapshot migration across Flink versions with utilities for state validation and checkpoint management.

99

100

```java { .api }

101

public abstract class SnapshotMigrationTestBase {

102

protected SnapshotSpec executeAndSnapshot(JobGraph job) throws Exception;

103

protected void restoreAndExecute(JobGraph job, SnapshotSpec snapshot) throws Exception;

104

105

public static class SnapshotSpec {

106

public String getSnapshotPath();

107

public String getSnapshotVersion();

108

}

109

}

110

```

111

112

[Checkpointing and Migration](./checkpointing-migration.md)

113

114

### Fault Tolerance and Recovery Testing

115

116

Multiple failure injection mechanisms and recovery testing utilities for validating Flink's fault tolerance capabilities.

117

118

```java { .api }

119

public abstract class SimpleRecoveryITCaseBase {

120

protected void runAndCancelJob(JobGraph jobGraph) throws Exception;

121

122

public static class FailingMapper1 implements MapFunction<Integer, Integer>;

123

public static class FailingMapper2 implements MapFunction<Integer, Integer>;

124

}

125

```

126

127

[Fault Tolerance and Recovery](./fault-tolerance-recovery.md)

128

129

### Operator Lifecycle Testing

130

131

Complete framework for testing streaming operator behavior including startup, checkpointing, finishing, and shutdown phases.

132

133

```java { .api }

134

public class TestJobBuilders {

135

public static final TestJobBuilder SIMPLE_GRAPH_BUILDER;

136

public static final TestJobBuilder COMPLEX_GRAPH_BUILDER;

137

}

138

139

public class OneInputTestStreamOperator extends AbstractStreamOperator<TestDataElement>

140

implements OneInputStreamOperator<TestDataElement, TestDataElement>;

141

142

public class TestEventQueue {

143

public void add(TestEvent event);

144

public List<TestEvent> getEvents();

145

}

146

```

147

148

[Operator Lifecycle Testing](./operator-lifecycle.md)

149

150

### State Backend and Operator Restore Testing

151

152

Framework for testing state backend switching, operator restore scenarios, and state migration validation.

153

154

```java { .api }

155

public abstract class AbstractOperatorRestoreTestBase {

156

protected void testRestore() throws Exception;

157

}

158

159

public abstract class SavepointStateBackendSwitchTestBase {

160

protected void testSwitchingStateBackend() throws Exception;

161

}

162

```

163

164

[State Backend and Restore Testing](./state-backend-restore.md)

165

166

### Standardized Test Data and Utilities

167

168

Reusable datasets, POJOs, and runtime utilities for consistent testing across Flink modules.

169

170

```java { .api }

171

public class CollectionDataStreams {

172

public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);

173

public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);

174

}

175

176

public class JobGraphRunningUtil {

177

public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;

178

}

179

```

180

181

[Test Data and Utilities](./test-data-utilities.md)

182

183

### Cancellation Testing Framework

184

185

Framework for testing job cancellation scenarios and cleanup behavior.

186

187

```java { .api }

188

public abstract class CancelingTestBase {

189

protected void runAndCancelJob(JobGraph jobGraph, long cancelAfterMs) throws Exception;

190

}

191

```

192

193

[Cancellation Testing](./cancellation-testing.md)

194

195

### Session Window Testing Framework

196

197

Specialized testing framework for session window functionality with event generation and validation.

198

199

```java { .api }

200

public class EventGeneratorFactory {

201

public static SessionEventGenerator create(SessionConfiguration config);

202

}

203

204

public class SessionEvent {

205

public String getSessionId();

206

public long getTimestamp();

207

public TestEventPayload getPayload();

208

}

209

```

210

211

[Session Window Testing](./session-window-testing.md)

212

213

### Plugin Testing Framework

214

215

Framework for testing Flink's plugin system and service provider interface (SPI) implementations.

216

217

```java { .api }

218

public abstract class PluginTestBase {

219

protected void testPluginLoading() throws Exception;

220

}

221

```

222

223

[Plugin Testing](./plugin-testing.md)

224

225

### Runtime Utilities

226

227

Job execution utilities, process management, and common testing operations for controlled test environments.

228

229

```java { .api }

230

public class JobGraphRunningUtil {

231

public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;

232

public static JobExecutionResult executeWithTimeout(JobGraph jobGraph, MiniCluster miniCluster, long timeoutMs) throws Exception;

233

}

234

235

public class TestEnvironmentUtil {

236

public static Configuration createTestClusterConfig(int parallelism, int numTaskManagers);

237

public static StreamExecutionEnvironment createTestStreamEnv(int parallelism, boolean checkpointingEnabled);

238

}

239

```

240

241

[Runtime Utilities](./runtime-utilities.md)

242

243

## Types

244

245

```java { .api }

246

// Core test data types

247

public class CustomType {

248

public String myString;

249

public int myInt;

250

public CustomType(String myString, int myInt);

251

}

252

253

public class POJO {

254

public int number;

255

public String str;

256

public POJO();

257

public POJO(int number, String str);

258

}

259

260

public class NestedPojo {

261

public POJO nested;

262

public long longField;

263

public NestedPojo(POJO nested, long longField);

264

}

265

266

public class CrazyNested {

267

public NestedPojo nestedPojo;

268

public POJO simplePojo;

269

public String stringField;

270

public CrazyNested(NestedPojo nestedPojo, POJO simplePojo, String stringField);

271

}

272

273

public class PojoWithDateAndEnum {

274

public Date dateField;

275

public TestEnum enumField;

276

public String stringField;

277

public PojoWithDateAndEnum(Date dateField, TestEnum enumField, String stringField);

278

279

public enum TestEnum { VALUE1, VALUE2, VALUE3 }

280

}

281

282

public class PojoWithCollectionGeneric {

283

public List<String> stringList;

284

public Map<String, Integer> stringIntMap;

285

public Set<Long> longSet;

286

public PojoWithCollectionGeneric(List<String> stringList, Map<String, Integer> stringIntMap, Set<Long> longSet);

287

}

288

289

// Fault tolerance types

290

public class PrefixCount {

291

public String prefix;

292

public Integer value;

293

public Long count;

294

public PrefixCount(String prefix, Integer value, Long count);

295

}

296

297

// Migration testing types

298

public class SnapshotSpec {

299

public String getSnapshotPath();

300

public String getSnapshotVersion();

301

}

302

303

// Event system types

304

public interface TestEvent {

305

String getOperatorId();

306

long getTimestamp();

307

}

308

309

public class OperatorStartedEvent implements TestEvent;

310

public class OperatorFinishedEvent implements TestEvent;

311

public class CheckpointStartedEvent implements TestEvent;

312

public class CheckpointCompletedEvent implements TestEvent;

313

public class InputEndedEvent implements TestEvent;

314

public class WatermarkReceivedEvent implements TestEvent;

315

public class TestCommandAckEvent implements TestEvent;

316

317

// Command system types

318

public interface TestCommand {

319

void execute(StreamOperator<?> operator);

320

String getCommandType();

321

}

322

323

public enum TestCommandScope {

324

ALL_SUBTASKS, SINGLE_SUBTASK

325

}

326

327

// Test data element

328

public class TestDataElement {

329

public String value;

330

public long timestamp;

331

public TestDataElement(String value, long timestamp);

332

}

333

334

// Session window testing types

335

public class SessionEvent {

336

public String getSessionId();

337

public long getTimestamp();

338

public TestEventPayload getPayload();

339

}

340

341

public class TestEventPayload {

342

public String data;

343

public Map<String, Object> properties;

344

}

345

346

// Enumeration types

347

public enum FailoverStrategy {

348

RestartAllFailoverStrategy,

349

RestartPipelinedRegionFailoverStrategy

350

}

351

352

public enum ExecutionMode {

353

CREATE_SNAPSHOT,

354

VERIFY_SNAPSHOT

355

}

356

357

public enum SnapshotType {

358

SAVEPOINT_CANONICAL,

359

SAVEPOINT_NATIVE,

360

CHECKPOINT

361

}

362

363

// Configuration types

364

public class SessionConfiguration {

365

public long sessionTimeout;

366

public long sessionGap;

367

public int maxConcurrentSessions;

368

}

369

370

public class TestConfiguration {

371

public int parallelism;

372

public long checkpointInterval;

373

public boolean enableCheckpointing;

374

}

375

```