docs/stream-execution-environment.md

# Stream Execution Environment

The StreamExecutionEnvironment is the main entry point for creating Flink streaming applications. It provides methods for creating data sources, configuring execution parameters, and managing the job lifecycle.

## Capabilities

### Environment Creation

Factory methods for creating different types of execution environments.

```scala { .api }
/**
 * Factory methods for creating local and remote execution environments
 */
object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(): StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.scala._

// Get execution environment (local or cluster depending on context)
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Force local execution
val localEnv = StreamExecutionEnvironment.createLocalEnvironment()

// Remote cluster execution
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "path/to/job.jar")
```

### Configuration Management

Methods for configuring job execution parameters.

```scala { .api }
/**
 * Configure parallelism and execution mode
 */
class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def setMaxParallelism(maxParallelism: Int): Unit
  def getMaxParallelism: Int
  def setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment
  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment
  def getBufferTimeout: Long
  def disableOperatorChaining(): StreamExecutionEnvironment
}
```

### Checkpointing Configuration

Enable and configure checkpointing for fault tolerance.

```scala { .api }
/**
 * Configure checkpointing for fault tolerance
 */
class StreamExecutionEnvironment {
  def enableCheckpointing(interval: Long): StreamExecutionEnvironment
  def enableCheckpointing(interval: Long, mode: CheckpointingMode): StreamExecutionEnvironment
  def getCheckpointConfig: CheckpointConfig
}
```

**Usage Examples:**

```scala
// Enable checkpointing every 5 seconds
env.enableCheckpointing(5000)

// Configure checkpoint mode
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

// Advanced checkpoint configuration
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setMinPauseBetweenCheckpoints(500)
checkpointConfig.setCheckpointTimeout(60000)
```

### Data Source Creation

Methods for creating data streams from various sources.

```scala { .api }
/**
 * Create data streams from various sources
 */
class StreamExecutionEnvironment {
  def fromCollection[T](data: Seq[T]): DataStream[T]
  def fromElements[T](data: T*): DataStream[T]
  def fromParallelCollection[T](data: SplittableIterator[T]): DataStream[T]
  def generateSequence(from: Long, to: Long): DataStream[Long]
  def readTextFile(filePath: String): DataStream[String]
  def readFile[T](inputFormat: FileInputFormat[T], filePath: String): DataStream[T]
  def socketTextStream(hostname: String, port: Int): DataStream[String]
  def addSource[T](function: SourceFunction[T]): DataStream[T]
  def fromSource[T](source: Source[T, _, _], watermarkStrategy: WatermarkStrategy[T], sourceName: String): DataStream[T]
}
```

**Usage Examples:**

```scala
// Create from collection
val dataStream = env.fromCollection(List(1, 2, 3, 4, 5))

// Create from elements
val elementsStream = env.fromElements("hello", "world", "flink")

// Read from file
val fileStream = env.readTextFile("path/to/input.txt")

// Socket stream for testing
val socketStream = env.socketTextStream("localhost", 9999)

// Custom source function
val customStream = env.addSource(new MyCustomSourceFunction())
```

### Job Execution

Methods for executing the streaming job.

```scala { .api }
/**
 * Execute the streaming job
 */
class StreamExecutionEnvironment {
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
  def executeAsync(): JobClient
  def executeAsync(jobName: String): JobClient
  def getStreamGraph: StreamGraph
  def getStreamGraph(jobName: String): StreamGraph
}
```

**Usage Examples:**

```scala
// Execute with default job name
env.execute()

// Execute with custom job name
env.execute("My Streaming Job")

// Async execution for non-blocking operation
val jobClient = env.executeAsync("Async Job")
```

### State Backend Configuration

Configure state backends for stateful operations.

```scala { .api }
/**
 * Configure state backend
 */
class StreamExecutionEnvironment {
  def setStateBackend(backend: StateBackend): StreamExecutionEnvironment
  def getStateBackend: StateBackend
}
```

### Configuration Access

Access to execution configuration and cached files.

```scala { .api }
/**
 * Access configuration and resources
 */
class StreamExecutionEnvironment {
  def getConfig: ExecutionConfig
  def getCachedFiles: Map[String, URI]
  def registerCachedFile(filePath: String, name: String): Unit
  def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit
}
```

## Types

```scala { .api }
// Main environment class
class StreamExecutionEnvironment(javaEnv: JavaEnv)

// Execution modes and configuration
enum RuntimeExecutionMode {
  STREAMING, BATCH
}

enum CheckpointingMode {
  EXACTLY_ONCE, AT_LEAST_ONCE
}

// Job execution results
trait JobExecutionResult
trait JobClient

// Configuration classes
class ExecutionConfig
class CheckpointConfig
class StreamGraph
```