or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

dataset-operations.md execution-environment.md grouped-dataset-operations.md index.md join-operations.md type-system.md utility-functions.md
tile.json

docs/join-operations.md

Join Operations

Join operations combine two DataSets based on key equality. Flink supports various join types and provides optimization hints for better performance.

Join Types

Inner Join

// Inner-join entry point on DataSet.
class DataSet[T] {
  // Starts a join of this DataSet with `other`; the join keys are supplied
  // afterwards via where(...) and equalTo(...) on the returned operation.
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}

Outer Joins

// Outer-join entry points. Each takes the other DataSet and returns an
// UnfinishedOuterJoinOperation on which the join keys are then specified.
// NOTE(review): UnfinishedOuterJoinOperation is not listed in this document;
// the usage examples call where(...)/equalTo(...)/apply(...) on it — confirm its members.
class DataSet[T] {
  // Left outer join: the left-outer usage example guards against a null `right`.
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  // Right outer join: the right-outer usage example guards against a null `left`.
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  // Full outer join: the full-outer usage example guards against null on both sides.
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
}

Join Key Specification

UnfinishedJoinOperation

// First step of key specification: where(...) selects the join key of the left input (type L).
class UnfinishedJoinOperation[L, R] {
  // Key computed by a selector function, e.g. where(_.deptId)
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedJoinOperation[L, R]
  // Key given by one or more field positions, e.g. where(2)
  def where(fields: Int*): HalfUnfinishedJoinOperation[L, R]
  // Key given by one or more field names, e.g. where("customerId")
  def where(firstField: String, otherFields: String*): HalfUnfinishedJoinOperation[L, R]
}

// Second step of key specification: equalTo(...) selects the join key of the right input (type R)
// and yields the JoinDataSet on which a join function can be applied.
class HalfUnfinishedJoinOperation[L, R] {
  // Key computed by a selector function, e.g. equalTo(_.id)
  def equalTo[K: TypeInformation](fun: R => K): JoinDataSet[L, R]
  // Key given by one or more field positions, e.g. equalTo(0)
  def equalTo(fields: Int*): JoinDataSet[L, R]
  // Key given by one or more field names, e.g. equalTo("customerId")
  def equalTo(firstField: String, otherFields: String*): JoinDataSet[L, R]
}

Join Function Application

// Result of a fully keyed join; apply(...) turns matched (L, R) pairs into a DataSet[O].
class JoinDataSet[L, R] {
  // Scala function mapping a left/right pair to one output value
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
  // Scala function that writes its output to a Collector instead of returning it
  // (presumably allowing zero or more outputs per pair — confirm against Flink docs)
  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]
  // JoinFunction object; see JoinFunction.join(first, second): OUT
  def apply[O: TypeInformation: ClassTag](joiner: JoinFunction[L, R, O]): DataSet[O]
  // FlatJoinFunction object; see FlatJoinFunction.join(first, second, out): Unit
  def apply[O: TypeInformation: ClassTag](joiner: FlatJoinFunction[L, R, O]): DataSet[O]
  
  // Optimization hints
  // Returns a JoinDataSet again, so it can be chained before apply(...)
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinDataSet[L, R]
}

Usage Examples

Basic Inner Join

// Domain model for the example: each employee references a department by id.
case class Employee(id: Int, name: String, deptId: Int)
case class Department(id: Int, name: String)

val employees = env.fromElements(
  Employee(1, "Alice", 10), Employee(2, "Bob", 20), Employee(3, "Charlie", 10))

val departments = env.fromElements(
  Department(10, "Engineering"), Department(20, "Sales"), Department(30, "Marketing"))

// Pair every employee with the department whose id equals the employee's deptId
val employeeDepartments = employees
  .join(departments)
  .where(employee => employee.deptId)
  .equalTo(department => department.id)
  .apply { (employee, department) => (employee.name, department.name) }

// Result: ("Alice", "Engineering"), ("Bob", "Sales"), ("Charlie", "Engineering")

Join with Field Positions

// Tuple layout of orders: (orderId, orderName, customerId)
val orders = env.fromElements(
  (1, "Order1", 100), (2, "Order2", 200), (3, "Order3", 100))

// Tuple layout of customers: (customerId, customerName)
val customers = env.fromElements(
  (100, "Customer A"), (200, "Customer B"), (300, "Customer C"))

// Keys are addressed by zero-based tuple position on each side
val orderCustomers = orders
  .join(customers)
  .where(2)   // position 2 of an order tuple is customerId
  .equalTo(0) // position 0 of a customer tuple is customerId
  .apply { (order, customer) =>
    val orderName = order._2
    val customerName = customer._2
    (orderName, customerName)
  }

Join with Field Names

case class Order(orderId: Int, orderName: String, customerId: Int)
case class Customer(customerId: Int, customerName: String)

val orders = env.fromElements(Order(1, "Order1", 100), Order(2, "Order2", 200))
val customers = env.fromElements(Customer(100, "Customer A"), Customer(200, "Customer B"))

// Both case classes expose a field called customerId, so the key is named on each side
val result = orders
  .join(customers)
  .where("customerId")
  .equalTo("customerId")
  .apply { (order, customer) =>
    s"${order.orderName} for ${customer.customerName}"
  }

Left Outer Join

val leftData = env.fromElements(("A", 1), ("B", 2), ("C", 3))
val rightData = env.fromElements(("A", 10), ("C", 30), ("D", 40))

// Every left element is kept; `right` may be null when no right element shares the key
val leftOuterResult = leftData
  .leftOuterJoin(rightData)
  .where(l => l._1)
  .equalTo(r => r._1)
  .apply { (left, right) =>
    // Fall back to 0 when the right side is missing
    (left._1, left._2, Option(right).fold(0)(_._2))
  }

// Result: ("A", 1, 10), ("B", 2, 0), ("C", 3, 30)

Right Outer Join

// Every right element is kept; `left` may be null when no left element shares the key
val rightOuterResult = leftData
  .rightOuterJoin(rightData)
  .where(l => l._1)
  .equalTo(r => r._1)
  .apply { (left, right) =>
    // Fall back to 0 when the left side is missing
    (right._1, Option(left).fold(0)(_._2), right._2)
  }

// Result: ("A", 1, 10), ("C", 3, 30), ("D", 0, 40)

Full Outer Join

// Elements of both inputs are kept; either `left` or `right` may be null
val fullOuterResult = leftData
  .fullOuterJoin(rightData)
  .where(l => l._1)
  .equalTo(r => r._1)
  .apply { (left, right) =>
    val maybeLeft = Option(left)
    val maybeRight = Option(right)
    // Take the key from the left element when present, otherwise from the right one
    val key = maybeLeft.getOrElse(right)._1
    (key, maybeLeft.fold(0)(_._2), maybeRight.fold(0)(_._2))
  }

// Result: ("A", 1, 10), ("B", 2, 0), ("C", 3, 30), ("D", 0, 40)

Multiple Key Fields

case class Sale(region: String, product: String, amount: Double)
case class Target(region: String, product: String, target: Double)

val sales = env.fromElements(Sale("US", "Product A", 1000), Sale("EU", "Product B", 1500))
val targets = env.fromElements(Target("US", "Product A", 1200), Target("EU", "Product B", 1400))

// Composite key: a (region, product) tuple is built on each side and compared for equality
val comparison = sales
  .join(targets)
  .where(sale => (sale.region, sale.product))
  .equalTo(target => (target.region, target.product))
  .apply { (sale, target) =>
    // Positive when the sale exceeded its target, negative when it fell short
    val difference = sale.amount - target.target
    (sale.region, sale.product, sale.amount, target.target, difference)
  }

Cross Product Operations

Cross operations create a Cartesian product of two DataSets.

Cross Product

// Cross entry points: each returns a CrossDataSet pairing this DataSet with `other`.
class DataSet[T] {
  // Plain Cartesian product, no size hint
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]
  // Size-hint variant — presumably marks `other` as very small; confirm against Flink docs
  def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]
  // Size-hint variant — the usage example passes the large dataset as `other`
  def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}

// Result of a cross; apply(...) turns each (L, R) combination into a DataSet[O].
class CrossDataSet[L, R] {
  // Scala function mapping a left/right pair to one output value
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
  // Scala function that writes its output to a Collector instead of returning it
  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]
  // CrossFunction object; see CrossFunction.cross(val1, val2): OUT
  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O]
}

Cross Examples

val left = env.fromElements("A", "B")
val right = env.fromElements(1, 2, 3)

// Plain Cartesian product: every left element is combined with every right element
val crossed = left
  .cross(right)
  .apply { (l, r) => s"$l-$r" }

// Result: "A-1", "A-2", "A-3", "B-1", "B-2", "B-3"

// Cross with a size hint for the optimizer
val leftSmall = env.fromElements(1, 2)  // Small dataset
val rightLarge = env.fromElements((1 to 1000000): _*)  // Large dataset

// crossWithHuge hints that the argument (rightLarge) is much larger than leftSmall
val crossedWithHint = leftSmall
  .crossWithHuge(rightLarge)
  .apply { (small, large) => small * large }

CoGroup Operations

CoGroup operations group elements from two DataSets by key and process groups together.

CoGroup API

// CoGroup entry point on DataSet.
class DataSet[T] {
  // Starts a coGroup of this DataSet with `other`; keys are supplied afterwards
  // via where(...) and equalTo(...), as in the CoGroup usage example.
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}

// First step of coGroup key specification: where(...) selects the grouping key of the
// left input (type L). The overloads mirror those of UnfinishedJoinOperation.
class UnfinishedCoGroupOperation[L, R] {
  // Key computed by a selector function, e.g. where(_._1)
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedCoGroupOperation[L, R]
  // Key given by one or more field positions
  def where(fields: Int*): HalfUnfinishedCoGroupOperation[L, R]
  // Key given by one or more field names
  def where(firstField: String, otherFields: String*): HalfUnfinishedCoGroupOperation[L, R]
}

// Second step of coGroup key specification: equalTo(...) selects the grouping key of the
// right input (type R) and yields the CoGroupDataSet on which a function can be applied.
class HalfUnfinishedCoGroupOperation[L, R] {
  // Key computed by a selector function, e.g. equalTo(_._1)
  def equalTo[K: TypeInformation](fun: R => K): CoGroupDataSet[L, R]
  // Key given by one or more field positions
  def equalTo(fields: Int*): CoGroupDataSet[L, R]
  // Key given by one or more field names
  def equalTo(firstField: String, otherFields: String*): CoGroupDataSet[L, R]
}

// Result of a fully keyed coGroup; apply(...) receives, per key, the left and right
// elements as iterators and produces a DataSet[O].
class CoGroupDataSet[L, R] {
  // Scala function mapping the two per-key iterators to one output value
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R]) => O): DataSet[O]
  // Scala function that writes its output to a Collector instead of returning it
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O]
  // CoGroupFunction object; note its coGroup method takes java.lang.Iterable, not Iterator
  def apply[O: TypeInformation: ClassTag](coGroupFunction: CoGroupFunction[L, R, O]): DataSet[O]
}

CoGroup Examples

val left = env.fromElements(("A", 1), ("B", 2), ("A", 3))
val right = env.fromElements(("A", 10), ("C", 30), ("A", 20))

// For each key, sum the values found on the left side and on the right side
val coGrouped = left
  .coGroup(right)
  .where(l => l._1)
  .equalTo(r => r._1)
  .apply { (leftIterator, rightIterator) =>
    // Materialize both iterators because each group is read more than once
    val leftRecords = leftIterator.toList
    val rightRecords = rightIterator.toList
    // The first available element (left side first) carries the group's key
    val key = (leftRecords ++ rightRecords).head._1
    (key, leftRecords.map(_._2).sum, rightRecords.map(_._2).sum)
  }

// Result: ("A", 4, 30), ("B", 2, 0), ("C", 0, 30)

Types

// Join function interfaces
// NOTE(review): `Function` here presumably denotes Flink's function base interface,
// not scala.Function — confirm the intended import.
trait JoinFunction[IN1, IN2, OUT] extends Function {
  // Combines one element of each input into a single OUT value
  def join(first: IN1, second: IN2): OUT
}

// Join function variant that returns Unit and hands its output to a Collector.
trait FlatJoinFunction[IN1, IN2, OUT] extends Function {
  // Results are written to `out` rather than returned
  def join(first: IN1, second: IN2, out: Collector[OUT]): Unit
}

// Cross function interfaces
// Function object accepted by CrossDataSet.apply.
trait CrossFunction[IN1, IN2, OUT] extends Function {
  // Combines one element of each input into a single OUT value
  def cross(val1: IN1, val2: IN2): OUT
}

// CoGroup function interface
// Function object accepted by CoGroupDataSet.apply.
trait CoGroupFunction[IN1, IN2, OUT] extends Function {
  // Receives the two groups as java.lang.Iterable (unlike the Scala-function overloads,
  // which receive Iterator) and writes results to `out` rather than returning them
  def coGroup(first: java.lang.Iterable[IN1], second: java.lang.Iterable[IN2], out: Collector[OUT]): Unit
}

// For outer joins, the element from the side without a match may be null.
// Always check for null in outer join functions.
// Template only: LeftType, RightType, ResultType, defaultLeft and defaultRight are
// placeholders to be replaced with application-specific types and values.
def handleOuterJoin(left: LeftType, right: RightType): ResultType = {
  // Option(x) turns a null input into None, so getOrElse supplies the default
  val leftValue = Option(left).getOrElse(defaultLeft)
  val rightValue = Option(right).getOrElse(defaultRight)
  // Process values...
  // NOTE(review): the template ends without a result expression; a real implementation
  // must finish by producing a ResultType from leftValue and rightValue.
}

Performance Hints

Join Hints

// Join with a custom partitioner (withPartitioner is the optimization hint listed on JoinDataSet).
// NOTE(review): a partitioner by itself does not mark one input as small — confirm which
// size hint, if any, is intended for the "one dataset is much smaller" case.
// largeDataSet, smallDataSet, CustomPartitioner and processJoin are placeholders.
val result = largeDataSet
  .join(smallDataSet)
  .where(_.key)
  .equalTo(_.key)
  .withPartitioner(new CustomPartitioner()) // Custom partitioning
  .apply((large, small) => processJoin(large, small))

// Cross product hints
val crossResult = smallDataSet
  .crossWithHuge(largeDataSet) // hints that largeDataSet is much larger than smallDataSet
  .apply((small, large) => combine(small, large))

Join Strategy Selection

Flink automatically chooses execution strategies for joins, crosses, and coGroups, but you can influence the choice:

  • Use crossWithTiny() when the dataset passed as the argument is small enough to fit in memory
  • Use crossWithHuge() when the other dataset is very large
  • CoGroup is efficient for joining datasets with many-to-many relationships
  • Consider using broadcast variables for very small lookup datasets