The Apache Flink Scala DataSet API provides comprehensive support for combining multiple DataSets through joins (inner and outer), co-groups, and cross products, with flexible key selection and customizable join strategies.
/** Inner-join entry points; keys are selected next with `where(...).equalTo(...)`. */
class DataSet[T] {
  /** Starts an inner equi-join of this DataSet with `other`. */
  def join[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  /** Same as `join`, plus a size hint that `other` is small (e.g. suitable for broadcasting). */
  def joinWithTiny[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  /** Same as `join`, plus a size hint that `other` is large. */
  def joinWithHuge[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}
/**
 * Outer-join entry points.
 *
 * These return `UnfinishedOuterJoinOperation`, not `UnfinishedJoinOperation`. Keys are still
 * selected with `where(...).equalTo(...)`, but the result must be produced by an explicit
 * `apply` join function; there is no default `(T, O)` tuple result. The reason is that the side
 * without a match is passed to the join function as `null` (wrap it with `Option(...)`).
 */
class DataSet[T] {
  /** Keeps every element of this DataSet; the right-hand argument is `null` when unmatched. */
  def leftOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  /** Keeps every element of `other`; the left-hand argument is `null` when unmatched. */
  def rightOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  /** Keeps every element of both inputs; either argument may be `null`. */
  def fullOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
}
/** Left-key selection step of an inner join. */
class UnfinishedJoinOperation[T, O] {
  /** Selects the left key with a key-selector function. */
  def where[K: TypeInformation](keySelector: T => K): UnfinishedJoinOperationWhere[T, O]
  /** Selects the left key by field positions. */
  def where(fields: Int*): UnfinishedJoinOperationWhere[T, O]
  /** Selects the left key by field names (at least one name is required). */
  def where(firstField: String, otherFields: String*): UnfinishedJoinOperationWhere[T, O]
}
/** Right-key selection step of an inner join. */
class UnfinishedJoinOperationWhere[T, O] {
  /** Selects the right key with a key-selector function. */
  def equalTo[K: TypeInformation](keySelector: O => K): UnfinishedJoinOperationWhereEqual[T, O]
  /** Selects the right key by field positions. */
  def equalTo(fields: Int*): UnfinishedJoinOperationWhereEqual[T, O]
  /** Selects the right key by field names (at least one name is required). */
  def equalTo(firstField: String, otherFields: String*): UnfinishedJoinOperationWhereEqual[T, O]
}
/**
 * An inner join whose keys are fully specified.
 *
 * Calling one of the `apply` overloads attaches a join function. If no `apply` is called, the
 * join's result is the matching `(T, O)` pairs themselves.
 */
class UnfinishedJoinOperationWhereEqual[T, O] {
  /** Maps each matching pair to exactly one result. */
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
  /** Emits zero or more results per matching pair through the `Collector`. */
  def apply[R: TypeInformation: ClassTag](fun: (T, O, Collector[R]) => Unit): DataSet[R]
  /** Like the function overload, but with a reusable `JoinFunction` instance. */
  def apply[R: TypeInformation: ClassTag](joiner: JoinFunction[T, O, R]): DataSet[R]
  /** Returns the underlying `JoinDataSet` for further configuration. */
  def getJoinDataSet: JoinDataSet[T, O]
}
/** A configured inner join; each `apply` overload turns it into a DataSet of join results. */
class JoinDataSet[T, O] {
  /** Maps each matching pair to exactly one result. */
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
  /** Emits zero or more results per matching pair through the `Collector`. */
  def apply[R: TypeInformation: ClassTag](fun: (T, O, Collector[R]) => Unit): DataSet[R]
  /** Like the function overload, but with a reusable `JoinFunction` instance. */
  def apply[R: TypeInformation: ClassTag](joiner: JoinFunction[T, O, R]): DataSet[R]
  // NOTE(review): the Flink Scala API usually takes the JoinHint when the join is created
  // (join(other, strategy)); confirm this method exists in the targeted Flink version.
  def withJoinHint(joinHint: JoinHint): JoinDataSet[T, O]
}
CoGroup operations group elements from two DataSets by key and provide access to all elements from both groups.
class DataSet[T] {
// Start coGroup operation: elements of this DataSet and `other` are grouped by key and each
// pair of same-key groups is handed to the coGroup function; keys are chosen next via
// where(...).equalTo(...)
def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}
class UnfinishedCoGroupOperation[T, O] {
// Specify left side key with a key-selector function; K is the extracted key type
def where[K: TypeInformation](keySelector: T => K): HalfUnfinishedKeyPairOperation[T, O, K]
// Specify left side key by field positions; the key type is left unspecified (`_`)
def where(fields: Int*): HalfUnfinishedKeyPairOperation[T, O, _]
// Specify left side key by field names; at least one name is required
def where(firstField: String, otherFields: String*): HalfUnfinishedKeyPairOperation[T, O, _]
}
/** Right-key selection step of a coGroup; completing it yields a `CoGroupDataSet`. */
class HalfUnfinishedKeyPairOperation[T, O, K] {
  /** Selects the right key with a key-selector function. */
  def equalTo[K2: TypeInformation](keySelector: O => K2): CoGroupDataSet[T, O]
  /** Selects the right key by field positions. */
  def equalTo(fields: Int*): CoGroupDataSet[T, O]
  /** Selects the right key by field names (at least one name is required). */
  def equalTo(firstField: String, otherFields: String*): CoGroupDataSet[T, O]
}
/**
 * A keyed coGroup; the function receives, per key, an iterator over each side's group.
 * Either iterator may be empty when the key occurs on only one side.
 */
class CoGroupDataSet[T, O] {
  /** Produces exactly one result per key group. */
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O]) => R): DataSet[R]
  // A `(Iterator[T], Iterator[O]) => TraversableOnce[R]` overload cannot coexist with the one
  // above: both erase to apply(Function2, ...), which is a double-definition error. The
  // overload would also be ambiguous for lambdas. Use the Collector variant for many results.
  /** Emits zero or more results per key group through the `Collector`. */
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O], Collector[R]) => Unit): DataSet[R]
  /** Like the function overloads, but with a reusable `CoGroupFunction` instance. */
  def apply[R: TypeInformation: ClassTag](coGrouper: CoGroupFunction[T, O, R]): DataSet[R]
}
Cross operations create Cartesian products of two DataSets.
class DataSet[T] {
// Cross with other DataSet: pairs every element of this DataSet with every element of `other`
// (the Cartesian product)
def cross[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
// Cross with size hints about `other` (tiny / huge); presumably consumed by the optimizer when
// choosing a strategy — TODO confirm against the Flink docs
def crossWithTiny[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
def crossWithHuge[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
}
class CrossDataSet[T, O] {
// Apply cross function: maps each (T, O) pair of the Cartesian product to one result
def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
// Use CrossFunction: presumably equivalent to the function overload, but with a reusable
// CrossFunction instance
def apply[R: TypeInformation: ClassTag](crosser: CrossFunction[T, O, R]): DataSet[R]
// Configuration
// NOTE(review): confirm withCrossHint exists in the targeted Flink version; size hints are also
// expressed via DataSet.crossWithTiny / crossWithHuge
def withCrossHint(crossHint: CrossHint): CrossDataSet[T, O]
}import org.apache.flink.api.scala._
case class Customer(id: Int, name: String, city: String)
case class Order(customerId: Int, product: String, amount: Double)
case class CustomerOrder(customerName: String, city: String, product: String, amount: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
val customers = env.fromElements(
  Customer(1, "Alice", "New York"),
  Customer(2, "Bob", "London"),
  Customer(3, "Charlie", "Paris")
)
val orders = env.fromElements(
  Order(1, "laptop", 1000.0),
  Order(2, "phone", 500.0),
  Order(1, "mouse", 25.0)
)
// Inner join on Customer.id == Order.customerId; each matching pair becomes one CustomerOrder.
// Charlie (id 3) has no orders, so he produces no output row.
val customerOrders = customers
  .join(orders)
  .where(c => c.id)
  .equalTo(o => o.customerId)
  .apply { (c, o) =>
    CustomerOrder(
      customerName = c.name,
      city = c.city,
      product = o.product,
      amount = o.amount
    )
  }
import org.apache.flink.api.scala._
case class Employee(id: Int, name: String, deptId: Int)
case class Department(id: Int, name: String)
case class EmployeeDept(empName: String, deptName: Option[String])
val env = ExecutionEnvironment.getExecutionEnvironment
val employees = env.fromElements(
  Employee(1, "Alice", 10),
  Employee(2, "Bob", 20),
  Employee(3, "Charlie", 99) // no Department has id 99
)
val departments = env.fromElements(
  Department(10, "Engineering"),
  Department(20, "Sales")
)
// Left outer join: every employee is kept. An employee without a matching department receives
// null on the department side, which Option(...) turns into None.
val employeeDepts = employees
  .leftOuterJoin(departments)
  .where(e => e.deptId)
  .equalTo(d => d.id)
  .apply { (e, deptOrNull) =>
    val deptName = Option(deptOrNull).map(d => d.name)
    EmployeeDept(empName = e.name, deptName = deptName)
  }
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
case class Student(id: Int, name: String)
case class Grade(studentId: Int, subject: String, score: Int)
case class StudentReport(name: String, grades: List[Grade], avgScore: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
val students = env.fromElements(
  Student(1, "Alice"),
  Student(2, "Bob"),
  Student(3, "Charlie") // has no grades: reported with an empty list and avgScore 0.0
)
val grades = env.fromElements(
  Grade(1, "Math", 90),
  Grade(1, "Science", 85),
  Grade(2, "Math", 78),
  Grade(2, "Science", 82),
  Grade(1, "History", 88)
)
// CoGroup students with their grades to build one report per student.
// A key group may contain grades whose studentId matches no student. Calling
// studentIter.next() on such a group would throw NoSuchElementException, so the Collector
// variant is used to emit nothing for those groups instead.
val studentReports = students
  .coGroup(grades)
  .where(_.id)
  .equalTo(_.studentId)
  .apply {
    (studentIter: Iterator[Student], gradeIter: Iterator[Grade], out: Collector[StudentReport]) =>
      if (studentIter.hasNext) {
        // gradeIter can be traversed only once; materialize it so every student can reuse it
        val gradeList = gradeIter.toList
        val avgScore =
          if (gradeList.isEmpty) 0.0
          else gradeList.map(_.score).sum.toDouble / gradeList.length
        // Normally one student per id; duplicates each get a report rather than being dropped
        studentIter.foreach(student => out.collect(StudentReport(student.name, gradeList, avgScore)))
      }
  }
import org.apache.flink.api.scala._
case class Sale(year: Int, quarter: Int, region: String, amount: Double)
case class Target(year: Int, quarter: Int, region: String, target: Double)
case class Performance(year: Int, quarter: Int, region: String, actual: Double, target: Double, achievement: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(
  Sale(2023, 1, "US", 1000000),
  Sale(2023, 1, "EU", 800000),
  Sale(2023, 2, "US", 1200000)
)
val targets = env.fromElements(
  Target(2023, 1, "US", 900000),
  Target(2023, 1, "EU", 750000),
  Target(2023, 2, "US", 1100000)
)
// Composite key: both sides are projected to (year, quarter, region), and a sale pairs with a
// target only when all three components are equal.
val performance = sales
  .join(targets)
  .where(sale => (sale.year, sale.quarter, sale.region))
  .equalTo(tgt => (tgt.year, tgt.quarter, tgt.region))
  .apply { (sale, tgt) =>
    // achievement is actual / target, e.g. 1000000 / 900000 for 2023 Q1 US
    val ratio = sale.amount / tgt.target
    Performance(
      year = sale.year,
      quarter = sale.quarter,
      region = sale.region,
      actual = sale.amount,
      target = tgt.target,
      achievement = ratio
    )
  }
import org.apache.flink.api.scala._
case class Color(name: String, hex: String)
case class Size(name: String, dimension: String)
// Note: this case class shadows scala.Product within this scope.
case class Product(color: String, size: String, colorHex: String, sizeDimension: String)
val env = ExecutionEnvironment.getExecutionEnvironment
val colors = env.fromElements(
  Color("Red", "#FF0000"),
  Color("Blue", "#0000FF"),
  Color("Green", "#00FF00")
)
val sizes = env.fromElements(
  Size("Small", "S"),
  Size("Medium", "M"),
  Size("Large", "L")
)
// Cartesian product: 3 colors x 3 sizes = 9 Product variants.
val products = colors
  .cross(sizes)
  .apply { (c, s) =>
    Product(
      color = c.name,
      size = s.name,
      colorHex = c.hex,
      sizeDimension = s.dimension
    )
  }
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.JoinFunction
case class User(id: Int, name: String, email: String)
case class Activity(userId: Int, activity: String, timestamp: Long)
// Not used by the pipeline below, which emits plain tuples.
case class UserActivity(userName: String, email: String, activities: List[String])
val env = ExecutionEnvironment.getExecutionEnvironment
val users = env.fromElements(
  User(1, "Alice", "alice@example.com"),
  User(2, "Bob", "bob@example.com")
)
val activities = env.fromElements(
  Activity(1, "login", 1000L),
  Activity(1, "view_page", 1001L),
  Activity(2, "login", 1002L)
)
// A named JoinFunction: called once per matching (User, Activity) pair, returning one
// (id, name, email, activity) tuple.
class UserActivityJoinFunction extends JoinFunction[User, Activity, (Int, String, String, String)] {
  override def join(user: User, activity: Activity): (Int, String, String, String) =
    (user.id, user.name, user.email, activity.activity)
}
val userActivities = users
  .join(activities)
  .where(u => u.id)
  .equalTo(a => a.userId)
  .apply(new UserActivityJoinFunction())
import org.apache.flink.api.scala._
case class Transaction(id: String, productCode: String, amount: Double)
case class ProductInfo(code: String, name: String, category: String)
case class EnrichedTransaction(id: String, productName: String, category: String, amount: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
// The (conceptually large) fact side of the join
val transactions = env.fromElements(
  Transaction("t1", "P001", 100.0),
  Transaction("t2", "P002", 200.0),
  Transaction("t3", "P001", 150.0)
)
// The small lookup side, a candidate for broadcasting
val productInfo = env.fromElements(
  ProductInfo("P001", "Laptop", "Electronics"),
  ProductInfo("P002", "Phone", "Electronics")
)
// joinWithTiny tells the optimizer that productInfo is small; each transaction is enriched
// with the name and category of its product.
val enrichedTransactions = transactions
  .joinWithTiny(productInfo)
  .where(txn => txn.productCode)
  .equalTo(info => info.code)
  .apply { (txn, info) =>
    EnrichedTransaction(
      id = txn.id,
      productName = info.name,
      category = info.category,
      amount = txn.amount
    )
  }