Operations for combining DataSets: joins, crosses (Cartesian products), coGroups, and unions. Each one relates the elements of two DataSets (or, for union, several) and produces a new DataSet.
Inner joins: combine two DataSets by pairing elements whose keys match, optionally with a hint that selects the join strategy.
class DataSet[T] {

  /**
   * Starts an inner join with another DataSet; the optimizer picks the join strategy.
   *
   * @param other DataSet to join with
   * @tparam O element type of `other`
   * @return UnfinishedJoinOperation on which the keys are given with `where(...).equalTo(...)`
   */
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  /**
   * Starts an inner join with another DataSet, using a hint for the execution strategy.
   *
   * @param other    DataSet to join with
   * @param strategy join strategy hint, e.g. `OPTIMIZER_CHOOSES`, `BROADCAST_HASH_FIRST`,
   *                 `REPARTITION_SORT_MERGE` (see [[JoinHint]])
   * @tparam O element type of `other`
   * @return UnfinishedJoinOperation on which the keys are given with `where(...).equalTo(...)`
   */
  def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedJoinOperation[T, O]

  /**
   * Starts an inner join, declaring that `other` is small, so `other` is broadcast
   * (broadcast join).
   *
   * @param other small DataSet to broadcast
   * @tparam O element type of `other`
   * @return UnfinishedJoinOperation for key specification
   */
  def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  /**
   * Starts an inner join, declaring that `other` is large, so this DataSet is the one
   * that gets broadcast.
   *
   * @param other large DataSet
   * @tparam O element type of `other`
   * @return UnfinishedJoinOperation for key specification
   */
  def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}

Perform left, right, and full outer joins to include non-matching elements.
class DataSet[T] {

  /**
   * Starts a full outer join: every element of both DataSets appears in the result.
   * When an element has no match, the join function receives `null` for the missing side.
   *
   * @param other DataSet to join with
   * @tparam O element type of `other`
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a full outer join with a strategy hint.
   *
   * @param other    DataSet to join with
   * @param strategy join strategy hint (see [[JoinHint]])
   * @tparam O element type of `other`
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def fullOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a left outer join: keeps every element of this DataSet. For an element
   * without a match in `other`, the join function receives `null` as the right argument.
   *
   * @param other DataSet to join with
   * @tparam O element type of `other`
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a left outer join with a strategy hint.
   *
   * @param other    DataSet to join with
   * @param strategy join strategy hint (see [[JoinHint]])
   * @tparam O element type of `other`
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a right outer join: keeps every element of `other`. For an element
   * without a match in this DataSet, the join function receives `null` as the left argument.
   *
   * @param other DataSet to join with
   * @tparam O element type of `other`
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a right outer join with a strategy hint.
   *
   * @param other    DataSet to join with
   * @param strategy join strategy hint (see [[JoinHint]])
   * @tparam O element type of `other`
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
}

Specify join keys and apply join functions to create final results.
/**
 * First key-specification step of a join: `where(...)` fixes the key of the left input.
 *
 * @tparam L left element type
 * @tparam R right element type
 */
class UnfinishedJoinOperation[L, R] {

  /**
   * Specifies the left join key by tuple field positions.
   *
   * @param fields field positions forming the left key
   * @return half-specified key pair; call `equalTo(...)` on it to give the right key
   */
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[L, R, JoinFunctionAssigner[L, R]]

  /**
   * Specifies the left join key by field names.
   *
   * @param firstField  first field name of the key
   * @param otherFields additional field names of the key
   * @return half-specified key pair; call `equalTo(...)` on it to give the right key
   */
  def where(firstField: String, otherFields: String*): HalfUnfinishedKeyPairOperation[L, R, JoinFunctionAssigner[L, R]]

  /**
   * Specifies the left join key with a key-selector function.
   *
   * @param fun extracts the key from a left element
   * @tparam K key type
   * @return half-specified key pair; call `equalTo(...)` on it to give the right key
   */
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedKeyPairOperation[L, R, JoinFunctionAssigner[L, R]]
}

/**
 * Second key-specification step of a binary keyed operation (this is Flink's
 * `org.apache.flink.api.scala.HalfUnfinishedKeyPairOperation`). The left key is already
 * fixed; `equalTo(...)` fixes the right key, which must line up with the left key, and
 * returns the next stage of the operation.
 *
 * @tparam L left element type
 * @tparam R right element type
 * @tparam O stage returned once both keys are known (`JoinFunctionAssigner[L, R]` for joins)
 */
class HalfUnfinishedKeyPairOperation[L, R, O] {

  /**
   * Specifies the right key by tuple field positions.
   *
   * @param fields field positions forming the right key
   * @return the next stage of the operation
   */
  def equalTo(fields: Int*): O

  /**
   * Specifies the right key by field names.
   *
   * @param firstField  first field name of the key
   * @param otherFields additional field names of the key
   * @return the next stage of the operation
   */
  def equalTo(firstField: String, otherFields: String*): O

  /**
   * Specifies the right key with a key-selector function.
   *
   * @param fun extracts the key from a right element
   * @tparam K key type
   * @return the next stage of the operation
   */
  def equalTo[K: TypeInformation](fun: R => K): O
}

Apply functions to joined elements to produce final results.
/**
 * Final stage of a join: both keys are fixed, and the join function is supplied here.
 * For outer joins, the side that has no matching element is passed to the function as `null`.
 *
 * @tparam L left element type
 * @tparam R right element type
 */
trait JoinFunctionAssigner[L, R] {

  /**
   * Maps every joined pair to exactly one result.
   *
   * @param fun combines a left and a right element into a result
   * @tparam O result element type
   * @return DataSet with one element per joined pair
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

  /**
   * Emits zero or more results per joined pair through a Collector.
   *
   * @param fun receives a left element, a right element, and the Collector to emit into
   * @tparam O result element type
   * @return DataSet with everything emitted into the Collector
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]

  /**
   * Applies a [[JoinFunction]] (one result per joined pair).
   *
   * @param joiner JoinFunction implementation
   * @tparam O result element type
   * @return DataSet with join results
   */
  def apply[O: TypeInformation: ClassTag](joiner: JoinFunction[L, R, O]): DataSet[O]

  /**
   * Applies a [[FlatJoinFunction]] (zero or more results per joined pair).
   *
   * @param joiner FlatJoinFunction implementation
   * @tparam O result element type
   * @return DataSet with all emitted join results
   */
  def apply[O: TypeInformation: ClassTag](joiner: FlatJoinFunction[L, R, O]): DataSet[O]

  /**
   * Distributes the join with a custom partitioner.
   *
   * @param partitioner custom partitioner; K is presumably the join key type — confirm
   * @tparam K type the partitioner works on
   * @return this join stage, configured with the partitioner
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinFunctionAssigner[L, R]
}

Usage Examples:
import org.apache.flink.api.scala._

case class Customer(id: Int, name: String, city: String)
case class Order(id: Int, customerId: Int, product: String, amount: Double)
case class CustomerOrder(customerName: String, product: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val customers = env.fromElements(
  Customer(1, "Alice", "NYC"),
  Customer(2, "Bob", "LA"),
  Customer(3, "Charlie", "Chicago")
)

val orders = env.fromElements(
  Order(101, 1, "Laptop", 999.99),
  Order(102, 2, "Phone", 599.99),
  Order(103, 1, "Mouse", 29.99)
)

// Inner join: one CustomerOrder per (customer, order) pair with customer.id == order.customerId
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply((customer, order) => CustomerOrder(customer.name, order.product, order.amount))

// Left outer join: customers without orders are kept, and their missing order arrives as null.
// Wrapping it in Option turns that null into None instead of comparing against null by hand.
val allCustomerOrders = customers
  .leftOuterJoin(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply { (customer, order) =>
    Option(order)
      .map(o => CustomerOrder(customer.name, o.product, o.amount))
      .getOrElse(CustomerOrder(customer.name, "No orders", 0.0))
  }

Create the Cartesian product of two DataSets for all-pairs operations.
class DataSet[T] {

  /**
   * Creates the Cartesian product with another DataSet: every element of this DataSet is
   * paired with every element of `other`, so the result size is the product of both sizes.
   *
   * @param other DataSet to cross with
   * @tparam O element type of `other`
   * @return CrossDataSet for applying a cross function
   */
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]

  /**
   * Cross where `other` is small, so `other` is broadcast.
   *
   * @param other small DataSet to broadcast
   * @tparam O element type of `other`
   * @return CrossDataSet for applying a cross function
   */
  def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]

  /**
   * Cross where `other` is large, so this DataSet is the one that gets broadcast.
   *
   * @param other large DataSet
   * @tparam O element type of `other`
   * @return CrossDataSet for applying a cross function
   */
  def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}

Apply functions to all pairs of elements from crossed DataSets.
/**
 * Result of a cross. Used as-is, it is a DataSet of `(L, R)` tuples; `apply`
 * replaces each pair with the output of a user function.
 *
 * @tparam L left element type
 * @tparam R right element type
 */
class CrossDataSet[L, R] extends DataSet[(L, R)] {

  /**
   * Maps each crossed pair to exactly one result.
   *
   * @param fun combines a left and a right element
   * @tparam O result element type
   * @return DataSet with one element per pair
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

  /**
   * Applies a [[CrossFunction]] to each crossed pair.
   *
   * @param crosser CrossFunction implementation
   * @tparam O result element type
   * @return DataSet with one element per pair
   */
  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O]
}

Usage Examples:
val colors = env.fromElements("Red", "Green", "Blue")
val sizes = env.fromElements("Small", "Medium", "Large")

// Create all color-size combinations (3 x 3 = 9 strings)
val combinations = colors
  .cross(sizes)
  .apply((color, size) => s"$color $size")

// Results (order not guaranteed): "Red Small", "Red Medium", "Red Large", "Green Small", ...

Group elements from two DataSets by key and process groups together.
class DataSet[T] {

  /**
   * Starts a coGroup with another DataSet: the elements of both inputs are grouped by key,
   * and the two groups that share a key are handed to one function call.
   *
   * @param other DataSet to coGroup with
   * @tparam O element type of `other`
   * @return UnfinishedCoGroupOperation for key specification
   */
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}

/**
 * First key-specification step of a coGroup: `where(...)` fixes the left key. It returns
 * Flink's `HalfUnfinishedKeyPairOperation`, whose `equalTo(...)` fixes the right key and
 * yields the [[CoGroupDataSet]] on which the coGroup function is applied.
 *
 * @tparam L left element type
 * @tparam R right element type
 */
class UnfinishedCoGroupOperation[L, R] {

  /**
   * Specifies the left coGroup key by tuple field positions.
   *
   * @param fields field positions forming the left key
   * @return half-specified key pair; `equalTo(...)` on it returns a CoGroupDataSet
   */
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]]

  /**
   * Specifies the left coGroup key by field names.
   *
   * @param firstField  first field name of the key
   * @param otherFields additional field names of the key
   * @return half-specified key pair; `equalTo(...)` on it returns a CoGroupDataSet
   */
  def where(firstField: String, otherFields: String*): HalfUnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]]

  /**
   * Specifies the left coGroup key with a key-selector function.
   *
   * @param fun extracts the key from a left element
   * @tparam K key type
   * @return half-specified key pair; `equalTo(...)` on it returns a CoGroupDataSet
   */
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]]
}
/**
 * A coGroup whose keys are fixed; the coGroup function is supplied here.
 *
 * The function is called once per key that occurs in either input. The iterator for a
 * side in which the key does not occur is empty. Iterators can only be traversed once,
 * so materialize them (e.g. with `.toList`) if they are needed more than once.
 *
 * @tparam L left element type
 * @tparam R right element type
 */
class CoGroupDataSet[L, R] {

  /**
   * Produces exactly one result per key from the two groups.
   *
   * @param fun processes the left and right group of one key
   * @tparam O result element type
   * @return DataSet with one element per key
   */
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R]) => O): DataSet[O]

  /**
   * Emits zero or more results per key through a Collector.
   *
   * @param fun processes the left and right group of one key and emits into the Collector
   * @tparam O result element type
   * @return DataSet with everything emitted into the Collector
   */
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O]

  /**
   * Applies a [[CoGroupFunction]] to the groups of each key.
   *
   * @param coGrouper CoGroupFunction implementation
   * @tparam O result element type
   * @return DataSet with coGroup results
   */
  def apply[O: TypeInformation: ClassTag](coGrouper: CoGroupFunction[L, R, O]): DataSet[O]

  /**
   * Distributes the coGroup with a custom partitioner.
   *
   * @param partitioner custom partitioner; K is presumably the coGroup key type — confirm
   * @tparam K type the partitioner works on
   * @return this CoGroupDataSet, configured with the partitioner
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): CoGroupDataSet[L, R]
}

Sort elements within each group before coGroup processing.
/**
 * Group-sorting options of a coGroup. `order` is Flink's sort-order enum
 * (`org.apache.flink.api.common.operators.Order`), not the `Order` case class
 * defined in the join usage example. Import or qualify it accordingly.
 *
 * @tparam L left element type
 * @tparam R right element type
 */
class CoGroupDataSet[L, R] {

  /**
   * Sorts each group of the first (left) input by a field position.
   *
   * @param field field position to sort on
   * @param order sort order
   * @return this CoGroupDataSet with sorted first groups
   */
  def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R]

  /**
   * Sorts each group of the first (left) input by a field name.
   *
   * @param field field name to sort on
   * @param order sort order
   * @return this CoGroupDataSet with sorted first groups
   */
  def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R]

  /**
   * Sorts each group of the second (right) input by a field position.
   *
   * @param field field position to sort on
   * @param order sort order
   * @return this CoGroupDataSet with sorted second groups
   */
  def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R]

  /**
   * Sorts each group of the second (right) input by a field name.
   *
   * @param field field name to sort on
   * @param order sort order
   * @return this CoGroupDataSet with sorted second groups
   */
  def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R]
}

Usage Examples:
case class LeftData(key: String, value: Int)
case class RightData(key: String, description: String)

val leftData = env.fromElements(
  LeftData("A", 1),
  LeftData("A", 2),
  LeftData("B", 3)
)

val rightData = env.fromElements(
  RightData("A", "Group A data"),
  RightData("C", "Group C data")
)

// CoGroup by key to process both groups of each key together
val coGroupResult = leftData
  .coGroup(rightData)
  .where(_.key)
  .equalTo(_.key)
  .apply { (leftIter, rightIter) =>
    // Iterators are single-pass: materialize them before reading them more than once
    val leftList = leftIter.toList
    val rightList = rightIter.toList
    // A key only reaches this function if it occurs in at least one input,
    // so when the left group is empty the right one is not, and `.head` is safe
    val key = leftList.headOption.map(_.key).getOrElse(rightList.head.key)
    val leftSum = leftList.map(_.value).sum
    val rightCount = rightList.size
    s"Key: $key, LeftSum: $leftSum, RightCount: $rightCount"
  }
// Results (order not guaranteed):
//   "Key: A, LeftSum: 3, RightCount: 1"
//   "Key: B, LeftSum: 3, RightCount: 0"
//   "Key: C, LeftSum: 0, RightCount: 1"

Combine DataSets of the same type without key matching.
class DataSet[T] {

  /**
   * Merges this DataSet with `other`, which must have the same element type `T`.
   * No keys are involved.
   *
   * @param other the DataSet whose elements are added to this one's
   * @return one DataSet holding the elements of both inputs
   */
  def union(other: DataSet[T]): DataSet[T]
}
class ExecutionEnvironment {

  /**
   * Creates the union of several DataSets of the same element type in one call.
   * NOTE(review): presumably requires a non-empty `sets`. Confirm how an empty Seq behaves.
   *
   * @param sets DataSets to union
   * @tparam T common element type
   * @return one DataSet holding the elements of all inputs
   */
  def union[T](sets: Seq[DataSet[T]]): DataSet[T]
}

Usage Examples:
val numbers1 = env.fromElements(1, 2, 3)
val numbers2 = env.fromElements(4, 5, 6)
val numbers3 = env.fromElements(7, 8, 9)

// Union two DataSets: contains 1 through 6
val combined = numbers1.union(numbers2)

// Union multiple DataSets in one call: contains 1 through 9
val allNumbers = env.union(Seq(numbers1, numbers2, numbers3))

/**
 * User function for a join: called once per joined pair and returns exactly one result.
 * In outer joins, the side without a match is passed as `null`.
 * NOTE(review): `Function` is presumably Flink's
 * `org.apache.flink.api.common.functions.Function`, not `scala.Function`. Confirm the import.
 *
 * @tparam IN1 element type of the first (left) input
 * @tparam IN2 element type of the second (right) input
 * @tparam OUT result element type
 */
abstract class JoinFunction[IN1, IN2, OUT] extends Function {
  def join(first: IN1, second: IN2): OUT
}
/**
 * User function for a join that may emit any number of results per joined pair.
 *
 * @tparam IN1 element type of the first (left) input
 * @tparam IN2 element type of the second (right) input
 * @tparam OUT result element type
 */
abstract class FlatJoinFunction[IN1, IN2, OUT] extends Function {

  /** Receives one joined pair and emits zero or more results into `out`. */
  def join(
      first: IN1,
      second: IN2,
      out: Collector[OUT]
  ): Unit
}
/**
 * User function for a cross: turns one pair of the Cartesian product into one result.
 *
 * @tparam IN1 element type of the first input
 * @tparam IN2 element type of the second input
 * @tparam OUT result element type
 */
abstract class CrossFunction[IN1, IN2, OUT] extends Function {

  /** Combines one element of each input into a result. */
  def cross(
      val1: IN1,
      val2: IN2
  ): OUT
}
/**
 * User function for a coGroup, called with the two groups of one key. Unlike the Scala
 * lambda variants of `CoGroupDataSet.apply`, it receives `java.lang.Iterable`s.
 *
 * @tparam IN1 element type of the first (left) input
 * @tparam IN2 element type of the second (right) input
 * @tparam OUT result element type
 */
abstract class CoGroupFunction[IN1, IN2, OUT] extends Function {

  /** Processes the left and right group of one key, emitting results into `out`. */
  def coGroup(
      first: java.lang.Iterable[IN1],
      second: java.lang.Iterable[IN2],
      out: Collector[OUT]
  ): Unit
}
/** Hint telling the optimizer which physical strategy to use for a join. */
sealed trait JoinHint

object JoinHint {

  /** No preference: the optimizer decides. */
  case object OPTIMIZER_CHOOSES extends JoinHint

  /** Broadcast the first input and build the hash table from it. */
  case object BROADCAST_HASH_FIRST extends JoinHint

  /** Broadcast the second input and build the hash table from it. */
  case object BROADCAST_HASH_SECOND extends JoinHint

  /** Repartition both inputs and build the hash table from the first. */
  case object REPARTITION_HASH_FIRST extends JoinHint

  /** Repartition both inputs and build the hash table from the second. */
  case object REPARTITION_HASH_SECOND extends JoinHint

  /** Repartition both inputs and join them with sort-merge. */
  case object REPARTITION_SORT_MERGE extends JoinHint
}
/** Which unmatched elements, if any, a join keeps. */
sealed trait JoinType

object JoinType {

  /** Only pairs with matching keys. */
  case object INNER extends JoinType

  /** Every element of the left input, matched or not. */
  case object LEFT_OUTER extends JoinType

  /** Every element of the right input, matched or not. */
  case object RIGHT_OUTER extends JoinType

  /** Every element of both inputs, matched or not. */
  case object FULL_OUTER extends JoinType
}
/** Builder returned by `DataSet.join`; the join keys are specified on it with `where`/`equalTo`. */
class UnfinishedJoinOperation[L, R]
/**
 * Builder returned by `leftOuterJoin`, `rightOuterJoin`, and `fullOuterJoin`; the keys are
 * specified with `where(...).equalTo(...)`, as in the left-outer-join usage example.
 */
class UnfinishedOuterJoinOperation[L, R]
/** Builder returned by `DataSet.coGroup`; the coGroup keys are specified on it with `where`/`equalTo`. */
class UnfinishedCoGroupOperation[L, R]
/** Last stage of a join, where the function applied to each joined pair is supplied. */
trait JoinFunctionAssigner[L, R]
/** Result of a cross: a DataSet of `(L, R)` pairs that can also be mapped with a cross function. */
class CrossDataSet[L, R] extends DataSet[(L, R)]
/** A coGroup whose keys are fixed, waiting for the function that processes each pair of groups. */
class CoGroupDataSet[L, R]