or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation-grouping.mddata-input-output.mddataset-operations.mdexecution-environments.mdindex.mditeration-operations.mdjoin-cogroup-operations.mdutility-functions.md

execution-environments.mddocs/

0

# Execution Environments

1

2

Execution environments provide the central context for Flink batch program execution, offering methods to create data sources, configure execution parameters, and trigger program execution.

3

4

## Capabilities

5

6

### ExecutionEnvironment

7

8

The abstract base class for all execution environments, providing the primary entry point for Flink batch programs.

9

10

```java { .api }

11

/**

12

* Get the default execution environment (auto-detects local vs remote)

13

* @return ExecutionEnvironment instance

14

*/

15

public static ExecutionEnvironment getExecutionEnvironment();

16

17

/**

18

* Create a local execution environment

19

* @return LocalEnvironment for local execution

20

*/

21

public static LocalEnvironment createLocalEnvironment();

22

23

/**

24

* Create a local execution environment with specific parallelism

25

* @param parallelism the parallelism for the local environment

26

* @return LocalEnvironment for local execution

27

*/

28

public static LocalEnvironment createLocalEnvironment(int parallelism);

29

30

/**

31

* Create a remote execution environment

32

* @param host hostname of the JobManager

33

* @param port port of the JobManager

34

* @param jarFiles JAR files to be sent to the cluster

35

* @return RemoteEnvironment for remote execution

36

*/

37

public static RemoteEnvironment createRemoteEnvironment(String host, int port, String... jarFiles);

38

```

39

40

**Usage Examples:**

41

42

```java

43

// Auto-detect environment (local when running in IDE, remote when submitted to cluster)

44

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

45

46

// Force local execution with default parallelism

47

LocalEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment();

48

49

// Local execution with specific parallelism

50

LocalEnvironment localEnvParallel = ExecutionEnvironment.createLocalEnvironment(4);

51

52

// Remote execution

53

RemoteEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment(

54

"localhost", 8081, "/path/to/your-job.jar");

55

```

56

57

### Data Source Creation

58

59

Methods for creating DataSets from various data sources.

60

61

```java { .api }

62

/**

63

* Create a DataSet from a Java collection

64

* @param data the collection to create the DataSet from

65

* @return DataSet containing the collection elements

66

*/

67

public <T> DataSet<T> fromCollection(Collection<T> data);

68

69

/**

70

* Create a DataSet from individual elements

71

* @param data the elements to create the DataSet from

72

* @return DataSet containing the specified elements

73

*/

74

@SafeVarargs

75

public final <T> DataSet<T> fromElements(T... data);

76

77

/**

78

* Read a text file and create a DataSet of Strings

79

* @param filePath path to the text file

80

* @return DataSet of lines from the file

81

*/

82

public DataSet<String> readTextFile(String filePath);

83

84

/**

85

* Read a text file with specific character encoding

86

* @param filePath path to the text file

87

* @param charsetName the charset name for decoding the file

88

* @return DataSet of lines from the file

89

*/

90

public DataSet<String> readTextFile(String filePath, String charsetName);

91

92

/**

93

* Read text file as StringValue objects

94

* @param filePath path to the text file

95

* @return DataSource where each element is a StringValue from the file

96

*/

97

public DataSource<StringValue> readTextFileWithValue(String filePath);

98

99

/**

100

* Read text file as StringValue objects with charset and error handling

101

* @param filePath path to the text file

102

* @param charsetName the charset name for decoding the file

103

* @param skipInvalidLines whether to skip lines that cannot be decoded

104

* @return DataSource where each element is a StringValue from the file

105

*/

106

public DataSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines);

107

108

/**

109

* Read file containing primitive values

110

* @param filePath path to the file

111

* @param typeClass the class of the primitive type

112

* @return DataSource with elements of the primitive type

113

*/

114

public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass);

115

116

/**

117

* Read file containing primitive values with custom delimiter

118

* @param filePath path to the file

119

* @param delimiter the delimiter separating values

120

* @param typeClass the class of the primitive type

121

* @return DataSource with elements of the primitive type

122

*/

123

public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass);

124

125

/**

126

* Read a file with a custom input format

127

* @param inputFormat the input format to use

128

* @param filePath path to the file

129

* @return DataSet with elements read by the input format

130

*/

131

public <T> DataSet<T> readFile(FileInputFormat<T> inputFormat, String filePath);

132

133

/**

134

* Create a CSV reader for structured data

135

* @param filePath path to the CSV file

136

* @return CsvReader for configuration and DataSet creation

137

*/

138

public CsvReader readCsvFile(String filePath);

139

140

/**

141

* Generate a sequence of numbers

142

* @param from starting number (inclusive)

143

* @param to ending number (inclusive)

144

* @return DataSet containing the number sequence

145

*/

146

public DataSet<Long> generateSequence(long from, long to);

147

```

148

149

### Execution Control

150

151

Methods for configuring and executing Flink programs.

152

153

```java { .api }

154

/**

155

* Execute the program with a generated job name

156

* @return JobExecutionResult containing execution statistics

157

* @throws Exception if execution fails

158

*/

159

public JobExecutionResult execute() throws Exception;

160

161

/**

162

* Execute the program with a specific job name

163

* @param jobName name for the job

164

* @return JobExecutionResult containing execution statistics

165

* @throws Exception if execution fails

166

*/

167

public JobExecutionResult execute(String jobName) throws Exception;

168

169

/**

170

* Get the execution plan as JSON without executing

171

* @return JSON representation of the execution plan

172

* @throws Exception if plan generation fails

173

*/

174

public String getExecutionPlan() throws Exception;

175

176

/**

177

* Set the default parallelism for all operations

178

* @param parallelism the parallelism level

179

*/

180

public void setParallelism(int parallelism);

181

182

/**

183

* Get the current default parallelism

184

* @return the current parallelism level

185

*/

186

public int getParallelism();

187

188

/**

189

* Get the execution configuration

190

* @return ExecutionConfig for advanced configuration

191

*/

192

public ExecutionConfig getConfig();

193

```

194

195

### LocalEnvironment

196

197

Specialized execution environment for local execution in the current JVM.

198

199

```java { .api }

200

/**

201

* LocalEnvironment for local execution

202

* Inherits all methods from ExecutionEnvironment

203

* Executes programs in the current JVM process

204

*/

205

public class LocalEnvironment extends ExecutionEnvironment {

206

// Additional local-specific configuration methods available

207

}

208

```

209

210

### RemoteEnvironment

211

212

Specialized execution environment for remote execution on a Flink cluster.

213

214

```java { .api }

215

/**

216

* RemoteEnvironment for remote cluster execution

217

* Inherits all methods from ExecutionEnvironment

218

* Submits programs to a remote Flink cluster

219

*/

220

public class RemoteEnvironment extends ExecutionEnvironment {

221

// Additional remote-specific configuration methods available

222

}

223

```

224

225

### CollectionEnvironment

226

227

Specialized execution environment for collection-based execution (primarily for testing).

228

229

```java { .api }

230

/**

231

* CollectionEnvironment for collection-based execution

232

* Inherits all methods from ExecutionEnvironment

233

* Executes programs using Java collections (useful for testing)

234

*/

235

public class CollectionEnvironment extends ExecutionEnvironment {

236

// Collection-based execution methods

237

}

238

```

239

240

### ExecutionEnvironmentFactory

241

242

Factory interface for creating custom execution environments.

243

244

```java { .api }

245

/**

246

* Factory interface for creating custom ExecutionEnvironments

247

*/

248

public interface ExecutionEnvironmentFactory {

249

/**

250

* Create a custom execution environment

251

* @return ExecutionEnvironment instance

252

*/

253

ExecutionEnvironment createExecutionEnvironment();

254

}

255

```

256

257

## Types

258

259

```java { .api }

260

import org.apache.flink.api.java.ExecutionEnvironment;

261

import org.apache.flink.api.java.LocalEnvironment;

262

import org.apache.flink.api.java.RemoteEnvironment;

263

import org.apache.flink.api.java.CollectionEnvironment;

264

import org.apache.flink.api.java.ExecutionEnvironmentFactory;

265

import org.apache.flink.api.java.operators.DataSource;

266

import org.apache.flink.api.common.JobExecutionResult;

267

import org.apache.flink.configuration.Configuration;

268

import org.apache.flink.core.execution.JobClient;

269

import org.apache.flink.types.StringValue;

270

```