Comprehensive transformation operations for processing distributed datasets. These operations form the core of data processing pipelines, enabling map-reduce style computations, filtering, aggregations, and advanced data manipulation patterns.
Applies a function to each element, producing a 1-to-1 transformation.
def map(self, operator):
    """
    Applies a MapFunction to each element (1-to-1 transformation).

    Parameters:
        operator (MapFunction or lambda): Transformation function

    Returns:
        OperatorSet: Transformed dataset
    """

Applies a function to each element, producing zero or more output elements.
def flat_map(self, operator):
    """
    Applies a FlatMapFunction to each element (1-to-many transformation).

    Parameters:
        operator (FlatMapFunction or lambda): Transformation function

    Returns:
        OperatorSet: Transformed dataset
    """

Filters elements using a predicate function.
def filter(self, operator):
    """
    Filters elements using a predicate function.

    Parameters:
        operator (FilterFunction or lambda): Predicate function returning a boolean

    Returns:
        OperatorSet: Filtered dataset
    """

Applies a function to entire partitions rather than individual elements.
def map_partition(self, operator):
    """
    Applies a MapPartitionFunction to entire partitions.

    Parameters:
        operator (MapPartitionFunction or lambda): Partition transformation function

    Returns:
        OperatorSet: Transformed dataset
    """
def reduce(self, operator):
    """
    Reduces DataSet to a single element using a ReduceFunction.

    The transformation consecutively calls a ReduceFunction until only a single element remains.

    Parameters:
        operator (ReduceFunction or lambda): Reduction function combining two elements

    Returns:
        OperatorSet: Reduced dataset with a single element
    """
def reduce_group(self, operator, combinable=False):
    """
    Applies a GroupReduceFunction to a grouped DataSet.

    The transformation calls a GroupReduceFunction once for each group, or once for the entire
    DataSet if not grouped. The function can iterate over all elements and emit any number of outputs.

    Parameters:
        operator (GroupReduceFunction or lambda): Group reduction function
        combinable (bool): Whether function is combinable for optimization

    Returns:
        OperatorSet: Transformed dataset
    """

Applies aggregation operations to specified fields.
def aggregate(self, aggregation, field):
    """
    Applies an aggregation operation to the specified field.

    Parameters:
        aggregation (Aggregation): Aggregation type (Sum, Min, Max)
        field (int): Field index to aggregate

    Returns:
        OperatorSet: Aggregated dataset
    """

Convenience methods for common aggregations.
def min(self, field):
    """
    Finds minimum value in specified field.

    Parameters:
        field (int): Field index

    Returns:
        OperatorSet: Dataset with minimum value
    """
def max(self, field):
    """
    Finds maximum value in specified field.

    Parameters:
        field (int): Field index

    Returns:
        OperatorSet: Dataset with maximum value
    """
def sum(self, field):
    """
    Sums values in specified field.

    Parameters:
        field (int): Field index

    Returns:
        OperatorSet: Dataset with sum
    """
def group_by(self, *keys):
    """
    Groups DataSet by specified key fields.

    Parameters:
        *keys (int): Field indices for grouping keys

    Returns:
        UnsortedGrouping: Grouped dataset supporting group-wise operations
    """

Removes duplicate records based on specified fields.
def distinct(self, *fields):
    """
    Removes duplicate records based on specified fields.

    Parameters:
        *fields (int): Field indices for uniqueness comparison

    Returns:
        OperatorSet: Dataset with unique records
    """
def first(self, count):
    """
    Returns first n elements.

    Parameters:
        count (int): Number of elements to return

    Returns:
        OperatorSet: Dataset with first n elements
    """
def project(self, *fields):
    """
    Projects (selects) specified fields from tuples.

    Parameters:
        *fields (int): Field indices to project

    Returns:
        OperatorSet: Dataset with projected fields
    """
def partition_by_hash(self, *fields):
    """
    Hash-partitions DataSet by specified fields.

    Parameters:
        *fields (int): Fields to use for hash partitioning

    Returns:
        OperatorSet: Hash-partitioned dataset
    """

Re-balances DataSet across available partitions for better load distribution.
def rebalance(self):
    """
    Re-balances DataSet across available partitions.

    Returns:
        OperatorSet: Rebalanced dataset
    """

Counts elements in each partition.
def count_elements_per_partition(self):
    """
    Counts elements in each partition.

    Returns:
        OperatorSet: Dataset with partition element counts
    """

Adds unique index to each element.
def zip_with_index(self):
    """
    Adds unique index to each element.

    Returns:
        OperatorSet: Dataset with indexed elements
    """
def name(self, name):
    """
    Sets name for the operation (debugging/monitoring).

    Parameters:
        name (str): Operation name

    Returns:
        DataSet: Self for method chaining
    """

Sets parallelism for specific operations.
def set_parallelism(self, parallelism):
    """
    Sets parallelism for this specific operation.

    Parameters:
        parallelism (int): Degree of parallelism

    Returns:
        DataSet: Self for method chaining
    """

Represents a grouped DataSet supporting group-wise operations.
class UnsortedGrouping:
    def reduce(self, operator):
        """Reduces each group to single element."""

    def aggregate(self, aggregation, field):
        """Aggregates each group on specified field."""

    def min(self, field):
        """Finds minimum in each group."""

    def max(self, field):
        """Finds maximum in each group."""

    def sum(self, field):
        """Sums values in each group."""
class SortedGrouping(UnsortedGrouping):
    def sort_group(self, field, order):
        """
        Sorts elements within each group.

        Parameters:
            field (int): Field to sort by
            order (Order): Sort direction (ASCENDING, DESCENDING)

        Returns:
            SortedGrouping: Self for method chaining
        """
env = get_environment()
data = env.from_elements(1, 2, 3, 4, 5)
# Map transformation
doubled = data.map(lambda x: x * 2)
# Filter transformation
evens = data.filter(lambda x: x % 2 == 0)
# Flat map transformation
pairs = data.flat_map(lambda x: [x, x])

# Create data with tuples
data = env.from_elements(("apple", 5), ("banana", 3), ("apple", 2), ("banana", 7))
# Group by first field and sum second field
result = data.group_by(0).sum(1)
# Alternative using aggregate
from flink.functions.Aggregation import Sum
result = data.group_by(0).aggregate(Sum(), 1)

from flink.functions.GroupReduceFunction import GroupReduceFunction
class WordCounter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        # sum the partial counts for one word group and emit a single result
        count = 0
        word = None
        for element in iterator:
            word = element[0]
            count += element[1]
        collector.collect((word, count))

# Word counting pipeline: emit (word, 1) tuples so group_by(0) can key on the word
text_data = env.read_text("input.txt")
words = text_data.flat_map(lambda line: [(word, 1) for word in line.lower().split()])
word_counts = words.group_by(0).reduce_group(WordCounter())

# Hash partition for better distribution
partitioned_data = data.partition_by_hash(0)
# Rebalance for even load distribution
balanced_data = data.rebalance()
# Configure operation parallelism
result = data.map(lambda x: x * 2).set_parallelism(8).name("Double Values")