
# Apache Flink Python API

A comprehensive Python programming interface for Apache Flink batch processing operations. The library provides a bridge between Python code and the Flink Java runtime, enabling developers to write distributed data processing applications using familiar Python syntax while leveraging Flink's distributed computing capabilities.

## Package Information

- **Package Name**: flink-python_2.10
- **Language**: Python
- **Package Type**: Maven
- **Installation**: Include in the Flink classpath or run with the Flink Python API launcher
- **Maven Coordinates**: `org.apache.flink:flink-python_2.10:1.3.3`

## Core Imports

```python
from flink.plan.Environment import get_environment
```

For transformation functions:

```python
from flink.functions.MapFunction import MapFunction
from flink.functions.ReduceFunction import ReduceFunction
from flink.functions.FilterFunction import FilterFunction
```

## Basic Usage

```python
from flink.plan.Environment import get_environment
from flink.functions.FlatMapFunction import FlatMapFunction
from flink.functions.GroupReduceFunction import GroupReduceFunction

# Create execution environment
env = get_environment()

# Create data source
data = env.from_elements("hello world", "hello flink", "flink python")

# Define transformation functions
class Tokenizer(FlatMapFunction):
    def flat_map(self, value, collector):
        # Emit one (word, 1) record per word in the line
        for word in value.lower().split():
            collector.collect((word, 1))

class Counter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        count = 0
        word = None
        for element in iterator:
            word = element[0]
            count += element[1]
        collector.collect((word, count))

# Apply transformations
words = data.flat_map(Tokenizer())
word_counts = words.group_by(0).reduce_group(Counter())

# Output results
word_counts.output()

# Execute the program
env.execute(local=True)
```
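The pipeline builds a plan lazily and only runs when `env.execute()` is called. To illustrate what the grouped reduce computes, here is a plain-Python simulation of the same word count; it needs no Flink runtime, and the helper names are illustrative, not part of the Flink API:

```python
from collections import defaultdict

def tokenize(lines):
    # flat_map: each input line expands into many (word, 1) records
    for line in lines:
        for word in line.lower().split():
            yield (word, 1)

def group_reduce(records):
    # group_by(0).reduce_group(...): accumulate per key, emit one record per group
    groups = defaultdict(int)
    for word, one in records:
        groups[word] += one
    return sorted(groups.items())

lines = ["hello world", "hello flink", "flink python"]
print(group_reduce(tokenize(lines)))
# [('flink', 2), ('hello', 2), ('python', 1), ('world', 1)]
```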

## Architecture

The Flink Python API follows a layered architecture:

- **Environment**: Entry point managing execution context, data sources, and job configuration
- **DataSet**: Core abstraction representing distributed datasets with transformation operations
- **Functions**: User-defined transformation interfaces (MapFunction, ReduceFunction, etc.)
- **Operators**: Internal representations of transformations with optimization support
- **Runtime Bridge**: Communication layer connecting Python processes to the Flink Java runtime

This design enables seamless integration between Python user code and Flink's distributed execution engine, with automatic serialization, fault tolerance, and performance optimizations.
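The key consequence of this layering is deferred execution: transformations only record operators in a plan, and nothing runs until execution is triggered. A toy sketch of that idea (the class and its internals are illustrative, not the actual implementation):

```python
class ToyDataSet:
    """Records transformations as a plan; nothing runs until execute()."""
    def __init__(self, elements):
        self._elements = list(elements)
        self._plan = []  # ordered list of (operator_name, function)

    def map(self, fn):
        self._plan.append(("map", fn))
        return self

    def filter(self, fn):
        self._plan.append(("filter", fn))
        return self

    def execute(self):
        # Only here is the recorded plan actually applied to the data
        data = self._elements
        for op, fn in self._plan:
            if op == "map":
                data = [fn(x) for x in data]
            else:  # "filter"
                data = [x for x in data if fn(x)]
        return data

ds = ToyDataSet([1, 2, 3, 4]).map(lambda x: x * 10).filter(lambda x: x > 15)
print(len(ds._plan))   # 2 operators recorded, nothing executed yet
print(ds.execute())    # [20, 30, 40]
```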

## Capabilities

### Environment and Execution

Core execution environment providing job configuration, data source creation, and program execution capabilities.

```python { .api }
def get_environment():
    """Creates execution environment for Flink programs."""

class Environment:
    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','): ...
    def read_text(self, path): ...
    def from_elements(self, *elements): ...
    def generate_sequence(self, frm, to): ...
    def register_type(self, type, serializer, deserializer): ...
    def set_parallelism(self, parallelism): ...
    def get_parallelism(self): ...
    def set_number_of_execution_retries(self, count): ...
    def get_number_of_execution_retries(self): ...
    def execute(self, local=False): ...
```

[Environment and Execution](./environment.md)
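A source like `generate_sequence` is split across the configured parallelism, with each parallel instance producing one slice of the range. A plain-Python sketch of such a split, assuming simple contiguous range partitioning (illustrative only, not Flink's actual scheme):

```python
def split_sequence(frm, to, parallelism):
    # Split the inclusive range [frm, to] into contiguous chunks,
    # one per parallel source instance
    total = to - frm + 1
    base, extra = divmod(total, parallelism)
    chunks, start = [], frm
    for i in range(parallelism):
        size = base + (1 if i < extra else 0)  # spread the remainder
        chunks.append((start, start + size - 1))
        start += size
    return chunks

print(split_sequence(1, 10, 3))
# [(1, 4), (5, 7), (8, 10)]
```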

### Data Transformations

Comprehensive transformation operations for processing distributed datasets, including map, filter, reduce, and advanced operations.

```python { .api }
class DataSet:
    def map(self, operator): ...
    def flat_map(self, operator): ...
    def filter(self, operator): ...
    def reduce(self, operator): ...
    def reduce_group(self, operator, combinable=False): ...
    def group_by(self, *keys): ...
    def map_partition(self, operator): ...
    def aggregate(self, aggregation, field): ...
    def min(self, field): ...
    def max(self, field): ...
    def sum(self, field): ...
    def project(self, *fields): ...
    def distinct(self, *fields): ...
    def first(self, count): ...
    def union(self, other_set): ...
    def partition_by_hash(self, *fields): ...
    def rebalance(self): ...
    def count_elements_per_partition(self): ...
    def zip_with_index(self): ...
    def name(self, name): ...
    def set_parallelism(self, parallelism): ...
```

[Data Transformations](./transformations.md)
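Several of these operations address tuple fields by position. A plain-Python sketch of the `project`, `distinct`, and field-wise `sum` semantics on tuple records (illustrative helpers, not the Flink implementation):

```python
def project(records, *fields):
    # Keep only the given tuple positions, in the given order
    return [tuple(r[f] for f in fields) for r in records]

def distinct(records):
    # Remove duplicate tuples, keeping first occurrence
    seen, out = set(), []
    for r in records:
        if r not in seen:
            seen.add(r)
            out.append(r)
    return out

def sum_field(records, field):
    # Aggregate one tuple position across all records
    return sum(r[field] for r in records)

data = [("a", 1, "x"), ("b", 2, "y"), ("a", 1, "z")]
print(project(data, 0, 1))            # [('a', 1), ('b', 2), ('a', 1)]
print(distinct(project(data, 0, 1)))  # [('a', 1), ('b', 2)]
print(sum_field(data, 1))             # 4
```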

### Join and CoGroup Operations

Advanced operations for combining multiple datasets through joins, cross products, and co-group operations.

```python { .api }
class DataSet:
    def join(self, other_set): ...
    def join_with_huge(self, other_set): ...
    def join_with_tiny(self, other_set): ...
    def cross(self, other_set): ...
    def cross_with_huge(self, other_set): ...
    def cross_with_tiny(self, other_set): ...
    def co_group(self, other_set): ...
    def union(self, other_set): ...
```

[Join and CoGroup Operations](./joins.md)
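The difference between join and co-group is worth pinning down: a join emits one record per matching left/right pair, while a co-group emits one record per key carrying all left and all right records for that key, including keys that match on only one side. A plain-Python simulation of both shapes (illustrative, not the Flink implementation):

```python
from collections import defaultdict

def inner_join(left, right, key=0):
    # join: one output pair per matching (left, right) combination
    index = defaultdict(list)
    for r in right:
        index[r[key]].append(r)
    return [(l, r) for l in left for r in index[l[key]]]

def co_group(left, right, key=0):
    # co_group: one output per key, with ALL left and right records for it
    keys = sorted({l[key] for l in left} | {r[key] for r in right})
    return [
        (k,
         [l for l in left if l[key] == k],
         [r for r in right if r[key] == k])
        for k in keys
    ]

users = [(1, "ada"), (2, "bob")]
orders = [(1, "book"), (1, "pen"), (3, "mug")]
print(inner_join(users, orders))
# [((1, 'ada'), (1, 'book')), ((1, 'ada'), (1, 'pen'))]
print(co_group(users, orders))
# key 2 appears with an empty right side, key 3 with an empty left side
```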

### User-Defined Functions

Function interfaces for implementing custom transformation logic, with support for various processing patterns.

```python { .api }
class MapFunction:
    def map(self, value): ...

class ReduceFunction:
    def reduce(self, value1, value2): ...

class GroupReduceFunction:
    def reduce(self, iterator, collector): ...
```

[User-Defined Functions](./functions.md)
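The two reduce interfaces differ in shape: a ReduceFunction combines records pairwise into one record of the same type, while a GroupReduceFunction sees the whole group through an iterator and may emit any number of records via the collector. A plain-Python contrast, with a list standing in for the collector (illustrative only):

```python
from functools import reduce as fold

class SumReduce:
    # ReduceFunction style: pairwise, output type == input type
    def reduce(self, value1, value2):
        return value1 + value2

class TopOne:
    # GroupReduceFunction style: whole group in, anything out via collector
    def reduce(self, iterator, collector):
        collector.append(max(iterator))  # plain list stands in for the collector

values = [3, 1, 4, 1, 5]
print(fold(SumReduce().reduce, values))  # 14
out = []
TopOne().reduce(iter(values), out)
print(out)  # [5]
```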

### Data Sources and Sinks

Input and output operations for reading from and writing to various data sources, including files, collections, and external systems.

```python { .api }
class Environment:
    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','): ...
    def read_text(self, path): ...

class DataSet:
    def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE): ...
    def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE): ...
    def output(self, to_error=False): ...
```

[Data Sources and Sinks](./sources-sinks.md)
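The `write_mode` parameter controls what happens when the output path already exists: `NO_OVERWRITE` refuses to clobber it, `OVERWRITE` replaces it. A plain-Python sketch of that contract on local files (an illustrative stand-in, not the Flink sink):

```python
import os
import tempfile

NO_OVERWRITE, OVERWRITE = 0, 1

def write_text(path, lines, write_mode=NO_OVERWRITE):
    # Mimic the sink contract: refuse to clobber existing output unless asked
    if write_mode == NO_OVERWRITE and os.path.exists(path):
        raise IOError("output path already exists: %s" % path)
    with open(path, "w") as f:
        f.write("\n".join(lines))

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "out.txt")
    write_text(path, ["a", "b"])        # first write succeeds
    try:
        write_text(path, ["c"])         # NO_OVERWRITE: refused
    except IOError as e:
        print("refused:", e)
    write_text(path, ["c"], OVERWRITE)  # OVERWRITE: replaces content
    print(open(path).read())            # c
```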

## Types

```python { .api }
class WriteMode:
    NO_OVERWRITE = 0  # Fail if output file exists
    OVERWRITE = 1     # Overwrite existing files

class Order:
    NONE = 0        # No specific order
    ASCENDING = 1   # Ascending sort order
    DESCENDING = 2  # Descending sort order
    ANY = 3         # Any order acceptable

class OperatorSet(DataSet):
    """Specialized DataSet representing operations with custom operators."""
    def with_broadcast_set(self, name, set): ...

class DataSink:
    """Represents data output operations."""
    def name(self, name): ...
    def set_parallelism(self, parallelism): ...

class UnsortedGrouping:
    """Represents a grouped DataSet supporting group-wise operations."""
    def reduce(self, operator): ...
    def reduce_group(self, operator, combinable=False): ...
    def aggregate(self, aggregation, field): ...
    def min(self, field): ...
    def max(self, field): ...
    def sum(self, field): ...
    def first(self, count): ...

class SortedGrouping(UnsortedGrouping):
    """Extends UnsortedGrouping with intra-group sorting capabilities."""
    def sort_group(self, field, order): ...

class JobExecutionResult:
    def get_net_runtime(self): ...  # Returns job execution time in milliseconds
```
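`sort_group` orders records within each group before the group reduce runs, so a following `first(n)` yields the top-n per group. A plain-Python sketch of sort-within-group using the `Order` constants above (the helper is illustrative, not the Flink implementation):

```python
from collections import defaultdict

ASCENDING, DESCENDING = 1, 2  # mirrors Order.ASCENDING / Order.DESCENDING

def sort_group(records, group_field, sort_field, order):
    # Group by one tuple position, then sort each group's records by another
    groups = defaultdict(list)
    for r in records:
        groups[r[group_field]].append(r)
    return {
        k: sorted(v, key=lambda r: r[sort_field], reverse=(order == DESCENDING))
        for k, v in groups.items()
    }

scores = [("a", 3), ("b", 1), ("a", 1), ("b", 2)]
print(sort_group(scores, 0, 1, DESCENDING))
# {'a': [('a', 3), ('a', 1)], 'b': [('b', 2), ('b', 1)]}
```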