SQL Functions and Expressions

Spark SQL provides an extensive library of built-in functions for data manipulation, accessible through the org.apache.spark.sql.functions object and the Column class. These functions cover mathematical operations, string manipulation, date/time processing, aggregations, and complex data type operations.
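
Both entry points compose: functions from org.apache.spark.sql.functions build Column expressions, and expr() lets you drop to a SQL string where convenient. A minimal sketch, assuming a DataFrame df with name and amount columns:

import org.apache.spark.sql.functions._

// Build Column expressions with the functions object
val enriched = df.select(col("name"), upper(col("name")).alias("name_upper"))

// Equivalent logic written as a SQL expression string
val viaExpr = df.select(expr("upper(name) AS name_upper"), expr("amount * 1.1 AS adjusted"))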

Column Class

class Column {
  // Arithmetic operations
  def +(other: Any): Column
  def -(other: Any): Column
  def *(other: Any): Column
  def /(other: Any): Column
  def %(other: Any): Column
  def unary_- : Column
  
  // Comparison operations
  def ===(other: Any): Column
  def =!=(other: Any): Column  // Inequality (!== is deprecated)
  def >(other: Any): Column
  def >=(other: Any): Column
  def <(other: Any): Column
  def <=(other: Any): Column
  def <=> (other: Any): Column  // Null-safe equality
  
  // Logical operations
  def &&(other: Column): Column
  def ||(other: Column): Column
  def unary_! : Column
  
  // String operations
  def contains(other: Any): Column
  def startsWith(other: Column): Column
  def startsWith(literal: String): Column
  def endsWith(other: Column): Column
  def endsWith(literal: String): Column
  def rlike(literal: String): Column
  def like(literal: String): Column
  
  // Null operations
  def isNull: Column
  def isNotNull: Column
  def isNaN: Column
  
  // Type operations
  def cast(to: DataType): Column
  def cast(to: String): Column
  def as(alias: String): Column
  def as(alias: Symbol): Column
  def name(alias: String): Column
  
  // Collection operations
  def getItem(key: Any): Column
  def getField(fieldName: String): Column
  
  // Sorting
  def asc: Column
  def desc: Column
  def asc_nulls_first: Column
  def asc_nulls_last: Column
  def desc_nulls_first: Column
  def desc_nulls_last: Column
  
  // SQL expressions
  def when(condition: Column, value: Any): Column
  def otherwise(value: Any): Column
  def over(window: WindowSpec): Column
  def isin(list: Any*): Column
  def between(lowerBound: Any, upperBound: Any): Column
}
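
The operators above combine directly into filter predicates and derived columns. A brief sketch, assuming a DataFrame df with age, status, and salary columns:

import org.apache.spark.sql.functions._

// Comparison and logical operators form predicates
val active = df.filter(col("age") >= 18 && col("status") =!= "inactive")

// when/otherwise, between, and cast build derived columns
val banded = df
  .withColumn("salary_band",
    when(col("salary") < 50000, "low")
      .when(col("salary").between(50000, 100000), "mid")
      .otherwise("high"))
  .withColumn("age_int", col("age").cast("int"))

// Sorting with explicit null placement
val sorted = df.orderBy(col("salary").desc_nulls_last)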

Core Functions

Column Creation

// Column references
def col(colName: String): Column
def column(colName: String): Column

// Literal values
def lit(literal: Any): Column

// Input metadata
def input_file_name(): Column
def monotonically_increasing_id(): Column
def spark_partition_id(): Column

Usage Examples:

import org.apache.spark.sql.functions._

// Column references
val nameCol = col("name")
val ageCol = column("age")

// Literals
val constantValue = lit(42)
val stringLiteral = lit("Hello")
val dateLiteral = lit(java.sql.Date.valueOf("2023-01-01"))

// Metadata functions
val withFilename = df.withColumn("source_file", input_file_name())
val withId = df.withColumn("unique_id", monotonically_increasing_id())

Mathematical Functions

// Basic math
def abs(e: Column): Column
def ceil(e: Column): Column
def floor(e: Column): Column
def round(e: Column): Column
def round(e: Column, scale: Int): Column
def signum(e: Column): Column

// Exponential and logarithmic
def exp(e: Column): Column
def expm1(e: Column): Column
def log(e: Column): Column
def log10(e: Column): Column
def log2(e: Column): Column
def log1p(e: Column): Column
def pow(l: Column, r: Column): Column
def pow(l: Column, r: Double): Column
def sqrt(e: Column): Column

// Trigonometric
def sin(e: Column): Column
def cos(e: Column): Column
def tan(e: Column): Column
def asin(e: Column): Column
def acos(e: Column): Column
def atan(e: Column): Column
def atan2(l: Column, r: Column): Column
def sinh(e: Column): Column
def cosh(e: Column): Column
def tanh(e: Column): Column

// Angle conversion
def degrees(e: Column): Column
def radians(e: Column): Column

// Random functions
def rand(): Column
def rand(seed: Long): Column
def randn(): Column
def randn(seed: Long): Column

Usage Examples:

// Basic calculations
val absValues = df.withColumn("abs_value", abs(col("amount")))
val rounded = df.withColumn("rounded_price", round(col("price"), 2))

// Power and logarithms
val squared = df.withColumn("squared", pow(col("value"), 2))
val logValues = df.withColumn("log_amount", log(col("amount")))

// Trigonometry for calculations
val distances = df.withColumn("distance", 
  sqrt(pow(col("x2") - col("x1"), 2) + pow(col("y2") - col("y1"), 2)))

// Random sampling
val withRandom = df.withColumn("random_score", rand(42))

String Functions

// String manipulation
def concat(exprs: Column*): Column
def concat_ws(sep: String, exprs: Column*): Column
def format_string(format: String, arguments: Column*): Column
def length(e: Column): Column
def lower(e: Column): Column
def upper(e: Column): Column
def initcap(e: Column): Column

// Trimming and padding
def ltrim(e: Column): Column
def rtrim(e: Column): Column
def trim(e: Column): Column
def lpad(str: Column, len: Int, pad: String): Column
def rpad(str: Column, len: Int, pad: String): Column

// Substring operations
def substring(str: Column, pos: Int, len: Int): Column
def substring_index(str: Column, delim: String, count: Int): Column
def left(str: Column, len: Column): Column
def right(str: Column, len: Column): Column

// Regular expressions
def regexp_extract(e: Column, exp: String, groupIdx: Int): Column
def regexp_replace(e: Column, pattern: String, replacement: String): Column
def rlike(str: Column, regexp: Column): Column

// Encoding and decoding
def ascii(e: Column): Column
def base64(e: Column): Column
def unbase64(e: Column): Column
def encode(value: Column, charset: String): Column
def decode(value: Column, charset: String): Column

// String splitting and parsing
def split(str: Column, pattern: String): Column
def split(str: Column, pattern: String, limit: Int): Column

// Hashing
def md5(e: Column): Column
def sha1(e: Column): Column
def sha2(e: Column, numBits: Int): Column
def crc32(e: Column): Column
def hash(cols: Column*): Column
def xxhash64(cols: Column*): Column

Usage Examples:

// String concatenation
val fullName = df.withColumn("full_name", 
  concat(col("first_name"), lit(" "), col("last_name")))

val csvData = df.withColumn("csv_row", 
  concat_ws(",", col("id"), col("name"), col("email")))

// String formatting
val formatted = df.withColumn("description", 
  format_string("User %s (ID: %d)", col("name"), col("id")))

// Text processing
val cleaned = df
  .withColumn("trimmed", trim(col("description")))
  .withColumn("upper_name", upper(col("name")))
  .withColumn("name_length", length(col("name")))

// Regular expressions
val phoneExtract = df.withColumn("area_code", 
  regexp_extract(col("phone"), """(\d{3})-\d{3}-\d{4}""", 1))

val cleanedText = df.withColumn("clean_text", 
  regexp_replace(col("text"), "[^a-zA-Z0-9 ]", ""))

// String splitting
val nameParts = df.withColumn("name_parts", split(col("full_name"), " "))

// Hashing for data masking
val hashedEmail = df.withColumn("email_hash", sha2(col("email"), 256))

Date and Time Functions

// Current date/time
def current_date(): Column
def current_timestamp(): Column
def now(): Column

// Date arithmetic
def date_add(start: Column, days: Int): Column
def date_sub(start: Column, days: Int): Column
def datediff(end: Column, start: Column): Column
def add_months(start: Column, numMonths: Int): Column
def months_between(end: Column, start: Column): Column
def months_between(end: Column, start: Column, roundOff: Boolean): Column

// Date extraction
def year(e: Column): Column
def quarter(e: Column): Column
def month(e: Column): Column
def dayofmonth(e: Column): Column
def dayofweek(e: Column): Column
def dayofyear(e: Column): Column
def hour(e: Column): Column
def minute(e: Column): Column
def second(e: Column): Column
def weekofyear(e: Column): Column

// Date formatting and parsing
def date_format(dateExpr: Column, format: String): Column
def from_unixtime(ut: Column): Column
def from_unixtime(ut: Column, f: String): Column
def unix_timestamp(): Column
def unix_timestamp(s: Column): Column
def unix_timestamp(s: Column, p: String): Column
def to_timestamp(s: Column): Column
def to_timestamp(s: Column, fmt: String): Column
def to_date(e: Column): Column
def to_date(e: Column, fmt: String): Column

// Date/time truncation
def trunc(date: Column, format: String): Column
def date_trunc(format: String, timestamp: Column): Column

// Time zones
def from_utc_timestamp(ts: Column, tz: String): Column
def to_utc_timestamp(ts: Column, tz: String): Column

Usage Examples:

// Current date and time
val withTimestamp = df.withColumn("processed_at", current_timestamp())
val withDate = df.withColumn("processed_date", current_date())

// Date calculations
val daysUntilDeadline = df.withColumn("days_left", 
  datediff(col("deadline"), current_date()))

val futureDate = df.withColumn("review_date", 
  add_months(col("created_date"), 6))

// Date part extraction
val withDateParts = df
  .withColumn("year", year(col("created_date")))
  .withColumn("month", month(col("created_date")))
  .withColumn("day_of_week", dayofweek(col("created_date")))

// Date formatting
val formatted = df.withColumn("formatted_date", 
  date_format(col("timestamp"), "yyyy-MM-dd HH:mm"))

// Unix timestamp conversion
val unixTime = df.withColumn("unix_ts", 
  unix_timestamp(col("date_string"), "yyyy-MM-dd"))

val fromUnix = df.withColumn("readable_date", 
  from_unixtime(col("unix_timestamp")))

// Date parsing
val parsedDate = df.withColumn("parsed_date", 
  to_date(col("date_string"), "MM/dd/yyyy"))

// Time zone conversion
val utcTime = df.withColumn("utc_time", 
  to_utc_timestamp(col("local_time"), "America/New_York"))

Aggregate Functions

// Basic aggregations
def count(e: Column): Column
def sum(e: Column): Column
def avg(e: Column): Column
def mean(e: Column): Column
def max(e: Column): Column
def min(e: Column): Column

// Statistical functions
def stddev(e: Column): Column
def stddev_pop(e: Column): Column
def stddev_samp(e: Column): Column
def variance(e: Column): Column
def var_pop(e: Column): Column
def var_samp(e: Column): Column
def skewness(e: Column): Column
def kurtosis(e: Column): Column

// Collection aggregations
def collect_list(e: Column): Column
def collect_set(e: Column): Column

// Distinct counting
def countDistinct(expr: Column, exprs: Column*): Column
def approx_count_distinct(e: Column): Column
def approx_count_distinct(e: Column, rsd: Double): Column

// First/last values
def first(e: Column): Column
def first(e: Column, ignoreNulls: Boolean): Column
def last(e: Column): Column
def last(e: Column, ignoreNulls: Boolean): Column

// Percentiles
def expr(expr: String): Column  // For percentile_approx, etc.

Usage Examples:

// Basic aggregations
val summary = df.agg(
  count(col("id")).alias("total_rows"),
  sum(col("amount")).alias("total_amount"),
  avg(col("amount")).alias("avg_amount"),
  max(col("created_date")).alias("latest_date"),
  min(col("created_date")).alias("earliest_date")
)

// Statistical analysis
val stats = df.agg(
  stddev(col("score")).alias("std_dev"),
  variance(col("score")).alias("variance"),
  skewness(col("score")).alias("skewness"),
  kurtosis(col("score")).alias("kurtosis")
)

// Collect values
val categories = df.agg(
  collect_set(col("category")).alias("unique_categories"),
  collect_list(col("name")).alias("all_names")
)

// Distinct counts
val uniqueCounts = df.agg(
  countDistinct(col("user_id")).alias("unique_users"),
  approx_count_distinct(col("session_id"), 0.05).alias("approx_sessions")
)

// First/last values (useful with ordering)
val firstLast = df
  .orderBy(col("timestamp"))
  .agg(
    first(col("status")).alias("first_status"),
    last(col("status")).alias("last_status")
  )
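
For percentiles and other aggregates without a dedicated wrapper, expr() embeds the SQL form directly. A sketch, assuming a numeric score column and a department grouping column:

// Approximate percentiles via a SQL expression string
val percentileStats = df.groupBy(col("department")).agg(
  expr("percentile_approx(score, 0.5)").alias("median_score"),
  expr("percentile_approx(score, array(0.25, 0.75))").alias("quartiles")
)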

Conditional Functions

def when(condition: Column, value: Any): Column
def coalesce(e: Column*): Column
def isnull(e: Column): Column
def isnan(e: Column): Column
def nanvl(col1: Column, col2: Column): Column
def greatest(exprs: Column*): Column
def least(exprs: Column*): Column

Usage Examples:

// Conditional logic
val categorized = df.withColumn("age_group",
  when(col("age") < 18, "Minor")
    .when(col("age") < 65, "Adult")
    .otherwise("Senior")
)

// Handle nulls
val withDefaults = df.withColumn("description", 
  coalesce(col("description"), lit("No description available")))

// Handle NaN values
val cleanNaN = df.withColumn("clean_score", 
  nanvl(col("score"), lit(0.0)))

// Find extreme values
val ranges = df.withColumn("max_value", 
  greatest(col("value1"), col("value2"), col("value3")))

Array and Map Functions

// Array creation and manipulation
def array(cols: Column*): Column
def array_contains(column: Column, value: Any): Column
def array_distinct(e: Column): Column
def array_except(col1: Column, col2: Column): Column
def array_intersect(col1: Column, col2: Column): Column
def array_join(column: Column, delimiter: String): Column
def array_max(e: Column): Column
def array_min(e: Column): Column
def array_position(col: Column, value: Any): Column
def array_remove(col: Column, element: Any): Column
def array_repeat(col: Column, count: Int): Column
def array_sort(e: Column): Column
def array_union(col1: Column, col2: Column): Column
def arrays_overlap(a1: Column, a2: Column): Column
def arrays_zip(e: Column*): Column
def size(e: Column): Column
def slice(x: Column, start: Int, length: Int): Column
def sort_array(e: Column): Column
def sort_array(e: Column, asc: Boolean): Column

// Array explosion
def explode(e: Column): Column
def explode_outer(e: Column): Column
def posexplode(e: Column): Column
def posexplode_outer(e: Column): Column

// Map operations
def map(cols: Column*): Column
def map_keys(e: Column): Column
def map_values(e: Column): Column
def map_from_arrays(keys: Column, values: Column): Column
def map_from_entries(e: Column): Column

Usage Examples:

// Create arrays
val arrayCol = df.withColumn("scores_array", 
  array(col("score1"), col("score2"), col("score3")))

// Array operations
val arrayOps = df
  .withColumn("has_zero", array_contains(col("scores"), 0))
  .withColumn("unique_scores", array_distinct(col("scores")))
  .withColumn("max_score", array_max(col("scores")))
  .withColumn("array_size", size(col("scores")))

// Array to string
val joined = df.withColumn("scores_csv", 
  array_join(col("scores"), ","))

// Explode arrays to rows
val exploded = df.select(col("id"), explode(col("tags")).alias("tag"))

// Position-based explosion
val withPosition = df.select(col("id"), 
  posexplode(col("values")).as(Seq("pos", "value")))

// Map operations
val mapCol = df.withColumn("properties", 
  map(lit("name"), col("name"), lit("age"), col("age")))

val keys = df.withColumn("prop_keys", map_keys(col("properties")))
val values = df.withColumn("prop_values", map_values(col("properties")))
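
Maps can also be assembled from existing columns. A short sketch, assuming parallel key_array/value_array columns and an entries column holding an array of key/value structs (hypothetical names):

// Zip two parallel arrays into a single map column
val fromArrays = df.withColumn("props",
  map_from_arrays(col("key_array"), col("value_array")))

// Build a map from an array of key/value structs
val fromEntries = df.withColumn("props", map_from_entries(col("entries")))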

Window Functions

// Ranking functions
def row_number(): Column
def rank(): Column
def dense_rank(): Column
def percent_rank(): Column
def ntile(n: Int): Column
def cume_dist(): Column

// Offset functions
def lag(e: Column, offset: Int): Column
def lag(e: Column, offset: Int, defaultValue: Any): Column
def lead(e: Column, offset: Int): Column
def lead(e: Column, offset: Int, defaultValue: Any): Column

Usage Examples:

import org.apache.spark.sql.expressions.Window

// Window specifications
val windowSpec = Window.partitionBy("department").orderBy(col("salary").desc)
val rowsWindow = Window.partitionBy("category").orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// Ranking
val ranked = df.withColumn("salary_rank", 
  rank().over(windowSpec))

val rowNumbers = df.withColumn("row_num", 
  row_number().over(windowSpec))

val percentiles = df.withColumn("salary_percentile", 
  percent_rank().over(windowSpec))

// Lag/Lead for time series
val withLag = df.withColumn("prev_value", 
  lag(col("value"), 1).over(Window.partitionBy("id").orderBy("timestamp")))

val withLead = df.withColumn("next_value", 
  lead(col("value"), 1, 0).over(Window.partitionBy("id").orderBy("timestamp")))

// Running aggregations
val runningSum = df.withColumn("running_total", 
  sum(col("amount")).over(rowsWindow))
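
The remaining ranking functions follow the same over() pattern; a sketch reusing windowSpec from above:

// Split each department into salary quartiles
val quartiled = df.withColumn("salary_quartile", ntile(4).over(windowSpec))

// Fraction of the partition at or before the current row in the window ordering
val cumulative = df.withColumn("salary_cume_dist", cume_dist().over(windowSpec))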

Type Conversion Functions

// cast is a method on Column (there is no standalone cast() in functions)
def cast(to: DataType): Column
def cast(to: String): Column

Usage Examples:

import org.apache.spark.sql.types._

// Type casting
val converted = df
  .withColumn("age_int", col("age").cast(IntegerType))
  .withColumn("score_double", col("score").cast("double"))
  .withColumn("created_date", col("created_timestamp").cast(DateType))

// String to numeric conversions
val numeric = df
  .withColumn("amount_decimal", col("amount_str").cast(DecimalType(10, 2)))
  .withColumn("count_long", col("count_str").cast(LongType))

// Date/time conversions
val dates = df
  .withColumn("date_from_string", to_date(col("date_str"), "yyyy-MM-dd"))
  .withColumn("timestamp_from_string", to_timestamp(col("ts_str"), "yyyy-MM-dd HH:mm:ss"))

JSON Functions

def from_json(e: Column, schema: DataType): Column
def from_json(e: Column, schema: String, options: Map[String, String]): Column
def to_json(e: Column): Column
def json_tuple(json: Column, fields: String*): Column
def get_json_object(e: Column, path: String): Column

Usage Examples:

import org.apache.spark.sql.types._

// JSON parsing
val jsonSchema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)
))

val parsed = df.withColumn("parsed_json", 
  from_json(col("json_string"), jsonSchema))

// Extract JSON fields
val name = df.withColumn("name", 
  get_json_object(col("json_data"), "$.name"))

// Convert to JSON
val jsonified = df.withColumn("row_as_json", to_json(struct("*")))

// JSON tuple extraction
val extracted = df.select(col("id"), 
  json_tuple(col("json_data"), "name", "age").as(Seq("name", "age")))
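
from_json also accepts a DDL-formatted schema string together with parser options; a sketch, assuming the same json_string column:

// DDL schema string plus parser options
val parsedDdl = df.withColumn("parsed",
  from_json(col("json_string"), "name STRING, age INT", Map("mode" -> "PERMISSIVE")))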