or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-operations.md, data-sources.md, function-interfaces.md, index.md, keyed-streams.md, output-operations.md, scala-extensions.md, stream-composition.md, stream-environment.md, stream-partitioning.md, stream-transformations.md, window-operations.md, windowing.md
tile.json

docs/stream-composition.md

Stream Composition and Joining

Flink provides several operations for combining multiple streams:
- union, which merges streams of the same type;
- connect, which co-processes two streams whose types may differ;
- windowed join and co-group, which match elements by key within a window.

Union Operations

Basic Union

class DataStream[T] {
  /** Merges this stream with `dataStreams` into a single stream.
    *
    * Every input must have the same element type `T`.
    *
    * @param dataStreams the additional streams to merge with this one
    * @return one stream carrying the elements of this stream and of every `dataStreams` entry
    */
  def union(dataStreams: DataStream[T]*): DataStream[T]
}

Merge multiple streams of the same type:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Three integer streams that share one element type
val stream1 = env.fromElements(1, 2, 3)
val stream2 = env.fromElements(4, 5, 6)
val stream3 = env.fromElements(7, 8, 9)

// A single union call accepts any number of extra streams as varargs
val unionedStream = stream1.union(stream2, stream3)
unionedStream.print("Union")

// Union inputs may come from entirely different sources, as long as
// every element has the same type (String here). A Seq of streams can
// be expanded into the varargs parameter with `: _*`.
val fileStream = env.readTextFile("/path/to/file.txt")
val otherTextStreams = Seq(
  env.socketTextStream("localhost", 9999),
  env.fromElements("static1", "static2")
)

val allTextStreams = fileStream.union(otherTextStreams: _*)

Connected Streams

Connect Operation

class DataStream[T] {
  /** Pairs this stream with `dataStream` for co-processing by one downstream operator.
    *
    * Unlike `union`, the two inputs keep their own element types.
    *
    * @tparam T2 element type of the other stream (may differ from `T`)
    * @param dataStream the second input
    * @return connected streams with this stream as input 1 and `dataStream` as input 2
    */
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
}

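Connect two streams, whose element types may differ, for co-processing. Unlike union, each input keeps its own type and is handled by its own function (for example `map1`/`map2`):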

// Two element types that a union could not combine
case class SensorReading(id: String, temperature: Double)
case class SensorAlert(id: String, message: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Input 1: temperature readings
val sensorData = env.fromElements(SensorReading("sensor1", 20.0), SensorReading("sensor2", 25.0))

// Input 2: alerts, a different element type
val alerts = env.fromElements(SensorAlert("sensor1", "High temperature warning"))

// Connecting keeps each input's type; later operators get one function per input
val connectedStreams = sensorData.connect(alerts)

Connected Stream Operations

/** Operations on two connected inputs of types `IN1` and `IN2`; every variant yields one `DataStream[R]`. */
class ConnectedStreams[IN1, IN2] {
  // Maps input 1 with `fun1` and input 2 with `fun2` into a single stream of the common type R.
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
  // Same as above, with both mappings supplied by one CoMapFunction (map1 for input 1, map2 for input 2).
  def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R]
  // Like map, but each function emits its results through a Collector instead of returning one value.
  def flatMap[R: TypeInformation](fun1: (IN1, Collector[R]) => Unit, fun2: (IN2, Collector[R]) => Unit): DataStream[R]
  // Same as above, with both functions supplied by one CoFlatMapFunction.
  def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): DataStream[R]
  // Lowest-level variant: the CoProcessFunction's processElement1/processElement2 also receive a Context.
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}

Process connected streams:

import org.apache.flink.streaming.api.functions.co.{CoMapFunction, CoProcessFunction}
import org.apache.flink.util.Collector

// One lambda per input; both produce the shared output type String
val mappedConnected = connectedStreams.map(
  (r: SensorReading) => s"Data: ${r.id} - ${r.temperature}°C",
  (a: SensorAlert) => s"Alert: ${a.id} - ${a.message}"
)

// CoMapFunction: a named, reusable alternative to passing two lambdas
class SensorCoMapper extends CoMapFunction[SensorReading, SensorAlert, String] {

  /** Formats a temperature reading arriving on input 1. */
  override def map1(reading: SensorReading): String =
    s"Temperature reading from ${reading.id}: ${reading.temperature}°C"

  /** Formats an alert arriving on input 2. */
  override def map2(alert: SensorAlert): String =
    s"ALERT for ${alert.id}: ${alert.message}"
}

val coMappedResult = connectedStreams.map(new SensorCoMapper)

// CoProcessFunction: the lowest-level co-operator; each element also comes with a Context.
// This version keeps no state of its own; it only tags each element with its origin.
class StatefulCoProcessor extends CoProcessFunction[SensorReading, SensorAlert, String] {

  // Shorthand for the Context type both callbacks receive
  type Ctx = CoProcessFunction[SensorReading, SensorAlert, String]#Context

  /** Handles an element from input 1 (sensor readings). */
  override def processElement1(reading: SensorReading, ctx: Ctx, out: Collector[String]): Unit =
    out.collect(s"Processing reading: ${reading.id}")

  /** Handles an element from input 2 (alerts). */
  override def processElement2(alert: SensorAlert, ctx: Ctx, out: Collector[String]): Unit =
    out.collect(s"Processing alert: ${alert.id}")
}

val processedConnected = connectedStreams.process(new StatefulCoProcessor)

Keyed Connected Streams

/** Keying operations on connected streams; each returns the same ConnectedStreams type, now keyed. */
class ConnectedStreams[IN1, IN2] {
  // Keys input 1 with `fun1` and input 2 with `fun2`; both selectors must yield the same KEY type.
  def keyBy[KEY: TypeInformation](fun1: IN1 => KEY, fun2: IN2 => KEY): ConnectedStreams[IN1, IN2]
  // Keys each input by an Int position -- presumably a tuple field index; confirm against the Flink API docs.
  def keyBy(key1: Int, key2: Int): ConnectedStreams[IN1, IN2]
  // Keys each input by a String -- presumably a field expression/name; confirm against the Flink API docs.
  def keyBy(key1: String, key2: String): ConnectedStreams[IN1, IN2]
}

Key connected streams for stateful co-processing:

// Key both streams by sensor ID, so readings and alerts for the same sensor
// are processed together and can share keyed state.
val keyedConnected = connectedStreams.keyBy(
  _.id,  // Key function for sensor readings
  _.id   // Key function for alerts
)

// A function applied to keyed connected streams may use keyed state (e.g. a ValueState)
// scoped to the current sensor ID. This minimal version declares no state; it only tags
// each element so the keyed routing is visible in the output.
class KeyedCoProcessor extends CoProcessFunction[SensorReading, SensorAlert, String] {

  /** Handles a sensor reading for the current key. */
  override def processElement1(reading: SensorReading, ctx: Context, out: Collector[String]): Unit = {
    out.collect(s"Keyed processing: ${reading.id}")
  }

  /** Handles an alert for the current key. */
  override def processElement2(alert: SensorAlert, ctx: Context, out: Collector[String]): Unit = {
    out.collect(s"Keyed alert processing: ${alert.id}")
  }
}

// Apply the function to the *keyed* connected streams. Processing the unkeyed
// `connectedStreams` would give no access to per-key state.
val keyedProcessed = keyedConnected.process(new KeyedCoProcessor)

Stream Joining

Join Builder Pattern

class DataStream[T] {
  /** Starts a windowed inner join of this stream (input 1) with `otherStream` (input 2). */
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
}

// JoinedStreams builder methods: join(...).where(...).equalTo(...).window(...).apply(...)
class JoinedStreams[T1, T2] {
  // Step 1: key selector for input 1; its KEY type fixes the key type for input 2.
  def where[KEY: TypeInformation](keySelector: T1 => KEY): Where[KEY]
  
  class Where[KEY] {
    // Step 2: key selector for input 2; must produce the same KEY type as `where`.
    def equalTo(keySelector: T2 => KEY): EqualTo
    
    class EqualTo {
      // Step 3: the window assigner that decides which elements are eligible to pair up.
      def window[W <: Window](assigner: WindowAssigner[_, W]): WithWindow[W]
      
      class WithWindow[W <: Window] {
        // Step 4: combine each matching (T1, T2) pair from the same window into one R.
        def apply[R: TypeInformation](function: JoinFunction[T1, T2, R]): DataStream[R]
        // Same as above, with a plain Scala function instead of a JoinFunction.
        def apply[R: TypeInformation](function: (T1, T2) => R): DataStream[R]
      }
    }
  }
}

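Windowed inner joins between streams. Event-time window assigners such as `TumblingEventTimeWindows` need a timestamp (and watermarks) on every record of both inputs, for example via `assignAscendingTimestamps`. Records without a timestamp make window assignment fail at runtime: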

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.JoinFunction

case class Order(orderId: String, customerId: String, amount: Double, timestamp: Long)
case class Customer(customerId: String, name: String, email: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Event-time windows require a timestamp on every record; without one, window
// assignment fails at runtime. Orders carry their own event time.
val orders = env
  .fromElements(
    Order("order1", "customer1", 100.0, 1000),
    Order("order2", "customer2", 200.0, 2000)
  )
  .assignAscendingTimestamps(_.timestamp)

// Customer records have no event time of their own. Stamping them at 0 places them in
// the first 5-minute window, where they can meet the sample orders above.
val customers = env
  .fromElements(
    Customer("customer1", "Alice", "alice@example.com"),
    Customer("customer2", "Bob", "bob@example.com")
  )
  .assignAscendingTimestamps(_ => 0L)

// Join orders with customers in 5-minute windows
val orderCustomerJoin = orders
  .join(customers)
  .where(_.customerId)
  .equalTo(_.customerId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .apply((order, customer) => {
    // `$$` emits a literal dollar sign; the `${...}` right after it interpolates the amount.
    s"Order ${order.orderId} by ${customer.name} (${customer.email}): $$${order.amount}"
  })

// Using JoinFunction: a named, reusable alternative to the lambda form
class OrderCustomerJoinFunction extends JoinFunction[Order, Customer, String] {

  /** Formats one matched (order, customer) pair from the same window.
    *
    * In the `s` interpolator `$$` is a literal dollar sign, so `$$${order.amount}`
    * renders a dollar sign followed by the amount value.
    */
  override def join(order: Order, customer: Customer): String = {
    s"${customer.name} ordered $$${order.amount} (Order: ${order.orderId})"
  }
}

val joinWithFunction = orders
  .join(customers)
  .where(_.customerId)
  .equalTo(_.customerId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .apply(new OrderCustomerJoinFunction)

Stream Co-Grouping

CoGroup Builder Pattern

class DataStream[T] {
  /** Starts a windowed co-group of this stream (input 1) with `otherStream` (input 2). */
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}

// CoGroupedStreams builder methods (similar to JoinedStreams)
class CoGroupedStreams[T1, T2] {
  // Step 1: key selector for input 1; its KEY type fixes the key type for input 2.
  def where[KEY: TypeInformation](keySelector: T1 => KEY): Where[KEY]
  
  class Where[KEY] {
    // Step 2: key selector for input 2; must produce the same KEY type as `where`.
    def equalTo(keySelector: T2 => KEY): EqualTo
    
    class EqualTo {
      // Step 3: the window assigner that groups elements in time.
      def window[W <: Window](assigner: WindowAssigner[_, W]): WithWindow[W]
      
      class WithWindow[W <: Window] {
        // Step 4: unlike a join, the CoGroupFunction receives the *groups* of T1 and T2
        // elements for a key in a window, so it can also handle keys present on one side only.
        def apply[R: TypeInformation](function: CoGroupFunction[T1, T2, R]): DataStream[R]
      }
    }
  }
}

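Windowed co-grouping. The co-group function receives every element with a given key from both sides of a window, including keys present on only one side. It can therefore express left, right, and full outer joins: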

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector

case class LeftEvent(key: String, value: Int, timestamp: Long)
case class RightEvent(key: String, data: String, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Both sides carry an event-time `timestamp`. Assigning it is required for an
// event-time window assigner such as TumblingEventTimeWindows; without it,
// window assignment fails at runtime.
val leftStream = env
  .fromElements(
    LeftEvent("key1", 10, 1000),
    LeftEvent("key2", 20, 2000)
  )
  .assignAscendingTimestamps(_.timestamp)

val rightStream = env
  .fromElements(
    RightEvent("key1", "data1", 1500),
    RightEvent("key3", "data3", 2500)
  )
  .assignAscendingTimestamps(_.timestamp)

// CoGroup function: receives every element of one key within one window, from both sides
class MyCoGroupFunction extends CoGroupFunction[LeftEvent, RightEvent, String] {

  /** Emits matched pairs when both sides have elements, otherwise the one-sided elements.
    *
    * @param leftElements  this key's elements from the left stream in the window
    * @param rightElements this key's elements from the right stream in the window
    * @param out           receives the formatted result lines
    */
  override def coGroup(
    leftElements: java.lang.Iterable[LeftEvent],
    rightElements: java.lang.Iterable[RightEvent],
    out: Collector[String]
  ): Unit = {
    import scala.collection.JavaConverters._

    val lefts = leftElements.asScala.toList
    val rights = rightElements.asScala.toList

    (lefts, rights) match {
      case (Nil, Nil) =>
        // neither side has data for this key: emit nothing
      case (_, Nil) =>
        // left outer case
        lefts.foreach(l => out.collect(s"Left only: ${l.key} -> ${l.value}"))
      case (Nil, _) =>
        // right outer case
        rights.foreach(r => out.collect(s"Right only: ${r.key} -> ${r.data}"))
      case _ =>
        // inner case: cross product, left-major order
        for {
          l <- lefts
          r <- rights
        } out.collect(s"Matched: ${l.key} -> ${l.value}, ${r.data}")
    }
  }
}

val fiveMinuteWindows = TumblingEventTimeWindows.of(Time.minutes(5))

val coGroupResult = leftStream
  .coGroup(rightStream)
  .where(_.key)
  .equalTo(_.key)
  .window(fiveMinuteWindows)
  .apply(new MyCoGroupFunction)

Advanced Composition Patterns

Multi-Stream Union

/** Unions every stream in `streams` into a single stream.
  *
  * Uses one varargs `union` call, which builds a single union rather than a
  * nested chain of pairwise unions.
  *
  * @param streams the streams to merge; must be non-empty
  * @return the only stream if there is one, otherwise the union of all of them
  * @throws IllegalArgumentException if `streams` is empty
  */
def unionMultipleStreams[T](streams: List[DataStream[T]]): DataStream[T] = streams match {
  case Nil           => throw new IllegalArgumentException("unionMultipleStreams requires at least one stream")
  case single :: Nil => single
  case first :: rest => first.union(rest: _*)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

val multipleStreams = List(
  env.fromElements(1, 2, 3),
  env.fromElements(4, 5, 6),
  env.fromElements(7, 8, 9),
  env.fromElements(10, 11, 12)
)

val allUnioned = unionMultipleStreams(multipleStreams)

Side Input Pattern with Connect

import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

case class MainEvent(id: String, data: String)
case class ConfigUpdate(configKey: String, configValue: String)

/** Broadcast-state descriptor shared by `broadcast(...)` and [[SideInputProcessor]]. */
object SideInputProcessor {
  // The latest config value is kept under this single map key (last update wins).
  val CurrentConfigKey: String = "current"

  val ConfigDescriptor: MapStateDescriptor[String, String] =
    new MapStateDescriptor[String, String]("config", classOf[String], classOf[String])
}

/** Processes keyed main events using the most recent broadcast config value.
  *
  * Keyed state (e.g. `getRuntimeContext.getState`) is not usable here, because the config
  * input is a broadcast, non-keyed stream. Instead, the config lives in broadcast state.
  * Every parallel instance receives each update, so every instance sees the same config.
  * Main events and config updates are not ordered relative to each other, so early events
  * may still see "default".
  */
class SideInputProcessor
    extends KeyedBroadcastProcessFunction[String, MainEvent, ConfigUpdate, String] {
  import SideInputProcessor._

  /** Reads (read-only) the current config and emits one line per main event. */
  override def processElement(
    event: MainEvent,
    ctx: KeyedBroadcastProcessFunction[String, MainEvent, ConfigUpdate, String]#ReadOnlyContext,
    out: Collector[String]
  ): Unit = {
    val currentConfig =
      Option(ctx.getBroadcastState(ConfigDescriptor).get(CurrentConfigKey)).getOrElse("default")
    out.collect(s"Processing ${event.id} with config: $currentConfig")
  }

  /** Stores the new config value. Runs on every parallel instance, so the line is emitted once per instance. */
  override def processBroadcastElement(
    configUpdate: ConfigUpdate,
    ctx: KeyedBroadcastProcessFunction[String, MainEvent, ConfigUpdate, String]#Context,
    out: Collector[String]
  ): Unit = {
    ctx.getBroadcastState(ConfigDescriptor).put(CurrentConfigKey, configUpdate.configValue)
    out.collect(s"Config updated: ${configUpdate.configKey} = ${configUpdate.configValue}")
  }
}

val mainEvents = env.fromElements(
  MainEvent("event1", "data1"),
  MainEvent("event2", "data2")
)

val configUpdates = env.fromElements(
  ConfigUpdate("setting1", "value1"),
  ConfigUpdate("setting2", "value2")
)

val sideInputResult = mainEvents
  .keyBy(_.id)
  .connect(configUpdates.broadcast(SideInputProcessor.ConfigDescriptor))
  .process(new SideInputProcessor)

Complete Example: Multi-Stream Processing Pipeline

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.util.Collector

case class Transaction(txnId: String, userId: String, amount: Double, timestamp: Long)
case class User(userId: String, name: String, riskScore: Int)
case class FraudAlert(userId: String, reason: String, timestamp: Long)
case class EnrichedTransaction(
  txnId: String,
  userId: String,
  userName: String,
  amount: Double,
  riskScore: Int,
  timestamp: Long
)

/** Example job that combines union, a windowed join, a keyed connect and a running count. */
object MultiStreamProcessingPipeline {

  /** Input 1: labels each enriched transaction SUSPICIOUS (risk score > 80 or amount > 10000)
    * or NORMAL. Input 2: forwards each fraud alert as a message.
    */
  class FraudDetectionProcessor extends CoProcessFunction[EnrichedTransaction, FraudAlert, String] {
    override def processElement1(
      txn: EnrichedTransaction,
      ctx: Context,
      out: Collector[String]
    ): Unit = {
      // `$$` is a literal dollar sign; the `${...}` right after it interpolates the amount.
      if (txn.riskScore > 80 || txn.amount > 10000) {
        out.collect(s"SUSPICIOUS: ${txn.userName} (${txn.userId}) - $$${txn.amount} (Risk: ${txn.riskScore})")
      } else {
        out.collect(s"NORMAL: ${txn.userName} - $$${txn.amount}")
      }
    }

    override def processElement2(
      alert: FraudAlert,
      ctx: Context,
      out: Collector[String]
    ): Unit = {
      out.collect(s"FRAUD ALERT: ${alert.userId} - ${alert.reason}")
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Multiple transaction sources. The windowed join below uses event-time windows,
    // which need a timestamp on every record. Timestamps are assigned per source
    // (each is ascending on its own) before the union interleaves them.
    val creditCardTxns = env
      .fromElements(
        Transaction("cc1", "user1", 500.0, 1000),
        Transaction("cc2", "user2", 15000.0, 2000)
      )
      .assignAscendingTimestamps(_.timestamp)

    val debitCardTxns = env
      .fromElements(
        Transaction("db1", "user1", 200.0, 1500),
        Transaction("db2", "user3", 800.0, 2500)
      )
      .assignAscendingTimestamps(_.timestamp)

    val bankTransferTxns = env
      .fromElements(
        Transaction("bt1", "user2", 25000.0, 1800)
      )
      .assignAscendingTimestamps(_.timestamp)

    // Union all transaction streams
    val allTransactions = creditCardTxns
      .union(debitCardTxns, bankTransferTxns)

    // User data stream. Users have no event time of their own. Stamping them at 0 puts
    // them in the first 5-minute window together with the sample transactions.
    val users = env
      .fromElements(
        User("user1", "Alice", 30),
        User("user2", "Bob", 85),
        User("user3", "Charlie", 20)
      )
      .assignAscendingTimestamps(_ => 0L)

    // Fraud alerts feed only the (non-windowed) keyed connect below, so they need no timestamps.
    val fraudAlerts = env.fromElements(
      FraudAlert("user2", "Unusual spending pattern", 2100)
    )

    // Join transactions with user data
    val enrichedTransactions = allTransactions
      .join(users)
      .where(_.userId)
      .equalTo(_.userId)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .apply(new JoinFunction[Transaction, User, EnrichedTransaction] {
        override def join(txn: Transaction, user: User): EnrichedTransaction = {
          EnrichedTransaction(txn.txnId, txn.userId, user.name, txn.amount, user.riskScore, txn.timestamp)
        }
      })

    // Connect enriched transactions with fraud alerts, keyed by user on both sides
    val fraudDetectionResult = enrichedTransactions
      .keyBy(_.userId)
      .connect(fraudAlerts.keyBy(_.userId))
      .process(new FraudDetectionProcessor)

    // High-value transactions (separate processing)
    val highValueTxns = enrichedTransactions
      .filter(_.amount > 5000)
      .map(txn => s"HIGH VALUE: ${txn.userName} - $$${txn.amount}")

    // Global running count. A plain DataStream has no `reduce`; keying every element by one
    // constant routes them all to a single keyed reduce that emits the updated total per element.
    // Each transaction is counted exactly once (no self-union).
    val globalStats = allTransactions
      .map(_ => 1L)
      .keyBy(_ => "all")
      .reduce(_ + _)
      .map(count => s"Total transactions processed: $count")

    // Print all results
    fraudDetectionResult.print("Fraud Detection")
    highValueTxns.print("High Value")
    globalStats.print("Global Stats")

    env.execute("Multi-Stream Processing Pipeline")
  }
}