# Test Environments

Test environments are execution environments for running Flink jobs in tests. The cluster-backed variants execute jobs on `LocalFlinkMiniCluster` instances, and each environment can be installed as the global execution context.

## Batch Testing Environment

### TestEnvironment

An `ExecutionEnvironment` implementation that executes batch jobs on a `LocalFlinkMiniCluster`.

```java { .api }
public class TestEnvironment extends ExecutionEnvironment {
    public TestEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism, boolean isObjectReuseEnabled,
            Collection<Path> jarFiles, Collection<URL> classPaths);
    public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled);

    public JobExecutionResult getLastJobExecutionResult();
    public void startNewSession() throws Exception;
    public JobExecutionResult execute(String jobName) throws Exception;
    public String getExecutionPlan() throws Exception;

    public void setAsContext();
    public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism,
            Collection<Path> jarFiles, Collection<URL> classPaths);
    public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism);
    public static void unsetAsContext();
}
```

**Usage Example:**

```java
// Create and configure test environment
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);

// Set as global context
testEnv.setAsContext();

// Now ExecutionEnvironment.getExecutionEnvironment() returns this test environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> result = env.fromElements("test").map(s -> s.toUpperCase());

// A batch job needs at least one sink before execute() can run it
result.output(new DiscardingOutputFormat<>());
JobExecutionResult jobResult = env.execute("test job");

// Clean up: clear the context and shut the cluster down
TestEnvironment.unsetAsContext();
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
```

### CollectionTestEnvironment

Collection-based test environment for lightweight local testing without a cluster.

```java { .api }
public class CollectionTestEnvironment extends CollectionEnvironment {
    public JobExecutionResult getLastJobExecutionResult();
    public JobExecutionResult execute() throws Exception;
    public JobExecutionResult execute(String jobName) throws Exception;

    protected void setAsContext();
    protected static void unsetAsContext();
}
```

**Usage Example:**

```java
// Use collection environment for simple tests.
// setAsContext() and unsetAsContext() are protected, so this code must live in a
// subclass or in the same package as CollectionTestEnvironment.
CollectionTestEnvironment collectionEnv = new CollectionTestEnvironment();
collectionEnv.setAsContext();

// Jobs will execute locally using Java collections
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

// collect() triggers execution and returns the results
List<Integer> result = numbers.map(x -> x * 2).collect();

// Clean up
CollectionTestEnvironment.unsetAsContext();
```

## Streaming Testing Environment

### TestStreamEnvironment

A `StreamExecutionEnvironment` implementation that executes streaming jobs on a `LocalFlinkMiniCluster`.

```java { .api }
public class TestStreamEnvironment extends StreamExecutionEnvironment {
    public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism,
            Collection<Path> jarFiles, Collection<URL> classPaths);
    public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism);

    public JobExecutionResult execute(String jobName) throws Exception;

    public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism,
            Collection<Path> jarFiles, Collection<URL> classpaths);
    public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism);
    public static void unsetAsContext();
}
```

**Usage Example:**

```java
// Create streaming test environment
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
TestStreamEnvironment.setAsContext(cluster, 4);

// Now StreamExecutionEnvironment.getExecutionEnvironment() returns test environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromElements("hello", "world")
        .map(s -> s.toUpperCase());

stream.print();
JobExecutionResult result = env.execute("streaming test");

// Clean up
TestStreamEnvironment.unsetAsContext();
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
```

## Environment Configuration

Through their constructors and static `setAsContext` methods, `TestEnvironment` and `TestStreamEnvironment` accept the following settings:

- **Parallelism**: Number of parallel slots for job execution
- **Object Reuse**: Whether to reuse objects to reduce garbage collection (`TestEnvironment` only)
- **JAR Files**: Additional JAR files to include in the classpath
- **Class Paths**: Additional URLs to include in the classpath
- **Cluster Configuration**: The `LocalFlinkMiniCluster` instance that runs the jobs, and with it the cluster's settings
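
The object reuse setting deserves care in tests: when a runtime hands the same mutable instance to user code for every record, code that buffers references instead of copies ends up seeing only the last value. The sketch below illustrates that hazard in plain Java. It does not use any Flink classes, and `Counter`, `bufferWithoutCopy`, and `bufferWithCopy` are hypothetical names chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: plain Java, no Flink classes involved.
class ObjectReuseSketch {

    // A mutable record, standing in for a value type a runtime might reuse.
    static final class Counter {
        int value;
    }

    // Buffers references to the single instance that is overwritten per record.
    static List<Integer> bufferWithoutCopy(int[] inputs) {
        Counter reused = new Counter();
        List<Counter> buffered = new ArrayList<>();
        for (int input : inputs) {
            reused.value = input;  // the same instance is overwritten for each record
            buffered.add(reused);  // unsafe: every entry points at that one instance
        }
        return values(buffered);
    }

    // Copies each record before buffering it, which is safe under reuse.
    static List<Integer> bufferWithCopy(int[] inputs) {
        Counter reused = new Counter();
        List<Counter> buffered = new ArrayList<>();
        for (int input : inputs) {
            reused.value = input;
            Counter copy = new Counter();
            copy.value = reused.value;
            buffered.add(copy);
        }
        return values(buffered);
    }

    private static List<Integer> values(List<Counter> counters) {
        List<Integer> out = new ArrayList<>();
        for (Counter c : counters) {
            out.add(c.value);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] inputs = {1, 2, 3};
        System.out.println(bufferWithoutCopy(inputs)); // prints [3, 3, 3]
        System.out.println(bufferWithCopy(inputs));    // prints [1, 2, 3]
    }
}
```

Running the same job with object reuse both enabled and disabled is one way to surface this class of bug in user functions.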

## Context Management

Test environments can be installed as the global execution context:

- `setAsContext()` - Makes the test environment the one returned by `ExecutionEnvironment.getExecutionEnvironment()` (or `StreamExecutionEnvironment.getExecutionEnvironment()` for `TestStreamEnvironment`)
- `unsetAsContext()` - Removes the test environment as the global context, so later calls fall back to the default environment lookup

This allows existing code that calls `getExecutionEnvironment()` to use the test environment without modification. Call `unsetAsContext()` once the test finishes so the override does not leak into other tests.
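
The context override can be pictured as a static factory slot that a test fills and later clears. The following is a simplified model in plain Java, not Flink's actual implementation: `ContextSketch`, `getEnvironment`, and `describeJob` are hypothetical names, and a `String` stands in for the environment object.

```java
import java.util.function.Supplier;

// Simplified model of a context override; the names here are hypothetical, not Flink's.
class ContextSketch {

    // Factory installed by a test; null means "no override".
    private static Supplier<String> contextFactory;

    static void setAsContext(Supplier<String> factory) {
        contextFactory = factory;
    }

    static void unsetAsContext() {
        contextFactory = null;
    }

    // Stand-in for getExecutionEnvironment(): prefer the override, else use the default.
    static String getEnvironment() {
        return contextFactory != null ? contextFactory.get() : "default environment";
    }

    // Code under test: it only ever calls the static lookup.
    static String describeJob() {
        return "job running in " + getEnvironment();
    }

    public static void main(String[] args) {
        setAsContext(() -> "test environment");
        try {
            System.out.println(describeJob()); // prints "job running in test environment"
        } finally {
            unsetAsContext(); // always clear the override so later tests are unaffected
        }
        System.out.println(describeJob()); // prints "job running in default environment"
    }
}
```

Wrapping the test body in `try`/`finally` as above, or clearing the context in a test framework's teardown hook, keeps a failing test from leaving the override in place.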