CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-scala-2-12

Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.

Pending
Overview
Eval results
Files

docs/stream-connections.md

Stream Connections and Joins

Multi-stream operations combine several streams through unions, connections, joins, co-groups, and co-processing. They are the building blocks for correlating data from different sources and for implementing complex event processing logic.

Capabilities

Stream Union

Combine multiple streams of the same type into a single stream.

class DataStream[T] {
  /**
   * Union this stream with one or more other streams of the same element type.
   *
   * Unlike `connect`, every input must have exactly the element type `T`.
   * Several streams can be passed in a single call because the parameter is varargs.
   *
   * @param dataStreams Other streams to union with
   * @return DataStream containing elements from all input streams
   */
  def union(dataStreams: DataStream[T]*): DataStream[T]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._

// Entry point used to build and run the job.
val env = StreamExecutionEnvironment.getExecutionEnvironment

case class Event(id: String, value: Int, source: String, timestamp: Long)

val stream1 = env.fromElements(
  Event("e1", 10, "source1", 1000L),
  Event("e2", 20, "source1", 2000L)
)

val stream2 = env.fromElements(
  Event("e3", 15, "source2", 1500L),
  Event("e4", 25, "source2", 2500L)
)

val stream3 = env.fromElements(
  Event("e5", 30, "source3", 3000L)
)

// Union multiple streams in one call; every input must be a DataStream[Event]
val unionedStream = stream1.union(stream2, stream3)

// All events from all streams will be in the result
unionedStream.print()

// print() only declares a sink; nothing runs until the job is executed.
env.execute("Stream union example")

Connected Streams

Connect two streams, whose element types may differ, so that a single operator can process both inputs with a separate function for each.

class DataStream[T] {
  /**
   * Connect this stream with another stream whose element type may differ.
   *
   * The two inputs keep their own element types (`T` and `T2`). The returned
   * ConnectedStreams applies one function per input (for example `map1`/`map2`
   * of a CoMapFunction), so both inputs are handled by a single operator.
   *
   * @param dataStream Stream to connect with
   * @tparam T2 Element type of the other stream
   * @return ConnectedStreams for co-processing
   */
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
}

/**
 * Two streams joined by `DataStream.connect`; each input keeps its own element type.
 *
 * Every operation takes one function per input: the first handles `IN1` elements
 * and the second handles `IN2` elements. Both must produce the same result type `R`,
 * so the output is a single DataStream[R].
 *
 * @tparam IN1 Element type of the first (calling) stream
 * @tparam IN2 Element type of the second (connected) stream
 */
class ConnectedStreams[IN1, IN2] {
  /**
   * Apply different map functions to each connected stream.
   * Each input element produces exactly one output element.
   * @param fun1 Map function for first stream
   * @param fun2 Map function for second stream
   * @return DataStream with mapped results
   */
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
  
  /**
   * Apply CoMapFunction to connected streams.
   * map1 is called for elements of the first input, map2 for the second.
   * @param coMapper CoMapFunction implementation
   * @return DataStream with mapped results
   */
  def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R]
  
  /**
   * Apply different flatMap functions to each connected stream.
   * Each input element may produce zero or more output elements.
   * @param fun1 FlatMap function for first stream
   * @param fun2 FlatMap function for second stream
   * @return DataStream with flatMapped results
   */
  def flatMap[R: TypeInformation](
    fun1: IN1 => TraversableOnce[R], 
    fun2: IN2 => TraversableOnce[R]
  ): DataStream[R]
  
  /**
   * Apply CoFlatMapFunction to connected streams.
   * Results are emitted through a Collector, so each element may yield any number of outputs.
   * @param coFlatMapper CoFlatMapFunction implementation
   * @return DataStream with flatMapped results
   */
  def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): DataStream[R]
  
  /**
   * Apply CoProcessFunction for advanced co-processing.
   * Besides one callback per input, the function receives a Context (element
   * timestamp, timer service, side outputs) and an onTimer callback.
   * @param coProcessFunction CoProcessFunction implementation
   * @return DataStream with processed results
   */
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.CoMapFunction

case class Order(id: String, customerId: String, amount: Double, timestamp: Long)
case class Customer(id: String, name: String, tier: String)

val orders = env.fromElements(
  Order("o1", "c1", 100.0, 1000L),
  Order("o2", "c2", 200.0, 2000L)
)

val customers = env.fromElements(
  Customer("c1", "Alice", "Gold"),
  Customer("c2", "Bob", "Silver")
)

// Connect streams for co-processing; each input keeps its own element type
val connected = orders.connect(customers)

// Using function syntax: the first function handles orders, the second customers.
// Both must return the same type (String here), so the result is one
// DataStream[String] that mixes lines from both inputs. No lookup or join
// happens: each element is described on its own.
val descriptions = connected.map(
  (order: Order) => s"Order: ${order.id} - ${order.amount}",
  (customer: Customer) => s"Customer: ${customer.name} - ${customer.tier}"
)

// Using CoMapFunction
class OrderCustomerCoMapper extends CoMapFunction[Order, Customer, String] {
  // Invoked once for every element arriving on the first (orders) input.
  override def map1(o: Order): String =
    s"Processing order ${o.id} for customer ${o.customerId}"

  // Invoked once for every element arriving on the second (customers) input.
  override def map2(c: Customer): String =
    s"Customer ${c.name} registered with tier ${c.tier}"
}

// Orders are routed through map1 and customers through map2 of the CoMapFunction.
val processedWithCoMapper = connected.map(new OrderCustomerCoMapper)

Keyed Connected Streams

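Key both inputs of connected streams so that subsequent co-processing functions can use keyed state. Both inputs are keyed with the same key type, so elements with equal keys from either stream are processed together.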

/**
 * Keying operations on connected streams. Keying both inputs is what makes
 * stateful co-processing (keyed state in the co-function applied afterwards) possible.
 */
class ConnectedStreams[IN1, IN2] {
  /**
   * Key both connected streams by field positions.
   * NOTE(review): position- and field-name-based keys are deprecated in newer Flink
   * releases in favour of key selector functions; confirm for the targeted version.
   * @param keyPosition1 Key position for first stream
   * @param keyPosition2 Key position for second stream
   * @return Connected streams with keying
   */
  def keyBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2]
  
  /**
   * Key both connected streams by field names.
   * @param field1 Key field for first stream
   * @param field2 Key field for second stream
   * @return Connected streams with keying
   */
  def keyBy(field1: String, field2: String): ConnectedStreams[IN1, IN2]
  
  /**
   * Key both connected streams by key selector functions.
   * Both selectors must return the same key type `KEY`, so equal keys from the two
   * inputs can be matched.
   * @param fun1 Key selector for first stream
   * @param fun2 Key selector for second stream
   * @return Connected streams with keying
   */
  def keyBy[KEY: TypeInformation](fun1: IN1 => KEY, fun2: IN2 => KEY): ConnectedStreams[IN1, IN2]
}

Stream Joins

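Join two streams on a key within a window: the join function is called once for every pair of elements (one from each stream) that have equal keys and fall into the same window.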

class DataStream[T] {
  /**
   * Start configuring a window join with another stream.
   * The join is completed by `where(...)`, `equalTo(...)`, `window(...)` and `apply(...)`.
   * @param otherStream Stream to join with
   * @tparam T2 Element type of the other stream
   * @return JoinedStreams for configuring join
   */
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
}

// JoinedStreams configuration
class JoinedStreams[T1, T2] {
  /**
   * Specify the key for the first stream
   * @param keySelector Key selector function
   * @return Where clause for join configuration
   */
  def where[KEY: TypeInformation](keySelector: T1 => KEY): JoinedStreams[T1, T2]#Where[KEY]
  
  class Where[KEY] {
    /**
     * Specify the key for the second stream; it must yield the same key type as `where`.
     * @param keySelector Key selector function for second stream
     * @return EqualTo clause for join configuration
     */
    def equalTo(keySelector: T2 => KEY): JoinedStreams[T1, T2]#Where[KEY]#EqualTo
    
    class EqualTo {
      /**
       * Specify the window for the join. The assigner sees elements of both streams
       * wrapped in a TaggedUnion, which is why its element type is a supertype of it.
       * @param assigner Window assigner for join
       * @return WithWindow for join processing
       */
      def window[W <: Window](assigner: WindowAssigner[_ >: CoGroupedStreams.TaggedUnion[T1, T2], W]): JoinedStreams[T1, T2]#Where[KEY]#EqualTo#WithWindow[W]
      
      /**
       * A fully configured window join. The join function is invoked once for every
       * pair of elements (one from each stream) with equal keys in the same window.
       * Elements without a partner produce no output (inner-join semantics).
       */
      class WithWindow[W <: Window] {
        /**
         * Apply a join function that returns exactly one result per matched pair
         * @param function Join function to combine matched elements
         * @return DataStream with join results
         */
        def apply[R: TypeInformation](function: JoinFunction[T1, T2, R]): DataStream[R]
        
        /**
         * Apply a flat join function that may emit zero or more results per matched pair
         * @param function FlatJoinFunction emitting results through a Collector
         * @return DataStream with join results
         */
        def apply[R: TypeInformation](function: FlatJoinFunction[T1, T2, R]): DataStream[R]
        
        /**
         * Apply a join function using closure syntax (one result per matched pair)
         * @param fun Function to combine matched elements
         * @return DataStream with join results
         */
        def apply[R: TypeInformation](fun: (T1, T2) => R): DataStream[R]
        
        /**
         * Apply a join closure that may emit any number of results through the Collector
         * @param fun Function receiving a matched pair and a Collector for results
         * @return DataStream with join results
         */
        def apply[R: TypeInformation](fun: (T1, T2, Collector[R]) => Unit): DataStream[R]
        
        // Window joins have no `process(ProcessJoinFunction)` variant.
        // ProcessJoinFunction is used by interval joins on keyed streams, e.g.
        //   left.keyBy(_.key).intervalJoin(right.keyBy(_.key))
        //     .between(Time.milliseconds(-5), Time.milliseconds(5))
        //     .process(new MyProcessJoinFunction)
      }
    }
  }
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.JoinFunction

case class Click(userId: String, page: String, timestamp: Long)
case class Purchase(userId: String, product: String, amount: Double, timestamp: Long)
// One output record per matched (click, purchase) pair.
case class UserActivity(userId: String, page: String, product: String, amount: Double)

// NOTE: assumes `env` is a StreamExecutionEnvironment created elsewhere.
// Event-time windows need timestamps; these sources emit them in ascending order,
// so assignAscendingTimestamps can be used.
val clicks = env.fromElements(
  Click("user1", "homepage", 1000L),
  Click("user2", "products", 2000L)
).assignAscendingTimestamps(_.timestamp)

val purchases = env.fromElements(
  Purchase("user1", "laptop", 999.99, 1500L),
  Purchase("user2", "book", 29.99, 2200L)
).assignAscendingTimestamps(_.timestamp)

// Join clicks and purchases within 5-minute windows
// A click and a purchase match when their userIds are equal and both fall into the
// same 5-minute tumbling event-time window.
val joinedActivity = clicks
  .join(purchases)
  .where(_.userId)
  .equalTo(_.userId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .apply { (click: Click, purchase: Purchase) =>
    UserActivity(click.userId, click.page, purchase.product, purchase.amount)
  }

CoGroup Operations

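Group the elements of two streams by key within windows: for each key and window, a single function call receives all matching elements from both streams, even when one of the two groups is empty.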

class DataStream[T] {
  /**
   * Start configuring a window coGroup with another stream.
   * The coGroup is completed by `where(...)`, `equalTo(...)`, `window(...)` and `apply(...)`.
   * @param otherStream Stream to coGroup with
   * @tparam T2 Element type of the other stream
   * @return CoGroupedStreams for configuring coGroup
   */
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}

class CoGroupedStreams[T1, T2] {
  /**
   * Specify the key for the first stream
   * @param keySelector Key selector function
   * @return Where clause for coGroup configuration
   */
  def where[KEY: TypeInformation](keySelector: T1 => KEY): CoGroupedStreams[T1, T2]#Where[KEY]
  
  class Where[KEY] {
    /**
     * Specify the key for the second stream; it must yield the same key type as `where`.
     * @param keySelector Key selector function for second stream
     * @return EqualTo clause for coGroup configuration
     */
    def equalTo(keySelector: T2 => KEY): CoGroupedStreams[T1, T2]#Where[KEY]#EqualTo
    
    class EqualTo {
      /**
       * Specify the window for the coGroup
       * @param assigner Window assigner for coGroup
       * @return WithWindow for coGroup processing
       */
      def window[W <: Window](assigner: WindowAssigner[_ >: CoGroupedStreams.TaggedUnion[T1, T2], W]): CoGroupedStreams[T1, T2]#Where[KEY]#EqualTo#WithWindow[W]
      
      /**
       * A fully configured window coGroup. The function is called once per key and
       * window with all elements of both streams for that key; one of the two groups
       * may be empty when the key occurs in only one stream.
       */
      class WithWindow[W <: Window] {
        /**
         * Apply a coGroup function to grouped elements
         * @param function CoGroupFunction to process grouped elements
         * @return DataStream with coGroup results
         */
        def apply[R: TypeInformation](function: CoGroupFunction[T1, T2, R]): DataStream[R]
        
        /**
         * Apply a coGroup closure. The groups are passed as single-pass Scala Iterators,
         * and exactly one result is emitted per key and window.
         * @param fun Function mapping both groups to one result
         * @return DataStream with coGroup results
         */
        def apply[R: TypeInformation](fun: (Iterator[T1], Iterator[T2]) => R): DataStream[R]
        
        /**
         * Apply a coGroup closure that may emit any number of results through the Collector
         * @param fun Function receiving both groups and a Collector for results
         * @return DataStream with coGroup results
         */
        def apply[R: TypeInformation](fun: (Iterator[T1], Iterator[T2], Collector[R]) => Unit): DataStream[R]
      }
    }
  }
}

Usage Examples:

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector
// Provides .asScala for the java.lang.Iterable arguments (Scala 2.12 converters).
import scala.collection.JavaConverters._

// CoGroup to find users who clicked but didn't purchase.
// coGroup runs once per user and window even when that user has no purchases in
// the window, which is what makes the "no purchase" case observable.
val clicksWithoutPurchases = clicks
  .coGroup(purchases)
  .where(_.userId)
  .equalTo(_.userId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .apply(new CoGroupFunction[Click, Purchase, String] {
    override def coGroup(
      clicks: java.lang.Iterable[Click], 
      purchases: java.lang.Iterable[Purchase], 
      out: Collector[String]
    ): Unit = {
      val clickList = clicks.asScala.toList
      val purchaseList = purchases.asScala.toList
      
      if (clickList.nonEmpty && purchaseList.isEmpty) {
        clickList.foreach(click => out.collect(s"User ${click.userId} clicked ${click.page} but didn't purchase"))
      }
    }
  })

// Using closure syntax: the Scala closure receives single-pass Iterators (not
// Iterables) and returns exactly one result per user and window.
val summaryStats = clicks
  .coGroup(purchases)
  .where(_.userId)
  .equalTo(_.userId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .apply { (clickIter: Iterator[Click], purchaseIter: Iterator[Purchase]) =>
    // Iterators can only be traversed once, so materialize them before inspecting.
    val userClicks = clickIter.toList
    val userPurchases = purchaseIter.toList

    // Both groups share the key, so whichever side is non-empty supplies the user id.
    val userId = userClicks.headOption.map(_.userId)
      .orElse(userPurchases.headOption.map(_.userId))
      .getOrElse("unknown")

    s"User $userId: ${userClicks.size} clicks, ${userPurchases.size} purchases"
  }

Types

// Co-processing function interfaces
/**
 * One map function per input of a ConnectedStreams: map1 handles elements of the
 * first input, map2 those of the second, and each call returns exactly one result.
 * NOTE(review): simplified Scala rendering; the real type is imported from
 * org.apache.flink.streaming.api.functions.co.
 */
trait CoMapFunction[IN1, IN2, OUT] {
  def map1(value: IN1): OUT
  def map2(value: IN2): OUT
}

/**
 * One flatMap function per input of a ConnectedStreams. Results are emitted through
 * `out`, so each element may produce zero or more outputs.
 */
trait CoFlatMapFunction[IN1, IN2, OUT] {
  def flatMap1(value: IN1, out: Collector[OUT]): Unit
  def flatMap2(value: IN2, out: Collector[OUT]): Unit
}

/**
 * Low-level co-processing function: one callback per input, plus access to the
 * element timestamp, timers and side outputs through a Context.
 */
abstract class CoProcessFunction[IN1, IN2, OUT] {
  /** Called for each element of the first input. */
  def processElement1(value: IN1, ctx: Context, out: Collector[OUT]): Unit
  /** Called for each element of the second input. */
  def processElement2(value: IN2, ctx: Context, out: Collector[OUT]): Unit
  /** Called when a timer registered through the TimerService fires; no-op by default. */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {}
  
  abstract class Context {
    /**
     * Timestamp of the element being processed (or of the firing timer).
     * Boxed because it may be null, e.g. when elements carry no timestamps.
     */
    def timestamp(): java.lang.Long
    /** Access to time and timer registration. */
    def timerService(): TimerService
    /** Emit a value to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
  
  abstract class OnTimerContext extends Context {
    /** Whether the firing timer is an event-time or a processing-time timer. */
    def timeDomain(): TimeDomain
  }
}

// Join function interfaces
/** Combines one matched pair (one element from each stream) into exactly one result. */
trait JoinFunction[IN1, IN2, OUT] {
  def join(first: IN1, second: IN2): OUT
}

/**
 * Join function for interval joins (KeyedStream.intervalJoin(...).between(...).process(...)).
 * It is called once per matched pair and may emit any number of results through `out`.
 */
abstract class ProcessJoinFunction[IN1, IN2, OUT] {
  def processElement(left: IN1, right: IN2, ctx: Context, out: Collector[OUT]): Unit
  
  abstract class Context {
    /** Timestamp of the left element of the pair. */
    def getLeftTimestamp: Long
    /** Timestamp of the right element of the pair. */
    def getRightTimestamp: Long
    /** Timestamp assigned to the joined result (presumably the larger of the two; confirm). */
    def getTimestamp: Long
    /** Emit a value to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

// CoGroup function interface
/**
 * Receives, in one call, all elements of both streams that share a key (and window).
 * Either group may be empty. Results go through `out`, so a call can emit any number
 * of outputs.
 */
trait CoGroupFunction[IN1, IN2, OUT] {
  def coGroup(first: java.lang.Iterable[IN1], second: java.lang.Iterable[IN2], out: Collector[OUT]): Unit
}

// Tagged union for internal use
/**
 * Wrapper that tags each element with the stream it came from: `One` for the first
 * stream and `Two` for the second. It lets both inputs of a join or coGroup share
 * one window assigner.
 * In user code it appears only as the lower bound of the WindowAssigner accepted by
 * `window(...)`, so an assigner for a supertype (such as TumblingEventTimeWindows) fits.
 * NOTE(review): simplified sketch; the real Flink type is a Java class
 * (org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion), not a sealed trait.
 */
object CoGroupedStreams {
  sealed trait TaggedUnion[T1, T2]
  /** Element originating from the first stream. */
  case class One[T1, T2](value: T1) extends TaggedUnion[T1, T2]
  /** Element originating from the second stream. */
  case class Two[T1, T2](value: T2) extends TaggedUnion[T1, T2]
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12

docs

async-io.md

data-streams.md

execution-environment.md

index.md

keyed-streams.md

processing-functions.md

sinks-output.md

stream-connections.md

window-functions.md

windowing.md

tile.json