or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

binary-operations.md data-transformations.md execution-environment.md grouping-aggregation.md index.md input-output.md partitioning-distribution.md type-system.md utilities.md
tile.json

docs/binary-operations.md

Binary Operations

Operations for combining multiple DataSets through joins, crosses, and coGroup operations. These operations enable complex data relationships and multi-dataset analysis.

Capabilities

Join Operations

Combine two DataSets based on matching keys with various join strategies.

/**
 * Inner-join entry points on DataSet. Every method only starts the join: the
 * returned UnfinishedJoinOperation still needs its join keys specified.
 *
 * @tparam T element type of this DataSet (the first type argument of the returned operation)
 */
class DataSet[T] {
  /**
   * Starts an inner join with another DataSet
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedJoinOperation for key specification
   */
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  
  /**
   * Starts an inner join with optimizer hint
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @param strategy Join strategy hint, one of the JoinHint values (OPTIMIZER_CHOOSES, BROADCAST_HASH_FIRST, etc.)
   * @return UnfinishedJoinOperation for key specification
   */
  def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedJoinOperation[T, O]
  
  /**
   * Joins where other DataSet is small (broadcast join)
   * @tparam O element type of the other DataSet
   * @param other Small DataSet to broadcast
   * @return UnfinishedJoinOperation for key specification
   */
  def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  
  /**
   * Joins where other DataSet is large (this DataSet is broadcast)
   * @tparam O element type of the other DataSet
   * @param other Large DataSet
   * @return UnfinishedJoinOperation for key specification
   */
  def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}

Outer Join Operations

Perform left, right, and full outer joins to include non-matching elements.

/**
 * Outer-join entry points on DataSet. Each join flavour has two overloads:
 * one without and one with an explicit JoinHint. All of them return an
 * UnfinishedOuterJoinOperation on which the join keys are specified next.
 *
 * @tparam T element type of this DataSet
 */
class DataSet[T] {
  /**
   * Starts a full outer join with another DataSet
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  
  /**
   * Starts a full outer join with optimizer hint
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @param strategy Join strategy hint (a JoinHint value)
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def fullOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
  
  /**
   * Starts a left outer join (keeps all elements from this DataSet)
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  
  /**
   * Starts a left outer join with optimizer hint
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @param strategy Join strategy hint (a JoinHint value)
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
  
  /**
   * Starts a right outer join (keeps all elements from other DataSet)
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  
  /**
   * Starts a right outer join with optimizer hint
   * @tparam O element type of the other DataSet
   * @param other DataSet to join with
   * @param strategy Join strategy hint (a JoinHint value)
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
}

Join Key Specification

Specify join keys and apply join functions to create final results.

/**
 * First step of join key specification: `where` selects the key of the left
 * input in one of three ways (field positions, field names, or a selector
 * function). The other side's key is then given through `equalTo` on the
 * returned Keys value.
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
class UnfinishedJoinOperation[L, R] {
  /**
   * Specifies join key using field positions
   * @param fields Field positions for join key
   * @return Keys[L] on which the other side's key is specified
   */
  def where(fields: Int*): Keys[L]
  
  /**
   * Specifies join key using field names
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return Keys[L] on which the other side's key is specified
   */
  def where(firstField: String, otherFields: String*): Keys[L]
  
  /**
   * Specifies join key using key selector function
   * @tparam K key type; a TypeInformation[K] must be available
   * @param fun Key selector function
   * @return Keys[L] on which the other side's key is specified
   */
  def where[K: TypeInformation](fun: L => K): Keys[L]
}

/**
 * Second step of join key specification: `equalTo` selects the key of the
 * other input, mirroring the three `where` variants (field positions, field
 * names, selector function).
 *
 * NOTE(review): L and R are not type parameters of Keys[T], so the signatures
 * below do not resolve as written. Presumably this class stands in for a
 * builder parameterised on both input types -- confirm against the Flink
 * Scala API before relying on these exact types.
 *
 * @tparam T element type of the side whose key was already given via `where`
 */
class Keys[T] {
  /**
   * Specifies other side's join key using field positions
   * @param fields Field positions for other side's key
   * @return JoinFunctionAssigner for applying join function
   */
  def equalTo(fields: Int*): JoinFunctionAssigner[L, R]
  
  /**
   * Specifies other side's join key using field names
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return JoinFunctionAssigner for applying join function
   */
  def equalTo(firstField: String, otherFields: String*): JoinFunctionAssigner[L, R]
  
  /**
   * Specifies other side's join key using key selector function
   * @tparam K key type; a TypeInformation[K] must be available
   * @param fun Key selector function
   * @return JoinFunctionAssigner for applying join function
   */
  def equalTo[K: TypeInformation](fun: R => K): JoinFunctionAssigner[L, R]
}

Join Function Application

Apply functions to joined elements to produce final results.

/**
 * Final step of a join: turns matched (left, right) pairs into a result
 * DataSet. The four `apply` overloads differ in how the join logic is given
 * (Scala function or function object) and in whether one result is returned
 * per pair or results are emitted through a Collector.
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
trait JoinFunctionAssigner[L, R] {
  /**
   * Applies function to each pair of joined elements
   * @tparam O result element type; requires TypeInformation[O] and ClassTag[O]
   * @param fun Function combining left and right elements
   * @return DataSet with join results
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
  
  /**
   * Applies function with collector for multiple outputs per join
   * @tparam O result element type; requires TypeInformation[O] and ClassTag[O]
   * @param fun Function with collector for emitting multiple results
   * @return DataSet with collected join results
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]
  
  /**
   * Applies JoinFunction to joined elements
   * @tparam O result element type; requires TypeInformation[O] and ClassTag[O]
   * @param joiner JoinFunction implementation
   * @return DataSet with join results
   */
  def apply[O: TypeInformation: ClassTag](joiner: JoinFunction[L, R, O]): DataSet[O]
  
  /**
   * Applies FlatJoinFunction for multiple outputs per join
   * @tparam O result element type; requires TypeInformation[O] and ClassTag[O]
   * @param joiner FlatJoinFunction implementation
   * @return DataSet with flattened join results
   */
  def apply[O: TypeInformation: ClassTag](joiner: FlatJoinFunction[L, R, O]): DataSet[O]
  
  /**
   * Uses custom partitioner for join distribution
   * @tparam K key type handled by the partitioner; a TypeInformation[K] must be available
   * @param partitioner Custom partitioner
   * @return JoinFunctionAssigner with custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinFunctionAssigner[L, R]
}

Usage Examples:

import org.apache.flink.api.scala._

// Record types used by the join examples
case class Customer(id: Int, name: String, city: String)
case class Order(id: Int, customerId: Int, product: String, amount: Double)
// Shape of one joined result row
case class CustomerOrder(customerName: String, product: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Three customers; Charlie (id = 3) has no matching order below
val customers = env.fromElements(
  Customer(id = 1, name = "Alice", city = "NYC"),
  Customer(id = 2, name = "Bob", city = "LA"),
  Customer(id = 3, name = "Charlie", city = "Chicago")
)

// Orders reference customers through customerId
val orders = env.fromElements(
  Order(id = 101, customerId = 1, product = "Laptop", amount = 999.99),
  Order(id = 102, customerId = 2, product = "Phone", amount = 599.99),
  Order(id = 103, customerId = 1, product = "Mouse", amount = 29.99)
)

// Inner join: pair every customer with each order whose customerId equals the customer's id
val customerOrders = customers
  .join(orders)
  .where(customer => customer.id)
  .equalTo(order => order.customerId)
  .apply { (customer, order) =>
    CustomerOrder(customer.name, order.product, order.amount)
  }

// Left outer join: every customer appears in the result, including those without orders
val allCustomerOrders = customers
  .leftOuterJoin(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply { (customer, order) =>
    // A customer without a match is passed a null order; wrap it in Option so the
    // null is handled in exactly one place and never dereferenced.
    Option(order).fold(CustomerOrder(customer.name, "No orders", 0.0)) { matched =>
      CustomerOrder(customer.name, matched.product, matched.amount)
    }
  }

Cross Operations

Create Cartesian product of two DataSets for all-pairs operations.

/**
 * Cross (Cartesian product) entry points on DataSet. The three methods share
 * one signature and differ only in the size hint their documentation gives
 * about the other DataSet.
 *
 * @tparam T element type of this DataSet
 */
class DataSet[T] {
  /**
   * Creates Cartesian product with another DataSet
   * @tparam O element type of the other DataSet
   * @param other DataSet to cross with
   * @return CrossDataSet for applying cross function
   */
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]
  
  /**
   * Cross where other DataSet is small (broadcast)
   * @tparam O element type of the other DataSet
   * @param other Small DataSet to broadcast
   * @return CrossDataSet for applying cross function
   */
  def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]
  
  /**
   * Cross where other DataSet is large (this DataSet is broadcast)
   * @tparam O element type of the other DataSet
   * @param other Large DataSet
   * @return CrossDataSet for applying cross function
   */
  def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}

Cross Function Application

Apply functions to all pairs of elements from crossed DataSets.

/**
 * Result of a cross operation, awaiting the function that turns each
 * (left, right) pair into an output element.
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
class CrossDataSet[L, R] {
  /**
   * Applies function to each pair of crossed elements
   * @tparam O result element type; requires TypeInformation[O] and ClassTag[O]
   * @param fun Function combining left and right elements
   * @return DataSet with cross results
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
  
  /**
   * Applies CrossFunction to crossed elements
   * @tparam O result element type; requires TypeInformation[O] and ClassTag[O]
   * @param crosser CrossFunction implementation
   * @return DataSet with cross results
   */
  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O]
}

Usage Examples:

// Two small inputs whose full Cartesian product is wanted
val colors = env.fromElements("Red", "Green", "Blue")
val sizes = env.fromElements("Small", "Medium", "Large")

// Cross pairs every color with every size; each pair becomes one label
val combinations = colors
  .cross(sizes)
  .apply { (color, size) =>
    s"$color $size"
  }

// Results: ["Red Small", "Red Medium", "Red Large", "Green Small", ...]

CoGroup Operations

Group elements from two DataSets by key and process groups together.

/**
 * CoGroup entry point on DataSet.
 *
 * @tparam T element type of this DataSet
 */
class DataSet[T] {
  /**
   * Starts a coGroup operation with another DataSet
   * @tparam O element type of the other DataSet; a ClassTag[O] must be available
   * @param other DataSet to coGroup with
   * @return UnfinishedCoGroupOperation for key specification
   */
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}

CoGroup Key Specification and Function Application

/**
 * First step of coGroup key specification: `where` selects the key of the
 * left input, either by field positions or by a selector function.
 *
 * NOTE(review): `where` is declared to return Keys[L], whose `equalTo` is
 * documented as yielding a JoinFunctionAssigner, yet the coGroup usage example
 * applies a function over two iterators (the CoGroupDataSet form). Presumably
 * the real return type leads to CoGroupDataSet -- confirm against the Flink
 * Scala API.
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
class UnfinishedCoGroupOperation[L, R] {
  /**
   * Specifies coGroup key using field positions
   * @param fields Field positions for coGroup key
   * @return Keys[L] on which the other side's key is specified
   */
  def where(fields: Int*): Keys[L]
  
  /**
   * Specifies coGroup key using key selector function
   * @tparam K key type; a TypeInformation[K] must be available
   * @param fun Key selector function
   * @return Keys[L] on which the other side's key is specified
   */
  def where[K: TypeInformation](fun: L => K): Keys[L]
}

/**
 * Configured coGroup awaiting its function. The `apply` overloads receive the
 * left and right groups as iterators and either return one result or emit
 * results through a Collector.
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
class CoGroupDataSet[L, R] {
  /**
   * Applies function to each pair of grouped iterators
   * @tparam O result element type; requires TypeInformation[O] and ClassTag[O]
   * @param fun Function processing left and right group iterators
   * @return DataSet with coGroup results
   */
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R]) => O): DataSet[O]
  
  /**
   * Applies function with collector to grouped iterators
   * @tparam O result element type; requires TypeInformation[O] and ClassTag[O]
   * @param fun Function with collector for multiple outputs
   * @return DataSet with collected coGroup results
   */
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O]
  
  /**
   * Applies CoGroupFunction to grouped iterators
   * @tparam O result element type; requires TypeInformation[O] and ClassTag[O]
   * @param coGrouper CoGroupFunction implementation
   * @return DataSet with coGroup results
   */
  def apply[O: TypeInformation: ClassTag](coGrouper: CoGroupFunction[L, R, O]): DataSet[O]
  
  /**
   * Uses custom partitioner for coGroup distribution
   * @tparam K key type handled by the partitioner; a TypeInformation[K] must be available
   * @param partitioner Custom partitioner
   * @return CoGroupDataSet with custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): CoGroupDataSet[L, R]
}

CoGroup Sorting

Sort elements within each group before coGroup processing.

/**
 * Sorting options for a configured coGroup. Each method returns a
 * CoGroupDataSet, so calls can be chained; the sort field is given either by
 * position or by name.
 *
 * NOTE(review): `Order` here is the sort-order type, not the `Order` case
 * class declared in the join usage example -- presumably Flink's own Order
 * enumeration; confirm the import.
 *
 * @tparam L element type of the first (left) input
 * @tparam R element type of the second (right) input
 */
class CoGroupDataSet[L, R] {
  /**
   * Sorts first DataSet's groups by field position
   * @param field Field position for sorting
   * @param order Sort order
   * @return CoGroupDataSet with sorted first groups
   */
  def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R]
  
  /**
   * Sorts first DataSet's groups by field name
   * @param field Field name for sorting
   * @param order Sort order
   * @return CoGroupDataSet with sorted first groups
   */
  def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R]
  
  /**
   * Sorts second DataSet's groups by field position
   * @param field Field position for sorting
   * @param order Sort order
   * @return CoGroupDataSet with sorted second groups
   */
  def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R]
  
  /**
   * Sorts second DataSet's groups by field name
   * @param field Field name for sorting
   * @param order Sort order
   * @return CoGroupDataSet with sorted second groups
   */
  def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R]
}

Usage Examples:

// Record types for the two coGroup inputs; both carry the grouping key
case class LeftData(key: String, value: Int)
case class RightData(key: String, description: String)

// Left input: key "A" occurs twice, key "B" once
val leftData = env.fromElements(
  LeftData(key = "A", value = 1),
  LeftData(key = "A", value = 2),
  LeftData(key = "B", value = 3)
)

// Right input: shares key "A" with the left side, key "C" exists only here
val rightData = env.fromElements(
  RightData(key = "A", description = "Group A data"),
  RightData(key = "C", description = "Group C data")
)

// CoGroup on key: each invocation receives the left and right elements that share one key
val coGroupResult = leftData
  .coGroup(rightData)
  .where(left => left.key)
  .equalTo(right => right.key)
  .apply { (leftIter, rightIter) =>
    // Materialise both groups so they can be inspected more than once
    val lefts = leftIter.toList
    val rights = rightIter.toList
    // Take the key from the left group when it has elements, otherwise from the right group
    val key = lefts.headOption.fold(rights.head.key)(_.key)
    val leftSum = lefts.foldLeft(0)(_ + _.value)
    val rightCount = rights.size
    s"Key: $key, LeftSum: $leftSum, RightCount: $rightCount"
  }

Union Operations

Combine DataSets of the same type without key matching.

/**
 * Union entry point on DataSet. Unlike joins, both sides must have the same
 * element type and no keys are involved.
 *
 * @tparam T element type of this DataSet and of the DataSet it is unioned with
 */
class DataSet[T] {
  /**
   * Creates union with another DataSet of the same type
   * @param other DataSet to union with
   * @return DataSet containing elements from both DataSets
   */
  def union(other: DataSet[T]): DataSet[T]
}

/**
 * Environment-level union, for combining more than two DataSets in one call.
 */
class ExecutionEnvironment {
  /**
   * Creates union of multiple DataSets
   * @tparam T element type shared by all DataSets in the sequence
   * @param sets Sequence of DataSets to union
   * @return Unified DataSet
   */
  def union[T](sets: Seq[DataSet[T]]): DataSet[T]
}

Usage Examples:

// Three inputs of the same element type
val numbers1 = env.fromElements(1, 2, 3)
val numbers2 = env.fromElements(4, 5, 6)
val numbers3 = env.fromElements(7, 8, 9)

// Pairwise union through the DataSet method
val combined = numbers1.union(numbers2)

// Union of any number of DataSets through the environment
val allNumbers = env.union(
  Seq(numbers1, numbers2, numbers3)
)

Types

/**
 * Join logic producing exactly one result per matched pair; accepted by
 * JoinFunctionAssigner.apply.
 *
 * NOTE(review): shown here as an abstract class; presumably a Java interface
 * in Flink itself -- confirm before extending.
 *
 * @tparam IN1 element type of the first input
 * @tparam IN2 element type of the second input
 * @tparam OUT result element type
 */
abstract class JoinFunction[IN1, IN2, OUT] extends Function {
  def join(first: IN1, second: IN2): OUT
}

/**
 * Join logic that emits its results through a Collector instead of returning
 * a value; accepted by JoinFunctionAssigner.apply.
 *
 * NOTE(review): shown here as an abstract class; presumably a Java interface
 * in Flink itself -- confirm before extending.
 *
 * @tparam IN1 element type of the first input
 * @tparam IN2 element type of the second input
 * @tparam OUT type of the emitted elements
 */
abstract class FlatJoinFunction[IN1, IN2, OUT] extends Function {
  def join(first: IN1, second: IN2, out: Collector[OUT]): Unit
}

/**
 * Cross logic producing one result per pair of crossed elements; accepted by
 * CrossDataSet.apply.
 *
 * NOTE(review): shown here as an abstract class; presumably a Java interface
 * in Flink itself -- confirm before extending.
 *
 * @tparam IN1 element type of the first input
 * @tparam IN2 element type of the second input
 * @tparam OUT result element type
 */
abstract class CrossFunction[IN1, IN2, OUT] extends Function {
  def cross(val1: IN1, val2: IN2): OUT
}

/**
 * CoGroup logic; accepted by CoGroupDataSet.apply. Unlike the Scala-function
 * overloads, which receive scala Iterators, this form receives the two groups
 * as java.lang.Iterable and emits results through a Collector.
 *
 * NOTE(review): shown here as an abstract class; presumably a Java interface
 * in Flink itself -- confirm before extending.
 *
 * @tparam IN1 element type of the first input
 * @tparam IN2 element type of the second input
 * @tparam OUT type of the emitted elements
 */
abstract class CoGroupFunction[IN1, IN2, OUT] extends Function {
  def coGroup(first: java.lang.Iterable[IN1], second: java.lang.Iterable[IN2], out: Collector[OUT]): Unit
}

/**
 * Strategy hint accepted by the `strategy` parameter of the join and
 * outer-join overloads.
 *
 * NOTE(review): modelled here as a sealed trait with case objects; presumably
 * a Java enum in Flink itself -- confirm the import path before use.
 */
sealed trait JoinHint
object JoinHint {
  case object OPTIMIZER_CHOOSES extends JoinHint
  case object BROADCAST_HASH_FIRST extends JoinHint
  case object BROADCAST_HASH_SECOND extends JoinHint
  case object REPARTITION_HASH_FIRST extends JoinHint
  case object REPARTITION_HASH_SECOND extends JoinHint
  case object REPARTITION_SORT_MERGE extends JoinHint
}

/**
 * Enumerates the four join flavours (inner, left outer, right outer, full
 * outer). None of the method signatures on this page takes a JoinType; the
 * flavour is chosen by calling join, leftOuterJoin, rightOuterJoin or
 * fullOuterJoin.
 */
sealed trait JoinType
object JoinType {
  case object INNER extends JoinType
  case object LEFT_OUTER extends JoinType
  case object RIGHT_OUTER extends JoinType
  case object FULL_OUTER extends JoinType
}

/**
 * Summary stub for the builder returned by join, joinWithTiny and
 * joinWithHuge; its `where` overloads are listed under
 * "Join Key Specification".
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
class UnfinishedJoinOperation[L, R] {
  // Fluent interface for join key specification
}

/**
 * Summary stub for the builder returned by fullOuterJoin, leftOuterJoin and
 * rightOuterJoin. Its members are not listed on this page; the usage example
 * calls `where` / `equalTo` / `apply` on it in the same way as for an inner
 * join.
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
class UnfinishedOuterJoinOperation[L, R] {
  // Fluent interface for outer join key specification
}

/**
 * Summary stub for the builder returned by DataSet.coGroup; its `where`
 * overloads are listed under
 * "CoGroup Key Specification and Function Application".
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
class UnfinishedCoGroupOperation[L, R] {
  // Fluent interface for coGroup key specification
}

/**
 * Summary stub; the `apply` overloads and `withPartitioner` are listed under
 * "Join Function Application".
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
trait JoinFunctionAssigner[L, R] {
  // Interface for applying join functions
}

/**
 * Summary stub. Because it extends DataSet[(L, R)], a CrossDataSet is itself
 * a DataSet of pairs; the `apply` overloads listed under
 * "Cross Function Application" map each pair to another element type.
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
class CrossDataSet[L, R] extends DataSet[(L, R)] {
  // Represents result of cross operation
}

/**
 * Summary stub; the `apply` overloads, `withPartitioner` and the
 * sortFirstGroup / sortSecondGroup methods are listed in the CoGroup sections.
 *
 * @tparam L element type of the left input
 * @tparam R element type of the right input
 */
class CoGroupDataSet[L, R] {
  // Represents configured coGroup operation ready for function application
}