# Data Transformations

Comprehensive transformation operations for processing distributed datasets. These operations form the core of data processing pipelines, enabling map-reduce style computations, filtering, aggregations, and advanced data manipulation patterns.

## Capabilities

### Basic Transformations

#### Map Operations

Applies a function to each element, producing a one-to-one transformation.

```python { .api }
def map(self, operator):
    """
    Applies a MapFunction to each element (1-to-1 transformation).

    Parameters:
        operator (MapFunction or lambda): Transformation function

    Returns:
        OperatorSet: Transformed dataset
    """
```
#### Flat Map Operations

Applies a function to each element, producing zero or more output elements.

```python { .api }
def flat_map(self, operator):
    """
    Applies a FlatMapFunction to each element (1-to-many transformation).

    Parameters:
        operator (FlatMapFunction or lambda): Transformation function

    Returns:
        OperatorSet: Transformed dataset
    """
```
#### Filter Operations

Retains only the elements for which a predicate function returns true.

```python { .api }
def filter(self, operator):
    """
    Filters elements using a predicate function.

    Parameters:
        operator (FilterFunction or lambda): Predicate function returning boolean

    Returns:
        OperatorSet: Filtered dataset
    """
```
#### Map Partition Operations

Applies a function to entire partitions rather than individual elements, which is useful when per-partition state or setup cost matters.

```python { .api }
def map_partition(self, operator):
    """
    Applies a MapPartitionFunction to entire partitions.

    Parameters:
        operator (MapPartitionFunction or lambda): Partition transformation function

    Returns:
        OperatorSet: Transformed dataset
    """
```
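For intuition, the per-partition semantics can be sketched in plain Python, with no Flink runtime involved; here a list of lists stands in for the distributed partitions, and `map_partition_sim` and `count_partition` are illustrative names, not part of the API:

```python
# Plain-Python sketch of map_partition semantics (illustrative only):
# the function receives a whole partition's iterator at once, so
# per-partition work (like counting) avoids touching each element separately.
def map_partition_sim(partitions, fn):
    # Apply fn to each partition's iterator and flatten the results.
    return [out for part in partitions for out in fn(iter(part))]

def count_partition(elements):
    # Emit a single count per partition instead of one output per element.
    yield sum(1 for _ in elements)

counts = map_partition_sim([[1, 2, 3], [4, 5]], count_partition)
# counts == [3, 2]
```

This mirrors the shape of `count_elements_per_partition` described later in this document.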
### Reduce Operations

#### Element Reduction

Reduces the entire DataSet to a single element using a ReduceFunction.

```python { .api }
def reduce(self, operator):
    """
    Reduces the DataSet to a single element using a ReduceFunction.

    The transformation consecutively calls the ReduceFunction on pairs of
    elements until only a single element remains.

    Parameters:
        operator (ReduceFunction or lambda): Reduction function combining two elements

    Returns:
        OperatorSet: Reduced dataset with a single element
    """
```
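The pairwise-combining behavior is the same contract as Python's built-in `functools.reduce`, which makes for a convenient local sketch (illustrative only, not the Flink API):

```python
# Plain-Python sketch of reduce semantics: the function repeatedly
# combines two elements into one until a single value remains.
from functools import reduce as py_reduce

data = [1, 2, 3, 4, 5]
total = py_reduce(lambda a, b: a + b, data)
# total == 15, mirroring data.reduce(lambda a, b: a + b) on a DataSet
```

Because the function may be applied in any grouping order across partitions, it should be associative.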
#### Group Reduction

Applies a GroupReduceFunction to grouped elements, or to the entire DataSet if it is not grouped.

```python { .api }
def reduce_group(self, operator, combinable=False):
    """
    Applies a GroupReduceFunction to a grouped DataSet.

    The transformation calls the GroupReduceFunction once for each group,
    or once for the entire DataSet if not grouped. The function can iterate
    over all elements of the group and emit any number of outputs.

    Parameters:
        operator (GroupReduceFunction or lambda): Group reduction function
        combinable (bool): Whether the function is combinable, enabling a pre-combine optimization

    Returns:
        OperatorSet: Transformed dataset
    """
```
### Aggregation Operations

#### Generic Aggregation

Applies an aggregation operation to a specified field.

```python { .api }
def aggregate(self, aggregation, field):
    """
    Applies an aggregation operation to the specified field.

    Parameters:
        aggregation (Aggregation): Aggregation type (Sum, Min, Max)
        field (int): Field index to aggregate

    Returns:
        OperatorSet: Aggregated dataset
    """
```
#### Built-in Aggregations

Convenience methods for common aggregations.

```python { .api }
def min(self, field):
    """
    Finds the minimum value in the specified field.

    Parameters:
        field (int): Field index

    Returns:
        OperatorSet: Dataset with minimum value
    """

def max(self, field):
    """
    Finds the maximum value in the specified field.

    Parameters:
        field (int): Field index

    Returns:
        OperatorSet: Dataset with maximum value
    """

def sum(self, field):
    """
    Sums the values in the specified field.

    Parameters:
        field (int): Field index

    Returns:
        OperatorSet: Dataset with sum
    """
```
### Grouping Operations

#### Group By Keys

Groups the DataSet by the specified key fields.

```python { .api }
def group_by(self, *keys):
    """
    Groups the DataSet by the specified key fields.

    Parameters:
        *keys (int): Field indices for grouping keys

    Returns:
        UnsortedGrouping: Grouped dataset supporting group-wise operations
    """
```
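A grouping followed by an aggregation, such as `group_by(0).sum(1)`, can be sketched locally in plain Python (illustrative only; the real operation runs distributed and partitions by key):

```python
# Plain-Python sketch of group_by(0).sum(1): records with equal values in
# field 0 form a group, and field 1 is summed within each group.
from collections import defaultdict

records = [("apple", 5), ("banana", 3), ("apple", 2)]
totals = defaultdict(int)
for key, value in records:
    totals[key] += value          # sum field 1 within each field-0 group
result = sorted(totals.items())
# result == [("apple", 7), ("banana", 3)]
```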
### Utility Operations

#### Distinct Elements

Removes duplicate records based on the specified fields.

```python { .api }
def distinct(self, *fields):
    """
    Removes duplicate records based on the specified fields.

    Parameters:
        *fields (int): Field indices for uniqueness comparison

    Returns:
        OperatorSet: Dataset with unique records
    """
```
#### First N Elements

Returns the first n elements of the DataSet.

```python { .api }
def first(self, count):
    """
    Returns the first n elements.

    Parameters:
        count (int): Number of elements to return

    Returns:
        OperatorSet: Dataset with first n elements
    """
```
#### Field Projection

Projects (selects) the specified fields from tuple elements.

```python { .api }
def project(self, *fields):
    """
    Projects (selects) the specified fields from tuples.

    Parameters:
        *fields (int): Field indices to project

    Returns:
        OperatorSet: Dataset with projected fields
    """
```
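The three utility operations above have simple local equivalents, sketched here in plain Python on a small in-memory list (illustrative only; in a distributed run, ordering and therefore which duplicates or "first" elements survive is not guaranteed unless stated by the API):

```python
# Plain-Python sketches of distinct, first, and project semantics.
records = [("a", 1, "x"), ("a", 1, "y"), ("b", 2, "z")]

# distinct(0, 1): keep one record per unique (field 0, field 1) combination.
seen, unique = set(), []
for rec in records:
    key = (rec[0], rec[1])
    if key not in seen:
        seen.add(key)
        unique.append(rec)
# unique == [("a", 1, "x"), ("b", 2, "z")]

# first(2): take two elements from the dataset.
head = unique[:2]

# project(2, 0): select and reorder fields of each tuple.
projected = [(rec[2], rec[0]) for rec in records]
# projected == [("x", "a"), ("y", "a"), ("z", "b")]
```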
### Partitioning Operations

#### Hash Partitioning

Hash-partitions the DataSet on the specified fields, so that records with equal key values end up in the same partition.

```python { .api }
def partition_by_hash(self, *fields):
    """
    Hash-partitions the DataSet on the specified fields.

    Parameters:
        *fields (int): Fields to use for hash partitioning

    Returns:
        OperatorSet: Hash-partitioned dataset
    """
```
#### Rebalancing

Re-balances the DataSet evenly across the available partitions to counteract data skew.

```python { .api }
def rebalance(self):
    """
    Re-balances the DataSet across the available partitions.

    Returns:
        OperatorSet: Rebalanced dataset
    """
```
### Advanced Operations

#### Element Counting per Partition

Counts the elements in each partition.

```python { .api }
def count_elements_per_partition(self):
    """
    Counts the elements in each partition.

    Returns:
        OperatorSet: Dataset with partition element counts
    """
```
#### Index Assignment

Adds a unique index to each element.

```python { .api }
def zip_with_index(self):
    """
    Adds a unique index to each element.

    Returns:
        OperatorSet: Dataset with indexed elements
    """
```
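On a single partition the result is shaped like Python's built-in `enumerate`; this local sketch is illustrative only (in a distributed run the indices are still unique, but the assignment order across partitions is up to the runtime):

```python
# Plain-Python sketch of zip_with_index semantics: each element is
# paired with a unique, consecutive index.
data = ["a", "b", "c"]
indexed = list(enumerate(data))
# indexed == [(0, "a"), (1, "b"), (2, "c")]
```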
### Operation Configuration

#### Operation Naming

Sets a name for an operation, which appears in debugging and monitoring output.

```python { .api }
def name(self, name):
    """
    Sets the name for the operation (debugging/monitoring).

    Parameters:
        name (str): Operation name

    Returns:
        DataSet: Self for method chaining
    """
```
#### Parallelism Configuration

Sets the parallelism for a specific operation.

```python { .api }
def set_parallelism(self, parallelism):
    """
    Sets the parallelism for this specific operation.

    Parameters:
        parallelism (int): Degree of parallelism

    Returns:
        DataSet: Self for method chaining
    """
```
## Grouping Classes

### UnsortedGrouping

Represents a grouped DataSet supporting group-wise operations.

```python { .api }
class UnsortedGrouping:
    def reduce(self, operator):
        """Reduces each group to a single element."""

    def aggregate(self, aggregation, field):
        """Aggregates each group on the specified field."""

    def min(self, field):
        """Finds the minimum in each group."""

    def max(self, field):
        """Finds the maximum in each group."""

    def sum(self, field):
        """Sums the values in each group."""
```
### SortedGrouping

Extends UnsortedGrouping with intra-group sorting capabilities.

```python { .api }
class SortedGrouping(UnsortedGrouping):
    def sort_group(self, field, order):
        """
        Sorts the elements within each group.

        Parameters:
            field (int): Field to sort by
            order (Order): Sort direction (ASCENDING, DESCENDING)

        Returns:
            SortedGrouping: Self for method chaining
        """
```
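Sorting within groups can be sketched locally with `itertools.groupby` (illustrative only; `sorted_groups` is a hypothetical name, and the real operation sorts lazily within the distributed runtime):

```python
# Plain-Python sketch of sort_group semantics: within each field-0 group,
# elements are ordered by field 1 (ascending here) before the group
# function sees them.
from itertools import groupby

records = [("a", 3), ("b", 1), ("a", 1), ("b", 2)]
records.sort(key=lambda r: r[0])                  # group by field 0
sorted_groups = {
    key: sorted(vals, key=lambda r: r[1])         # ascending on field 1
    for key, vals in groupby(records, key=lambda r: r[0])
}
# sorted_groups["a"] == [("a", 1), ("a", 3)]
```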
## Usage Examples

### Basic Transformations

```python
from flink.plan.Environment import get_environment

env = get_environment()
data = env.from_elements(1, 2, 3, 4, 5)

# Map transformation
doubled = data.map(lambda x: x * 2)

# Filter transformation
evens = data.filter(lambda x: x % 2 == 0)

# Flat map transformation
pairs = data.flat_map(lambda x: [x, x])
```
### Aggregations and Grouping

```python
# Create data with tuples
data = env.from_elements(("apple", 5), ("banana", 3), ("apple", 2), ("banana", 7))

# Group by first field and sum second field
result = data.group_by(0).sum(1)

# Alternative using aggregate
from flink.functions.Aggregation import Sum
result = data.group_by(0).aggregate(Sum(), 1)
```
### Advanced Processing Patterns

```python
from flink.functions.GroupReduceFunction import GroupReduceFunction

class WordCounter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        # Each group contains (word, 1) pairs sharing the same word.
        count = 0
        word = None
        for word, one in iterator:
            count += one
        collector.collect((word, count))

# Word counting pipeline: emit (word, 1) pairs so group_by(0) can
# group on the word field, then count within each group
text_data = env.read_text("input.txt")
words = text_data.flat_map(lambda line: [(w, 1) for w in line.lower().split()])
word_counts = words.group_by(0).reduce_group(WordCounter())
```
### Performance Optimization

```python
# Hash partition for better distribution
partitioned_data = data.partition_by_hash(0)

# Rebalance for even load distribution
balanced_data = data.rebalance()

# Configure operation parallelism
result = data.map(lambda x: x * 2).set_parallelism(8).name("Double Values")
```