Control over data distribution and partitioning strategies across the cluster. These operations determine how data is distributed among parallel processing instances.
Distribute data using hash-based partitioning to ensure even distribution.
class DataSet[T] {
/**
* Partitions data by hash of specified fields
* @param fields Field positions to use for hashing
* @return DataSet with hash partitioning
*/
def partitionByHash(fields: Int*): DataSet[T]
/**
* Partitions data by hash of named fields
* @param firstField First field name
* @param otherFields Additional field names
* @return DataSet with hash partitioning
*/
def partitionByHash(firstField: String, otherFields: String*): DataSet[T]
/**
* Partitions data by hash of key selector result
* @param fun Key selector function
* @return DataSet with hash partitioning based on key
*/
def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
}
Usage Examples:
import org.apache.flink.api.scala._
case class Customer(id: Int, name: String, region: String)
val env = ExecutionEnvironment.getExecutionEnvironment
val customers = env.fromElements(
Customer(1, "Alice", "North"),
Customer(2, "Bob", "South"),
Customer(3, "Charlie", "North"),
Customer(4, "Diana", "West")
)
// Partition by customer ID field
val partitionedById = customers.partitionByHash("id")
// Partition by multiple fields
val partitionedByRegionAndId = customers.partitionByHash("region", "id")
// Partition using key selector function
val partitionedByRegion = customers.partitionByHash(_.region)
Distribute data using range-based partitioning for ordered distribution.
class DataSet[T] {
/**
* Partitions data by range of specified fields
* @param fields Field positions for range partitioning
* @return DataSet with range partitioning
*/
def partitionByRange(fields: Int*): DataSet[T]
/**
* Partitions data by range of named fields
* @param firstField First field name
* @param otherFields Additional field names
* @return DataSet with range partitioning
*/
def partitionByRange(firstField: String, otherFields: String*): DataSet[T]
/**
* Partitions data by range of key selector result
* @param fun Key selector function
* @return DataSet with range partitioning based on key
*/
def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
}
Advanced range partitioning using custom data distribution.
implicit class DataSetUtils[T](dataSet: DataSet[T]) {
/**
* Partitions by range using custom data distribution
* @param distribution Custom data distribution
* @param fields Field positions for partitioning
* @return DataSet with custom range partitioning
*/
def partitionByRange(distribution: DataDistribution, fields: Int*): DataSet[T]
/**
* Partitions by range using custom data distribution and field names
* @param distribution Custom data distribution
* @param firstField First field name
* @param otherFields Additional field names
* @return DataSet with custom range partitioning
*/
def partitionByRange(distribution: DataDistribution, firstField: String, otherFields: String*): DataSet[T]
/**
* Partitions by range using custom data distribution and key selector
* @param distribution Custom data distribution
* @param fun Key selector function
* @return DataSet with custom range partitioning
*/
def partitionByRange[K: TypeInformation](distribution: DataDistribution, fun: T => K): DataSet[T]
}
Usage Examples:
// Range partition by age for ordered processing
case class Person(name: String, age: Int, salary: Double)
val people = env.fromElements(
Person("Alice", 25, 50000),
Person("Bob", 30, 60000),
Person("Charlie", 35, 70000)
)
// Simple range partitioning
val rangePartitioned = people.partitionByRange(_.age)
// Range partitioning with multiple fields
val rangeByAgeAndSalary = people.partitionByRange(p => (p.age, p.salary))
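The custom-distribution variant is useful when the key distribution is known to be skewed. A minimal sketch, assuming the DataSetUtils implicit above is in scope and that ages fall in the range 0 to 100 (AgeDistribution is a hypothetical implementation, not a Flink-provided class):
import org.apache.flink.api.common.distributions.DataDistribution
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
// Hypothetical distribution: ages 0-100 spread evenly across the buckets
class AgeDistribution extends DataDistribution {
  def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] =
    Array[AnyRef](Int.box((bucketNum + 1) * 100 / totalNumBuckets)) // upper bound of this bucket
  def getNumberOfFields: Int = 1
  // required by the interface for serialization; nothing to persist here
  def write(out: DataOutputView): Unit = {}
  def read(in: DataInputView): Unit = {}
}
val customRange = people.partitionByRange(new AgeDistribution, "age")
Use custom partitioning logic for specialized distribution requirements.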
class DataSet[T] {
/**
* Partitions using custom partitioner on specified field
* @param partitioner Custom partitioner implementation
* @param field Field position for partitioning
* @return DataSet with custom partitioning
*/
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int): DataSet[T]
/**
* Partitions using custom partitioner on named field
* @param partitioner Custom partitioner implementation
* @param field Field name for partitioning
* @return DataSet with custom partitioning
*/
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String): DataSet[T]
/**
* Partitions using custom partitioner on key selector result
* @param partitioner Custom partitioner implementation
* @param fun Key selector function
* @return DataSet with custom partitioning
*/
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
}
Usage Examples:
import org.apache.flink.api.common.functions.Partitioner
// Custom partitioner that groups regions together
class RegionPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    // map region groups to fixed slots; the modulo keeps the index
    // valid when fewer partitions are available
    val slot = key match {
      case "North" | "South" => 0
      case "East" | "West" => 1
      case _ => 2
    }
    slot % numPartitions
  }
}
val customPartitioned = customers.partitionCustom(new RegionPartitioner, _.region)
Rebalance data across all parallel instances for even load distribution.
class DataSet[T] {
/**
* Rebalances data across all parallel instances using round-robin
* @return DataSet with rebalanced distribution
*/
def rebalance(): DataSet[T]
}
Usage Examples:
// Rebalance after filtering to ensure even distribution
val filteredAndRebalanced = customers
.filter(_.region == "North")
.rebalance()
.map(processCustomer)
Sort data within each partition for optimized processing.
class DataSet[T] {
/**
* Sorts elements within each partition by field position
* @param field Field position for sorting
* @param order Sort order (ASCENDING or DESCENDING)
* @return DataSet with sorted partitions
*/
def sortPartition(field: Int, order: Order): DataSet[T]
/**
* Sorts elements within each partition by field name
* @param field Field name for sorting
* @param order Sort order (ASCENDING or DESCENDING)
* @return DataSet with sorted partitions
*/
def sortPartition(field: String, order: Order): DataSet[T]
/**
* Sorts elements within each partition using key selector
* @param fun Key selector function for sorting
* @param order Sort order (ASCENDING or DESCENDING)
* @return DataSet with sorted partitions
*/
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}
Chain multiple sorting keys for complex partition sorting.
class PartitionSortedDataSet[T] extends DataSet[T] {
/**
* Adds secondary sort key by field position
* @param field Field position for secondary sorting
* @param order Sort order for secondary key
* @return DataSet with multi-key sorted partitions
*/
def sortPartition(field: Int, order: Order): DataSet[T]
/**
* Adds secondary sort key by field name
* @param field Field name for secondary sorting
* @param order Sort order for secondary key
* @return DataSet with multi-key sorted partitions
*/
def sortPartition(field: String, order: Order): DataSet[T]
/**
* Key selector sort keys cannot be chained; this overload rejects the
* call at runtime, so use field positions or names for multi-key sorting
* @param fun Key selector function
* @param order Sort order
* @return Never returns normally; throws InvalidProgramException
*/
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}
Usage Examples:
import org.apache.flink.api.common.operators.Order
// Sort within partitions by age, then by salary; chained sort keys
// must be field names or positions (key-selector sorts cannot be chained)
val sortedPartitions = people
  .partitionByHash(_.region)
  .sortPartition("age", Order.ASCENDING)
  .sortPartition("salary", Order.DESCENDING)
// A key selector works for a single sort key
val sortedBySelector = people.sortPartition(_.age, Order.ASCENDING)
Control partitioning for grouped DataSets to optimize group processing.
class GroupedDataSet[T] {
/**
* Uses custom partitioner for group distribution
* @param partitioner Custom partitioner for group keys
* @return GroupedDataSet with custom partitioning
*/
def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
}
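A short sketch of grouping with the RegionPartitioner defined above (any Partitioner matching the key type works):
// Control which parallel instance processes each group of customers
val regionCounts = customers
  .groupBy("region")
  .withPartitioner(new RegionPartitioner)
  .reduceGroup(group => group.size) // e.g. count customers per region
Control partitioning for join, cross, and coGroup operations.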
trait JoinFunctionAssigner[L, R] {
/**
* Uses custom partitioner for join distribution
* @param partitioner Custom partitioner for join keys
* @return JoinFunctionAssigner with custom partitioning
*/
def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinFunctionAssigner[L, R]
}
class CoGroupDataSet[L, R] {
/**
* Uses custom partitioner for coGroup distribution
* @param partitioner Custom partitioner for coGroup keys
* @return CoGroupDataSet with custom partitioning
*/
def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): CoGroupDataSet[L, R]
}
Usage Examples:
// Custom partitioning for joins: the partitioner's key type must match
// the join key (the RegionPartitioner above partitions String keys)
case class Purchase(customerId: Int, region: String, amount: Double)
val purchases = env.fromElements(
  Purchase(1, "North", 100.0),
  Purchase(2, "South", 42.5)
)
val joinResult = customers
  .join(purchases)
  .where("region")
  .equalTo("region")
  .withPartitioner(new RegionPartitioner)
  .apply((customer, purchase) => (customer.name, purchase.amount))
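The same hook exists on coGroup; a brief sketch reusing the inputs above:
val regionTotals = customers
  .coGroup(purchases)
  .where("region")
  .equalTo("region")
  .withPartitioner(new RegionPartitioner)
  .apply((cs, ps) => (cs.size, ps.size)) // e.g. customers vs. purchases per region
Hints for optimizing data distribution in binary operations.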
class DataSet[T] {
/**
* Hints that the other DataSet is huge, so this (smaller) DataSet is
* broadcast to all parallel instances
* @param other Huge DataSet to join with
* @return Join operation optimized for broadcasting this DataSet
*/
def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
/**
* Hints that the other DataSet is tiny and should be broadcast
* @param other Tiny DataSet to broadcast
* @return Join operation optimized for broadcasting the other DataSet
*/
def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
/**
* Cross with a broadcast hint for a tiny DataSet
* @param other Tiny DataSet to broadcast
* @return Cross operation with broadcast optimization
*/
def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]
/**
* Cross with a broadcast hint for a huge DataSet
* @param other Huge DataSet to cross with (this DataSet is broadcast)
* @return Cross operation with broadcast optimization
*/
def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}
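A usage sketch for the size hints, where events stands in for a genuinely large input:
// Broadcast the small customers set to every instance holding events
val events = env.fromElements((1, "click"), (2, "view"), (1, "purchase"))
val enriched = customers
  .joinWithHuge(events) // the other side is huge, so this side is broadcast
  .where(_.id)
  .equalTo(_._1)
  .apply((c, e) => (c.name, e._2))
// crossWithTiny broadcasts its (tiny) argument to every partition
val labeled = customers.crossWithTiny(env.fromElements("gold", "silver"))
Set parallelism at the operation level to control resource usage.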
class DataSet[T] {
/**
* Sets the parallelism of the operation that produced this DataSet
* @param parallelism Degree of parallelism
* @return DataSet with specified parallelism
*/
def setParallelism(parallelism: Int): DataSet[T]
/**
* Gets the parallelism of this operation
* @return Current parallelism setting
*/
def getParallelism: Int
}
Usage Examples:
// setParallelism configures the operation that produced the DataSet,
// so it is called directly after that operation
val result = data
  .map(expensiveTransformation)
  .setParallelism(8) // run the expensive map with 8 parallel instances
  .reduce(combineResults)
  .setParallelism(4) // run the reduce with 4 parallel instances
Specify minimum and preferred resource requirements for operations.
class DataSet[T] {
/**
* Gets minimum resource requirements
* @return ResourceSpec with minimum requirements
*/
def minResources: ResourceSpec
/**
* Gets preferred resource requirements
* @return ResourceSpec with preferred requirements
*/
def preferredResources: ResourceSpec
}
Supporting type definitions referenced by the operations above:
abstract class Partitioner[T] {
/**
* Determines partition for given key
* @param key Key to partition
* @param numPartitions Total number of partitions
* @return Partition index (0 to numPartitions-1)
*/
def partition(key: T, numPartitions: Int): Int
}
sealed trait Order
object Order {
case object ASCENDING extends Order
case object DESCENDING extends Order
}
class PartitionSortedDataSet[T] extends DataSet[T] {
// Represents a DataSet with sorted partitions that allows chaining additional field-based sort keys
}
trait DataDistribution {
/**
* Gets the upper boundary of a bucket for range partitioning
* @param bucketNum Index of the bucket (0 to totalNumBuckets - 1)
* @param totalNumBuckets Total number of buckets
* @return Boundary of the bucket as an array of key fields
*/
def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]
/**
* Gets the number of key fields in each boundary
* @return Number of fields per boundary
*/
def getNumberOfFields: Int
}
class ResourceSpec {
/**
* Gets CPU cores requirement
* @return Number of CPU cores
*/
def getCpuCores: Double
/**
* Gets heap memory requirement in MB
* @return Heap memory in megabytes
*/
def getHeapMemoryInMB: Int
/**
* Gets direct memory requirement in MB
* @return Direct memory in megabytes
*/
def getDirectMemoryInMB: Int
/**
* Gets native memory requirement in MB
* @return Native memory in megabytes
*/
def getNativeMemoryInMB: Int
/**
* Gets network memory requirement in MB
* @return Network memory in megabytes
*/
def getNetworkMemoryInMB: Int
}
object ResourceSpec {
/**
* The default ResourceSpec
* @return Default ResourceSpec
*/
def DEFAULT: ResourceSpec
/**
* ResourceSpec with unknown resource requirements
* @return Unknown ResourceSpec
*/
def UNKNOWN: ResourceSpec
/**
* Creates new ResourceSpec builder
* @return ResourceSpec builder
*/
def newBuilder(): ResourceSpec.Builder
}
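A small illustration of the ResourceSpec getters, assuming result is any DataSet from the examples above (the values are defaults unless resources were explicitly configured):
val spec: ResourceSpec = result.minResources
println(s"cpu=${spec.getCpuCores} heap=${spec.getHeapMemoryInMB}MB " +
  s"network=${spec.getNetworkMemoryInMB}MB")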