or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md checkpointing.md datastream-transformations.md execution-environment.md index.md keyed-streams-state.md process-functions.md sources-sinks.md time-watermarks.md windowing.md

docs/execution-environment.md

0

# Stream Execution Environment

1

2

The StreamExecutionEnvironment is the main entry point for creating and configuring Apache Flink streaming applications. It provides methods to create data streams from various sources, configure runtime settings, and execute streaming jobs.

3

4

## Capabilities

5

6

### Environment Creation

7

8

Create different types of execution environments based on your deployment needs.

9

10

```java { .api }

11

/**

12

* Get the default execution environment, which is determined based on the context.

13

* In an IDE or as a regular program: creates a local environment

14

* In a cluster: creates the cluster environment

15

*/

16

static StreamExecutionEnvironment getExecutionEnvironment();

17

18

/**

19

* Create a local execution environment for testing and development

20

* @param parallelism - the parallelism for the local environment

21

*/

22

static StreamExecutionEnvironment createLocalEnvironment(int parallelism);

23

24

/**

25

* Create a local execution environment with default parallelism

26

*/

27

static StreamExecutionEnvironment createLocalEnvironment();

28

29

/**

30

* Create a remote execution environment for cluster deployment

31

* @param host - the host of the JobManager

32

* @param port - the port of the JobManager

33

* @param jarFiles - JAR files to be shipped to the cluster

34

*/

35

static StreamExecutionEnvironment createRemoteEnvironment(

36

String host,

37

int port,

38

String... jarFiles

39

);

40

```

41

42

**Usage Examples:**

43

44

```java

45

// Get default environment (most common)

46

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

47

48

// Create local environment for testing

49

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(4);

50

51

// Create remote environment for cluster

52

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(

53

"jobmanager-host", 6123, "my-job.jar"

54

);

55

```

56

57

### Data Source Creation

58

59

Create data streams from various built-in sources.

60

61

```java { .api }

62

/**

63

* Create a DataStream from the given elements

64

* @param data - the elements to create the stream from

65

*/

66

<T> DataStreamSource<T> fromElements(T... data);

67

68

/**

69

* Create a DataStream from a collection

70

* @param data - the collection to create the stream from

71

*/

72

<T> DataStreamSource<T> fromCollection(Collection<T> data);

73

74

/**

75

* Create a DataStream from a collection with type information

76

* @param data - the collection to create the stream from

77

* @param typeInfo - explicit type information

78

*/

79

<T> DataStreamSource<T> fromCollection(Collection<T> data, TypeInformation<T> typeInfo);

80

81

/**

82

* Add a custom source function to create a data stream

83

* @param function - the source function

84

*/

85

<T> DataStreamSource<T> addSource(SourceFunction<T> function);

86

87

/**

88

* Add a custom source function with type information

89

* @param function - the source function

90

* @param typeInfo - explicit type information

91

*/

92

<T> DataStreamSource<T> addSource(SourceFunction<T> function, TypeInformation<T> typeInfo);

93

94

/**

95

* Read text from a socket connection

96

* @param hostname - the hostname to connect to

97

* @param port - the port to connect to

98

*/

99

DataStreamSource<String> socketTextStream(String hostname, int port);

100

101

/**

102

* Read text from a socket with custom delimiter

103

* @param hostname - the hostname to connect to

104

* @param port - the port to connect to

105

* @param delimiter - the delimiter to split records

106

*/

107

DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter);

108

109

/**

110

* Read the entire text file from the file system

111

* @param filePath - the path to the file

112

*/

113

DataStreamSource<String> readTextFile(String filePath);

114

115

/**

116

* Read text file with character set

117

* @param filePath - the path to the file

118

* @param charsetName - the character set name

119

*/

120

DataStreamSource<String> readTextFile(String filePath, String charsetName);

121

122

/**

123

* Generate a sequence of numbers (deprecated)

124

* @deprecated Use fromSequence() instead

125

* @param from - start of sequence (inclusive)

126

* @param to - end of sequence (inclusive)

127

*/

128

@Deprecated

129

DataStreamSource<Long> generateSequence(long from, long to);

130

131

/**

132

* Create a sequence of numbers from 'from' to 'to'

133

* @param from - start of sequence (inclusive)

134

* @param to - end of sequence (inclusive)

135

* @return DataStream of Long values

136

*/

137

DataStreamSource<Long> fromSequence(long from, long to);

138

139

/**

140

* Create a DataStream from a modern unified Source

141

* @param source - the Source to read from

142

* @param timestampsAndWatermarks - watermark strategy for event time

143

* @param sourceName - name of the source

144

* @return DataStream from the source

145

*/

146

<T> DataStreamSource<T> fromSource(

147

Source<T, ?, ?> source,

148

WatermarkStrategy<T> timestampsAndWatermarks,

149

String sourceName

150

);

151

```

152

153

**Usage Examples:**

154

155

```java

156

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

157

158

// From elements

159

DataStream<String> elements = env.fromElements("hello", "world", "flink");

160

161

// From collection

162

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

163

DataStream<Integer> fromList = env.fromCollection(numbers);

164

165

// Socket stream

166

DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

167

168

// File stream

169

DataStream<String> fileStream = env.readTextFile("/path/to/input.txt");

170

171

// Sequence (generateSequence is deprecated; prefer env.fromSequence(1, 1000))

172

DataStream<Long> sequence = env.generateSequence(1, 1000);

173

174

// Custom source

175

DataStream<MyEvent> customStream = env.addSource(new MyCustomSource());

176

```

177

178

### Job Execution

179

180

Execute streaming jobs and handle execution results.

181

182

```java { .api }

183

/**

184

* Execute the streaming job

185

* @return JobExecutionResult containing execution information

186

*/

187

JobExecutionResult execute() throws Exception;

188

189

/**

190

* Execute the streaming job with a custom name

191

* @param jobName - the name of the job

192

* @return JobExecutionResult containing execution information

193

*/

194

JobExecutionResult execute(String jobName) throws Exception;

195

196

/**

197

* Execute the streaming job asynchronously

198

* @return JobClient for managing the running job

199

*/

200

JobClient executeAsync() throws Exception;

201

202

/**

203

* Execute the streaming job asynchronously with a custom name

204

* @param jobName - the name of the job

205

* @return JobClient for managing the running job

206

*/

207

JobClient executeAsync(String jobName) throws Exception;

208

```

209

210

**Usage Examples:**

211

212

```java

213

// Execute with default name

214

JobExecutionResult result = env.execute();

215

216

// Or execute with a custom name (an alternative to the call above; use one or the other)

217

JobExecutionResult result = env.execute("My Streaming Job");

218

219

// Access execution results

220

System.out.println("Job ID: " + result.getJobID());

221

System.out.println("Execution Time: " + result.getNetRuntime());

222

```

223

224

### Configuration

225

226

Configure runtime settings and execution parameters.

227

228

```java { .api }

229

/**

230

* Set the parallelism for operations executed through this environment

231

* @param parallelism - the parallelism

232

*/

233

StreamExecutionEnvironment setParallelism(int parallelism);

234

235

/**

236

* Get the default parallelism

237

*/

238

int getParallelism();

239

240

/**

241

* Set the maximum parallelism

242

* @param maxParallelism - the maximum parallelism

243

*/

244

StreamExecutionEnvironment setMaxParallelism(int maxParallelism);

245

246

/**

247

* Get the maximum parallelism

248

*/

249

int getMaxParallelism();

250

251

/**

252

* Enable checkpointing for fault tolerance

253

* @param interval - checkpoint interval in milliseconds

254

*/

255

StreamExecutionEnvironment enableCheckpointing(long interval);

256

257

/**

258

* Enable checkpointing with mode

259

* @param interval - checkpoint interval in milliseconds

260

* @param mode - checkpointing mode (EXACTLY_ONCE or AT_LEAST_ONCE)

261

*/

262

StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode);

263

264

/**

265

* Get the checkpoint configuration

266

*/

267

CheckpointConfig getCheckpointConfig();

268

269

/**

270

* Set the time characteristic for the application (deprecated)

271

* @deprecated Time characteristics are deprecated. Use source-based watermark assignment instead.

272

* @param characteristic - the time characteristic (EventTime, ProcessingTime, IngestionTime)

273

*/

274

@Deprecated

275

StreamExecutionEnvironment setStreamTimeCharacteristic(TimeCharacteristic characteristic);

276

277

/**

278

* Set the runtime execution mode (batch or streaming)

279

* @param executionMode - BATCH for bounded data, STREAMING for unbounded data, or AUTOMATIC to let the system choose based on the boundedness of the sources

280

* @return StreamExecutionEnvironment for method chaining

281

*/

282

StreamExecutionEnvironment setRuntimeMode(RuntimeExecutionMode executionMode);

283

284

/**

285

* Get the time characteristic (time characteristics are deprecated; see setStreamTimeCharacteristic)

286

*/

287

TimeCharacteristic getStreamTimeCharacteristic();

288

289

/**

290

* Set the buffer timeout: the maximum time that output network buffers may wait to fill before being flushed

291

* @param timeoutMillis - timeout in milliseconds

292

*/

293

StreamExecutionEnvironment setBufferTimeout(long timeoutMillis);

294

295

/**

296

* Get the buffer timeout

297

*/

298

long getBufferTimeout();

299

```

300

301

**Usage Examples:**

302

303

```java

304

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

305

306

// Set parallelism

307

env.setParallelism(4);

308

env.setMaxParallelism(128);

309

310

// Enable checkpointing

311

env.enableCheckpointing(5000); // every 5 seconds

312

env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

313

314

// Configure checkpointing

315

CheckpointConfig config = env.getCheckpointConfig();

316

config.setMinPauseBetweenCheckpoints(500);

317

config.setCheckpointTimeout(60000);

318

319

// Set time characteristic (deprecated; prefer source-based watermark assignment)

320

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

321

322

// Set buffer timeout

323

env.setBufferTimeout(100);

324

```

325

326

### Execution Configuration

327

328

Access and modify execution configuration settings.

329

330

```java { .api }

331

/**

332

* Get the execution configuration

333

*/

334

ExecutionConfig getConfig();

335

```

336

337

**Usage Examples:**

338

339

```java

340

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

341

ExecutionConfig config = env.getConfig();

342

343

// Configure execution settings

344

config.setAutoWatermarkInterval(1000);

345

config.setLatencyTrackingInterval(2000);

346

config.enableObjectReuse();

347

config.setGlobalJobParameters(ParameterTool.fromArgs(args));

348

```

349

350

## Types

351

352

### Environment Types

353

354

```java { .api }

355

abstract class StreamExecutionEnvironment {

356

// Factory methods, source creation, execution, and configuration methods as above

357

}

358

359

class LocalStreamEnvironment extends StreamExecutionEnvironment {

360

// Local execution environment implementation

361

}

362

363

class RemoteStreamEnvironment extends StreamExecutionEnvironment {

364

// Remote cluster execution environment implementation

365

}

366

367

// Data source type

368

class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

369

// Represents a data source in the streaming topology

370

}

371

```

372

373

### Configuration Types

374

375

```java { .api }

376

enum TimeCharacteristic {

377

ProcessingTime, // Processing time semantics

378

IngestionTime, // Ingestion time semantics

379

EventTime // Event time semantics

380

}

381

382

enum CheckpointingMode {

383

EXACTLY_ONCE, // Exactly-once processing guarantees

384

AT_LEAST_ONCE // At-least-once processing guarantees

385

}

386

387

class ExecutionConfig {

388

// Configuration for job execution

389

void setAutoWatermarkInterval(long interval);

390

void setLatencyTrackingInterval(long interval);

391

void enableObjectReuse();

392

void setGlobalJobParameters(GlobalJobParameters parameters);

393

}

394

395

class CheckpointConfig {

396

// Configuration for checkpointing

397

void setCheckpointingMode(CheckpointingMode mode);

398

void setMinPauseBetweenCheckpoints(long minPause);

399

void setCheckpointTimeout(long timeout);

400

void setMaxConcurrentCheckpoints(int maxConcurrent);

401

}

402

```

403

404

### Job Execution Results

405

406

```java { .api }

407

class JobExecutionResult {

408

// Result of job execution

409

JobID getJobID();

410

long getNetRuntime();

411

long getNetRuntime(TimeUnit desiredUnit);

412

Map<String, Object> getAllAccumulatorResults();

413

<T> T getAccumulatorResult(String accumulatorName);

414

}

415

```