
# Environment and Execution

The Environment provides the execution context for Flink Python programs, managing job configuration, data source creation, and program execution. It serves as the main entry point for all Flink applications.

## Capabilities

### Environment Creation

Creates the execution environment that represents the context in which the program is executed.

```python { .api }
def get_environment():
    """
    Creates an execution environment that represents the context in which the program is currently executed.

    Returns:
        Environment: The execution environment instance
    """
```

### Data Source Operations

#### CSV File Reading

Creates a DataSet from CSV files with configurable delimiters and type specifications.

```python { .api }
def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
    """
    Creates a DataSet that represents the tuples produced by reading the given CSV file.

    Parameters:
        path (str): The path of the CSV file
        types (list): Specifies the types for the CSV fields
        line_delimiter (str): Line delimiter, default "\n"
        field_delimiter (str): Field delimiter, default ","

    Returns:
        DataSet: A DataSet representing the CSV data
    """
```
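The `types` list supplies one constructor per CSV field. As a plain-Python sketch of that conversion (illustrative only; `parse_typed_csv` is a hypothetical helper, not part of the Flink API, which performs the equivalent conversion internally):

```python
import csv
import io

def parse_typed_csv(text, types, field_delimiter=','):
    """Convert each CSV field with the corresponding type constructor."""
    rows = []
    for fields in csv.reader(io.StringIO(text), delimiter=field_delimiter):
        # Pair each field with its declared type, e.g. [str, int, float]
        rows.append(tuple(t(f) for t, f in zip(types, fields)))
    return rows

sample = "apple,3,0.5\nbanana,7,1.25"
print(parse_typed_csv(sample, [str, int, float]))
# [('apple', 3, 0.5), ('banana', 7, 1.25)]
```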

#### Text File Reading

Creates a DataSet by reading a text file line by line.

```python { .api }
def read_text(self, path):
    """
    Creates a DataSet that represents the Strings produced by reading the given file line by line.

    Parameters:
        path (str): The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")

    Returns:
        DataSet: A DataSet representing the data read from the given file as text lines
    """
```

#### Collection Sources

Creates a DataSet from an in-memory collection of elements.

```python { .api }
def from_elements(self, *elements):
    """
    Creates a new data set that contains the given elements.

    The elements must all be of the same type, for example, all Strings or all Integers.
    The sequence of elements must not be empty.

    Parameters:
        *elements: The elements to make up the data set

    Returns:
        DataSet: A DataSet representing the given list of elements
    """
```

#### Sequence Generation

Creates a DataSet containing a sequence of numbers.

```python { .api }
def generate_sequence(self, frm, to):
    """
    Creates a new data set that contains the given sequence of numbers.

    Parameters:
        frm (int): The start number for the sequence
        to (int): The end number for the sequence

    Returns:
        DataSet: A DataSet representing the given sequence of numbers
    """
```
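Assuming the same semantics as Flink's Java `generateSequence` (both endpoints included), the values produced correspond to this plain-Python range:

```python
# Illustrative only: assuming generate_sequence(frm, to) includes both
# endpoints, as Flink's Java generateSequence does.
frm, to = 1, 5
values = list(range(frm, to + 1))
print(values)  # [1, 2, 3, 4, 5]
```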

### Execution Configuration

#### Parallelism Control

Controls the degree of parallelism for operations.

```python { .api }
def set_parallelism(self, parallelism):
    """
    Sets the parallelism for operations executed through this environment.

    Setting a degree of parallelism (DOP) of x here will cause all operators to run with x parallel instances.

    Parameters:
        parallelism (int): The degree of parallelism
    """

def get_parallelism(self):
    """
    Gets the parallelism with which operations are executed by default.

    Returns:
        int: The parallelism used by operations
    """
```

#### Retry Configuration

Configures execution retry behavior on failures.

```python { .api }
def set_number_of_execution_retries(self, count):
    """
    Sets the number of execution retries on failure.

    Parameters:
        count (int): Number of retries
    """

def get_number_of_execution_retries(self):
    """
    Gets the number of execution retries.

    Returns:
        int: Current retry count
    """
```

#### Custom Type Registration

Registers custom types with serialization support.

```python { .api }
def register_type(self, type, serializer, deserializer):
    """
    Registers the given type with this environment, allowing all operators within to
    (de-)serialize objects of the given type.

    Parameters:
        type (class): Class of the objects to be (de-)serialized
        serializer: Instance of the serializer
        deserializer: Instance of the deserializer
    """
```
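The serializer and deserializer must be inverse operations over the registered type: round-tripping an object through them must reproduce an equal object. A minimal sketch of that contract, with a hypothetical `Point` type and `serialize`/`deserialize` method names assumed for illustration (not taken from the Flink API):

```python
import struct

# Hypothetical custom type; not part of the Flink API.
class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y

    def __eq__(self, other):
        return (self.x, self.y) == (other.x, other.y)

# Assumed interface: serialize to bytes, deserialize back from bytes.
class PointSerializer:
    def serialize(self, value):
        return struct.pack(">dd", value.x, value.y)

class PointDeserializer:
    def deserialize(self, data):
        x, y = struct.unpack(">dd", data)
        return Point(x, y)

p = Point(1.0, 2.0)
blob = PointSerializer().serialize(p)
assert PointDeserializer().deserialize(blob) == p

# Such instances would then be passed to, e.g.:
# env.register_type(Point, PointSerializer(), PointDeserializer())
```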

### Program Execution

Triggers the execution of the complete Flink program.

```python { .api }
def execute(self, local=False):
    """
    Triggers the program execution.

    The environment will execute all parts of the program that have resulted in a "sink" operation.

    Parameters:
        local (bool): Whether to execute in local mode

    Returns:
        JobExecutionResult: Execution result with runtime information
    """
```

### Job Execution Results

```python { .api }
class JobExecutionResult:
    def get_net_runtime(self):
        """
        Gets the net runtime of the executed job.

        Returns:
            int: Runtime in milliseconds
        """
```

## Usage Examples

### Basic Environment Setup

```python
from flink.plan.Environment import get_environment

# Create execution environment
env = get_environment()

# Configure parallelism
env.set_parallelism(4)

# Configure retries
env.set_number_of_execution_retries(3)
```

### Reading Different Data Sources

```python
# Read CSV with type specification
csv_data = env.read_csv("data.csv", [str, int, float])

# Read text file
text_data = env.read_text("input.txt")

# Create from elements
collection_data = env.from_elements("apple", "banana", "cherry")

# Generate sequence
numbers = env.generate_sequence(1, 100)
```

### Complete Program Execution

```python
# Create environment and data
env = get_environment()
data = env.from_elements(1, 2, 3, 4, 5)

# Apply transformations
result = data.map(lambda x: x * 2)

# Add sink operation
result.output()

# Execute program
execution_result = env.execute(local=True)
print(f"Job completed in {execution_result.get_net_runtime()} ms")
```