Spec Registry

maven-apache-spark
Describes: maven/org.apache.spark/spark-parent
- Description: Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
- Author: tessl
docs/sql.md

# Spark SQL

Spark SQL provides a programming interface for working with structured and semi-structured data. It allows querying data via SQL and the DataFrame API, with support for various data sources including JSON, Parquet, and Hive tables.

## SQLContext

The main entry point for Spark SQL functionality.

### SQLContext Class

```scala { .api }
class SQLContext(sparkContext: SparkContext) extends Serializable with Logging {
  // Alternative constructor for existing HiveContext compatibility
  def this(sparkContext: SparkContext, cacheManager: CacheManager) = this(sparkContext)
}
```

### Creating SQLContext

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf().setAppName("SQL App").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Import the implicit RDD-to-SchemaRDD conversion
import sqlContext.createSchemaRDD
```

## Data Sources

### JSON Files

**jsonFile**: Read JSON files as SchemaRDD
```scala { .api }
def jsonFile(path: String): SchemaRDD
```

```scala
// Read JSON file
val people = sqlContext.jsonFile("people.json")

// Show schema
people.printSchema()

// Show data
people.show()

// Example JSON structure:
// {"name":"Michael"}
// {"name":"Andy", "age":30}
// {"name":"Justin", "age":19}
```

**jsonRDD**: Create SchemaRDD from RDD of JSON strings
```scala { .api }
def jsonRDD(json: RDD[String]): SchemaRDD
```

```scala
// Create JSON RDD
val jsonStrings = sc.parallelize(Array(
  """{"name":"Alice", "age":25}""",
  """{"name":"Bob", "age":30}""",
  """{"name":"Charlie", "age":35}"""
))

val jsonDF = sqlContext.jsonRDD(jsonStrings)
```

### Parquet Files

**parquetFile**: Read Parquet files
```scala { .api }
def parquetFile(path: String): SchemaRDD
```

```scala
// Read Parquet file
val parquetData = sqlContext.parquetFile("users.parquet")

// Parquet automatically preserves schema
parquetData.printSchema()
```

### RDD to SchemaRDD Conversion

**createSchemaRDD**: Convert RDD to SchemaRDD
```scala { .api }
def createSchemaRDD[A <: Product : TypeTag](rdd: RDD[A]): SchemaRDD
```

```scala
// Define case class for schema
case class Person(name: String, age: Int)

// Create RDD of case class instances
val peopleRDD = sc.parallelize(Seq(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
))

// Convert to SchemaRDD
val peopleDF = sqlContext.createSchemaRDD(peopleRDD)
```
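Once converted, the result can be registered and queried like any other SchemaRDD (registration and `sql` are covered in detail under SQL Queries below). A minimal sketch, assuming the `peopleDF` value from the example above; the table name `people_from_rdd` is a placeholder:

```scala
// Register the converted SchemaRDD as a temporary table and query it with SQL
peopleDF.registerAsTable("people_from_rdd")

val thirtyAndUp = sqlContext.sql(
  "SELECT name, age FROM people_from_rdd WHERE age >= 30")
thirtyAndUp.show()
```
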
## SchemaRDD

The main data structure for structured data in Spark SQL.

### SchemaRDD Class

```scala { .api }
class SchemaRDD(sqlContext: SQLContext, logicalPlan: LogicalPlan) extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {
  // Schema operations
  def printSchema(): Unit
  def schema: StructType

  // Registration
  def registerAsTable(tableName: String): Unit
  def registerTempTable(tableName: String): Unit

  // Transformations
  def select(exprs: ColumnExpression*): SchemaRDD
  def where(condition: ColumnExpression): SchemaRDD
  def filter(condition: ColumnExpression): SchemaRDD
  def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
  def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
  def limit(n: Int): SchemaRDD
  def unionAll(other: SchemaRDD): SchemaRDD
  def intersect(other: SchemaRDD): SchemaRDD
  def except(other: SchemaRDD): SchemaRDD
  def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String): SchemaRDD

  // Actions
  def show(): Unit
  def collect(): Array[Row]
  def count(): Long
  def first(): Row
  def take(n: Int): Array[Row]

  // Save operations
  def saveAsParquetFile(path: String): Unit
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
  def insertInto(tableName: String, overwrite: Boolean): Unit
}
```

### Schema Operations

```scala
val people = sqlContext.jsonFile("people.json")

// Print schema in tree format
people.printSchema()
// Output:
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Access schema programmatically
val schema = people.schema
println(s"Schema has ${schema.fields.length} fields")

schema.fields.foreach { field =>
  println(s"Field: ${field.name}, Type: ${field.dataType}, Nullable: ${field.nullable}")
}
```

### Basic Operations

```scala
// Show first 20 rows
people.show()

// Show custom number of rows
people.show(10)

// Collect all data (be careful with large datasets)
val allPeople = people.collect()
allPeople.foreach(println)

// Take first n rows
val firstFive = people.take(5)

// Count rows
val totalCount = people.count()
```
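`limit` appears in the class listing above but has no dedicated example. Unlike `take`, which is an action that returns rows to the driver, `limit` is a transformation that yields a new SchemaRDD and can be chained further. A minimal sketch, assuming the `people` SchemaRDD from the previous examples:

```scala
// limit is lazy: it returns a new SchemaRDD containing at most n rows
val firstTen = people.limit(10)
firstTen.show()

// take, by contrast, materializes up to n rows on the driver immediately
val firstTenRows = people.take(10)
firstTenRows.foreach(println)
```
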
## SQL Queries

### Table Registration and Querying

**sql**: Execute SQL queries
```scala { .api }
def sql(sqlText: String): SchemaRDD
```

**registerAsTable**: Register SchemaRDD as temporary table
```scala { .api }
def registerAsTable(tableName: String): Unit
```

```scala
val people = sqlContext.jsonFile("people.json")

// Register as temporary table
people.registerAsTable("people")

// Execute SQL queries
val adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18")
adults.show()

// More complex queries
val summary = sqlContext.sql("""
  SELECT
    COUNT(*) as total_people,
    AVG(age) as avg_age,
    MIN(age) as min_age,
    MAX(age) as max_age
  FROM people
  WHERE age IS NOT NULL
""")

summary.show()

// Joins between tables
val addresses = sqlContext.jsonFile("addresses.json")
addresses.registerAsTable("addresses")

val joined = sqlContext.sql("""
  SELECT p.name, p.age, a.city
  FROM people p
  JOIN addresses a ON p.name = a.name
""")
```

### Built-in Functions

```scala
// String functions
sqlContext.sql("SELECT name, UPPER(name), LENGTH(name) FROM people").show()

// Math functions
sqlContext.sql("SELECT age, SQRT(age), ROUND(age/10.0, 2) FROM people").show()

// Date functions (if date columns exist)
sqlContext.sql("SELECT name, YEAR(birth_date), MONTH(birth_date) FROM people").show()

// Aggregate functions
sqlContext.sql("""
  SELECT
    COUNT(*) as count,
    SUM(age) as total_age,
    AVG(age) as avg_age,
    STDDEV(age) as stddev_age
  FROM people
  GROUP BY FLOOR(age/10) * 10
""").show()
```

## DataFrame API (Programmatic)

Alternative to SQL for structured data operations.

### Column Expressions

```scala
import org.apache.spark.sql.catalyst.expressions._

// Access columns
val nameCol = people("name")
val ageCol = people("age")

// Column operations
val agePlus10 = people("age") + 10
val upperName = Upper(people("name"))
```

### Transformations

**select**: Choose columns and expressions
```scala { .api }
def select(exprs: ColumnExpression*): SchemaRDD
```

```scala
// Select specific columns
val namesAndAges = people.select(people("name"), people("age"))

// Select with expressions
val transformed = people.select(
  people("name"),
  people("age") + 1 as "age_next_year",
  Upper(people("name")) as "upper_name"
)
```

**where/filter**: Filter rows based on conditions
```scala { .api }
def where(condition: ColumnExpression): SchemaRDD
def filter(condition: ColumnExpression): SchemaRDD // alias for where
```

```scala
// Filter adults
val adults = people.where(people("age") >= 18)

// Multiple conditions
val youngAdults = people.filter(
  people("age") >= 18 && people("age") < 30
)

// String operations
val namesWithA = people.where(people("name").startsWith("A"))
```

**groupBy**: Group data for aggregation
```scala { .api }
def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
```

```scala
// Group by age ranges
val ageGroups = people.groupBy(people("age") / 10 * 10)
val ageCounts = ageGroups.count()

// Multiple grouping columns (if available)
val grouped = people.groupBy(people("department"), people("age") / 10 * 10)
val summary = grouped.agg(
  Count(people("name")) as "count",
  Avg(people("age")) as "avg_age"
)
```

**orderBy**: Sort data
```scala { .api }
def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
```

```scala
// Sort by age ascending
val sortedByAge = people.orderBy(people("age"))

// Sort by age descending
val sortedByAgeDesc = people.orderBy(people("age").desc)

// Multiple sort columns
val sorted = people.orderBy(people("age").desc, people("name"))
```

### Joins

**join**: Join with another SchemaRDD
```scala { .api }
def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String = "inner"): SchemaRDD
```

```scala
// Assume we have addresses SchemaRDD
val addresses = sqlContext.jsonFile("addresses.json")

// Inner join
val joined = people.join(
  addresses,
  people("name") === addresses("name"),
  "inner"
)

// Left outer join
val leftJoined = people.join(
  addresses,
  people("name") === addresses("name"),
  "left_outer"
)

// Join types: "inner", "left_outer", "right_outer", "full_outer"
```

### Set Operations

```scala
val people1 = sqlContext.jsonFile("people1.json")
val people2 = sqlContext.jsonFile("people2.json")

// Union (must have same schema)
val allPeople = people1.unionAll(people2)

// Intersection
val common = people1.intersect(people2)

// Difference
val unique = people1.except(people2)
```
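The SQL and programmatic styles shown above can express the same query; which one to use is largely a matter of preference. A sketch of one query written both ways, assuming the `people` SchemaRDD and the registered `people` table from earlier:

```scala
// SQL version
val adultsSql = sqlContext.sql(
  "SELECT name, age FROM people WHERE age >= 18 ORDER BY age DESC")

// Programmatic version composed from the transformations above
val adultsDsl = people
  .where(people("age") >= 18)
  .orderBy(people("age").desc)
  .select(people("name"), people("age"))

// Both describe the same result set
adultsSql.show()
adultsDsl.show()
```
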
## Data Types and Schema

### Data Types

```scala { .api }
import org.apache.spark.sql.catalyst.types._

// Primitive types
StringType     // String
IntegerType    // Int
LongType       // Long
DoubleType     // Double
FloatType      // Float
BooleanType    // Boolean
BinaryType     // Array[Byte]
TimestampType  // java.sql.Timestamp

// Complex types
ArrayType(elementType: DataType, containsNull: Boolean)
MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean)
StructType(fields: Array[StructField])
```

### Schema Definition

```scala
import org.apache.spark.sql.catalyst.types._

// Define schema manually
val schema = StructType(Array(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("addresses", ArrayType(StringType), nullable = true)
))

// Create SchemaRDD with predefined schema
val rowRDD = sc.parallelize(Seq(
  Row("Alice", 25, Array("123 Main St", "456 Oak Ave")),
  Row("Bob", 30, Array("789 Pine St"))
))

val schemaRDD = sqlContext.applySchema(rowRDD, schema)
```

## Working with Row Objects

### Row Class

```scala { .api }
abstract class Row extends Serializable {
  def length: Int
  def get(i: Int): Any
  def isNullAt(i: Int): Boolean

  // Typed getters
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getFloat(i: Int): Float
  def getBoolean(i: Int): Boolean
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
}
```

```scala
val people = sqlContext.jsonFile("people.json")
val rows = people.collect()

rows.foreach { row =>
  // Positional access follows the inferred schema shown earlier (age first, then name)
  val age = row.getLong(0)
  val name = row.getString(1)

  // Or by field name (if available)
  val nameByField = row.getAs[String]("name")
  val ageByField = row.getAs[Long]("age")

  println(s"$name is $age years old")
}

// Safe access with null checking
rows.foreach { row =>
  val age = if (row.isNullAt(0)) 0L else row.getLong(0)
  val name = if (row.isNullAt(1)) "Unknown" else row.getString(1)

  println(s"$name is $age years old")
}
```
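Because `SchemaRDD` extends `RDD[Row]` (see the class definition above), ordinary RDD transformations also work on query results. A minimal sketch that maps rows back to the `Person` case class defined earlier; note that the JSON-inferred `age` column is a `long`, so it is narrowed explicitly here:

```scala
// Treat the SchemaRDD as a plain RDD[Row] and rebuild typed objects
val typedPeople = people.map { row =>
  Person(row.getAs[String]("name"), row.getAs[Long]("age").toInt)
}

// From here on, typedPeople is an ordinary RDD[Person]
typedPeople.filter(_.age >= 18).foreach(println)
```
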
## Save Operations

### Saving Data

**saveAsParquetFile**: Save as Parquet format
```scala { .api }
def saveAsParquetFile(path: String): Unit
```

**saveAsTable**: Save as persistent table
```scala { .api }
def saveAsTable(tableName: String): Unit
```

**insertInto**: Insert into existing table
```scala { .api }
def insertInto(tableName: String): Unit
def insertInto(tableName: String, overwrite: Boolean): Unit
```

```scala
val people = sqlContext.jsonFile("people.json")

// Save as Parquet (recommended for performance)
people.saveAsParquetFile("people.parquet")

// Save as persistent table (requires Hive support)
people.saveAsTable("people_table")

// Insert into existing table
people.insertInto("existing_people_table")

// Overwrite existing table
people.insertInto("existing_people_table", overwrite = true)
```

## Caching and Performance

### Caching Tables

**cacheTable**: Cache table in memory
```scala { .api }
def cacheTable(tableName: String): Unit
```

**uncacheTable**: Remove table from cache
```scala { .api }
def uncacheTable(tableName: String): Unit
```

```scala
// Register and cache table
people.registerAsTable("people")
sqlContext.cacheTable("people")

// Now queries will use cached data
val adults = sqlContext.sql("SELECT * FROM people WHERE age >= 18")
val seniors = sqlContext.sql("SELECT * FROM people WHERE age >= 65")

// Remove from cache when done
sqlContext.uncacheTable("people")
```

### Performance Optimization

```scala
// Cache frequently accessed SchemaRDDs
val cachedPeople = people.cache()

// Use Parquet for better performance
val parquetPeople = sqlContext.parquetFile("people.parquet")

// Repartition for better parallelism
val repartitioned = people.repartition(10)

// Coalesce to fewer partitions (for example, to reduce the number of output files)
val coalesced = people.coalesce(1)
```

## Configuration and Settings

```scala
// Access SQL configuration
val sqlConf = sqlContext.conf

// Set configuration properties
sqlConf.setConf("spark.sql.shuffle.partitions", "200")
sqlConf.setConf("spark.sql.codegen", "true")

// Get configuration values
val shufflePartitions = sqlConf.getConf("spark.sql.shuffle.partitions")
val codegenEnabled = sqlConf.getConf("spark.sql.codegen")
```

## Advanced Usage Patterns

### Complex Data Processing

```scala
// Complex analytical query
val analysis = sqlContext.sql("""
  SELECT
    CASE
      WHEN age < 18 THEN 'Minor'
      WHEN age < 65 THEN 'Adult'
      ELSE 'Senior'
    END as age_group,
    COUNT(*) as count,
    AVG(age) as avg_age
  FROM people
  WHERE age IS NOT NULL
  GROUP BY
    CASE
      WHEN age < 18 THEN 'Minor'
      WHEN age < 65 THEN 'Adult'
      ELSE 'Senior'
    END
  ORDER BY avg_age
""")

analysis.show()
```

### Window Functions (Limited Support)

```scala
// Basic ranking within groups (limited in Spark 1.0)
val ranked = sqlContext.sql("""
  SELECT name, age, department,
         ROW_NUMBER() OVER (PARTITION BY department ORDER BY age DESC) as rank
  FROM employees
""")
```

This guide covers the Spark SQL API available in Spark 1.0.0 for working with structured data through both SQL queries and the programmatic DataFrame-style API.
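As a closing illustration, a short end-to-end sketch that strings the pieces above together: load JSON, cache the registered table, run a query, and persist the result as Parquet. File paths and table names are placeholders.

```scala
// Load and register the source data
val people = sqlContext.jsonFile("people.json")
people.registerAsTable("people")
sqlContext.cacheTable("people")

// Run a query against the cached table
val adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18")

// Persist the result in Parquet format for efficient reuse
adults.saveAsParquetFile("adults.parquet")

// Release the cache when finished
sqlContext.uncacheTable("people")
```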