Python API for Apache Flink providing comprehensive batch processing capabilities including data transformations, aggregations, joins, and distributed execution
```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-python-2-10@1.3.0
```

A comprehensive Python programming interface for Apache Flink batch processing operations. The library bridges Python code and the Flink Java runtime, enabling developers to write distributed data processing applications in familiar Python syntax while leveraging Flink's distributed computing capabilities.
```
org.apache.flink:flink-python_2.10:1.3.3
```

```python
from flink.plan.Environment import get_environment
```

For transformation functions:

```python
from flink.functions.MapFunction import MapFunction
from flink.functions.ReduceFunction import ReduceFunction
from flink.functions.FilterFunction import FilterFunction
```

A complete word count example:

```python
from flink.plan.Environment import get_environment
from flink.functions.FlatMapFunction import FlatMapFunction
from flink.functions.GroupReduceFunction import GroupReduceFunction
# Create execution environment
env = get_environment()

# Create data source
data = env.from_elements("hello world", "hello flink", "flink python")

# Define transformation functions
class Tokenizer(FlatMapFunction):
    def flat_map(self, value, collector):
        # Emit one (word, 1) pair per token so we can group by field 0.
        for word in value.lower().split():
            collector.collect((word, 1))

class Counter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        count = 0
        word = None
        for element in iterator:
            word = element[0]
            count += 1
        collector.collect((word, count))

# Apply transformations
tokens = data.flat_map(Tokenizer())
word_counts = tokens.group_by(0).reduce_group(Counter())

# Output results
word_counts.output()

# Execute the program
env.execute(local=True)
```

The Flink Python API follows a layered architecture: Python user code runs in separate Python worker processes that exchange serialized data with the Flink Java runtime, which translates the program into a distributed execution plan.
This design enables seamless integration between Python user code and Flink's distributed execution engine, with automatic serialization, fault tolerance, and performance optimizations.
Core execution environment providing job configuration, data source creation, and program execution capabilities.
```python
def get_environment():
    """Creates execution environment for Flink programs."""

class Environment:
    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','): ...
    def read_text(self, path): ...
    def from_elements(self, *elements): ...
    def generate_sequence(self, frm, to): ...
    def register_type(self, type, serializer, deserializer): ...
    def set_parallelism(self, parallelism): ...
    def get_parallelism(self): ...
    def set_number_of_execution_retries(self, count): ...
    def get_number_of_execution_retries(self): ...
    def execute(self, local=False): ...
```
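For illustration, a minimal sketch of a job that configures the environment before execution, using only the methods listed above:

```python
from flink.plan.Environment import get_environment

env = get_environment()

# Job-level configuration applies to all operators unless overridden.
env.set_parallelism(4)
env.set_number_of_execution_retries(2)

# generate_sequence produces the numbers from frm to to as a DataSet.
numbers = env.generate_sequence(1, 1000)
numbers.output()

# local=True runs the plan locally instead of submitting to a cluster.
env.execute(local=True)
```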
Comprehensive transformation operations for processing distributed datasets, including map, filter, reduce, and advanced operations.

```python
class DataSet:
    def map(self, operator): ...
    def flat_map(self, operator): ...
    def filter(self, operator): ...
    def reduce(self, operator): ...
    def reduce_group(self, operator, combinable=False): ...
    def group_by(self, *keys): ...
    def map_partition(self, operator): ...
    def aggregate(self, aggregation, field): ...
    def min(self, field): ...
    def max(self, field): ...
    def sum(self, field): ...
    def project(self, *fields): ...
    def distinct(self, *fields): ...
    def first(self, count): ...
    def union(self, other_set): ...
    def partition_by_hash(self, *fields): ...
    def rebalance(self): ...
    def count_elements_per_partition(self): ...
    def zip_with_index(self): ...
    def name(self, name): ...
    def set_parallelism(self, parallelism): ...
```
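These methods chain naturally. A sketch that filters, projects, and aggregates tuple records, using only the operations listed above:

```python
from flink.plan.Environment import get_environment
from flink.functions.FilterFunction import FilterFunction

class IsBook(FilterFunction):
    def filter(self, value):
        return value[1] == "books"

env = get_environment()
orders = env.from_elements((1, "books", 12.5), (2, "games", 30.0), (3, "books", 7.0))

# Keep book orders, project to (category, price), then sum prices per category.
totals = orders.filter(IsBook()).project(1, 2).group_by(0).sum(1)
totals.output()
env.execute(local=True)
```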
Advanced operations for combining multiple datasets through joins, cross products, and co-group operations.

```python
class DataSet:
    def join(self, other_set): ...
    def join_with_huge(self, other_set): ...
    def join_with_tiny(self, other_set): ...
    def cross(self, other_set): ...
    def cross_with_huge(self, other_set): ...
    def cross_with_tiny(self, other_set): ...
    def co_group(self, other_set): ...
    def union(self, other_set): ...
```
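A join sketch. The key selectors `where`/`equal_to`, the `using` call, and the `JoinFunction` import path are not listed in the stub above; they follow the 1.3 Python API documentation and should be treated as assumptions:

```python
from flink.plan.Environment import get_environment
from flink.functions.JoinFunction import JoinFunction  # assumed import path

class Weighter(JoinFunction):
    def join(self, rating, weight):
        # Called once per matching pair of elements.
        return (rating[0], rating[1] * weight[1])

env = get_environment()
ratings = env.from_elements(("a", 3.0), ("b", 4.0))
weights = env.from_elements(("a", 0.5), ("b", 2.0))

# where()/equal_to() select the join key fields on each side.
weighted = ratings.join(weights).where(0).equal_to(0).using(Weighter())
weighted.output()
env.execute(local=True)
```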
Function interfaces for implementing custom transformation logic with support for various processing patterns.

```python
class MapFunction:
    def map(self, value): ...

class ReduceFunction:
    def reduce(self, value1, value2): ...

class GroupReduceFunction:
    def reduce(self, iterator, collector): ...
```
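A sketch of a custom `ReduceFunction`: `reduce` combines two values of the same type into one, and Flink applies it repeatedly until a single value per group remains:

```python
from flink.plan.Environment import get_environment
from flink.functions.ReduceFunction import ReduceFunction

class SumCounts(ReduceFunction):
    def reduce(self, value1, value2):
        # Both inputs are (key, count) tuples from the same group.
        return (value1[0], value1[1] + value2[1])

env = get_environment()
pairs = env.from_elements(("a", 1), ("b", 2), ("a", 3))
pairs.group_by(0).reduce(SumCounts()).output()
env.execute(local=True)
```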
Input and output operations for reading from and writing to various data sources, including files, collections, and external systems.

```python
class Environment:
    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','): ...
    def read_text(self, path): ...

class DataSet:
    def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE): ...
    def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE): ...
    def output(self, to_error=False): ...
```
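A round-trip sketch. The `WriteMode` constants are defined below; the import path `flink.plan.Constants` (which also held the CSV column type constants in the 1.3 docs) is an assumption:

```python
from flink.plan.Environment import get_environment
from flink.plan.Constants import WriteMode, INT, STRING  # assumed module path

env = get_environment()

# read_csv needs the type of every column; here rows are (INT, STRING).
records = env.read_csv("file:///tmp/in.csv", [INT, STRING])

# OVERWRITE replaces an existing output file instead of failing.
records.write_csv("file:///tmp/out.csv", write_mode=WriteMode.OVERWRITE)
env.execute(local=True)
```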
```python
class WriteMode:
    NO_OVERWRITE = 0  # Fail if output file exists
    OVERWRITE = 1     # Overwrite existing files

class Order:
    NONE = 0        # No specific order
    ASCENDING = 1   # Ascending sort order
    DESCENDING = 2  # Descending sort order
    ANY = 3         # Any order acceptable
```
```python
class OperatorSet(DataSet):
    """Specialized DataSet representing operations with custom operators."""
    def with_broadcast_set(self, name, set): ...
```
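A broadcast sketch. Reading the set inside a function through `self.context.get_broadcast_variable` follows the 1.3 Python API docs and is an assumption here, since the stub above only lists `with_broadcast_set`:

```python
from flink.plan.Environment import get_environment
from flink.functions.MapFunction import MapFunction

class Scale(MapFunction):
    def map(self, value):
        # Accessor name assumed from the 1.3 Python API docs.
        factor = self.context.get_broadcast_variable("factor")[0]
        return value * factor

env = get_environment()
data = env.generate_sequence(1, 5)
factor = env.from_elements(2)

# map() returns an OperatorSet, so the broadcast set can be attached.
data.map(Scale()).with_broadcast_set("factor", factor).output()
env.execute(local=True)
```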
```python
class DataSink:
    """Represents data output operations."""
    def name(self, name): ...
    def set_parallelism(self, parallelism): ...
```
```python
class UnsortedGrouping:
    """Represents a grouped DataSet supporting group-wise operations."""
    def reduce(self, operator): ...
    def reduce_group(self, operator, combinable=False): ...
    def aggregate(self, aggregation, field): ...
    def min(self, field): ...
    def max(self, field): ...
    def sum(self, field): ...
    def first(self, count): ...

class SortedGrouping(UnsortedGrouping):
    """Extends UnsortedGrouping with intra-group sorting capabilities."""
    def sort_group(self, field, order): ...
```
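Sorting inside groups composes with `first` to produce per-group top-N results. A sketch, assuming `Order` is importable from `flink.plan.Constants`:

```python
from flink.plan.Environment import get_environment
from flink.plan.Constants import Order  # assumed module path

env = get_environment()
scores = env.from_elements(("alice", 3), ("alice", 9), ("bob", 5), ("bob", 1))

# Group by name, sort each group by score descending, keep the top score.
top = scores.group_by(0).sort_group(1, Order.DESCENDING).first(1)
top.output()
env.execute(local=True)
```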
```python
class JobExecutionResult:
    def get_net_runtime(self): ...  # Returns job execution time in milliseconds
```
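Assuming `execute` returns this object when the job finishes, the runtime can be read directly:

```python
from flink.plan.Environment import get_environment

env = get_environment()
env.from_elements(1, 2, 3).output()

result = env.execute(local=True)  # assumed to return a JobExecutionResult
print("Net runtime: %d ms" % result.get_net_runtime())
```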