Advanced operations for combining multiple datasets through joins, cross products, co-group operations, and unions. These operations enable complex data processing patterns that require coordination between multiple data sources.
Joins two DataSets on specified key fields, combining matching elements.
def join(self, other_set):
    """
    Joins with another DataSet on specified keys.

    Parameters:
        other_set (DataSet): DataSet to join with

    Returns:
        JoinOperatorWhere: Chainable join builder for the
            .where().equal_to().using() pattern
    """
Provides hints to the optimizer about the relative sizes of datasets.
def join_with_huge(self, other_set):
    """
    Join optimization hint for a large second DataSet.

    Parameters:
        other_set (DataSet): Large DataSet to join with

    Returns:
        JoinOperatorWhere: Chainable join builder
    """
def join_with_tiny(self, other_set):
    """
    Join optimization hint for a small second DataSet.

    Parameters:
        other_set (DataSet): Small DataSet to join with

    Returns:
        JoinOperatorWhere: Chainable join builder
    """
Creates Cartesian product of two DataSets.
def cross(self, other_set):
    """
    Creates the cross product with another DataSet.

    Parameters:
        other_set (DataSet): DataSet to cross with

    Returns:
        CrossOperatorWhere: Cross operation builder
    """
Provides size hints for cross product optimization.
def cross_with_huge(self, other_set):
    """
    Cross product optimization hint for a large second DataSet.

    Parameters:
        other_set (DataSet): Large DataSet

    Returns:
        CrossOperatorWhere: Cross operation builder
    """
def cross_with_tiny(self, other_set):
    """
    Cross product optimization hint for a small second DataSet.

    Parameters:
        other_set (DataSet): Small DataSet

    Returns:
        CrossOperatorWhere: Cross operation builder
    """
Groups two DataSets by matching keys and processes the groups together.
def co_group(self, other_set):
    """
    Groups two DataSets by keys and processes the groups together.

    Co-groups allow processing of groups from two DataSets that share
    the same key, even when one or both groups are empty.

    Parameters:
        other_set (DataSet): DataSet to co-group with

    Returns:
        CoGroupOperatorWhere: CoGroup operation builder
    """
Combines two DataSets with compatible schemas.
def union(self, other_set):
    """
    Creates the union with another DataSet.

    Both DataSets must have compatible element types.

    Parameters:
        other_set (DataSet): DataSet to union with

    Returns:
        OperatorSet: Union of both datasets
    """
Specifies join keys from the first DataSet.
class JoinOperatorWhere:
    """First step of the join builder: select keys of the first input."""

    def where(self, *fields):
        """
        Specifies join key fields from the first DataSet.

        Parameters:
            *fields (int): Field indices for join keys

        Returns:
            JoinOperatorTo: Next step in join building
        """
Specifies join keys from the second DataSet.
class JoinOperatorTo:
    """Second step of the join builder: select keys of the second input."""

    def equal_to(self, *fields):
        """
        Specifies join key fields from the second DataSet.

        Parameters:
            *fields (int): Field indices for join keys

        Returns:
            JoinOperator: Final join configuration
        """
Finalizes join operation with custom function.
class JoinOperator:
    """Final step of the join builder: attach the combining function."""

    def using(self, operator):
        """
        Specifies the JoinFunction used to combine matching elements.

        Parameters:
            operator (JoinFunction): Function to combine joined elements

        Returns:
            OperatorSet: Joined dataset
        """
Configures cross product operation.
class CrossOperatorWhere:
    """Builder for a cross product: attach the combining function."""

    def using(self, operator):
        """
        Specifies the CrossFunction used to combine elements.

        Parameters:
            operator (CrossFunction): Function to combine cross product elements

        Returns:
            OperatorSet: Cross product result
        """
Specifies keys for the first DataSet in co-group.
class CoGroupOperatorWhere:
    """First step of the co-group builder: select keys of the first input."""

    def where(self, *fields):
        """
        Specifies key fields from the first DataSet.

        Parameters:
            *fields (int): Field indices for grouping keys

        Returns:
            CoGroupOperatorTo: Next step in cogroup building
        """
Specifies keys for the second DataSet in co-group.
class CoGroupOperatorTo:
    """Second step of the co-group builder: select keys of the second input."""

    def equal_to(self, *fields):
        """
        Specifies key fields from the second DataSet.

        Parameters:
            *fields (int): Field indices for grouping keys

        Returns:
            CoGroupOperatorUsing: Final cogroup configuration
        """
Finalizes co-group operation with custom function.
class CoGroupOperatorUsing:
    """Final step of the co-group builder: attach the group-processing function."""

    def using(self, operator):
        """
        Specifies the CoGroupFunction used to process matching groups.

        Parameters:
            operator (CoGroupFunction): Function to process cogroup results

        Returns:
            OperatorSet: CoGroup result dataset
        """
Handles field projection in join and cross operations.
class Projector:
    """
    Selects which fields of each input of a join or cross appear in
    the result tuples.
    """

    def project_first(self, *fields):
        """
        Projects specified fields from the first DataSet.

        Parameters:
            *fields (int): Field indices to project

        Returns:
            Projector: Self for method chaining
        """

    def project_second(self, *fields):
        """
        Projects specified fields from the second DataSet.

        Parameters:
            *fields (int): Field indices to project

        Returns:
            Projector: Self for method chaining
        """
from flink.plan.Environment import get_environment
from flink.functions.JoinFunction import JoinFunction

env = get_environment()

# Create two datasets: orders as (order_id, customer_id, amount)
orders = env.from_elements(
    (1, "customer1", 100),
    (2, "customer2", 200),
    (3, "customer1", 150)
)
# customers as (customer_id, name)
customers = env.from_elements(
    ("customer1", "Alice"),
    ("customer2", "Bob")
)

# Define join function: invoked once per matching pair of elements
class OrderCustomerJoin(JoinFunction):
    def join(self, order, customer):
        return (order[0], customer[1], order[2])  # (order_id, customer_name, amount)

# Perform join: order field 1 (customer_id) matched against customer field 0
result = orders.join(customers) \
    .where(1) \
    .equal_to(0) \
    .using(OrderCustomerJoin())
result.output()
env.execute()
from flink.functions.CrossFunction import CrossFunction
# Create datasets of independent product attributes
colors = env.from_elements("red", "green", "blue")
sizes = env.from_elements("small", "medium", "large")

# Define cross function: invoked once for every (color, size) pair
class ProductCombiner(CrossFunction):
    def cross(self, color, size):
        return f"{size} {color} shirt"

# Create all product combinations (Cartesian product: 3 x 3 = 9 elements)
products = colors.cross(sizes).using(ProductCombiner())
from flink.functions.CoGroupFunction import CoGroupFunction
# Example: Left outer join using cogroup
class LeftOuterJoin(CoGroupFunction):
    def co_group(self, iterator1, iterator2, collector):
        # Materialize both groups; both iterators hold elements sharing one key.
        left_items = list(iterator1)
        right_items = list(iterator2)
        if not right_items:
            # No matching items in right dataset: keep left rows, pad with None
            for left_item in left_items:
                collector.collect((left_item, None))
        else:
            # Join with all matching items (inner-join behavior for this key)
            for left_item in left_items:
                for right_item in right_items:
                    collector.collect((left_item, right_item))

# NOTE(review): dataset1/dataset2 are placeholders, not defined in this example.
result = dataset1.co_group(dataset2) \
    .where(0) \
    .equal_to(0) \
    .using(LeftOuterJoin())
# Union multiple datasets
data1 = env.from_elements(1, 2, 3)
data2 = env.from_elements(4, 5, 6)
data3 = env.from_elements(7, 8, 9)

# Chain unions; all inputs must have compatible element types
combined = data1.union(data2).union(data3)
# Optimize join when one dataset is much smaller
large_dataset = env.read_csv("large_file.csv", [str, int, float])
small_lookup = env.from_elements(("key1", "value1"), ("key2", "value2"))

# Use tiny hint for broadcast join optimization: the tiny side can be
# replicated instead of shuffling the large side.
# NOTE(review): JoinFunction() is a placeholder here; real code supplies
# a subclass implementing join().
result = large_dataset.join_with_tiny(small_lookup) \
    .where(0) \
    .equal_to(0) \
    .using(JoinFunction())

# Use huge hint when the second dataset is very large.
# Fixed: the original referenced an undefined name `small_dataset`
# (a NameError if run); the small dataset defined above is `small_lookup`.
result = small_lookup.join_with_huge(large_dataset) \
    .where(0) \
    .equal_to(0) \
    .using(JoinFunction())
# Complex pipeline with multiple joins
customers = env.read_csv("customers.csv", [str, str, str])  # id, name, city
orders = env.read_csv("orders.csv", [int, str, float])  # order_id, customer_id, amount
products = env.read_csv("products.csv", [int, str, float])  # product_id, name, price

# Join orders with customers on customer_id (order field 1, customer field 0)
# NOTE(review): passing a bare lambda to .using() assumes this API version
# accepts plain callables in place of a JoinFunction subclass — confirm.
order_customer = orders.join(customers) \
    .where(1) \
    .equal_to(0) \
    .using(lambda order, customer: (order[0], customer[1], order[2], customer[2]))

# Further processing: field 3 is city, field 2 is amount
result = order_customer.group_by(3).sum(2)  # Sum by city