Data Transformations

Comprehensive transformation operations for processing distributed datasets. These operations form the core of data processing pipelines, enabling map-reduce style computations, filtering, aggregations, and advanced data manipulation patterns.

Capabilities

Basic Transformations

Map Operations

Applies a function to each element, producing a 1-to-1 transformation.

def map(self, operator):
    """
    Applies a MapFunction to each element (1-to-1 transformation).
    
    Parameters:
        operator (MapFunction or lambda): Transformation function
        
    Returns:
        OperatorSet: Transformed dataset
    """

Flat Map Operations

Applies a function to each element, producing zero or more output elements.

def flat_map(self, operator):
    """
    Applies a FlatMapFunction to each element (1-to-many transformation).
    
    Parameters:
        operator (FlatMapFunction or lambda): Transformation function
        
    Returns:
        OperatorSet: Transformed dataset
    """

Filter Operations

Filters elements using a predicate function.

def filter(self, operator):
    """
    Filters elements using predicate function.
    
    Parameters:
        operator (FilterFunction or lambda): Predicate function returning boolean
        
    Returns:
        OperatorSet: Filtered dataset
    """

Map Partition Operations

Applies a function to entire partitions rather than individual elements.

def map_partition(self, operator):
    """
    Applies MapPartitionFunction to entire partitions.
    
    Parameters:
        operator (MapPartitionFunction or lambda): Partition transformation function
        
    Returns:
        OperatorSet: Transformed dataset
    """

Reduce Operations

Element Reduction

Reduces the entire DataSet to a single element using a ReduceFunction.

def reduce(self, operator):
    """
    Reduces DataSet to single element using ReduceFunction.
    
    The transformation consecutively calls a ReduceFunction until only a single element remains.
    
    Parameters:
        operator (ReduceFunction or lambda): Reduction function combining two elements
        
    Returns:
        OperatorSet: Reduced dataset with single element
    """

Group Reduction

Applies a GroupReduceFunction to grouped elements or the entire DataSet.

def reduce_group(self, operator, combinable=False):
    """
    Applies a GroupReduceFunction to grouped DataSet.
    
    The transformation calls a GroupReduceFunction once for each group, or once for the entire DataSet
    if not grouped. The function can iterate over all elements and emit any number of outputs.
    
    Parameters:
        operator (GroupReduceFunction or lambda): Group reduction function
        combinable (bool): Whether the function can additionally be applied as a combiner before the shuffle (pre-aggregation optimization)
        
    Returns:
        OperatorSet: Transformed dataset
    """

Aggregation Operations

Generic Aggregation

Applies aggregation operations to specified fields.

def aggregate(self, aggregation, field):
    """
    Applies aggregation operation to specified field.
    
    Parameters:
        aggregation (Aggregation): Aggregation type (Sum, Min, Max)
        field (int): Field index to aggregate
        
    Returns:
        OperatorSet: Aggregated dataset
    """

Built-in Aggregations

Convenience methods for common aggregations.

def min(self, field):
    """
    Finds minimum value in specified field.
    
    Parameters:
        field (int): Field index
        
    Returns:
        OperatorSet: Dataset with minimum value
    """

def max(self, field):
    """
    Finds maximum value in specified field.
    
    Parameters:
        field (int): Field index
        
    Returns:
        OperatorSet: Dataset with maximum value
    """

def sum(self, field):
    """
    Sums values in specified field.
    
    Parameters:
        field (int): Field index
        
    Returns:
        OperatorSet: Dataset with sum
    """

Grouping Operations

Group By Keys

Groups DataSet by specified key fields.

def group_by(self, *keys):
    """
    Groups DataSet by specified key fields.
    
    Parameters:
        *keys (int): Field indices for grouping keys
        
    Returns:
        UnsortedGrouping: Grouped dataset supporting group-wise operations
    """

Utility Operations

Distinct Elements

Removes duplicate records based on specified fields.

def distinct(self, *fields):
    """
    Removes duplicate records based on specified fields.
    
    Parameters:
        *fields (int): Field indices for uniqueness comparison
        
    Returns:
        OperatorSet: Dataset with unique records
    """

First N Elements

Returns the first n elements from the DataSet.

def first(self, count):
    """
    Returns first n elements.
    
    Parameters:
        count (int): Number of elements to return
        
    Returns:
        OperatorSet: Dataset with first n elements
    """

Field Projection

Projects (selects) specified fields from tuple elements.

def project(self, *fields):
    """
    Projects (selects) specified fields from tuples.
    
    Parameters:
        *fields (int): Field indices to project
        
    Returns:
        OperatorSet: Dataset with projected fields
    """

Partitioning Operations

Hash Partitioning

Hash-partitions the DataSet by the specified fields, so that all records with equal values in those fields land in the same partition.

def partition_by_hash(self, *fields):
    """
    Hash-partitions DataSet by specified fields.
    
    Parameters:
        *fields (int): Fields to use for hash partitioning
        
    Returns:
        OperatorSet: Hash-partitioned dataset
    """

Rebalancing

Redistributes the DataSet evenly across all available partitions, counteracting skew introduced by previous operations.

def rebalance(self):
    """
    Re-balances DataSet across available partitions.
    
    Returns:
        OperatorSet: Rebalanced dataset
    """

Advanced Operations

Element Counting per Partition

Counts elements in each partition.

def count_elements_per_partition(self):
    """
    Counts elements in each partition.
    
    Returns:
        OperatorSet: Dataset with partition element counts
    """

Index Assignment

Adds unique index to each element.

def zip_with_index(self):
    """
    Adds unique index to each element.
    
    Returns:
        OperatorSet: Dataset with indexed elements
    """

Operation Configuration

Operation Naming

Sets a human-readable name for an operation, used to identify it in debugging and monitoring output.

def name(self, name):
    """
    Sets name for the operation (debugging/monitoring).
    
    Parameters:
        name (str): Operation name
        
    Returns:
        DataSet: Self for method chaining
    """

Parallelism Configuration

Sets parallelism for specific operations.

def set_parallelism(self, parallelism):
    """
    Sets parallelism for this specific operation.
    
    Parameters:
        parallelism (int): Degree of parallelism
        
    Returns:
        DataSet: Self for method chaining
    """

Grouping Classes

UnsortedGrouping

Represents a grouped DataSet supporting group-wise operations.

class UnsortedGrouping:
    def reduce(self, operator): 
        """Reduces each group to single element."""
    
    def aggregate(self, aggregation, field): 
        """Aggregates each group on specified field."""
    
    def min(self, field): 
        """Finds minimum in each group."""
    
    def max(self, field): 
        """Finds maximum in each group."""
    
    def sum(self, field): 
        """Sums values in each group."""

SortedGrouping

Extends UnsortedGrouping with intra-group sorting capabilities.

class SortedGrouping(UnsortedGrouping):
    def sort_group(self, field, order):
        """
        Sorts elements within each group.
        
        Parameters:
            field (int): Field to sort by
            order (Order): Sort direction (ASCENDING, DESCENDING)
            
        Returns:
            SortedGrouping: Self for method chaining
        """

Usage Examples

Basic Transformations

from flink.plan.Environment import get_environment

env = get_environment()
data = env.from_elements(1, 2, 3, 4, 5)

# Map transformation
doubled = data.map(lambda x: x * 2)

# Filter transformation
evens = data.filter(lambda x: x % 2 == 0)

# Flat map transformation
pairs = data.flat_map(lambda x: [x, x])

Aggregations and Grouping

# Create data with tuples
data = env.from_elements(("apple", 5), ("banana", 3), ("apple", 2), ("banana", 7))

# Group by first field and sum second field
result = data.group_by(0).sum(1)

# Alternative using aggregate
from flink.functions.Aggregation import Sum
result = data.group_by(0).aggregate(Sum(), 1)

Advanced Processing Patterns

from flink.functions.GroupReduceFunction import GroupReduceFunction

class WordCounter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        count = 0
        word = None
        for element in iterator:
            word = element[0]
            count += element[1]
        collector.collect((word, count))

# Word counting pipeline: emit (word, 1) pairs so group_by(0) can group on the word field
text_data = env.read_text("input.txt")
words = text_data.flat_map(lambda line: [(word, 1) for word in line.lower().split()])
word_counts = words.group_by(0).reduce_group(WordCounter())

Performance Optimization

# Hash partition for better distribution
partitioned_data = data.partition_by_hash(0)

# Rebalance for even load distribution
balanced_data = data.rebalance()

# Configure operation parallelism
result = data.map(lambda x: x * 2).set_parallelism(8).name("Double Values")
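
Note that these calls only assemble an execution plan; nothing runs until the plan is submitted. A minimal completion of the pipeline (a sketch; sinks and execution are covered in sources-sinks.md and environment.md):

result.output()          # print the results (sink)
env.execute(local=True)  # build and run the plan locally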