The Apache Flink Scala API provides type-safe operations and functional programming paradigms for building distributed stream and batch processing applications.
Join operations combine two DataSets based on key equality. Flink supports various join types and provides optimization hints for better performance.
// Joins are started on DataSet; keys are selected afterwards via where(...)/equalTo(...).
// The listing below merges the two previously duplicated `class DataSet[T]` fragments.
class DataSet[T] {
  // Inner join: only pairs with equal keys are emitted.
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  // Outer joins: unmatched elements of the preserved side(s) are paired with null.
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
}

// First half of a join: select the key of the left input.
class UnfinishedJoinOperation[L, R] {
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedJoinOperation[L, R] // key-selector function
  def where(fields: Int*): HalfUnfinishedJoinOperation[L, R] // tuple field positions
  def where(firstField: String, otherFields: String*): HalfUnfinishedJoinOperation[L, R] // case-class field names
}

// Second half of a join: select the key of the right input.
class HalfUnfinishedJoinOperation[L, R] {
  def equalTo[K: TypeInformation](fun: R => K): JoinDataSet[L, R]
  def equalTo(fields: Int*): JoinDataSet[L, R]
  def equalTo(firstField: String, otherFields: String*): JoinDataSet[L, R]
}

// Result of where(...).equalTo(...): apply a join function to each matched pair.
class JoinDataSet[L, R] {
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]
  def apply[O: TypeInformation: ClassTag](joiner: JoinFunction[L, R, O]): DataSet[O]
  def apply[O: TypeInformation: ClassTag](joiner: FlatJoinFunction[L, R, O]): DataSet[O]
  // Optimization hints
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinDataSet[L, R]
}

case class Employee(id: Int, name: String, deptId: Int)
case class Department(id: Int, name: String)

val employees = env.fromElements(
  Employee(1, "Alice", 10),
  Employee(2, "Bob", 20),
  Employee(3, "Charlie", 10)
)
val departments = env.fromElements(
  Department(10, "Engineering"),
  Department(20, "Sales"),
  Department(30, "Marketing")
)

// Join by department ID using key-selector functions.
val employeeDepartments = employees
  .join(departments)
  .where(_.deptId)
  .equalTo(_.id)
  .apply((emp, dept) => (emp.name, dept.name))
// Result: ("Alice", "Engineering"), ("Bob", "Sales"), ("Charlie", "Engineering")

val orders = env.fromElements(
(1, "Order1", 100), // (orderId, orderName, customerId)
(2, "Order2", 200),
(3, "Order3", 100)
)
val customers = env.fromElements(
  (100, "Customer A"), // (customerId, customerName)
  (200, "Customer B"),
  (300, "Customer C")
)

// Join tuples using zero-based field positions.
val orderCustomers = orders
  .join(customers)
  .where(2)   // customerId field in orders (index 2)
  .equalTo(0) // customerId field in customers (index 0)
  .apply((order, customer) => (order._2, customer._2))

case class Order(orderId: Int, orderName: String, customerId: Int)
case class Customer(customerId: Int, customerName: String)

val orders = env.fromElements(
  Order(1, "Order1", 100),
  Order(2, "Order2", 200)
)
val customers = env.fromElements(
  Customer(100, "Customer A"),
  Customer(200, "Customer B")
)

// Join case classes using field names.
val result = orders
  .join(customers)
  .where("customerId")
  .equalTo("customerId")
  .apply((order, customer) => s"${order.orderName} for ${customer.customerName}")

val leftData = env.fromElements(("A", 1), ("B", 2), ("C", 3))
val rightData = env.fromElements(("A", 10), ("C", 30), ("D", 40))

// Left outer join: every left element is emitted; `right` is null when unmatched.
val leftOuterResult = leftData
  .leftOuterJoin(rightData)
  .where(_._1)
  .equalTo(_._1)
  .apply { (left, right) =>
    // Option(null) is None, so unmatched rows fall back to 0.
    val rightValue = Option(right).map(_._2).getOrElse(0)
    (left._1, left._2, rightValue)
  }
// Result: ("A", 1, 10), ("B", 2, 0), ("C", 3, 30)

val rightOuterResult = leftData
.rightOuterJoin(rightData)
.where(_._1)
.equalTo(_._1)
.apply { (left, right) =>
  // In a right outer join only `left` can be null; `right` is always present.
  val leftValue = Option(left).map(_._2).getOrElse(0)
  (right._1, leftValue, right._2)
}
// Result: ("A", 1, 10), ("C", 3, 30), ("D", 0, 40)

val fullOuterResult = leftData
.fullOuterJoin(rightData)
.where(_._1)
.equalTo(_._1)
.apply { (left, right) =>
  // Either side (but never both) may be null in a full outer join.
  // Option(...) maps a null reference to None, matching the style used above.
  val key = Option(left).map(_._1).getOrElse(right._1)
  val leftValue = Option(left).map(_._2).getOrElse(0)
  val rightValue = Option(right).map(_._2).getOrElse(0)
  (key, leftValue, rightValue)
}
// Result: ("A", 1, 10), ("B", 2, 0), ("C", 3, 30), ("D", 0, 40)

case class Sale(region: String, product: String, amount: Double)
case class Target(region: String, product: String, target: Double)

val sales = env.fromElements(
  Sale("US", "Product A", 1000),
  Sale("EU", "Product B", 1500)
)
val targets = env.fromElements(
  Target("US", "Product A", 1200),
  Target("EU", "Product B", 1400)
)

// Composite key: join on (region, product) via tuple-valued key selectors.
val comparison = sales
  .join(targets)
  .where(s => (s.region, s.product))
  .equalTo(t => (t.region, t.product))
  .apply { (sale, target) =>
    // Emit actual, target, and the delta between them.
    (sale.region, sale.product, sale.amount, target.target, sale.amount - target.target)
  }

Cross operations create a Cartesian product of two DataSets.
// Cross produces the Cartesian product of two DataSets.
class DataSet[T] {
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]
  // Size hints: crossWithTiny/crossWithHuge declare `other` as very small / very large,
  // letting the optimizer broadcast the small side (see examples below).
  def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]
  def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}

// Result of cross(...): apply a function to each pair of the product.
class CrossDataSet[L, R] {
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]
  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O]
}

val left = env.fromElements("A", "B")
val right = env.fromElements(1, 2, 3)

// Simple cross product: every element of `left` paired with every element of `right`.
val crossed = left
  .cross(right)
  .apply((l, r) => s"$l-$r")
// Result: "A-1", "A-2", "A-3", "B-1", "B-2", "B-3"

// Cross with hints for optimization
val leftSmall = env.fromElements(1, 2) // Small dataset
val rightLarge = env.fromElements((1 to 1000000): _*) // Large dataset
// crossWithHuge marks the argument as the large side, so leftSmall is broadcast.
val crossedWithHint = leftSmall
  .crossWithHuge(rightLarge)
  .apply((small, large) => small * large)

CoGroup operations group elements from two DataSets by key and process groups together.
class DataSet[T] {
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}

// First half of a coGroup: select the key of the left input.
class UnfinishedCoGroupOperation[L, R] {
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedCoGroupOperation[L, R]
  def where(fields: Int*): HalfUnfinishedCoGroupOperation[L, R]
}

// Second half of a coGroup (previously missing from this listing even though
// the example below calls equalTo): select the key of the right input.
class HalfUnfinishedCoGroupOperation[L, R] {
  def equalTo[K: TypeInformation](fun: R => K): CoGroupDataSet[L, R]
  def equalTo(fields: Int*): CoGroupDataSet[L, R]
}

// Result of where(...).equalTo(...): process both per-key groups together.
class CoGroupDataSet[L, R] {
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R]) => O): DataSet[O]
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O]
  def apply[O: TypeInformation: ClassTag](coGroupFunction: CoGroupFunction[L, R, O]): DataSet[O]
}

val left = env.fromElements(("A", 1), ("B", 2), ("A", 3))
val right = env.fromElements(("A", 10), ("C", 30), ("A", 20))

// CoGroup: all elements with the same key from both sides are handed over
// together; a side with no elements yields an empty iterator (never null).
val coGrouped = left
  .coGroup(right)
  .where(_._1)
  .equalTo(_._1)
  .apply { (leftIterator, rightIterator) =>
    val leftList = leftIterator.toList
    val rightList = rightIterator.toList
    // At least one side is non-empty for every emitted key.
    val key = if (leftList.nonEmpty) leftList.head._1 else rightList.head._1
    val leftSum = leftList.map(_._2).sum
    val rightSum = rightList.map(_._2).sum
    (key, leftSum, rightSum)
  }
// Result: ("A", 4, 30), ("B", 2, 0), ("C", 0, 30)

// Join function interfaces
trait JoinFunction[IN1, IN2, OUT] extends Function {
  // Called once per matching pair; returns exactly one record.
  def join(first: IN1, second: IN2): OUT
}
trait FlatJoinFunction[IN1, IN2, OUT] extends Function {
  // May emit zero or more records per matching pair via the collector.
  def join(first: IN1, second: IN2, out: Collector[OUT]): Unit
}

// Cross function interfaces
trait CrossFunction[IN1, IN2, OUT] extends Function {
  def cross(val1: IN1, val2: IN2): OUT
}

// CoGroup function interface
trait CoGroupFunction[IN1, IN2, OUT] extends Function {
  def coGroup(first: java.lang.Iterable[IN1], second: java.lang.Iterable[IN2], out: Collector[OUT]): Unit
}

// For outer joins, null values are possible on the non-preserved side.
// Always check for null in outer join functions:
def handleOuterJoin(left: LeftType, right: RightType): ResultType = {
  // Option(x) maps a null reference to None, avoiding explicit null checks.
  val leftValue = Option(left).getOrElse(defaultLeft)
  val rightValue = Option(right).getOrElse(defaultRight)
  // Process values...
}

// For joins where one dataset is much smaller
val result = largeDataSet
  .join(smallDataSet)
  .where(_.key)
  .equalTo(_.key)
  .withPartitioner(new CustomPartitioner()) // Custom partitioning of the join keys
  .apply((large, small) => processJoin(large, small))

// Cross product hints
val crossResult = smallDataSet
  .crossWithHuge(largeDataSet) // largeDataSet is marked huge, so smallDataSet is broadcast
  .apply((small, large) => combine(small, large))

Flink automatically chooses join strategies, but you can influence the choice:
- Use crossWithTiny() when one dataset fits in memory.
- Use crossWithHuge() when the other dataset is very large.

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-2-12@1.20.2