Utility functions for JDBC connection management, query execution, and resource cleanup with robust error handling and connection validation. These utilities provide a higher-level interface for database operations in the testing framework.
Main utility object providing JDBC operation helpers and connection management functions.
/**
* Utility object for JDBC connection management and operations
* Provides helper functions for common JDBC tasks with proper error handling
*/
object JDBCConnectionUtil {
/**
* Create a JDBC connection with properties
* @param url JDBC connection URL
* @param properties Connection properties (username, password, etc.)
* @return Active Connection object
*/
def createConnection(url: String, properties: Properties): Connection
/**
* Create a JDBC connection with username and password
* @param url JDBC connection URL
* @param username Database username
* @param password Database password
* @return Active Connection object
*/
def createConnection(url: String, username: String, password: String): Connection
/**
* Execute a SQL query and return ResultSet
* @param connection Active database connection
* @param sql SQL query to execute
* @return ResultSet containing query results
*/
def executeQuery(connection: Connection, sql: String): ResultSet
/**
* Execute a SQL update statement
* @param connection Active database connection
* @param sql SQL update/insert/delete statement
* @return Number of affected rows
*/
def executeUpdate(connection: Connection, sql: String): Int
/**
* Validate database connection health
* @param connection Connection to validate
* @return true if connection is valid and responsive
*/
def validateConnection(connection: Connection): Boolean
/**
* Close JDBC resources safely
* @param resources Variable number of AutoCloseable resources
*/
def closeResources(resources: AutoCloseable*): Unit
/**
* Execute SQL with automatic resource cleanup
* @param connection Database connection
* @param sql SQL statement to execute
* @param handler Function to process ResultSet
* @return Result of handler function
*/
def withStatement[T](connection: Connection, sql: String)(handler: ResultSet => T): T
/**
* Get database metadata information
* @param connection Database connection
* @return DatabaseMetaData object
*/
def getMetaData(connection: Connection): DatabaseMetaData
}

Usage Examples:
import java.util.Properties
// Create connection with properties
val props = new Properties()
props.setProperty("user", "testuser")
props.setProperty("password", "testpass")
val connection = JDBCConnectionUtil.createConnection(jdbcUrl, props)
// Execute query with automatic cleanup
val result = JDBCConnectionUtil.withStatement(connection, "SELECT COUNT(*) FROM users") { rs =>
rs.next()
rs.getInt(1)
}
// Validate connection
if (JDBCConnectionUtil.validateConnection(connection)) {
// Connection is healthy
}
// Clean up resources
JDBCConnectionUtil.closeResources(connection)

Functions for creating and managing database connections.
/**
* Create connection with default properties
* @param url JDBC URL
* @param username Database username
* @param password Database password
* @return Connection object
*/
def createConnection(url: String, username: String, password: String): Connection
/**
* Create connection with custom properties
* @param url JDBC URL
* @param properties Connection properties
* @return Connection object
*/
def createConnection(url: String, properties: Properties): Connection
/**
* Create connection with timeout
* @param url JDBC URL
* @param username Database username
* @param password Database password
* @param timeoutSeconds Connection timeout in seconds
* @return Connection object
*/
def createConnectionWithTimeout(url: String, username: String, password: String, timeoutSeconds: Int): Connection
/**
* Test connection without creating full connection
* @param url JDBC URL
* @param username Database username
* @param password Database password
* @return true if connection test succeeds
*/
def testConnection(url: String, username: String, password: String): Boolean

Functions for executing SQL queries and statements.
/**
* Execute SQL query and return ResultSet
* @param connection Database connection
* @param sql SQL query
* @return ResultSet with query results
*/
def executeQuery(connection: Connection, sql: String): ResultSet
/**
* Execute parameterized query
* @param connection Database connection
* @param sql SQL query with parameters (?)
* @param params Parameter values
* @return ResultSet with query results
*/
def executeQuery(connection: Connection, sql: String, params: Any*): ResultSet
/**
* Execute update statement (INSERT, UPDATE, DELETE)
* @param connection Database connection
* @param sql SQL statement
* @return Number of affected rows
*/
def executeUpdate(connection: Connection, sql: String): Int
/**
* Execute parameterized update statement
* @param connection Database connection
* @param sql SQL statement with parameters (?)
* @param params Parameter values
* @return Number of affected rows
*/
def executeUpdate(connection: Connection, sql: String, params: Any*): Int
/**
* Execute batch of SQL statements
* @param connection Database connection
* @param sqlStatements List of SQL statements
* @return Array of update counts
*/
def executeBatch(connection: Connection, sqlStatements: List[String]): Array[Int]

Functions for managing JDBC resources and cleanup.
/**
* Close multiple AutoCloseable resources safely
* Handles exceptions and ensures all resources are closed
* @param resources Variable arguments of AutoCloseable resources
*/
def closeResources(resources: AutoCloseable*): Unit
/**
* Execute operation with automatic resource cleanup
* @param connection Database connection
* @param sql SQL statement
* @param handler Function to process ResultSet
* @return Result of handler function
*/
def withStatement[T](connection: Connection, sql: String)(handler: ResultSet => T): T
/**
* Execute operation with PreparedStatement
* @param connection Database connection
* @param sql SQL with parameters
* @param params Parameter values
* @param handler Function to process ResultSet
* @return Result of handler function
*/
def withPreparedStatement[T](connection: Connection, sql: String, params: Any*)(handler: ResultSet => T): T
/**
* Execute operation with transaction management
* @param connection Database connection
* @param operation Function to execute within transaction
* @return Result of operation
*/
def withTransaction[T](connection: Connection)(operation: Connection => T): T

Functions for testing and validating database connections.
/**
* Validate database connection health
* Tests if connection is active and responsive
* @param connection Connection to validate
* @return true if connection is valid
*/
def validateConnection(connection: Connection): Boolean
/**
* Validate connection with timeout
* @param connection Connection to validate
* @param timeoutSeconds Timeout for validation
* @return true if connection is valid within timeout
*/
def validateConnection(connection: Connection, timeoutSeconds: Int): Boolean
/**
* Check if connection is closed
* @param connection Connection to check
* @return true if connection is closed
*/
def isConnectionClosed(connection: Connection): Boolean
/**
* Test database connectivity with simple query
* @param connection Database connection
* @return true if test query succeeds
*/
def testConnectivity(connection: Connection): Boolean
/**
* Get connection information for debugging
* @param connection Database connection
* @return ConnectionInfo with details
*/
def getConnectionInfo(connection: Connection): ConnectionInfo

Functions for retrieving database schema and metadata information.
/**
* Get database metadata
* @param connection Database connection
* @return DatabaseMetaData object
*/
def getMetaData(connection: Connection): DatabaseMetaData
/**
* Get list of tables in database
* @param connection Database connection
* @param schema Schema name (optional)
* @return List of table names
*/
def getTables(connection: Connection, schema: Option[String] = None): List[String]
/**
* Get table column information
* @param connection Database connection
* @param tableName Table name
* @param schema Schema name (optional)
* @return List of ColumnInfo objects
*/
def getColumns(connection: Connection, tableName: String, schema: Option[String] = None): List[ColumnInfo]
/**
* Check if table exists
* @param connection Database connection
* @param tableName Table name to check
* @param schema Schema name (optional)
* @return true if table exists
*/
def tableExists(connection: Connection, tableName: String, schema: Option[String] = None): Boolean
/**
* Get database product information
* @param connection Database connection
* @return DatabaseProduct with name, version, etc.
*/
def getDatabaseProduct(connection: Connection): DatabaseProduct

Utilities for generating test data and setting up test database schemas.
/**
* Object for generating test data and managing test database schemas
* Provides utilities for creating consistent test datasets across different databases
*/
object TestDataGenerator {
/**
* Generate sample data based on schema definition
* @param schema StructType defining the data schema
* @param rowCount Number of rows to generate
* @return DataFrame with generated test data
*/
def generateSampleData(schema: StructType, rowCount: Int = 100): DataFrame
/**
* Create test tables in the database
* @param connection Database connection
* @param tableDefinitions List of table creation SQL statements
* @return Number of tables created
*/
def createTestTables(connection: Connection, tableDefinitions: List[String]): Int
/**
* Create test tables with predefined schemas
* @param connection Database connection
* @param tableSchemas Map of table name to schema definition
* @return Number of tables created
*/
def createTestTables(connection: Connection, tableSchemas: Map[String, StructType]): Int
/**
* Populate test data into database tables
* @param connection Database connection
* @param tableName Name of table to populate
* @param data DataFrame containing test data
* @return Number of rows inserted
*/
def populateTestData(connection: Connection, tableName: String, data: DataFrame): Int
/**
* Populate multiple tables with test data
* @param connection Database connection
* @param tableData Map of table name to DataFrame
* @return Map of table name to number of rows inserted
*/
def populateTestData(connection: Connection, tableData: Map[String, DataFrame]): Map[String, Int]
/**
* Generate common test data types (users, products, orders, etc.)
* @param dataType Type of test data to generate
* @param count Number of records
* @return DataFrame with generated data
*/
def generateCommonTestData(dataType: String, count: Int = 100): DataFrame
/**
* Create database-specific test schema
* @param connection Database connection
* @param databaseType Database type (postgresql, mysql, etc.)
* @return List of created table names
*/
def createDatabaseSpecificSchema(connection: Connection, databaseType: String): List[String]
/**
* Clean up test data and tables
* @param connection Database connection
* @param tableNames List of table names to drop
*/
def cleanupTestData(connection: Connection, tableNames: List[String]): Unit
}

Usage Examples:
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DataTypes._
// Generate sample data with custom schema
val schema = StructType(Array(
StructField("id", IntegerType, false),
StructField("name", StringType, true),
StructField("email", StringType, true),
StructField("age", IntegerType, true)
))
val testData = TestDataGenerator.generateSampleData(schema, 1000)
// Create and populate test tables
val connection = JDBCConnectionUtil.createConnection(jdbcUrl, username, password)
val tableSchemas = Map(
"users" -> schema,
"products" -> productSchema
)
TestDataGenerator.createTestTables(connection, tableSchemas)
TestDataGenerator.populateTestData(connection, "users", testData)
// Generate common test data types
val userData = TestDataGenerator.generateCommonTestData("users", 500)
val orderData = TestDataGenerator.generateCommonTestData("orders", 200)
// Database-specific schema setup
val tableNames = TestDataGenerator.createDatabaseSpecificSchema(connection, "postgresql")
// Cleanup after tests
TestDataGenerator.cleanupTestData(connection, tableNames)

case class ConnectionInfo(
url: String,
username: String,
driverClass: String,
isValid: Boolean,
isClosed: Boolean,
autoCommit: Boolean
)
/**
* Describes a single column of a database table.
* A list of these is the documented return value of `getColumns`.
*
* NOTE(review): the code that populates this class is not visible here, so the
* field notes below are read off the field names and types only — confirm them
* against the implementation before relying on them.
*
* @param columnName Name of the column
* @param dataType Data type of the column, carried as a String (how it differs from typeName is not shown here — TODO confirm)
* @param typeName Type name of the column (presumably the database-specific name — TODO confirm)
* @param columnSize Size of the column (units not shown here — TODO confirm)
* @param isNullable Whether the column is nullable
* @param isPrimaryKey Whether the column is a primary key column
*/
case class ColumnInfo(
columnName: String,
dataType: String,
typeName: String,
columnSize: Int,
isNullable: Boolean,
isPrimaryKey: Boolean
)
case class DatabaseProduct(
productName: String,
productVersion: String,
driverName: String,
driverVersion: String,
jdbcMajorVersion: Int,
jdbcMinorVersion: Int
)

The utility functions provide comprehensive error handling:
When using JDBC utilities:
- Use withStatement or withPreparedStatement for automatic resource cleanup.