Docker-based integration testing framework for Apache Spark JDBC connectivity with multiple database systems
Base testing infrastructure providing Docker container lifecycle management and shared test utilities for database integration testing. This framework extends Spark's SharedSparkSession to provide database-specific testing capabilities.
Abstract base class that provides the foundation for all database integration tests with Docker container management and common test utilities.
/**
* Base abstract class for Docker-based JDBC integration tests
* Extends SharedSparkSession to provide Spark context and session
*/
abstract class DockerJDBCIntegrationSuite extends SharedSparkSession {
/**
* Database type identifier (e.g., "postgresql", "mysql").
* The usage example supplies it with `override val`.
*/
def databaseType: String
/**
* Docker image name and tag for the database (the usage example uses "postgres:13").
*/
def databaseImage: String
/**
* Start Docker container for the database.
* Per the lifecycle listing: creates and starts a new container instance and
* waits for the database to be ready for connections.
*/
def startDockerContainer(): Unit
/**
* Stop and cleanup Docker container.
* Per the lifecycle listing: gracefully stops the container, removes it and
* cleans up any associated resources.
*/
def stopDockerContainer(): Unit
/**
* Get JDBC connection URL for the test database.
* @return JDBC URL string for connecting to the test database
*/
def getJdbcUrl(): String
/**
* Get active JDBC connection to the test database.
* @return Active Connection object for database operations
*/
def getJdbcConnection(): Connection
/**
* Setup initial test data in the database.
* Creates necessary tables and populates them with test data; should be called
* after container startup.
*/
def setupTestData(): Unit
/**
* Run JDBC connectivity test with a SQL query.
* @param sql SQL query to execute
* @return DataFrame containing query results
*/
def runJdbcTest(sql: String): DataFrame
/**
* Get database-specific JDBC properties.
* @return Properties object with database-specific settings
*/
def getJdbcProperties(): Properties
/**
* Validate database connection health.
* @return true if connection is healthy, false otherwise
*/
def validateConnection(): Boolean
}

Usage Examples:
class MyPostgreSQLTest extends DockerJDBCIntegrationSuite {
override val databaseType = "postgresql"
override val databaseImage = "postgres:13"
/**
* Suite-level setup: runs the parent setup first, then starts the database
* container and loads the test data.
*/
override def beforeAll(): Unit = {
// Parent setup runs before any container work.
super.beforeAll()
startDockerContainer()
// Test data is loaded only after the container has been started.
setupTestData()
}
/**
* Suite-level teardown: stops the database container, then runs the parent
* teardown.
*/
override def afterAll(): Unit = {
try {
stopDockerContainer()
} finally {
// Runs even when stopping the container throws, so the parent teardown is never skipped.
super.afterAll()
}
}
test("test table operations") {
  // Fetch every row of the test table through the suite's JDBC helper.
  val rows = runJdbcTest("SELECT * FROM test_table")

  // The table must hold at least one row.
  val rowCount = rows.count()
  assert(rowCount > 0)

  // The result schema must expose an "id" column.
  val columnNames = rows.columns
  assert(columnNames.contains("id"))
}
test("test data insertion") {
  // Insert one row over plain JDBC, then confirm Spark's JDBC reader sees it.
  val connection = getJdbcConnection()
  // NOTE(review): whether this test owns `connection` (and should close it) depends on
  // getJdbcConnection() returning a fresh or a shared connection — confirm before closing it here.
  val statement = connection.createStatement()
  try {
    statement.execute("INSERT INTO test_table VALUES (1, 'test')")
  } finally {
    // The statement is created here, so release it here even when the insert throws.
    statement.close()
  }

  val df = spark.read
    .format("jdbc")
    .option("url", getJdbcUrl())
    .option("dbtable", "test_table")
    .load()

  // df("id") resolves the column directly, so no implicits import is needed for the `$` interpolator.
  assert(df.filter(df("id") === 1).count() == 1)
}
}

Methods for managing Docker container lifecycle during test execution.
/**
* Start Docker container for the database
* Creates and starts a new container instance
* Waits for database to be ready for connections
*
* The usage example calls this from `beforeAll()`, after `super.beforeAll()`
* and before `setupTestData()`.
*/
def startDockerContainer(): Unit
/**
* Stop and cleanup Docker container
* Gracefully stops the container and removes it
* Cleans up any associated resources
*
* The usage example calls this from `afterAll()` inside a try/finally so that
* `super.afterAll()` still runs if stopping the container throws.
*/
def stopDockerContainer(): Unit
/**
* Get JDBC connection URL for the test database
*
* The usage example passes the result as the "url" option of a Spark JDBC read.
*
* @return JDBC URL string for connecting to the test database
*/
def getJdbcUrl(): String
/**
* Get active JDBC connection to the test database
* @return Active Connection object for database operations
*/
def getJdbcConnection(): Connection

Methods for managing test data setup and cleanup.
/**
* Setup initial test data in the database
* Creates necessary tables and populates with test data
* Should be called after container startup
*
* The usage example calls this from `beforeAll()` immediately after
* `startDockerContainer()`.
*/
def setupTestData(): Unit
/**
* Run JDBC connectivity test with SQL query
*
* @param sql SQL query to execute (the usage example passes "SELECT * FROM test_table")
* @return DataFrame containing query results
*/
def runJdbcTest(sql: String): DataFrame
/**
* Get database-specific JDBC properties
*
* NOTE(review): `Properties` is presumably java.util.Properties — confirm against the file's imports.
*
* @return Properties object with database-specific settings
*/
def getJdbcProperties(): Properties
/**
* Validate database connection health
* @return true if connection is healthy, false otherwise
*/
def validateConnection(): Boolean

Integration with Spark's testing framework and session management.
/**
* Access to shared Spark session for JDBC operations
* Inherited from SharedSparkSession
*
* The usage example uses it as the entry point for `spark.read.format("jdbc")`.
*/
def spark: SparkSession
/**
* Create DataFrame from JDBC source
*
* NOTE(review): this member does not appear in the DockerJDBCIntegrationSuite
* member listing earlier in this document — confirm where it is defined.
*
* @param tableName Name of the database table
* @return DataFrame containing table data
*/
def readJdbcTable(tableName: String): DataFrame
/**
* Write DataFrame to JDBC destination
* @param df DataFrame to write
* @param tableName Target table name
* @param mode Write mode (append, overwrite, etc.)
*/
def writeJdbcTable(df: DataFrame, tableName: String, mode: String = "append"): Unit

The framework follows a standard test lifecycle pattern:
The framework provides robust error handling for common scenarios:
All methods include proper exception handling and resource cleanup to prevent test isolation issues.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-docker-integration-tests-2-11