Utilities

Advanced utilities for sampling, indexing, data analysis, and partial function support. These utilities extend DataSet functionality with specialized operations.

Capabilities

DataSet Utilities

Enhanced utilities available through implicit conversion on DataSets.

implicit class DataSetUtils[T](dataSet: DataSet[T]) {
  /**
   * Adds consecutive indices to each element starting from 0
   * @return DataSet of (index, element) tuples
   */
  def zipWithIndex: DataSet[(Long, T)]
  
  /**
   * Adds unique identifiers to each element  
   * @return DataSet of (uniqueId, element) tuples
   */
  def zipWithUniqueId: DataSet[(Long, T)]
  
  /**
   * Counts the number of elements in each partition
   * @return DataSet of (partitionIndex, elementCount) tuples
   */
  def countElementsPerPartition: DataSet[(Int, Long)]
  
  /**
   * Computes checksum and total count of the DataSet
   * @return ChecksumHashCode with checksum and count
   */
  def checksumHashCode(): ChecksumHashCode
}

Usage Examples:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("apple", "banana", "cherry", "date")

// Add consecutive indices
val indexed = data.zipWithIndex
// Result: [(0, "apple"), (1, "banana"), (2, "cherry"), (3, "date")]

// Add unique IDs (useful in distributed environment)
val withIds = data.zipWithUniqueId
// Result: [(uniqueId1, "apple"), (uniqueId2, "banana"), ...]

// Count elements per partition
val partitionCounts = data.countElementsPerPartition
// Result: [(0, 2), (1, 2)] for 2 partitions with 2 elements each

// Get checksum and count
val checksum = data.checksumHashCode()
println(s"Checksum: ${checksum.getChecksum}, Count: ${checksum.getCount}")

Sampling Operations

Statistical sampling methods for data analysis and subset creation.

implicit class DataSetUtils[T](dataSet: DataSet[T]) {
  /**
   * Samples elements randomly by fraction
   * @param withReplacement Whether to sample with replacement
   * @param fraction Fraction of elements to sample (0.0 to 1.0)
   * @param seed Random seed for reproducibility
   * @return DataSet with sampled elements
   */
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Random.nextLong()): DataSet[T]
  
  /**
   * Samples a specific number of elements
   * @param withReplacement Whether to sample with replacement
   * @param numSamples Number of elements to sample
   * @param seed Random seed for reproducibility
   * @return DataSet with sampled elements
   */
  def sampleWithSize(withReplacement: Boolean, numSamples: Int, seed: Long = Random.nextLong()): DataSet[T]
}

Usage Examples:

val largeDataset = env.generateSequence(1, 1000000)

// Sample 10% of elements without replacement
val sample10Percent = largeDataset.sample(withReplacement = false, fraction = 0.1)

// Sample exactly 1000 elements with replacement
val exactSample = largeDataset.sampleWithSize(withReplacement = true, numSamples = 1000)

// Reproducible sampling with seed
val reproducibleSample = largeDataset.sample(
  withReplacement = false, 
  fraction = 0.05, 
  seed = 12345L
)

Advanced Range Partitioning

Range partitioning with custom data distribution for optimized data placement.

implicit class DataSetUtils[T](dataSet: DataSet[T]) {
  /**
   * Partitions by range using custom data distribution
   * @param distribution Custom data distribution
   * @param fields Field positions for partitioning
   * @return DataSet with custom range partitioning
   */
  def partitionByRange(distribution: DataDistribution, fields: Int*): DataSet[T]
  
  /**
   * Partitions by range using custom data distribution and field names
   * @param distribution Custom data distribution
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return DataSet with custom range partitioning
   */
  def partitionByRange(
    distribution: DataDistribution, 
    firstField: String, 
    otherFields: String*
  ): DataSet[T]
  
  /**
   * Partitions by range using custom data distribution and key selector
   * @param distribution Custom data distribution
   * @param fun Key selector function
   * @return DataSet with custom range partitioning
   */
  def partitionByRange[K: TypeInformation](
    distribution: DataDistribution, 
    fun: T => K
  ): DataSet[T]
}

Usage Examples:

// Custom data distribution for range partitioning
case class SalesRecord(region: String, amount: Double, date: String)

val salesData = env.fromElements(
  SalesRecord("North", 1000.0, "2023-01-01"),
  SalesRecord("South", 1500.0, "2023-01-02"),
  SalesRecord("East", 800.0, "2023-01-03")
)

// Create a custom distribution (see the AmountDistribution sketch below)
val customDistribution: DataDistribution = new AmountDistribution

val partitionedSales = salesData.partitionByRange(customDistribution, (r: SalesRecord) => r.amount)
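
A minimal sketch of the hypothetical AmountDistribution used above, assuming Flink's org.apache.flink.api.common.distributions.DataDistribution contract (bucket boundaries, a field count, and IOReadableWritable serialization hooks); the boundary values are illustrative:

import org.apache.flink.api.common.distributions.DataDistribution
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

// Splits sale amounts at fixed boundaries; boundary i is the upper
// bound of bucket i. Assumes totalNumBuckets == boundaries.length + 1.
class AmountDistribution extends DataDistribution {
  private val boundaries = Array(500.0, 1000.0, 2000.0)

  // One boundary value per key field; here a single Double key field
  override def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] =
    Array[AnyRef](Double.box(boundaries(bucketNum)))

  override def getNumberOfFields: Int = 1

  // Serialization hooks required by IOReadableWritable
  override def write(out: DataOutputView): Unit = boundaries.foreach(out.writeDouble)
  override def read(in: DataInputView): Unit =
    boundaries.indices.foreach(i => boundaries(i) = in.readDouble())
}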

Partial Function Extensions

Enable pattern matching and partial function usage in transformations.

// Import for partial function support
import org.apache.flink.api.scala.extensions._

implicit class OnDataSet[T](dataSet: DataSet[T]) {
  /**
   * Maps using partial function with pattern matching
   * @param fun Partial function for transformation
   * @return DataSet with partial function mapping
   */
  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  
  /**
   * FlatMaps using partial function
   * @param fun Partial function returning traversable
   * @return DataSet with partial function flatMap
   */
  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  
  /**
   * Filters using partial function
   * @param fun Partial function for filtering
   * @return DataSet with partial function filtering
   */
  def filterWith(fun: T => Boolean): DataSet[T]
  
  /**
   * Reduces using partial function
   * @param fun Partial function for reduction
   * @return DataSet with partial function reduction
   */
  def reduceWith(fun: (T, T) => T): DataSet[T]
  
  /**
   * Reduces the entire DataSet as a single group using a stream-based function
   * @param fun Function processing all elements as a stream
   * @return DataSet containing the single reduction result
   */
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  
  /**
   * Groups by using partial function
   * @param fun Partial function for key extraction
   * @return GroupedDataSet with partial function grouping
   */
  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  
  /**
   * MapPartition using partial function on streams
   * @param fun Partial function for partition transformation
   * @return DataSet with partial function mapPartition
   */
  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
}

Usage Examples:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

sealed trait Event
case class UserEvent(userId: String, action: String) extends Event
case class SystemEvent(level: String, message: String) extends Event
case class ErrorEvent(error: String, stackTrace: String) extends Event

val events: DataSet[Event] = env.fromElements(
  UserEvent("user1", "login"),
  SystemEvent("INFO", "System started"),
  ErrorEvent("NPE", "NullPointerException at line 42")
)

// Pattern matching with partial functions
val userActions = events.mapWith {
  case UserEvent(userId, action) => s"$userId performed $action"
  case _ => "Not a user event"
}

val criticalEvents = events.filterWith {
  case ErrorEvent(_, _) => true
  case SystemEvent("ERROR", _) => true
  case _ => false
}

// Group by event type using pattern matching
val groupedByType = events.groupingBy {
  case _: UserEvent => "user"
  case _: SystemEvent => "system"
  case _: ErrorEvent => "error"
}
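
The stream-based helpers also work on an ungrouped DataSet; a brief sketch using the same events DataSet (the counting is illustrative):

// Transform each partition as a stream: one count per partition
val perPartitionCounts = events.mapPartitionWith { stream => stream.size }

// Reduce the whole DataSet as a single group
val totalEvents = events.reduceGroupWith { stream => stream.size }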

Grouped DataSet Extensions

Partial function support for grouped operations.

implicit class OnGroupedDataSet[T](groupedDataSet: GroupedDataSet[T]) {
  /**
   * Reduces groups using partial function
   * @param fun Partial function for group reduction
   * @return DataSet with partial function group reduction
   */
  def reduceWith(fun: (T, T) => T): DataSet[T]
  
  /**
   * Reduces groups using partial function on streams
   * @param fun Partial function processing group as stream
   * @return DataSet with stream-based group reduction
   */
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
}
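
Usage Examples:

A sketch assuming the env and imports from the earlier examples; the Purchase type is illustrative:

case class Purchase(category: String, amount: Double)

val purchases = env.fromElements(
  Purchase("books", 12.0),
  Purchase("books", 30.0),
  Purchase("games", 60.0)
)

// Pairwise reduction per group: total amount per category
val totals = purchases
  .groupingBy(_.category)
  .reduceWith { case (a, b) => Purchase(a.category, a.amount + b.amount) }

// Stream-based group reduction: (category, element count) per group
val counts = purchases
  .groupingBy(_.category)
  .reduceGroupWith { group => (group.head.category, group.size) }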

Binary Operation Extensions

Partial function support for join, cross, and coGroup operations.

implicit class OnJoinFunctionAssigner[L, R](joiner: JoinFunctionAssigner[L, R]) {
  /**
   * Applies join function using partial functions
   * @param fun Partial function for joining elements
   * @return DataSet with partial function join
   */
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}

implicit class OnCrossDataSet[L, R](crossDataSet: CrossDataSet[L, R]) {
  /**
   * Applies cross function using partial functions
   * @param fun Partial function for crossing elements
   * @return DataSet with partial function cross
   */
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}

implicit class OnCoGroupDataSet[L, R](coGroupDataSet: CoGroupDataSet[L, R]) {
  /**
   * Applies coGroup function using partial functions on streams
   * @param fun Partial function for coGrouping streams
   * @return DataSet with partial function coGroup
   */
  def applyWith[O: TypeInformation: ClassTag](fun: (Stream[L], Stream[R]) => O): DataSet[O]
}

Usage Examples:

case class Order(customerId: Int, productId: Int, amount: Double)
case class Customer(id: Int, name: String, segment: String)

val orders = env.fromElements(
  Order(1, 101, 99.99),
  Order(2, 102, 149.99)
)

val customers = env.fromElements(
  Customer(1, "Alice", "Premium"),
  Customer(2, "Bob", "Standard")
)

// Join with partial functions
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .applyWith { (customer, order) =>
    s"${customer.name} (${customer.segment}) ordered product ${order.productId} for $${order.amount}"
  }
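
A coGroup sketch in the same vein, assuming the stream-based applyWith signature above; the aggregation is illustrative:

// CoGroup with a stream-based function: total order amount per customer,
// keeping customers that have no orders
val orderTotals = customers
  .coGroup(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .applyWith { (custs, ords) =>
    val name = custs.headOption.map(_.name).getOrElse("unknown")
    (name, ords.map(_.amount).sum)
  }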

Metrics Integration

Scala-friendly gauge metrics for monitoring.

class ScalaGauge[T](value: () => T) extends Gauge[T] {
  /**
   * Gets the current gauge value
   * @return Current value
   */
  override def getValue: T = value()
}

Usage Examples:

import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.MetricGroup

class MonitoredMapFunction extends RichMapFunction[String, String] {
  @volatile private var processedCount = 0L
  
  override def open(parameters: Configuration): Unit = {
    val metricGroup: MetricGroup = getRuntimeContext.getMetricGroup
    
    // Register a Scala gauge exposing the processed-element count
    metricGroup.gauge[Long, ScalaGauge[Long]](
      "processedCount", new ScalaGauge[Long](() => processedCount))
  }
  
  override def map(value: String): String = {
    processedCount += 1
    value.toUpperCase
  }
}

Package Utilities

Utility functions in the utils package.

package object utils {
  /**
   * Gets call location name for debugging
   * @param depth Stack depth to examine
   * @return Call location name
   */
  def getCallLocationName(depth: Int = 3): String
}
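
Usage Examples:

A sketch assuming getCallLocationName is accessible as documented; the logging helper is hypothetical:

import org.apache.flink.api.scala.utils._

// Hypothetical helper: records where in user code a step was defined.
// With the default depth the reported frame depends on the call chain.
def logStep(stepName: String): Unit =
  println(s"[$stepName] defined at ${getCallLocationName()}")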

Types

class ChecksumHashCode {
  /**
   * Gets the computed checksum
   * @return Checksum value
   */
  def getChecksum: Long
  
  /**
   * Gets the element count
   * @return Number of elements
   */
  def getCount: Long
}

trait DataDistribution {
  /**
   * Gets bucket boundary for range partitioning
   * @param bucketNum Bucket number
   * @param totalNumBuckets Total number of buckets
   * @return Bucket boundary values, one per key field
   */
  def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]
}

// Wrapper classes for partial function support
class OnDataSet[T](dataSet: DataSet[T]) {
  // Enhanced DataSet with partial function capabilities
}

class OnGroupedDataSet[T](groupedDataSet: GroupedDataSet[T]) {
  // Enhanced GroupedDataSet with partial function capabilities
}

class OnJoinFunctionAssigner[L, R](joiner: JoinFunctionAssigner[L, R]) {
  // Enhanced join operations with partial function capabilities
}

class OnCrossDataSet[L, R](crossDataSet: CrossDataSet[L, R]) {
  // Enhanced cross operations with partial function capabilities
}

class OnCoGroupDataSet[L, R](coGroupDataSet: CoGroupDataSet[L, R]) {
  // Enhanced coGroup operations with partial function capabilities
}

class ScalaGauge[T](value: () => T) extends Gauge[T] {
  // Scala-friendly gauge metric implementation
}

// Stream type for partial function support
type Stream[T] = scala.collection.immutable.Stream[T]
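
Because groups and partitions are exposed as lazy Streams, a stream-based function can short-circuit without materializing every element; a sketch using the earlier events DataSet:

// Stops consuming each group as soon as one error is seen
val groupHasError = events
  .groupingBy {
    case _: ErrorEvent => "error"
    case _ => "other"
  }
  .reduceGroupWith { group =>
    group.exists { case _: ErrorEvent => true; case _ => false }
  }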