or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli.mdcluster-management.mdexecution-context.mdexecution-environments.mdindex.mdprogram-management.md

execution-environments.mddocs/

0

# Execution Environments

1

2

Executors for running Flink programs both locally for development and testing, and remotely on production clusters.

3

4

## Capabilities

5

6

### LocalExecutor

7

8

Executor for running Flink programs locally in the same JVM, ideal for development, testing, and debugging.

9

10

```java { .api }

11

/**

12

* Executor for running Flink programs locally in the same JVM

13

*/

14

public class LocalExecutor extends PlanExecutor {

15

/**

16

* Creates a local executor with default configuration

17

*/

18

public LocalExecutor();

19

20

/**

21

* Creates a local executor with specific configuration

22

* @param conf Configuration for local execution environment

23

*/

24

public LocalExecutor(Configuration conf);

25

26

/**

27

* Starts the local execution environment

28

* @throws Exception if startup fails

29

*/

30

public void start() throws Exception;

31

32

/**

33

* Stops the local execution environment and releases resources

34

* @throws Exception if shutdown fails

35

*/

36

public void stop() throws Exception;

37

38

/**

39

* Checks if the local executor is currently running

40

* @return true if the executor is running

41

*/

42

public boolean isRunning();

43

44

/**

45

* Executes a Flink execution plan locally

46

* @param plan The execution plan to run

47

* @return JobExecutionResult with execution details and results

48

* @throws Exception if execution fails

49

*/

50

public JobExecutionResult executePlan(Plan plan) throws Exception;

51

52

/**

53

* Gets the optimizer plan as JSON for visualization

54

* @param plan The execution plan to optimize

55

* @return JSON string representation of the optimized plan

56

* @throws Exception if plan optimization fails

57

*/

58

public String getOptimizerPlanAsJSON(Plan plan) throws Exception;

59

60

/**

61

* Ends a specific job session

62

* @param jobID The job ID to end

63

* @throws Exception if session termination fails

64

*/

65

public void endSession(JobID jobID) throws Exception;

66

67

/**

68

* Checks if files should be overwritten by default

69

* @return true if default overwrite is enabled

70

*/

71

public boolean isDefaultOverwriteFiles();

72

73

/**

74

* Sets whether files should be overwritten by default

75

* @param defaultOverwriteFiles true to enable default overwrite

76

*/

77

public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles);

78

79

/**

80

* Gets the number of task slots per TaskManager

81

* @return Number of task slots

82

*/

83

public int getTaskManagerNumSlots();

84

85

/**

86

* Sets the number of task slots per TaskManager

87

* @param taskManagerNumSlots Number of task slots to configure

88

*/

89

public void setTaskManagerNumSlots(int taskManagerNumSlots);

90

91

/**

92

* Executes a program locally (static convenience method)

93

* @param program The program to execute

94

* @param args Command line arguments

95

* @return JobExecutionResult with execution details and results

96

* @throws Exception if execution fails

97

*/

98

public static JobExecutionResult execute(Program program, String... args) throws Exception;

99

100

/**

101

* Executes an execution plan locally (static convenience method)

102

* @param plan The execution plan to run

103

* @return JobExecutionResult with execution details and results

104

* @throws Exception if execution fails

105

*/

106

public static JobExecutionResult execute(Plan plan) throws Exception;

107

108

/**

109

* Gets the optimizer plan as JSON (static convenience method)

110

* @param plan The execution plan to optimize

111

* @return JSON string representation of the optimized plan

112

* @throws Exception if plan optimization fails

113

*/

114

public static String optimizerPlanAsJSON(Plan plan) throws Exception;

115

116

/**

117

* Gets the execution plan as JSON (static convenience method)

118

* @param plan The execution plan to convert

119

* @return JSON string representation of the plan

120

* @throws Exception if plan conversion fails

121

*/

122

public static String getPlanAsJSON(Plan plan) throws Exception;

123

}

124

```

125

126

### RemoteExecutor

127

128

Executor for running Flink programs on remote clusters, supporting various connection modes and configurations.

129

130

```java { .api }

131

/**

132

* Executor for running Flink programs on remote clusters

133

*/

134

public class RemoteExecutor extends PlanExecutor {

135

/**

136

* Creates a remote executor for a specific hostname and port

137

* @param hostname Hostname of the Flink cluster JobManager

138

* @param port Port of the Flink cluster JobManager

139

*/

140

public RemoteExecutor(String hostname, int port);

141

142

/**

143

* Creates a remote executor with a JAR file dependency

144

* @param hostname Hostname of the Flink cluster JobManager

145

* @param port Port of the Flink cluster JobManager

146

* @param jarFile JAR file URL to include in execution

147

*/

148

public RemoteExecutor(String hostname, int port, URL jarFile);

149

150

/**

151

* Creates a remote executor with hostname:port format

152

* @param hostport Hostname and port in "hostname:port" format

153

* @param jarFile JAR file URL to include in execution

154

*/

155

public RemoteExecutor(String hostport, URL jarFile);

156

157

/**

158

* Creates a remote executor with multiple JAR files

159

* @param hostname Hostname of the Flink cluster JobManager

160

* @param port Port of the Flink cluster JobManager

161

* @param jarFiles List of JAR file URLs to include in execution

162

*/

163

public RemoteExecutor(String hostname, int port, List<URL> jarFiles);

164

165

/**

166

* Creates a remote executor with custom configuration

167

* @param hostname Hostname of the Flink cluster JobManager

168

* @param port Port of the Flink cluster JobManager

169

* @param clientConfiguration Configuration for the client connection

170

*/

171

public RemoteExecutor(String hostname, int port, Configuration clientConfiguration);

172

173

/**

174

* Creates a remote executor with full configuration options

175

* @param inet Socket address of the JobManager

176

* @param clientConfiguration Configuration for the client connection

177

* @param jarFiles List of JAR file URLs to include in execution

178

* @param globalClasspaths List of global classpath URLs

179

*/

180

public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration, List<URL> jarFiles, List<URL> globalClasspaths);

181

182

/**

183

* Starts the remote executor connection

184

* @throws Exception if connection startup fails

185

*/

186

public void start() throws Exception;

187

188

/**

189

* Stops the remote executor and closes connections

190

* @throws Exception if shutdown fails

191

*/

192

public void stop() throws Exception;

193

194

/**

195

* Checks if the remote executor is currently running

196

* @return true if the executor is running and connected

197

*/

198

public boolean isRunning();

199

200

/**

201

* Executes a Flink execution plan on the remote cluster

202

* @param plan The execution plan to run

203

* @return JobExecutionResult with execution details and results

204

* @throws Exception if execution fails

205

*/

206

public JobExecutionResult executePlan(Plan plan) throws Exception;

207

208

/**

209

* Executes a job with JARs on the remote cluster

210

* @param program The job with associated JAR files

211

* @return JobExecutionResult with execution details and results

212

* @throws Exception if execution fails

213

*/

214

public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception;

215

216

/**

217

* Gets the optimizer plan as JSON for visualization

218

* @param plan The execution plan to optimize

219

* @return JSON string representation of the optimized plan

220

* @throws Exception if plan optimization fails

221

*/

222

public String getOptimizerPlanAsJSON(Plan plan) throws Exception;

223

224

/**

225

* Ends a specific job session on the remote cluster

226

* @param jobID The job ID to end

227

* @throws Exception if session termination fails

228

*/

229

public void endSession(JobID jobID) throws Exception;

230

231

/**

232

* Sets the default parallelism for job execution

233

* @param defaultParallelism Default parallelism level

234

*/

235

public void setDefaultParallelism(int defaultParallelism);

236

237

/**

238

* Gets the default parallelism for job execution

239

* @return Default parallelism level

240

*/

241

public int getDefaultParallelism();

242

}

243

```

244

245

### Client Utilities

246

247

Utility methods for client operations and connection management.

248

249

```java { .api }

250

/**

251

* Common utility methods for client operations

252

*/

253

public class ClientUtils {

254

/**

255

* Parses a hostname:port string into an InetSocketAddress

256

* @param hostport String in "hostname:port" format

257

* @return InetSocketAddress parsed from the input string

258

* @throws IllegalArgumentException if the format is invalid

259

*/

260

public static InetSocketAddress parseHostPortAddress(String hostport);

261

}

262

```

263

264

**Usage Examples:**

265

266

```java

267

import org.apache.flink.client.LocalExecutor;

268

import org.apache.flink.client.RemoteExecutor;

269

import org.apache.flink.client.ClientUtils;

270

import org.apache.flink.api.common.Plan;

271

import org.apache.flink.configuration.Configuration;

272

import java.net.InetSocketAddress;

273

import java.net.URL;

274

import java.util.Arrays;

275

import java.util.List;

276

277

// Local execution for development/testing

278

LocalExecutor localExecutor = new LocalExecutor();

279

localExecutor.setTaskManagerNumSlots(4);

280

localExecutor.setDefaultOverwriteFiles(true);

281

282

try {

283

localExecutor.start();

284

285

// Execute a Flink plan locally

286

Plan myPlan = createMyFlinkPlan(); // Your plan creation logic

287

JobExecutionResult result = localExecutor.executePlan(myPlan);

288

289

System.out.println("Local execution completed in: " + result.getNetRuntime() + "ms");

290

291

// Get execution plan as JSON for visualization

292

String planJson = localExecutor.getOptimizerPlanAsJSON(myPlan);

293

System.out.println("Execution plan: " + planJson);

294

295

} finally {

296

localExecutor.stop();

297

}

298

299

// Remote execution on a cluster

300

String jobManagerHost = "flink-cluster.example.com";

301

int jobManagerPort = 6123;

302

303

// Basic remote executor

304

RemoteExecutor remoteExecutor = new RemoteExecutor(jobManagerHost, jobManagerPort);

305

306

// Remote executor with JAR dependencies

307

List<URL> jarFiles = Arrays.asList(

308

new File("my-job.jar").toURI().toURL(),

309

new File("dependencies.jar").toURI().toURL()

310

);

311

RemoteExecutor remoteExecutorWithJars = new RemoteExecutor(jobManagerHost, jobManagerPort, jarFiles);

312

313

// Remote executor with full configuration

314

Configuration config = new Configuration();

315

config.setString("jobmanager.rpc.address", jobManagerHost);

316

config.setInteger("jobmanager.rpc.port", jobManagerPort);

317

318

InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress("flink-cluster.example.com:6123");

319

RemoteExecutor configuredRemoteExecutor = new RemoteExecutor(

320

jobManagerAddress,

321

config,

322

jarFiles,

323

Arrays.asList() // global classpaths

324

);

325

326

try {

327

remoteExecutor.start();

328

remoteExecutor.setDefaultParallelism(8);

329

330

// Execute a plan on the remote cluster

331

JobExecutionResult remoteResult = remoteExecutor.executePlan(myPlan);

332

System.out.println("Remote execution completed in: " + remoteResult.getNetRuntime() + "ms");

333

334

} finally {

335

remoteExecutor.stop();

336

}

337

338

// Static convenience methods for quick execution

339

JobExecutionResult quickLocalResult = LocalExecutor.execute(myPlan);

340

String quickPlanJson = LocalExecutor.getPlanAsJSON(myPlan);

341

```

342

343

### PlanExecutor Base Class

344

345

Base class for all plan executors providing common execution interface.

346

347

```java { .api }

348

/**

349

* Base class for all plan executors

350

*/

351

public abstract class PlanExecutor {

352

/**

353

* Starts the executor

354

* @throws Exception if startup fails

355

*/

356

public abstract void start() throws Exception;

357

358

/**

359

* Stops the executor

360

* @throws Exception if shutdown fails

361

*/

362

public abstract void stop() throws Exception;

363

364

/**

365

* Checks if the executor is running

366

* @return true if the executor is active

367

*/

368

public abstract boolean isRunning();

369

370

/**

371

* Executes a Flink execution plan

372

* @param plan The execution plan to run

373

* @return JobExecutionResult with execution details

374

* @throws Exception if execution fails

375

*/

376

public abstract JobExecutionResult executePlan(Plan plan) throws Exception;

377

378

/**

379

* Gets the optimized execution plan as JSON

380

* @param plan The execution plan to optimize

381

* @return JSON representation of the optimized plan

382

* @throws Exception if optimization fails

383

*/

384

public abstract String getOptimizerPlanAsJSON(Plan plan) throws Exception;

385

386

/**

387

* Ends a job session

388

* @param jobID The job ID to terminate

389

* @throws Exception if session termination fails

390

*/

391

public abstract void endSession(JobID jobID) throws Exception;

392

}

393

```