Apache Flink Scala API providing type-safe operations and functional programming paradigms for distributed stream and batch processing applications.
The Flink Scala API provides additional utility functions through the utils package, offering extended functionality for DataSet operations including sampling, indexing, and partitioning.
import org.apache.flink.api.scala.utils._

This import adds implicit conversions that extend DataSet with additional utility methods.
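As a minimal, self-contained sketch (the variable names are illustrative), the utils import is typically combined with the base Scala API import, which supplies the implicit TypeInformation the operators need:

import org.apache.flink.api.scala._        // base DataSet API and implicit TypeInformation
import org.apache.flink.api.scala.utils._  // utility extensions for DataSet

val env = ExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5)

// The utility methods become available on DataSet via implicit conversion
val indexed = numbers.zipWithIndex()
println(indexed.collect())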
class DataSet[T] {
def countElementsPerPartition(): DataSet[(Int, Long)] // (partitionId, elementCount)
}

val data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.setParallelism(3)
// Count elements in each partition
val partitionCounts = data.countElementsPerPartition()
partitionCounts.collect()
// Example result: (0, 3), (1, 3), (2, 4), one (partitionId, elementCount) pair per partition;
// the exact split depends on how elements are distributed across partitions

class DataSet[T] {
def zipWithIndex(): DataSet[(Long, T)] // (index, element)
}

val words = env.fromElements("apple", "banana", "cherry", "date")
// Add sequential index to each element
val indexed = words.zipWithIndex()
indexed.collect()
// Result: (0, "apple"), (1, "banana"), (2, "cherry"), (3, "date")
// Use indexed data
val evenIndexed = indexed.filter(_._1 % 2 == 0).map(_._2)
// Result: "apple", "cherry"class DataSet[T] {
def zipWithUniqueId(): DataSet[(Long, T)] // (uniqueId, element)
}val data = env.fromElements("A", "B", "C", "D", "E")
.setParallelism(2)
// Add unique ID to each element (not necessarily sequential)
val withUniqueId = data.zipWithUniqueId()
withUniqueId.collect()
// Result: (0, "A"), (1, "B"), (2, "C"), (3, "D"), (4, "E")
// Note: IDs are unique but may not be in exact sequential order across partitionsclass DataSet[T] {
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.RNG.nextLong()
): DataSet[T]
}

val numbers = env.fromElements(1 to 1000: _*)
// Sample approximately 10% of elements without replacement
val sample10Percent = numbers.sample(
withReplacement = false,
fraction = 0.1,
seed = 12345L
)
// Sample with replacement (elements can appear multiple times)
val sampleWithReplacement = numbers.sample(
withReplacement = true,
fraction = 0.05,
seed = 67890L
)
println(s"Original size: ${numbers.count()}")
println(s"Sample size: ${sample10Percent.count()}")class DataSet[T] {
def sampleWithSize(
withReplacement: Boolean,
numSamples: Int,
seed: Long = Utils.RNG.nextLong()
): DataSet[T]
}

val largeDataset = env.fromElements(1 to 10000: _*)
// Sample exactly 100 elements without replacement
val fixedSample = largeDataset.sampleWithSize(
withReplacement = false,
numSamples = 100,
seed = 42L
)
// Sample with replacement - can get duplicates
val sampleWithDuplicates = largeDataset.sampleWithSize(
withReplacement = true,
numSamples = 50,
seed = 123L
)
fixedSample.count() // Always returns 100 (or the dataset size, if smaller)

class DataSet[T] {
def partitionByRange(
distribution: DataDistribution,
fields: Int*
): DataSet[T]
def partitionByRange(
distribution: DataDistribution,
firstField: String,
otherFields: String*
): DataSet[T]
}

import org.apache.flink.api.common.distributions.DataDistribution
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
val salesData = env.fromElements(
("Q1", 1000), ("Q2", 1500), ("Q3", 1200), ("Q4", 1800),
("Q1", 1100), ("Q2", 1600), ("Q3", 1300), ("Q4", 1900)
)
// Create a custom data distribution that maps quarters to range buckets
class QuarterDistribution extends DataDistribution {
  private val quarters = Array[AnyRef]("Q1", "Q2", "Q3", "Q4")
  override def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] =
    Array(quarters(bucketNum * quarters.length / totalNumBuckets))
  override def getNumberOfFields(): Int = 1 // one key field (the quarter)
  // DataDistribution extends IOReadableWritable, so write/read must be implemented
  override def write(out: DataOutputView): Unit = {}
  override def read(in: DataInputView): Unit = {}
}
// Partition by range using the custom distribution
val rangePartitioned = salesData.partitionByRange(
new QuarterDistribution(),
0 // Partition by first field (quarter)
)
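Range partitioning is commonly combined with a per-partition sort to obtain globally ordered output. A minimal sketch of that pattern, using the built-in partitionByRange(fields: Int*) overload (which determines the key ranges from the data itself, so no custom DataDistribution is needed):

import org.apache.flink.api.common.operators.Order

// Range-partition by revenue (field 1), then sort each partition locally.
// Since the key ranges are disjoint, the partitions together form a global order.
val globallySorted = salesData
  .partitionByRange(1)
  .sortPartition(1, Order.ASCENDING)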
class DataSet[T] {
def checksumHashCode(): ChecksumHashCode
}

val data1 = env.fromElements("apple", "banana", "cherry")
val data2 = env.fromElements("apple", "banana", "cherry")
val data3 = env.fromElements("apple", "cherry", "banana") // Different order
// Calculate checksums (each call executes the program and returns the element count and checksum)
val checksum1 = data1.checksumHashCode()
val checksum2 = data2.checksumHashCode()
val checksum3 = data3.checksumHashCode()
// Compare datasets by checksum
// The checksum is a sum over element hash codes, so it is insensitive to element order:
// datasets with the same elements produce the same checksum even when reordered
println(s"data1 == data2: ${checksum1.getChecksum == checksum2.getChecksum}") // true (same content)
println(s"data1 == data3: ${checksum1.getChecksum == checksum3.getChecksum}") // true (same elements, different order)
case class Record(id: Int, value: String, timestamp: Long)

val records = env.fromElements(
Record(1, "A", 1000L),
Record(2, "B", 2000L),
Record(3, "C", 3000L),
Record(4, "D", 4000L),
Record(5, "E", 5000L)
)
// Count records per partition for load balancing analysis
val partitionAnalysis = records.countElementsPerPartition()
// Sample data for quality checks
val qualitySample = records.sample(withReplacement = false, fraction = 0.1, seed = 42L)
// Add index for tracking
val indexedRecords = records.zipWithIndex()
// Verify data integrity
val dataChecksum = records.checksumHashCode()

val populationData = env.fromElements((1 to 100000).map(i => (i, s"Person$i", Math.random() * 100)): _*)
// Stratified sampling - sample from different age groups
val youngSample = populationData
.filter(_._3 < 30) // Age < 30
.sample(withReplacement = false, fraction = 0.05)
val middleAgedSample = populationData
.filter(p => p._3 >= 30 && p._3 < 60) // Age 30-60
.sample(withReplacement = false, fraction = 0.03)
val seniorSample = populationData
.filter(_._3 >= 60) // Age >= 60
.sample(withReplacement = false, fraction = 0.02)
// Combine stratified samples
val stratifiedSample = youngSample.union(middleAgedSample).union(seniorSample)

val inputData = env.readTextFile("input.txt")
// Add unique IDs for tracking
val trackedData = inputData.zipWithUniqueId()
// Validate and process
val validatedData = trackedData
.filter { case (id, line) =>
line.nonEmpty && line.length > 5 // Basic validation
}
.map { case (id, line) =>
(id, line.toUpperCase, System.currentTimeMillis())
}
// Sample for manual inspection
val inspectionSample = validatedData.sampleWithSize(
withReplacement = false,
numSamples = 1000,
seed = System.currentTimeMillis()
)
// Calculate checksum for integrity verification
val integrityChecksum = validatedData.checksumHashCode()
// Count distribution across partitions
val distributionCheck = validatedData.countElementsPerPartition()

val largeBatch = env.fromElements((1 to 1000000): _*)
.setParallelism(8)
// Monitor partition distribution
val partitionStats = largeBatch
.countElementsPerPartition()
.collect()
partitionStats.foreach { case (partitionId, count) =>
println(s"Partition $partitionId: $count elements")
}
// Sample for performance testing
val performanceTestSample = largeBatch.sampleWithSize(
withReplacement = false,
numSamples = 10000,
seed = 42L
)
// Add tracking IDs
val trackedBatch = largeBatch.zipWithIndex()
val firstBatch = trackedBatch.filter(_._1 < 100000)
val secondBatch = trackedBatch.filter(_._1 >= 100000)

// Checksum type
class ChecksumHashCode {
  def getCount: Long      // number of elements
  def getChecksum: Long   // sum over element hash codes
  override def toString: String
}
// Data distribution interface
trait DataDistribution extends IOReadableWritable with Serializable {
  def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]
  def getNumberOfFields(): Int
}
// Utility constants
object Utils {
object RNG {
def nextLong(): Long
}
}

// For large datasets, sample early in the pipeline to reduce data volume
val massiveDataset = env.fromElements((1 to 10000000): _*)
// Good - sample early
val efficientPipeline = massiveDataset
.sample(withReplacement = false, fraction = 0.01) // Sample first
.filter(_ % 2 == 0) // Then apply expensive operations
.map(x => x * x)
// Less efficient - sample after expensive operations
val inefficientPipeline = massiveDataset
.filter(_ % 2 == 0) // Expensive operation on full dataset
.map(x => x * x)
.sample(withReplacement = false, fraction = 0.01) // Sample last

// zipWithIndex requires global coordination - use sparingly on large datasets
val smallDataset = env.fromElements(1 to 1000: _*)
val indexedSmall = smallDataset.zipWithIndex() // OK for small data
val hugeDataset = env.fromElements(1 to 100000000: _*)
// Consider alternatives for huge datasets:
// 1. Use zipWithUniqueId if order doesn't matter
val uniqueIdVersion = hugeDataset.zipWithUniqueId()
// 2. Or add IDs during data generation if possible
val preIndexedData = env.fromElements((1 to 1000000).map(i => (i, s"value$i")): _*)

// Use countElementsPerPartition for debugging partition skew
val skewedData = env.fromElements(
(1 to 1000).map(_ => "A") ++ // Most data in one key
(1 to 10).map(_ => "B") ++ // Little data in other keys
(1 to 5).map(_ => "C"): _*
)
val partitionAnalysis = skewedData
.groupBy(identity)
.reduceGroup(_.size)
.countElementsPerPartition()
// Use this information to rebalance if needed
val rebalanced = skewedData.rebalance()
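To confirm that the rebalance actually evened out the distribution, the partition counts can be checked again; a short follow-up sketch:

rebalanced
  .countElementsPerPartition()
  .collect()
  .foreach { case (partitionId, count) =>
    println(s"Partition $partitionId after rebalance: $count elements")
  }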
Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-scala-2-12@1.20.2