Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.
---
Multi-stream operations enable complex processing patterns by combining multiple streams through unions, connections, joins, and co-processing. This is essential for correlating data from different sources and implementing complex event processing logic.
Combine multiple streams of the same type into a single stream.
class DataStream[T] {

  /**
   * Union this stream with one or more other streams of the same element type.
   *
   * @param dataStreams Other streams to union with
   * @return DataStream containing the elements of all input streams
   */
  def union(dataStreams: DataStream[T]*): DataStream[T]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._

case class Event(id: String, value: Int, source: String, timestamp: Long)

// NOTE(review): assumes `env` is a StreamExecutionEnvironment already in scope
val stream1 = env.fromElements(
  Event("e1", 10, "source1", 1000L),
  Event("e2", 20, "source1", 2000L)
)
val stream2 = env.fromElements(
  Event("e3", 15, "source2", 1500L),
  Event("e4", 25, "source2", 2500L)
)
val stream3 = env.fromElements(
  Event("e5", 30, "source3", 3000L)
)

// Union multiple streams of the same element type into one
val unionedStream = stream1.union(stream2, stream3)

// All events from all three input streams will be in the result
unionedStream.print()

Connect two streams of different types for co-processing.
class DataStream[T] {

  /**
   * Connect this stream with another stream of a (possibly) different type.
   *
   * @param dataStream Stream to connect with
   * @return ConnectedStreams for co-processing
   */
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
}

class ConnectedStreams[IN1, IN2] {

  /**
   * Apply a separate map function to each of the two connected streams.
   *
   * @param fun1 Map function for the first stream
   * @param fun2 Map function for the second stream
   * @return DataStream with the mapped results
   */
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]

  /**
   * Apply a CoMapFunction to the connected streams.
   *
   * @param coMapper CoMapFunction implementation
   * @return DataStream with the mapped results
   */
  def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R]

  /**
   * Apply a separate flatMap function to each of the two connected streams.
   *
   * @param fun1 FlatMap function for the first stream
   * @param fun2 FlatMap function for the second stream
   * @return DataStream with the flatMapped results
   */
  def flatMap[R: TypeInformation](
    fun1: IN1 => TraversableOnce[R],
    fun2: IN2 => TraversableOnce[R]
  ): DataStream[R]

  /**
   * Apply a CoFlatMapFunction to the connected streams.
   *
   * @param coFlatMapper CoFlatMapFunction implementation
   * @return DataStream with the flatMapped results
   */
  def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): DataStream[R]

  /**
   * Apply a CoProcessFunction for advanced co-processing
   * (timers and side outputs via its Context — see the interface below).
   *
   * @param coProcessFunction CoProcessFunction implementation
   * @return DataStream with the processed results
   */
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.CoMapFunction

case class Order(id: String, customerId: String, amount: Double, timestamp: Long)
case class Customer(id: String, name: String, tier: String)
case class EnrichedOrder(orderId: String, customerId: String, customerName: String, amount: Double, tier: String)

// NOTE(review): assumes `env` is a StreamExecutionEnvironment already in scope
val orders = env.fromElements(
  Order("o1", "c1", 100.0, 1000L),
  Order("o2", "c2", 200.0, 2000L)
)
val customers = env.fromElements(
  Customer("c1", "Alice", "Gold"),
  Customer("c2", "Bob", "Silver")
)

// Connect the two differently-typed streams for co-processing
val connected = orders.connect(customers)

// Using function syntax: one lambda per input stream, both producing the same output type
val enriched = connected.map(
  (order: Order) => s"Order: ${order.id} - ${order.amount}",
  (customer: Customer) => s"Customer: ${customer.name} - ${customer.tier}"
)

// Using a CoMapFunction implementation
class OrderCustomerCoMapper extends CoMapFunction[Order, Customer, String] {
  override def map1(order: Order): String = s"Processing order ${order.id} for customer ${order.customerId}"
  override def map2(customer: Customer): String = s"Customer ${customer.name} registered with tier ${customer.tier}"
}
val processedWithCoMapper = connected.map(new OrderCustomerCoMapper)

Connect keyed streams for stateful co-processing.
class ConnectedStreams[IN1, IN2] {

  /**
   * Key both connected streams by field positions.
   *
   * @param keyPosition1 Key position for the first stream
   * @param keyPosition2 Key position for the second stream
   * @return Connected streams with keying applied
   */
  def keyBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2]

  /**
   * Key both connected streams by field names.
   *
   * @param field1 Key field for the first stream
   * @param field2 Key field for the second stream
   * @return Connected streams with keying applied
   */
  def keyBy(field1: String, field2: String): ConnectedStreams[IN1, IN2]

  /**
   * Key both connected streams by key selector functions.
   *
   * @param fun1 Key selector for the first stream
   * @param fun2 Key selector for the second stream
   * @return Connected streams with keying applied
   */
  def keyBy[KEY: TypeInformation](fun1: IN1 => KEY, fun2: IN2 => KEY): ConnectedStreams[IN1, IN2]
}

Join two streams based on keys and time windows.
class DataStream[T] {

  /**
   * Join with another stream.
   *
   * @param otherStream Stream to join with
   * @return JoinedStreams for configuring the join
   */
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
}

// JoinedStreams configuration builder: where(...).equalTo(...).window(...).apply(...)
class JoinedStreams[T1, T2] {

  /**
   * Specify the key selector for the first stream.
   *
   * @param keySelector Key selector function
   * @return Where clause for join configuration
   */
  def where[KEY: TypeInformation](keySelector: T1 => KEY): JoinedStreams[T1, T2]#Where[KEY]

  class Where[KEY] {

    /**
     * Specify the key selector for the second stream.
     *
     * @param keySelector Key selector function for the second stream
     * @return EqualTo clause for join configuration
     */
    def equalTo(keySelector: T2 => KEY): JoinedStreams[T1, T2]#Where[KEY]#EqualTo

    class EqualTo {

      /**
       * Specify the window in which elements are matched for the join.
       *
       * @param assigner Window assigner for the join
       * @return WithWindow for join processing
       */
      def window[W <: Window](assigner: WindowAssigner[_ >: CoGroupedStreams.TaggedUnion[T1, T2], W]): JoinedStreams[T1, T2]#Where[KEY]#EqualTo#WithWindow[W]

      class WithWindow[W <: Window] {

        /**
         * Apply a JoinFunction to each pair of matched elements.
         *
         * @param function Join function to combine matched elements
         * @return DataStream with the join results
         */
        def apply[R: TypeInformation](function: JoinFunction[T1, T2, R]): DataStream[R]

        /**
         * Apply a join function using closure syntax.
         *
         * @param fun Function to combine matched elements
         * @return DataStream with the join results
         */
        def apply[R: TypeInformation](fun: (T1, T2) => R): DataStream[R]

        /**
         * Apply a ProcessJoinFunction for advanced join processing.
         *
         * @param function ProcessJoinFunction implementation
         * @return DataStream with the join results
         */
        def process[R: TypeInformation](function: ProcessJoinFunction[T1, T2, R]): DataStream[R]
      }
    }
  }
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.JoinFunction

case class Click(userId: String, page: String, timestamp: Long)
case class Purchase(userId: String, product: String, amount: Double, timestamp: Long)
case class UserActivity(userId: String, page: String, product: String, amount: Double)

// NOTE(review): assumes `env` is a StreamExecutionEnvironment already in scope
val clicks = env.fromElements(
  Click("user1", "homepage", 1000L),
  Click("user2", "products", 2000L)
).assignAscendingTimestamps(_.timestamp)

val purchases = env.fromElements(
  Purchase("user1", "laptop", 999.99, 1500L),
  Purchase("user2", "book", 29.99, 2200L)
).assignAscendingTimestamps(_.timestamp)

// Join clicks and purchases on userId within 5-minute tumbling event-time windows
val joinedActivity = clicks
  .join(purchases)
  .where(_.userId)
  .equalTo(_.userId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .apply { (click: Click, purchase: Purchase) =>
    UserActivity(click.userId, click.page, purchase.product, purchase.amount)
  }

Group elements from two streams by key within windows.
class DataStream[T] {

  /**
   * CoGroup with another stream.
   *
   * @param otherStream Stream to coGroup with
   * @return CoGroupedStreams for configuring the coGroup
   */
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}

// CoGroupedStreams configuration builder: where(...).equalTo(...).window(...).apply(...)
class CoGroupedStreams[T1, T2] {

  /**
   * Specify the key selector for the first stream.
   *
   * @param keySelector Key selector function
   * @return Where clause for coGroup configuration
   */
  def where[KEY: TypeInformation](keySelector: T1 => KEY): CoGroupedStreams[T1, T2]#Where[KEY]

  class Where[KEY] {

    /**
     * Specify the key selector for the second stream.
     *
     * @param keySelector Key selector function for the second stream
     * @return EqualTo clause for coGroup configuration
     */
    def equalTo(keySelector: T2 => KEY): CoGroupedStreams[T1, T2]#Where[KEY]#EqualTo

    class EqualTo {

      /**
       * Specify the window in which elements are grouped for the coGroup.
       *
       * @param assigner Window assigner for the coGroup
       * @return WithWindow for coGroup processing
       */
      def window[W <: Window](assigner: WindowAssigner[_ >: CoGroupedStreams.TaggedUnion[T1, T2], W]): CoGroupedStreams[T1, T2]#Where[KEY]#EqualTo#WithWindow[W]

      class WithWindow[W <: Window] {

        /**
         * Apply a CoGroupFunction to the grouped elements of both streams.
         *
         * @param function CoGroupFunction to process grouped elements
         * @return DataStream with the coGroup results
         */
        def apply[R: TypeInformation](function: CoGroupFunction[T1, T2, R]): DataStream[R]

        /**
         * Apply a coGroup function using closure syntax.
         *
         * @param fun Function to process grouped elements
         * @return DataStream with the coGroup results
         */
        def apply[R: TypeInformation](fun: (Iterable[T1], Iterable[T2]) => TraversableOnce[R]): DataStream[R]
      }
    }
  }
}

Usage Examples:
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector
// Required for the .asScala conversions below (Scala 2.12-era converter import)
import scala.collection.JavaConverters._

// CoGroup to find users who clicked but didn't purchase
val clicksWithoutPurchases = clicks
  .coGroup(purchases)
  .where(_.userId)
  .equalTo(_.userId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .apply(new CoGroupFunction[Click, Purchase, String] {
    override def coGroup(
      clicks: java.lang.Iterable[Click],
      purchases: java.lang.Iterable[Purchase],
      out: Collector[String]
    ): Unit = {
      val clickList = clicks.asScala.toList
      val purchaseList = purchases.asScala.toList
      // Emit only for users that clicked in the window but made no purchase
      if (clickList.nonEmpty && purchaseList.isEmpty) {
        clickList.foreach(click => out.collect(s"User ${click.userId} clicked ${click.page} but didn't purchase"))
      }
    }
  })

// Using closure syntax
val summaryStats = clicks
  .coGroup(purchases)
  .where(_.userId)
  .equalTo(_.userId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .apply { (clicks: Iterable[Click], purchases: Iterable[Purchase]) =>
    // Either side may be empty: take the key from whichever side has elements
    val userId = clicks.headOption.orElse(purchases.headOption).map {
      case c: Click => c.userId
      case p: Purchase => p.userId
    }.getOrElse("unknown")
    List(s"User $userId: ${clicks.size} clicks, ${purchases.size} purchases")
  }

// Co-processing function interfaces
/**
 * Maps each element of two connected input streams onto a common output type.
 */
trait CoMapFunction[IN1, IN2, OUT] {
/** Called for each element of the first input stream. */
def map1(value: IN1): OUT
/** Called for each element of the second input stream. */
def map2(value: IN2): OUT
}
/**
 * FlatMaps elements of two connected input streams onto a common output type;
 * zero or more results are emitted through the supplied Collector.
 */
trait CoFlatMapFunction[IN1, IN2, OUT] {
/** Called for each element of the first input stream; emit results via `out`. */
def flatMap1(value: IN1, out: Collector[OUT]): Unit
/** Called for each element of the second input stream; emit results via `out`. */
def flatMap2(value: IN2, out: Collector[OUT]): Unit
}
/**
 * Low-level co-processing of two connected streams with access to timestamps,
 * a timer service, and side outputs via the per-element Context.
 */
abstract class CoProcessFunction[IN1, IN2, OUT] {
/** Called for each element of the first input stream; emit results via `out`. */
def processElement1(value: IN1, ctx: Context, out: Collector[OUT]): Unit
/** Called for each element of the second input stream; emit results via `out`. */
def processElement2(value: IN2, ctx: Context, out: Collector[OUT]): Unit
/** Called when a registered timer fires; default implementation does nothing. */
def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {}
/** Per-element context available in processElement1/processElement2. */
abstract class Context {
/** Timestamp of the element currently being processed. */
def timestamp(): Long
/** Timer service for registering event-time / processing-time timers. */
def timerService(): TimerService
/** Emits a value to the side output identified by `outputTag`. */
def output[X](outputTag: OutputTag[X], value: X): Unit
}
/** Context passed to onTimer; additionally exposes the firing timer's time domain. */
abstract class OnTimerContext extends Context {
def timeDomain(): TimeDomain
}
}
// Join function interfaces
/**
 * Combines one matched pair of elements (one from each stream) into a result.
 */
trait JoinFunction[IN1, IN2, OUT] {
def join(first: IN1, second: IN2): OUT
}
/**
 * Advanced join processing: called once per matched pair, with access to both
 * elements' timestamps and the current watermark via the Context.
 */
abstract class ProcessJoinFunction[IN1, IN2, OUT] {
/** Called for each matched (left, right) pair; emit results via `out`. */
def processElement(left: IN1, right: IN2, ctx: Context, out: Collector[OUT]): Unit
/** Per-pair context exposing timestamps and the current watermark. */
abstract class Context {
def getLeftTimestamp: Long
def getRightTimestamp: Long
def getCurrentWatermark: Long
}
}
// CoGroup function interface
/**
 * Processes all elements sharing one key from both streams within a window.
 * Note: the iterables are java.lang.Iterable — convert with .asScala if needed.
 */
trait CoGroupFunction[IN1, IN2, OUT] {
def coGroup(first: java.lang.Iterable[IN1], second: java.lang.Iterable[IN2], out: Collector[OUT]): Unit
}
// Tagged union for internal use: routes elements of both inputs through a single stream
object CoGroupedStreams {
  sealed trait TaggedUnion[T1, T2]
  // Wraps an element of the first input stream
  case class One[T1, T2](value: T1) extends TaggedUnion[T1, T2]
  // Wraps an element of the second input stream
  case class Two[T1, T2](value: T2) extends TaggedUnion[T1, T2]
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12