Catalyst query optimization framework and expression evaluation engine for Apache Spark SQL
—
This section covers utility classes for date/time operations, string manipulation, and other common operations in Spark Catalyst.
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.sql.types._

Comprehensive utilities for date and time operations.
object DateTimeUtils {
// String conversion
def stringToTimestamp(s: UTF8String): Option[Long]
def timestampToString(us: Long): String
def dateToString(days: Int): String
def stringToDate(s: UTF8String): Option[Int]
// Current time functions
def currentTimestamp(): Long
def currentDate(): Int
// Date arithmetic
def dateAddMonths(days: Int, months: Int): Int
def dateAddInterval(days: Int, interval: CalendarInterval): Int
def timestampAddInterval(timestamp: Long, interval: CalendarInterval): Long
// Date/time extraction
def getYear(days: Int): Int
def getMonth(days: Int): Int
def getDayOfMonth(days: Int): Int
def getDayOfYear(days: Int): Int
def getWeekOfYear(days: Int): Int
def getHour(timestamp: Long): Int
def getMinute(timestamp: Long): Int
def getSecond(timestamp: Long): Int
// Formatting
def formatTimestamp(timestamp: Long, format: String): String
def parseTimestamp(s: String, format: String): Option[Long]
// Constants
val MICROS_PER_SECOND: Long = 1000000L
val MICROS_PER_MILLIS: Long = 1000L
val MILLIS_PER_SECOND: Long = 1000L
val SECONDS_PER_DAY: Long = 24 * 60 * 60
val MICROS_PER_DAY: Long = SECONDS_PER_DAY * MICROS_PER_SECOND
}

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String
// Current date and time
val currentTs = DateTimeUtils.currentTimestamp()
val currentDate = DateTimeUtils.currentDate()
// String conversions
val dateString = UTF8String.fromString("2023-12-25")
val date = DateTimeUtils.stringToDate(dateString) // Some(19724)
val dateStr = DateTimeUtils.dateToString(19724) // "2023-12-25"
// Time extraction
val year = DateTimeUtils.getYear(19724) // 2023
val month = DateTimeUtils.getMonth(19724) // 12
val day = DateTimeUtils.getDayOfMonth(19724) // 25
// Timestamp operations
val timestampString = UTF8String.fromString("2023-12-25 14:30:00")
val timestamp = DateTimeUtils.stringToTimestamp(timestampString)
val hour = DateTimeUtils.getHour(timestamp.get) // 14
val minute = DateTimeUtils.getMinute(timestamp.get) // 30

Efficient UTF-8 string representation optimized for Spark SQL.
/** UTF-8 encoded string type used internally by Spark SQL.
 *  Signature listing only — the concrete implementation lives in
 *  org.apache.spark.unsafe.types.UTF8String.
 */
abstract class UTF8String extends Comparable[UTF8String] {
// Size accessors: numBytes is the raw UTF-8 byte length; numChars is the
// character count (presumably code points — confirm against upstream API).
def numBytes(): Int
def numChars(): Int
def toString(): String
def getBytes(): Array[Byte]
// String operations. NOTE(review): whether start/until positions are
// byte- or character-based is not visible from this listing — confirm.
def contains(substring: UTF8String): Boolean
def startsWith(prefix: UTF8String): Boolean
def endsWith(suffix: UTF8String): Boolean
def indexOf(substring: UTF8String, start: Int): Int
def substring(start: Int): UTF8String
def substring(start: Int, until: Int): UTF8String
def trim(): UTF8String
def trimLeft(): UTF8String
def trimRight(): UTF8String
def reverse(): UTF8String
def repeat(times: Int): UTF8String
// Case operations
def toUpperCase(): UTF8String
def toLowerCase(): UTF8String
// Comparison — compare fulfils the Comparable[UTF8String] contract;
// equals/hashCode are declared together, as they must be overridden in pairs.
def compare(other: UTF8String): Int
def equals(other: Any): Boolean
def hashCode(): Int
// Numeric conversion. NOTE(review): behaviour on non-numeric input
// (exception vs. sentinel) is not specified here — verify before use.
def toLong(): Long
def toDouble(): Double
def toInt(): Int
}
object UTF8String {
def fromString(str: String): UTF8String
def fromBytes(bytes: Array[Byte]): UTF8String
def fromBytes(bytes: Array[Byte], offset: Int, numBytes: Int): UTF8String
val EMPTY_UTF8: UTF8String
// String manipulation utilities
def concat(inputs: UTF8String*): UTF8String
def concatWs(separator: UTF8String, inputs: UTF8String*): UTF8String
}

import org.apache.spark.unsafe.types.UTF8String
// Create UTF8String instances
val str1 = UTF8String.fromString("Hello World")
val str2 = UTF8String.fromString("hello")
// String operations
val upper = str2.toUpperCase() // "HELLO"
val substring = str1.substring(0, 5) // "Hello"
val contains = str1.contains(UTF8String.fromString("World")) // true
// Concatenation
val concat = UTF8String.concat(str2, UTF8String.fromString(" "), str1)
val concatWs = UTF8String.concatWs(UTF8String.fromString("-"), str1, str2)
// Comparison
val comparison = str1.compare(str2) // > 0 (case sensitive)

Abstract representation for array data in Catalyst.
abstract class ArrayData {
def numElements(): Int
def isNullAt(ordinal: Int): Boolean
def get(ordinal: Int, elementType: DataType): Any
def getBoolean(ordinal: Int): Boolean
def getByte(ordinal: Int): Byte
def getShort(ordinal: Int): Short
def getInt(ordinal: Int): Int
def getLong(ordinal: Int): Long
def getFloat(ordinal: Int): Float
def getDouble(ordinal: Int): Double
def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal
def getUTF8String(ordinal: Int): UTF8String
def getBinary(ordinal: Int): Array[Byte]
def getInterval(ordinal: Int): CalendarInterval
def getStruct(ordinal: Int, numFields: Int): InternalRow
def getArray(ordinal: Int): ArrayData
def getMap(ordinal: Int): MapData
def toArray[T](elementType: DataType): Array[T]
def toObjectArray(elementType: DataType): Array[AnyRef]
def copy(): ArrayData
}

Abstract representation for map data in Catalyst.
abstract class MapData {
def numElements(): Int
def keyArray(): ArrayData
def valueArray(): ArrayData
def copy(): MapData
// Java Map conversion
def toMap[K, V](keyType: DataType, valueType: DataType): java.util.Map[K, V]
def toScalaMap[K, V](keyType: DataType, valueType: DataType): scala.collection.Map[K, V]
}

Represents a calendar interval (years, months, days, microseconds).
/** Interval kept as separate months / days / microseconds components;
 *  they cannot be collapsed into one unit because month and day lengths vary.
 */
case class CalendarInterval(months: Int, days: Int, microseconds: Long) {
// Arithmetic with another interval — presumably component-wise; confirm
// overflow behaviour against the upstream implementation.
def add(that: CalendarInterval): CalendarInterval
def subtract(that: CalendarInterval): CalendarInterval
def negate(): CalendarInterval
// NOTE(review): a concrete case class would need `override` on toString —
// this is a signature listing, not compilable source.
def toString: String
}
object CalendarInterval {
val EMPTY: CalendarInterval = new CalendarInterval(0, 0, 0)
def fromString(input: String): CalendarInterval
def fromSingleUnitString(unit: String, input: String): CalendarInterval
// Factory methods
def fromYearMonthString(input: String): CalendarInterval
def fromDayTimeString(input: String): CalendarInterval
}

import org.apache.spark.sql.catalyst.util.CalendarInterval
// Create intervals
val interval1 = new CalendarInterval(2, 15, 3600000000L) // 2 months, 15 days, 1 hour
val interval2 = CalendarInterval.fromString("1 year 2 months 3 days")
// Interval arithmetic
val sum = interval1.add(interval2)
val diff = interval1.subtract(interval2)
val negated = interval1.negate()

object MathUtils {
def floorDiv(x: Long, y: Long): Long
def floorMod(x: Long, y: Long): Long
def addExact(x: Long, y: Long): Long
def subtractExact(x: Long, y: Long): Long
def multiplyExact(x: Long, y: Long): Long
def toIntExact(value: Long): Int
// Rounding functions
def round(value: Double, scale: Int): Double
def roundUp(value: Double, scale: Int): Double
def roundDown(value: Double, scale: Int): Double
}

Specialized map for attributes with efficient lookups.
/** Immutable map keyed by Attribute, backed by a plain Scala Map.
 *  NOTE(review): upstream Spark keys lookups by expression id rather than
 *  object equality — not visible from this listing; confirm before relying
 *  on attribute-identity semantics.
 */
class AttributeMap[A](val baseMap: Map[Attribute, A]) {
// Lookup: get is total (Option); apply presumably throws on a missing key,
// following standard Map semantics — confirm upstream.
def get(k: Attribute): Option[A]
def apply(k: Attribute): A
def contains(k: Attribute): Boolean
// Size queries.
def size: Int
def isEmpty: Boolean
def nonEmpty: Boolean
// Functional updates — each returns a new AttributeMap, leaving this one intact.
def ++(other: AttributeMap[A]): AttributeMap[A]
def -(key: Attribute): AttributeMap[A]
def updated(key: Attribute, value: A): AttributeMap[A]
// Views and traversal over the underlying entries.
def keys: Iterable[Attribute]
def values: Iterable[A]
def foreach[U](f: ((Attribute, A)) => U): Unit
def map[B](f: ((Attribute, A)) => (Attribute, B)): AttributeMap[B]
}
object AttributeMap {
def empty[A]: AttributeMap[A] = new AttributeMap(Map.empty)
def apply[A](kvs: (Attribute, A)*): AttributeMap[A] = new AttributeMap(kvs.toMap)
}

Efficient set implementation for attributes.
/** Set of Attributes supporting the usual set algebra. The constructor is
 *  private — instances are built through the companion object factories.
 */
class AttributeSet private (val baseSet: Set[Attribute]) {
// Membership and containment tests.
def contains(a: Attribute): Boolean
def subsetOf(other: AttributeSet): Boolean
// Set algebra — each operation returns a new AttributeSet.
def intersect(other: AttributeSet): AttributeSet
def ++(other: AttributeSet): AttributeSet
def +(attr: Attribute): AttributeSet
def -(attr: Attribute): AttributeSet
def filter(f: Attribute => Boolean): AttributeSet
def map(f: Attribute => Attribute): AttributeSet
// Size queries.
def size: Int
def isEmpty: Boolean
def nonEmpty: Boolean
// NOTE(review): iteration order of toSeq is unspecified in this listing.
def toSeq: Seq[Attribute]
}
object AttributeSet {
def empty: AttributeSet = new AttributeSet(Set.empty)
def apply(attrs: Attribute*): AttributeSet = new AttributeSet(attrs.toSet)
def fromAttributeSets(sets: Seq[AttributeSet]): AttributeSet
}

object HashUtils {
def murmur3Hash(input: Any, dataType: DataType, seed: Int): Int
def xxHash64(input: Any, dataType: DataType, seed: Long): Long
// Hash array elements
def hashArray(array: ArrayData, elementType: DataType, seed: Int): Int
def hashMap(map: MapData, keyType: DataType, valueType: DataType, seed: Int): Int
def hashStruct(struct: InternalRow, structType: StructType, seed: Int): Int
}

import org.apache.spark.sql.catalyst.util._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.sql.catalyst.InternalRow
// Working with dates and times
val dateStr = UTF8String.fromString("2023-12-25")
val date = DateTimeUtils.stringToDate(dateStr).get
val year = DateTimeUtils.getYear(date)
val formattedDate = DateTimeUtils.dateToString(date)
// String processing
val name = UTF8String.fromString("John Doe")
val upperName = name.toUpperCase()
val firstName = name.substring(0, 4)
// Interval operations
val interval = new CalendarInterval(1, 0, 0) // 1 month
val futureDate = DateTimeUtils.dateAddInterval(date, interval)
// Working with collections
val attrs = Seq(attr1, attr2, attr3)
val attrSet = AttributeSet(attrs: _*)
val attrMap = AttributeMap(attrs.zip(Seq("value1", "value2", "value3")): _*)
println(s"Date: $formattedDate, Year: $year")
println(s"Name: $upperName, First: $firstName")
println(s"Attribute set size: ${attrSet.size}")

These utilities provide the foundation for efficient data processing operations throughout the Catalyst framework, enabling high-performance query execution with proper handling of various data types and operations.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-catalyst