
Join and CoGroup Operations

Operations for combining multiple DataSets through joins, cross products, co-group operations, and unions. They cover processing patterns in which elements from two inputs must be matched by key, paired exhaustively, or merged into a single DataSet.

Capabilities

Join Operations

Basic Join

Joins two DataSets on specified key fields, combining matching elements.

def join(self, other_set):
    """
    Starts an inner join with another DataSet. The key fields are
    specified afterwards with where() and equal_to().
    
    Parameters:
        other_set (DataSet): DataSet to join with
        
    Returns:
        JoinOperatorWhere: Chainable join builder for .where().equal_to().using() pattern
    """

Join with Optimization Hints

Provides hints to the optimizer about the relative sizes of datasets.

def join_with_huge(self, other_set):
    """
    Join optimization hint for large second DataSet.
    
    Parameters:
        other_set (DataSet): Large DataSet to join with
        
    Returns:
        JoinOperatorWhere: Chainable join builder
    """

def join_with_tiny(self, other_set):
    """
    Join optimization hint for small second DataSet.
    
    Parameters:
        other_set (DataSet): Small DataSet to join with
        
    Returns:
        JoinOperatorWhere: Chainable join builder
    """

Cross Product Operations

Basic Cross Product

Creates Cartesian product of two DataSets.

def cross(self, other_set):
    """
    Creates cross product with another DataSet.
    
    Parameters:
        other_set (DataSet): DataSet to cross with
        
    Returns:
        CrossOperatorWhere: Cross operation builder
    """

Cross Product with Optimization Hints

Provides size hints for cross product optimization.

def cross_with_huge(self, other_set):
    """
    Cross product optimization hint for large second DataSet.
    
    Parameters:
        other_set (DataSet): Large DataSet
        
    Returns:
        CrossOperatorWhere: Cross operation builder
    """

def cross_with_tiny(self, other_set):
    """
    Cross product optimization hint for small second DataSet.
    
    Parameters:
        other_set (DataSet): Small DataSet
        
    Returns:
        CrossOperatorWhere: Cross operation builder
    """

CoGroup Operations

Groups two DataSets by matching keys and processes the groups together.

def co_group(self, other_set):
    """
    Groups two DataSets by keys and processes together.
    
    Co-groups allow processing of the groups from both DataSets that share a key,
    even when one of the two groups is empty.
    
    Parameters:
        other_set (DataSet): DataSet to co-group with
        
    Returns:
        CoGroupOperatorWhere: CoGroup operation builder
    """

Union Operations

Combines two DataSets with compatible schemas.

def union(self, other_set):
    """
    Creates union with another DataSet.
    
    Both DataSets must have compatible element types.
    
    Parameters:
        other_set (DataSet): DataSet to union with
        
    Returns:
        OperatorSet: Union of both datasets
    """

Join Operation Builders

JoinOperatorWhere

Specifies join keys from the first DataSet.

class JoinOperatorWhere:
    def where(self, *fields):
        """
        Specifies join key fields from first DataSet.
        
        Parameters:
            *fields (int): Field indices for join keys
            
        Returns:
            JoinOperatorTo: Next step in join building
        """

JoinOperatorTo

Specifies join keys from the second DataSet.

class JoinOperatorTo:
    def equal_to(self, *fields):
        """
        Specifies join key fields from second DataSet.
        
        Parameters:
            *fields (int): Field indices for join keys
            
        Returns:
            JoinOperator: Final join configuration
        """

JoinOperator

Finalizes join operation with custom function.

class JoinOperator:
    def using(self, operator):
        """
        Specifies JoinFunction to combine matching elements.
        
        Parameters:
            operator (JoinFunction): Function to combine joined elements
            
        Returns:
            OperatorSet: Joined dataset
        """

Cross Operation Builders

CrossOperatorWhere

Configures cross product operation.

class CrossOperatorWhere:
    def using(self, operator):
        """
        Specifies CrossFunction to combine elements.
        
        Parameters:
            operator (CrossFunction): Function to combine cross product elements
            
        Returns:
            OperatorSet: Cross product result
        """

CoGroup Operation Builders

CoGroupOperatorWhere

Specifies keys for the first DataSet in co-group.

class CoGroupOperatorWhere:
    def where(self, *fields):
        """
        Specifies key fields from first DataSet.
        
        Parameters:
            *fields (int): Field indices for grouping keys
            
        Returns:
            CoGroupOperatorTo: Next step in cogroup building
        """

CoGroupOperatorTo

Specifies keys for the second DataSet in co-group.

class CoGroupOperatorTo:
    def equal_to(self, *fields):
        """
        Specifies key fields from second DataSet.
        
        Parameters:
            *fields (int): Field indices for grouping keys
            
        Returns:
            CoGroupOperatorUsing: Final cogroup configuration
        """

CoGroupOperatorUsing

Finalizes co-group operation with custom function.

class CoGroupOperatorUsing:
    def using(self, operator):
        """
        Specifies CoGroupFunction to process matching groups.
        
        Parameters:
            operator (CoGroupFunction): Function to process cogroup results
            
        Returns:
            OperatorSet: CoGroup result dataset
        """

Projection Support

Projector

Handles field projection in join and cross operations.

class Projector:
    def project_first(self, *fields):
        """
        Projects specified fields from first DataSet.
        
        Parameters:
            *fields (int): Field indices to project
            
        Returns:
            Projector: Self for method chaining
        """
    
    def project_second(self, *fields):
        """
        Projects specified fields from second DataSet.
        
        Parameters:
            *fields (int): Field indices to project
            
        Returns:
            Projector: Self for method chaining
        """

Usage Examples

Basic Inner Join

from flink.plan.Environment import get_environment
from flink.functions.JoinFunction import JoinFunction

env = get_environment()

# Create two datasets
orders = env.from_elements(
    (1, "customer1", 100),
    (2, "customer2", 200),
    (3, "customer1", 150)
)

customers = env.from_elements(
    ("customer1", "Alice"),
    ("customer2", "Bob")
)

# Define join function
class OrderCustomerJoin(JoinFunction):
    def join(self, order, customer):
        return (order[0], customer[1], order[2])  # (order_id, customer_name, amount)

# Perform join
result = orders.join(customers) \
    .where(1) \
    .equal_to(0) \
    .using(OrderCustomerJoin())

result.output()
env.execute()

Cross Product with Function

from flink.functions.CrossFunction import CrossFunction

# Create datasets
colors = env.from_elements("red", "green", "blue")
sizes = env.from_elements("small", "medium", "large")

# Define cross function
class ProductCombiner(CrossFunction):
    def cross(self, color, size):
        return f"{size} {color} shirt"

# Create product combinations
products = colors.cross(sizes).using(ProductCombiner())

CoGroup Operation

from flink.functions.CoGroupFunction import CoGroupFunction

# Example: Left outer join using cogroup
class LeftOuterJoin(CoGroupFunction):
    def co_group(self, iterator1, iterator2, collector):
        left_items = list(iterator1)
        right_items = list(iterator2)
        
        if not right_items:
            # No matching items in right dataset
            for left_item in left_items:
                collector.collect((left_item, None))
        else:
            # Join with all matching items
            for left_item in left_items:
                for right_item in right_items:
                    collector.collect((left_item, right_item))

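# Illustrative inputs keyed on field 0
dataset1 = env.from_elements((1, "a"), (2, "b"))
dataset2 = env.from_elements((1, "x"), (3, "y"))

result = dataset1.co_group(dataset2) \
    .where(0) \
    .equal_to(0) \
    .using(LeftOuterJoin())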
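The logic of `LeftOuterJoin` can be checked without a cluster by calling `co_group` directly on plain lists. In the sketch below, `ListCollector` and `run` are hypothetical stand-ins. The class body repeats the example's logic but subclasses `object` so that it runs without Flink installed; in a Flink job it would subclass `CoGroupFunction`.

```python
class ListCollector:
    """Hypothetical stand-in for the collector Flink passes to co_group()."""

    def __init__(self):
        self.items = []

    def collect(self, value):
        self.items.append(value)


class LeftOuterJoin(object):  # in a Flink job this subclasses CoGroupFunction
    def co_group(self, iterator1, iterator2, collector):
        left_items = list(iterator1)
        right_items = list(iterator2)
        if not right_items:
            # No match on the right: emit each left element paired with None
            for left_item in left_items:
                collector.collect((left_item, None))
        else:
            # Emit every (left, right) pair that shares the key
            for left_item in left_items:
                for right_item in right_items:
                    collector.collect((left_item, right_item))


def run(left_group, right_group):
    collector = ListCollector()
    LeftOuterJoin().co_group(iter(left_group), iter(right_group), collector)
    return collector.items


print(run([(1, "a")], []))                    # [((1, 'a'), None)]
print(run([(2, "b")], [(2, "x"), (2, "y")]))  # [((2, 'b'), (2, 'x')), ((2, 'b'), (2, 'y'))]
print(run([], [(3, "z")]))                    # []
```

The last call shows why this is a left outer join: keys present only in the second DataSet produce no output.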

Multiple Dataset Union

# Union multiple datasets
data1 = env.from_elements(1, 2, 3)
data2 = env.from_elements(4, 5, 6)
data3 = env.from_elements(7, 8, 9)

# Chain unions
combined = data1.union(data2).union(data3)

Performance Optimization with Hints

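from flink.functions.JoinFunction import JoinFunction

# A concrete JoinFunction: subclass JoinFunction and implement join()
class PairJoin(JoinFunction):
    def join(self, value1, value2):
        # Emit the two matching elements as a pair
        return (value1, value2)

# Optimize join when one dataset is much smaller
large_dataset = env.read_csv("large_file.csv", [str, int, float])
small_lookup = env.from_elements(("key1", "value1"), ("key2", "value2"))

# Tiny hint: the second input (small_lookup) is small, enabling a broadcast join
result = large_dataset.join_with_tiny(small_lookup) \
    .where(0) \
    .equal_to(0) \
    .using(PairJoin())

# Huge hint: the second input (large_dataset) is much larger than the first
result = small_lookup.join_with_huge(large_dataset) \
    .where(0) \
    .equal_to(0) \
    .using(PairJoin())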

Complex Multi-Step Joins

# Multi-step pipeline: a join followed by an aggregation
customers = env.read_csv("customers.csv", [str, str, str])  # id, name, city
orders = env.read_csv("orders.csv", [int, str, float])      # order_id, customer_id, amount
products = env.read_csv("products.csv", [int, str, float])  # product_id, name, price (not joined below: orders.csv has no product_id column)

# Join orders with customers
order_customer = orders.join(customers) \
    .where(1) \
    .equal_to(0) \
    .using(lambda order, customer: (order[0], customer[1], order[2], customer[2]))

# Further processing
result = order_customer.group_by(3).sum(2)  # Sum by city
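Stepping through this pipeline on a few rows makes the field indices concrete. The plain-Python model below uses hypothetical sample rows in the same column layouts as `customers.csv` and `orders.csv`. It reproduces the join on `orders` field 1 = `customers` field 0, then totals the amount (field 2) per city (field 3). It assumes customer ids are unique and keeps only the city and its total, not full tuples.

```python
from collections import defaultdict

# Hypothetical sample rows: customers (id, name, city), orders (order_id, customer_id, amount)
customers = [("c1", "Alice", "Berlin"), ("c2", "Bob", "Paris"), ("c3", "Carol", "Berlin")]
orders = [(1, "c1", 10.0), (2, "c2", 20.0), (3, "c3", 5.0), (4, "c1", 2.5)]

# Step 1: join orders.where(1) with customers.equal_to(0)
# -> (order_id, customer_name, amount, city); assumes customer ids are unique
by_id = {c[0]: c for c in customers}
order_customer = [(o[0], by_id[o[1]][1], o[2], by_id[o[1]][2])
                  for o in orders if o[1] in by_id]

# Step 2: group_by(3).sum(2) -> total amount per city
totals = defaultdict(float)
for row in order_customer:
    totals[row[3]] += row[2]
print(dict(totals))  # {'Berlin': 17.5, 'Paris': 20.0}
```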