# Join and CoGroup Operations

Operations for combining multiple DataSets through joins, cross products, co-groups, and unions. They support processing steps that need elements from more than one input, such as enriching records with a lookup table or comparing two inputs key by key.

## Capabilities

### Join Operations

#### Basic Join

Joins two DataSets on specified key fields and combines each pair of matching elements. Elements without a match in the other DataSet are dropped (inner-join semantics).

```python { .api }
def join(self, other_set):
    """
    Joins with another DataSet on specified keys.

    Parameters:
        other_set (DataSet): DataSet to join with

    Returns:
        JoinOperatorWhere: Chainable join builder for .where().equal_to().using() pattern
    """
```
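
The following plain-Python model shows what a basic join computes. It is not Flink code and runs on ordinary lists. The helper name `inner_join` is hypothetical. The join function is called once per matching pair, and elements without a partner are dropped.

```python
def inner_join(first, second, first_key, second_key, join_fn):
    """Hypothetical plain-Python model of a join (not Flink code).

    join_fn is called once for every (a, b) pair whose key fields are equal;
    elements with no partner on the other side are dropped (inner join).
    """
    return [join_fn(a, b)
            for a in first
            for b in second
            if a[first_key] == b[second_key]]


orders = [(1, "customer1", 100), (2, "customer2", 200), (3, "customer3", 50)]
customers = [("customer1", "Alice"), ("customer2", "Bob")]

# Roughly what orders.join(customers).where(1).equal_to(0).using(...) computes.
joined = inner_join(orders, customers, 1, 0,
                    lambda order, customer: (order[0], customer[1], order[2]))
print(joined)  # [(1, 'Alice', 100), (2, 'Bob', 200)]
```

Order 3 disappears because no customer has the key `"customer3"`. In a real Flink job the result order is not guaranteed. The order here is only an artifact of the nested loops.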

#### Join with Optimization Hints

These variants produce the same result as `join`. They also tell the optimizer how large the second DataSet is relative to the first, so it can choose a cheaper execution strategy.

```python { .api }
def join_with_huge(self, other_set):
    """
    Join optimization hint for large second DataSet.

    Parameters:
        other_set (DataSet): Large DataSet to join with

    Returns:
        JoinOperatorWhere: Chainable join builder
    """

def join_with_tiny(self, other_set):
    """
    Join optimization hint for small second DataSet.

    Parameters:
        other_set (DataSet): Small DataSet to join with

    Returns:
        JoinOperatorWhere: Chainable join builder
    """
```
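
This sketch assumes the Python hints map onto the Java DataSet API's broadcast strategies. In the Java API, `joinWithTiny` broadcasts the second input and `joinWithHuge` broadcasts the first.

The plain-Python model below shows why broadcasting a small input is cheap. Each partition of the large input joins locally against a full copy of the small one, so the large input is never reshuffled. It is not Flink code; `broadcast_join` and the two-partition layout are hypothetical.

```python
from collections import defaultdict


def broadcast_join(large_partitions, tiny, large_key, tiny_key, join_fn):
    """Hypothetical model of a broadcast hash join (not Flink code).

    A hash table is built once from the tiny input and, conceptually, shipped
    to every partition of the large input, which then joins locally. The large
    input is never repartitioned.
    """
    table = defaultdict(list)
    for row in tiny:
        table[row[tiny_key]].append(row)

    results = []
    for partition in large_partitions:  # in Flink each partition would run in parallel
        for row in partition:
            for match in table.get(row[large_key], []):
                results.append(join_fn(row, match))
    return results


# Hypothetical layout: the large input is already split across two partitions.
large_partitions = [
    [("key1", 1, 0.5), ("key3", 2, 1.5)],
    [("key2", 3, 2.5), ("key1", 4, 3.5)],
]
small_lookup = [("key1", "value1"), ("key2", "value2")]

result = broadcast_join(large_partitions, small_lookup, 0, 0,
                        lambda big, small: (big[0], big[1], small[1]))
print(result)  # [('key1', 1, 'value1'), ('key2', 3, 'value2'), ('key1', 4, 'value1')]
```

The hint never changes the result, only how it is computed. If the "tiny" side turns out to be large, broadcasting it to every partition becomes the expensive step.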

### Cross Product Operations

#### Basic Cross Product

Creates the Cartesian product of two DataSets: every element of the first DataSet is paired with every element of the second.

```python { .api }
def cross(self, other_set):
    """
    Creates cross product with another DataSet.

    Parameters:
        other_set (DataSet): DataSet to cross with

    Returns:
        CrossOperatorWhere: Cross operation builder
    """
```
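
A cross product applies the combining function to every pair of elements, so its output size is the product of the input sizes. This plain-Python model makes that growth visible, which is why crossing two large DataSets is expensive. It is not Flink code, and `cross` here is a hypothetical helper.

```python
from itertools import product


def cross(first, second, cross_fn):
    """Hypothetical model of a cross product (not Flink code): cross_fn is
    applied to every (a, b) pair, so the output has len(first) * len(second) elements."""
    return [cross_fn(a, b) for a, b in product(first, second)]


points = [(0, 0), (1, 1)]
centers = [(0, 1), (2, 2), (5, 5)]

# 2 points x 3 centers = 6 results: (point, center, squared distance)
pairs = cross(points, centers,
              lambda p, c: (p, c, (p[0] - c[0]) ** 2 + (p[1] - c[1]) ** 2))
print(len(pairs))  # 6
print(pairs[0])    # ((0, 0), (0, 1), 1)
```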

#### Cross Product with Optimization Hints

Like the join variants, these produce the same result as `cross`. They also tell the optimizer whether the second DataSet is very large or very small.

```python { .api }
def cross_with_huge(self, other_set):
    """
    Cross product optimization hint for large second DataSet.

    Parameters:
        other_set (DataSet): Large DataSet

    Returns:
        CrossOperatorWhere: Cross operation builder
    """

def cross_with_tiny(self, other_set):
    """
    Cross product optimization hint for small second DataSet.

    Parameters:
        other_set (DataSet): Small DataSet

    Returns:
        CrossOperatorWhere: Cross operation builder
    """
```
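
This sketch assumes these hints work like the join hints. In the Java API, `crossWithTiny` marks the second input as small so it can be broadcast, and `crossWithHuge` does the same for the first. The benefit is that only the small side is replicated.

The plain-Python model below checks that crossing each partition of the large input with a full copy of the small input gives the same pairs as an unpartitioned cross. It is not Flink code; `broadcast_cross` and the partition layout are hypothetical.

```python
from itertools import product


def broadcast_cross(large_partitions, tiny, cross_fn):
    """Hypothetical model of a broadcast cross (not Flink code).

    Each partition of the large input is crossed with a full copy of the
    tiny input; only the tiny side is replicated.
    """
    results = []
    for partition in large_partitions:
        results.extend(cross_fn(a, b) for a, b in product(partition, tiny))
    return results


large_partitions = [[1, 2], [3], [4, 5]]  # hypothetical parallel partitions
tiny = ["x", "y"]

out = broadcast_cross(large_partitions, tiny, lambda a, b: (a, b))

# Same pairs as crossing the unpartitioned inputs; only the work is split up.
assert sorted(out) == sorted(product([1, 2, 3, 4, 5], tiny))
print(len(out))  # 10
```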

### CoGroup Operations

Groups both DataSets by key and passes each pair of groups that share a key to a single function call, which processes the two groups together.

```python { .api }
def co_group(self, other_set):
    """
    Groups two DataSets by key and processes matching groups together.

    Co-groups allow processing of groups from two DataSets that have the same key,
    even when one of the two groups is empty.

    Parameters:
        other_set (DataSet): DataSet to co-group with

    Returns:
        CoGroupOperatorWhere: CoGroup operation builder
    """
```
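
A co-group differs from a join in two ways. The function is called once per key rather than once per matching pair, and it still runs when one side has no elements for that key.

The plain-Python model below illustrates both points. It is not Flink code. `co_group` and `ListCollector` are hypothetical stand-ins, and only the `collect` method of Flink's collector is modeled.

```python
from collections import defaultdict


class ListCollector:
    """Hypothetical stand-in for Flink's collector; only collect() is modeled."""

    def __init__(self):
        self.items = []

    def collect(self, value):
        self.items.append(value)


def co_group(first, second, first_key, second_key, co_group_fn):
    """Model of co-group semantics (not Flink code): co_group_fn runs once for
    every key present in either input and receives both groups, one of which may be empty."""
    groups1, groups2 = defaultdict(list), defaultdict(list)
    for x in first:
        groups1[x[first_key]].append(x)
    for y in second:
        groups2[y[second_key]].append(y)

    collector = ListCollector()
    # sorted() only makes the output deterministic; Flink gives no ordering guarantee.
    for key in sorted(set(groups1) | set(groups2)):
        co_group_fn(iter(groups1[key]), iter(groups2[key]), collector)
    return collector.items


def count_sizes(iterator1, iterator2, collector):
    # Emit the size of each side's group for the current key.
    collector.collect((len(list(iterator1)), len(list(iterator2))))


clicks = [("a", 1), ("a", 2), ("b", 3)]
purchases = [("b", 9), ("c", 7)]
print(co_group(clicks, purchases, 0, 0, count_sizes))  # [(2, 0), (1, 1), (0, 1)]
```

Key `"a"` appears only on the left and key `"c"` only on the right, yet both reach the function. A plain join would emit nothing for either key.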

### Union Operations

Merges two DataSets with compatible element types into a single DataSet.

```python { .api }
def union(self, other_set):
    """
    Creates union with another DataSet.

    Both DataSets must have compatible element types.

    Parameters:
        other_set (DataSet): DataSet to union with

    Returns:
        OperatorSet: Union of both datasets
    """
```
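
Flink's union behaves like SQL `UNION ALL`: the inputs are concatenated and duplicates are kept. The plain-Python model below shows these bag semantics. It is not Flink code, and `union` here is a hypothetical helper. In a parallel job, the relative order of elements from the inputs is not guaranteed, which is why the example sorts before printing.

```python
def union(first, second):
    """Hypothetical model of DataSet union (not Flink code): a bag union that
    keeps duplicates, like SQL UNION ALL."""
    return list(first) + list(second)


data1 = [1, 2, 3]
data2 = [3, 4]
data3 = [4, 5]

# Chained like data1.union(data2).union(data3)
combined = union(union(data1, data2), data3)
print(sorted(combined))    # [1, 2, 3, 3, 4, 4, 5]
print(len(set(combined)))  # 5 distinct values out of 7 elements
```

If set semantics are needed, deduplicate explicitly after the union.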

## Join Operation Builders

### JoinOperatorWhere

Specifies the join keys of the first DataSet. `join`, `join_with_huge`, and `join_with_tiny` all return this builder.

```python { .api }
class JoinOperatorWhere:
    def where(self, *fields):
        """
        Specifies join key fields from first DataSet.

        Parameters:
            *fields (int): Field indices for join keys

        Returns:
            JoinOperatorTo: Next step in join building
        """
```
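
### JoinOperatorTo

Specifies the join keys of the second DataSet. These fields are matched position by position against those passed to `where()`, so both calls must list the same number of fields.
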
```python { .api }
class JoinOperatorTo:
    def equal_to(self, *fields):
        """
        Specifies join key fields from second DataSet.

        Parameters:
            *fields (int): Field indices for join keys

        Returns:
            JoinOperator: Final join configuration
        """
```

### JoinOperator

Finalizes the join by supplying the function that combines each pair of matching elements.

```python { .api }
class JoinOperator:
    def using(self, operator):
        """
        Specifies JoinFunction to combine matching elements.

        Parameters:
            operator (JoinFunction): Function to combine joined elements

        Returns:
            OperatorSet: Joined dataset
        """
```
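
The three join builders form a fluent chain in which each step returns the object that offers the next step. The sketch below mimics that chain on plain Python lists to show how composite keys work: the fields given to `where()` and `equal_to()` are compared position by position.

`JoinSketch` is hypothetical and differs from Flink in two ways. It evaluates eagerly, whereas Flink only records the operation in a plan that runs on `env.execute()`. It also returns itself from every step instead of returning separate builder classes.

```python
from collections import defaultdict


class JoinSketch:
    """Hypothetical eager stand-in for join(...).where(...).equal_to(...).using(...).

    Not Flink code: it runs immediately on Python lists and returns itself from
    where()/equal_to() instead of returning separate builder classes.
    """

    def __init__(self, first, second):
        self.first, self.second = first, second
        self.first_keys = self.second_keys = ()

    def where(self, *fields):
        self.first_keys = fields
        return self

    def equal_to(self, *fields):
        # Keys are compared position by position, so the counts must agree.
        if len(fields) != len(self.first_keys):
            raise ValueError("where() and equal_to() need the same number of fields")
        self.second_keys = fields
        return self

    def using(self, join_fn):
        # Index the second input by its composite key, then probe with the first.
        index = defaultdict(list)
        for b in self.second:
            index[tuple(b[i] for i in self.second_keys)].append(b)
        return [join_fn(a, b)
                for a in self.first
                for b in index.get(tuple(a[i] for i in self.first_keys), [])]


sales = [("2024", "EU", 10), ("2024", "US", 20), ("2023", "EU", 5)]  # (year, region, amount)
targets = [("EU", "2024", 12), ("US", "2023", 30)]                  # (region, year, target)

# Composite key: sales fields (0, 1) = (year, region) match targets fields (1, 0).
result = JoinSketch(sales, targets).where(0, 1).equal_to(1, 0) \
    .using(lambda s, t: (s[0], s[1], s[2] - t[2]))
print(result)  # [('2024', 'EU', -2)]
```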

## Cross Operation Builders

### CrossOperatorWhere

Finalizes a cross product created by `cross`, `cross_with_huge`, or `cross_with_tiny`. No keys are needed, so the only step is supplying the combining function.

```python { .api }
class CrossOperatorWhere:
    def using(self, operator):
        """
        Specifies CrossFunction to combine elements.

        Parameters:
            operator (CrossFunction): Function to combine cross product elements

        Returns:
            OperatorSet: Cross product result
        """
```

## CoGroup Operation Builders

### CoGroupOperatorWhere

Specifies the grouping keys of the first DataSet in a co-group.

```python { .api }
class CoGroupOperatorWhere:
    def where(self, *fields):
        """
        Specifies key fields from first DataSet.

        Parameters:
            *fields (int): Field indices for grouping keys

        Returns:
            CoGroupOperatorTo: Next step in cogroup building
        """
```

### CoGroupOperatorTo

Specifies the grouping keys of the second DataSet in a co-group. As with joins, these fields are matched position by position against those passed to `where()`.

```python { .api }
class CoGroupOperatorTo:
    def equal_to(self, *fields):
        """
        Specifies key fields from second DataSet.

        Parameters:
            *fields (int): Field indices for grouping keys

        Returns:
            CoGroupOperatorUsing: Final cogroup configuration
        """
```

### CoGroupOperatorUsing

Finalizes the co-group by supplying the function that processes each pair of matching groups.

```python { .api }
class CoGroupOperatorUsing:
    def using(self, operator):
        """
        Specifies CoGroupFunction to process matching groups.

        Parameters:
            operator (CoGroupFunction): Function to process cogroup results

        Returns:
            OperatorSet: CoGroup result dataset
        """
```

## Projection Support

### Projector

Builds the output tuples of join and cross operations by selecting fields from the first and second inputs.

```python { .api }
class Projector:
    def project_first(self, *fields):
        """
        Projects specified fields from first DataSet.

        Parameters:
            *fields (int): Field indices to project

        Returns:
            Projector: Self for method chaining
        """

    def project_second(self, *fields):
        """
        Projects specified fields from second DataSet.

        Parameters:
            *fields (int): Field indices to project

        Returns:
            Projector: Self for method chaining
        """
```
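
Projection builds each output tuple directly from selected fields instead of calling a user function. In Flink's Java API, the projected fields appear in the order of the `projectFirst`/`projectSecond` calls. This sketch assumes the Python `Projector` follows the same rule.

The plain-Python model below shows how a chain such as `project_first(0, 2).project_second(1)` shapes one joined pair. It is not Flink code, and `project` is a hypothetical helper.

```python
def project(first_elem, second_elem, steps):
    """Hypothetical model of join/cross projection (not Flink code).

    steps lists (side, field_indices) in call order, e.g.
    project_first(0, 2).project_second(1) -> [("first", (0, 2)), ("second", (1,))].
    """
    out = []
    for side, indices in steps:
        source = first_elem if side == "first" else second_elem
        out.extend(source[i] for i in indices)
    return tuple(out)


order = (1, "customer1", 100)
customer = ("customer1", "Alice")

# Fields appear in the order they were requested: order[0], order[2], customer[1].
projected = project(order, customer, [("first", (0, 2)), ("second", (1,))])
print(projected)  # (1, 100, 'Alice')
```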

## Usage Examples

### Basic Inner Join

```python
from flink.plan.Environment import get_environment
from flink.functions.JoinFunction import JoinFunction

env = get_environment()

# Create two datasets
orders = env.from_elements(
    (1, "customer1", 100),
    (2, "customer2", 200),
    (3, "customer1", 150)
)

customers = env.from_elements(
    ("customer1", "Alice"),
    ("customer2", "Bob")
)

# Define join function
class OrderCustomerJoin(JoinFunction):
    def join(self, order, customer):
        return (order[0], customer[1], order[2])  # (order_id, customer_name, amount)

# Perform join
result = orders.join(customers) \
    .where(1) \
    .equal_to(0) \
    .using(OrderCustomerJoin())

result.output()
env.execute()
```

### Cross Product with Function

```python
from flink.plan.Environment import get_environment
from flink.functions.CrossFunction import CrossFunction

env = get_environment()

# Create datasets
colors = env.from_elements("red", "green", "blue")
sizes = env.from_elements("small", "medium", "large")

# Define cross function
class ProductCombiner(CrossFunction):
    def cross(self, color, size):
        return "{} {} shirt".format(size, color)

# Create product combinations (3 colors x 3 sizes = 9 results)
products = colors.cross(sizes).using(ProductCombiner())

products.output()
env.execute()
```

### CoGroup Operation

```python
from flink.plan.Environment import get_environment
from flink.functions.CoGroupFunction import CoGroupFunction

env = get_environment()

# (key, value) pairs; key 2 has no partner on the right, key 3 none on the left
dataset1 = env.from_elements((1, "a"), (2, "b"))
dataset2 = env.from_elements((1, "x"), (1, "y"), (3, "z"))

# Example: Left outer join using cogroup
class LeftOuterJoin(CoGroupFunction):
    def co_group(self, iterator1, iterator2, collector):
        left_items = list(iterator1)
        right_items = list(iterator2)

        if not right_items:
            # No matching items in right dataset: pad with None
            for left_item in left_items:
                collector.collect((left_item, None))
        else:
            # Pair each left item with every matching right item.
            # Keys present only on the right emit nothing (left_items is empty).
            for left_item in left_items:
                for right_item in right_items:
                    collector.collect((left_item, right_item))

result = dataset1.co_group(dataset2) \
    .where(0) \
    .equal_to(0) \
    .using(LeftOuterJoin())
```

### Multiple Dataset Union

```python
# Union multiple datasets
data1 = env.from_elements(1, 2, 3)
data2 = env.from_elements(4, 5, 6)
data3 = env.from_elements(7, 8, 9)

# Chain unions
combined = data1.union(data2).union(data3)
```

### Performance Optimization with Hints

```python
# Optimize join when one dataset is much smaller
large_dataset = env.read_csv("large_file.csv", [str, int, float])
small_lookup = env.from_elements(("key1", "value1"), ("key2", "value2"))

# Tiny hint: the second input (small_lookup) is small enough to broadcast
enriched = large_dataset.join_with_tiny(small_lookup) \
    .where(0) \
    .equal_to(0) \
    .using(lambda big, small: (big[0], big[1], small[1]))

# Huge hint: the second input (large_dataset) is very large,
# so the optimizer can broadcast the first input instead
enriched_alt = small_lookup.join_with_huge(large_dataset) \
    .where(0) \
    .equal_to(0) \
    .using(lambda small, big: (big[0], big[1], small[1]))
```

### Complex Multi-Step Joins

```python
# Multi-step pipeline: join orders with customers, then aggregate
customers = env.read_csv("customers.csv", [str, str, str])  # id, name, city
orders = env.read_csv("orders.csv", [int, str, float])  # order_id, customer_id, amount

# Join orders with customers -> (order_id, customer_name, amount, city)
order_customer = orders.join(customers) \
    .where(1) \
    .equal_to(0) \
    .using(lambda order, customer: (order[0], customer[1], order[2], customer[2]))

# Total order amount (field 2) per city (field 3)
result = order_customer.group_by(3).sum(2)
```