Apache Flink Scala API module that provides elegant and fluent Scala language bindings for Flink's distributed stream and batch processing framework
Operations for combining multiple DataSets through joins, crosses, and coGroup operations. These operations enable complex data relationships and multi-dataset analysis.
Combine two DataSets based on matching keys with various join strategies.
class DataSet[T] {
  /**
   * Starts an inner join with another DataSet.
   *
   * @tparam O Element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedJoinOperation for key specification
   */
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  /**
   * Starts an inner join with optimizer hint.
   *
   * @tparam O Element type of the other DataSet
   * @param other DataSet to join with
   * @param strategy Join strategy hint (OPTIMIZER_CHOOSES, BROADCAST_HASH_FIRST, etc.)
   * @return UnfinishedJoinOperation for key specification
   */
  def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedJoinOperation[T, O]

  /**
   * Joins where other DataSet is small (broadcast join).
   *
   * @tparam O Element type of the other DataSet
   * @param other Small DataSet to broadcast
   * @return UnfinishedJoinOperation for key specification
   */
  def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  /**
   * Joins where other DataSet is large (this DataSet is broadcast).
   *
   * @tparam O Element type of the other DataSet
   * @param other Large DataSet
   * @return UnfinishedJoinOperation for key specification
   */
  def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}

Perform left, right, and full outer joins to include non-matching elements.
// Outer joins keep elements that have no match on the other side. The usage example in this
// document handles a null right-side element for left elements without a match.
class DataSet[T] {
  /**
   * Starts a full outer join with another DataSet.
   *
   * @tparam O Element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a full outer join with optimizer hint.
   *
   * @tparam O Element type of the other DataSet
   * @param other DataSet to join with
   * @param strategy Join strategy hint
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def fullOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a left outer join (keeps all elements from this DataSet).
   *
   * @tparam O Element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a left outer join with optimizer hint.
   *
   * @tparam O Element type of the other DataSet
   * @param other DataSet to join with
   * @param strategy Join strategy hint
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a right outer join (keeps all elements from other DataSet).
   *
   * @tparam O Element type of the other DataSet
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a right outer join with optimizer hint.
   *
   * @tparam O Element type of the other DataSet
   * @param other DataSet to join with
   * @param strategy Join strategy hint
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
}

Specify join keys and apply join functions to create final results.
/**
 * First step of the fluent join builder: selects the key of the left DataSet.
 *
 * @tparam L Element type of the left (first) DataSet
 * @tparam R Element type of the right (second) DataSet
 */
class UnfinishedJoinOperation[L, R] {
  /**
   * Specifies join key using field positions.
   *
   * @param fields Field positions for join key
   * @return Keys, on which `equalTo` specifies the other side's key
   */
  def where(fields: Int*): Keys[L, R]

  /**
   * Specifies join key using field names.
   *
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return Keys, on which `equalTo` specifies the other side's key
   */
  def where(firstField: String, otherFields: String*): Keys[L, R]

  /**
   * Specifies join key using key selector function.
   *
   * @tparam K Key type extracted from a left element
   * @param fun Key selector function
   * @return Keys, on which `equalTo` specifies the other side's key
   */
  def where[K: TypeInformation](fun: L => K): Keys[L, R]
}

/**
 * Second step of the fluent join builder: selects the key of the right DataSet.
 *
 * Both input types are declared here because `equalTo` takes a selector over `R` and
 * returns a `JoinFunctionAssigner[L, R]`; a single type parameter cannot express that.
 *
 * @tparam L Element type of the left (first) DataSet
 * @tparam R Element type of the right (second) DataSet
 */
class Keys[L, R] {
  /**
   * Specifies other side's join key using field positions.
   *
   * @param fields Field positions for other side's key
   * @return JoinFunctionAssigner for applying join function
   */
  def equalTo(fields: Int*): JoinFunctionAssigner[L, R]

  /**
   * Specifies other side's join key using field names.
   *
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return JoinFunctionAssigner for applying join function
   */
  def equalTo(firstField: String, otherFields: String*): JoinFunctionAssigner[L, R]

  /**
   * Specifies other side's join key using key selector function.
   *
   * @tparam K Key type extracted from a right element
   * @param fun Key selector function
   * @return JoinFunctionAssigner for applying join function
   */
  def equalTo[K: TypeInformation](fun: R => K): JoinFunctionAssigner[L, R]
}

Apply functions to joined elements to produce final results.
/**
 * Final step of the fluent join builder: turns matched pairs into a result DataSet.
 *
 * @tparam L Element type of the left (first) DataSet
 * @tparam R Element type of the right (second) DataSet
 */
trait JoinFunctionAssigner[L, R] {
  /**
   * Applies function to each pair of joined elements.
   *
   * @tparam O Element type of the resulting DataSet
   * @param fun Function combining left and right elements
   * @return DataSet with join results
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

  /**
   * Applies function with collector for multiple outputs per join.
   *
   * @tparam O Element type of the resulting DataSet
   * @param fun Function with collector for emitting multiple results
   * @return DataSet with collected join results
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]

  /**
   * Applies JoinFunction to joined elements.
   *
   * @tparam O Element type of the resulting DataSet
   * @param joiner JoinFunction implementation
   * @return DataSet with join results
   */
  def apply[O: TypeInformation: ClassTag](joiner: JoinFunction[L, R, O]): DataSet[O]

  /**
   * Applies FlatJoinFunction for multiple outputs per join.
   *
   * @tparam O Element type of the resulting DataSet
   * @param joiner FlatJoinFunction implementation
   * @return DataSet with flattened join results
   */
  def apply[O: TypeInformation: ClassTag](joiner: FlatJoinFunction[L, R, O]): DataSet[O]

  /**
   * Uses custom partitioner for join distribution.
   *
   * @tparam K Key type handled by the partitioner
   * @param partitioner Custom partitioner
   * @return JoinFunctionAssigner with custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinFunctionAssigner[L, R]
}

Usage Examples:
import org.apache.flink.api.scala._
// Domain model for the join examples.
case class Customer(id: Int, name: String, city: String)
case class Order(id: Int, customerId: Int, product: String, amount: Double)
case class CustomerOrder(customerName: String, product: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val customers = env.fromElements(
  Customer(1, "Alice", "NYC"),
  Customer(2, "Bob", "LA"),
  Customer(3, "Charlie", "Chicago")
)

val orders = env.fromElements(
  Order(101, 1, "Laptop", 999.99),
  Order(102, 2, "Phone", 599.99),
  Order(103, 1, "Mouse", 29.99)
)

// Inner join customers with orders
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply((customer, order) => CustomerOrder(customer.name, order.product, order.amount))

// Left outer join to include customers without orders
val allCustomerOrders = customers
  .leftOuterJoin(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply { (customer, order) =>
    // `order` is null for a customer without a matching order; Option(...) maps null to None
    // so the fallback is expressed without an explicit null check.
    Option(order).fold(CustomerOrder(customer.name, "No orders", 0.0)) { matched =>
      CustomerOrder(customer.name, matched.product, matched.amount)
    }
  }

Create Cartesian product of two DataSets for all-pairs operations.
class DataSet[T] {
  /**
   * Creates Cartesian product with another DataSet.
   *
   * @tparam O Element type of the other DataSet
   * @param other DataSet to cross with
   * @return CrossDataSet for applying cross function
   */
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]

  /**
   * Cross where other DataSet is small (broadcast).
   *
   * @tparam O Element type of the other DataSet
   * @param other Small DataSet to broadcast
   * @return CrossDataSet for applying cross function
   */
  def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]

  /**
   * Cross where other DataSet is large (this DataSet is broadcast).
   *
   * @tparam O Element type of the other DataSet
   * @param other Large DataSet
   * @return CrossDataSet for applying cross function
   */
  def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}

Apply functions to all pairs of elements from crossed DataSets.
/**
 * Result of a cross operation, on which a combining function can be applied.
 *
 * Declared as a `DataSet[(L, R)]` to agree with the type listing later in this document.
 *
 * @tparam L Element type of the left (first) DataSet
 * @tparam R Element type of the right (second) DataSet
 */
class CrossDataSet[L, R] extends DataSet[(L, R)] {
  /**
   * Applies function to each pair of crossed elements.
   *
   * @tparam O Element type of the resulting DataSet
   * @param fun Function combining left and right elements
   * @return DataSet with cross results
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

  /**
   * Applies CrossFunction to crossed elements.
   *
   * @tparam O Element type of the resulting DataSet
   * @param crosser CrossFunction implementation
   * @return DataSet with cross results
   */
  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O]
}

Usage Examples:
val colors = env.fromElements("Red", "Green", "Blue")
val sizes = env.fromElements("Small", "Medium", "Large")

// Create all color-size combinations
val combinations = colors
  .cross(sizes)
  .apply((color, size) => s"$color $size")
// Results: ["Red Small", "Red Medium", "Red Large", "Green Small", ...]

Group elements from two DataSets by key and process groups together.
class DataSet[T] {
  /**
   * Starts a coGroup operation with another DataSet.
   *
   * @tparam O Element type of the other DataSet
   * @param other DataSet to coGroup with
   * @return UnfinishedCoGroupOperation for key specification
   */
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}

/**
 * First step of the fluent coGroup builder: selects the key of the left DataSet.
 *
 * NOTE(review): the usage example in this document calls `equalTo` on the value returned by
 * `where` and then applies a function over two iterators, so for coGroup the chain is
 * expected to end in a `CoGroupDataSet[L, R]` rather than a join result - confirm against
 * the Flink scaladoc.
 *
 * @tparam L Element type of the left (first) DataSet
 * @tparam R Element type of the right (second) DataSet
 */
class UnfinishedCoGroupOperation[L, R] {
  /**
   * Specifies coGroup key using field positions.
   *
   * @param fields Field positions for coGroup key
   * @return Keys, on which `equalTo` specifies the other side's key
   */
  def where(fields: Int*): Keys[L]

  /**
   * Specifies coGroup key using key selector function.
   *
   * @tparam K Key type extracted from a left element
   * @param fun Key selector function
   * @return Keys, on which `equalTo` specifies the other side's key
   */
  def where[K: TypeInformation](fun: L => K): Keys[L]
}
/**
 * Configured coGroup operation, ready for a function over the two groups of each key.
 *
 * @tparam L Element type of the left (first) DataSet
 * @tparam R Element type of the right (second) DataSet
 */
class CoGroupDataSet[L, R] {
  /**
   * Applies function to each pair of grouped iterators.
   *
   * @tparam O Element type of the resulting DataSet
   * @param fun Function processing left and right group iterators
   * @return DataSet with coGroup results
   */
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R]) => O): DataSet[O]

  /**
   * Applies function with collector to grouped iterators.
   *
   * @tparam O Element type of the resulting DataSet
   * @param fun Function with collector for multiple outputs
   * @return DataSet with collected coGroup results
   */
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O]

  /**
   * Applies CoGroupFunction to grouped iterators.
   *
   * @tparam O Element type of the resulting DataSet
   * @param coGrouper CoGroupFunction implementation
   * @return DataSet with coGroup results
   */
  def apply[O: TypeInformation: ClassTag](coGrouper: CoGroupFunction[L, R, O]): DataSet[O]

  /**
   * Uses custom partitioner for coGroup distribution.
   *
   * @tparam K Key type handled by the partitioner
   * @param partitioner Custom partitioner
   * @return CoGroupDataSet with custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): CoGroupDataSet[L, R]
}

Sort elements within each group before coGroup processing.
// NOTE(review): `Order` below is the sort-order type, not the `Order` case class defined in the
// join usage example of this document; presumably Flink's
// org.apache.flink.api.common.operators.Order - confirm.
class CoGroupDataSet[L, R] {
  /**
   * Sorts first DataSet's groups by field position.
   *
   * @param field Field position for sorting
   * @param order Sort order
   * @return CoGroupDataSet with sorted first groups
   */
  def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R]

  /**
   * Sorts first DataSet's groups by field name.
   *
   * @param field Field name for sorting
   * @param order Sort order
   * @return CoGroupDataSet with sorted first groups
   */
  def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R]

  /**
   * Sorts second DataSet's groups by field position.
   *
   * @param field Field position for sorting
   * @param order Sort order
   * @return CoGroupDataSet with sorted second groups
   */
  def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R]

  /**
   * Sorts second DataSet's groups by field name.
   *
   * @param field Field name for sorting
   * @param order Sort order
   * @return CoGroupDataSet with sorted second groups
   */
  def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R]
}

Usage Examples:
// Domain model for the coGroup example.
case class LeftData(key: String, value: Int)
case class RightData(key: String, description: String)

val leftData = env.fromElements(
  LeftData("A", 1),
  LeftData("A", 2),
  LeftData("B", 3)
)

val rightData = env.fromElements(
  RightData("A", "Group A data"),
  RightData("C", "Group C data")
)

// CoGroup by key to process groups together
val coGroupResult = leftData
  .coGroup(rightData)
  .where(_.key)
  .equalTo(_.key)
  .apply { (leftIter, rightIter) =>
    // Iterators can be traversed once, so materialize both groups before reading them twice.
    val leftList = leftIter.toList
    val rightList = rightIter.toList
    // Take the key from the left group when it has elements, otherwise from the right group
    // (getOrElse is by-name, so the right head is only read when the left group is empty).
    val key = leftList.headOption.map(_.key).getOrElse(rightList.head.key)
    val leftSum = leftList.map(_.value).sum
    val rightCount = rightList.length
    s"Key: $key, LeftSum: $leftSum, RightCount: $rightCount"
  }

Combine DataSets of the same type without key matching.
// Union needs no key specification: both sides must share the element type T.
class DataSet[T] {
/**
* Creates union with another DataSet of the same type
* @param other DataSet to union with; its element type must also be T
* @return DataSet containing elements from both DataSets
*/
def union(other: DataSet[T]): DataSet[T]
}
class ExecutionEnvironment {
  /**
   * Creates union of multiple DataSets.
   *
   * @tparam T Element type shared by all DataSets
   * @param sets Sequence of DataSets to union
   * @return Unified DataSet
   */
  def union[T](sets: Seq[DataSet[T]]): DataSet[T]
}

Usage Examples:
val numbers1 = env.fromElements(1, 2, 3)
val numbers2 = env.fromElements(4, 5, 6)
val numbers3 = env.fromElements(7, 8, 9)

// Union two DataSets
val combined = numbers1.union(numbers2)

// Union multiple DataSets
val allNumbers = env.union(Seq(numbers1, numbers2, numbers3))

/**
 * Function applied to one pair of joined elements, producing one result.
 *
 * @tparam IN1 Element type of the first input
 * @tparam IN2 Element type of the second input
 * @tparam OUT Result type
 */
abstract class JoinFunction[IN1, IN2, OUT] extends Function {
  def join(first: IN1, second: IN2): OUT
}
// NOTE(review): in Flink these function types are presumably the Java interfaces from
// org.apache.flink.api.common.functions, shown here in Scala-like form - confirm.

/** Join function that emits its results for one pair of joined elements through `out`. */
abstract class FlatJoinFunction[IN1, IN2, OUT] extends Function {
def join(first: IN1, second: IN2, out: Collector[OUT]): Unit
}
/** Function applied to one pair of crossed elements, producing one result. */
abstract class CrossFunction[IN1, IN2, OUT] extends Function {
def cross(val1: IN1, val2: IN2): OUT
}
/** Function that receives both groups as Java Iterables and emits results through `out`. */
abstract class CoGroupFunction[IN1, IN2, OUT] extends Function {
def coGroup(first: java.lang.Iterable[IN1], second: java.lang.Iterable[IN2], out: Collector[OUT]): Unit
}
/**
* Strategy hint accepted by the `join` and outer-join overloads that take a `strategy` parameter.
* NOTE(review): the per-value meanings below are read off the names - confirm against the Flink docs.
*/
sealed trait JoinHint
object JoinHint {
// Leaves the choice of strategy to the optimizer.
case object OPTIMIZER_CHOOSES extends JoinHint
// Presumably broadcasts the first input and builds the hash table from it.
case object BROADCAST_HASH_FIRST extends JoinHint
// Presumably broadcasts the second input and builds the hash table from it.
case object BROADCAST_HASH_SECOND extends JoinHint
// Presumably repartitions both inputs and builds the hash table from the first.
case object REPARTITION_HASH_FIRST extends JoinHint
// Presumably repartitions both inputs and builds the hash table from the second.
case object REPARTITION_HASH_SECOND extends JoinHint
// Presumably repartitions both inputs and joins them with a sort-merge strategy.
case object REPARTITION_SORT_MERGE extends JoinHint
}
/** Kind of join: inner, or one of the three outer variants. */
sealed trait JoinType
object JoinType {
case object INNER extends JoinType
case object LEFT_OUTER extends JoinType
case object RIGHT_OUTER extends JoinType
case object FULL_OUTER extends JoinType
}
// Summary of the builder and result types used by the operations in this document.
class UnfinishedJoinOperation[L, R] {
  // Fluent interface for join key specification
}

class UnfinishedOuterJoinOperation[L, R] {
  // Fluent interface for outer join key specification
}

class UnfinishedCoGroupOperation[L, R] {
  // Fluent interface for coGroup key specification
}

trait JoinFunctionAssigner[L, R] {
  // Interface for applying join functions
}

class CrossDataSet[L, R] extends DataSet[(L, R)] {
  // Represents result of cross operation
}

class CoGroupDataSet[L, R] {
  // Represents configured coGroup operation ready for function application
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-2-11