The Environment provides the execution context for Flink Python programs, managing job configuration, data source creation, and program execution. It serves as the main entry point for all Flink applications.
Creates the execution environment that represents the context in which the program is executed.
def get_environment():
"""
Creates an execution environment that represents the context in which the program is currently executed.
Returns:
Environment: The execution environment instance
"""

Creates a DataSet from CSV files with configurable delimiters and type specifications.
def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
"""
Create a DataSet that represents the tuples produced by reading the given CSV file.
Parameters:
path (str): The path of the CSV file
types (list): Specifies the types for the CSV fields
line_delimiter (str): Line delimiter, default "\n"
field_delimiter (str): Field delimiter, default ","
Returns:
DataSet: A DataSet representing the CSV data
"""

Creates a DataSet by reading text files line by line.
def read_text(self, path):
"""
Creates a DataSet that represents the Strings produced by reading the given file line wise.
Parameters:
path (str): The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
Returns:
DataSet: A DataSet representing the data read from the given file as text lines
"""

Creates a DataSet from in-memory collections of elements.
def from_elements(self, *elements):
"""
Creates a new data set that contains the given elements.
The elements must all be of the same type, for example, all String or all Integer.
The sequence of elements must not be empty.
Parameters:
*elements: The elements to make up the data set
Returns:
DataSet: A DataSet representing the given list of elements
"""

Creates a DataSet containing sequences of numbers.
def generate_sequence(self, frm, to):
"""
Creates a new data set that contains the given sequence of numbers.
Parameters:
frm (int): The start number for the sequence
to (int): The end number for the sequence
Returns:
DataSet: A DataSet representing the given sequence of numbers
"""

Controls the degree of parallelism for operations.
def set_parallelism(self, parallelism):
"""
Sets the parallelism for operations executed through this environment.
Setting a degree of parallelism (DOP) of x here will cause all operators to run with x parallel instances.
Parameters:
parallelism (int): The degree of parallelism
"""
def get_parallelism(self):
"""
Gets the parallelism with which operations are executed by default.
Returns:
int: The parallelism used by operations
"""

Configures execution retry behavior on failures.
def set_number_of_execution_retries(self, count):
"""
Sets the number of execution retries on failure.
Parameters:
count (int): Number of retries
"""
def get_number_of_execution_retries(self):
"""
Gets the number of execution retries.
Returns:
int: Current retry count
"""

Registers custom types with serialization support.
def register_type(self, type, serializer, deserializer):
"""
Registers the given type with this environment, allowing all operators within to
(de-)serialize objects of the given type.
Parameters:
type (class): Class of the objects to be (de-)serialized
serializer: Instance of the serializer
deserializer: Instance of the deserializer
"""

Triggers the execution of the complete Flink program.
def execute(self, local=False):
"""
Triggers the program execution.
The environment will execute all parts of the program that have resulted in a "sink" operation.
Parameters:
local (bool): Whether to execute in local mode
Returns:
JobExecutionResult: Execution result with runtime information
"""

class JobExecutionResult:
def get_net_runtime(self):
"""
Gets the net runtime of the executed job.
Returns:
int: Runtime in milliseconds
"""

from flink.plan.Environment import get_environment
# Create execution environment
env = get_environment()
# Configure parallelism
env.set_parallelism(4)
# Configure retries
env.set_number_of_execution_retries(3)

# Read CSV with type specification
csv_data = env.read_csv("data.csv", [str, int, float])
# Read text file
text_data = env.read_text("input.txt")
# Create from elements
collection_data = env.from_elements("apple", "banana", "cherry")
# Generate sequence
numbers = env.generate_sequence(1, 100)

# Create environment and data
env = get_environment()
data = env.from_elements(1, 2, 3, 4, 5)
# Apply transformations
result = data.map(lambda x: x * 2)
# Add sink operation
result.output()
# Execute program
execution_result = env.execute(local=True)
print(f"Job completed in {execution_result.get_net_runtime()} ms")