Apache Spark Hive Thrift Server provides HiveServer2 compatibility for Spark SQL, enabling JDBC/ODBC connectivity and Hive CLI compatibility for Spark SQL queries. It offers a complete thrift-based server implementation with session management, authentication, and comprehensive metadata operations.

```sh
npx @tessl/cli install tessl/maven-org-apache-spark--spark-hive-thriftserver_2-12@3.5.0
```
```scala
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv
import org.apache.spark.sql.hive.thriftserver.SparkSQLCLIService
import org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
import org.apache.spark.sql.SQLContext
```

For Java usage (requires Hive dependencies):

```java
import org.apache.hive.service.cli.ICLIService;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.OperationHandle;
```

Note: Many interfaces (ICLIService, SessionHandle, etc.) are provided by the Apache Hive library, which is included as a dependency of this module.

```scala
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.sql.SQLContext
// Initialize Spark SQL environment
SparkSQLEnv.init()
// Start the thrift server with SQL context
val server = HiveThriftServer2.startWithContext(SparkSQLEnv.sqlContext)
```
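Anything that exposes a SQLContext can back the server. A minimal sketch using a standalone SparkSession instead of SparkSQLEnv (the app name is illustrative, and spark-hive must be on the classpath):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val spark = SparkSession.builder()
  .appName("embedded-thrift-server") // illustrative
  .enableHiveSupport()
  .getOrCreate()

// startWithContext returns the server instance, so it can be shut down cleanly.
val server = HiveThriftServer2.startWithContext(spark.sqlContext)
sys.addShutdownHook(server.stop())
```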
To run the interactive CLI instead:

```scala
import org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver

// Start interactive SQL CLI
SparkSQLCLIDriver.main(Array("--hiveconf", "hive.server2.thrift.port=10000"))
```

```java
// Standard JDBC connection to Spark Thrift Server
String url = "jdbc:hive2://localhost:10000/default";
Connection conn = DriverManager.getConnection(url, "username", "password");
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM my_table");
```
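The same flow from Scala, with an explicit fetch size to control how many rows each thrift round trip returns; the table and column names are illustrative, and the Hive JDBC driver must be on the classpath:

```scala
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "username", "password")
val stmt = conn.createStatement()
stmt.setFetchSize(1000) // rows per thrift fetch round trip
val rs = stmt.executeQuery("SELECT id, name FROM my_table")
while (rs.next()) {
  println(s"${rs.getLong(1)}\t${rs.getString(2)}")
}
rs.close(); stmt.close(); conn.close()
```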
The Spark Hive Thrift Server is built around several key components:

- `HiveThriftServer2` provides the main server lifecycle and initialization
- `SparkSQLCLIService` implements the core CLI service interface with Spark SQL integration
- `SparkSQLSessionManager` handles client sessions and their associated SQL contexts
- `SparkSQLOperationManager` creates and manages SQL operations and metadata operations
- `SparkSQLDriver` and `SparkExecuteStatementOperation` execute SQL queries using the Spark SQL engine
- `SparkSQLCLIDriver` provides the interactive command-line interface

Core server lifecycle management and initialization with Spark SQL integration.

```scala
object HiveThriftServer2 {
def startWithContext(sqlContext: SQLContext): HiveThriftServer2
def main(args: Array[String]): Unit
// Note: ExecutionState is private[thriftserver] - not part of public API
private[thriftserver] object ExecutionState extends Enumeration {
val STARTED, COMPILED, CANCELED, TIMEDOUT, FAILED, FINISHED, CLOSED = Value
}
}
```
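main is what sbin/start-thriftserver.sh ultimately invokes through spark-submit, and it accepts the usual --hiveconf overrides. A sketch with an illustrative port:

```scala
// Equivalent to launching via sbin/start-thriftserver.sh; the port is illustrative.
HiveThriftServer2.main(Array("--hiveconf", "hive.server2.thrift.port=10001"))
```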
Comprehensive CLI service implementation providing HiveServer2 compatibility with Spark SQL enhancements.

```scala
class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext) extends CLIService(hiveServer) {
override def init(hiveConf: HiveConf): Unit
override def start(): Unit
override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue
}
```
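The getInfo override is what standard JDBC driver metadata calls resolve to; a client-side sketch (connection details are illustrative):

```scala
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "username", "password")
val meta = conn.getMetaData
println(meta.getDatabaseProductName)    // typically "Spark SQL"
println(meta.getDatabaseProductVersion) // the underlying Spark version
conn.close()
```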
Client session management with SQL context association and configuration handling.

```scala
class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) extends SessionManager(hiveServer) {
override def openSession(
protocol: TProtocolVersion,
username: String,
passwd: String,
ipAddress: String,
sessionConf: java.util.Map[String, String],
withImpersonation: Boolean,
delegationToken: String
): SessionHandle
override def closeSession(sessionHandle: SessionHandle): Unit
def setConfMap(conf: SQLContext, confMap: java.util.Map[String, String]): Unit
}
```
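A sketch of applying per-session settings through setConfMap, the way openSession does internally; the sessionManager and sqlContext instances are assumed to come from the surrounding server, and the setting is illustrative:

```scala
val confMap = new java.util.HashMap[String, String]()
confMap.put("spark.sql.shuffle.partitions", "8") // illustrative per-session override
sessionManager.setConfMap(sqlContext, confMap)
```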
Manages SQL operations and metadata operations with session context mapping.

```scala
class SparkSQLOperationManager extends OperationManager {
val sessionToContexts: ConcurrentHashMap[SessionHandle, SQLContext]
override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: java.util.Map[String, String],
async: Boolean,
queryTimeout: Long
): ExecuteStatementOperation
override def newGetTablesOperation(
parentSession: HiveSession,
catalogName: String,
schemaName: String,
tableName: String,
tableTypes: java.util.List[String]
): MetadataOperation
override def newGetColumnsOperation(
parentSession: HiveSession,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String
): GetColumnsOperation
override def newGetSchemasOperation(
parentSession: HiveSession,
catalogName: String,
schemaName: String
): GetSchemasOperation
override def newGetFunctionsOperation(
parentSession: HiveSession,
catalogName: String,
schemaName: String,
functionName: String
): GetFunctionsOperation
override def newGetTypeInfoOperation(parentSession: HiveSession): GetTypeInfoOperation
override def newGetCatalogsOperation(parentSession: HiveSession): GetCatalogsOperation
override def newGetTableTypesOperation(parentSession: HiveSession): GetTableTypesOperation
}
```
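These factory methods are driven by the Hive service layer rather than called directly: a client's executeStatement request reaches newExecuteStatementOperation through the CLI service. In the sketch below, cliService and sessionHandle are assumed to come from an already-open session:

```scala
import org.apache.hive.service.cli.OperationHandle

// confOverlay can carry per-statement settings; empty here.
val opHandle: OperationHandle = cliService.executeStatement(
  sessionHandle,
  "SELECT count(*) FROM my_table", // illustrative statement
  new java.util.HashMap[String, String]())
```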
SQL statement execution with Spark SQL engine integration and result handling.

```scala
class SparkExecuteStatementOperation {
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): TRowSet
def getResultSetSchema: TTableSchema
def runInternal(): Unit
def cancel(): Unit
def timeoutCancel(): Unit
}
class SparkSQLDriver(context: SQLContext) extends Driver {
override def init(): Unit
override def run(command: String): CommandProcessorResponse
override def close(): Int
override def getResults(res: JList[_]): Boolean
override def getSchema: Schema
override def destroy(): Unit
}
```
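A sketch of the SparkSQLDriver call sequence; note that in Spark itself the class is package-private, so code like this only compiles from within the thriftserver package, and the statement is illustrative:

```scala
val driver = new SparkSQLDriver(sqlContext) // sqlContext assumed, e.g. SparkSQLEnv.sqlContext
driver.init()
driver.run("SELECT 1")
val rows = new java.util.ArrayList[String]()
driver.getResults(rows) // fills `rows` with formatted result lines
println(rows)
driver.close()
```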
Comprehensive metadata operations for catalogs, schemas, tables, columns, functions, and type information.

```java
interface ICLIService {
OperationHandle getCatalogs(SessionHandle sessionHandle);
OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, String schemaName);
OperationHandle getTables(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, List<String> tableTypes);
OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, String schemaName, String tableName, String columnName);
OperationHandle getFunctions(SessionHandle sessionHandle, String catalogName, String schemaName, String functionName);
OperationHandle getTypeInfo(SessionHandle sessionHandle);
}
```
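Clients rarely call ICLIService directly; the Hive JDBC driver translates standard DatabaseMetaData calls into these operations. A sketch with illustrative schema and table names:

```scala
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "username", "password")
val meta = conn.getMetaData
val tables = meta.getTables(null, "default", "%", null) // backed by getTables
while (tables.next()) println(tables.getString("TABLE_NAME"))
val cols = meta.getColumns(null, "default", "my_table", "%") // backed by getColumns
while (cols.next()) println(cols.getString("COLUMN_NAME"))
conn.close()
```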
Interactive command-line interface with SQL completion, history, and signal handling.

```scala
object SparkSQLCLIDriver {
def main(args: Array[String]): Unit
def installSignalHandler(): Unit
def printUsage(): Unit
}
class SparkSQLCLIDriver {
def processCmd(cmd: String): Int
def processLine(line: String, allowInterrupting: Boolean): Int
def printMasterAndAppId(): Unit
}
```
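Beyond the interactive loop, the CLI accepts the usual Hive CLI flags, such as -e for one-shot execution (this mirrors running spark-sql -e on the command line):

```scala
// Execute a single statement and exit.
SparkSQLCLIDriver.main(Array("-e", "SHOW TABLES"))
```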
Spark Web UI integration for monitoring thrift server sessions, queries, and performance metrics.

```scala
class ThriftServerTab {
def detach(): Unit
}
class HiveThriftServer2Listener {
// Event listener for UI display and metrics collection
}
```

Handle types identifying client sessions and operations:

```java
class SessionHandle extends Handle {
// Identifies client sessions
}
class OperationHandle extends Handle {
// Identifies operations (queries, metadata calls)
}
abstract class Handle {
HandleIdentifier getHandleIdentifier()
}
```

Operation type and state enumerations:

```java
enum OperationType {
EXECUTE_STATEMENT,
GET_TYPE_INFO,
GET_CATALOGS,
GET_SCHEMAS,
GET_TABLES,
GET_COLUMNS,
GET_FUNCTIONS,
GET_PRIMARY_KEYS,
GET_CROSS_REFERENCE
}
enum OperationState {
INITIALIZED,
RUNNING,
FINISHED,
CANCELED,
CLOSED,
ERROR,
UNKNOWN
}
```

Result set and schema types returned to clients:

```java
abstract class RowSet {
// Base class for result sets
}
class RowBasedSet extends RowSet {
// Row-based result set implementation
}
class ColumnBasedSet extends RowSet {
// Column-based result set implementation
}
class TableSchema {
List<ColumnDescriptor> getColumns()
}
class ColumnDescriptor {
String getName()
TypeDescriptor getTypeDescriptor()
String getComment()
}
```
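From a JDBC client, the TableSchema and ColumnDescriptor information surfaces through standard ResultSetMetaData; a sketch with an illustrative table:

```scala
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "username", "password")
val rs = conn.createStatement().executeQuery("SELECT * FROM my_table")
val md = rs.getMetaData
for (i <- 1 to md.getColumnCount) {
  println(s"${md.getColumnName(i)}: ${md.getColumnTypeName(i)}")
}
conn.close()
```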
Fetch control and server info value types:

```java
enum FetchOrientation {
FETCH_NEXT,
FETCH_PRIOR,
FETCH_RELATIVE,
FETCH_ABSOLUTE,
FETCH_FIRST,
FETCH_LAST
}
enum FetchType {
QUERY_OUTPUT,
LOG
}
class GetInfoValue {
String getStringValue()
short getShortValue()
int getIntValue()
long getLongValue()
}
```