
To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-test-utils@2.1.0


# Flink Test Utils

Testing utilities for Apache Flink stream and batch processing applications. The library provides MiniCluster management with JUnit integration, result verification utilities, metrics testing, specialized test connectors, and sample data sets. These components are intended for reuse across Flink application tests, integration testing frameworks, performance benchmarks, and continuous integration pipelines.

## Package Information

- **Package Name**: flink-test-utils
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-test-utils
- **Version**: 2.1.0
- **Installation**: Maven dependency in pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>2.1.0</version>
    <scope>test</scope>
</dependency>
```

## Core Imports

Basic test base classes:

```java
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestBaseUtils;
```

JUnit 5 extensions:

```java
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.junit5.InjectClusterClient;
```

Streaming test utilities:

```java
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
```

## Basic Usage

Setting up a basic Flink test with MiniCluster:

```java
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class MyFlinkTest {

    // Starts a MiniCluster before the tests run and shuts it down afterwards
    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
            new MiniClusterExtension();

    @Test
    void testStreamingJob() throws Exception {
        StreamExecutionEnvironment env =
                MINI_CLUSTER_EXTENSION.getTestStreamEnvironment();

        // Create your streaming job
        env.fromElements(1, 2, 3, 4, 5)
                .map(x -> x * 2)
                .print();

        // Execute the job; print() only writes to stdout, so add assertions to verify output
        env.execute("Test Job");
    }
}
```

Verifying test results:

```java
import org.apache.flink.test.util.TestBaseUtils;
import java.util.Arrays;
import java.util.List;

// Compare collections
List<String> expected = Arrays.asList("a", "b", "c");
List<String> actual = getJobResults(); // placeholder for however the test collects its output
TestBaseUtils.compareResultCollections(expected, actual, String::compareTo);

// Compare expected lines with the contents of an output file
TestBaseUtils.compareResultsByLinesInMemory("a\nb\nc", outputFilePath);
```
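
Parallel Flink jobs generally emit records in a nondeterministic order, which is why a comparator is passed to `compareResultCollections`. The plain-JDK sketch below illustrates the underlying idea: sort both lists with the same comparator, then compare element by element. The class `SortedCompareSketch` and its method `sameElements` are hypothetical names used for illustration and are not part of flink-test-utils; the library method may differ in details such as how it reports a mismatch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper for illustration only; not part of flink-test-utils.
final class SortedCompareSketch {

    // Returns true when both lists hold the same elements, ignoring order.
    static <T> boolean sameElements(List<T> expected, List<T> actual, Comparator<T> comparator) {
        if (expected.size() != actual.size()) {
            return false;
        }
        // Sort copies so the caller's lists are left untouched.
        List<T> sortedExpected = new ArrayList<>(expected);
        List<T> sortedActual = new ArrayList<>(actual);
        sortedExpected.sort(comparator);
        sortedActual.sort(comparator);
        // After sorting, equal multisets line up position by position.
        for (int i = 0; i < sortedExpected.size(); i++) {
            if (comparator.compare(sortedExpected.get(i), sortedActual.get(i)) != 0) {
                return false;
            }
        }
        return true;
    }
}
```

With this helper, `sameElements(Arrays.asList("a", "b", "c"), Arrays.asList("c", "a", "b"), String::compareTo)` treats the two lists as equal, while lists that differ in size or content do not match.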

## Architecture

The flink-test-utils library is organized around several key architectural components:

- **Test Base Classes**: Provide MiniCluster integration and temporary file management for both JUnit 4 and JUnit 5
- **Result Verification**: Utilities for comparing results across various formats (text, tuples, collections)
- **Metrics Testing**: Fluent assertions and listeners for validating Flink metrics
- **Test Environments**: Specialized environments for streaming and batch testing with configurable execution modes
- **Sample Data**: Predefined datasets for common algorithm testing scenarios

Together, these components let tests run distributed streaming and batch jobs against Flink's runtime components and execution models.

## Capabilities


### MiniCluster Management

JUnit integration for managing Flink MiniCluster lifecycle with support for both JUnit 4 and JUnit 5, including parameter injection for cluster clients and configuration randomization.

```java { .api }
@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension();

@Test
void testWithCluster(@InjectClusterClient ClusterClient<?> client) { }
```

[MiniCluster Management](./minicluster-management.md)

### Result Verification

Utilities for comparing and validating test results across various formats including text files, collections, tuples, and numeric data with delta comparisons.

```java { .api }
public static <T> void compareResultCollections(
        List<T> expected, List<T> actual, Comparator<T> comparator);

public static void compareResultsByLinesInMemory(
        String expected, String resultPath);

public static void compareKeyValuePairsWithDelta(
        String expected, String resultPath, String delimiter, double maxDelta);
```

[Result Verification](./result-verification.md)
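
Delta comparison matters for floating-point results, where exact string equality is too strict. The sketch below shows the general idea behind a key-value comparison with a tolerance, using only the JDK. It is an illustration under stated assumptions, not the library's implementation: `DeltaCompareSketch`, `parse`, and `withinDelta` are hypothetical names, the input is taken from strings rather than a result path, and keys are assumed to be unique.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of flink-test-utils.
final class DeltaCompareSketch {

    // Parses lines of the form "key<delimiter>value" into a key-sorted map.
    // Assumes each key appears at most once.
    static Map<String, Double> parse(String lines, String delimiter) {
        Map<String, Double> pairs = new TreeMap<>();
        for (String line : lines.split("\n")) {
            if (line.trim().isEmpty()) {
                continue;
            }
            // Quote the delimiter so characters like "|" are not read as regex syntax.
            String[] parts = line.split(Pattern.quote(delimiter), 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed line: " + line);
            }
            pairs.put(parts[0].trim(), Double.parseDouble(parts[1].trim()));
        }
        return pairs;
    }

    // Returns true when both inputs have the same keys and every value
    // differs from its expected counterpart by at most maxDelta.
    static boolean withinDelta(String expected, String actual, String delimiter, double maxDelta) {
        Map<String, Double> expectedPairs = parse(expected, delimiter);
        Map<String, Double> actualPairs = parse(actual, delimiter);
        if (!expectedPairs.keySet().equals(actualPairs.keySet())) {
            return false;
        }
        for (Map.Entry<String, Double> entry : expectedPairs.entrySet()) {
            double difference = Math.abs(entry.getValue() - actualPairs.get(entry.getKey()));
            if (difference > maxDelta) {
                return false;
            }
        }
        return true;
    }
}
```

Because the pairs are keyed and sorted, line order in the actual output does not affect the result; only missing keys or values outside the tolerance cause a mismatch.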

### Test Environments

Specialized execution environments for streaming and batch testing with multiple execution modes, object reuse configuration, and temporary file management.

```java { .api }
public class TestStreamEnvironment extends StreamExecutionEnvironment {
    public TestStreamEnvironment(MiniCluster miniCluster, int parallelism);
    public static void setAsContext(MiniCluster miniCluster, int parallelism);
    public JobExecutionResult getLastJobExecutionResult();
}
```

[Test Environments](./test-environments.md)

### Metrics Testing

Fluent assertions and metric listeners for validating Flink metrics including counters, gauges, histograms, and meters with easy retrieval and verification capabilities.

```java { .api }
public static CounterAssert assertThatCounter(Metric actual);
public static <T> GaugeAssert<T> assertThatGauge(Metric actual);

public class MetricListener implements MetricReporter {
    public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
    public Optional<Counter> getCounter(String... identifier);
}
```

[Metrics Testing](./metrics-testing.md)

### Test Data Sources

Finite test sources for controlled data emission with checkpoint coordination and sample datasets for common algorithms like PageRank, K-means, and Connected Components.

```java { .api }
public class FiniteTestSource<T> implements SourceFunction<T> {
    public FiniteTestSource(T... elements);
    public FiniteTestSource(Iterable<T> elements);
    public FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);
}
```

[Test Data Sources](./test-data-sources.md)

### Specialized Test Connectors

Test connectors for specific use cases including upsert operations testing with configurable serialization schemas and output file management.

```java { .api }
public static <IN> UpsertTestSinkBuilder<IN> builder();

public class UpsertTestSinkBuilder<IN> {
    public UpsertTestSinkBuilder<IN> setOutputFile(File outputFile);
    public UpsertTestSinkBuilder<IN> setKeySerializationSchema(SerializationSchema<IN> schema);
    public UpsertTestSink<IN> build();
}
```

[Specialized Test Connectors](./specialized-connectors.md)

### Validation Utilities

Utilities for POJO serialization verification, JAR packaging validation, resource discovery, and parameter property handling.

```java { .api }
public static <T> void assertSerializedAsPojo(Class<T> clazz);
public static void assertJarContainsOnlyFilesMatching(Path jarPath, Collection<String> allowedPaths);
public static Path getResource(String resourceNameRegex);
```

[Validation Utilities](./validation-utilities.md)
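
A class can silently stop qualifying as a Flink POJO when, for example, a no-argument constructor or a setter is removed, which is the kind of regression `assertSerializedAsPojo` is meant to catch. The sketch below approximates Flink's documented POJO rules (public class, public no-argument constructor, fields either public and non-final or exposed through a getter and setter) using plain reflection. It is an illustration only: `PojoRulesSketch`, `looksLikePojo`, `Good`, and `Bad` are hypothetical names, the sketch does not call Flink's type extraction, and it may disagree with the real check in edge cases such as fields with unsupported types.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical approximation for illustration only; not part of flink-test-utils.
final class PojoRulesSketch {

    static boolean looksLikePojo(Class<?> clazz) {
        // Rule 1: the class must be public.
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        // Rule 2: the class must have a public no-argument constructor.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: each non-static, non-transient field (including inherited ones)
        // is public and non-final, or has a public getter and setter.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                int mods = field.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                    continue;
                }
                if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) {
                    continue;
                }
                if (!hasGetterAndSetter(c, field)) {
                    return false;
                }
            }
        }
        return true;
    }

    private static boolean hasGetterAndSetter(Class<?> owner, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = hasMethod(owner, "get" + suffix) || hasMethod(owner, "is" + suffix);
        boolean setter = hasMethod(owner, "set" + suffix, field.getType());
        return getter && setter;
    }

    private static boolean hasMethod(Class<?> owner, String name, Class<?>... params) {
        try {
            owner.getMethod(name, params); // finds public methods only
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Satisfies all three rules.
    public static class Good {
        public int id;
        private String name;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Violates rule 2 (no no-argument constructor) and rule 3 (final field without a setter).
    public static class Bad {
        private final int id;
        public Bad(int id) { this.id = id; }
    }
}
```

In an actual test, the library's `assertSerializedAsPojo` is the check to rely on, since it reflects what Flink's serializer selection really does; the sketch only shows which structural properties are at stake.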