Advanced utilities for sampling, indexing, data analysis, and partial function support. These utilities extend DataSet functionality with specialized operations.
Enhanced utilities, available through implicit conversion after importing org.apache.flink.api.scala.utils._:
implicit class DataSetUtils[T](dataSet: DataSet[T]) {
/**
* Adds consecutive indices to each element starting from 0
* @return DataSet of (index, element) tuples
*/
def zipWithIndex: DataSet[(Long, T)]
/**
* Adds unique identifiers to each element
* @return DataSet of (uniqueId, element) tuples
*/
def zipWithUniqueId: DataSet[(Long, T)]
/**
* Counts the number of elements in each partition
* @return DataSet of (partitionIndex, elementCount) tuples
*/
def countElementsPerPartition: DataSet[(Int, Long)]
/**
* Computes checksum and total count of the DataSet
* @return ChecksumHashCode with checksum and count
*/
def checksumHashCode(): ChecksumHashCode
}
Usage Examples:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("apple", "banana", "cherry", "date")
// Add consecutive indices
val indexed = data.zipWithIndex
// Result: [(0, "apple"), (1, "banana"), (2, "cherry"), (3, "date")]
// Add unique IDs (single pass; IDs are unique but not necessarily consecutive)
val withIds = data.zipWithUniqueId
// Result: [(uniqueId1, "apple"), (uniqueId2, "banana"), ...]
// Count elements per partition
val partitionCounts = data.countElementsPerPartition
// Example result with parallelism 2: [(0, 2), (1, 2)]
// Get checksum and count
val checksum = data.checksumHashCode()
println(s"Checksum: ${checksum.getChecksum}, Count: ${checksum.getCount}")Statistical sampling methods for data analysis and subset creation.
implicit class DataSetUtils[T](dataSet: DataSet[T]) {
/**
* Samples elements randomly by fraction
* @param withReplacement Whether to sample with replacement
* @param fraction Fraction of elements to sample (0.0 to 1.0)
* @param seed Random seed for reproducibility
* @return DataSet with sampled elements
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Random.nextLong()): DataSet[T]
/**
* Samples a specific number of elements
* @param withReplacement Whether to sample with replacement
* @param numSamples Number of elements to sample
* @param seed Random seed for reproducibility
* @return DataSet with sampled elements
*/
def sampleWithSize(withReplacement: Boolean, numSamples: Int, seed: Long = Random.nextLong()): DataSet[T]
}
Usage Examples:
val largeDataset = env.generateSequence(1, 1000000)
// Sample 10% of elements without replacement
val sample10Percent = largeDataset.sample(withReplacement = false, fraction = 0.1)
// Sample exactly 1000 elements with replacement
val exactSample = largeDataset.sampleWithSize(withReplacement = true, numSamples = 1000)
// Reproducible sampling with seed
val reproducibleSample = largeDataset.sample(
withReplacement = false,
fraction = 0.05,
seed = 12345L
)
Range partitioning with custom data distribution for optimized data placement.
implicit class DataSetUtils[T](dataSet: DataSet[T]) {
/**
* Partitions by range using custom data distribution
* @param distribution Custom data distribution
* @param fields Field positions for partitioning
* @return DataSet with custom range partitioning
*/
def partitionByRange(distribution: DataDistribution, fields: Int*): DataSet[T]
/**
* Partitions by range using custom data distribution and field names
* @param distribution Custom data distribution
* @param firstField First field name
* @param otherFields Additional field names
* @return DataSet with custom range partitioning
*/
def partitionByRange(
distribution: DataDistribution,
firstField: String,
otherFields: String*
): DataSet[T]
/**
* Partitions by range using custom data distribution and key selector
* @param distribution Custom data distribution
* @param fun Key selector function
* @return DataSet with custom range partitioning
*/
def partitionByRange[K: TypeInformation](
distribution: DataDistribution,
fun: T => K
): DataSet[T]
}
Usage Examples:
import org.apache.flink.api.common.distributions.DataDistribution
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
// Custom data distribution for range partitioning
case class SalesRecord(region: String, amount: Double, date: String)
val salesData = env.fromElements(
SalesRecord("North", 1000.0, "2023-01-01"),
SalesRecord("South", 1500.0, "2023-01-02"),
SalesRecord("East", 800.0, "2023-01-03")
)
// Create a custom distribution; a minimal sketch with illustrative boundary
// values. DataDistribution is shipped to the cluster, so the write/read
// methods from IOReadableWritable must be implemented; they are empty here
// because this distribution holds no state.
val customDistribution = new DataDistribution {
override def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] =
Array[AnyRef](java.lang.Double.valueOf((bucketNum + 1) * 2000.0 / totalNumBuckets))
override def getNumberOfFields: Int = 1
override def write(out: DataOutputView): Unit = {}
override def read(in: DataInputView): Unit = {}
}
val partitionedSales = salesData.partitionByRange(customDistribution, _.amount)
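The positional and field-name overloads cover tuple and case-class DataSets. A short sketch reusing customDistribution from above (its boundaries are Double values, so the key field must be a Double):
// Tuples: partition by field position 0
val tuples = env.fromElements((1200.0, "a"), (450.5, "b"), (9000.0, "c"))
val byPosition = tuples.partitionByRange(customDistribution, 0)
// Case classes: partition by field name
val byName = salesData.partitionByRange(customDistribution, "amount")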
Enable pattern matching and partial function usage in transformations.
// Import for partial function support
import org.apache.flink.api.scala.extensions._
implicit class OnDataSet[T](dataSet: DataSet[T]) {
/**
* Maps using partial function with pattern matching
* @param fun Partial function for transformation
* @return DataSet with partial function mapping
*/
def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
/**
* FlatMaps using partial function
* @param fun Partial function returning traversable
* @return DataSet with partial function flatMap
*/
def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
/**
* Filters using partial function
* @param fun Partial function for filtering
* @return DataSet with partial function filtering
*/
def filterWith(fun: T => Boolean): DataSet[T]
/**
* Reduces using partial function
* @param fun Partial function for reduction
* @return DataSet with partial function reduction
*/
def reduceWith(fun: (T, T) => T): DataSet[T]
/**
* Reduces the entire DataSet as a single group, viewed as a Stream
* @param fun Partial function from the element stream to a result
* @return DataSet containing the single reduction result
*/
def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
/**
* Groups by using partial function
* @param fun Partial function for key extraction
* @return GroupedDataSet with partial function grouping
*/
def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
/**
* Transforms each partition, viewed as a Stream, with a partial function
* @param fun Partial function from a partition's stream to a result
* @return DataSet with one result per partition
*/
def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
}
Usage Examples:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._
sealed trait Event
case class UserEvent(userId: String, action: String) extends Event
case class SystemEvent(level: String, message: String) extends Event
case class ErrorEvent(error: String, stackTrace: String) extends Event
val events: DataSet[Event] = env.fromElements(
UserEvent("user1", "login"),
SystemEvent("INFO", "System started"),
ErrorEvent("NPE", "NullPointerException at line 42")
)
// Pattern matching with partial functions
val userActions = events.mapWith {
case UserEvent(userId, action) => s"$userId performed $action"
case _ => "Not a user event"
}
val criticalEvents = events.filterWith {
case ErrorEvent(_, _) => true
case SystemEvent("ERROR", _) => true
case _ => false
}
// Group by event type using pattern matching
val groupedByType = events.groupingBy {
case _: UserEvent => "user"
case _: SystemEvent => "system"
case _: ErrorEvent => "error"
}
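The remaining methods follow the same pattern; a short sketch continuing the events example (flatMapWith emits zero or more results per element, mapPartitionWith sees each partition as a Stream):
// Extract error details; non-error events produce nothing
val errorLines = events.flatMapWith {
case ErrorEvent(error, stackTrace) => Seq(error, stackTrace)
case _ => Seq.empty[String]
}
// One element count per partition
val partitionSizes = events.mapPartitionWith {
case stream => stream.size
}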
Partial function support for grouped operations.
implicit class OnGroupedDataSet[T](groupedDataSet: GroupedDataSet[T]) {
/**
* Reduces groups using partial function
* @param fun Partial function for group reduction
* @return DataSet with partial function group reduction
*/
def reduceWith(fun: (T, T) => T): DataSet[T]
/**
* Reduces groups using partial function on streams
* @param fun Partial function processing group as stream
* @return DataSet with stream-based group reduction
*/
def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
}
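Usage Examples (a short sketch continuing the events example above):
// Count the events in each group produced by groupingBy
val countsPerType = events
.groupingBy {
case _: UserEvent => "user"
case _: SystemEvent => "system"
case _: ErrorEvent => "error"
}
.reduceGroupWith(stream => stream.size)
// Result: one count per event type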
Partial function support for join, cross, and coGroup operations.
implicit class OnJoinFunctionAssigner[L, R](joiner: JoinFunctionAssigner[L, R]) {
/**
* Applies a (partial) function to each joined pair
* @param fun Partial function combining the joined elements
* @return DataSet of the projected results
*/
def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}
implicit class OnCrossDataSet[L, R](crossDataSet: CrossDataSet[L, R]) {
/**
* Applies a (partial) function to each crossed pair
* @param fun Partial function combining the crossed elements
* @return DataSet of the projected results
*/
def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}
implicit class OnCoGroupDataSet[L, R](coGroupDataSet: CoGroupDataSet[L, R]) {
/**
* Applies a (partial) function to each pair of co-grouped Streams
* @param fun Partial function combining the two group streams
* @return DataSet of the projected results
*/
def projecting[O: TypeInformation: ClassTag](fun: (Stream[L], Stream[R]) => O): DataSet[O]
}
Usage Examples:
case class Order(customerId: Int, productId: Int, amount: Double)
case class Customer(id: Int, name: String, segment: String)
val orders = env.fromElements(
Order(1, 101, 99.99),
Order(2, 102, 149.99)
)
val customers = env.fromElements(
Customer(1, "Alice", "Premium"),
Customer(2, "Bob", "Standard")
)
// Join with partial functions
val customerOrders = customers
.join(orders)
.where(_.id)
.equalTo(_.customerId)
.projecting { (customer, order) =>
s"${customer.name} (${customer.segment}) ordered product ${order.productId} for $$${order.amount}"
}
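projecting works the same way for cross and coGroup; a short sketch reusing customers and orders (the coGroup variant receives each side as a Stream):
// Cross: every customer paired with every order
val allPairs = customers
.cross(orders)
.projecting { (customer, order) => (customer.name, order.productId) }
// CoGroup: count each customer's orders (assumes the customer side is non-empty)
val orderCounts = customers
.coGroup(orders)
.where(_.id)
.equalTo(_.customerId)
.projecting { (customerStream, orderStream) =>
(customerStream.head.name, orderStream.size)
}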
Scala-friendly gauge metrics for monitoring.
class ScalaGauge[T](getValueFunc: () => T) extends Gauge[T] {
/**
* Gets the current gauge value
* @return Current value
*/
override def getValue: T = getValueFunc()
}
Usage Examples:
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.MetricGroup
class MonitoredMapFunction extends RichMapFunction[String, String] {
@volatile private var processedCount = 0L
override def open(parameters: Configuration): Unit = {
val metricGroup: MetricGroup = getRuntimeContext.getMetricGroup
// Register a Scala gauge exposing the processed-element count
metricGroup.gauge[Long, ScalaGauge[Long]]("processedCount", new ScalaGauge[Long](() => processedCount))
}
override def map(value: String): String = {
processedCount += 1
value.toUpperCase
}
}
Debugging helper defined on the org.apache.flink.api.scala package object.
package object scala {
/**
* Gets call location name for debugging
* @param depth Stack depth to examine
* @return Call location name
*/
def getCallLocationName(depth: Int = 3): String
}
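A quick sketch of using the helper when naming operators; the exact string format is implementation-defined, but resembles the caller's method, file, and line:
// Might yield something like "main(MyJob.scala:42)"
val location = getCallLocationName()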
Supporting types referenced by the utilities above.
class ChecksumHashCode {
/**
* Gets the computed checksum
* @return Checksum value
*/
def getChecksum: Long
/**
* Gets the element count
* @return Number of elements
*/
def getCount: Long
}
trait DataDistribution extends IOReadableWritable with Serializable {
/**
* Gets the upper boundary record of a bucket for range partitioning
* @param bucketNum Bucket number (zero-based)
* @param totalNumBuckets Total number of buckets
* @return Boundary as an array with one element per key field
*/
def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]
/**
* Gets the number of key fields in each boundary record
* @return Number of key fields
*/
def getNumberOfFields: Int
}
// Wrapper classes for partial function support
class OnDataSet[T](dataSet: DataSet[T]) {
// Enhanced DataSet with partial function capabilities
}
class OnGroupedDataSet[T](groupedDataSet: GroupedDataSet[T]) {
// Enhanced GroupedDataSet with partial function capabilities
}
class OnJoinFunctionAssigner[L, R](joiner: JoinFunctionAssigner[L, R]) {
// Enhanced join operations with partial function capabilities
}
class OnCrossDataSet[L, R](crossDataSet: CrossDataSet[L, R]) {
// Enhanced cross operations with partial function capabilities
}
class OnCoGroupDataSet[L, R](coGroupDataSet: CoGroupDataSet[L, R]) {
// Enhanced coGroup operations with partial function capabilities
}
class ScalaGauge[T](getValueFunc: () => T) extends Gauge[T] {
// Scala-friendly gauge metric implementation
}
// Stream type alias used by the group- and partition-oriented *With methods
type Stream[T] = scala.collection.immutable.Stream[T]
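Because groups and partitions are exposed as lazy Streams, a function can short-circuit without materializing all elements; a small sketch reusing the events DataSet from above:
// Take only the first three events of the whole DataSet, lazily
val firstThree = events.reduceGroupWith(stream => stream.take(3).toList)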