or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-sources-sinks.md execution-environment.md grouping-aggregation.md hadoop-integration.md index.md iterations.md joins-cogroups.md transformations.md type-system.md
tile.json

docs/joins-cogroups.md

Joins and CoGroups

The Apache Flink Scala API provides comprehensive support for combining two DataSets using joins, co-groups, and cross products, with flexible key selection and customizable join strategies.

Join Operations

Basic Join Setup

class DataSet[T] {
  // Start join operation
  def join[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  
  // Join with size hints for optimization
  def joinWithTiny[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def joinWithHuge[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}

Outer Joins

class DataSet[T] {
  // Left outer join
  def leftOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  
  // Right outer join
  def rightOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  
  // Full outer join
  def fullOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}

Join Key Specification

class UnfinishedJoinOperation[T, O] {
  // Specify left side key using function
  def where[K: TypeInformation](keySelector: T => K): UnfinishedJoinOperationWhere[T, O]
  
  // Specify left side key using field positions
  def where(fields: Int*): UnfinishedJoinOperationWhere[T, O]
  
  // Specify left side key using field names
  def where(firstField: String, otherFields: String*): UnfinishedJoinOperationWhere[T, O]
}

class UnfinishedJoinOperationWhere[T, O] {
  // Specify right side key using function
  def equalTo[K: TypeInformation](keySelector: O => K): UnfinishedJoinOperationWhereEqual[T, O]
  
  // Specify right side key using field positions
  def equalTo(fields: Int*): UnfinishedJoinOperationWhereEqual[T, O]
  
  // Specify right side key using field names
  def equalTo(firstField: String, otherFields: String*): UnfinishedJoinOperationWhereEqual[T, O]
}

Join Function Application

class UnfinishedJoinOperationWhereEqual[T, O] {
  // Apply join function - creates tuple by default
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
  
  // Use JoinFunction
  def apply[R: TypeInformation: ClassTag](joiner: JoinFunction[T, O, R]): DataSet[R]
  
  // Get JoinDataSet for further configuration
  def getJoinDataSet: JoinDataSet[T, O]
}

class JoinDataSet[T, O] {
  // Apply join function
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
  def apply[R: TypeInformation: ClassTag](joiner: JoinFunction[T, O, R]): DataSet[R]
  
  // Configuration methods
  def withJoinHint(joinHint: JoinHint): JoinDataSet[T, O]
}

CoGroup Operations

CoGroup operations group the elements of two DataSets by key and pass all elements from both groups to the user function at once; either group may be empty when no elements with that key exist on that side.

class DataSet[T] {
  // Start coGroup operation
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}

class UnfinishedCoGroupOperation[T, O] {
  // Specify left side key
  def where[K: TypeInformation](keySelector: T => K): HalfUnfinishedKeyPairOperation[T, O, K]
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[T, O, _]
  def where(firstField: String, otherFields: String*): HalfUnfinishedKeyPairOperation[T, O, _]
}

class HalfUnfinishedKeyPairOperation[T, O, K] {
  // Specify right side key
  def equalTo[K2: TypeInformation](keySelector: O => K2): CoGroupDataSet[T, O]
  def equalTo(fields: Int*): CoGroupDataSet[T, O]
  def equalTo(firstField: String, otherFields: String*): CoGroupDataSet[T, O]
}

CoGroup Function Application

class CoGroupDataSet[T, O] {
  // Apply function with iterators
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O]) => R): DataSet[R]
  
  // Apply function with iterators returning multiple results
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O]) => TraversableOnce[R]): DataSet[R]
  
  // Apply function with iterators and collector
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O], Collector[R]) => Unit): DataSet[R]
  
  // Use CoGroupFunction
  def apply[R: TypeInformation: ClassTag](coGrouper: CoGroupFunction[T, O, R]): DataSet[R]
}

Cross Operations

Cross operations create the Cartesian product of two DataSets, pairing every element of the first DataSet with every element of the second.

class DataSet[T] {
  // Cross with other DataSet
  def cross[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
  
  // Cross with size hints
  def crossWithTiny[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
  def crossWithHuge[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
}

class CrossDataSet[T, O] {
  // Apply cross function
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
  
  // Use CrossFunction
  def apply[R: TypeInformation: ClassTag](crosser: CrossFunction[T, O, R]): DataSet[R]
  
  // Configuration
  def withCrossHint(crossHint: CrossHint): CrossDataSet[T, O]
}

Usage Examples

Inner Join

import org.apache.flink.api.scala._

case class Customer(id: Int, name: String, city: String)
case class Order(customerId: Int, product: String, amount: Double)
case class CustomerOrder(customerName: String, city: String, product: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val customers = env.fromElements(
  Customer(1, "Alice", "New York"),
  Customer(2, "Bob", "London"),
  Customer(3, "Charlie", "Paris")
)

val orders = env.fromElements(
  Order(1, "laptop", 1000.0),
  Order(2, "phone", 500.0),
  Order(1, "mouse", 25.0)
)

// Join customers with orders
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply { (customer, order) =>
    CustomerOrder(customer.name, customer.city, order.product, order.amount)
  }

Left Outer Join

import org.apache.flink.api.scala._

case class Employee(id: Int, name: String, deptId: Int)
case class Department(id: Int, name: String)
case class EmployeeDept(empName: String, deptName: Option[String])

val env = ExecutionEnvironment.getExecutionEnvironment

val employees = env.fromElements(
  Employee(1, "Alice", 10),
  Employee(2, "Bob", 20),
  Employee(3, "Charlie", 99) // Department doesn't exist
)

val departments = env.fromElements(
  Department(10, "Engineering"),
  Department(20, "Sales")
)

// Left outer join - all employees, with department if it exists
val employeeDepts = employees
  .leftOuterJoin(departments)
  .where(_.deptId)
  .equalTo(_.id)
  .apply { (emp, deptOpt) =>
    EmployeeDept(emp.name, Option(deptOpt).map(_.name))
  }

CoGroup Operation

import org.apache.flink.api.scala._

case class Student(id: Int, name: String)
case class Grade(studentId: Int, subject: String, score: Int)
case class StudentReport(name: String, grades: List[Grade], avgScore: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val students = env.fromElements(
  Student(1, "Alice"),
  Student(2, "Bob"),
  Student(3, "Charlie")
)

val grades = env.fromElements(
  Grade(1, "Math", 90),
  Grade(1, "Science", 85),
  Grade(2, "Math", 78),
  Grade(2, "Science", 82),
  Grade(1, "History", 88)
)

// CoGroup to create comprehensive student reports
val studentReports = students
  .coGroup(grades)
  .where(_.id)
  .equalTo(_.studentId)
  .apply { (studentIter, gradeIter) =>
    val student = studentIter.next() // Should be exactly one
    val gradeList = gradeIter.toList
    val avgScore = if (gradeList.nonEmpty) gradeList.map(_.score).sum.toDouble / gradeList.length else 0.0
    StudentReport(student.name, gradeList, avgScore)
  }

Multiple Join Conditions

import org.apache.flink.api.scala._

case class Sale(year: Int, quarter: Int, region: String, amount: Double)
case class Target(year: Int, quarter: Int, region: String, target: Double)
case class Performance(year: Int, quarter: Int, region: String, actual: Double, target: Double, achievement: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val sales = env.fromElements(
  Sale(2023, 1, "US", 1000000),
  Sale(2023, 1, "EU", 800000),
  Sale(2023, 2, "US", 1200000)
)

val targets = env.fromElements(
  Target(2023, 1, "US", 900000),
  Target(2023, 1, "EU", 750000),
  Target(2023, 2, "US", 1100000)
)

// Join on multiple fields: year, quarter, region
val performance = sales
  .join(targets)
  .where(s => (s.year, s.quarter, s.region))
  .equalTo(t => (t.year, t.quarter, t.region))
  .apply { (sale, target) =>
    Performance(
      sale.year, 
      sale.quarter, 
      sale.region, 
      sale.amount, 
      target.target,
      sale.amount / target.target
    )
  }

Cross Product

import org.apache.flink.api.scala._

case class Color(name: String, hex: String)
case class Size(name: String, dimension: String)
case class Product(color: String, size: String, colorHex: String, sizeDimension: String)

val env = ExecutionEnvironment.getExecutionEnvironment

val colors = env.fromElements(
  Color("Red", "#FF0000"),
  Color("Blue", "#0000FF"),
  Color("Green", "#00FF00")
)

val sizes = env.fromElements(
  Size("Small", "S"),
  Size("Medium", "M"),
  Size("Large", "L")
)

// Create all color-size combinations
val products = colors
  .cross(sizes)
  .apply { (color, size) =>
    Product(color.name, size.name, color.hex, size.dimension)
  }

Join with Custom Functions

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.JoinFunction

case class User(id: Int, name: String, email: String)
case class Activity(userId: Int, activity: String, timestamp: Long)
case class UserActivity(userName: String, email: String, activities: List[String])

val env = ExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(1, "Alice", "alice@example.com"),
  User(2, "Bob", "bob@example.com")
)

val activities = env.fromElements(
  Activity(1, "login", 1000L),
  Activity(1, "view_page", 1001L),
  Activity(2, "login", 1002L)
)

// Using JoinFunction for complex join logic
class UserActivityJoinFunction extends JoinFunction[User, Activity, (Int, String, String, String)] {
  def join(user: User, activity: Activity): (Int, String, String, String) = {
    (user.id, user.name, user.email, activity.activity)
  }
}

val userActivities = users
  .join(activities)
  .where(_.id)
  .equalTo(_.userId)
  .apply(new UserActivityJoinFunction())

Broadcast Join

import org.apache.flink.api.scala._

case class Transaction(id: String, productCode: String, amount: Double)
case class ProductInfo(code: String, name: String, category: String)
case class EnrichedTransaction(id: String, productName: String, category: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Large transaction dataset
val transactions = env.fromElements(
  Transaction("t1", "P001", 100.0),
  Transaction("t2", "P002", 200.0),
  Transaction("t3", "P001", 150.0)
)

// Small product info dataset (suitable for broadcasting)
val productInfo = env.fromElements(
  ProductInfo("P001", "Laptop", "Electronics"),
  ProductInfo("P002", "Phone", "Electronics")
)

// Join with broadcast hint for small dataset
val enrichedTransactions = transactions
  .joinWithTiny(productInfo) // Hint that productInfo is small
  .where(_.productCode)
  .equalTo(_.code)
  .apply { (transaction, product) =>
    EnrichedTransaction(
      transaction.id,
      product.name,
      product.category,
      transaction.amount
    )
  }