or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-operations.md, data-stream.md, functions.md, index.md, joining.md, keyed-stream.md, stream-execution-environment.md, windowing.md
tile.json

docs/joining.md

Joining

Flink provides several operations for combining multiple data streams based on keys, time windows, time intervals, or custom logic. These include window joins, window co-groups, interval joins, unions, and connected (including broadcast-connected) streams.

Capabilities

Stream Joins

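Join two streams on key equality within a window. This is an inner join: a pair is emitted only when elements from both streams share a key and fall into the same window; elements without a partner are dropped.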

/**
 * Stream (window) join operations.
 *
 * Built fluently as `first.join(second).where(...).equalTo(...).window(...).apply(...)`.
 * Both key selectors must produce the same key type `KEY`.
 */
class JoinedStreams[T1, T2] {
  /** Selects the join key of the first input. */
  def where[KEY](keySelector: T1 => KEY): Where[T1, T2, KEY]
}

class Where[T1, T2, KEY] {
  /** Selects the join key of the second input; it must have the same type as the first key. */
  def equalTo(keySelector: T2 => KEY): EqualTo[T1, T2, KEY]
}

class EqualTo[T1, T2, KEY] {
  /**
   * Sets the window in which matching pairs are searched.
   * The assigner may be declared for any supertype of the internal element type,
   * e.g. `TumblingEventTimeWindows` is a `WindowAssigner[Object, TimeWindow]`.
   */
  def window[W <: Window](
    assigner: WindowAssigner[_ >: TaggedUnion[T1, T2], W]
  ): WithWindow[T1, T2, KEY, W]
}

class WithWindow[T1, T2, KEY, W <: Window] {
  /** Emits one result for every pair of elements with equal keys in the same window. */
  def apply[R](function: JoinFunction[T1, T2, R]): DataStream[R]
  def apply[R](function: (T1, T2) => R): DataStream[R]
}

Usage Examples:

import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Order(orderId: String, customerId: String, amount: Double, timestamp: Long)
case class Payment(orderId: String, paymentId: String, amount: Double, timestamp: Long)
case class OrderPayment(order: Order, payment: Payment)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Event-time windows need timestamps; assignAscendingTimestamps assumes each
// source emits its elements in ascending timestamp order.
val orders = env.fromElements(/* order data */)
  .assignAscendingTimestamps(_.timestamp)

val payments = env.fromElements(/* payment data */)
  .assignAscendingTimestamps(_.timestamp)

// Join orders and payments by orderId within 10-minute tumbling windows.
// Inner-join semantics: an order and a payment are paired only if both land in the
// same window; unmatched elements produce no output.
val joinedStream = orders
  .join(payments)
  .where(_.orderId)
  .equalTo(_.orderId)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .apply((order: Order, payment: Payment) => OrderPayment(order, payment))

// Using the JoinFunction interface instead of a Scala closure
val joinedWithFunction = orders
  .join(payments)
  .where(_.orderId)
  .equalTo(_.orderId)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .apply(new JoinFunction[Order, Payment, OrderPayment] {
    override def join(first: Order, second: Payment): OrderPayment = {
      OrderPayment(first, second)
    }
  })

Stream CoGroups

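Group elements from two streams by key within a window. The function receives all elements of both streams for each key and window, even when one side is empty, so co-group can express outer joins.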

/**
 * Stream (window) co-group operations.
 *
 * Built fluently as `first.coGroup(second).where(...).equalTo(...).window(...).apply(...)`.
 * Both key selectors must produce the same key type `KEY`.
 */
class CoGroupedStreams[T1, T2] {
  /** Selects the grouping key of the first input. */
  def where[KEY](keySelector: T1 => KEY): Where[T1, T2, KEY]
}

class Where[T1, T2, KEY] {
  /** Selects the grouping key of the second input; it must have the same type as the first key. */
  def equalTo(keySelector: T2 => KEY): EqualTo[T1, T2, KEY]
}

class EqualTo[T1, T2, KEY] {
  /**
   * Sets the window over which elements are grouped.
   * The assigner may be declared for any supertype of the internal element type,
   * e.g. `TumblingEventTimeWindows` is a `WindowAssigner[Object, TimeWindow]`.
   */
  def window[W <: Window](
    assigner: WindowAssigner[_ >: TaggedUnion[T1, T2], W]
  ): WithWindow[T1, T2, KEY, W]
}

class WithWindow[T1, T2, KEY, W <: Window] {
  /** Called once per key and window with the (possibly empty) groups of both inputs. */
  def apply[R](function: CoGroupFunction[T1, T2, R]): DataStream[R]
  /** Scala closure variant: returns exactly one result per key and window. */
  def apply[R](function: (Iterator[T1], Iterator[T2]) => R): DataStream[R]
  /** Scala closure variant: may emit any number of results through the collector. */
  def apply[R](function: (Iterator[T1], Iterator[T2], Collector[R]) => Unit): DataStream[R]
}

Usage Examples:

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// CoGroup orders and payments - like an outer join: the function runs once per
// orderId and window, and either group may be empty.
val coGroupedStream = orders
  .coGroup(payments)
  .where(_.orderId)
  .equalTo(_.orderId)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .apply(new CoGroupFunction[Order, Payment, String] {
    // CoGroupFunction is a Java interface, so the groups arrive as
    // java.lang.Iterable (not scala.Iterable); asScala adapts them.
    override def coGroup(
      orderGroup: java.lang.Iterable[Order],
      paymentGroup: java.lang.Iterable[Payment],
      out: Collector[String]
    ): Unit = {
      val orderCount = orderGroup.asScala.size
      val paymentCount = paymentGroup.asScala.size

      if (orderCount > 0 && paymentCount > 0) {
        out.collect(s"Matched: ${orderCount} orders, ${paymentCount} payments")
      } else if (orderCount > 0) {
        out.collect(s"Unmatched orders: ${orderCount}")
      } else if (paymentCount > 0) {
        out.collect(s"Unmatched payments: ${paymentCount}")
      }
    }
  })

Interval Joins

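Join two keyed streams by time interval: an element b of the second stream is joined with an element a of the first stream when both have the same key and b.timestamp lies in [a.timestamp + lowerBound, a.timestamp + upperBound] (both bounds are inclusive by default). Interval joins require event-time timestamps.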

/**
 * Interval join operations.
 *
 * Created by `keyedFirst.intervalJoin(keyedSecond)`; both inputs must be keyed by the
 * same key type `K`. Calling `between` fixes the interval and yields an `IntervalJoined`,
 * on which the bounds can be made exclusive and the join function is supplied.
 */
class IntervalJoin[T1, T2, K] {
  /** Matches `b` with `a` when `b.timestamp` is in `[a.timestamp + lowerBound, a.timestamp + upperBound]`. */
  def between(lowerBound: Time, upperBound: Time): IntervalJoined[T1, T2, K]
}

class IntervalJoined[T1, T2, K] {
  /** Excludes the upper bound from the interval (bounds are inclusive by default). */
  def upperBoundExclusive(): IntervalJoined[T1, T2, K]
  /** Excludes the lower bound from the interval (bounds are inclusive by default). */
  def lowerBoundExclusive(): IntervalJoined[T1, T2, K]
  /** Calls the function once for every matching pair. */
  def process[R](function: ProcessJoinFunction[T1, T2, R]): DataStream[R]
}

Usage Examples:

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.util.Collector

val keyedOrders = orders.keyBy(_.orderId)
val keyedPayments = payments.keyBy(_.orderId)

// Join each order with payments whose timestamp is 0 to 5 minutes (inclusive)
// after the order's timestamp.
val intervalJoinedStream = keyedOrders
  .intervalJoin(keyedPayments)
  .between(Time.minutes(0), Time.minutes(5))
  .process(new ProcessJoinFunction[Order, Payment, String] {
    // Context is an inner class of ProcessJoinFunction, so Scala refers to it
    // through a type projection (`#Context`).
    override def processElement(
      left: Order,
      right: Payment,
      ctx: ProcessJoinFunction[Order, Payment, String]#Context,
      out: Collector[String]
    ): Unit = {
      val timeDiff = right.timestamp - left.timestamp
      out.collect(s"Order ${left.orderId} paid after ${timeDiff}ms")
    }
  })

Connected Streams

Connect two streams of different types for joint processing.

/**
 * Connected streams operations.
 *
 * Each operation takes one function per input (or a Co*Function with one method per
 * input); both sides must produce the same output type `R`.
 */
class ConnectedStreams[T1, T2] {
  def map[R](mapper1: T1 => R, mapper2: T2 => R): DataStream[R]
  def map[R](function: CoMapFunction[T1, T2, R]): DataStream[R]
  def flatMap[R](flatMapper1: T1 => TraversableOnce[R], flatMapper2: T2 => TraversableOnce[R]): DataStream[R]
  def flatMap[R](function: CoFlatMapFunction[T1, T2, R]): DataStream[R]
  def process[R](function: CoProcessFunction[T1, T2, R]): DataStream[R]
  /** Keys both inputs; the two selectors must produce the same key type `K`. */
  def keyBy[K](keySelector1: T1 => K, keySelector2: T2 => K): ConnectedStreams[T1, T2]
}

Usage Examples:

import org.apache.flink.streaming.api.functions.co.{CoMapFunction, CoProcessFunction}
import org.apache.flink.util.Collector

val controlStream = env.fromElements("START", "STOP", "RESET")
val dataStream = env.fromElements(1, 2, 3, 4, 5)

val connectedStream = controlStream.connect(dataStream)

// Simple co-map: one function per input, both producing the same output type
val mappedStream = connectedStream.map(
  control => s"Control: $control",
  data => s"Data: $data"
)

// Using CoMapFunction
val coMappedStream = connectedStream.map(new CoMapFunction[String, Int, String] {
  override def map1(control: String): String = s"Control: $control"
  override def map2(data: Int): String = s"Data: $data"
})

// Using CoProcessFunction so the control input gates the data input.
// NOTE: `isRunning` is a plain field: it is local to each parallel instance and is
// not part of Flink's checkpointed state. If every instance must observe the control
// commands, broadcast the control stream (see Broadcast Connected Streams) instead.
val processedStream = connectedStream.process(new CoProcessFunction[String, Int, String] {
  private var isRunning = false

  // Context is an inner class of CoProcessFunction, so Scala refers to it
  // through a type projection (`#Context`).
  override def processElement1(
    control: String,
    ctx: CoProcessFunction[String, Int, String]#Context,
    out: Collector[String]
  ): Unit = {
    control match {
      case "START"          => isRunning = true
      case "STOP" | "RESET" => isRunning = false
      case _                => // unknown command: keep the current state rather than fail with MatchError
    }
    out.collect(s"Control received: $control, running: $isRunning")
  }

  override def processElement2(
    data: Int,
    ctx: CoProcessFunction[String, Int, String]#Context,
    out: Collector[String]
  ): Unit = {
    if (isRunning) {
      out.collect(s"Processing data: $data")
    }
  }
})

Broadcast Connected Streams

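Connect a regular (non-keyed) or keyed stream with a broadcast stream. Every parallel instance of the downstream operator receives all broadcast elements, which are typically stored in broadcast state.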

/**
 * Broadcast connected streams operations.
 *
 * Created by `stream.connect(broadcastStream)`. Which `process` overload applies
 * depends on whether the non-broadcast side is keyed.
 */
class BroadcastConnectedStream[IN1, IN2] {
  /** For a non-keyed DataStream connected to a broadcast stream. */
  def process[OUT](function: BroadcastProcessFunction[IN1, IN2, OUT]): DataStream[OUT]
  /** For a KeyedStream (key type K) connected to a broadcast stream. */
  def process[K, OUT](function: KeyedBroadcastProcessFunction[K, IN1, IN2, OUT]): DataStream[OUT]
}

Usage Examples:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.util.Try

// Define broadcast state descriptor (config key -> config value)
val configDescriptor = new MapStateDescriptor[String, String](
  "config",
  classOf[String],
  classOf[String]
)

val configStream = env.fromElements(("threshold", "100"), ("multiplier", "2"))
val dataStream = env.fromElements(50, 75, 120, 200)

// Broadcast the config stream
val broadcastStream = configStream.broadcast(configDescriptor)

// Connect data stream with broadcast stream
val connectedBroadcastStream = dataStream.connect(broadcastStream)

val processedStream = connectedBroadcastStream.process(
  new BroadcastProcessFunction[Int, (String, String), String] {

    // ReadOnlyContext / Context are inner classes of BroadcastProcessFunction, so Scala
    // refers to them through type projections (`#ReadOnlyContext`, `#Context`).
    override def processElement(
      value: Int,
      ctx: BroadcastProcessFunction[Int, (String, String), String]#ReadOnlyContext,
      out: Collector[String]
    ): Unit = {
      val broadcastState = ctx.getBroadcastState(configDescriptor)

      // Missing or non-numeric settings fall back to the default instead of throwing
      // NumberFormatException and failing the job.
      def intSetting(key: String, default: Int): Int =
        Option(broadcastState.get(key))
          .flatMap(raw => Try(raw.toInt).toOption)
          .getOrElse(default)

      val threshold = intSetting("threshold", 0)
      val multiplier = intSetting("multiplier", 1)

      if (value > threshold) {
        out.collect(s"High value: ${value * multiplier}")
      }
    }

    override def processBroadcastElement(
      value: (String, String),
      ctx: BroadcastProcessFunction[Int, (String, String), String]#Context,
      out: Collector[String]
    ): Unit = {
      val broadcastState = ctx.getBroadcastState(configDescriptor)
      broadcastState.put(value._1, value._2)
      out.collect(s"Config updated: ${value._1} = ${value._2}")
    }
  }
)

Union Operations

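Combine multiple streams of the same element type into a single stream. Elements are not deduplicated: a stream unioned with itself yields each element twice.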

/**
 * Union operations.
 *
 * `union` merges this stream with any number of further streams whose element
 * type is also `T`, producing one stream containing the elements of all of them.
 */
class DataStream[T] {
  def union(otherStreams: DataStream[T]*): DataStream[T]
}

Usage Examples:

// Three sources sharing the element type Int
val stream1 = env.fromElements(1, 2, 3)
val stream2 = env.fromElements(4, 5, 6)
val stream3 = env.fromElements(7, 8, 9)

// Union every stream in a single varargs call...
val unionedStream = stream1.union(stream2, stream3)

// ...or chain pairwise unions, which yields the same set of elements
val chainedUnion = stream1.union(stream2).union(stream3)

Types

// Join-related types
class JoinedStreams[T1, T2]
class CoGroupedStreams[T1, T2]
class IntervalJoin[T1, T2, K]
class IntervalJoined[T1, T2, K] // result of IntervalJoin.between(...)

// Connected stream types
class ConnectedStreams[T1, T2]
class BroadcastConnectedStream[IN1, IN2]

// Function interfaces.
// JoinFunction, CoGroupFunction, CoMapFunction and CoFlatMapFunction are Java interfaces.
// The process functions below are Java abstract classes whose Context / ReadOnlyContext
// types are inner classes; Scala code names them with a type projection,
// e.g. `CoProcessFunction[T1, T2, R]#Context`.
trait JoinFunction[T1, T2, R] {
  def join(first: T1, second: T2): R
}

trait CoGroupFunction[T1, T2, R] {
  // Java interface: the groups are java.lang.Iterable, not scala.Iterable
  def coGroup(first: java.lang.Iterable[T1], second: java.lang.Iterable[T2], out: Collector[R]): Unit
}

abstract class ProcessJoinFunction[T1, T2, R] {
  abstract class Context
  def processElement(
    left: T1,
    right: T2,
    ctx: ProcessJoinFunction[T1, T2, R]#Context,
    out: Collector[R]
  ): Unit
}

trait CoMapFunction[T1, T2, R] {
  def map1(value: T1): R
  def map2(value: T2): R
}

trait CoFlatMapFunction[T1, T2, R] {
  def flatMap1(value: T1, out: Collector[R]): Unit
  def flatMap2(value: T2, out: Collector[R]): Unit
}

abstract class CoProcessFunction[T1, T2, R] {
  abstract class Context
  def processElement1(value: T1, ctx: CoProcessFunction[T1, T2, R]#Context, out: Collector[R]): Unit
  def processElement2(value: T2, ctx: CoProcessFunction[T1, T2, R]#Context, out: Collector[R]): Unit
}

abstract class BroadcastProcessFunction[IN1, IN2, OUT] {
  abstract class ReadOnlyContext
  abstract class Context
  def processElement(
    value: IN1,
    ctx: BroadcastProcessFunction[IN1, IN2, OUT]#ReadOnlyContext,
    out: Collector[OUT]
  ): Unit
  def processBroadcastElement(
    value: IN2,
    ctx: BroadcastProcessFunction[IN1, IN2, OUT]#Context,
    out: Collector[OUT]
  ): Unit
}

abstract class KeyedBroadcastProcessFunction[K, IN1, IN2, OUT] {
  abstract class ReadOnlyContext
  abstract class Context
  def processElement(
    value: IN1,
    ctx: KeyedBroadcastProcessFunction[K, IN1, IN2, OUT]#ReadOnlyContext,
    out: Collector[OUT]
  ): Unit
  def processBroadcastElement(
    value: IN2,
    ctx: KeyedBroadcastProcessFunction[K, IN1, IN2, OUT]#Context,
    out: Collector[OUT]
  ): Unit
}

// Utility types
class TaggedUnion[T1, T2] // internal element type used by window joins / co-groups
class Time