# Apache Flink Python API

A comprehensive Python programming interface for Apache Flink batch processing operations. The library provides a bridge between Python code and the Flink Java runtime, enabling developers to write distributed data processing applications using familiar Python syntax while leveraging Flink's distributed computing capabilities.

## Package Information

- **Package Name**: flink-python_2.10
- **Language**: Python
- **Package Type**: Maven
- **Installation**: Include in the Flink classpath or run with the Flink Python API launcher
- **Maven Coordinates**: `org.apache.flink:flink-python_2.10:1.3.3`
## Core Imports

```python
from flink.plan.Environment import get_environment
```

For transformation functions:

```python
from flink.functions.MapFunction import MapFunction
from flink.functions.ReduceFunction import ReduceFunction
from flink.functions.FilterFunction import FilterFunction
```
## Basic Usage

```python
from flink.plan.Environment import get_environment
from flink.functions.FlatMapFunction import FlatMapFunction
from flink.functions.GroupReduceFunction import GroupReduceFunction

# Create execution environment
env = get_environment()

# Create data source
data = env.from_elements("hello world", "hello flink", "flink python")

# Define transformation functions
class Tokenizer(FlatMapFunction):
    def flat_map(self, value, collector):
        # Emit one (word, 1) pair per token so that group_by(0)
        # groups by the word itself
        for word in value.lower().split():
            collector.collect((word, 1))

class Counter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        count = 0
        word = None
        for word, one in iterator:
            count += one
        collector.collect((word, count))

# Apply transformations
words = data.flat_map(Tokenizer())
word_counts = words.group_by(0).reduce_group(Counter())

# Output results
word_counts.output()

# Execute the program
env.execute(local=True)
```
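For comparison, here is the same word count as plain local Python. This is only the computation the distributed pipeline performs, not Flink code:

```python
from collections import Counter as PyCounter

lines = ["hello world", "hello flink", "flink python"]

# Tokenize every line, then count occurrences of each word
counts = PyCounter(word for line in lines for word in line.lower().split())

print(sorted(counts.items()))
# [('flink', 2), ('hello', 2), ('python', 1), ('world', 1)]
```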
## Architecture

The Flink Python API follows a layered architecture:

- **Environment**: Entry point managing execution context, data sources, and job configuration
- **DataSet**: Core abstraction representing distributed datasets with transformation operations
- **Functions**: User-defined transformation interfaces (MapFunction, ReduceFunction, etc.)
- **Operators**: Internal representations of transformations with optimization support
- **Runtime Bridge**: Communication layer connecting Python processes to the Flink Java runtime

This design enables seamless integration between Python user code and Flink's distributed execution engine, with automatic serialization, fault tolerance, and performance optimizations.
## Capabilities

### Environment and Execution

Core execution environment providing job configuration, data source creation, and program execution capabilities.

```python { .api }
def get_environment():
    """Creates execution environment for Flink programs."""

class Environment:
    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','): ...
    def read_text(self, path): ...
    def from_elements(self, *elements): ...
    def generate_sequence(self, frm, to): ...
    def register_type(self, type, serializer, deserializer): ...
    def set_parallelism(self, parallelism): ...
    def get_parallelism(self): ...
    def set_number_of_execution_retries(self, count): ...
    def get_number_of_execution_retries(self): ...
    def execute(self, local=False): ...
```
[Environment and Execution](./environment.md)
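As a rough pure-Python sketch of the tuple records a `read_csv`-style source yields, assuming `types` supplies one per-column conversion (illustrative only; this is not the Flink implementation, which parses files in parallel on the Java side):

```python
# Toy CSV-to-tuples parser mirroring read_csv's delimiter parameters.
# Assumption: `types` is a sequence of per-column constructors.
def parse_csv(text, types, line_delimiter="\n", field_delimiter=','):
    records = []
    for line in text.split(line_delimiter):
        if not line:
            continue  # skip empty trailing lines
        fields = line.split(field_delimiter)
        # Apply one constructor per column to build a typed tuple
        records.append(tuple(t(f) for t, f in zip(types, fields)))
    return records

rows = parse_csv("1,apple\n2,banana", (int, str))
print(rows)  # [(1, 'apple'), (2, 'banana')]
```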
### Data Transformations

Comprehensive transformation operations for processing distributed datasets, including map, filter, reduce, and advanced operations.

```python { .api }
class DataSet:
    def map(self, operator): ...
    def flat_map(self, operator): ...
    def filter(self, operator): ...
    def reduce(self, operator): ...
    def reduce_group(self, operator, combinable=False): ...
    def group_by(self, *keys): ...
    def map_partition(self, operator): ...
    def aggregate(self, aggregation, field): ...
    def min(self, field): ...
    def max(self, field): ...
    def sum(self, field): ...
    def project(self, *fields): ...
    def distinct(self, *fields): ...
    def first(self, count): ...
    def union(self, other_set): ...
    def partition_by_hash(self, *fields): ...
    def rebalance(self): ...
    def count_elements_per_partition(self): ...
    def zip_with_index(self): ...
    def name(self, name): ...
    def set_parallelism(self, parallelism): ...
```
[Data Transformations](./transformations.md)
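A rough pure-Python sketch of the tuple semantics behind `project` and `distinct` (illustrative only; Flink runs these operations in parallel over distributed partitions):

```python
# Toy in-memory versions of project()/distinct() on tuple datasets;
# not the Flink implementation.
def project(dataset, *fields):
    # Keep only the selected tuple positions, in the given order
    return [tuple(row[f] for f in fields) for row in dataset]

def distinct(dataset):
    # Drop duplicate tuples, preserving first-seen order
    seen, out = set(), []
    for row in dataset:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out

data = [(1, "a", 10), (2, "b", 20), (1, "a", 30)]
print(project(data, 1, 0))            # [('a', 1), ('b', 2), ('a', 1)]
print(distinct(project(data, 1, 0)))  # [('a', 1), ('b', 2)]
```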
### Join and CoGroup Operations

Advanced operations for combining multiple datasets through joins, cross products, and co-group operations.

```python { .api }
class DataSet:
    def join(self, other_set): ...
    def join_with_huge(self, other_set): ...
    def join_with_tiny(self, other_set): ...
    def cross(self, other_set): ...
    def cross_with_huge(self, other_set): ...
    def cross_with_tiny(self, other_set): ...
    def co_group(self, other_set): ...
    def union(self, other_set): ...
```
[Join and CoGroup Operations](./joins.md)
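A rough pure-Python sketch of inner-join result semantics, joining on explicit key positions (illustrative only; in the real API the join keys are specified on the returned join operator, and `join_with_tiny`/`join_with_huge` are size hints for the optimizer):

```python
# Toy in-memory hash join: build a table on the right input, probe with
# the left, and emit (left, right) pairs on key match. Not the Flink
# implementation, which shuffles and joins partitions in parallel.
def hash_join(left, right, left_key, right_key):
    table = {}
    for row in right:
        table.setdefault(row[right_key], []).append(row)
    return [(l, r) for l in left for r in table.get(l[left_key], [])]

users = [(1, "alice"), (2, "bob")]
orders = [(1, "book"), (1, "pen"), (3, "ink")]
print(hash_join(users, orders, 0, 0))
# [((1, 'alice'), (1, 'book')), ((1, 'alice'), (1, 'pen'))]
```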
### User-Defined Functions

Function interfaces for implementing custom transformation logic with support for various processing patterns.

```python { .api }
class MapFunction:
    def map(self, value): ...

class ReduceFunction:
    def reduce(self, value1, value2): ...

class GroupReduceFunction:
    def reduce(self, iterator, collector): ...
```
[User-Defined Functions](./functions.md)
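The two reduce contracts differ: `ReduceFunction.reduce` combines two values into one and is applied pairwise, while `GroupReduceFunction.reduce` sees a whole group at once through an iterator and may emit any number of records. A pure-Python sketch of how a runtime might drive each (illustrative, not Flink's actual driver):

```python
from functools import reduce as fold

# Pairwise reduce: combine(v1, v2) should be associative, since the
# runtime may apply it in any combination order across partitions.
def drive_reduce(values, combine):
    return fold(combine, values)

# Group reduce: the function consumes the group iterator and emits
# zero or more records through the collector callback.
def drive_group_reduce(values, group_reduce):
    collected = []
    group_reduce(iter(values), collected.append)
    return collected

print(drive_reduce([1, 2, 3, 4], lambda a, b: a + b))  # 10

def top_two(iterator, collect):
    # Emit the two largest elements of the group
    for v in sorted(iterator, reverse=True)[:2]:
        collect(v)

print(drive_group_reduce([3, 1, 4, 1, 5], top_two))  # [5, 4]
```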
### Data Sources and Sinks

Input and output operations for reading from and writing to various data sources including files, collections, and external systems.

```python { .api }
class Environment:
    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','): ...
    def read_text(self, path): ...

class DataSet:
    def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE): ...
    def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE): ...
    def output(self, to_error=False): ...
```
[Data Sources and Sinks](./sources-sinks.md)
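A pure-Python sketch of the line format a `write_csv`-style sink produces from tuple records (illustrative only; the real sink also honors `write_mode` and, at parallelism above one, typically writes one output file per parallel task):

```python
# Toy CSV serialization for tuple records, mirroring write_csv's
# delimiter parameters; not the Flink sink implementation.
def to_csv(records, line_delimiter="\n", field_delimiter=','):
    return line_delimiter.join(
        field_delimiter.join(str(field) for field in record)
        for record in records
    ) + line_delimiter

print(to_csv([("hello", 2), ("flink", 1)]))
# hello,2
# flink,1
```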
## Types

```python { .api }
class WriteMode:
    NO_OVERWRITE = 0  # Fail if output file exists
    OVERWRITE = 1     # Overwrite existing files

class Order:
    NONE = 0        # No specific order
    ASCENDING = 1   # Ascending sort order
    DESCENDING = 2  # Descending sort order
    ANY = 3         # Any order acceptable

class OperatorSet(DataSet):
    """Specialized DataSet representing operations with custom operators."""
    def with_broadcast_set(self, name, set): ...

class DataSink:
    """Represents data output operations."""
    def name(self, name): ...
    def set_parallelism(self, parallelism): ...

class UnsortedGrouping:
    """Represents a grouped DataSet supporting group-wise operations."""
    def reduce(self, operator): ...
    def reduce_group(self, operator, combinable=False): ...
    def aggregate(self, aggregation, field): ...
    def min(self, field): ...
    def max(self, field): ...
    def sum(self, field): ...
    def first(self, count): ...

class SortedGrouping(UnsortedGrouping):
    """Extends UnsortedGrouping with intra-group sorting capabilities."""
    def sort_group(self, field, order): ...

class JobExecutionResult:
    def get_net_runtime(self): ...  # Returns job execution time in milliseconds
```
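A pure-Python sketch of sorted-grouping semantics (`group_by(key).sort_group(field, order)`): tuples are grouped on one position, and each group is ordered on another field before the group function sees it. Illustrative only; Flink performs the grouping and sorting per partition in the Java runtime:

```python
from itertools import groupby

ASCENDING, DESCENDING = 1, 2  # mirrors the Order constants above

# Toy version of group_by(key).sort_group(field, order).
def sorted_groups(dataset, key, field, order=ASCENDING):
    # Sort on the grouping key so groupby() sees each group contiguously
    rows = sorted(dataset, key=lambda r: r[key])
    result = {}
    for k, group in groupby(rows, key=lambda r: r[key]):
        # Order the members of each group on the sort field
        result[k] = sorted(group, key=lambda r: r[field],
                           reverse=(order == DESCENDING))
    return result

data = [("a", 3), ("b", 1), ("a", 1), ("b", 2)]
print(sorted_groups(data, key=0, field=1, order=DESCENDING))
# {'a': [('a', 3), ('a', 1)], 'b': [('b', 2), ('b', 1)]}
```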