Join operations combine the elements of two DataSets whose keys are equal. Flink supports inner joins as well as left, right, and full outer joins, and provides optimization hints for better performance.
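The basic shape is always where(...) on the left input's key, equalTo(...) on the right input's key, and apply(...) to build the result. A minimal sketch, assuming the standard Scala API import and an ExecutionEnvironment named env:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val a = env.fromElements((1, "a"), (2, "b"))
val b = env.fromElements((1, "x"), (3, "y"))
// Inner join on the first tuple field; only key 1 appears on both sides
val joined = a.join(b).where(0).equalTo(0).apply((l, r) => (l._1, l._2, r._2))
// Result: (1, "a", "x")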
class DataSet[T] {
def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}

class DataSet[T] {
def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
}

class UnfinishedJoinOperation[L, R] {
def where[K: TypeInformation](fun: L => K): HalfUnfinishedJoinOperation[L, R]
def where(fields: Int*): HalfUnfinishedJoinOperation[L, R]
def where(firstField: String, otherFields: String*): HalfUnfinishedJoinOperation[L, R]
}
class HalfUnfinishedJoinOperation[L, R] {
def equalTo[K: TypeInformation](fun: R => K): JoinDataSet[L, R]
def equalTo(fields: Int*): JoinDataSet[L, R]
def equalTo(firstField: String, otherFields: String*): JoinDataSet[L, R]
}

class JoinDataSet[L, R] {
def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]
def apply[O: TypeInformation: ClassTag](joiner: JoinFunction[L, R, O]): DataSet[O]
def apply[O: TypeInformation: ClassTag](joiner: FlatJoinFunction[L, R, O]): DataSet[O]
// Optimization hints
def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinDataSet[L, R]
}

case class Employee(id: Int, name: String, deptId: Int)
case class Department(id: Int, name: String)
val employees = env.fromElements(
Employee(1, "Alice", 10),
Employee(2, "Bob", 20),
Employee(3, "Charlie", 10)
)
val departments = env.fromElements(
Department(10, "Engineering"),
Department(20, "Sales"),
Department(30, "Marketing")
)
// Join by department ID
val employeeDepartments = employees
.join(departments)
.where(_.deptId)
.equalTo(_.id)
.apply((emp, dept) => (emp.name, dept.name))
// Result: ("Alice", "Engineering"), ("Bob", "Sales"), ("Charlie", "Engineering")val orders = env.fromElements(
(1, "Order1", 100), // (orderId, orderName, customerId)
(2, "Order2", 200),
(3, "Order3", 100)
)
val customers = env.fromElements(
(100, "Customer A"), // (customerId, customerName)
(200, "Customer B"),
(300, "Customer C")
)
// Join using field positions
val orderCustomers = orders
.join(customers)
.where(2) // customerId field in orders (index 2)
.equalTo(0) // customerId field in customers (index 0)
.apply((order, customer) => (order._2, customer._2))

case class Order(orderId: Int, orderName: String, customerId: Int)
case class Customer(customerId: Int, customerName: String)
val orders = env.fromElements(
Order(1, "Order1", 100),
Order(2, "Order2", 200)
)
val customers = env.fromElements(
Customer(100, "Customer A"),
Customer(200, "Customer B")
)
// Join using field names
val result = orders
.join(customers)
.where("customerId")
.equalTo("customerId")
.apply((order, customer) => s"${order.orderName} for ${customer.customerName}")

val leftData = env.fromElements(("A", 1), ("B", 2), ("C", 3))
val rightData = env.fromElements(("A", 10), ("C", 30), ("D", 40))
val leftOuterResult = leftData
.leftOuterJoin(rightData)
.where(_._1)
.equalTo(_._1)
.apply { (left, right) =>
val rightValue = Option(right).map(_._2).getOrElse(0)
(left._1, left._2, rightValue)
}
// Result: ("A", 1, 10), ("B", 2, 0), ("C", 3, 30)val rightOuterResult = leftData
.rightOuterJoin(rightData)
.where(_._1)
.equalTo(_._1)
.apply { (left, right) =>
val leftValue = Option(left).map(_._2).getOrElse(0)
(right._1, leftValue, right._2)
}
// Result: ("A", 1, 10), ("C", 3, 30), ("D", 0, 40)val fullOuterResult = leftData
.fullOuterJoin(rightData)
.where(_._1)
.equalTo(_._1)
.apply { (left, right) =>
val key = if (left != null) left._1 else right._1
val leftValue = Option(left).map(_._2).getOrElse(0)
val rightValue = Option(right).map(_._2).getOrElse(0)
(key, leftValue, rightValue)
}
// Result: ("A", 1, 10), ("B", 2, 0), ("C", 3, 30), ("D", 0, 40)case class Sale(region: String, product: String, amount: Double)
case class Target(region: String, product: String, target: Double)
val sales = env.fromElements(
Sale("US", "Product A", 1000),
Sale("EU", "Product B", 1500)
)
val targets = env.fromElements(
Target("US", "Product A", 1200),
Target("EU", "Product B", 1400)
)
// Join on multiple fields
val comparison = sales
.join(targets)
.where(s => (s.region, s.product))
.equalTo(t => (t.region, t.product))
.apply { (sale, target) =>
(sale.region, sale.product, sale.amount, target.target, sale.amount - target.target)
}

Cross operations create a Cartesian product of two DataSets. Use them with care: crossing n elements with m elements produces n * m pairs, so even modest datasets of 10,000 and 100,000 elements yield a billion results.
class DataSet[T] {
def cross[O](other: DataSet[O]): CrossDataSet[T, O]
def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]
def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}
class CrossDataSet[L, R] {
def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]
def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O]
}

val left = env.fromElements("A", "B")
val right = env.fromElements(1, 2, 3)
// Simple cross product
val crossed = left
.cross(right)
.apply((l, r) => s"$l-$r")
// Result: "A-1", "A-2", "A-3", "B-1", "B-2", "B-3"
// Cross with hints for optimization
val leftSmall = env.fromElements(1, 2) // Small dataset
val rightLarge = env.fromCollection(1 to 1000000) // Large dataset
// Hint that the argument (right side) is huge, so the small left side is broadcast
val crossedWithHint = leftSmall
.crossWithHuge(rightLarge)
.apply((small, large) => small * large)

CoGroup operations group elements from two DataSets by key and process the two groups together. Unlike join, the function is called once per key, and a key that occurs in only one input still produces a call with an empty group on the other side.
class DataSet[T] {
def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}
class UnfinishedCoGroupOperation[L, R] {
def where[K: TypeInformation](fun: L => K): HalfUnfinishedCoGroupOperation[L, R]
def where(fields: Int*): HalfUnfinishedCoGroupOperation[L, R]
}
class HalfUnfinishedCoGroupOperation[L, R] {
def equalTo[K: TypeInformation](fun: R => K): CoGroupDataSet[L, R]
def equalTo(fields: Int*): CoGroupDataSet[L, R]
}
class CoGroupDataSet[L, R] {
def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R]) => O): DataSet[O]
def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O]
def apply[O: TypeInformation: ClassTag](coGroupFunction: CoGroupFunction[L, R, O]): DataSet[O]
}

val left = env.fromElements(("A", 1), ("B", 2), ("A", 3))
val right = env.fromElements(("A", 10), ("C", 30), ("A", 20))
val coGrouped = left
.coGroup(right)
.where(_._1)
.equalTo(_._1)
.apply { (leftIterator, rightIterator) =>
val leftList = leftIterator.toList
val rightList = rightIterator.toList
val key = if (leftList.nonEmpty) leftList.head._1 else rightList.head._1
val leftSum = leftList.map(_._2).sum
val rightSum = rightList.map(_._2).sum
(key, leftSum, rightSum)
}
// Result: ("A", 4, 30), ("B", 2, 0), ("C", 0, 30)// Join function interfaces
trait JoinFunction[IN1, IN2, OUT] extends Function {
def join(first: IN1, second: IN2): OUT
}
trait FlatJoinFunction[IN1, IN2, OUT] extends Function {
def join(first: IN1, second: IN2, out: Collector[OUT]): Unit
}
// Cross function interfaces
trait CrossFunction[IN1, IN2, OUT] extends Function {
def cross(val1: IN1, val2: IN2): OUT
}
// CoGroup function interface
trait CoGroupFunction[IN1, IN2, OUT] extends Function {
def coGroup(first: java.lang.Iterable[IN1], second: java.lang.Iterable[IN2], out: Collector[OUT]): Unit
}
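These interfaces can be passed to apply directly. For example, a FlatJoinFunction can filter while joining because it controls what it emits; a minimal sketch reusing the Employee and Department types from earlier:

import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.util.Collector

// Emits a pair only for the Engineering department; other matches are dropped
class EngineeringJoin extends FlatJoinFunction[Employee, Department, (String, String)] {
  override def join(emp: Employee, dept: Department, out: Collector[(String, String)]): Unit = {
    if (dept.name == "Engineering") out.collect((emp.name, dept.name))
  }
}

val engineers = employees
  .join(departments)
  .where(_.deptId)
  .equalTo(_.id)
  .apply(new EngineeringJoin)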
// For outer joins, null values are possible
// Always check for null in outer join functions:
// LeftType, RightType, ResultType and the default values are placeholders
def handleOuterJoin(left: LeftType, right: RightType): ResultType = {
val leftValue = Option(left).getOrElse(defaultLeft)
val rightValue = Option(right).getOrElse(defaultRight)
// Process values...
}

// For joins where one dataset is much smaller
val result = largeDataSet
.join(smallDataSet)
.where(_.key)
.equalTo(_.key)
.withPartitioner(new CustomPartitioner()) // Custom partitioning
.apply((large, small) => processJoin(large, small))
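CustomPartitioner above is not a Flink class; a Partitioner simply maps a key to a channel index. A minimal sketch, assuming the join key is a String:

import org.apache.flink.api.common.functions.Partitioner

// Distributes keys across channels by hash (kept non-negative)
class CustomPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    ((key.hashCode % numPartitions) + numPartitions) % numPartitions
}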
// Cross product hints
val crossResult = smallDataSet
.crossWithHuge(largeDataSet) // smallDataSet will be broadcast
.apply((small, large) => combine(small, large))

Flink automatically chooses join strategies, but you can influence the choice:
- crossWithTiny() when the dataset passed as the argument is small enough to fit in memory
- crossWithHuge() when the dataset passed as the argument is very large
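For joins, the analogous hints are joinWithTiny() and joinWithHuge(), and join() also accepts an explicit JoinHint. A sketch, assuming the same largeDataSet, smallDataSet, and processJoin as above:

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

// Hint that the argument dataset is tiny enough to broadcast
val hintedJoin = largeDataSet
  .joinWithTiny(smallDataSet)
  .where(_.key)
  .equalTo(_.key)
  .apply((large, small) => processJoin(large, small))

// Or name the strategy explicitly
val broadcastJoin = largeDataSet
  .join(smallDataSet, JoinHint.BROADCAST_HASH_SECOND)
  .where(_.key)
  .equalTo(_.key)
  .apply((large, small) => processJoin(large, small))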