Environment and Execution

The Environment provides the execution context for Flink Python programs: it manages job configuration, creates data sources, and triggers program execution. It is the main entry point for a Flink Python program.

Capabilities

Environment Creation

Creates the execution environment that represents the context in which the program is executed.

def get_environment():
    """
    Creates an execution environment that represents the context in which the program is currently executed.
    
    Returns:
        Environment: The execution environment instance
    """

Data Source Operations

CSV File Reading

Creates a DataSet from a CSV file, with configurable line and field delimiters and a type specification for each field.

def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
    """
    Create a DataSet that represents the tuples produced by reading the given CSV file.
    
    Parameters:
        path (str): The path of the CSV file
        types (list): Specifies the types for the CSV fields
        line_delimiter (str): Line delimiter, default "\n"
        field_delimiter (str): Field delimiter, default ","
        
    Returns:
        DataSet: A DataSet representing the CSV data
    """

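To make the roles of `types`, `line_delimiter`, and `field_delimiter` concrete, here is a minimal pure-Python sketch of the parsing described above. It does not call Flink: `parse_csv_text` is a hypothetical helper, and using Python callables such as `int` and `float` as per-field converters is an assumption made for illustration only.

```python
def parse_csv_text(text, types, line_delimiter="\n", field_delimiter=","):
    """Hypothetical stand-in for CSV parsing; not part of the Flink API."""
    rows = []
    for line in text.split(line_delimiter):
        if not line:
            continue  # skip empty lines, e.g. after a trailing delimiter
        fields = line.split(field_delimiter)
        if len(fields) != len(types):
            raise ValueError("expected %d fields, got %d" % (len(types), len(fields)))
        # Apply one converter per column to build a typed tuple
        rows.append(tuple(convert(field) for convert, field in zip(types, fields)))
    return rows


# Custom delimiters: ";" separates records, "|" separates fields
sample = "alice|30|1.5;bob|25|2.0;"
records = parse_csv_text(sample, [str, int, float],
                         line_delimiter=";", field_delimiter="|")
```

Each record comes back as one tuple whose length matches the `types` list, which is the shape the docstring above describes for the resulting DataSet.
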
Text File Reading

Creates a DataSet by reading a text file line by line.

def read_text(self, path):
    """
    Creates a DataSet that represents the strings produced by reading the given file line by line.
    
    Parameters:
        path (str): The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
        
    Returns:
        DataSet: A DataSet representing the data read from the given file as text lines
    """

Collection Sources

Creates a DataSet from an in-memory collection of elements.

def from_elements(self, *elements):
    """
    Creates a new data set that contains the given elements.
    
    The elements must all be of the same type, for example, all strings or all integers.
    The sequence of elements must not be empty.
    
    Parameters:
        *elements: The elements to make up the data set
        
    Returns:
        DataSet: A DataSet representing the given list of elements
    """

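The two constraints in the docstring above (a non-empty sequence, all elements of one type) can be sketched as a small check. `check_elements` is a hypothetical helper, not part of the Flink API, and whether Flink itself rejects bad input at this point or fails later is not stated in this document.

```python
def check_elements(*elements):
    """Hypothetical validator mirroring the documented constraints."""
    if not elements:
        raise ValueError("the sequence of elements must not be empty")
    first_type = type(elements[0])
    for element in elements[1:]:
        # Exact type match: a bool mixed with ints is treated as a mismatch
        if type(element) is not first_type:
            raise TypeError("all elements must share one type, got %s and %s"
                            % (first_type.__name__, type(element).__name__))
    return first_type


element_type = check_elements("apple", "banana", "cherry")
```

Running a check like this before calling `from_elements` is one way to surface mixed-type input early in a driver script.
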
Sequence Generation

Creates a DataSet containing a sequence of numbers.

def generate_sequence(self, frm, to):
    """
    Creates a new data set that contains the given sequence of numbers.
    
    Parameters:
        frm (int): The start number for the sequence
        to (int): The end number for the sequence
        
    Returns:
        DataSet: A DataSet representing the given sequence of numbers
    """

Execution Configuration

Parallelism Control

Controls the degree of parallelism for operations.

def set_parallelism(self, parallelism):
    """
    Sets the parallelism for operations executed through this environment.
    
    Setting a parallelism of x here will cause all operators to run with x parallel instances.
    
    Parameters:
        parallelism (int): The degree of parallelism
    """

def get_parallelism(self):
    """
    Gets the parallelism with which operations are executed by default.
    
    Returns:
        int: The parallelism used by operations
    """

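As a rough mental model of "x parallel instances", the sketch below splits a collection across a given number of partitions, one per instance. It is purely conceptual: `split_across_instances` is a hypothetical helper, and the round-robin assignment is an assumption, since this document does not describe how Flink actually distributes records.

```python
def split_across_instances(items, parallelism):
    """Hypothetical round-robin split; Flink's real strategy may differ."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    partitions = [[] for _ in range(parallelism)]
    for index, item in enumerate(items):
        # Record i goes to instance i modulo the parallelism
        partitions[index % parallelism].append(item)
    return partitions


# Ten records spread over four parallel instances
partitions = split_across_instances(range(10), 4)
```

With a parallelism of 4, each operator instance in this model handles two or three of the ten records.
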
Retry Configuration

Configures execution retry behavior on failures.

def set_number_of_execution_retries(self, count):
    """
    Sets the number of execution retries on failure.
    
    Parameters:
        count (int): Number of retries
    """

def get_number_of_execution_retries(self):
    """
    Gets the number of execution retries.
    
    Returns:
        int: Current retry count
    """

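The retry count can be read as "how many additional attempts are allowed after the first failure". The sketch below models that reading in plain Python. `run_with_retries` and `flaky_job` are hypothetical names, and the interpretation itself is an assumption, as the document does not spell out Flink's exact retry semantics.

```python
def run_with_retries(job, retries):
    """Hypothetical retry loop: one initial attempt plus up to `retries` more."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return job(), attempts
        except Exception:
            if attempts > retries:
                raise  # retry budget exhausted, surface the last failure


attempt_log = {"count": 0}

def flaky_job():
    # Fails on the first two calls, then succeeds
    attempt_log["count"] += 1
    if attempt_log["count"] < 3:
        raise RuntimeError("transient failure")
    return "done"


result, attempts = run_with_retries(flaky_job, retries=3)
```

Under this model a retry count of 0 means a single attempt with no second chance.
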
Custom Type Registration

Registers custom types with serialization support.

def register_type(self, type, serializer, deserializer):
    """
    Registers the given type with this environment, allowing all operators within to
    (de-)serialize objects of the given type.
    
    Parameters:
        type (class): Class of the objects to be (de-)serialized
        serializer: Instance of the serializer
        deserializer: Instance of the deserializer
    """

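A serializer and deserializer pair for a custom type might look like the sketch below. This is an illustration under stated assumptions: the `Point` class is hypothetical, and the method names `serialize` and `_deserialize`, as well as the `read(n)` callable passed to the deserializer, are assumptions that this document does not confirm. The round trip runs without Flink, using an in-memory byte stream.

```python
import io
import struct


class Point(object):
    """Hypothetical custom type used only for this example."""
    def __init__(self, x, y):
        self.x = x
        self.y = y


class PointSerializer(object):
    def serialize(self, value):
        # Pack both coordinates as big-endian 32-bit integers (8 bytes total)
        return struct.pack(">ii", value.x, value.y)


class PointDeserializer(object):
    def _deserialize(self, read):
        # `read(n)` is assumed to return the next n bytes of the stream
        x, y = struct.unpack(">ii", read(8))
        return Point(x, y)


# Round trip without Flink: serialize, then read back from a byte stream
payload = PointSerializer().serialize(Point(3, -7))
restored = PointDeserializer()._deserialize(io.BytesIO(payload).read)
```

Following the signature above, such a pair would be passed as `env.register_type(Point, PointSerializer(), PointDeserializer())`.
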
Program Execution

Triggers the execution of the complete Flink program.

def execute(self, local=False):
    """
    Triggers the program execution.
    
    The environment will execute all parts of the program that have resulted in a "sink" operation.
    
    Parameters:
        local (bool): Whether to execute in local mode
        
    Returns:
        JobExecutionResult: Execution result with runtime information
    """

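The statement that only parts of the program ending in a sink are executed can be illustrated with a toy model of deferred execution. `MiniEnv`, `MiniDataSet`, and `collect_sink` are hypothetical names invented for this sketch. They do not exist in Flink and only mimic the idea that transformations build a plan while `execute` runs it.

```python
class MiniEnv(object):
    """Hypothetical toy environment illustrating deferred execution."""
    def __init__(self):
        self._sinks = []

    def from_elements(self, *elements):
        return MiniDataSet(self, lambda: list(elements))

    def execute(self):
        # Only pipelines that ended in a sink are evaluated
        return [sink() for sink in self._sinks]


class MiniDataSet(object):
    def __init__(self, env, compute):
        self._env = env
        self._compute = compute  # deferred: nothing runs yet

    def map(self, func):
        return MiniDataSet(self._env, lambda: [func(x) for x in self._compute()])

    def collect_sink(self):
        # Registering a sink marks this pipeline for execution
        self._env._sinks.append(self._compute)


trace = []

def traced_double(x):
    trace.append(x)  # record every element that is actually processed
    return x * 2


mini_env = MiniEnv()
doubled = mini_env.from_elements(1, 2, 3).map(traced_double)
ignored = mini_env.from_elements(10, 20).map(traced_double)  # never gets a sink
doubled.collect_sink()
trace_before_execute = list(trace)
results = mini_env.execute()
```

In this model nothing is processed until `execute` is called, and the pipeline without a sink is never evaluated at all.
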
Job Execution Results

class JobExecutionResult:
    def get_net_runtime(self):
        """
        Gets the net runtime of the executed job.
        
        Returns:
            int: Runtime in milliseconds
        """

Usage Examples

Basic Environment Setup

from flink.plan.Environment import get_environment

# Create execution environment
env = get_environment()

# Configure parallelism
env.set_parallelism(4)

# Configure retries
env.set_number_of_execution_retries(3)

Reading Different Data Sources

# Assumes `env` was created as in Basic Environment Setup
# Read CSV with type specification
csv_data = env.read_csv("data.csv", [str, int, float])

# Read text file
text_data = env.read_text("input.txt")

# Create from elements
collection_data = env.from_elements("apple", "banana", "cherry")

# Generate sequence
numbers = env.generate_sequence(1, 100)

Complete Program Execution

from flink.plan.Environment import get_environment

# Create environment and data
env = get_environment()
data = env.from_elements(1, 2, 3, 4, 5)

# Apply transformations
result = data.map(lambda x: x * 2)

# Add sink operation
result.output()

# Execute program
execution_result = env.execute(local=True)
print(f"Job completed in {execution_result.get_net_runtime()} ms")