Utility Functions

The Flink Scala API provides additional utility functions through the org.apache.flink.api.scala.utils package, extending DataSet with operations for sampling, indexing, partitioning, and checksums.

Importing Utilities

import org.apache.flink.api.scala.utils._

This import adds implicit conversions that extend DataSet with additional utility methods.
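
All later examples also assume an execution environment named env. A minimal, self-contained setup might look like this (UtilityFunctionsExample is just an illustrative name):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

object UtilityFunctionsExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val data = env.fromElements(1, 2, 3, 4, 5)
    val indexed = data.zipWithIndex()  // available only because of the utils import
    indexed.print()                    // print() triggers execution
  }
}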

Element Counting and Indexing

Count Elements Per Partition

class DataSet[T] {
  def countElementsPerPartition(): DataSet[(Int, Long)]  // (partitionId, elementCount)
}
val data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .setParallelism(3)

// Count elements in each partition
val partitionCounts = data.countElementsPerPartition()
partitionCounts.collect() 
// Result: (0, 3), (1, 3), (2, 4) - partition ID and count per partition
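
The per-partition counts can then be inspected on the client, for example to spot imbalance (a small sketch building on partitionCounts above):

val counts = partitionCounts.collect()            // Seq[(Int, Long)]
val total = counts.map(_._2).sum
val imbalance = counts.map(_._2).max - counts.map(_._2).min
println(s"Total elements: $total, max partition imbalance: $imbalance")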

Zip with Index

class DataSet[T] {
  def zipWithIndex(): DataSet[(Long, T)]  // (index, element)
}
val words = env.fromElements("apple", "banana", "cherry", "date")

// Add sequential index to each element
val indexed = words.zipWithIndex()
indexed.collect()
// Result: (0, "apple"), (1, "banana"), (2, "cherry"), (3, "date")

// Use indexed data
val evenIndexed = indexed.filter(_._1 % 2 == 0).map(_._2)
// Result: "apple", "cherry"

Zip with Unique ID

class DataSet[T] {
  def zipWithUniqueId(): DataSet[(Long, T)]  // (uniqueId, element)
}
val data = env.fromElements("A", "B", "C", "D", "E")
  .setParallelism(2)

// Add unique ID to each element (not necessarily sequential)
val withUniqueId = data.zipWithUniqueId()
withUniqueId.collect()
// Result: (0, "A"), (1, "B"), (2, "C"), (3, "D"), (4, "E")
// Note: IDs are unique but may not be in exact sequential order across partitions

Sampling Operations

Sample with Fraction

class DataSet[T] {
  def sample(
    withReplacement: Boolean, 
    fraction: Double, 
    seed: Long = Utils.RNG.nextLong()
  ): DataSet[T]
}
val numbers = env.fromElements(1 to 1000: _*)

// Sample approximately 10% of elements without replacement
val sample10Percent = numbers.sample(
  withReplacement = false,
  fraction = 0.1,
  seed = 12345L
)

// Sample with replacement (elements can appear multiple times)
val sampleWithReplacement = numbers.sample(
  withReplacement = true,
  fraction = 0.05,
  seed = 67890L
)

println(s"Original size: ${numbers.count()}")
println(s"Sample size: ${sample10Percent.count()}")

Sample with Fixed Size

class DataSet[T] {
  def sampleWithSize(
    withReplacement: Boolean,
    numSamples: Int,
    seed: Long = Utils.RNG.nextLong()
  ): DataSet[T]
}
val largeDataset = env.fromElements(1 to 10000: _*)

// Sample exactly 100 elements without replacement
val fixedSample = largeDataset.sampleWithSize(
  withReplacement = false,
  numSamples = 100,
  seed = 42L
)

// Sample with replacement - can get duplicates
val sampleWithDuplicates = largeDataset.sampleWithSize(
  withReplacement = true,
  numSamples = 50,
  seed = 123L
)

fixedSample.count() // Always returns 100 (or dataset size if smaller)

Advanced Partitioning

Partition by Range with Distribution

class DataSet[T] {
  def partitionByRange(
    distribution: DataDistribution,
    fields: Int*
  ): DataSet[T]
  
  def partitionByRange(
    distribution: DataDistribution,
    firstField: String,
    otherFields: String*
  ): DataSet[T]
}
import org.apache.flink.api.common.distributions.DataDistribution
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

val salesData = env.fromElements(
  ("Q1", 1000), ("Q2", 1500), ("Q3", 1200), ("Q4", 1800),
  ("Q1", 1100), ("Q2", 1600), ("Q3", 1300), ("Q4", 1900)
)

// Create a custom data distribution; DataDistribution also requires
// getNumberOfFields and the IOReadableWritable serialization methods
class QuarterDistribution extends DataDistribution {
  private val quarters = Array[AnyRef]("Q1", "Q2", "Q3", "Q4")
  override def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] =
    Array(quarters(bucketNum * quarters.length / totalNumBuckets))
  override def getNumberOfFields(): Int = 1
  override def write(out: DataOutputView): Unit = {}  // no state to serialize
  override def read(in: DataInputView): Unit = {}
}

// Partition by range using the custom distribution
val rangePartitioned = salesData.partitionByRange(
  new QuarterDistribution(),
  0  // Partition by first field (quarter)
)
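
To verify the effect of the custom distribution, the result can be combined with countElementsPerPartition (a sketch; the exact counts depend on the job's parallelism):

rangePartitioned
  .countElementsPerPartition()
  .collect()
  .foreach { case (partition, count) => println(s"Partition $partition: $count sales records") }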

Checksum and Validation

Checksum Hash Code

class DataSet[T] {
  def checksumHashCode(): ChecksumHashCode
}
val data1 = env.fromElements("apple", "banana", "cherry")
val data2 = env.fromElements("apple", "banana", "cherry")
val data3 = env.fromElements("apple", "cherry", "banana")  // Different order

// Calculate checksums (each call triggers execution of the program)
val checksum1 = data1.checksumHashCode()
val checksum2 = data2.checksumHashCode() 
val checksum3 = data3.checksumHashCode()

// Compare datasets by checksum (and element count)
println(s"data1 == data2: ${checksum1.getChecksum == checksum2.getChecksum}")  // true (same content)
println(s"data1 == data3: ${checksum1.getChecksum == checksum3.getChecksum}")  // also true: the checksum sums element hash codes, so element order does not matter

Practical Usage Examples

Data Quality Analysis

case class Record(id: Int, value: String, timestamp: Long)

val records = env.fromElements(
  Record(1, "A", 1000L),
  Record(2, "B", 2000L),
  Record(3, "C", 3000L),
  Record(4, "D", 4000L),
  Record(5, "E", 5000L)
)

// Count records per partition for load balancing analysis
val partitionAnalysis = records.countElementsPerPartition()

// Sample data for quality checks
val qualitySample = records.sample(withReplacement = false, fraction = 0.1, seed = 42L)

// Add index for tracking
val indexedRecords = records.zipWithIndex()

// Verify data integrity
val dataChecksum = records.checksumHashCode()

Statistical Sampling

val populationData = env.fromElements((1 to 100000).map(i => (i, s"Person$i", Math.random() * 100)): _*)

// Stratified sampling - sample from different age groups
val youngSample = populationData
  .filter(_._3 < 30)  // Age < 30
  .sample(withReplacement = false, fraction = 0.05)

val middleAgedSample = populationData
  .filter(p => p._3 >= 30 && p._3 < 60)  // Age 30-60
  .sample(withReplacement = false, fraction = 0.03)

val seniorSample = populationData
  .filter(_._3 >= 60)  // Age >= 60
  .sample(withReplacement = false, fraction = 0.02)

// Combine stratified samples
val stratifiedSample = youngSample.union(middleAgedSample).union(seniorSample)
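
The strata can be checked by counting each sample; note that every count() call below triggers its own job execution (a sketch only):

println(s"Young: ${youngSample.count()}, middle-aged: ${middleAgedSample.count()}, senior: ${seniorSample.count()}")
println(s"Stratified sample size: ${stratifiedSample.count()} of ${populationData.count()}")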

Data Validation Pipeline

val inputData = env.readTextFile("input.txt")

// Add unique IDs for tracking
val trackedData = inputData.zipWithUniqueId()

// Validate and process
val validatedData = trackedData
  .filter { case (id, line) => 
    line.nonEmpty && line.length > 5  // Basic validation
  }
  .map { case (id, line) => 
    (id, line.toUpperCase, System.currentTimeMillis())
  }

// Sample for manual inspection
val inspectionSample = validatedData.sampleWithSize(
  withReplacement = false,
  numSamples = 1000,
  seed = System.currentTimeMillis()
)

// Calculate checksum for integrity verification
val integrityChecksum = validatedData.checksumHashCode()

// Count distribution across partitions
val distributionCheck = validatedData.countElementsPerPartition()

Performance Monitoring

val largeBatch = env.fromElements((1 to 1000000): _*)
  .setParallelism(8)

// Monitor partition distribution
val partitionStats = largeBatch
  .countElementsPerPartition()
  .collect()

partitionStats.foreach { case (partitionId, count) =>
  println(s"Partition $partitionId: $count elements")
}

// Sample for performance testing
val performanceTestSample = largeBatch.sampleWithSize(
  withReplacement = false,
  numSamples = 10000,
  seed = 42L
)

// Add tracking IDs
val trackedBatch = largeBatch.zipWithIndex()
val firstBatch = trackedBatch.filter(_._1 < 100000)
val secondBatch = trackedBatch.filter(_._1 >= 100000)

Types

// Checksum type (org.apache.flink.api.java.Utils.ChecksumHashCode)
class ChecksumHashCode {
  def getCount(): Long      // number of elements
  def getChecksum(): Long   // sum of the elements' hash codes
  override def toString: String
}

// Data distribution interface (also extends IOReadableWritable and Serializable)
trait DataDistribution {
  def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]
  def getNumberOfFields(): Int
}

// Utility object providing the default sampling seed (org.apache.flink.api.java.Utils)
object Utils {
  val RNG: java.util.Random  // RNG.nextLong() supplies the default seed for sample/sampleWithSize
}

Performance Considerations

Sampling Performance

// For large datasets, sample early in the pipeline to reduce data volume
val massiveDataset = env.fromElements((1 to 10000000): _*)

// Good - sample early
val efficientPipeline = massiveDataset
  .sample(withReplacement = false, fraction = 0.01)  // Sample first
  .filter(_ % 2 == 0)  // Then apply expensive operations
  .map(x => x * x)

// Less efficient - sample after expensive operations
val inefficientPipeline = massiveDataset
  .filter(_ % 2 == 0)  // Expensive operation on full dataset
  .map(x => x * x)
  .sample(withReplacement = false, fraction = 0.01)  // Sample last

Index Assignment

// zipWithIndex requires global coordination - use sparingly on large datasets
val smallDataset = env.fromElements(1 to 1000: _*)
val indexedSmall = smallDataset.zipWithIndex()  // OK for small data

val hugeDataset = env.fromElements(1 to 100000000: _*)
// Consider alternatives for huge datasets:
// 1. Use zipWithUniqueId if order doesn't matter
val uniqueIdVersion = hugeDataset.zipWithUniqueId()

// 2. Or add IDs during data generation if possible
val preIndexedData = env.fromElements((1 to 1000000).map(i => (i, s"value$i")): _*)

Partition Analysis

// Use countElementsPerPartition for debugging partition skew
val skewedData = env.fromElements(
  (1 to 1000).map(_ => "A") ++     // Most data in one key
  (1 to 10).map(_ => "B") ++       // Little data in other keys  
  (1 to 5).map(_ => "C"): _*
)

val partitionAnalysis = skewedData
  .partitionByHash(s => s)        // hash partition by key to expose the skew
  .countElementsPerPartition()    // the partition holding key "A" will report most elements

// Use this information to rebalance if needed
val rebalanced = skewedData.rebalance()