Partitioning and Distribution

Control over data distribution and partitioning strategies across the cluster. These operations determine which parallel processing instance receives each record.

Capabilities

Hash Partitioning

Distribute data using hash-based partitioning: all records with the same key are routed to the same partition, which yields an even distribution when key hashes are spread uniformly.

class DataSet[T] {
  /**
   * Partitions data by hash of specified fields
   * @param fields Field positions to use for hashing
   * @return DataSet with hash partitioning
   */
  def partitionByHash(fields: Int*): DataSet[T]
  
  /**
   * Partitions data by hash of named fields
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return DataSet with hash partitioning
   */
  def partitionByHash(firstField: String, otherFields: String*): DataSet[T]
  
  /**
   * Partitions data by hash of key selector result
   * @param fun Key selector function
   * @return DataSet with hash partitioning based on key
   */
  def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
}

Usage Examples:

import org.apache.flink.api.scala._

case class Customer(id: Int, name: String, region: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val customers = env.fromElements(
  Customer(1, "Alice", "North"),
  Customer(2, "Bob", "South"),  
  Customer(3, "Charlie", "North"),
  Customer(4, "Diana", "West")
)

// Partition by customer ID field
val partitionedById = customers.partitionByHash("id")

// Partition by multiple fields
val partitionedByRegionAndId = customers.partitionByHash("region", "id")

// Partition using key selector function
val partitionedByRegion = customers.partitionByHash(_.region)
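
Partitioning on its own only moves data; it usually pays off when followed by a partition-wise operation. A minimal sketch using mapPartition to count the records each parallel instance received:

// Count how many customers landed in each partition after hash partitioning
val perPartitionCounts = customers
  .partitionByHash("region")
  .mapPartition(partition => Iterator(partition.size))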

Range Partitioning

Distribute data using range-based partitioning, so that each partition holds a contiguous range of keys. Flink samples the input to determine the range boundaries unless a custom DataDistribution is supplied (see below).

class DataSet[T] {
  /**
   * Partitions data by range of specified fields
   * @param fields Field positions for range partitioning
   * @return DataSet with range partitioning
   */
  def partitionByRange(fields: Int*): DataSet[T]
  
  /**
   * Partitions data by range of named fields
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return DataSet with range partitioning
   */
  def partitionByRange(firstField: String, otherFields: String*): DataSet[T]
  
  /**
   * Partitions data by range of key selector result
   * @param fun Key selector function
   * @return DataSet with range partitioning based on key
   */
  def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
}

Range Partitioning with Data Distribution

Advanced range partitioning that replaces the sampled boundaries with a custom data distribution. These overloads are provided by the implicit DataSetUtils wrapper (import org.apache.flink.api.scala.utils._).

implicit class DataSetUtils[T](dataSet: DataSet[T]) {
  /**
   * Partitions by range using custom data distribution
   * @param distribution Custom data distribution
   * @param fields Field positions for partitioning
   * @return DataSet with custom range partitioning
   */
  def partitionByRange(distribution: DataDistribution, fields: Int*): DataSet[T]
  
  /**
   * Partitions by range using custom data distribution and field names
   * @param distribution Custom data distribution
   * @param firstField First field name
   * @param otherFields Additional field names
   * @return DataSet with custom range partitioning
   */
  def partitionByRange(distribution: DataDistribution, firstField: String, otherFields: String*): DataSet[T]
  
  /**
   * Partitions by range using custom data distribution and key selector
   * @param distribution Custom data distribution
   * @param fun Key selector function
   * @return DataSet with custom range partitioning
   */
  def partitionByRange[K: TypeInformation](distribution: DataDistribution, fun: T => K): DataSet[T]
}

Usage Examples:

// Range partition by age for ordered processing
case class Person(name: String, age: Int, salary: Double)

val people = env.fromElements(
  Person("Alice", 25, 50000),
  Person("Bob", 30, 60000),
  Person("Charlie", 35, 70000)
)

// Simple range partitioning
val rangePartitioned = people.partitionByRange(_.age)

// Range partitioning with multiple fields
val rangeByAgeAndSalary = people.partitionByRange(p => (p.age, p.salary))
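
The custom-distribution overloads require the implicit DataSetUtils wrapper, so they need an extra import. Below is a minimal, illustrative sketch: the AgeDistribution class and its fixed boundaries are hypothetical, and note that in Flink a DataDistribution must also implement the read/write methods it inherits from IOReadableWritable:

import org.apache.flink.api.scala.utils._
import org.apache.flink.api.common.distributions.DataDistribution
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

// Hypothetical distribution with fixed age boundaries: bucket i covers
// ages up to boundaries(i); the last bucket takes everything above
class AgeDistribution extends DataDistribution {
  private val boundaries = Array(30, 40, 50)

  override def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] =
    Array(Int.box(boundaries(math.min(bucketNum, boundaries.length - 1))))

  override def getNumberOfFields: Int = 1

  // Boundaries are hardcoded, so there is no state to (de)serialize
  override def write(out: DataOutputView): Unit = ()
  override def read(in: DataInputView): Unit = ()
}

val distributedByAge = people.partitionByRange(new AgeDistribution, "age")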

Custom Partitioning

Use custom partitioning logic for specialized distribution requirements.

class DataSet[T] {
  /**
   * Partitions using custom partitioner on specified field
   * @param partitioner Custom partitioner implementation
   * @param field Field position for partitioning
   * @return DataSet with custom partitioning
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int): DataSet[T]
  
  /**
   * Partitions using custom partitioner on named field
   * @param partitioner Custom partitioner implementation
   * @param field Field name for partitioning
   * @return DataSet with custom partitioning
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String): DataSet[T]
  
  /**
   * Partitions using custom partitioner on key selector result
   * @param partitioner Custom partitioner implementation
   * @param fun Key selector function
   * @return DataSet with custom partitioning
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
}

Usage Examples:

import org.apache.flink.api.common.functions.Partitioner

// Custom partitioner that groups regions together
class RegionPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    val bucket = key match {
      case "North" | "South" => 0
      case "East" | "West"   => 1
      case _                 => 2
    }
    bucket % numPartitions  // clamp to the available number of partitions
  }
}

val customPartitioned = customers.partitionCustom(new RegionPartitioner, _.region)

Load Balancing

Rebalance data across all parallel instances for even load distribution.

class DataSet[T] {
  /**
   * Rebalances data across all parallel instances using round-robin
   * @return DataSet with rebalanced distribution
   */
  def rebalance(): DataSet[T]
}

Usage Examples:

// Rebalance after filtering to ensure even distribution
val filteredAndRebalanced = customers
  .filter(_.region == "North")
  .rebalance()
  .map(processCustomer)

Partition Sorting

Sort data within each partition for optimized processing.

class DataSet[T] {
  /**
   * Sorts elements within each partition by field position
   * @param field Field position for sorting
   * @param order Sort order (ASCENDING or DESCENDING)
   * @return DataSet with sorted partitions
   */
  def sortPartition(field: Int, order: Order): DataSet[T]
  
  /**
   * Sorts elements within each partition by field name
   * @param field Field name for sorting
   * @param order Sort order (ASCENDING or DESCENDING)
   * @return DataSet with sorted partitions
   */
  def sortPartition(field: String, order: Order): DataSet[T]
  
  /**
   * Sorts elements within each partition using key selector
   * @param fun Key selector function for sorting
   * @param order Sort order (ASCENDING or DESCENDING)  
   * @return DataSet with sorted partitions
   */
  def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}

Multi-Key Partition Sorting

Chain multiple sorting keys for complex partition sorting. Chaining is supported for field positions and field names; a key-selector sort cannot be combined with additional sort keys.

class PartitionSortedDataSet[T] extends DataSet[T] {
  /**
   * Adds secondary sort key by field position
   * @param field Field position for secondary sorting
   * @param order Sort order for secondary key
   * @return DataSet with multi-key sorted partitions
   */
  def sortPartition(field: Int, order: Order): DataSet[T]
  
  /**
   * Adds secondary sort key by field name
   * @param field Field name for secondary sorting
   * @param order Sort order for secondary key
   * @return DataSet with multi-key sorted partitions
   */
  def sortPartition(field: String, order: Order): DataSet[T]
  
  /**
   * Key-selector sorting cannot be chained: calling this overload on an
   * already-sorted DataSet throws InvalidProgramException
   * @param fun Key selector function
   * @param order Sort order
   * @return Never returns normally; use field positions or names instead
   */
  def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}

Usage Examples:

import org.apache.flink.api.common.operators.Order

// Partition by region, then sort each partition by age
val sortedPartitions = people
  .partitionByHash(_.region)
  .sortPartition(_.age, Order.ASCENDING)

// Key-selector sorts cannot be chained, so use field names (or positions)
// to sort within partitions by multiple keys
val sortedByFields = people
  .sortPartition("age", Order.ASCENDING)
  .sortPartition("salary", Order.DESCENDING)

Grouped Partitioning

Control partitioning for grouped DataSets to optimize group processing.

class GroupedDataSet[T] {
  /**
   * Uses custom partitioner for group distribution
   * @param partitioner Custom partitioner for group keys
   * @return GroupedDataSet with custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
}
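
For example, the RegionPartitioner defined earlier can be reused so that each region's group is processed on a predictable instance; the group-size computation here is just for illustration:

// Group by region, route groups with the custom partitioner,
// then count the customers in each group
val regionGroupSizes = customers
  .groupBy(_.region)
  .withPartitioner(new RegionPartitioner)
  .reduceGroup(group => group.size)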

Binary Operation Partitioning

Control partitioning for join, cross, and coGroup operations.

trait JoinFunctionAssigner[L, R] {
  /**
   * Uses custom partitioner for join distribution
   * @param partitioner Custom partitioner for join keys
   * @return JoinFunctionAssigner with custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinFunctionAssigner[L, R]
}

class CoGroupDataSet[L, R] {
  /**
   * Uses custom partitioner for coGroup distribution
   * @param partitioner Custom partitioner for coGroup keys
   * @return CoGroupDataSet with custom partitioning
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): CoGroupDataSet[L, R]
}

Usage Examples:

// Custom partitioning for joins
val joinResult = leftDataSet
  .join(rightDataSet)
  .where(_.key)
  .equalTo(_.key)
  .withPartitioner(new CustomKeyPartitioner)
  .apply((left, right) => combineData(left, right))
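
The same pattern applies to coGroup, continuing the hypothetical leftDataSet, rightDataSet, and CustomKeyPartitioner from the join example:

// Custom partitioning for coGroup; emits the group sizes on each side
val coGroupResult = leftDataSet
  .coGroup(rightDataSet)
  .where(_.key)
  .equalTo(_.key)
  .withPartitioner(new CustomKeyPartitioner)
  .apply((lefts, rights) => (lefts.size, rights.size))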

Distribution Strategies

Hints for optimizing data distribution in binary operations.

class DataSet[T] {
  /**
   * Hints that this DataSet is small for broadcast operations
   * @param other Large DataSet to join with
   * @return Join operation optimized for broadcasting this DataSet
   */
  def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  
  /**
   * Hints that other DataSet is small for broadcast operations
   * @param other Small DataSet to broadcast
   * @return Join operation optimized for broadcasting other DataSet
   */  
  def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  
  /**
   * Cross with broadcasting hint for small DataSet
   * @param other Small DataSet to broadcast
   * @return Cross operation with broadcast optimization
   */
  def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]
  
  /**
   * Cross with broadcasting hint for large DataSet
   * @param other Large DataSet (this will be broadcast)
   * @return Cross operation with broadcast optimization
   */
  def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}
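
A sketch of using a size hint, with two small illustrative DataSets standing in for a large event set and a small lookup table:

// Hypothetical data: `events` is assumed large, `lookup` small
val events = env.fromElements((1, "click"), (2, "view"), (1, "view"))
val lookup = env.fromElements((1, "mobile"), (2, "desktop"))

// Hint that `lookup` is tiny so the optimizer can broadcast it
val enriched = events
  .joinWithTiny(lookup)
  .where(0)
  .equalTo(0)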

Parallelism Control

Set parallelism at the operation level to control resource usage.

class DataSet[T] {
  /**
   * Sets parallelism for this operation
   * @param parallelism Degree of parallelism
   * @return DataSet with specified parallelism
   */
  def setParallelism(parallelism: Int): DataSet[T]
  
  /**
   * Gets the parallelism of this operation
   * @return Current parallelism setting
   */
  def getParallelism: Int
}

Usage Examples:

// setParallelism applies to the operation it is called on,
// not to subsequent operations in the chain
val result = data
  .map(expensiveTransformation).setParallelism(8)  // 8 parallel instances for the map
  .reduce(combineResults).setParallelism(4)        // 4 for the reduce

Resource Requirements

Inspect the minimum and preferred resource requirements attached to an operation.

class DataSet[T] {
  /**
   * Gets minimum resource requirements
   * @return ResourceSpec with minimum requirements
   */
  def minResources: ResourceSpec
  
  /**
   * Gets preferred resource requirements
   * @return ResourceSpec with preferred requirements
   */
  def preferredResources: ResourceSpec
}
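
These are read-only accessors. A quick sketch of inspecting them, continuing the hypothetical `data` DataSet from the parallelism example:

// Inspect the resource requirements attached to an operation
val spec = data.minResources
println(s"CPU cores: ${spec.getCpuCores}, heap MB: ${spec.getHeapMemoryInMB}")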

Types

trait Partitioner[T] {
  /**
   * Determines partition for given key
   * @param key Key to partition
   * @param numPartitions Total number of partitions
   * @return Partition index (0 to numPartitions-1)
   */
  def partition(key: T, numPartitions: Int): Int
}

sealed trait Order
object Order {
  case object ASCENDING extends Order
  case object DESCENDING extends Order
}

class PartitionSortedDataSet[T] extends DataSet[T] {
  // Represents a DataSet with sorted partitions that allows chaining additional sort keys
}

trait DataDistribution {
  /**
   * Gets the upper boundary record of a bucket for range partitioning
   * @param bucketNum Index of the bucket (0 to totalNumBuckets-1)
   * @param totalNumBuckets Total number of buckets
   * @return Boundary record, one value per partition key field
   */
  def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]
  
  /**
   * Gets the number of fields in the partition key
   * @return Number of key fields
   */
  def getNumberOfFields: Int
}

class ResourceSpec {
  /**
   * Gets CPU cores requirement
   * @return Number of CPU cores 
   */
  def getCpuCores: Double
  
  /**
   * Gets heap memory requirement in MB
   * @return Heap memory in megabytes
   */
  def getHeapMemoryInMB: Int
  
  /**
   * Gets direct memory requirement in MB
   * @return Direct memory in megabytes
   */
  def getDirectMemoryInMB: Int
  
  /**
   * Gets native memory requirement in MB
   * @return Native memory in megabytes
   */
  def getNativeMemoryInMB: Int
  
  /**
   * Gets network memory requirement in MB
   * @return Network memory in megabytes
   */
  def getNetworkMemoryInMB: Int
}

object ResourceSpec {
  /**
   * ResourceSpec with default values
   * @return Default ResourceSpec
   */
  def DEFAULT: ResourceSpec
  
  /**
   * ResourceSpec indicating unknown resource requirements
   * @return Unknown ResourceSpec
   */
  def UNKNOWN: ResourceSpec
  
  /**
   * Creates new ResourceSpec builder
   * @return ResourceSpec builder
   */
  def newBuilder(): ResourceSpec.Builder
}
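
A sketch of the builder pattern; only newBuilder() is documented above, so the setter names below are assumptions for illustration:

// Hypothetical builder usage -- the setter names are assumptions
val spec = ResourceSpec.newBuilder()
  .setCpuCores(2.0)
  .setHeapMemoryInMB(1024)
  .build()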