Spark SQL provides an extensive library of built-in functions for data manipulation, accessible through the org.apache.spark.sql.functions object and the Column class. These functions cover mathematical operations, string manipulation, date/time processing, aggregations, and complex data type operations.
class Column {
// Arithmetic operations
def +(other: Any): Column
def -(other: Any): Column
def *(other: Any): Column
def /(other: Any): Column
def %(other: Any): Column
def unary_- : Column
// Comparison operations
def ===(other: Any): Column
def =!=(other: Any): Column // inequality (replaces the deprecated !==)
def >(other: Any): Column
def >=(other: Any): Column
def <(other: Any): Column
def <=(other: Any): Column
def <=> (other: Any): Column // Null-safe equality
// Logical operations
def &&(other: Column): Column
def ||(other: Column): Column
def unary_! : Column
// String operations
def contains(other: Any): Column
def startsWith(other: Column): Column
def startsWith(literal: String): Column
def endsWith(other: Column): Column
def endsWith(literal: String): Column
def rlike(literal: String): Column
def like(literal: String): Column
// Null operations
def isNull: Column
def isNotNull: Column
def isNaN: Column
// Type operations
def cast(to: DataType): Column
def cast(to: String): Column
def as(alias: String): Column
def as(alias: Symbol): Column
def name(alias: String): Column
// Collection operations
def getItem(key: Any): Column
def getField(fieldName: String): Column
// Sorting
def asc: Column
def desc: Column
def asc_nulls_first: Column
def asc_nulls_last: Column
def desc_nulls_first: Column
def desc_nulls_last: Column
// SQL expressions
def when(condition: Column, value: Any): Column
def otherwise(value: Any): Column
def over(window: WindowSpec): Column
def isin(list: Any*): Column
def between(lowerBound: Any, upperBound: Any): Column
}
// Column references
def col(colName: String): Column
def column(colName: String): Column
// Literal values
def lit(literal: Any): Column
// Input metadata
def input_file_name(): Column
def monotonically_increasing_id(): Column
def spark_partition_id(): Column
Usage Examples:
import org.apache.spark.sql.functions._
// Column references
val nameCol = col("name")
val ageCol = column("age")
// Literals
val constantValue = lit(42)
val stringLiteral = lit("Hello")
val dateLiteral = lit(java.sql.Date.valueOf("2023-01-01"))
// Metadata functions
val withFilename = df.withColumn("source_file", input_file_name())
val withId = df.withColumn("unique_id", monotonically_increasing_id())
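The Column operators above combine naturally in filters and sorts. A brief sketch, assuming df has age, country, status, and score columns:
// Comparison, logical, and membership operators (column names are assumptions)
val adults = df.filter(col("age") >= 18 && col("country").isin("US", "CA"))
val inRange = df.filter(col("score").between(0, 100))
val active = df.filter(col("status") <=> lit("active")) // null-safe comparison
val sorted = df.orderBy(col("score").desc_nulls_last)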
// Basic math
def abs(e: Column): Column
def ceil(e: Column): Column
def floor(e: Column): Column
def round(e: Column): Column
def round(e: Column, scale: Int): Column
def signum(e: Column): Column
// Exponential and logarithmic
def exp(e: Column): Column
def expm1(e: Column): Column
def log(e: Column): Column
def log10(e: Column): Column
def log2(e: Column): Column
def log1p(e: Column): Column
def pow(l: Column, r: Column): Column
def pow(l: Column, r: Double): Column
def sqrt(e: Column): Column
// Trigonometric
def sin(e: Column): Column
def cos(e: Column): Column
def tan(e: Column): Column
def asin(e: Column): Column
def acos(e: Column): Column
def atan(e: Column): Column
def atan2(l: Column, r: Column): Column
def sinh(e: Column): Column
def cosh(e: Column): Column
def tanh(e: Column): Column
// Angle conversion
def degrees(e: Column): Column
def radians(e: Column): Column
// Random functions
def rand(): Column
def rand(seed: Long): Column
def randn(): Column
def randn(seed: Long): Column
Usage Examples:
// Basic calculations
val absValues = df.withColumn("abs_value", abs(col("amount")))
val rounded = df.withColumn("rounded_price", round(col("price"), 2))
// Power and logarithms
val squared = df.withColumn("squared", pow(col("value"), 2))
val logValues = df.withColumn("log_amount", log(col("amount")))
// Trigonometry for calculations
val distances = df.withColumn("distance",
sqrt(pow(col("x2") - col("x1"), 2) + pow(col("y2") - col("y1"), 2)))
// Random sampling
val withRandom = df.withColumn("random_score", rand(42))
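floor and radians are also handy for binning and angle handling; a small sketch, assuming numeric amount and angle_deg columns:
// Bucket amounts into bins of width 10 (column names are assumptions)
val bucketed = df.withColumn("amount_bucket", floor(col("amount") / 10) * 10)
// Convert degrees to radians before applying trigonometric functions
val inRadians = df.withColumn("angle_rad", radians(col("angle_deg")))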
// String manipulation
def concat(exprs: Column*): Column
def concat_ws(sep: String, exprs: Column*): Column
def format_string(format: String, arguments: Column*): Column
def length(e: Column): Column
def lower(e: Column): Column
def upper(e: Column): Column
def initcap(e: Column): Column
// Trimming and padding
def ltrim(e: Column): Column
def rtrim(e: Column): Column
def trim(e: Column): Column
def lpad(str: Column, len: Int, pad: String): Column
def rpad(str: Column, len: Int, pad: String): Column
// Substring operations
def substring(str: Column, pos: Int, len: Int): Column
def substring_index(str: Column, delim: String, count: Int): Column
def left(str: Column, len: Column): Column
def right(str: Column, len: Column): Column
// Regular expressions
def regexp_extract(e: Column, exp: String, groupIdx: Int): Column
def regexp_replace(e: Column, pattern: String, replacement: String): Column
def rlike(str: Column, regexp: Column): Column
// Encoding and decoding
def ascii(e: Column): Column
def base64(e: Column): Column
def unbase64(e: Column): Column
def encode(value: Column, charset: String): Column
def decode(value: Column, charset: String): Column
// String splitting and parsing
def split(str: Column, pattern: String): Column
def split(str: Column, pattern: String, limit: Int): Column
// Hashing
def md5(e: Column): Column
def sha1(e: Column): Column
def sha2(e: Column, numBits: Int): Column
def crc32(e: Column): Column
def hash(cols: Column*): Column
def xxhash64(cols: Column*): Column
Usage Examples:
// String concatenation
val fullName = df.withColumn("full_name",
concat(col("first_name"), lit(" "), col("last_name")))
val csvData = df.withColumn("csv_row",
concat_ws(",", col("id"), col("name"), col("email")))
// String formatting
val formatted = df.withColumn("description",
format_string("User %s (ID: %d)", col("name"), col("id")))
// Text processing
val cleaned = df
.withColumn("trimmed", trim(col("description")))
.withColumn("upper_name", upper(col("name")))
.withColumn("name_length", length(col("name")))
// Regular expressions
val phoneExtract = df.withColumn("area_code",
regexp_extract(col("phone"), """(\d{3})-\d{3}-\d{4}""", 1))
val cleanedText = df.withColumn("clean_text",
regexp_replace(col("text"), "[^a-zA-Z0-9 ]", ""))
// String splitting
val nameParts = df.withColumn("name_parts", split(col("full_name"), " "))
// Hashing for data masking
val hashedEmail = df.withColumn("email_hash", sha2(col("email"), 256))
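The padding and substring functions are useful for fixed-width formatting and key parsing; a short sketch, assuming id_str and sku string columns:
// Zero-pad ids and extract prefixes (column names are assumptions)
val padded = df.withColumn("padded_id", lpad(col("id_str"), 8, "0"))
val prefix = df.withColumn("sku_prefix", substring(col("sku"), 1, 3))
// Keep everything up to the second "-" in a code like "AA-BB-CC"
val head = df.withColumn("sku_head", substring_index(col("sku"), "-", 2))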
// Current date/time
def current_date(): Column
def current_timestamp(): Column
def now(): Column
// Date arithmetic
def date_add(start: Column, days: Int): Column
def date_sub(start: Column, days: Int): Column
def datediff(end: Column, start: Column): Column
def add_months(start: Column, numMonths: Int): Column
def months_between(end: Column, start: Column): Column
def months_between(end: Column, start: Column, roundOff: Boolean): Column
// Date extraction
def year(e: Column): Column
def quarter(e: Column): Column
def month(e: Column): Column
def dayofmonth(e: Column): Column
def dayofweek(e: Column): Column
def dayofyear(e: Column): Column
def hour(e: Column): Column
def minute(e: Column): Column
def second(e: Column): Column
def weekofyear(e: Column): Column
// Date formatting and parsing
def date_format(dateExpr: Column, format: String): Column
def from_unixtime(ut: Column): Column
def from_unixtime(ut: Column, f: String): Column
def unix_timestamp(): Column
def unix_timestamp(s: Column): Column
def unix_timestamp(s: Column, p: String): Column
def to_timestamp(s: Column): Column
def to_timestamp(s: Column, fmt: String): Column
def to_date(e: Column): Column
def to_date(e: Column, fmt: String): Column
// Date/time truncation
def trunc(date: Column, format: String): Column
def date_trunc(format: String, timestamp: Column): Column
// Time zones
def from_utc_timestamp(ts: Column, tz: String): Column
def to_utc_timestamp(ts: Column, tz: String): Column
Usage Examples:
// Current date and time
val withTimestamp = df.withColumn("processed_at", current_timestamp())
val withDate = df.withColumn("processed_date", current_date())
// Date calculations
val daysUntilDeadline = df.withColumn("days_left",
datediff(col("deadline"), current_date()))
val futureDate = df.withColumn("review_date",
add_months(col("created_date"), 6))
// Date part extraction
val withDateParts = df
.withColumn("year", year(col("created_date")))
.withColumn("month", month(col("created_date")))
.withColumn("day_of_week", dayofweek(col("created_date")))
// Date formatting
val formatted = df.withColumn("formatted_date",
date_format(col("timestamp"), "yyyy-MM-dd HH:mm"))
// Unix timestamp conversion
val unixTime = df.withColumn("unix_ts",
unix_timestamp(col("date_string"), "yyyy-MM-dd"))
val fromUnix = df.withColumn("readable_date",
from_unixtime(col("unix_timestamp")))
// Date parsing
val parsedDate = df.withColumn("parsed_date",
to_date(col("date_string"), "MM/dd/yyyy"))
// Time zone conversion
val utcTime = df.withColumn("utc_time",
to_utc_timestamp(col("local_time"), "America/New_York"))
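trunc and date_trunc are convenient when grouping by period; a short sketch, assuming event_time (timestamp) and event_date (date) columns:
// Truncate timestamps and dates to period boundaries (column names are assumptions)
val byMonth = df.withColumn("month_start", date_trunc("month", col("event_time")))
val byHour = df.withColumn("hour_start", date_trunc("hour", col("event_time")))
val yearStart = df.withColumn("year_start", trunc(col("event_date"), "year"))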
// Basic aggregations
def count(e: Column): Column
def sum(e: Column): Column
def avg(e: Column): Column
def mean(e: Column): Column
def max(e: Column): Column
def min(e: Column): Column
// Statistical functions
def stddev(e: Column): Column
def stddev_pop(e: Column): Column
def stddev_samp(e: Column): Column
def variance(e: Column): Column
def var_pop(e: Column): Column
def var_samp(e: Column): Column
def skewness(e: Column): Column
def kurtosis(e: Column): Column
// Collection aggregations
def collect_list(e: Column): Column
def collect_set(e: Column): Column
// Distinct counting
def countDistinct(expr: Column, exprs: Column*): Column
def approx_count_distinct(e: Column): Column
def approx_count_distinct(e: Column, rsd: Double): Column
// First/last values
def first(e: Column): Column
def first(e: Column, ignoreNulls: Boolean): Column
def last(e: Column): Column
def last(e: Column, ignoreNulls: Boolean): Column
// Percentiles
def expr(expr: String): Column // For percentile_approx, etc.
Usage Examples:
// Basic aggregations
val summary = df.agg(
count(col("id")).alias("total_rows"),
sum(col("amount")).alias("total_amount"),
avg(col("amount")).alias("avg_amount"),
max(col("created_date")).alias("latest_date"),
min(col("created_date")).alias("earliest_date")
)
// Statistical analysis
val stats = df.agg(
stddev(col("score")).alias("std_dev"),
variance(col("score")).alias("variance"),
skewness(col("score")).alias("skewness"),
kurtosis(col("score")).alias("kurtosis")
)
// Collect values
val categories = df.agg(
collect_set(col("category")).alias("unique_categories"),
collect_list(col("name")).alias("all_names")
)
// Distinct counts
val uniqueCounts = df.agg(
countDistinct(col("user_id")).alias("unique_users"),
approx_count_distinct(col("session_id"), 0.05).alias("approx_sessions")
)
// First/last values (useful with ordering)
val firstLast = df
.orderBy(col("timestamp"))
.agg(
first(col("status")).alias("first_status"),
last(col("status")).alias("last_status")
)
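Aggregations are usually applied per group rather than over the whole DataFrame, and expr gives access to SQL-only aggregates such as percentile_approx. A sketch, assuming department and salary columns:
// Grouped aggregation with a SQL expression (column names are assumptions)
val byDept = df.groupBy(col("department")).agg(
  count(lit(1)).alias("employees"),
  avg(col("salary")).alias("avg_salary"),
  expr("percentile_approx(salary, 0.5)").alias("median_salary")
)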
// Conditional expressions and null handling
def when(condition: Column, value: Any): Column
def coalesce(e: Column*): Column
def isnull(e: Column): Column
def isnan(e: Column): Column
def nanvl(col1: Column, col2: Column): Column
def greatest(exprs: Column*): Column
def least(exprs: Column*): Column
Usage Examples:
// Conditional logic
val categorized = df.withColumn("age_group",
when(col("age") < 18, "Minor")
.when(col("age") < 65, "Adult")
.otherwise("Senior")
)
// Handle nulls
val withDefaults = df.withColumn("description",
coalesce(col("description"), lit("No description available")))
// Handle NaN values
val cleanNaN = df.withColumn("clean_score",
nanvl(col("score"), lit(0.0)))
// Find extreme values
val ranges = df.withColumn("max_value",
greatest(col("value1"), col("value2"), col("value3")))
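isnull, isnan, and least round out the null-handling toolkit; a brief sketch, assuming email, score, value1, and value2 columns:
// Data-quality filters and a minimum across columns (column names are assumptions)
val flagged = df.withColumn("missing_email", isnull(col("email")))
val badScores = df.filter(isnan(col("score")) || col("score").isNull)
val minValue = df.withColumn("min_value", least(col("value1"), col("value2")))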
// Array creation and manipulation
def array(cols: Column*): Column
def array_contains(column: Column, value: Any): Column
def array_distinct(e: Column): Column
def array_except(col1: Column, col2: Column): Column
def array_intersect(col1: Column, col2: Column): Column
def array_join(column: Column, delimiter: String): Column
def array_max(e: Column): Column
def array_min(e: Column): Column
def array_position(col: Column, value: Any): Column
def array_remove(col: Column, element: Any): Column
def array_repeat(col: Column, count: Int): Column
def array_sort(e: Column): Column
def array_union(col1: Column, col2: Column): Column
def arrays_overlap(a1: Column, a2: Column): Column
def arrays_zip(e: Column*): Column
def size(e: Column): Column
def slice(x: Column, start: Int, length: Int): Column
def sort_array(e: Column): Column
def sort_array(e: Column, asc: Boolean): Column
// Array explosion
def explode(e: Column): Column
def explode_outer(e: Column): Column
def posexplode(e: Column): Column
def posexplode_outer(e: Column): Column
// Map operations
def map(cols: Column*): Column
def map_keys(e: Column): Column
def map_values(e: Column): Column
def map_from_arrays(keys: Column, values: Column): Column
def map_from_entries(e: Column): Column
Usage Examples:
// Create arrays
val arrayCol = df.withColumn("scores_array",
array(col("score1"), col("score2"), col("score3")))
// Array operations
val arrayOps = df
.withColumn("has_zero", array_contains(col("scores"), 0))
.withColumn("unique_scores", array_distinct(col("scores")))
.withColumn("max_score", array_max(col("scores")))
.withColumn("array_size", size(col("scores")))
// Array to string
val joined = df.withColumn("scores_csv",
array_join(col("scores"), ","))
// Explode arrays to rows
val exploded = df.select(col("id"), explode(col("tags")).alias("tag"))
// Position-based explosion
val withPosition = df.select(col("id"),
posexplode(col("values")).as(Seq("pos", "value")))
// Map operations
val mapCol = df.withColumn("properties",
map(lit("name"), col("name"), lit("age"), col("age")))
val keys = df.withColumn("prop_keys", map_keys(col("properties")))
val values = df.withColumn("prop_values", map_values(col("properties")))
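sort_array, slice, and arrays_zip cover ordering and combining arrays; a short sketch, assuming scores, names, and ages array columns:
// Top 3 scores per row: sort descending, then slice (column names are assumptions)
val top3 = df.withColumn("top_scores", slice(sort_array(col("scores"), asc = false), 1, 3))
// Pair up two parallel arrays element-wise
val zipped = df.withColumn("pairs", arrays_zip(col("names"), col("ages")))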
// Ranking functions
def row_number(): Column
def rank(): Column
def dense_rank(): Column
def percent_rank(): Column
def ntile(n: Int): Column
def cume_dist(): Column
// Offset functions
def lag(e: Column, offset: Int): Column
def lag(e: Column, offset: Int, defaultValue: Any): Column
def lead(e: Column, offset: Int): Column
def lead(e: Column, offset: Int, defaultValue: Any): Column
Usage Examples:
import org.apache.spark.sql.expressions.Window
// Window specifications
val windowSpec = Window.partitionBy("department").orderBy(col("salary").desc)
val rowsWindow = Window.partitionBy("category").orderBy("date")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
// Ranking
val ranked = df.withColumn("salary_rank",
rank().over(windowSpec))
val rowNumbers = df.withColumn("row_num",
row_number().over(windowSpec))
val percentiles = df.withColumn("salary_percentile",
percent_rank().over(windowSpec))
// Lag/Lead for time series
val withLag = df.withColumn("prev_value",
lag(col("value"), 1).over(Window.partitionBy("id").orderBy("timestamp")))
val withLead = df.withColumn("next_value",
lead(col("value"), 1, 0).over(Window.partitionBy("id").orderBy("timestamp")))
// Running aggregations
val runningSum = df.withColumn("running_total",
sum(col("amount")).over(rowsWindow))
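A common pattern combines row_number with a filter to keep the top N rows per partition; a sketch, assuming department and salary columns:
// Top 3 earners per department (column names are assumptions)
val topPerDept = df
  .withColumn("rn", row_number().over(
    Window.partitionBy("department").orderBy(col("salary").desc)))
  .filter(col("rn") <= 3)
  .drop("rn")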
// Casting (methods on the Column class)
def cast(to: DataType): Column
def cast(to: String): Column
Usage Examples:
import org.apache.spark.sql.types._
// Type casting
val converted = df
.withColumn("age_int", col("age").cast(IntegerType))
.withColumn("score_double", col("score").cast("double"))
.withColumn("created_date", col("created_timestamp").cast(DateType))
// String to numeric conversions
val numeric = df
.withColumn("amount_decimal", col("amount_str").cast(DecimalType(10, 2)))
.withColumn("count_long", col("count_str").cast(LongType))
// Date/time conversions
val dates = df
.withColumn("date_from_string", to_date(col("date_str"), "yyyy-MM-dd"))
.withColumn("timestamp_from_string", to_timestamp(col("ts_str"), "yyyy-MM-dd HH:mm:ss"))def from_json(e: Column, schema: DataType): Column
def from_json(e: Column, schema: String, options: Map[String, String]): Column
def to_json(e: Column): Column
def json_tuple(json: Column, fields: String*): Column
def get_json_object(e: Column, path: String): Column
Usage Examples:
import org.apache.spark.sql.types._
// JSON parsing
val jsonSchema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
val parsed = df.withColumn("parsed_json",
from_json(col("json_string"), jsonSchema))
// Extract JSON fields
val name = df.withColumn("name",
get_json_object(col("json_data"), "$.name"))
// Convert to JSON
val jsonified = df.withColumn("row_as_json", to_json(struct("*")))
// JSON tuple extraction
val extracted = df.select(col("id"),
json_tuple(col("json_data"), "name", "age").as(Seq("name", "age")))
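The schema can also be supplied as a DDL-formatted string together with parser options; a minimal sketch, assuming a json_string column:
// Parse JSON with a DDL schema string and explicit (empty) options
val parsedDdl = df.withColumn("parsed",
  from_json(col("json_string"), "name STRING, age INT", Map.empty[String, String]))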