or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

advanced-features.md container-management.md core-framework.md database-testing.md index.md jdbc-utilities.md
tile.json

advanced-features.md docs/

Advanced Testing Features

Specialized testing capabilities including cross-database compatibility, DataSource V2 integration, Kerberos authentication, and join pushdown optimization testing. These advanced features enable comprehensive validation of Spark's database integration capabilities.

Capabilities

Cross-Database Query Testing

Test suite for validating query compatibility and data operations across different database systems.

/**
 * Cross-database query compatibility test suite.
 *
 * Extends DockerJDBCIntegrationSuite and covers queries that should work
 * consistently across different databases. Members are listed here as
 * signatures only; no method bodies are shown.
 */
class CrossDatabaseQuerySuite extends DockerJDBCIntegrationSuite {
  
  /**
   * List of database types to test against.
   * The usage examples in this document override it with names such as
   * "postgresql" and "mysql".
   */
  def supportedDatabases: List[String]
  
  /** Test joins that span more than one database. */
  def testCrossDbJoins(): Unit
  
  /** Test how data types map between databases. */
  def testDataTypeMapping(): Unit
  
  /** Test SQL dialect compatibility. */
  def testSqlDialectCompatibility(): Unit
  
  /** Test common SQL functions across databases. */
  def testCommonSqlFunctions(): Unit
  
  /** Test that aggregate operations give consistent results. */
  def testAggregateOperations(): Unit
  
  /** Test transaction behavior across databases. */
  def testTransactionBehavior(): Unit
}

Usage Examples:

/**
 * Example suite: runs one standard-SQL query against every database listed in
 * `supportedDatabases` and expects each to return at least one row.
 */
class MyQueryCompatibilityTest extends CrossDatabaseQuerySuite {
  /** Database types this suite iterates over. */
  override def supportedDatabases: List[String] = List("postgresql", "mysql", "sqlserver")

  test("test standard SQL functions") {
    supportedDatabases.foreach { dbType =>
      withDatabase(dbType) {
        val df = spark.sql("SELECT UPPER(name), LENGTH(name) FROM users")
        // The loop covers several databases, so the clue names the one that
        // failed; a bare assertion would not say which database came back empty.
        assert(df.count() > 0, s"expected at least one row from $dbType")
      }
    }
  }
}

DataSource V2 Integration Testing

Test suite for Spark's DataSource V2 API integration with JDBC data sources.

/**
 * DataSource V2 API integration test suite.
 *
 * Extends DockerJDBCIntegrationSuite and exercises the Spark DataSource V2 API
 * with JDBC sources. Members are listed here as signatures only; no method
 * bodies are shown.
 */
class DataSourceV2TestSuite extends DockerJDBCIntegrationSuite {
  
  /** Test namespace operations (CREATE/DROP database). */
  def testNamespaceOperations(): Unit
  
  /** Test table operations (CREATE/DROP/ALTER table). */
  def testTableOperations(): Unit
  
  /** Test partition handling. */
  def testPartitionHandling(): Unit
  
  /** Test catalog operations. */
  def testCatalogOperations(): Unit
  
  /** Test streaming integration. */
  def testStreamingIntegration(): Unit
  
  /** Test column pruning optimization. */
  def testColumnPruning(): Unit
  
  /** Test predicate pushdown. */
  def testPredicatePushdown(): Unit
}
/**
 * Test namespace create/drop operations.
 * Validates CREATE DATABASE and DROP DATABASE.
 */
def testNamespaceOperations(): Unit

/**
 * Test table create/drop/alter operations.
 * Validates CREATE TABLE, DROP TABLE, and ALTER TABLE.
 */
def testTableOperations(): Unit

/**
 * Test table partition management.
 * Validates partition-aware reads and writes.
 */
def testPartitionHandling(): Unit

/**
 * Test catalog integration.
 * Validates the catalog API with JDBC sources.
 */
def testCatalogOperations(): Unit

/**
 * Test streaming read/write operations.
 * Validates structured streaming with JDBC.
 */
def testStreamingIntegration(): Unit

Kerberos Authentication Testing

Test suite for secure database authentication using Kerberos protocol.

/**
 * Kerberos authentication integration test suite.
 *
 * Extends DockerJDBCIntegrationSuite and tests secure authentication against
 * Kerberos-enabled databases. Members are listed here as signatures only; no
 * method bodies are shown.
 */
class KerberosTestSuite extends DockerJDBCIntegrationSuite {
  
  /** Test Kerberos login functionality. */
  def testKerberosLogin(): Unit
  
  /** Test secure JDBC connections. */
  def testSecureJdbcConnection(): Unit
  
  /** Test Kerberos ticket renewal. */
  def testTicketRenewal(): Unit
  
  /** Test delegation token support. */
  def testDelegationTokens(): Unit
  
  /** Test principal mapping. */
  def testPrincipalMapping(): Unit
  
  /** Test cross-realm authentication. */
  def testCrossRealmAuth(): Unit
}
/**
 * Test the Kerberos login process.
 * Checks that a ticket-granting ticket (TGT) is acquired and is valid.
 */
def testKerberosLogin(): Unit

/**
 * Test secure JDBC connection establishment.
 * Validates a JDBC connection that uses Kerberos authentication.
 */
def testSecureJdbcConnection(): Unit

/**
 * Test automatic ticket renewal.
 * Validates ticket renewal for long-running jobs.
 */
def testTicketRenewal(): Unit

/**
 * Test delegation token generation and usage.
 * Validates token-based authentication for distributed jobs.
 */
def testDelegationTokens(): Unit

Join Pushdown Optimization Testing

Test suite for validating Spark's join pushdown optimization features with JDBC sources.

/**
 * Join pushdown optimization test suite.
 *
 * Extends DockerJDBCIntegrationSuite and tests Spark's ability to push joins
 * down to the database. Members are listed here as signatures only; no method
 * bodies are shown.
 */
class JoinPushdownTestSuite extends DockerJDBCIntegrationSuite {
  
  /** Test simple join pushdown. */
  def testSimpleJoinPushdown(): Unit
  
  /** Test complex join scenarios. */
  def testComplexJoinPushdown(): Unit
  
  /** Test performance improvements from join pushdown. */
  def testJoinPushdownPerformance(): Unit
  
  /** Test join pushdown combined with filters. */
  def testJoinPushdownWithFilters(): Unit
  
  /** Test the limitations of join pushdown. */
  def testJoinPushdownLimitations(): Unit
  
  /** Test cross-database join behavior. */
  def testCrossDatabaseJoins(): Unit
}
/**
 * Test basic join pushdown optimization.
 * Validates that simple joins are pushed to the database.
 */
def testSimpleJoinPushdown(): Unit

/**
 * Test complex join scenarios.
 * Validates multi-table joins, outer joins, and similar cases.
 */
def testComplexJoinPushdown(): Unit

/**
 * Test performance improvements from join pushdown.
 * Measures execution time and the reduction in data transfer.
 */
def testJoinPushdownPerformance(): Unit

/**
 * Test join pushdown with WHERE clause filters.
 * Validates combined predicate and join pushdown.
 */
def testJoinPushdownWithFilters(): Unit

/**
 * Test scenarios where join pushdown cannot be applied.
 * Validates the fallback to Spark-side joins.
 */
def testJoinPushdownLimitations(): Unit

Performance Testing

Test suite for measuring and validating database operation performance.

/**
 * Performance testing suite for database operations.
 *
 * Extends DockerJDBCIntegrationSuite and measures query execution times,
 * throughput, and resource usage. Members are listed here as signatures only;
 * no method bodies are shown.
 */
class PerformanceTestSuite extends DockerJDBCIntegrationSuite {
  
  /** Test query execution performance. */
  def testQueryPerformance(): Unit
  
  /** Test bulk data loading performance. */
  def testBulkLoadPerformance(): Unit
  
  /** Test performance with concurrent connections. */
  def testConcurrentConnections(): Unit
  
  /** Test handling of large result sets. */
  def testLargeResultSets(): Unit
  
  /** Test connection pooling efficiency. */
  def testConnectionPooling(): Unit
  
  /** Test memory usage optimization. */
  def testMemoryUsage(): Unit
}

Integration Testing Utilities

Utility functions and helpers for advanced integration testing scenarios.

/**
 * Utility object for advanced integration testing.
 * Provides helper functions for complex test scenarios. Members are listed
 * here as signatures only; no method bodies are shown.
 */
object IntegrationTestUtil {
  
  /**
   * Run a test against multiple database types.
   * @param databases List of database types to test
   * @param testFunction Test function to execute; it receives a String,
   *                     presumably the database type being tested (confirm
   *                     against the implementation)
   */
  def testAcrossDatabases(databases: List[String])(testFunction: String => Unit): Unit
  
  /**
   * Measure the execution time of an operation.
   * @tparam T Result type of the measured operation
   * @param operation Operation to measure; passed by name, so it is evaluated
   *                  inside this method rather than at the call site
   * @return Pair of the operation's result and the execution time in milliseconds
   */
  def measureExecutionTime[T](operation: => T): (T, Long)
  
  /**
   * Compare query results across databases.
   * @param sql SQL query to execute
   * @param databases List of databases to compare
   * @return Comparison result as a QueryComparisonResult
   */
  def compareQueryResults(sql: String, databases: List[String]): QueryComparisonResult
  
  /**
   * Generate test data for performance testing.
   * @param rowCount Number of rows to generate
   * @param schema Schema definition
   * @return Generated test data as a DataFrame
   */
  def generateTestData(rowCount: Int, schema: StructType): DataFrame
  
  /**
   * Validate that a query plan contains the expected optimizations.
   * @param df DataFrame to analyze
   * @param expectedOptimizations List of expected optimizations; how each
   *                              string is matched against the plan is not
   *                              shown here
   * @return true if all optimizations are present
   */
  def validateQueryPlan(df: DataFrame, expectedOptimizations: List[String]): Boolean
}

Types

/**
 * Outcome of comparing one query across databases; this is the return type of
 * IntegrationTestUtil.compareQueryResults.
 *
 * NOTE(review): the String keys of the three maps are presumably database
 * type names; confirm against the implementation.
 *
 * @param isIdentical whether the compared results were identical
 * @param rowCountDifferences row-count differences, as Long values per key
 * @param schemaDifferences schema differences, as a list of strings per key
 * @param dataDifferences data differences, as a list of strings per key
 */
case class QueryComparisonResult(
  isIdentical: Boolean,
  rowCountDifferences: Map[String, Long],
  schemaDifferences: Map[String, List[String]],
  dataDifferences: Map[String, List[String]]
)

/**
 * Performance figures for a measured operation.
 *
 * NOTE(review): units below are read from the field-name suffixes (Ms, MB);
 * no signature visible in this section produces or consumes this type, so
 * confirm the units against the code that fills it in.
 *
 * @param executionTimeMs execution time, in milliseconds per the name
 * @param rowsProcessed number of rows processed
 * @param bytesRead number of bytes read
 * @param memoryUsedMB memory used, in megabytes per the name
 * @param cpuTimeMs CPU time, in milliseconds per the name
 */
case class PerformanceMetrics(
  executionTimeMs: Long,
  rowsProcessed: Long,
  bytesRead: Long,
  memoryUsedMB: Long,
  cpuTimeMs: Long
)

/**
 * Describes whether a particular optimization was applied.
 *
 * @param optimizationType name of the optimization, as a free-form String
 * @param isApplied whether the optimization was applied
 * @param performanceGain optional gain figure; None when absent. The scale
 *                        (ratio, percentage, ...) is not stated here; TODO confirm
 * @param details additional untyped key/value details
 */
case class OptimizationResult(
  optimizationType: String,
  isApplied: Boolean,
  performanceGain: Option[Double],
  details: Map[String, Any]
)

/**
 * Kerberos settings; the usage example passes an instance to withKerberosAuth.
 *
 * @param realm Kerberos realm (the example uses "EXAMPLE.COM")
 * @param kdc KDC host (the example uses "kdc.example.com")
 * @param principal principal name (the example uses "spark/hadoop@EXAMPLE.COM")
 * @param keytab path to the keytab file
 * @param ticketLifetime ticket lifetime; the example builds it with
 *                       Duration.ofHours(8), which matches java.time.Duration
 */
case class KerberosConfig(
  realm: String,
  kdc: String,
  principal: String,
  keytab: String,
  ticketLifetime: Duration
)

Advanced Usage Examples

Cross-Database Testing

/**
 * Example suite: runs the same date-function query against every database in
 * `supportedDatabases` and checks that each returns the same number of rows.
 */
class DatabaseCompatibilityTest extends CrossDatabaseQuerySuite {
  /** Database types this suite iterates over. */
  override def supportedDatabases: List[String] = List("postgresql", "mysql", "oracle")

  test("test date functions across databases") {
    val testQuery = "SELECT CURRENT_DATE, EXTRACT(YEAR FROM CURRENT_DATE)"

    // Keep each row count paired with its database so a failure can show
    // which database disagreed.
    val rowCounts = supportedDatabases.map { dbType =>
      val rows = withDatabase(dbType) {
        spark.sql(testQuery).collect()
      }
      dbType -> rows.length
    }

    // Only the number of rows is compared here; row contents are not.
    assert(
      rowCounts.map(_._2).distinct.length == 1,
      s"row counts differ across databases: ${rowCounts.mkString(", ")}"
    )
  }
}

Performance Benchmarking

/**
 * Example suite: runs the same join once under the ambient join-pushdown
 * setting and once with pushdown disabled, then compares results and timings.
 */
class JoinPerformanceTest extends JoinPushdownTestSuite {
  test("measure join pushdown performance improvement") {
    val pushdownKey = "spark.sql.jdbc.pushDownJoin"
    val joinQuery = "SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id"

    // Remember the current value so the session can be put back afterwards;
    // otherwise the "false" set below would leak into later tests.
    val previousSetting = spark.conf.getOption(pushdownKey)

    try {
      val (resultWithPushdown, timeWithPushdown) = measureExecutionTime {
        spark.sql(joinQuery).count()
      }

      // Disable join pushdown
      spark.conf.set(pushdownKey, "false")

      val (resultWithoutPushdown, timeWithoutPushdown) = measureExecutionTime {
        spark.sql(joinQuery).count()
      }

      assert(resultWithPushdown == resultWithoutPushdown)
      // NOTE(review): this compares a single wall-clock measurement of each
      // variant, so it may be sensitive to timing noise.
      assert(timeWithPushdown < timeWithoutPushdown)
    } finally {
      // Restore the prior value, or remove the key if it was not set before.
      previousSetting match {
        case Some(value) => spark.conf.set(pushdownKey, value)
        case None => spark.conf.unset(pushdownKey)
      }
    }
  }
}

Kerberos Authentication

/**
 * Example suite: reads a table over JDBC inside a Kerberos-authenticated
 * scope and expects at least one row.
 */
class SecureConnectionTest extends KerberosTestSuite {
  test("test secure connection with Kerberos") {
    // Settings handed to withKerberosAuth for the read below.
    val authSettings = KerberosConfig(
      realm = "EXAMPLE.COM",
      kdc = "kdc.example.com",
      principal = "spark/hadoop@EXAMPLE.COM",
      keytab = "/etc/security/keytabs/spark.keytab",
      ticketLifetime = Duration.ofHours(8)
    )

    withKerberosAuth(authSettings) {
      val jdbcReader = spark.read.format("jdbc")
      val secureTable = jdbcReader
        .option("url", getSecureJdbcUrl())
        .option("dbtable", "secure_table")
        .load()

      val rowCount = secureTable.count()
      assert(rowCount > 0)
    }
  }
}