# Environment and Execution

The Environment provides the execution context for Flink Python programs, managing job configuration, data source creation, and program execution. It serves as the main entry point for all Flink applications.

## Capabilities

### Environment Creation

Creates the execution environment that represents the context in which the program is executed.

```python { .api }
def get_environment():
    """
    Creates an execution environment that represents the context in which the program is currently executed.

    Returns:
        Environment: The execution environment instance
    """
```

### Data Source Operations

#### CSV File Reading

Creates a DataSet from CSV files with configurable delimiters and type specifications.

```python { .api }
def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
    """
    Create a DataSet that represents the tuples produced by reading the given CSV file.

    Parameters:
        path (str): The path of the CSV file
        types (list): Specifies the types for the CSV fields
        line_delimiter (str): Line delimiter, default "\n"
        field_delimiter (str): Field delimiter, default ","

    Returns:
        DataSet: A DataSet representing the CSV data
    """
```
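
The `types` list determines how each field of a record is converted. As a plain-Python illustration of that mapping (this mimics the semantics of the `types` parameter, not Flink's actual parsing code):

```python
# Illustration only: how a types list such as [str, int, float] maps the
# fields of one CSV record to a typed tuple, mirroring read_csv's `types`
# parameter. This is not Flink's internal parsing logic.
line = "alice,3,2.5"
types = [str, int, float]
record = tuple(t(field) for t, field in zip(types, line.split(",")))
print(record)  # ('alice', 3, 2.5)
```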

#### Text File Reading

Creates a DataSet by reading text files line by line.

```python { .api }
def read_text(self, path):
    """
    Creates a DataSet that represents the Strings produced by reading the given file line wise.

    Parameters:
        path (str): The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")

    Returns:
        DataSet: A DataSet representing the data read from the given file as text lines
    """
```

#### Collection Sources

Creates a DataSet from in-memory collections of elements.

```python { .api }
def from_elements(self, *elements):
    """
    Creates a new data set that contains the given elements.

    The elements must all be of the same type, for example, all strings or all integers.
    The sequence of elements must not be empty.

    Parameters:
        *elements: The elements to make up the data set

    Returns:
        DataSet: A DataSet representing the given list of elements
    """
```

#### Sequence Generation

Creates a DataSet containing a sequence of numbers.

```python { .api }
def generate_sequence(self, frm, to):
    """
    Creates a new data set that contains the given sequence of numbers.

    Parameters:
        frm (int): The start number for the sequence
        to (int): The end number for the sequence

    Returns:
        DataSet: A DataSet representing the given sequence of numbers
    """
```
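
In the Java DataSet API, `generateSequence` includes both endpoints; assuming the Python binding behaves the same way, `generate_sequence(1, 5)` produces the numbers sketched below:

```python
# Sketch of the values generate_sequence(frm, to) yields, assuming both
# endpoints are inclusive (as in the Java DataSet API's generateSequence).
frm, to = 1, 5
sequence = list(range(frm, to + 1))
print(sequence)  # [1, 2, 3, 4, 5]
```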

### Execution Configuration

#### Parallelism Control

Controls the degree of parallelism for operations.

```python { .api }
def set_parallelism(self, parallelism):
    """
    Sets the parallelism for operations executed through this environment.

    Setting a degree of parallelism of x here will cause all operators to run with x parallel instances.

    Parameters:
        parallelism (int): The degree of parallelism
    """

def get_parallelism(self):
    """
    Gets the parallelism with which operations are executed by default.

    Returns:
        int: The parallelism used by operations
    """
```

#### Retry Configuration

Configures execution retry behavior on failures.

```python { .api }
def set_number_of_execution_retries(self, count):
    """
    Sets the number of execution retries on failure.

    Parameters:
        count (int): Number of retries
    """

def get_number_of_execution_retries(self):
    """
    Gets the number of execution retries.

    Returns:
        int: Current retry count
    """
```

#### Custom Type Registration

Registers custom types with serialization support.

```python { .api }
def register_type(self, type, serializer, deserializer):
    """
    Registers the given type with this environment, allowing all operators within to
    (de-)serialize objects of the given type.

    Parameters:
        type (class): Class of the objects to be (de-)serialized
        serializer: Instance of the serializer
        deserializer: Instance of the deserializer
    """
```
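
A serializer/deserializer pair might look like the following sketch. The `Point` class, the method names, and the wire format are illustrative assumptions, not part of the Flink API; consult the Flink Python documentation for the exact interface the serializer and deserializer instances must implement.

```python
import struct

# Hypothetical custom type plus a serializer/deserializer pair of the rough
# shape register_type expects. Method names and byte layout are assumptions.
class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

class PointSerializer:
    def serialize(self, value):
        # Pack the two coordinates as big-endian doubles (16 bytes total).
        return struct.pack(">dd", value.x, value.y)

class PointDeserializer:
    def deserialize(self, data):
        x, y = struct.unpack(">dd", data)
        return Point(x, y)

# Round-trip check of the pair:
data = PointSerializer().serialize(Point(1.5, -2.0))
restored = PointDeserializer().deserialize(data)
print(restored.x, restored.y)  # 1.5 -2.0

# Registration would then be (hypothetical usage):
# env.register_type(Point, PointSerializer(), PointDeserializer())
```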

### Program Execution

Triggers the execution of the complete Flink program.

```python { .api }
def execute(self, local=False):
    """
    Triggers the program execution.

    The environment will execute all parts of the program that have resulted in a "sink" operation.

    Parameters:
        local (bool): Whether to execute in local mode

    Returns:
        JobExecutionResult: Execution result with runtime information
    """
```

### Job Execution Results

```python { .api }
class JobExecutionResult:
    def get_net_runtime(self):
        """
        Gets the net runtime of the executed job.

        Returns:
            int: Runtime in milliseconds
        """
```

## Usage Examples

### Basic Environment Setup

```python
from flink.plan.Environment import get_environment

# Create execution environment
env = get_environment()

# Configure parallelism
env.set_parallelism(4)

# Configure retries
env.set_number_of_execution_retries(3)
```

### Reading Different Data Sources

```python
# Read CSV with type specification
csv_data = env.read_csv("data.csv", [str, int, float])

# Read text file
text_data = env.read_text("input.txt")

# Create from elements
collection_data = env.from_elements("apple", "banana", "cherry")

# Generate sequence
numbers = env.generate_sequence(1, 100)
```

### Complete Program Execution

```python
# Create environment and data
env = get_environment()
data = env.from_elements(1, 2, 3, 4, 5)

# Apply transformations
result = data.map(lambda x: x * 2)

# Add sink operation
result.output()

# Execute program
execution_result = env.execute(local=True)
print(f"Job completed in {execution_result.get_net_runtime()} ms")
```