This section covers specialized testing capabilities, including cross-database compatibility, DataSource V2 integration, Kerberos authentication, and join pushdown optimization testing. These advanced features enable comprehensive validation of Spark's database integration capabilities.
Test suite for validating query compatibility and data operations across different database systems.
/**
* Cross-database query compatibility test suite
* Tests queries that should work consistently across different databases
*/
class CrossDatabaseQuerySuite extends DockerJDBCIntegrationSuite {
  /** List of database types to test against */
  def supportedDatabases: List[String]
  /** Test cross-database joins */
  def testCrossDbJoins(): Unit
  /** Test data type mapping between databases */
  def testDataTypeMapping(): Unit
  /** Test SQL dialect compatibility */
  def testSqlDialectCompatibility(): Unit
  /** Test common SQL functions across databases */
  def testCommonSqlFunctions(): Unit
  /** Test aggregate operations consistency */
  def testAggregateOperations(): Unit
  /** Test transaction behavior across databases */
  def testTransactionBehavior(): Unit
}

Usage Examples:
class MyQueryCompatibilityTest extends CrossDatabaseQuerySuite {
  override def supportedDatabases = List("postgresql", "mysql", "sqlserver")

  test("test standard SQL functions") {
    supportedDatabases.foreach { dbType =>
      withDatabase(dbType) {
        val df = spark.sql("SELECT UPPER(name), LENGTH(name) FROM users")
        assert(df.count() > 0)
      }
    }
  }
}

Test suite for Spark's DataSource V2 API integration with JDBC data sources.
/**
* DataSource V2 API integration test suite
* Tests modern Spark DataSource API with JDBC sources
*/
class DataSourceV2TestSuite extends DockerJDBCIntegrationSuite {
  /** Test namespace operations (CREATE/DROP database) */
  def testNamespaceOperations(): Unit
  /** Test table operations (CREATE/DROP/ALTER table) */
  def testTableOperations(): Unit
  /** Test partition handling */
  def testPartitionHandling(): Unit
  /** Test catalog operations */
  def testCatalogOperations(): Unit
  /** Test streaming integration */
  def testStreamingIntegration(): Unit
  /** Test column pruning optimization */
  def testColumnPruning(): Unit
  /** Test predicate pushdown */
  def testPredicatePushdown(): Unit
}

/**
* Test namespace CRUD operations
* Validates CREATE DATABASE, DROP DATABASE operations
*/
def testNamespaceOperations(): Unit
/**
* Test table CRUD operations
* Validates CREATE TABLE, DROP TABLE, ALTER TABLE operations
*/
def testTableOperations(): Unit
/**
* Test table partition management
* Validates partition-aware reads and writes
*/
def testPartitionHandling(): Unit
/**
* Test catalog integration
* Validates catalog API with JDBC sources
*/
def testCatalogOperations(): Unit
/**
* Test streaming read/write operations
* Validates structured streaming with JDBC
*/
def testStreamingIntegration(): Unit
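A minimal usage sketch for this suite is shown below. It assumes `jdbcUrl` is provided by `DockerJDBCIntegrationSuite`; the catalog name `jdbc_cat` and the `test_db.people` table are hypothetical placeholders for illustration.

class MyDataSourceV2Test extends DataSourceV2TestSuite {
  test("table lifecycle through a V2 JDBC catalog") {
    // Register a JDBC catalog (JDBCTableCatalog is Spark's built-in V2 JDBC catalog).
    // Depending on the database, driver and credential options may also be required.
    spark.conf.set("spark.sql.catalog.jdbc_cat",
      "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
    spark.conf.set("spark.sql.catalog.jdbc_cat.url", jdbcUrl)

    // CREATE, read, and DROP all go through the DataSource V2 code path.
    // Assumes the test_db namespace already exists in the target database.
    spark.sql("CREATE TABLE jdbc_cat.test_db.people (id INT, name STRING)")
    assert(spark.table("jdbc_cat.test_db.people").isEmpty)
    spark.sql("DROP TABLE jdbc_cat.test_db.people")
  }
}
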
Test suite for secure database authentication using Kerberos protocol.

/**
* Kerberos authentication integration test suite
* Tests secure authentication with Kerberos-enabled databases
*/
class KerberosTestSuite extends DockerJDBCIntegrationSuite {
  /** Test Kerberos login functionality */
  def testKerberosLogin(): Unit
  /** Test secure JDBC connections */
  def testSecureJdbcConnection(): Unit
  /** Test Kerberos ticket renewal */
  def testTicketRenewal(): Unit
  /** Test delegation token support */
  def testDelegationTokens(): Unit
  /** Test principal mapping */
  def testPrincipalMapping(): Unit
  /** Test cross-realm authentication */
  def testCrossRealmAuth(): Unit
}

/**
* Test Kerberos login process
* Validates TGT acquisition and validation
*/
def testKerberosLogin(): Unit
/**
* Test secure JDBC connection establishment
* Validates JDBC connection with Kerberos authentication
*/
def testSecureJdbcConnection(): Unit
/**
* Test automatic ticket renewal
* Validates long-running job ticket renewal
*/
def testTicketRenewal(): Unit
/**
* Test delegation token generation and usage
* Validates token-based authentication for distributed jobs
*/
def testDelegationTokens(): Unit
Test suite for validating Spark's join pushdown optimization features with JDBC sources.

/**
* Join pushdown optimization test suite
* Tests Spark's ability to push joins down to the database
*/
class JoinPushdownTestSuite extends DockerJDBCIntegrationSuite {
  /** Test simple join pushdown */
  def testSimpleJoinPushdown(): Unit
  /** Test complex join scenarios */
  def testComplexJoinPushdown(): Unit
  /** Test join pushdown performance improvements */
  def testJoinPushdownPerformance(): Unit
  /** Test join pushdown with filters */
  def testJoinPushdownWithFilters(): Unit
  /** Test join pushdown limitations */
  def testJoinPushdownLimitations(): Unit
  /** Test cross-database join behavior */
  def testCrossDatabaseJoins(): Unit
}

/**
* Test basic join pushdown optimization
* Validates that simple joins are pushed to database
*/
def testSimpleJoinPushdown(): Unit
/**
* Test complex join scenarios
* Validates multi-table joins, outer joins, etc.
*/
def testComplexJoinPushdown(): Unit
/**
* Test performance improvements from join pushdown
* Measures execution time and data transfer reduction
*/
def testJoinPushdownPerformance(): Unit
/**
* Test join pushdown with WHERE clause filters
* Validates combined predicate and join pushdown
*/
def testJoinPushdownWithFilters(): Unit
/**
* Test scenarios where join pushdown cannot be applied
* Validates fallback to Spark-side joins
*/
def testJoinPushdownLimitations(): Unit
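As a sketch, pushdown can also be checked through the query plan rather than timing. This assumes `validateQueryPlan` from `IntegrationTestUtil` below matches marker strings in the optimized plan, and that both tables are read through the same hypothetical `jdbc_cat` catalog.

class JoinPlanValidationTest extends JoinPushdownTestSuite {
  test("equi-join is pushed down to the database") {
    val df = spark.sql(
      """SELECT t1.id, t2.name
        |FROM jdbc_cat.db.table1 t1
        |JOIN jdbc_cat.db.table2 t2 ON t1.id = t2.id""".stripMargin)
    // "PushedJoins" is a hypothetical plan marker; the exact string depends
    // on the connector and Spark version.
    assert(IntegrationTestUtil.validateQueryPlan(df, List("PushedJoins")))
  }
}
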
Test suite for measuring and validating database operation performance.

/**
* Performance testing suite for database operations
* Measures query execution times, throughput, and resource usage
*/
class PerformanceTestSuite extends DockerJDBCIntegrationSuite {
  /** Test query execution performance */
  def testQueryPerformance(): Unit
  /** Test bulk data loading performance */
  def testBulkLoadPerformance(): Unit
  /** Test concurrent connection performance */
  def testConcurrentConnections(): Unit
  /** Test large result set handling */
  def testLargeResultSets(): Unit
  /** Test connection pooling efficiency */
  def testConnectionPooling(): Unit
  /** Test memory usage optimization */
  def testMemoryUsage(): Unit
}
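A minimal sketch of a bulk-load performance test, assuming `jdbcUrl` comes from `DockerJDBCIntegrationSuite` and using `generateTestData` and `measureExecutionTime` from `IntegrationTestUtil` below; the 60-second budget is an arbitrary placeholder.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class BulkLoadPerformanceTest extends PerformanceTestSuite {
  test("bulk load completes within the time budget") {
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))
    val data = IntegrationTestUtil.generateTestData(1000000, schema)

    val (_, elapsedMs) = IntegrationTestUtil.measureExecutionTime {
      data.write
        .format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", "bulk_load_target")
        .mode("append")
        .save()
    }
    // Arbitrary budget for illustration; tune per environment.
    assert(elapsedMs < 60000, s"bulk load took ${elapsedMs} ms")
  }
}
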
Utility functions and helpers for advanced integration testing scenarios.

/**
* Utility object for advanced integration testing
* Provides helper functions for complex test scenarios
*/
object IntegrationTestUtil {
  /**
   * Run a test against multiple database types
   * @param databases List of database types to test
   * @param testFunction Test function to execute
   */
  def testAcrossDatabases(databases: List[String])(testFunction: String => Unit): Unit

  /**
   * Measure query execution time
   * @param operation Operation to measure
   * @return The operation result paired with its execution time in milliseconds
   */
  def measureExecutionTime[T](operation: => T): (T, Long)

  /**
   * Compare query results across databases
   * @param sql SQL query to execute
   * @param databases List of databases to compare
   * @return Comparison result
   */
  def compareQueryResults(sql: String, databases: List[String]): QueryComparisonResult

  /**
   * Generate test data for performance testing
   * @param rowCount Number of rows to generate
   * @param schema Schema definition
   * @return Generated test data
   */
  def generateTestData(rowCount: Int, schema: StructType): DataFrame

  /**
   * Validate that a query plan contains the expected optimizations
   * @param df DataFrame to analyze
   * @param expectedOptimizations List of expected optimizations
   * @return true if all optimizations are present
   */
  def validateQueryPlan(df: DataFrame, expectedOptimizations: List[String]): Boolean
}
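For instance, a short sketch comparing one query across two databases, with the semantics of `compareQueryResults` as documented above:

// Compare one query across engines and fail with the per-database
// differences if the results diverge.
val comparison = IntegrationTestUtil.compareQueryResults(
  "SELECT COUNT(*) FROM users",
  List("postgresql", "mysql"))
assert(comparison.isIdentical,
  s"row count differences: ${comparison.rowCountDifferences}")
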
Supporting case classes for test results and configuration:

case class QueryComparisonResult(
  isIdentical: Boolean,
  rowCountDifferences: Map[String, Long],
  schemaDifferences: Map[String, List[String]],
  dataDifferences: Map[String, List[String]]
)
case class PerformanceMetrics(
  executionTimeMs: Long,
  rowsProcessed: Long,
  bytesRead: Long,
  memoryUsedMB: Long,
  cpuTimeMs: Long
)

case class OptimizationResult(
  optimizationType: String,
  isApplied: Boolean,
  performanceGain: Option[Double],
  details: Map[String, Any]
)

case class KerberosConfig(
  realm: String,
  kdc: String,
  principal: String,
  keytab: String,
  ticketLifetime: Duration
)

Usage Examples:

class DatabaseCompatibilityTest extends CrossDatabaseQuerySuite {
  override def supportedDatabases = List("postgresql", "mysql", "oracle")

  test("test date functions across databases") {
    val testQuery = "SELECT CURRENT_DATE, EXTRACT(YEAR FROM CURRENT_DATE)"
    val results = supportedDatabases.map { dbType =>
      withDatabase(dbType) {
        spark.sql(testQuery).collect()
      }
    }
    // Validate all databases return the same number of rows
    assert(results.map(_.length).distinct.length == 1)
  }
}

class JoinPerformanceTest extends JoinPushdownTestSuite {
test("measure join pushdown performance improvement") {
val (resultWithPushdown, timeWithPushdown) = measureExecutionTime {
spark.sql("SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id").count()
}
// Disable join pushdown
spark.conf.set("spark.sql.jdbc.pushDownJoin", "false")
val (resultWithoutPushdown, timeWithoutPushdown) = measureExecutionTime {
spark.sql("SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id").count()
}
assert(resultWithPushdown == resultWithoutPushdown)
assert(timeWithPushdown < timeWithoutPushdown)
}
}class SecureConnectionTest extends KerberosTestSuite {
test("test secure connection with Kerberos") {
val kerberosConfig = KerberosConfig(
realm = "EXAMPLE.COM",
kdc = "kdc.example.com",
principal = "spark/hadoop@EXAMPLE.COM",
keytab = "/etc/security/keytabs/spark.keytab",
ticketLifetime = Duration.ofHours(8)
)
withKerberosAuth(kerberosConfig) {
val df = spark.read
.format("jdbc")
.option("url", getSecureJdbcUrl())
.option("dbtable", "secure_table")
.load()
assert(df.count() > 0)
}
}
}