Python API for Apache Flink providing comprehensive batch processing capabilities including data transformations, aggregations, joins, and distributed execution
Comprehensive transformation operations for processing distributed datasets. These operations form the core of data processing pipelines, enabling map-reduce style computations, filtering, aggregations, and advanced data manipulation patterns.
Applies a function to each element, producing a 1-to-1 transformation.
def map(self, operator):
"""
Applies a MapFunction to each element (1-to-1 transformation).
Parameters:
operator (MapFunction or lambda): Transformation function
Returns:
OperatorSet: Transformed dataset
"""

Applies a function to each element, producing zero or more output elements.
def flat_map(self, operator):
"""
Applies a FlatMapFunction to each element (1-to-many transformation).
Parameters:
operator (FlatMapFunction or lambda): Transformation function
Returns:
OperatorSet: Transformed dataset
"""

Filters elements using a predicate function.
def filter(self, operator):
"""
Filters elements using predicate function.
Parameters:
operator (FilterFunction or lambda): Predicate function returning boolean
Returns:
OperatorSet: Filtered dataset
"""

Applies a function to entire partitions rather than individual elements.
def map_partition(self, operator):
"""
Applies MapPartitionFunction to entire partitions.
Parameters:
operator (MapPartitionFunction or lambda): Partition transformation function
Returns:
OperatorSet: Transformed dataset
"""

Reduces the entire DataSet to a single element using a ReduceFunction.
def reduce(self, operator):
"""
Reduces DataSet to single element using ReduceFunction.
The transformation consecutively calls a ReduceFunction until only a single element remains.
Parameters:
operator (ReduceFunction or lambda): Reduction function combining two elements
Returns:
OperatorSet: Reduced dataset with single element
"""

Applies a GroupReduceFunction to grouped elements or the entire DataSet.
def reduce_group(self, operator, combinable=False):
"""
Applies a GroupReduceFunction to grouped DataSet.
The transformation calls a GroupReduceFunction once for each group, or once for the entire DataSet
if not grouped. The function can iterate over all elements and emit any number of outputs.
Parameters:
operator (GroupReduceFunction or lambda): Group reduction function
combinable (bool): Whether function is combinable for optimization
Returns:
OperatorSet: Transformed dataset
"""

Applies aggregation operations to specified fields.
def aggregate(self, aggregation, field):
"""
Applies aggregation operation to specified field.
Parameters:
aggregation (Aggregation): Aggregation type (Sum, Min, Max)
field (int): Field index to aggregate
Returns:
OperatorSet: Aggregated dataset
"""

Convenience methods for common aggregations.
def min(self, field):
"""
Finds minimum value in specified field.
Parameters:
field (int): Field index
Returns:
OperatorSet: Dataset with minimum value
"""
def max(self, field):
"""
Finds maximum value in specified field.
Parameters:
field (int): Field index
Returns:
OperatorSet: Dataset with maximum value
"""
def sum(self, field):
"""
Sums values in specified field.
Parameters:
field (int): Field index
Returns:
OperatorSet: Dataset with sum
"""

Groups DataSet by specified key fields.
def group_by(self, *keys):
"""
Groups DataSet by specified key fields.
Parameters:
*keys (int): Field indices for grouping keys
Returns:
UnsortedGrouping: Grouped dataset supporting group-wise operations
"""

Removes duplicate records based on specified fields.
def distinct(self, *fields):
"""
Removes duplicate records based on specified fields.
Parameters:
*fields (int): Field indices for uniqueness comparison
Returns:
OperatorSet: Dataset with unique records
"""

Returns the first n elements from the DataSet.
def first(self, count):
"""
Returns first n elements.
Parameters:
count (int): Number of elements to return
Returns:
OperatorSet: Dataset with first n elements
"""

Projects (selects) specified fields from tuple elements.
def project(self, *fields):
"""
Projects (selects) specified fields from tuples.
Parameters:
*fields (int): Field indices to project
Returns:
OperatorSet: Dataset with projected fields
"""

Hash-partitions DataSet by specified fields for optimal data distribution.
def partition_by_hash(self, *fields):
"""
Hash-partitions DataSet by specified fields.
Parameters:
*fields (int): Fields to use for hash partitioning
Returns:
OperatorSet: Hash-partitioned dataset
"""

Re-balances DataSet across available partitions for better load distribution.
def rebalance(self):
"""
Re-balances DataSet across available partitions.
Returns:
OperatorSet: Rebalanced dataset
"""

Counts elements in each partition.
def count_elements_per_partition(self):
"""
Counts elements in each partition.
Returns:
OperatorSet: Dataset with partition element counts
"""

Adds unique index to each element.
def zip_with_index(self):
"""
Adds unique index to each element.
Returns:
OperatorSet: Dataset with indexed elements
"""

Sets names for operations for debugging and monitoring.
def name(self, name):
"""
Sets name for the operation (debugging/monitoring).
Parameters:
name (str): Operation name
Returns:
DataSet: Self for method chaining
"""

Sets parallelism for specific operations.
def set_parallelism(self, parallelism):
"""
Sets parallelism for this specific operation.
Parameters:
parallelism (int): Degree of parallelism
Returns:
DataSet: Self for method chaining
"""

Represents a grouped DataSet supporting group-wise operations.
class UnsortedGrouping:
def reduce(self, operator):
"""Reduces each group to single element."""
def aggregate(self, aggregation, field):
"""Aggregates each group on specified field."""
def min(self, field):
"""Finds minimum in each group."""
def max(self, field):
"""Finds maximum in each group."""
def sum(self, field):
"""Sums values in each group."""

Extends UnsortedGrouping with intra-group sorting capabilities.
class SortedGrouping(UnsortedGrouping):
def sort_group(self, field, order):
"""
Sorts elements within each group.
Parameters:
field (int): Field to sort by
order (Order): Sort direction (ASCENDING, DESCENDING)
Returns:
SortedGrouping: Self for method chaining
"""

from flink.plan.Environment import get_environment
env = get_environment()
data = env.from_elements(1, 2, 3, 4, 5)
# Map transformation
doubled = data.map(lambda x: x * 2)
# Filter transformation
evens = data.filter(lambda x: x % 2 == 0)
# Flat map transformation
pairs = data.flat_map(lambda x: [x, x])

# Create data with tuples
data = env.from_elements(("apple", 5), ("banana", 3), ("apple", 2), ("banana", 7))
# Group by first field and sum second field
result = data.group_by(0).sum(1)
# Alternative using aggregate
from flink.functions.Aggregation import Sum
result = data.group_by(0).aggregate(Sum(), 1)

from flink.functions.GroupReduceFunction import GroupReduceFunction
class WordCounter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        count = 0
        word = None
        for element in iterator:
            word = element
            count += 1
        collector.collect((word, count))
# Word counting pipeline
text_data = env.read_text("input.txt")
words = text_data.flat_map(lambda line: line.lower().split())
word_counts = words.group_by(0).reduce_group(WordCounter())

# Hash partition for better distribution
partitioned_data = data.partition_by_hash(0)
# Rebalance for even load distribution
balanced_data = data.rebalance()
# Configure operation parallelism
result = data.map(lambda x: x * 2).set_parallelism(8).name("Double Values")

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-python-2-10