Apache Flink Scala API: type-safe distributed stream and batch processing with idiomatic Scala constructs, functional programming features, and seamless runtime integration.
This section covers combining two DataSets: joins, co-groups, and cross products, with flexible key selection and customizable join strategies.
class DataSet[T] {
  // Begins an inner join with `other`; keys are supplied afterwards
  // via where(...).equalTo(...).
  def join[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  // Size-hint variants: tell the optimizer that `other` is very small
  // (a broadcast candidate — see the joinWithTiny example below) or very large.
  def joinWithTiny[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def joinWithHuge[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}

class DataSet[T] {
// Keeps every element of the left input; an unmatched left element is paired
// with null on the right side (the leftOuterJoin example below wraps it in Option).
def leftOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
// Keeps every element of the right input.
def rightOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
// Keeps every element of both inputs.
def fullOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}

class UnfinishedJoinOperation[T, O] {
// Selects the join key of the left (first) input via a key-extractor function.
def where[K: TypeInformation](keySelector: T => K): UnfinishedJoinOperationWhere[T, O]
// Selects the left key by tuple field positions.
def where(fields: Int*): UnfinishedJoinOperationWhere[T, O]
// Selects the left key by field names.
def where(firstField: String, otherFields: String*): UnfinishedJoinOperationWhere[T, O]
}
class UnfinishedJoinOperationWhere[T, O] {
  // Selects the join key of the right (second) input via a key-extractor function.
  def equalTo[K: TypeInformation](keySelector: O => K): UnfinishedJoinOperationWhereEqual[T, O]
  // Selects the right key by tuple field positions.
  def equalTo(fields: Int*): UnfinishedJoinOperationWhereEqual[T, O]
  // Selects the right key by field names.
  def equalTo(firstField: String, otherFields: String*): UnfinishedJoinOperationWhereEqual[T, O]
}

class UnfinishedJoinOperationWhereEqual[T, O] {
// Finishes the join by applying a function to each matching pair
// (without an explicit function, the result is a tuple of both elements).
def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
// Finishes the join with a JoinFunction instance.
def apply[R: TypeInformation: ClassTag](joiner: JoinFunction[T, O, R]): DataSet[R]
// Exposes the underlying JoinDataSet for further configuration (e.g. join hints).
def getJoinDataSet: JoinDataSet[T, O]
}
class JoinDataSet[T, O] {
  // Applies a function to each matching pair.
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
  // Applies a JoinFunction instance to each matching pair.
  def apply[R: TypeInformation: ClassTag](joiner: JoinFunction[T, O, R]): DataSet[R]
  // Supplies an execution-strategy hint to the optimizer.
  def withJoinHint(joinHint: JoinHint): JoinDataSet[T, O]
}

CoGroup operations group elements from two DataSets by key and provide access to all elements from both groups.
class DataSet[T] {
  // Begins a coGroup: both inputs are grouped by key, and the two groups for
  // each key are handed to the user function together.
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}
class UnfinishedCoGroupOperation[T, O] {
  // Selects the grouping key of the left input: by extractor function,
  // tuple field positions, or field names.
  def where[K: TypeInformation](keySelector: T => K): HalfUnfinishedKeyPairOperation[T, O, K]
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[T, O, _]
  def where(firstField: String, otherFields: String*): HalfUnfinishedKeyPairOperation[T, O, _]
}
class HalfUnfinishedKeyPairOperation[T, O, K] {
  // Selects the grouping key of the right input: by extractor function,
  // tuple field positions, or field names.
  def equalTo[K2: TypeInformation](keySelector: O => K2): CoGroupDataSet[T, O]
  def equalTo(fields: Int*): CoGroupDataSet[T, O]
  def equalTo(firstField: String, otherFields: String*): CoGroupDataSet[T, O]
}

class CoGroupDataSet[T, O] {
// One result per key: the function sees both groups as iterators.
def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O]) => R): DataSet[R]
// Zero or more results per key, returned as a TraversableOnce.
def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O]) => TraversableOnce[R]): DataSet[R]
// Emits results through a Collector instead of a return value.
def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O], Collector[R]) => Unit): DataSet[R]
// Uses a CoGroupFunction instance.
def apply[R: TypeInformation: ClassTag](coGrouper: CoGroupFunction[T, O, R]): DataSet[R]
}

Cross operations create Cartesian products of two DataSets.
class DataSet[T] {
  // Builds the full Cartesian product of this DataSet with `other`.
  def cross[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
  // Size-hint variants: declare `other` very small or very large, respectively.
  def crossWithTiny[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
  def crossWithHuge[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
}
class CrossDataSet[T, O] {
  // Applies a function to every pair of the Cartesian product.
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
  // Uses a CrossFunction instance.
  def apply[R: TypeInformation: ClassTag](crosser: CrossFunction[T, O, R]): DataSet[R]
  // Supplies a size hint for choosing the execution strategy.
  def withCrossHint(crossHint: CrossHint): CrossDataSet[T, O]
}

import org.apache.flink.api.scala._
// Sample domain types for the basic inner-join example.
case class Customer(id: Int, name: String, city: String)
case class Order(customerId: Int, product: String, amount: Double)
case class CustomerOrder(customerName: String, city: String, product: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val customers = env.fromElements(
  Customer(1, "Alice", "New York"),
  Customer(2, "Bob", "London"),
  Customer(3, "Charlie", "Paris")
)

val orders = env.fromElements(
  Order(1, "laptop", 1000.0),
  Order(2, "phone", 500.0),
  Order(1, "mouse", 25.0)
)

// Inner join: each customer is paired with each of their orders on customer id.
val customerOrders = customers
  .join(orders)
  .where(customer => customer.id)
  .equalTo(order => order.customerId)
  .apply { (customer, order) =>
    CustomerOrder(customer.name, customer.city, order.product, order.amount)
  }

import org.apache.flink.api.scala._
// Sample domain types for the left outer join example.
case class Employee(id: Int, name: String, deptId: Int)
case class Department(id: Int, name: String)
case class EmployeeDept(empName: String, deptName: Option[String])

val env = ExecutionEnvironment.getExecutionEnvironment

val employees = env.fromElements(
  Employee(1, "Alice", 10),
  Employee(2, "Bob", 20),
  Employee(3, "Charlie", 99) // Department doesn't exist
)

val departments = env.fromElements(
  Department(10, "Engineering"),
  Department(20, "Sales")
)

// Left outer join: every employee appears in the result; when no department
// matches, the right side arrives as null, which Option(...) converts to None.
val employeeDepts = employees
  .leftOuterJoin(departments)
  .where(emp => emp.deptId)
  .equalTo(dept => dept.id)
  .apply { (emp, deptOrNull) =>
    EmployeeDept(emp.name, Option(deptOrNull).map(_.name))
  }

import org.apache.flink.api.scala._
// Sample domain types for the coGroup example.
case class Student(id: Int, name: String)
case class Grade(studentId: Int, subject: String, score: Int)
case class StudentReport(name: String, grades: List[Grade], avgScore: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val students = env.fromElements(
  Student(1, "Alice"),
  Student(2, "Bob"),
  Student(3, "Charlie")
)

val grades = env.fromElements(
  Grade(1, "Math", 90),
  Grade(1, "Science", 85),
  Grade(2, "Math", 78),
  Grade(2, "Science", 82),
  Grade(1, "History", 88)
)

// CoGroup: for each student id, receive the (possibly empty) group of students
// and the (possibly empty) group of grades together.
val studentReports = students
  .coGroup(grades)
  .where(student => student.id)
  .equalTo(grade => grade.studentId)
  .apply { (studentIter, gradeIter) =>
    // NOTE(review): assumes every key has exactly one student; an orphan grade
    // (empty student group) would make next() throw — confirm against the data.
    val student = studentIter.next()
    val gradeList = gradeIter.toList
    val avgScore =
      if (gradeList.isEmpty) 0.0
      else gradeList.map(_.score).sum.toDouble / gradeList.length
    StudentReport(student.name, gradeList, avgScore)
  }

import org.apache.flink.api.scala._
// Sample domain types for the composite-key join example.
case class Sale(year: Int, quarter: Int, region: String, amount: Double)
case class Target(year: Int, quarter: Int, region: String, target: Double)
case class Performance(year: Int, quarter: Int, region: String, actual: Double, target: Double, achievement: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val sales = env.fromElements(
  Sale(2023, 1, "US", 1000000),
  Sale(2023, 1, "EU", 800000),
  Sale(2023, 2, "US", 1200000)
)

val targets = env.fromElements(
  Target(2023, 1, "US", 900000),
  Target(2023, 1, "EU", 750000),
  Target(2023, 2, "US", 1100000)
)

// Composite key: both sides are keyed on the (year, quarter, region) tuple.
val performance = sales
  .join(targets)
  .where(s => (s.year, s.quarter, s.region))
  .equalTo(t => (t.year, t.quarter, t.region))
  .apply { (sale, target) =>
    Performance(
      year = sale.year,
      quarter = sale.quarter,
      region = sale.region,
      actual = sale.amount,
      target = target.target,
      // achievement is the actual/target ratio (1.0 == exactly on target)
      achievement = sale.amount / target.target
    )
  }

import org.apache.flink.api.scala._
// Sample domain types for the cross (Cartesian product) example.
case class Color(name: String, hex: String)
case class Size(name: String, dimension: String)
case class Product(color: String, size: String, colorHex: String, sizeDimension: String)

val env = ExecutionEnvironment.getExecutionEnvironment

val colors = env.fromElements(
  Color("Red", "#FF0000"),
  Color("Blue", "#0000FF"),
  Color("Green", "#00FF00")
)

val sizes = env.fromElements(
  Size("Small", "S"),
  Size("Medium", "M"),
  Size("Large", "L")
)

// Cartesian product: one Product per (color, size) pair — 3 x 3 = 9 here.
val products = colors
  .cross(sizes)
  .apply { (color, size) =>
    Product(color.name, size.name, color.hex, size.dimension)
  }

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.JoinFunction

// Sample domain types for the JoinFunction example.
case class User(id: Int, name: String, email: String)
case class Activity(userId: Int, activity: String, timestamp: Long)
case class UserActivity(userName: String, email: String, activities: List[String]) // not referenced below

val env = ExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(1, "Alice", "alice@example.com"),
  User(2, "Bob", "bob@example.com")
)

val activities = env.fromElements(
  Activity(1, "login", 1000L),
  Activity(1, "view_page", 1001L),
  Activity(2, "login", 1002L)
)

// Implementing JoinFunction lets the join logic live in a reusable named class.
class UserActivityJoinFunction extends JoinFunction[User, Activity, (Int, String, String, String)] {
  def join(user: User, activity: Activity): (Int, String, String, String) =
    (user.id, user.name, user.email, activity.activity)
}

val userActivities = users
  .join(activities)
  .where(user => user.id)
  .equalTo(activity => activity.userId)
  .apply(new UserActivityJoinFunction())

import org.apache.flink.api.scala._
// Sample domain types for the size-hinted join example.
case class Transaction(id: String, productCode: String, amount: Double)
case class ProductInfo(code: String, name: String, category: String)
case class EnrichedTransaction(id: String, productName: String, category: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Large transaction dataset
val transactions = env.fromElements(
  Transaction("t1", "P001", 100.0),
  Transaction("t2", "P002", 200.0),
  Transaction("t3", "P001", 150.0)
)

// Small product info dataset (suitable for broadcasting)
val productInfo = env.fromElements(
  ProductInfo("P001", "Laptop", "Electronics"),
  ProductInfo("P002", "Phone", "Electronics")
)

// joinWithTiny hints that the second input is small, allowing the optimizer
// to broadcast it rather than shuffle the large transaction set.
val enrichedTransactions = transactions
  .joinWithTiny(productInfo)
  .where(txn => txn.productCode)
  .equalTo(info => info.code)
  .apply { (transaction, product) =>
    EnrichedTransaction(
      id = transaction.id,
      productName = product.name,
      category = product.category,
      amount = transaction.amount
    )
  }

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-2-10