tessl/maven-org-apache-flink--flink-python-2-10

Python API for Apache Flink providing comprehensive batch processing capabilities including data transformations, aggregations, joins, and distributed execution

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-python_2.10@1.3.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-python-2-10@1.3.0


Apache Flink Python API

A comprehensive Python programming interface for Apache Flink batch processing operations. The library provides a bridge between Python code and the Flink Java runtime, enabling developers to write distributed data processing applications using familiar Python syntax while leveraging Flink's powerful distributed computing capabilities.

Package Information

  • Package Name: flink-python_2.10
  • Language: Python
  • Package Type: Maven
  • Installation: Include on the Flink classpath or run programs with the Flink Python API launcher
  • Maven Coordinates: org.apache.flink:flink-python_2.10:1.3.3

Core Imports

from flink.plan.Environment import get_environment

For transformation functions:

from flink.functions.MapFunction import MapFunction
from flink.functions.FlatMapFunction import FlatMapFunction
from flink.functions.FilterFunction import FilterFunction
from flink.functions.ReduceFunction import ReduceFunction
from flink.functions.GroupReduceFunction import GroupReduceFunction

Basic Usage

from flink.plan.Environment import get_environment
from flink.functions.FlatMapFunction import FlatMapFunction
from flink.functions.GroupReduceFunction import GroupReduceFunction

# Create execution environment
env = get_environment()

# Create data source
data = env.from_elements("hello world", "hello flink", "flink python")

# Define transformation functions
class Tokenizer(FlatMapFunction):
    def flat_map(self, value, collector):
        # Emit one (word, 1) tuple per token
        for word in value.lower().split():
            collector.collect((word, 1))

class Counter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        count = 0
        word = None
        for element in iterator:
            word = element[0]
            count += element[1]
        collector.collect((word, count))

# Apply transformations
words = data.flat_map(Tokenizer())
word_counts = words.group_by(0).reduce_group(Counter())

# Output results
word_counts.output()

# Execute the program
env.execute(local=True)

Architecture

The Flink Python API follows a layered architecture:

  • Environment: Entry point managing execution context, data sources, and job configuration
  • DataSet: Core abstraction representing distributed datasets with transformation operations
  • Functions: User-defined transformation interfaces (MapFunction, ReduceFunction, etc.)
  • Operators: Internal representations of transformations with optimization support
  • Runtime Bridge: Communication layer connecting Python processes to Flink Java runtime

This design enables seamless integration between Python user code and Flink's distributed execution engine, with automatic serialization, fault tolerance, and performance optimizations.

Capabilities

Environment and Execution

Core execution environment providing job configuration, data source creation, and program execution capabilities.

def get_environment():
    """Creates execution environment for Flink programs."""

class Environment:
    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','): ...
    def read_text(self, path): ...
    def from_elements(self, *elements): ...
    def generate_sequence(self, frm, to): ...
    def register_type(self, type, serializer, deserializer): ...
    def set_parallelism(self, parallelism): ...
    def get_parallelism(self): ...
    def set_number_of_execution_retries(self, count): ...
    def get_number_of_execution_retries(self): ...
    def execute(self, local=False): ...
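
A minimal sketch of configuring and running a job through the environment. The CSV path is a placeholder, and the INT/STRING field-type constants are assumed to live in flink.plan.Constants.

from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING  # assumed location of the type constants

env = get_environment()
env.set_parallelism(4)
env.set_number_of_execution_retries(2)

# Read a CSV of (word, count) records from a placeholder path;
# the types tuple declares one constant per field
csv_data = env.read_csv("/tmp/input.csv", (STRING, INT))

# Generate a synthetic dataset containing the numbers 1..1000
numbers = env.generate_sequence(1, 1000)

csv_data.output()
numbers.output()

# Run locally; drop local=True to submit against a configured cluster
env.execute(local=True)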


Data Transformations

Comprehensive transformation operations for processing distributed datasets including map, filter, reduce, and advanced operations.

class DataSet:
    def map(self, operator): ...
    def flat_map(self, operator): ...
    def filter(self, operator): ...
    def reduce(self, operator): ...
    def reduce_group(self, operator, combinable=False): ...
    def group_by(self, *keys): ...
    def map_partition(self, operator): ...
    def aggregate(self, aggregation, field): ...
    def min(self, field): ...
    def max(self, field): ...
    def sum(self, field): ...
    def project(self, *fields): ...
    def distinct(self, *fields): ...
    def first(self, count): ...
    def union(self, other_set): ...
    def partition_by_hash(self, *fields): ...
    def rebalance(self): ...
    def count_elements_per_partition(self): ...
    def zip_with_index(self): ...
    def name(self, name): ...
    def set_parallelism(self, parallelism): ...
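
A short sketch chaining several of these transformations over tuples; the sample data and the function names (DoubleScore, PositiveOnly) are invented for illustration.

from flink.plan.Environment import get_environment
from flink.functions.MapFunction import MapFunction
from flink.functions.FilterFunction import FilterFunction

class DoubleScore(MapFunction):
    def map(self, value):
        # value is a (name, score) tuple
        return (value[0], value[1] * 2)

class PositiveOnly(FilterFunction):
    def filter(self, value):
        return value[1] > 0

env = get_environment()
scores = env.from_elements(("alice", 3), ("bob", -1), ("alice", 5))

# Keep positive scores, double them, then sum per name (field 0)
result = scores.filter(PositiveOnly()).map(DoubleScore()).group_by(0).sum(1)
result.name("doubled positive scores per name")

result.output()
env.execute(local=True)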


Join and CoGroup Operations

Advanced operations for combining multiple datasets through joins, cross products, and co-group operations.

class DataSet:
    def join(self, other_set): ...
    def join_with_huge(self, other_set): ...
    def join_with_tiny(self, other_set): ...
    def cross(self, other_set): ...
    def cross_with_huge(self, other_set): ...
    def cross_with_tiny(self, other_set): ...
    def co_group(self, other_set): ...
    def union(self, other_set): ...
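
A hedged sketch of an inner join. It assumes the where()/equal_to() key selectors and a JoinFunction (flink.functions.JoinFunction) as described in the joins guide; the user and order datasets are invented.

from flink.plan.Environment import get_environment
from flink.functions.JoinFunction import JoinFunction  # assumed module path

class NameWithAmount(JoinFunction):
    def join(self, value1, value2):
        # value1: (user_id, name), value2: (user_id, amount)
        return (value1[1], value2[1])

env = get_environment()
users = env.from_elements((1, "alice"), (2, "bob"))
orders = env.from_elements((1, 30), (1, 12), (2, 7))

# Join both sides on user_id (field 0 of each dataset)
joined = users.join(orders).where(0).equal_to(0).using(NameWithAmount())

joined.output()
env.execute(local=True)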


User-Defined Functions

Function interfaces for implementing custom transformation logic with support for various processing patterns.

class MapFunction:
    def map(self, value): ...

class ReduceFunction:
    def reduce(self, value1, value2): ...

class GroupReduceFunction:
    def reduce(self, iterator, collector): ...
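
A compact sketch implementing two of the interfaces above and wiring them into a pipeline; the Square and Add classes are illustrative names.

from flink.plan.Environment import get_environment
from flink.functions.MapFunction import MapFunction
from flink.functions.ReduceFunction import ReduceFunction

class Square(MapFunction):
    def map(self, value):
        return value * value

class Add(ReduceFunction):
    def reduce(self, value1, value2):
        return value1 + value2

env = get_environment()
numbers = env.from_elements(1, 2, 3, 4)

# Sum of squares: 1 + 4 + 9 + 16 = 30
numbers.map(Square()).reduce(Add()).output()
env.execute(local=True)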


Data Sources and Sinks

Input and output operations for reading from and writing to various data sources including files, collections, and external systems.

class Environment:
    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','): ...
    def read_text(self, path): ...

class DataSet:
    def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE): ...
    def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE): ...
    def output(self, to_error=False): ...
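
A brief sketch of reading a text file and writing it back out. The paths are placeholders, and WriteMode is assumed to be importable from flink.plan.Constants.

from flink.plan.Environment import get_environment
from flink.plan.Constants import WriteMode  # assumed location of the WriteMode constants

env = get_environment()

# Read one record per line from a placeholder path
lines = env.read_text("/tmp/input.txt")

# Overwrite any output left by a previous run
lines.write_text("/tmp/output.txt", write_mode=WriteMode.OVERWRITE)

env.execute(local=True)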


Types

class WriteMode:
    NO_OVERWRITE = 0  # Fail if output file exists
    OVERWRITE = 1     # Overwrite existing files

class Order:
    NONE = 0         # No specific order
    ASCENDING = 1    # Ascending sort order
    DESCENDING = 2   # Descending sort order
    ANY = 3          # Any order acceptable

class OperatorSet(DataSet):
    """Specialized DataSet representing operations with custom operators."""
    def with_broadcast_set(self, name, set): ...

class DataSink:
    """Represents data output operations."""
    def name(self, name): ...
    def set_parallelism(self, parallelism): ...

class UnsortedGrouping:
    """Represents grouped DataSet supporting group-wise operations."""
    def reduce(self, operator): ...
    def reduce_group(self, operator, combinable=False): ...
    def aggregate(self, aggregation, field): ...
    def min(self, field): ...
    def max(self, field): ...
    def sum(self, field): ...
    def first(self, count): ...

class SortedGrouping(UnsortedGrouping):
    """Extends UnsortedGrouping with intra-group sorting capabilities."""
    def sort_group(self, field, order): ...

class JobExecutionResult:
    def get_net_runtime(self): ...  # Returns job execution time in milliseconds
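
A sketch combining the grouping and constant types above. It assumes Order and WriteMode live in flink.plan.Constants, that sort_group can be chained directly after group_by, and that execute() returns the JobExecutionResult listed here; the score data is invented.

from flink.plan.Environment import get_environment
from flink.plan.Constants import Order, WriteMode  # assumed location of the constants
from flink.functions.GroupReduceFunction import GroupReduceFunction

class TopScore(GroupReduceFunction):
    def reduce(self, iterator, collector):
        # Elements arrive sorted by score (descending), so the first one is the group maximum
        for element in iterator:
            collector.collect(element)
            break

env = get_environment()
scores = env.from_elements(("alice", 3), ("alice", 7), ("bob", 5))

# Group by name, sort each group by score, keep the top entry per group
top = scores.group_by(0).sort_group(1, Order.DESCENDING).reduce_group(TopScore())
top.write_csv("/tmp/top_scores.csv", write_mode=WriteMode.OVERWRITE)

result = env.execute(local=True)
print("net runtime (ms): %d" % result.get_net_runtime())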