or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

checkpointing-state.md datastream-operations.md execution-environment.md index.md sources-and-sinks.md stream-operators.md windowing.md

docs/execution-environment.md

0

# Execution Environment

1

2

The execution environment provides the entry point for creating and configuring Flink streaming applications. It manages job execution, parallelism settings, and environment-specific configurations.

3

4

## StreamExecutionEnvironment

5

6

The main entry point for all streaming applications.

7

8

```java { .api }

9

public abstract class StreamExecutionEnvironment {

10

// Environment creation

11

public static StreamExecutionEnvironment getExecutionEnvironment();

12

public static LocalStreamEnvironment createLocalEnvironment();

13

public static LocalStreamEnvironment createLocalEnvironment(int parallelism);

14

public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles);

15

public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles);

16

public static void setDefaultLocalParallelism(int parallelism);

17

18

// Job execution

19

public JobExecutionResult execute() throws Exception;

20

public JobExecutionResult execute(String jobName) throws Exception;

21

22

// Configuration

23

public StreamExecutionEnvironment setParallelism(int parallelism);

24

public int getParallelism();

25

public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis);

26

public long getBufferTimeout();

27

public StreamExecutionEnvironment disableOperatorChaining();

28

public void setNumberOfExecutionRetries(int numberOfExecutionRetries);

29

public int getNumberOfExecutionRetries();

30

public void setStateHandleProvider(StateHandleProvider<?> provider);

31

public StateHandleProvider<?> getStateHandleProvider();

32

33

// Checkpointing

34

public StreamExecutionEnvironment enableCheckpointing(long interval);

35

public StreamExecutionEnvironment enableCheckpointing();

36

37

// Source creation - Element and Collection sources

38

public DataStreamSource<Long> generateSequence(long from, long to);

39

public DataStreamSource<Long> generateParallelSequence(long from, long to);

40

public <OUT> DataStreamSource<OUT> fromElements(OUT... data);

41

public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data);

42

public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo);

43

public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type);

44

public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo);

45

public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type);

46

public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo);

47

48

// Source creation - Generic sources

49

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function);

50

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName);

51

public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat);

52

public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo);

53

54

// Built-in sources - File sources

55

public DataStreamSource<String> readTextFile(String filePath);

56

public DataStreamSource<String> readTextFile(String filePath, String charsetName);

57

public DataStreamSource<StringValue> readTextFileWithValue(String filePath);

58

public DataStreamSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines);

59

public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath);

60

public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval);

61

public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, Class<OUT> typeClass);

62

public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, String delimiter, Class<OUT> typeClass);

63

public DataStream<String> readFileStream(String filePath, long intervalMillis, WatchType watchType);

64

65

// Built-in sources - Network sources

66

public DataStreamSource<String> socketTextStream(String hostname, int port);

67

public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter);

68

public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry);

69

70

// Type system and configuration

71

public ExecutionConfig getConfig();

72

public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer);

73

public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);

74

public void registerType(Class<?> type);

75

public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer);

76

public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);

77

78

// Utility methods

79

public <F> F clean(F f);

80

public StreamGraph getStreamGraph();

81

public String getExecutionPlan();

82

}

83

```

84

85

## LocalStreamEnvironment

86

87

Environment for local execution, primarily used for development and testing.

88

89

```java { .api }

90

public class LocalStreamEnvironment extends StreamExecutionEnvironment {

91

public LocalStreamEnvironment();

92

public LocalStreamEnvironment(Configuration configuration);

93

94

@Override

95

public JobExecutionResult execute() throws Exception;

96

97

@Override

98

public JobExecutionResult execute(String jobName) throws Exception;

99

}

100

```

101

102

## RemoteStreamEnvironment

103

104

Environment for executing streaming jobs on a remote Flink cluster.

105

106

```java { .api }

107

public class RemoteStreamEnvironment extends StreamExecutionEnvironment {

108

public RemoteStreamEnvironment(String host, int port, String... jarFiles);

109

public RemoteStreamEnvironment(String host, int port, int parallelism, String... jarFiles);

110

111

@Override

112

public JobExecutionResult execute() throws Exception;

113

114

@Override

115

public JobExecutionResult execute(String jobName) throws Exception;

116

}

117

```

118

119

## Usage Examples

120

121

### Basic Environment Setup

122

123

```java

124

// Get default environment (local or remote based on context)

125

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

126

127

// Set parallelism

128

env.setParallelism(4);

129

130

// Set network buffer timeout (in milliseconds)

131

env.setBufferTimeout(100);

132

133

// Disable operator chaining for debugging

134

env.disableOperatorChaining();

135

```

136

137

### Local Environment

138

139

```java

140

// Create local environment with default parallelism

141

LocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

142

143

// Create local environment with specific parallelism

144

LocalStreamEnvironment localEnv2 = StreamExecutionEnvironment.createLocalEnvironment(2);

145

```

146

147

### Remote Environment

148

149

```java

150

// Connect to remote cluster

151

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment

152

.createRemoteEnvironment("localhost", 6123, "/path/to/job.jar");

153

154

// With specific parallelism

155

StreamExecutionEnvironment remoteEnv2 = StreamExecutionEnvironment

156

.createRemoteEnvironment("localhost", 6123, 4, "/path/to/job.jar");

157

```

158

159

### Creating Sources

160

161

```java

162

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

163

164

// From elements

165

DataStreamSource<String> stream1 = env.fromElements("hello", "world");

166

167

// From collection

168

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

169

DataStreamSource<Integer> stream2 = env.fromCollection(numbers);

170

171

// From file

172

DataStreamSource<String> stream3 = env.readTextFile("/path/to/input.txt");

173

174

// From socket

175

DataStreamSource<String> stream4 = env.socketTextStream("localhost", 9999);

176

177

// Custom source

178

DataStreamSource<String> stream5 = env.addSource(new MyCustomSource());

179

```

180

181

### Job Execution

182

183

```java

184

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

185

186

// Create your data stream pipeline

187

DataStream<String> processed = env

188

.socketTextStream("localhost", 9999)

189

.map(String::toUpperCase)

190

.filter(s -> s.length() > 5);

191

192

processed.print();

193

194

// Execute the job

195

JobExecutionResult result = env.execute("My Streaming Job");

196

197

// Access execution results

198

System.out.println("Job execution time: " + result.getNetRuntime());

199

```

200

201

## Types

202

203

```java { .api }

204

public class JobExecutionResult {

205

public long getNetRuntime();

206

public long getNetRuntime(TimeUnit desiredUnit);

207

public Map<String, Object> getAllAccumulatorResults();

208

public <T> T getAccumulatorResult(String name);

209

}

210

211

public interface StateHandleProvider<T extends StateHandle> extends Serializable {

212

T createStateHandle(Serializable state) throws Exception;

213

}

214

215

public enum FileProcessingMode {

216

PROCESS_ONCE,

217

PROCESS_CONTINUOUSLY

218

}

219

220

public enum WatchType {

221

PROCESS_ONCE,

222

REPROCESS_WITH_APPENDED

223

}

224

225

public class StreamGraph {

226

// Internal representation of the streaming dataflow graph

227

}

228

229

public class StringValue implements Value {

230

// Flink's StringValue type for efficient string handling

231

public StringValue();

232

public StringValue(String value);

233

public String getValue();

234

public void setValue(String value);

235

}

236

```