or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli.md · cluster-management.md · execution-context.md · execution-environments.md · index.md · program-management.md

docs/execution-context.md

0

# Execution Context

1

2

Execution environments that provide programmatic APIs for submitting and managing Flink jobs within applications.

3

4

## Capabilities

5

6

### ContextEnvironment

7

8

Execution environment for remote cluster execution that provides programmatic job submission with full context awareness.

9

10

```java { .api }

11

/**

12

* Execution environment for remote cluster execution with context awareness

13

*/

14

public class ContextEnvironment extends ExecutionEnvironment {

15

/**

16

* Creates a context environment for remote execution

17

* @param remoteConnection Client connection to the remote cluster

18

* @param jarFiles List of JAR file URLs to include

19

* @param classpaths List of additional classpath URLs

20

* @param userCodeClassLoader Class loader for user code

21

* @param savepointSettings Settings for savepoint restoration

22

*/

23

public ContextEnvironment(

24

ClusterClient remoteConnection,

25

List<URL> jarFiles,

26

List<URL> classpaths,

27

ClassLoader userCodeClassLoader,

28

SavepointRestoreSettings savepointSettings

29

);

30

31

/**

32

* Executes the job with the specified name

33

* @param jobName Name for the job execution

34

* @return JobExecutionResult with execution details and results

35

* @throws Exception if execution fails

36

*/

37

public JobExecutionResult execute(String jobName) throws Exception;

38

39

/**

40

* Gets the execution plan without executing the job

41

* @return String representation of the execution plan

42

* @throws Exception if plan generation fails

43

*/

44

public String getExecutionPlan() throws Exception;

45

46

/**

47

* Starts a new session for job execution

48

* @throws Exception if session creation fails

49

*/

50

public void startNewSession() throws Exception;

51

52

/**

53

* Gets the cluster client used by this environment

54

* @return ClusterClient instance for cluster communication

55

*/

56

public ClusterClient getClient();

57

58

/**

59

* Gets the list of JAR files associated with this environment

60

* @return List of JAR file URLs

61

*/

62

public List<URL> getJars();

63

64

/**

65

* Gets the list of additional classpaths

66

* @return List of classpath URLs

67

*/

68

public List<URL> getClasspaths();

69

70

/**

71

* Gets the user code class loader

72

* @return ClassLoader for user code execution

73

*/

74

public ClassLoader getUserCodeClassLoader();

75

76

/**

77

* Gets the savepoint restore settings

78

* @return SavepointRestoreSettings for job restoration

79

*/

80

public SavepointRestoreSettings getSavepointRestoreSettings();

81

82

/**

83

* Sets the context environment factory as the active factory

84

* @param factory ContextEnvironmentFactory to use for creating environments

85

*/

86

public static void setAsContext(ContextEnvironmentFactory factory);

87

88

/**

89

* Removes the current context environment factory

90

*/

91

public static void unsetContext();

92

}

93

```

94

95

### DetachedEnvironment

96

97

Execution environment for detached (non-blocking) job execution that returns immediately after job submission.

98

99

```java { .api }

100

/**

101

* Execution environment for detached (non-blocking) execution

102

*/

103

public class DetachedEnvironment extends ContextEnvironment {

104

/**

105

* Creates a detached environment for non-blocking remote execution

106

* @param remoteConnection Client connection to the remote cluster

107

* @param jarFiles List of JAR file URLs to include

108

* @param classpaths List of additional classpath URLs

109

* @param userCodeClassLoader Class loader for user code

110

* @param savepointSettings Settings for savepoint restoration

111

*/

112

public DetachedEnvironment(

113

ClusterClient remoteConnection,

114

List<URL> jarFiles,

115

List<URL> classpaths,

116

ClassLoader userCodeClassLoader,

117

SavepointRestoreSettings savepointSettings

118

);

119

120

/**

121

* Executes the job in detached mode (non-blocking)

122

* @param jobName Name for the job execution

123

* @return JobExecutionResult with job submission details (not execution results)

124

* @throws Exception if job submission fails

125

*/

126

public JobExecutionResult execute(String jobName) throws Exception;

127

128

/**

129

* Sets the detached execution plan

130

* @param plan FlinkPlan to execute in detached mode

131

*/

132

public void setDetachedPlan(FlinkPlan plan);

133

134

/**

135

* Finalizes the execution and returns submission result

136

* @return JobSubmissionResult with job ID and submission status

137

* @throws Exception if finalization fails

138

*/

139

public JobSubmissionResult finalizeExecute() throws Exception;

140

}

141

```

142

143

### ContextEnvironmentFactory

144

145

Factory for creating context-appropriate execution environments based on configuration and requirements.

146

147

```java { .api }

148

/**

149

* Factory for creating context-appropriate execution environments

150

*/

151

public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {

152

/**

153

* Creates a context environment factory with full configuration

154

* @param client Cluster client for remote communication

155

* @param jarFilesToAttach List of JAR files to attach to jobs

156

* @param classpathsToAttach List of classpaths to attach to jobs

157

* @param userCodeClassLoader Class loader for user code

158

* @param defaultParallelism Default parallelism for job execution

159

* @param isDetached Whether to create detached environments

160

* @param savepointSettings Settings for savepoint restoration

161

*/

162

public ContextEnvironmentFactory(

163

ClusterClient client,

164

List<URL> jarFilesToAttach,

165

List<URL> classpathsToAttach,

166

ClassLoader userCodeClassLoader,

167

int defaultParallelism,

168

boolean isDetached,

169

SavepointRestoreSettings savepointSettings

170

);

171

172

/**

173

* Creates an execution environment based on factory configuration

174

* @return ExecutionEnvironment instance (ContextEnvironment or DetachedEnvironment)

175

*/

176

public ExecutionEnvironment createExecutionEnvironment();

177

178

/**

179

* Gets the last environment created by this factory

180

* @return Last created ExecutionEnvironment instance

181

*/

182

public ExecutionEnvironment getLastEnvCreated();

183

}

184

```

185

186

### Preview and Optimization Environments

187

188

Specialized environments for plan preview and optimization without actual execution.

189

190

```java { .api }

191

/**

192

* Execution environment for generating plan previews without execution

193

*/

194

public class PreviewPlanEnvironment extends ExecutionEnvironment {

195

/**

196

* Generates a plan preview instead of executing

197

* @param jobName Name for the job (used in preview)

198

* @return JobExecutionResult (empty - no actual execution)

199

* @throws Exception if preview generation fails

200

*/

201

public JobExecutionResult execute(String jobName) throws Exception;

202

203

/**

204

* Gets the execution plan for preview

205

* @return String representation of the execution plan

206

* @throws Exception if plan generation fails

207

*/

208

public String getExecutionPlan() throws Exception;

209

210

/**

211

* Gets the preview plan as a formatted string

212

* @return Formatted preview of the execution plan

213

* @throws Exception if preview generation fails

214

*/

215

public String getPreviewPlan() throws Exception;

216

}

217

218

/**

219

* Execution environment for plan optimization preview

220

*/

221

public class OptimizerPlanEnvironment extends ExecutionEnvironment {

222

/**

223

* Generates an optimized plan instead of executing

224

* @param jobName Name for the job (used in optimization)

225

* @return JobExecutionResult (empty - no actual execution)

226

* @throws Exception if optimization fails

227

*/

228

public JobExecutionResult execute(String jobName) throws Exception;

229

230

/**

231

* Gets the optimized execution plan

232

* @return String representation of the optimized plan

233

* @throws Exception if optimization fails

234

*/

235

public String getExecutionPlan() throws Exception;

236

}

237

```

238

239

**Usage Examples:**

240

241

```java

242

import org.apache.flink.client.program.ContextEnvironment;

243

import org.apache.flink.client.program.DetachedEnvironment;

244

import org.apache.flink.client.program.ContextEnvironmentFactory;

245

import org.apache.flink.client.program.StandaloneClusterClient;

246

import org.apache.flink.api.java.ExecutionEnvironment;

247

import org.apache.flink.configuration.Configuration;

248

import java.io.File;

import java.net.URL;

249

import java.util.Arrays;

250

import java.util.List;

251

252

// Setup cluster connection

253

Configuration config = new Configuration();

254

StandaloneClusterClient client = new StandaloneClusterClient(config);

255

256

List<URL> jarFiles = Arrays.asList(

257

new File("my-job.jar").toURI().toURL()

258

);

259

List<URL> classpaths = Arrays.asList();

260

261

// Create context environment for synchronous execution

262

ContextEnvironment contextEnv = new ContextEnvironment(

263

client,

264

jarFiles,

265

classpaths,

266

Thread.currentThread().getContextClassLoader(),

267

SavepointRestoreSettings.none()

268

);

269

270

// Create a factory and register it as the active execution environment context

271

ContextEnvironmentFactory factory = new ContextEnvironmentFactory(

272

client,

273

jarFiles,

274

classpaths,

275

Thread.currentThread().getContextClassLoader(),

276

4, // default parallelism

277

false, // not detached

278

SavepointRestoreSettings.none()

279

);

280

281

ContextEnvironment.setAsContext(factory);

282

283

try {

284

// Now your Flink program code can use ExecutionEnvironment.getExecutionEnvironment()

285

// and it will automatically use the context environment

286

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

287

288

// Build your Flink program

289

DataSet<String> data = env.fromElements("Hello", "World", "Flink");

290

data.print();

291

292

// Execute the job (this will use the context environment)

293

JobExecutionResult result = env.execute("My Context Job");

294

System.out.println("Job completed in: " + result.getNetRuntime() + "ms");

295

296

} finally {

297

ContextEnvironment.unsetContext();

298

client.shutdown();

299

}

300

301

// Detached execution for fire-and-forget jobs (needs a live ClusterClient: client.shutdown() was called above, so create a new client first)

302

DetachedEnvironment detachedEnv = new DetachedEnvironment(

303

client,

304

jarFiles,

305

classpaths,

306

Thread.currentThread().getContextClassLoader(),

307

SavepointRestoreSettings.none()

308

);

309

310

ContextEnvironmentFactory detachedFactory = new ContextEnvironmentFactory(

311

client,

312

jarFiles,

313

classpaths,

314

Thread.currentThread().getContextClassLoader(),

315

4, // default parallelism

316

true, // detached mode

317

SavepointRestoreSettings.none()

318

);

319

320

ContextEnvironment.setAsContext(detachedFactory);

321

322

try {

323

ExecutionEnvironment detachedEnvFromFactory = ExecutionEnvironment.getExecutionEnvironment();

324

325

// Build your Flink program

326

DataSet<String> data = detachedEnvFromFactory.fromElements("Detached", "Job", "Execution");

327

data.writeAsText("/path/to/output");

328

329

// This will submit the job and return immediately

330

JobExecutionResult detachedResult = detachedEnvFromFactory.execute("My Detached Job");

331

System.out.println("Job submitted with ID: " + detachedResult.getJobID());

332

333

} finally {

334

ContextEnvironment.unsetContext();

335

}

336

337

// Plan preview without execution

338

PreviewPlanEnvironment previewEnv = new PreviewPlanEnvironment();

339

previewEnv.setParallelism(4);

340

341

DataSet<String> previewData = previewEnv.fromElements("Preview", "Plan", "Generation");

342

previewData.map(s -> s.toUpperCase()).print();

343

344

// Get the execution plan without running the job

345

String executionPlan = previewEnv.getExecutionPlan();

346

System.out.println("Execution Plan Preview:");

347

System.out.println(executionPlan);

348

349

String previewPlan = previewEnv.getPreviewPlan();

350

System.out.println("Formatted Preview:");

351

System.out.println(previewPlan);

352

```

353

354

### Environment Management Interfaces

355

356

Base interfaces for execution environment management.

357

358

```java { .api }

359

/**

360

* Factory interface for creating execution environments

361

*/

362

public interface ExecutionEnvironmentFactory {

363

/**

364

* Creates an execution environment instance

365

* @return ExecutionEnvironment configured by this factory

366

*/

367

ExecutionEnvironment createExecutionEnvironment();

368

}

369

```