Docker-based integration testing framework for Apache Spark JDBC connectivity with multiple database systems
Utility functions for JDBC connection management, query execution, and resource cleanup with robust error handling and connection validation. These utilities provide a higher-level interface for database operations in the testing framework.
Main utility object providing JDBC operation helpers and connection management functions.
import java.sql.{Connection, DatabaseMetaData, ResultSet}
import java.util.Properties

/**
 * Utility object for JDBC connection management and operations.
 * Provides helper functions for common JDBC tasks with proper error handling.
 *
 * Only member signatures are listed here; the sections below list further
 * overloads (timeouts, parameterized statements, transactions, metadata).
 */
object JDBCConnectionUtil {

  /**
   * Create a JDBC connection from a set of driver properties.
   *
   * @param url        JDBC connection URL
   * @param properties connection properties (e.g. `user` and `password`)
   * @return an open Connection
   */
  def createConnection(url: String, properties: Properties): Connection

  /**
   * Create a JDBC connection with a username and password.
   *
   * @param url      JDBC connection URL
   * @param username database username
   * @param password database password
   * @return an open Connection
   */
  def createConnection(url: String, username: String, password: String): Connection

  /**
   * Execute a SQL query and return its ResultSet.
   *
   * NOTE(review): the returned ResultSet is still open, so presumably the caller must
   * close it (e.g. with `closeResources`) — confirm. Prefer `withStatement`, which
   * performs cleanup automatically.
   *
   * @param connection active database connection
   * @param sql        SQL query to execute
   * @return ResultSet containing the query results
   */
  def executeQuery(connection: Connection, sql: String): ResultSet

  /**
   * Execute a SQL update statement (INSERT, UPDATE, DELETE).
   *
   * @param connection active database connection
   * @param sql        SQL update/insert/delete statement
   * @return number of affected rows
   */
  def executeUpdate(connection: Connection, sql: String): Int

  /**
   * Validate database connection health.
   *
   * @param connection connection to validate
   * @return true if the connection is valid and responsive
   */
  def validateConnection(connection: Connection): Boolean

  /**
   * Close JDBC resources safely. Exceptions raised while closing are handled
   * so that every resource still gets closed.
   *
   * @param resources any number of AutoCloseable resources (connections, statements, result sets)
   */
  def closeResources(resources: AutoCloseable*): Unit

  /**
   * Execute a SQL query with automatic resource cleanup.
   *
   * The ResultSet should not escape `handler`; it is presumably released by the
   * automatic cleanup once the handler returns — confirm.
   *
   * @tparam T result type produced by `handler`
   * @param connection database connection
   * @param sql        SQL statement to execute
   * @param handler    function that processes the ResultSet
   * @return the value returned by `handler`
   */
  def withStatement[T](connection: Connection, sql: String)(handler: ResultSet => T): T

  /**
   * Get database metadata information.
   *
   * @param connection database connection
   * @return DatabaseMetaData for the connection
   */
  def getMetaData(connection: Connection): DatabaseMetaData
}

// Usage Examples:
import java.util.Properties

// `jdbcUrl` is a placeholder for the JDBC URL of the database under test.

// Create connection with properties
val props = new Properties()
props.setProperty("user", "testuser")
props.setProperty("password", "testpass")
val connection = JDBCConnectionUtil.createConnection(jdbcUrl, props)

try {
  // Execute query with automatic cleanup
  val result = JDBCConnectionUtil.withStatement(connection, "SELECT COUNT(*) FROM users") { rs =>
    rs.next()
    rs.getInt(1)
  }

  // Validate connection
  if (JDBCConnectionUtil.validateConnection(connection)) {
    // Connection is healthy
  }
} finally {
  // Clean up resources; runs even if a query above throws, so the connection is never leaked
  JDBCConnectionUtil.closeResources(connection)
}

// Functions for creating and managing database connections.
/**
 * Create a connection with default properties.
 *
 * @param url      JDBC URL
 * @param username database username
 * @param password database password
 * @return an open Connection
 */
def createConnection(url: String, username: String, password: String): Connection

/**
 * Create a connection with custom properties.
 *
 * @param url        JDBC URL
 * @param properties connection properties
 * @return an open Connection
 */
def createConnection(url: String, properties: Properties): Connection

/**
 * Create a connection, giving up after a timeout.
 *
 * NOTE(review): if this is implemented with DriverManager.setLoginTimeout, the
 * timeout is JVM-wide rather than per connection — confirm.
 *
 * @param url            JDBC URL
 * @param username       database username
 * @param password       database password
 * @param timeoutSeconds connection timeout in seconds
 * @return an open Connection
 */
def createConnectionWithTimeout(url: String, username: String, password: String, timeoutSeconds: Int): Connection

/**
 * Test whether a connection can be established with the given credentials.
 *
 * NOTE(review): plain JDBC cannot verify a URL/credential pair without opening a
 * connection, so presumably one is opened and closed internally — confirm.
 *
 * @param url      JDBC URL
 * @param username database username
 * @param password database password
 * @return true if the connection test succeeds
 */
def testConnection(url: String, username: String, password: String): Boolean

// Functions for executing SQL queries and statements.
/**
 * Execute a SQL query and return its ResultSet.
 *
 * @param connection database connection
 * @param sql        SQL query
 * @return ResultSet with the query results
 */
def executeQuery(connection: Connection, sql: String): ResultSet

/**
 * Execute a parameterized query.
 *
 * @param connection database connection
 * @param sql        SQL query containing `?` placeholders
 * @param params     values for the `?` placeholders, presumably bound in order
 * @return ResultSet with the query results
 */
def executeQuery(connection: Connection, sql: String, params: Any*): ResultSet

/**
 * Execute an update statement (INSERT, UPDATE, DELETE).
 *
 * @param connection database connection
 * @param sql        SQL statement
 * @return number of affected rows
 */
def executeUpdate(connection: Connection, sql: String): Int

/**
 * Execute a parameterized update statement.
 *
 * @param connection database connection
 * @param sql        SQL statement containing `?` placeholders
 * @param params     values for the `?` placeholders, presumably bound in order
 * @return number of affected rows
 */
def executeUpdate(connection: Connection, sql: String, params: Any*): Int

/**
 * Execute a batch of SQL statements.
 *
 * @param connection    database connection
 * @param sqlStatements SQL statements to run as one batch
 * @return update counts, presumably one per statement in submission order
 */
def executeBatch(connection: Connection, sqlStatements: List[String]): Array[Int]

// Functions for managing JDBC resources and cleanup.
/**
 * Close multiple AutoCloseable resources safely.
 * Exceptions are handled so that every resource still gets closed.
 *
 * @param resources any number of AutoCloseable resources
 */
def closeResources(resources: AutoCloseable*): Unit

/**
 * Execute a query with automatic resource cleanup.
 *
 * @tparam T result type produced by `handler`
 * @param connection database connection
 * @param sql        SQL statement
 * @param handler    function that processes the ResultSet; the ResultSet should not escape it
 * @return the value returned by `handler`
 */
def withStatement[T](connection: Connection, sql: String)(handler: ResultSet => T): T

/**
 * Execute a parameterized query through a PreparedStatement with automatic cleanup.
 *
 * @tparam T result type produced by `handler`
 * @param connection database connection
 * @param sql        SQL containing `?` placeholders
 * @param params     values for the `?` placeholders
 * @param handler    function that processes the ResultSet
 * @return the value returned by `handler`
 */
def withPreparedStatement[T](connection: Connection, sql: String, params: Any*)(handler: ResultSet => T): T

/**
 * Execute an operation inside a transaction.
 *
 * NOTE(review): presumably disables auto-commit, commits when `operation` returns and
 * rolls back when it throws — confirm against the implementation.
 *
 * @tparam T result type produced by `operation`
 * @param connection database connection
 * @param operation  function to execute within the transaction
 * @return the value returned by `operation`
 */
def withTransaction[T](connection: Connection)(operation: Connection => T): T

// Functions for testing and validating database connections.
/**
 * Validate database connection health.
 * Tests whether the connection is active and responsive.
 *
 * @param connection connection to validate
 * @return true if the connection is valid
 */
def validateConnection(connection: Connection): Boolean

/**
 * Validate a connection, bounded by a timeout.
 *
 * @param connection     connection to validate
 * @param timeoutSeconds timeout for the validation, in seconds
 * @return true if the connection proves valid within the timeout
 */
def validateConnection(connection: Connection, timeoutSeconds: Int): Boolean

/**
 * Check whether a connection is closed.
 *
 * @param connection connection to check
 * @return true if the connection is closed
 */
def isConnectionClosed(connection: Connection): Boolean

/**
 * Test database connectivity by running a simple query.
 *
 * @param connection database connection
 * @return true if the test query succeeds
 */
def testConnectivity(connection: Connection): Boolean

/**
 * Collect connection details for debugging.
 *
 * @param connection database connection
 * @return ConnectionInfo describing the connection
 */
def getConnectionInfo(connection: Connection): ConnectionInfo

// Functions for retrieving database schema and metadata information.
/**
 * Get database metadata.
 *
 * @param connection database connection
 * @return DatabaseMetaData for the connection
 */
def getMetaData(connection: Connection): DatabaseMetaData

/**
 * List the tables in the database.
 *
 * @param connection database connection
 * @param schema     schema to restrict the listing to; None presumably lists all schemas
 * @return table names
 */
def getTables(connection: Connection, schema: Option[String] = None): List[String]

/**
 * Describe the columns of a table.
 *
 * @param connection database connection
 * @param tableName  table name
 * @param schema     schema containing the table (optional)
 * @return one ColumnInfo per column
 */
def getColumns(connection: Connection, tableName: String, schema: Option[String] = None): List[ColumnInfo]

/**
 * Check whether a table exists.
 *
 * NOTE(review): metadata lookups are often case-sensitive and databases differ in how
 * they fold unquoted identifiers — confirm how `tableName` is matched.
 *
 * @param connection database connection
 * @param tableName  table name to check
 * @param schema     schema containing the table (optional)
 * @return true if the table exists
 */
def tableExists(connection: Connection, tableName: String, schema: Option[String] = None): Boolean

/**
 * Get database product and driver information.
 *
 * @param connection database connection
 * @return DatabaseProduct with name, version, driver and JDBC version details
 */
def getDatabaseProduct(connection: Connection): DatabaseProduct

// Utilities for generating test data and setting up test database schemas.
/**
 * Object for generating test data and managing test database schemas.
 * Provides utilities for creating consistent test datasets across different databases.
 */
object TestDataGenerator {

  /**
   * Generate sample data that follows a schema definition.
   *
   * NOTE(review): no SparkSession parameter is visible — confirm how the DataFrame is created.
   *
   * @param schema   StructType defining the data schema
   * @param rowCount number of rows to generate
   * @return DataFrame with the generated test data
   */
  def generateSampleData(schema: StructType, rowCount: Int = 100): DataFrame

  /**
   * Create test tables from raw DDL.
   *
   * @param connection       database connection
   * @param tableDefinitions table creation SQL statements
   * @return number of tables created
   */
  def createTestTables(connection: Connection, tableDefinitions: List[String]): Int

  /**
   * Create test tables from predefined schemas.
   *
   * @param connection   database connection
   * @param tableSchemas map of table name to schema definition
   * @return number of tables created
   */
  def createTestTables(connection: Connection, tableSchemas: Map[String, StructType]): Int

  /**
   * Insert test data into a database table.
   *
   * @param connection database connection
   * @param tableName  name of the table to populate
   * @param data       DataFrame containing the test data
   * @return number of rows inserted
   */
  def populateTestData(connection: Connection, tableName: String, data: DataFrame): Int

  /**
   * Insert test data into several tables.
   *
   * @param connection database connection
   * @param tableData  map of table name to the DataFrame to insert
   * @return map of table name to number of rows inserted
   */
  def populateTestData(connection: Connection, tableData: Map[String, DataFrame]): Map[String, Int]

  /**
   * Generate a common kind of test data (users, products, orders, etc.).
   *
   * @param dataType kind of test data to generate (e.g. "users", "orders")
   * @param count    number of records
   * @return DataFrame with the generated data
   */
  def generateCommonTestData(dataType: String, count: Int = 100): DataFrame

  /**
   * Create a database-specific test schema.
   *
   * @param connection   database connection
   * @param databaseType database type (postgresql, mysql, etc.)
   * @return names of the created tables
   */
  def createDatabaseSpecificSchema(connection: Connection, databaseType: String): List[String]

  /**
   * Clean up test data by dropping the given tables.
   *
   * @param connection database connection
   * @param tableNames names of the tables to drop
   */
  def cleanupTestData(connection: Connection, tableNames: List[String]): Unit
}

// Usage Examples:
import org.apache.spark.sql.types._
// Do not also wildcard-import DataTypes._: its static IntegerType/StringType fields
// would make those names ambiguous with the case objects imported above.

// Generate sample data with custom schema
val schema = StructType(Array(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("email", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val testData = TestDataGenerator.generateSampleData(schema, 1000)

// `jdbcUrl`, `username`, `password` and `productSchema` are placeholders
// supplied by the surrounding test.
val tableSchemas = Map(
  "users" -> schema,
  "products" -> productSchema
)

// Create and populate test tables
val connection = JDBCConnectionUtil.createConnection(jdbcUrl, username, password)
try {
  TestDataGenerator.createTestTables(connection, tableSchemas)
  TestDataGenerator.populateTestData(connection, "users", testData)

  // Generate common test data types
  val userData = TestDataGenerator.generateCommonTestData("users", 500)
  val orderData = TestDataGenerator.generateCommonTestData("orders", 200)

  // Database-specific schema setup
  val tableNames = TestDataGenerator.createDatabaseSpecificSchema(connection, "postgresql")

  // Cleanup after tests: drop the tables created from `tableSchemas` too, otherwise
  // "users" and "products" outlive the test; distinct avoids dropping a table twice.
  TestDataGenerator.cleanupTestData(connection, (tableSchemas.keys.toList ++ tableNames).distinct)
} finally {
  JDBCConnectionUtil.closeResources(connection)
}

/**
 * Connection details for debugging, as returned by `getConnectionInfo`.
 *
 * @param url         JDBC URL of the connection
 * @param username    database user
 * @param driverClass JDBC driver class (presumably the fully-qualified class name — confirm)
 * @param isValid     validity flag (NOTE(review): presumably a snapshot taken when the
 *                    info was collected, not a live check)
 * @param isClosed    closed flag
 * @param autoCommit  auto-commit flag
 */
case class ConnectionInfo(
  url: String,
  username: String,
  driverClass: String,
  isValid: Boolean,
  isClosed: Boolean,
  autoCommit: Boolean
)
/**
 * Description of one table column, as returned by `getColumns`.
 *
 * @param columnName   name of the column
 * @param dataType     data type rendered as a String (NOTE(review): JDBC reports DATA_TYPE
 *                     as a java.sql.Types int code — confirm how it is converted)
 * @param typeName     database-specific type name
 * @param columnSize   column size
 * @param isNullable   nullability flag
 * @param isPrimaryKey primary-key membership flag
 */
case class ColumnInfo(
    columnName: String,
    dataType: String,
    typeName: String,
    columnSize: Int,
    isNullable: Boolean,
    isPrimaryKey: Boolean)
/**
 * Database product and driver details, as returned by `getDatabaseProduct`.
 *
 * @param productName      database product name
 * @param productVersion   database product version
 * @param driverName       JDBC driver name
 * @param driverVersion    JDBC driver version
 * @param jdbcMajorVersion JDBC major version
 * @param jdbcMinorVersion JDBC minor version
 */
case class DatabaseProduct(
  productName: String,
  productVersion: String,
  driverName: String,
  driverVersion: String,
  jdbcMajorVersion: Int,
  jdbcMinorVersion: Int
)

// The utility functions provide comprehensive error handling:
When using JDBC utilities:
- Use withStatement or withPreparedStatement for automatic resource cleanup.

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-spark--spark-docker-integration-tests-2-11