Flink provides powerful operations for combining multiple streams: union, connect, join, and coGroup cover scenarios from simple merging to windowed joins and stateful co-processing.
class DataStream[T] {
def union(dataStreams: DataStream[T]*): DataStream[T]
}
Merge multiple streams of the same type:
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create multiple streams of same type
val stream1 = env.fromElements(1, 2, 3)
val stream2 = env.fromElements(4, 5, 6)
val stream3 = env.fromElements(7, 8, 9)
// Union streams
val unionedStream = stream1.union(stream2, stream3)
unionedStream.print("Union")
// Union with different sources
val fileStream = env.readTextFile("/path/to/file.txt")
val socketStream = env.socketTextStream("localhost", 9999)
val collectionStream = env.fromElements("static1", "static2")
val allTextStreams = fileStream.union(socketStream, collectionStream)

class DataStream[T] {
def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
}
Connect two streams of different types for co-processing:
case class SensorReading(id: String, temperature: Double)
case class SensorAlert(id: String, message: String)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val sensorData = env.fromElements(
SensorReading("sensor1", 20.0),
SensorReading("sensor2", 25.0)
)
val alerts = env.fromElements(
SensorAlert("sensor1", "High temperature warning")
)
// Connect streams for co-processing
val connectedStreams = sensorData.connect(alerts)

class ConnectedStreams[IN1, IN2] {
def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R]
def flatMap[R: TypeInformation](fun1: (IN1, Collector[R]) => Unit, fun2: (IN2, Collector[R]) => Unit): DataStream[R]
def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): DataStream[R]
def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}
Process connected streams:
import org.apache.flink.streaming.api.functions.co.{CoMapFunction, CoProcessFunction}
import org.apache.flink.util.Collector
// Map both streams to common output type
val mappedConnected = connectedStreams.map(
reading => s"Data: ${reading.id} - ${reading.temperature}°C",
alert => s"Alert: ${alert.id} - ${alert.message}"
)
// CoMapFunction for more complex logic
class SensorCoMapper extends CoMapFunction[SensorReading, SensorAlert, String] {
override def map1(reading: SensorReading): String = {
s"Temperature reading from ${reading.id}: ${reading.temperature}°C"
}
override def map2(alert: SensorAlert): String = {
s"ALERT for ${alert.id}: ${alert.message}"
}
}
val coMappedResult = connectedStreams.map(new SensorCoMapper)
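The flatMap variant from the signatures above can emit zero or more records per input element. A minimal sketch using the two-lambda overload; the 22.0 threshold is purely illustrative:
// flatMap with two lambdas: emit zero or more records per element from either input
val coFlatMapped = connectedStreams.flatMap(
  (reading: SensorReading, out: Collector[String]) => {
    if (reading.temperature > 22.0) out.collect(s"High reading: ${reading.id}")
  },
  (alert: SensorAlert, out: Collector[String]) => out.collect(s"Alert passthrough: ${alert.message}")
)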
// CoProcessFunction for stateful co-processing
class StatefulCoProcessor extends CoProcessFunction[SensorReading, SensorAlert, String] {
override def processElement1(
reading: SensorReading,
ctx: CoProcessFunction[SensorReading, SensorAlert, String]#Context,
out: Collector[String]
): Unit = {
// Process sensor readings
out.collect(s"Processing reading: ${reading.id}")
}
override def processElement2(
alert: SensorAlert,
ctx: CoProcessFunction[SensorReading, SensorAlert, String]#Context,
out: Collector[String]
): Unit = {
// Process alerts
out.collect(s"Processing alert: ${alert.id}")
}
}
val processedConnected = connectedStreams.process(new StatefulCoProcessor)

class ConnectedStreams[IN1, IN2] {
def keyBy[KEY: TypeInformation](fun1: IN1 => KEY, fun2: IN2 => KEY): ConnectedStreams[IN1, IN2]
def keyBy(key1: Int, key2: Int): ConnectedStreams[IN1, IN2]
def keyBy(key1: String, key2: String): ConnectedStreams[IN1, IN2]
}
Key connected streams for stateful co-processing (prefer the key-selector overload; the positional and field-name variants are legacy):
// Key both streams by sensor ID
val keyedConnected = connectedStreams.keyBy(
_.id, // Key function for sensor readings
_.id // Key function for alerts
)
// Now can maintain state per sensor ID across both streams
class KeyedCoProcessor extends CoProcessFunction[SensorReading, SensorAlert, String] {
// State accessible for each key
override def processElement1(reading: SensorReading, ctx: Context, out: Collector[String]): Unit = {
// Process reading with access to keyed state
out.collect(s"Keyed processing: ${reading.id}")
}
override def processElement2(alert: SensorAlert, ctx: Context, out: Collector[String]): Unit = {
// Process alert with access to keyed state
out.collect(s"Keyed alert processing: ${alert.id}")
}
}
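The keyed state hinted at in the comment above might look like this. A minimal sketch, assuming we remember the last reading per sensor and enrich alerts with it; the "lastTemp" state name and LastReadingEnricher class are illustrative:
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

class LastReadingEnricher extends CoProcessFunction[SensorReading, SensorAlert, String] {
  // Keyed state: one value per sensor ID, visible from both inputs
  private var lastTemp: ValueState[java.lang.Double] = _

  override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
    lastTemp = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Double]("lastTemp", classOf[java.lang.Double])
    )
  }

  override def processElement1(reading: SensorReading, ctx: Context, out: Collector[String]): Unit = {
    lastTemp.update(reading.temperature) // remember the latest reading for this sensor
  }

  override def processElement2(alert: SensorAlert, ctx: Context, out: Collector[String]): Unit = {
    val temp = Option(lastTemp.value()).map(_.toString).getOrElse("unknown")
    out.collect(s"${alert.message} for ${alert.id} (last reading: $temp)")
  }
}

val enrichedAlerts = keyedConnected.process(new LastReadingEnricher)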
class DataStream[T] {
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
}
// JoinedStreams builder methods
class JoinedStreams[T1, T2] {
def where[KEY: TypeInformation](keySelector: T1 => KEY): Where[KEY]
class Where[KEY] {
def equalTo(keySelector: T2 => KEY): EqualTo
class EqualTo {
def window[W <: Window](assigner: WindowAssigner[_, W]): WithWindow[W]
class WithWindow[W <: Window] {
def apply[R: TypeInformation](function: JoinFunction[T1, T2, R]): DataStream[R]
def apply[R: TypeInformation](function: (T1, T2) => R): DataStream[R]
}
}
}
}
Windowed inner joins between streams:
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.JoinFunction
case class Order(orderId: String, customerId: String, amount: Double, timestamp: Long)
case class Customer(customerId: String, name: String, email: String)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val orders = env.fromElements(
Order("order1", "customer1", 100.0, 1000),
Order("order2", "customer2", 200.0, 2000)
)
val customers = env.fromElements(
Customer("customer1", "Alice", "alice@example.com"),
Customer("customer2", "Bob", "bob@example.com")
)
// Join orders with customers in 5-minute windows
val orderCustomerJoin = orders
.join(customers)
.where(_.customerId)
.equalTo(_.customerId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.apply((order, customer) => {
s"Order ${order.orderId} by ${customer.name} (${customer.email}): $${order.amount}"
})
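Note that these snippets use event-time windows but never assign timestamps or watermarks, so as written the windows would never fire. A minimal sketch of wiring them up from the Order.timestamp field, assuming the Flink 1.11+ WatermarkStrategy API; the 5-second out-of-orderness bound is illustrative:
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import java.time.Duration

// Derive event time from each order's timestamp field
val ordersWithTime = orders.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[Order](Duration.ofSeconds(5))
    .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
      override def extractTimestamp(order: Order, recordTimestamp: Long): Long = order.timestamp
    })
)
In a real job the join would then be built on ordersWithTime (and a similarly prepared customers stream); the same applies to the coGroup and pipeline examples below.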
// Using JoinFunction
class OrderCustomerJoinFunction extends JoinFunction[Order, Customer, String] {
override def join(order: Order, customer: Customer): String = {
s"${customer.name} ordered $${order.amount} (Order: ${order.orderId})"
}
}
val joinWithFunction = orders
.join(customers)
.where(_.customerId)
.equalTo(_.customerId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.apply(new OrderCustomerJoinFunction)

class DataStream[T] {
def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}
// CoGroupedStreams builder methods (similar to JoinedStreams)
class CoGroupedStreams[T1, T2] {
def where[KEY: TypeInformation](keySelector: T1 => KEY): Where[KEY]
class Where[KEY] {
def equalTo(keySelector: T2 => KEY): EqualTo
class EqualTo {
def window[W <: Window](assigner: WindowAssigner[_, W]): WithWindow[W]
class WithWindow[W <: Window] {
def apply[R: TypeInformation](function: CoGroupFunction[T1, T2, R]): DataStream[R]
}
}
}
}
Windowed co-grouping (outer join equivalent):
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector
case class LeftEvent(key: String, value: Int, timestamp: Long)
case class RightEvent(key: String, data: String, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val leftStream = env.fromElements(
LeftEvent("key1", 10, 1000),
LeftEvent("key2", 20, 2000)
)
val rightStream = env.fromElements(
RightEvent("key1", "data1", 1500),
RightEvent("key3", "data3", 2500)
)
// CoGroup function processes all elements with same key in window
class MyCoGroupFunction extends CoGroupFunction[LeftEvent, RightEvent, String] {
override def coGroup(
leftElements: java.lang.Iterable[LeftEvent],
rightElements: java.lang.Iterable[RightEvent],
out: Collector[String]
): Unit = {
import scala.collection.JavaConverters._
val leftList = leftElements.asScala.toList
val rightList = rightElements.asScala.toList
if (leftList.nonEmpty && rightList.nonEmpty) {
// Inner join case
for (left <- leftList; right <- rightList) {
out.collect(s"Matched: ${left.key} -> ${left.value}, ${right.data}")
}
} else if (leftList.nonEmpty) {
// Left outer join case
for (left <- leftList) {
out.collect(s"Left only: ${left.key} -> ${left.value}")
}
} else if (rightList.nonEmpty) {
// Right outer join case
for (right <- rightList) {
out.collect(s"Right only: ${right.key} -> ${right.data}")
}
}
}
}
val coGroupResult = leftStream
.coGroup(rightStream)
.where(_.key)
.equalTo(_.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.apply(new MyCoGroupFunction)

A small helper unions an arbitrary list of streams of the same type:
def unionMultipleStreams[T](streams: List[DataStream[T]]): DataStream[T] = {
  // Note: List.reduce throws on an empty list; use reduceOption if that can happen
  streams.reduce(_.union(_))
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val multipleStreams = List(
env.fromElements(1, 2, 3),
env.fromElements(4, 5, 6),
env.fromElements(7, 8, 9),
env.fromElements(10, 11, 12)
)
val allUnioned = unionMultipleStreams(multipleStreams)

A common connect pattern treats one stream as a side input, for example broadcasting configuration updates to every parallel instance:
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
case class MainEvent(id: String, data: String)
case class ConfigUpdate(configKey: String, configValue: String)
class SideInputProcessor extends CoProcessFunction[MainEvent, ConfigUpdate, String] {
  // Operator-level variable: each parallel instance receives every broadcast update.
  // Keyed ValueState is not usable here because broadcast elements carry no key,
  // and a plain field is not checkpointed; see the broadcast state sketch below
  // for the fault-tolerant variant.
  private var config: String = "default"
override def processElement1(
event: MainEvent,
ctx: Context,
out: Collector[String]
): Unit = {
out.collect(s"Processing ${event.id} with config: $config")
}
override def processElement2(
configUpdate: ConfigUpdate,
ctx: Context,
out: Collector[String]
): Unit = {
config = configUpdate.configValue
out.collect(s"Config updated: ${configUpdate.configKey} = ${configUpdate.configValue}")
}
}
val mainEvents = env.fromElements(
MainEvent("event1", "data1"),
MainEvent("event2", "data2")
)
val configUpdates = env.fromElements(
ConfigUpdate("setting1", "value1"),
ConfigUpdate("setting2", "value2")
)
val sideInputResult = mainEvents
.connect(configUpdates.broadcast) // broadcast partitioning: every instance sees all updates
.process(new SideInputProcessor)
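For a fault-tolerant version of this pattern, Flink offers broadcast state. A minimal sketch reusing the MainEvent/ConfigUpdate types above; the descriptor name, the hardcoded "setting1" lookup, and the BroadcastConfigProcessor class are illustrative:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction

// Describe the broadcast state: configKey -> configValue
val configDescriptor = new MapStateDescriptor[String, String](
  "config", classOf[String], classOf[String]
)

class BroadcastConfigProcessor extends BroadcastProcessFunction[MainEvent, ConfigUpdate, String] {
  override def processElement(
    event: MainEvent,
    ctx: BroadcastProcessFunction[MainEvent, ConfigUpdate, String]#ReadOnlyContext,
    out: Collector[String]
  ): Unit = {
    // Element side gets a read-only view of the broadcast state
    val value = Option(ctx.getBroadcastState(configDescriptor).get("setting1")).getOrElse("default")
    out.collect(s"Processing ${event.id} with config: $value")
  }
  override def processBroadcastElement(
    update: ConfigUpdate,
    ctx: BroadcastProcessFunction[MainEvent, ConfigUpdate, String]#Context,
    out: Collector[String]
  ): Unit = {
    // Broadcast side may write the state; it is checkpointed and restored
    ctx.getBroadcastState(configDescriptor).put(update.configKey, update.configValue)
  }
}

val broadcastResult = mainEvents
  .connect(configUpdates.broadcast(configDescriptor))
  .process(new BroadcastConfigProcessor)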
Finally, a complete pipeline that combines union, join, and connect:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.util.Collector
case class Transaction(txnId: String, userId: String, amount: Double, timestamp: Long)
case class User(userId: String, name: String, riskScore: Int)
case class FraudAlert(userId: String, reason: String, timestamp: Long)
case class EnrichedTransaction(
txnId: String,
userId: String,
userName: String,
amount: Double,
riskScore: Int,
timestamp: Long
)
object MultiStreamProcessingPipeline {
class FraudDetectionProcessor extends CoProcessFunction[EnrichedTransaction, FraudAlert, String] {
override def processElement1(
txn: EnrichedTransaction,
ctx: Context,
out: Collector[String]
): Unit = {
if (txn.riskScore > 80 || txn.amount > 10000) {
out.collect(s"SUSPICIOUS: ${txn.userName} (${txn.userId}) - $${txn.amount} (Risk: ${txn.riskScore})")
} else {
out.collect(s"NORMAL: ${txn.userName} - $${txn.amount}")
}
}
override def processElement2(
alert: FraudAlert,
ctx: Context,
out: Collector[String]
): Unit = {
out.collect(s"FRAUD ALERT: ${alert.userId} - ${alert.reason}")
}
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Multiple transaction sources
val creditCardTxns = env.fromElements(
Transaction("cc1", "user1", 500.0, 1000),
Transaction("cc2", "user2", 15000.0, 2000)
)
val debitCardTxns = env.fromElements(
Transaction("db1", "user1", 200.0, 1500),
Transaction("db2", "user3", 800.0, 2500)
)
val bankTransferTxns = env.fromElements(
Transaction("bt1", "user2", 25000.0, 1800)
)
// Union all transaction streams
val allTransactions = creditCardTxns
.union(debitCardTxns, bankTransferTxns)
// User data stream
val users = env.fromElements(
User("user1", "Alice", 30),
User("user2", "Bob", 85),
User("user3", "Charlie", 20)
)
// Fraud alerts stream
val fraudAlerts = env.fromElements(
FraudAlert("user2", "Unusual spending pattern", 2100)
)
// Join transactions with user data
val enrichedTransactions = allTransactions
.join(users)
.where(_.userId)
.equalTo(_.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.apply(new JoinFunction[Transaction, User, EnrichedTransaction] {
override def join(txn: Transaction, user: User): EnrichedTransaction = {
EnrichedTransaction(txn.txnId, txn.userId, user.name, txn.amount, user.riskScore, txn.timestamp)
}
})
// Connect enriched transactions with fraud alerts
val fraudDetectionResult = enrichedTransactions
.keyBy(_.userId)
.connect(fraudAlerts.keyBy(_.userId))
.process(new FraudDetectionProcessor)
// High-value transactions (separate processing)
val highValueTxns = enrichedTransactions
.filter(_.amount > 5000)
.map(txn => s"HIGH VALUE: ${txn.userName} - $$${txn.amount}")
// Global statistics: rolling count of all transactions
// (reduce needs a keyed stream; unioning the stream with itself would double-count)
val globalStats = allTransactions
  .map(_ => 1)
  .keyBy(_ => "all") // single logical key so a rolling reduce can run
  .reduce(_ + _)
  .map(count => s"Total transactions processed: $count")
// Print all results
fraudDetectionResult.print("Fraud Detection")
highValueTxns.print("High Value")
globalStats.print("Global Stats")
env.execute("Multi-Stream Processing Pipeline")
}
}