or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md, metrics-testing.md, minicluster-management.md, result-verification.md, specialized-connectors.md, test-data-sources.md, test-environments.md, validation-utilities.md

docs/test-environments.md

0

# Test Environments

1

2

Specialized execution environments for streaming and batch testing with multiple execution modes, object reuse configuration, and temporary file management. These environments provide isolated testing contexts for Flink applications.

3

4

## Capabilities

5

6

### Test Stream Environment

7

8

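A `StreamExecutionEnvironment` subclass designed for running tests on a `MiniCluster`. It exposes the result of the last executed job and can be installed as the context environment returned by the `StreamExecutionEnvironment` factory methods.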

9

10

```java { .api }

11

public class TestStreamEnvironment extends StreamExecutionEnvironment {

12

public TestStreamEnvironment(MiniCluster miniCluster, int parallelism);

13

public TestStreamEnvironment(

14

MiniCluster miniCluster,

15

Configuration config,

16

int parallelism,

17

Collection<Path> jarFiles,

18

Collection<URL> classPaths);

19

20

public static void setAsContext(MiniCluster miniCluster, int parallelism);

21

public static void setAsContext(

22

MiniCluster miniCluster,

23

int parallelism,

24

Collection<Path> jarFiles,

25

Collection<URL> classPaths);

26

public static void unsetAsContext();

27

28

public void setAsContext();

29

public JobExecutionResult getLastJobExecutionResult();

30

}

31

```

32

33

#### Usage Example

34

35

```java

36

import org.apache.flink.streaming.util.TestStreamEnvironment;

37

import org.apache.flink.runtime.minicluster.MiniCluster;

38

39

// Create test environment with specific parallelism

40

MiniCluster miniCluster = getMiniCluster();

41

TestStreamEnvironment env = new TestStreamEnvironment(miniCluster, 4);

42

43

// Set as global context for factory methods

44

TestStreamEnvironment.setAsContext(miniCluster, 4);

45

StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();

46

47

// Execute job and get result

48

env.fromElements(1, 2, 3, 4, 5)

49

.map(x -> x * 2)

50

.print();

51

52

JobExecutionResult result = env.execute("Test Job");

53

JobExecutionResult lastResult = env.getLastJobExecutionResult();

54

```

55

56

### SQL Job Submission

57

58

Interface and implementations for the different SQL job submission methods, including the embedded SQL client, the gateway client, Hive JDBC, and the REST client.

59

60

```java { .api }

61

public interface SQLJobClientMode {

62

EmbeddedSqlClient getEmbeddedSqlClient();

63

GatewaySqlClient getGatewaySqlClient(String host, int port);

64

HiveJDBC getHiveJDBC(String host, int port);

65

RestClient getRestClient(String host, int port, String version);

66

}

67

```

68

69

#### SQL Client Types

70

71

```java { .api }

72

public static class EmbeddedSqlClient {

73

// Embedded SQL client implementation

74

}

75

76

public static class GatewaySqlClient {

77

// Gateway SQL client implementation

78

}

79

80

public static class HiveJDBC {

81

// Hive JDBC client implementation

82

}

83

84

public static class RestClient {

85

// REST client implementation

86

}

87

```

88

89

### Job Submission Utilities

90

91

Utilities for submitting and managing Flink jobs in test environments with support for both regular jobs and SQL jobs.

92

93

```java { .api }

94

public class JobSubmission {

95

// Job submission functionality

96

}

97

98

public class SQLJobSubmission {

99

// SQL job submission functionality

100

}

101

```

102

103

### Secure Test Environment

104

105

Test environment with Kerberos security support for testing secure Flink deployments.

106

107

```java { .api }

108

public class SecureTestEnvironment {

109

// Security context management for tests

110

}

111

```

112

113

### Testing Security Context

114

115

Security context management utilities for handling authentication and authorization in test environments.

116

117

```java { .api }

118

public class TestingSecurityContext {

119

// Security context utilities

120

}

121

```

122

123

### Process and Script Execution

124

125

Utilities for executing external processes and shell scripts within test environments.

126

127

```java { .api }

128

public class TestProcessBuilder {

129

// Process builder for test execution

130

}

131

132

public class ShellScript {

133

// Shell script execution utilities

134

}

135

```

136

137

### Pipeline Executor Service

138

139

Service loader for MiniCluster pipeline executor, enabling custom execution strategies for test environments.

140

141

```java { .api }

142

public class MiniClusterPipelineExecutorServiceLoader {

143

// Service loader for pipeline executors

144

}

145

```

146

147

## Execution Modes

148

149

### Test Execution Modes

150

151

Enumeration of available execution modes for parameterized testing with different cluster configurations.

152

153

```java { .api }

154

public enum TestExecutionMode {

155

CLUSTER, // Standard cluster execution

156

CLUSTER_OBJECT_REUSE // Cluster execution with object reuse optimization

157

}

158

```

159

160

### Collection Execution

161

162

Support for a collection-based execution mode that runs jobs in memory without cluster deployment, which is useful for unit testing individual operations.

163

164

```java { .api }

165

public boolean isCollectionExecution();

166

```

167

168

## Configuration and Utilities

169

170

### Test Utilities

171

172

General utility functions for test environment setup and management.

173

174

```java { .api }

175

public class TestUtils {

176

// Miscellaneous test utility functions

177

}

178

```

179

180

### File Utilities

181

182

File manipulation utilities specifically designed for test environments including temporary file management and path resolution.

183

184

```java { .api }

185

public class FileUtils {

186

// File manipulation utilities for tests

187

}

188

```

189

190

### Success Exception

191

192

Special exception type indicating successful test completion, used in specific testing scenarios where exceptions signal success rather than failure.

193

194

```java { .api }

195

public class SuccessException extends Exception {

196

// Exception indicating test success

197

}

198

```

199

200

## Usage Patterns

201

202

### Basic Streaming Test

203

204

```java

205

@Test

206

void testStreamingApplication() throws Exception {

207

TestStreamEnvironment env = new TestStreamEnvironment(miniCluster, 2);

208

209

DataStream<String> input = env.fromElements("a", "b", "c");

210

DataStream<String> result = input.map(String::toUpperCase);

211

212

result.print();

213

JobExecutionResult jobResult = env.execute("Upper Case Job");

214

215

// Verify execution was successful

216

assertNotNull(jobResult);

217

assertTrue(jobResult.getNetRuntime() > 0);

218

}

219

```

220

221

### Parameterized Execution Mode Testing

222

223

```java

224

@ParameterizedTest

225

@EnumSource(TestExecutionMode.class)

226

void testWithDifferentModes(TestExecutionMode mode) throws Exception {

227

// Configure environment based on execution mode

228

switch (mode) {

229

case CLUSTER:

230

// Standard cluster configuration

231

break;

232

case CLUSTER_OBJECT_REUSE:

233

// Object reuse optimization configuration

234

break;

235

}

236

237

// Run test with specified mode

238

}

239

```

240

241

### Context Management

242

243

```java

244

@BeforeEach

245

void setupTestContext() {

246

TestStreamEnvironment.setAsContext(miniCluster, 4);

247

}

248

249

@AfterEach

250

void cleanupTestContext() {

251

TestStreamEnvironment.unsetAsContext();

252

}

253

254

@Test

255

void testWithGlobalContext() {

256

// Use factory method - will return TestStreamEnvironment

257

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

258

// Test implementation

259

}

260

```