or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md security-testing.md test-base-classes.md test-data.md test-environments.md test-utilities.md

index.mddocs/

0

# Flink Test Utils

1

2

Flink Test Utils is a comprehensive testing library for Apache Flink applications, providing test environments, base classes, utilities, and sample data for unit and integration testing of both streaming and batch processing applications.

3

4

## Package Information

5

6

- **Package Name**: flink-test-utils_2.10

7

- **Package Type**: Maven

8

- **Language**: Java (with Scala 2.10 support)

9

- **Installation**:

10

```xml

11

<dependency>

12

<groupId>org.apache.flink</groupId>

13

<artifactId>flink-test-utils_2.10</artifactId>

14

<version>1.3.3</version>

15

<scope>test</scope>

16

</dependency>

17

```

18

19

## Core Imports

20

21

```java

22

import org.apache.flink.test.util.*;

23

import org.apache.flink.streaming.util.*;

24

import org.apache.flink.test.testdata.*;

25

```

26

27

## Basic Usage

28

29

```java

30

// Basic test setup

31

public class MyFlinkTest extends JavaProgramTestBase {

32

@Override

33

protected void testProgram() throws Exception {

34

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

35

36

// Your test logic here

37

DataSet<String> input = env.fromElements("hello", "world");

38

DataSet<String> result = input.map(s -> s.toUpperCase());

39

40

// Collect and verify results

41

List<String> resultList = result.collect();

42

TestBaseUtils.compareResultAsText(resultList, "HELLO\nWORLD");

43

}

44

}

45

```

46

47

## Architecture

48

49

The library is organized into four main areas:

50

51

- **Test Environments**: Execution environments for running Flink jobs in test contexts

52

- **Test Base Classes**: Abstract base classes providing standardized test patterns and cluster management

53

- **Test Utilities**: Helper classes for result validation, cluster management, and security testing

54

- **Sample Test Data**: Pre-built datasets for common algorithms and testing scenarios

55

56

## Capabilities

57

58

### Test Environment Management

59

60

Provides specialized execution environments for running Flink applications under controlled test conditions.

61

62

```java { .api }

63

// Batch job testing environment

64

public class TestEnvironment extends ExecutionEnvironment {

65

public TestEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism, boolean isObjectReuseEnabled);

66

public JobExecutionResult execute(String jobName) throws Exception;

67

public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism);

68

}

69

70

// Streaming job testing environment

71

public class TestStreamEnvironment extends StreamExecutionEnvironment {

72

public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism);

73

public JobExecutionResult execute(String jobName) throws Exception;

74

public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism);

75

}

76

```

77

78

[Test Environments](./test-environments.md)

79

80

### Test Base Classes

81

82

Abstract base classes that provide standardized testing patterns, cluster lifecycle management, and parameterized testing across multiple execution modes.

83

84

```java { .api }

85

// Base class for batch program tests

86

public abstract class JavaProgramTestBase extends AbstractTestBase {

87

public JavaProgramTestBase();

88

protected abstract void testProgram() throws Exception;

89

public void setParallelism(int parallelism);

90

}

91

92

// Base class for streaming program tests

93

public abstract class StreamingProgramTestBase extends AbstractTestBase {

94

protected abstract void testProgram() throws Exception;

95

public void setParallelism(int parallelism);

96

}

97

98

// Base class for multiple program tests

99

public class MultipleProgramsTestBase extends TestBaseUtils {

100

public MultipleProgramsTestBase(TestExecutionMode mode);

101

}

102

```

103

104

[Test Base Classes](./test-base-classes.md)

105

106

### Test Utilities and Result Validation

107

108

Comprehensive utilities for cluster management, result comparison, and test data handling, plus specialized helpers such as checked threads and retry rules.

109

110

```java { .api }

111

public class TestBaseUtils extends TestLogger {

112

// Cluster management

113

public static LocalFlinkMiniCluster startCluster(int numTaskManagers, int taskManagerNumSlots,

114

boolean startWebserver, boolean startZooKeeper,

115

boolean singleActorSystem) throws Exception;

116

117

// Result validation and comparison

118

public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;

119

public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;

120

public static <T> void compareResultAsTuples(List<T> result, String expected);

121

public static <T> void compareResultAsText(List<T> result, String expected);

122

public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception;

123

public static void checkLinesAgainstRegexp(String resultPath, String regexp);

124

125

// File I/O utilities

126

public static BufferedReader[] getResultReader(String resultPath) throws IOException;

127

public static void readAllResultLines(List<String> target, String resultPath) throws IOException;

128

129

// Utility methods

130

public static String constructTestPath(Class<?> forClass, String folder);

131

public static String getFromHTTP(String url) throws Exception;

132

}

133

134

// Specialized testing utilities

135

public abstract class CheckedThread extends Thread {

136

public abstract void go() throws Exception;

137

public void sync() throws Exception;

138

}

139

140

public class RetryRule implements TestRule {

141

// JUnit rule for automatic test retries

142

}

143

144

public class CommonTestUtils {

145

public static <T extends Serializable> T createCopySerializable(T original);

146

public static void setEnv(Map<String, String> newenv);

147

}

148

```

149

150

[Test Utilities](./test-utilities.md)

151

152

### Security Testing

153

154

Support for testing Flink applications with security features enabled, including Kerberos authentication via MiniKDC.

155

156

```java { .api }

157

public class SecureTestEnvironment {

158

public static void prepare(TemporaryFolder tempFolder);

159

public static void cleanup();

160

public static Configuration populateFlinkSecureConfigurations(Configuration flinkConf);

161

}

162

163

public class TestingSecurityContext {

164

public static void install(SecurityConfiguration config,

165

Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap) throws Exception;

166

}

167

```

168

169

[Security Testing](./security-testing.md)

170

171

### Sample Test Data

172

173

Pre-built datasets and validation utilities for common Flink algorithms and testing scenarios.

174

175

```java { .api }

176

// PageRank test data

177

public class PageRankData {

178

public static final String VERTICES;

179

public static final String EDGES;

180

public static final String RANKS_AFTER_3_ITERATIONS;

181

}

182

183

// K-Means clustering test data

184

public class KMeansData {

185

public static final String DATAPOINTS;

186

public static final String INITIAL_CENTERS;

187

public static void checkResultsWithDelta(String expectedResults, List<String> resultLines, double maxDelta);

188

}

189

```

190

191

[Sample Test Data](./test-data.md)

192

193

## Common Types

194

195

```java { .api }

196

// Test execution modes

197

public enum TestExecutionMode {

198

CLUSTER,

199

CLUSTER_OBJECT_REUSE,

200

COLLECTION

201

}

202

203

// Security configuration for testing

204

public static class ClientSecurityConfiguration {

205

public String getPrincipal();

206

public String getKeytab();

207

public ClientSecurityConfiguration(String principal, String keytab);

208

}

209

210

// Test annotations for retry mechanisms

211

public @interface RetryOnFailure {

212

int times();

213

}

214

215

public @interface RetryOnException {

216

int times();

217

Class<? extends Throwable> exception();

218

}

219

220

// Local Flink cluster for testing

221

public class LocalFlinkMiniCluster {

222

// Mini cluster used by all test environments

223

}

224

225

// Duration utilities

226

public class FiniteDuration {

227

// Timeout specifications for cluster operations

228

}

229

```