The Flink Scala API provides additional utility functions through the utils package, offering extended functionality for DataSet operations including sampling, indexing, and partitioning.
import org.apache.flink.api.scala.utils._

This import adds implicit conversions that extend DataSet with additional utility methods.
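The examples that follow assume an ExecutionEnvironment named env is already in scope; a minimal setup (in addition to the utils import above) looks like this:

import org.apache.flink.api.scala._ // core DataSet API and implicit TypeInformation

// Local environment when run from the IDE, cluster environment when submitted as a job
val env = ExecutionEnvironment.getExecutionEnvironment
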
class DataSet[T] {
def countElementsPerPartition: DataSet[(Int, Long)] // (partitionId, elementCount)
}

// Collection-based sources are non-parallel, so set the environment parallelism
// to spread the elements across multiple partitions downstream
env.setParallelism(3)
val data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Count elements in each partition
val partitionCounts = data.countElementsPerPartition
partitionCounts.collect()
// Example result with parallelism 3: (0, 3), (1, 3), (2, 4) - partition ID and count per partition
// (the exact distribution of elements across partitions may vary between runs)

class DataSet[T] {
def zipWithIndex: DataSet[(Long, T)] // (index, element)
}

val words = env.fromElements("apple", "banana", "cherry", "date")
// Add sequential index to each element
val indexed = words.zipWithIndex
indexed.collect()
// Result: (0, "apple"), (1, "banana"), (2, "cherry"), (3, "date")
// Use indexed data
val evenIndexed = indexed.filter(_._1 % 2 == 0).map(_._2)
// Result: "apple", "cherry"class DataSet[T] {
def zipWithUniqueId: DataSet[(Long, T)] // (uniqueId, element)
}val data = env.fromElements("A", "B", "C", "D", "E")
.setParallelism(2)
// Add unique ID to each element (not necessarily sequential)
val withUniqueId = data.zipWithUniqueId
withUniqueId.collect()
// Result: (0, "A"), (1, "B"), (2, "C"), (3, "D"), (4, "E")
// Note: IDs are unique but may not be in exact sequential order across partitionsclass DataSet[T] {
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.RNG.nextLong()
): DataSet[T]
}

val numbers = env.fromElements(1 to 1000: _*)
// Sample approximately 10% of elements without replacement
val sample10Percent = numbers.sample(
withReplacement = false,
fraction = 0.1,
seed = 12345L
)
// Sample with replacement (elements can appear multiple times)
val sampleWithReplacement = numbers.sample(
withReplacement = true,
fraction = 0.05,
seed = 67890L
)
println(s"Original size: ${numbers.count()}")
println(s"Sample size: ${sample10Percent.count()}")class DataSet[T] {
def sampleWithSize(
withReplacement: Boolean,
numSamples: Int,
seed: Long = Utils.RNG.nextLong()
): DataSet[T]
}

val largeDataset = env.fromElements(1 to 10000: _*)
// Sample exactly 100 elements without replacement
val fixedSample = largeDataset.sampleWithSize(
withReplacement = false,
numSamples = 100,
seed = 42L
)
// Sample with replacement - can get duplicates
val sampleWithDuplicates = largeDataset.sampleWithSize(
withReplacement = true,
numSamples = 50,
seed = 123L
)
fixedSample.count() // Always returns 100 (or dataset size if smaller)

class DataSet[T] {
def partitionByRange(
distribution: DataDistribution,
fields: Int*
): DataSet[T]
def partitionByRange(
distribution: DataDistribution,
firstField: String,
otherFields: String*
): DataSet[T]
}

import org.apache.flink.api.common.distributions.DataDistribution
val salesData = env.fromElements(
("Q1", 1000), ("Q2", 1500), ("Q3", 1200), ("Q4", 1800),
("Q1", 1100), ("Q2", 1600), ("Q3", 1300), ("Q4", 1900)
)
// Create a custom data distribution over the four quarter keys
// (sketch: depending on the Flink version, DataDistribution may declare additional methods)
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
class QuarterDistribution extends DataDistribution {
  private val quarters = Array("Q1", "Q2", "Q3", "Q4")
  // Upper boundary of bucket `bucketNum` when the data is split into `totalNumBuckets` ranges
  override def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] =
    Array[AnyRef](quarters(bucketNum * quarters.length / totalNumBuckets))
  override def getNumberOfFields(): Int = 1 // range partitioning on a single key field
  // DataDistribution extends IOReadableWritable; this stateless example has nothing to serialize
  override def write(out: DataOutputView): Unit = {}
  override def read(in: DataInputView): Unit = {}
}
// Partition by range using the custom distribution
val rangePartitioned = salesData.partitionByRange(
new QuarterDistribution(),
0 // Partition by first field (quarter)
)
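Range partitioning is often combined with a per-partition sort to obtain globally ordered output; a short sketch of that follow-up step, reusing the rangePartitioned dataset from above:

import org.apache.flink.api.common.operators.Order

// Each partition holds a contiguous key range, so sorting within partitions
// yields a globally ordered result when the partitions are read in order
val globallySorted = rangePartitioned.sortPartition(0, Order.ASCENDING)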
class DataSet[T] {
def checksumHashCode(): ChecksumHashCode
}

val data1 = env.fromElements("apple", "banana", "cherry")
val data2 = env.fromElements("apple", "banana", "cherry")
val data3 = env.fromElements("apple", "cherry", "banana") // Different order
// Calculate checksums
val checksum1 = data1.checksumHashCode()
val checksum2 = data2.checksumHashCode()
val checksum3 = data3.checksumHashCode()
// Compare dataset contents via their checksums (equal checksums strongly suggest,
// but do not strictly prove, identical content)
println(s"data1 == data2: ${checksum1.getChecksum == checksum2.getChecksum}") // true (same content)
// The checksum aggregates element hash codes, so it is insensitive to element order:
println(s"data1 == data3: ${checksum1.getChecksum == checksum3.getChecksum}") // also true (same elements, different order)

case class Record(id: Int, value: String, timestamp: Long)
val records = env.fromElements(
Record(1, "A", 1000L),
Record(2, "B", 2000L),
Record(3, "C", 3000L),
Record(4, "D", 4000L),
Record(5, "E", 5000L)
)
// Count records per partition for load balancing analysis
val partitionAnalysis = records.countElementsPerPartition
// Sample data for quality checks
val qualitySample = records.sample(withReplacement = false, fraction = 0.1, seed = 42L)
// Add index for tracking
val indexedRecords = records.zipWithIndex
// Verify data integrity
val dataChecksum = records.checksumHashCode()
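To inspect these diagnostics they have to be materialized; note that in the DataSet API each collect() or count() call below triggers its own job execution. A minimal sketch:

// Print partition counts, sample size, and checksum (each action runs a separate job)
partitionAnalysis.collect().foreach { case (pid, cnt) => println(s"partition $pid: $cnt records") }
println(s"Quality sample size: ${qualitySample.count()}")
println(s"Checksum: ${dataChecksum.getChecksum}, records: ${dataChecksum.getCount}")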

val populationData = env.fromElements((1 to 100000).map(i => (i, s"Person$i", Math.random() * 100)): _*)
// Stratified sampling - sample from different age groups
val youngSample = populationData
.filter(_._3 < 30) // Age < 30
.sample(withReplacement = false, fraction = 0.05)
val middleAgedSample = populationData
.filter(p => p._3 >= 30 && p._3 < 60) // Age 30-60
.sample(withReplacement = false, fraction = 0.03)
val seniorSample = populationData
.filter(_._3 >= 60) // Age >= 60
.sample(withReplacement = false, fraction = 0.02)
// Combine stratified samples
val stratifiedSample = youngSample.union(middleAgedSample).union(seniorSample)
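Because fraction-based sampling is probabilistic, the realized stratum sizes only approximate the requested fractions; a quick check (each count() triggers a job execution):

// Compare the requested fractions with the sizes actually drawn
println(s"young: ${youngSample.count()}, middle: ${middleAgedSample.count()}, senior: ${seniorSample.count()}")
println(s"total stratified sample: ${stratifiedSample.count()}")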

val inputData = env.readTextFile("input.txt")
// Add unique IDs for tracking
val trackedData = inputData.zipWithUniqueId
// Validate and process
val validatedData = trackedData
.filter { case (id, line) =>
line.nonEmpty && line.length > 5 // Basic validation
}
.map { case (id, line) =>
(id, line.toUpperCase, System.currentTimeMillis())
}
// Sample for manual inspection
val inspectionSample = validatedData.sampleWithSize(
withReplacement = false,
numSamples = 1000,
seed = System.currentTimeMillis()
)
// Calculate checksum for integrity verification
val integrityChecksum = validatedData.checksumHashCode()
// Count distribution across partitions
val distributionCheck = validatedData.countElementsPerPartition
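For manual inspection the sample is typically written out rather than collected; a minimal sketch, where the output path and job name are illustrative:

// Write the inspection sample as text and run the job
inspectionSample.writeAsText("/tmp/inspection-sample") // illustrative output path
env.execute("data validation pipeline")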

// Collection-based sources are non-parallel; set the environment parallelism
// so the batch is spread across eight partitions downstream
env.setParallelism(8)
val largeBatch = env.fromElements((1 to 1000000): _*)
// Monitor partition distribution
val partitionStats = largeBatch
.countElementsPerPartition
.collect()
partitionStats.foreach { case (partitionId, count) =>
println(s"Partition $partitionId: $count elements")
}
// Sample for performance testing
val performanceTestSample = largeBatch.sampleWithSize(
withReplacement = false,
numSamples = 10000,
seed = 42L
)
// Add tracking IDs
val trackedBatch = largeBatch.zipWithIndex
val firstBatch = trackedBatch.filter(_._1 < 100000)
val secondBatch = trackedBatch.filter(_._1 >= 100000)

// Checksum type returned by checksumHashCode()
class ChecksumHashCode {
def getCount: Long     // number of elements included in the checksum
def getChecksum: Long  // aggregate checksum over the element hash codes
override def toString: String
}
// Data distribution interface (simplified; the full interface also extends IOReadableWritable)
trait DataDistribution {
def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]
def getNumberOfFields(): Int
}
// Source of the default random seed used by sample and sampleWithSize
object Utils {
object RNG {
def nextLong(): Long
}
}

// For large datasets, sample early in the pipeline to reduce data volume
val massiveDataset = env.fromElements((1 to 10000000): _*)
// Good - sample early
val efficientPipeline = massiveDataset
.sample(withReplacement = false, fraction = 0.01) // Sample first
.filter(_ % 2 == 0) // Then apply expensive operations
.map(x => x * x) // e.g. an expensive per-element transformation
// Less efficient - sample after expensive operations
val inefficientPipeline = massiveDataset
.filter(_ % 2 == 0) // Expensive operation on full dataset
.map(x => x * x)
.sample(withReplacement = false, fraction = 0.01) // Sample last

// zipWithIndex requires global coordination - use sparingly on large datasets
val smallDataset = env.fromElements(1 to 1000: _*)
val indexedSmall = smallDataset.zipWithIndex // OK for small data
val hugeDataset = env.fromElements(1 to 100000000: _*)
// Consider alternatives for huge datasets:
// 1. Use zipWithUniqueId if order doesn't matter
val uniqueIdVersion = hugeDataset.zipWithUniqueId
// 2. Or add IDs during data generation if possible
val preIndexedData = env.fromElements((1 to 1000000).map(i => (i, s"value$i")): _*)
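A third option, when IDs only need to be unique rather than globally sequential, is to build them locally inside each partition from the subtask index, which avoids the extra counting pass entirely. A minimal sketch (reusing hugeDataset from above and assuming the core Scala API import from the setup):

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Emit (subtaskIndex, localIndex, element); the (subtaskIndex, localIndex) pair is unique
val locallyIndexed = hugeDataset.mapPartition(
  new RichMapPartitionFunction[Int, (Int, Long, Int)] {
    override def mapPartition(values: java.lang.Iterable[Int],
                              out: Collector[(Int, Long, Int)]): Unit = {
      val subtask = getRuntimeContext.getIndexOfThisSubtask
      var localIndex = 0L
      for (element <- values.asScala) {
        out.collect((subtask, localIndex, element))
        localIndex += 1
      }
    }
  })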

// Use countElementsPerPartition for debugging partition skew
val skewedData = env.fromElements(
(1 to 1000).map(_ => "A") ++ // Most data in one key
(1 to 10).map(_ => "B") ++ // Little data in other keys
(1 to 5).map(_ => "C"): _*
)
val partitionAnalysis = skewedData
.groupBy(identity)
.reduceGroup(_.size)
.countElementsPerPartition
// Use this information to rebalance if needed
val rebalanced = skewedData.rebalance()
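To confirm that the rebalance actually evened things out, the partition counts can be checked again afterwards; a short sketch:

// Partition sizes after round-robin redistribution should be close to uniform
rebalanced
  .countElementsPerPartition
  .collect()
  .foreach { case (partitionId, count) => println(s"Partition $partitionId: $count elements") }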