Flink provides several operations for combining multiple data streams based on keys, time windows, or custom conditions. These include joins, co-groups, unions, and connected streams.
Join two streams based on key equality within a time window.
/**
 * Stream join operations.
 *
 * First step of the join builder for a `T1` stream and a `T2` stream
 * (the usage example obtains it with `orders.join(payments)`).
 * The chain continues `where` -> `equalTo` -> `window` -> `apply`.
 *
 * @tparam T1 element type of the first input
 * @tparam T2 element type of the second input
 */
class JoinedStreams[T1, T2] {
/** Selects the join key of the first input and returns the next builder step. */
def where[KEY](keySelector: T1 => KEY): Where[T1, T2, KEY]
}
/**
 * Join builder step after the key of the first input has been selected.
 *
 * @tparam T1  element type of the first input
 * @tparam T2  element type of the second input
 * @tparam KEY key type produced by the first input's key selector
 */
class Where[T1, T2, KEY] {
  /**
   * Selects the join key of the second input.
   *
   * The selector must return the same `KEY` type as the first input's
   * selector: elements are matched on key equality, so an independent key
   * type for the second input would make the comparison meaningless.
   *
   * @param keySelector extracts the key from an element of the second input
   * @return the builder step on which the window is chosen
   */
  def equalTo(keySelector: T2 => KEY): EqualTo[T1, T2, KEY]
}
/**
 * Join builder step returned by `Where.equalTo`, i.e. once both key
 * selectors are set. The only operation listed here is choosing the window.
 */
class EqualTo[T1, T2, KEY] {
// The assigner is typed over TaggedUnion[T1, T2]: one assigner handles elements of both inputs.
def window[W <: Window](assigner: WindowAssigner[TaggedUnion[T1, T2], W]): WithWindow[T1, T2, KEY, W]
}
class WithWindow[T1, T2, KEY, W <: Window] {
def apply[R](function: JoinFunction[T1, T2, R]): DataStream[R]
def apply[R](function: (T1, T2) => R): DataStream[R]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
// Domain model shared by the join, co-group and interval-join examples.
// `timestamp` is the value passed to assignAscendingTimestamps below; the
// interval-join example reports timestamp differences in milliseconds.
case class Order(orderId: String, customerId: String, amount: Double, timestamp: Long)
// Joined to Order through the shared `orderId` field.
case class Payment(orderId: String, paymentId: String, amount: Double, timestamp: Long)
// Result type of the window-join examples: one order paired with one payment.
case class OrderPayment(order: Order, payment: Payment)
val orders = env.fromElements(/* order data */)
.assignAscendingTimestamps(_.timestamp)
val payments = env.fromElements(/* payment data */)
.assignAscendingTimestamps(_.timestamp)
// Join orders and payments by orderId within 10-minute windows
val joinedStream = orders
.join(payments)
.where(_.orderId)
.equalTo(_.orderId)
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.apply((order: Order, payment: Payment) => OrderPayment(order, payment))
// Using JoinFunction interface
val joinedWithFunction = orders
.join(payments)
.where(_.orderId)
.equalTo(_.orderId)
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.apply(new JoinFunction[Order, Payment, OrderPayment] {
// Pairs the order with the payment it was joined to; no further transformation.
override def join(first: Order, second: Payment): OrderPayment =
  OrderPayment(order = first, payment = second)
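})

Group elements from two streams by key within a window. Unlike a join, the co-group function is also called for keys that have elements in only one of the two streams, which makes outer-join-like logic possible.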
/**
 * Stream co-group operations.
 *
 * First step of the co-group builder for a `T1` stream and a `T2` stream
 * (the usage example obtains it with `orders.coGroup(payments)`).
 * The chain continues `where` -> `equalTo` -> `window` -> `apply`.
 *
 * @tparam T1 element type of the first input
 * @tparam T2 element type of the second input
 */
class CoGroupedStreams[T1, T2] {
/** Selects the grouping key of the first input and returns the next builder step. */
def where[KEY](keySelector: T1 => KEY): Where[T1, T2, KEY]
}
/**
 * Co-group builder step after the key of the first input has been selected.
 *
 * @tparam T1  element type of the first input
 * @tparam T2  element type of the second input
 * @tparam KEY key type produced by the first input's key selector
 */
class Where[T1, T2, KEY] {
  /**
   * Selects the grouping key of the second input.
   *
   * The selector must return the same `KEY` type as the first input's
   * selector, because elements of both inputs are grouped under one key.
   *
   * @param keySelector extracts the key from an element of the second input
   * @return the builder step on which the window is chosen
   */
  def equalTo(keySelector: T2 => KEY): EqualTo[T1, T2, KEY]
}
/**
 * Co-group builder step returned by `Where.equalTo`, i.e. once both key
 * selectors are set. The only operation listed here is choosing the window.
 */
class EqualTo[T1, T2, KEY] {
// The assigner is typed over TaggedUnion[T1, T2]: one assigner handles elements of both inputs.
def window[W <: Window](assigner: WindowAssigner[TaggedUnion[T1, T2], W]): WithWindow[T1, T2, KEY, W]
}
class WithWindow[T1, T2, KEY, W <: Window] {
def apply[R](function: CoGroupFunction[T1, T2, R]): DataStream[R]
}

Usage Examples:
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector
// CoGroup orders and payments - like outer join
val coGroupedStream = orders
.coGroup(payments)
.where(_.orderId)
.equalTo(_.orderId)
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.apply(new CoGroupFunction[Order, Payment, String] {
/**
 * Emits one summary line for a key/window group: matched, orders only, or
 * payments only. Nothing is emitted when both sides are empty.
 *
 * `CoGroupFunction` is a Java interface, so both groups arrive as
 * `java.lang.Iterable`; declaring them as Scala `Iterable` would not
 * override the interface method.
 *
 * @param orders   orders of the current key and window (may be empty)
 * @param payments payments of the current key and window (may be empty)
 * @param out      collector receiving the summary line
 */
override def coGroup(
  orders: java.lang.Iterable[Order],
  payments: java.lang.Iterable[Payment],
  out: Collector[String]
): Unit = {
  // Local import keeps the snippet self-contained; JavaConverters is
  // available on Scala 2.11 through 2.13.
  import scala.collection.JavaConverters._

  // Each input is traversed exactly once.
  val orderCount = orders.asScala.size
  val paymentCount = payments.asScala.size

  if (orderCount > 0 && paymentCount > 0) {
    out.collect(s"Matched: ${orderCount} orders, ${paymentCount} payments")
  } else if (orderCount > 0) {
    out.collect(s"Unmatched orders: ${orderCount}")
  } else if (paymentCount > 0) {
    out.collect(s"Unmatched payments: ${paymentCount}")
  }
}
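})

Join two keyed streams based on time intervals: each element of the first stream is paired with the elements of the second stream that have the same key and a timestamp inside an interval relative to its own timestamp.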
/**
* Interval join operations
*/
class IntervalJoin[T1, T2, K] {
def between(lowerBound: Time, upperBound: Time): IntervalJoin[T1, T2, K]
def upperBoundExclusive(): IntervalJoin[T1, T2, K]
def lowerBoundExclusive(): IntervalJoin[T1, T2, K]
def process[R](function: ProcessJoinFunction[T1, T2, R]): DataStream[R]
}

Usage Examples:
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
val keyedOrders = orders.keyBy(_.orderId)
val keyedPayments = payments.keyBy(_.orderId)
// Join orders with payments that arrive within 5 minutes after the order
val intervalJoinedStream = keyedOrders
.intervalJoin(keyedPayments)
.between(Time.minutes(0), Time.minutes(5))
.process(new ProcessJoinFunction[Order, Payment, String] {
/**
 * Called for every (order, payment) pair matched by the interval join and
 * reports how long after the order the payment was made.
 *
 * `Context` is an inner class of the Java `ProcessJoinFunction`, so from
 * Scala it must be written as a type projection (`...#Context`);
 * `ProcessJoinFunction.Context` does not compile.
 *
 * @param left  the order
 * @param right the payment joined to it
 * @param ctx   join context (unused here)
 * @param out   collector receiving the report line
 */
override def processElement(
  left: Order,
  right: Payment,
  ctx: ProcessJoinFunction[Order, Payment, String]#Context,
  out: Collector[String]
): Unit = {
  val timeDiff = right.timestamp - left.timestamp
  out.collect(s"Order ${left.orderId} paid after ${timeDiff}ms")
}
})

Connect two streams, possibly of different types, so that a single operator can process elements of both.
/**
* Connected streams operations
*/
class ConnectedStreams[T1, T2] {
def map[R](mapper1: T1 => R, mapper2: T2 => R): DataStream[R]
def map[R](function: CoMapFunction[T1, T2, R]): DataStream[R]
def flatMap[R](flatMapper1: T1 => TraversableOnce[R], flatMapper2: T2 => TraversableOnce[R]): DataStream[R]
def flatMap[R](function: CoFlatMapFunction[T1, T2, R]): DataStream[R]
def process[R](function: CoProcessFunction[T1, T2, R]): DataStream[R]
def keyBy[K1, K2](keySelector1: T1 => K1, keySelector2: T2 => K2): ConnectedStreams[T1, T2]
}

Usage Examples:
import org.apache.flink.streaming.api.functions.co.{CoMapFunction, CoProcessFunction}
val controlStream = env.fromElements("START", "STOP", "RESET")
val dataStream = env.fromElements(1, 2, 3, 4, 5)
val connectedStream = controlStream.connect(dataStream)
// Simple co-map
val mappedStream = connectedStream.map(
control => s"Control: $control",
data => s"Data: $data"
)
// Using CoMapFunction
val coMappedStream = connectedStream.map(new CoMapFunction[String, Int, String] {
override def map1(control: String): String = s"Control: $control"
override def map2(data: Int): String = s"Data: $data"
})
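// Using CoProcessFunction to gate the data stream on control messages.
// Note: isRunning is a plain field, so it is local to each parallel instance
// and is not checkpointed; use Flink managed state for fault-tolerant state.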
val processedStream = connectedStream.process(new CoProcessFunction[String, Int, String] {
private var isRunning = false
/**
 * Handles a control message: "START" enables data processing, "STOP" and
 * "RESET" disable it. Every message is acknowledged on the output.
 *
 * `Context` is an inner class of the Java `CoProcessFunction`, so it is
 * referenced with a type projection (`...#Context`).
 *
 * @param control the control command
 * @param ctx     processing context (unused here)
 * @param out     collector receiving the acknowledgement line
 */
override def processElement1(
  control: String,
  ctx: CoProcessFunction[String, Int, String]#Context,
  out: Collector[String]
): Unit = {
  control match {
    case "START" => isRunning = true
    case "STOP" | "RESET" => isRunning = false
    // Unknown command: keep the current state instead of failing the job
    // with a MatchError.
    case _ =>
  }
  out.collect(s"Control received: $control, running: $isRunning")
}
/**
 * Handles a data element: it is forwarded only while `isRunning` is set,
 * otherwise it is dropped.
 *
 * `Context` is an inner class of the Java `CoProcessFunction`, so it is
 * referenced with a type projection (`...#Context`).
 *
 * @param data the data element
 * @param ctx  processing context (unused here)
 * @param out  collector receiving the output line
 */
override def processElement2(
  data: Int,
  ctx: CoProcessFunction[String, Int, String]#Context,
  out: Collector[String]
): Unit = {
  if (isRunning) {
    out.collect(s"Processing data: $data")
  }
}
})

Connect a keyed or non-keyed stream with a broadcast stream.
/**
 * Broadcast connected streams operations.
 *
 * Result of connecting a stream of `IN1` elements with a broadcast stream
 * of `IN2` elements.
 *
 * @tparam IN1 element type of the non-broadcast input
 * @tparam IN2 element type of the broadcast input
 */
class BroadcastConnectedStream[IN1, IN2] {
  /** Applies a function to a non-keyed input connected with the broadcast input. */
  def process[OUT](function: BroadcastProcessFunction[IN1, IN2, OUT]): DataStream[OUT]

  /**
   * Applies a function to a keyed input connected with the broadcast input.
   *
   * @tparam KS key type of the keyed input; declared on the method because
   *            the class itself has no key type parameter
   */
  def process[KS, OUT](function: KeyedBroadcastProcessFunction[KS, IN1, IN2, OUT]): DataStream[OUT]
}

Usage Examples:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
// Define broadcast state descriptor
val configDescriptor = new MapStateDescriptor[String, String](
"config",
classOf[String],
classOf[String]
)
val configStream = env.fromElements(("threshold", "100"), ("multiplier", "2"))
val dataStream = env.fromElements(50, 75, 120, 200)
// Broadcast the config stream
val broadcastStream = configStream.broadcast(configDescriptor)
// Connect data stream with broadcast stream
val connectedBroadcastStream = dataStream.connect(broadcastStream)
val processedStream = connectedBroadcastStream.process(
new BroadcastProcessFunction[Int, (String, String), String] {
/**
 * Handles an element of the data stream: values above the configured
 * threshold are scaled by the configured multiplier and emitted.
 *
 * `ReadOnlyContext` is an inner class of the Java
 * `BroadcastProcessFunction`, so it is referenced with a type projection
 * (`...#ReadOnlyContext`).
 *
 * @param value the data element
 * @param ctx   read-only access to the broadcast state
 * @param out   collector receiving the output line
 */
override def processElement(
  value: Int,
  ctx: BroadcastProcessFunction[Int, (String, String), String]#ReadOnlyContext,
  out: Collector[String]
): Unit = {
  val broadcastState = ctx.getBroadcastState(configDescriptor)

  // Config values arrive as free-form strings. A missing or non-numeric
  // entry falls back to the default rather than throwing
  // NumberFormatException from inside the operator.
  def intSetting(key: String, default: Int): Int =
    Option(broadcastState.get(key))
      .flatMap(raw => scala.util.Try(raw.toInt).toOption)
      .getOrElse(default)

  val threshold = intSetting("threshold", 0)
  val multiplier = intSetting("multiplier", 1)
  if (value > threshold) {
    out.collect(s"High value: ${value * multiplier}")
  }
}
/**
 * Handles a broadcast config entry: stores the (key, value) pair in the
 * broadcast state and emits a confirmation line.
 *
 * `Context` is an inner class of the Java `BroadcastProcessFunction`, so it
 * is referenced with a type projection (`...#Context`).
 *
 * @param value the (key, value) config entry
 * @param ctx   context giving access to the broadcast state
 * @param out   collector receiving the confirmation line
 */
override def processBroadcastElement(
  value: (String, String),
  ctx: BroadcastProcessFunction[Int, (String, String), String]#Context,
  out: Collector[String]
): Unit = {
  val (key, setting) = value
  ctx.getBroadcastState(configDescriptor).put(key, setting)
  out.collect(s"Config updated: ${key} = ${setting}")
}
}
)

Combine multiple streams of the same type into a single stream.
/**
* Union operations
*/
class DataStream[T] {
def union(otherStreams: DataStream[T]*): DataStream[T]
}

Usage Examples:
val stream1 = env.fromElements(1, 2, 3)
val stream2 = env.fromElements(4, 5, 6)
val stream3 = env.fromElements(7, 8, 9)
// Union multiple streams
val unionedStream = stream1.union(stream2, stream3)
// Can also chain unions
val chainedUnion = stream1.union(stream2).union(stream3)

// Join-related types
class JoinedStreams[T1, T2]
class CoGroupedStreams[T1, T2]
class IntervalJoin[T1, T2, K]
// Connected stream types
class ConnectedStreams[T1, T2]
class BroadcastConnectedStream[IN1, IN2]
// Function interfaces
/**
 * Function accepted by the window join's `apply`: combines one element of
 * each input into a single result.
 *
 * @tparam T1 element type of the first input
 * @tparam T2 element type of the second input
 * @tparam R  result type
 */
trait JoinFunction[T1, T2, R] {
def join(first: T1, second: T2): R
}
/**
 * Function accepted by the co-group's `apply`: receives the elements of
 * both inputs for one group and emits results through `out`.
 *
 * The groups are typed as `java.lang.Iterable` because `CoGroupFunction` is
 * a Java interface; an implementation that declares Scala `Iterable`
 * parameters does not override `coGroup`.
 *
 * @tparam T1 element type of the first input
 * @tparam T2 element type of the second input
 * @tparam R  result type
 */
trait CoGroupFunction[T1, T2, R] {
  def coGroup(first: java.lang.Iterable[T1], second: java.lang.Iterable[T2], out: Collector[R]): Unit
}
/**
 * Function accepted by the interval join's `process`: called with a pair of
 * joined elements and emits results through `out`.
 *
 * `Context` is an inner class of the function, so from Scala it is written
 * as the type projection `ProcessJoinFunction[T1, T2, R]#Context`, which is
 * also the form an overriding method must use.
 *
 * @tparam T1 element type of the left input
 * @tparam T2 element type of the right input
 * @tparam R  result type
 */
trait ProcessJoinFunction[T1, T2, R] {
  def processElement(left: T1, right: T2, ctx: ProcessJoinFunction[T1, T2, R]#Context, out: Collector[R]): Unit
}
/**
 * Function accepted by `ConnectedStreams.map`: one mapping per input, both
 * producing the common result type `R`.
 */
trait CoMapFunction[T1, T2, R] {
// Maps an element of the first connected stream.
def map1(value: T1): R
// Maps an element of the second connected stream.
def map2(value: T2): R
}
/**
 * Function accepted by `ConnectedStreams.flatMap`: one handler per input;
 * results of type `R` are emitted through `out` rather than returned.
 */
trait CoFlatMapFunction[T1, T2, R] {
// Handles an element of the first connected stream.
def flatMap1(value: T1, out: Collector[R]): Unit
// Handles an element of the second connected stream.
def flatMap2(value: T2, out: Collector[R]): Unit
}
/**
 * Function accepted by `ConnectedStreams.process`: one handler per input,
 * each receiving a context and emitting results through `out`.
 *
 * `Context` is an inner class of the function, so from Scala it is written
 * as the type projection `CoProcessFunction[T1, T2, R]#Context`, which is
 * also the form an overriding method must use.
 *
 * @tparam T1 element type of the first input
 * @tparam T2 element type of the second input
 * @tparam R  result type
 */
trait CoProcessFunction[T1, T2, R] {
  def processElement1(value: T1, ctx: CoProcessFunction[T1, T2, R]#Context, out: Collector[R]): Unit
  def processElement2(value: T2, ctx: CoProcessFunction[T1, T2, R]#Context, out: Collector[R]): Unit
}
/**
 * Function for a non-keyed stream connected with a broadcast stream.
 *
 * `processElement` handles the non-broadcast input and receives a
 * `ReadOnlyContext`; `processBroadcastElement` handles the broadcast input
 * and receives a `Context`. Both are inner classes of the function, so from
 * Scala they are written as type projections (`...#ReadOnlyContext`,
 * `...#Context`).
 *
 * @tparam IN1 element type of the non-broadcast input
 * @tparam IN2 element type of the broadcast input
 * @tparam OUT result type
 */
trait BroadcastProcessFunction[IN1, IN2, OUT] {
  def processElement(value: IN1, ctx: BroadcastProcessFunction[IN1, IN2, OUT]#ReadOnlyContext, out: Collector[OUT]): Unit
  def processBroadcastElement(value: IN2, ctx: BroadcastProcessFunction[IN1, IN2, OUT]#Context, out: Collector[OUT]): Unit
}
/**
 * Function for a keyed stream connected with a broadcast stream.
 *
 * `processElement` handles the keyed input and receives a
 * `ReadOnlyContext`; `processBroadcastElement` handles the broadcast input
 * and receives a `Context`. Both are inner classes of the function, so from
 * Scala they are written as type projections (`...#ReadOnlyContext`,
 * `...#Context`).
 *
 * @tparam K   key type of the keyed input
 * @tparam IN1 element type of the keyed input
 * @tparam IN2 element type of the broadcast input
 * @tparam OUT result type
 */
trait KeyedBroadcastProcessFunction[K, IN1, IN2, OUT] {
  def processElement(value: IN1, ctx: KeyedBroadcastProcessFunction[K, IN1, IN2, OUT]#ReadOnlyContext, out: Collector[OUT]): Unit
  def processBroadcastElement(value: IN2, ctx: KeyedBroadcastProcessFunction[K, IN1, IN2, OUT]#Context, out: Collector[OUT]): Unit
}
// Utility types
class TaggedUnion[T1, T2]
class Time