# Advanced Testing Features

Specialized testing capabilities covering cross-database compatibility, DataSource V2 integration, Kerberos authentication, join pushdown optimization, and performance measurement, together with shared integration-testing utilities. Together they validate how Spark integrates with external databases through JDBC.

## Capabilities

### Cross-Database Query Testing

Test suite for validating query compatibility and data operations across different database systems.

```scala { .api }
/**
 * Cross-database query compatibility test suite
 * Tests queries that should work consistently across different databases
 */
class CrossDatabaseQuerySuite extends DockerJDBCIntegrationSuite {

  /** List of database types to test against */
  def supportedDatabases: List[String]

  /** Test cross-database joins */
  def testCrossDbJoins(): Unit

  /** Test data type mapping between databases */
  def testDataTypeMapping(): Unit

  /** Test SQL dialect compatibility */
  def testSqlDialectCompatibility(): Unit

  /** Test common SQL functions across databases */
  def testCommonSqlFunctions(): Unit

  /** Test aggregate operations consistency */
  def testAggregateOperations(): Unit

  /** Test transaction behavior across databases */
  def testTransactionBehavior(): Unit
}
```

**Usage Examples:**

```scala
class MyQueryCompatibilityTest extends CrossDatabaseQuerySuite {
  override def supportedDatabases: List[String] = List("postgresql", "mysql", "sqlserver")

  test("test standard SQL functions") {
    supportedDatabases.foreach { dbType =>
      // withDatabase is not among the members listed above; it is assumed
      // to run the block against the given database type
      withDatabase(dbType) {
        val df = spark.sql("SELECT UPPER(name), LENGTH(name) FROM users")
        assert(df.count() > 0)
      }
    }
  }
}
```
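The `foreach` loop above stops at the first database whose assertion fails, so any later databases are never checked. Below is a minimal sketch of a hypothetical `CrossDbRunner.runAcrossDatabases` helper, which is not part of the documented suite API. It runs the same check against every database type and returns all failure messages at once.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper for illustration; not part of the documented suite API
object CrossDbRunner {
  /** Runs `check` once per database type and returns the failure message
    * of every database whose check threw, keyed by database type. */
  def runAcrossDatabases(databases: List[String])(check: String => Unit): Map[String, String] =
    databases.flatMap { db =>
      Try(check(db)) match {
        case Success(_) => None
        case Failure(e) => Some(db -> String.valueOf(e.getMessage))
      }
    }.toMap
}

// Example: a check that fails only for "mysql"
val failures = CrossDbRunner.runAcrossDatabases(List("postgresql", "mysql", "sqlserver")) { db =>
  if (db == "mysql") throw new IllegalStateException(s"unexpected result on $db")
}
```

Reporting every failure together shows whether a dialect problem affects one database or several.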

### DataSource V2 Integration Testing

Test suite for Spark's DataSource V2 API integration with JDBC data sources.

```scala { .api }
/**
 * DataSource V2 API integration test suite
 * Tests modern Spark DataSource API with JDBC sources
 */
class DataSourceV2TestSuite extends DockerJDBCIntegrationSuite {

  /** Test namespace operations (CREATE/DROP database) */
  def testNamespaceOperations(): Unit

  /** Test table operations (CREATE/DROP/ALTER table) */
  def testTableOperations(): Unit

  /** Test partition handling */
  def testPartitionHandling(): Unit

  /** Test catalog operations */
  def testCatalogOperations(): Unit

  /** Test streaming integration */
  def testStreamingIntegration(): Unit

  /** Test column pruning optimization */
  def testColumnPruning(): Unit

  /** Test predicate pushdown */
  def testPredicatePushdown(): Unit
}
```
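Predicate pushdown is commonly verified by reading the physical plan text, for example `df.queryExecution.executedPlan.toString`, and checking the `PushedFilters` entry of the scan node. The sketch below shows a hypothetical `PlanInspector` helper that extracts the bracketed list that follows such a key. Entries are split on top-level commas so that `GreaterThan(id,1)` stays intact. The sample plan line is illustrative only, since plan formatting differs between Spark versions. Nested square brackets, such as those in `In(...)` filters, are not handled.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical helper for illustration; not part of the documented suite API
object PlanInspector {
  /** Returns the entries listed in `key [ ... ]` inside a plan string,
    * or None if the key is absent. Nested square brackets are not handled. */
  def bracketedEntries(plan: String, key: String): Option[List[String]] = {
    val start = plan.indexOf(key + " [")
    if (start < 0) None
    else {
      val open = start + key.length + 1 // index of '['
      val close = plan.indexOf(']', open)
      if (close < 0) None
      else {
        val body = plan.substring(open + 1, close).trim
        Some(if (body.isEmpty) Nil else splitTopLevel(body))
      }
    }
  }

  /** Splits on commas that are not nested inside parentheses. */
  private def splitTopLevel(s: String): List[String] = {
    val parts = ListBuffer.empty[String]
    val current = new StringBuilder
    var depth = 0
    s.foreach { c =>
      if (c == ',' && depth == 0) {
        parts += current.toString.trim
        current.clear()
      } else {
        if (c == '(') depth += 1
        if (c == ')') depth -= 1
        current += c
      }
    }
    parts += current.toString.trim
    parts.toList
  }
}

// Illustrative plan fragment; real output differs between Spark versions and scans
val plan = "*(1) Scan JDBCRelation(users) [numPartitions=1] [id#0,name#1] " +
  "PushedFilters: [IsNotNull(id), GreaterThan(id,1)], ReadSchema: struct<id:int,name:string>"
val pushed = PlanInspector.bracketedEntries(plan, "PushedFilters:")
```

A predicate pushdown test could then assert that the expected filter appears in `pushed`. For column pruning, a similar check could confirm that the read schema lists only the selected columns.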

```scala { .api }
/**
 * Test namespace CRUD operations
 * Validates CREATE DATABASE, DROP DATABASE operations
 */
def testNamespaceOperations(): Unit

/**
 * Test table CRUD operations
 * Validates CREATE TABLE, DROP TABLE, ALTER TABLE operations
 */
def testTableOperations(): Unit

/**
 * Test table partition management
 * Validates partition-aware reads and writes
 */
def testPartitionHandling(): Unit

/**
 * Test catalog integration
 * Validates catalog API with JDBC sources
 */
def testCatalogOperations(): Unit

/**
 * Test streaming read/write operations
 * Validates structured streaming with JDBC
 */
def testStreamingIntegration(): Unit
```
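Namespace, table, and catalog operations in Spark go through a catalog that has been registered in the session configuration. Spark provides a JDBC-backed DataSource V2 catalog, `org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog`, which is configured through `spark.sql.catalog.<name>` keys. The sketch below builds that configuration and a DDL round trip in a hypothetical `JdbcCatalogSetup` object. The catalog name `pg`, the URL, and the driver class are placeholders. Whether `CREATE NAMESPACE` is supported depends on the JDBC dialect.

```scala
// Hypothetical helper for illustration; not part of the documented suite API
object JdbcCatalogSetup {
  /** Spark conf entries that register a JDBC-backed DataSource V2 catalog named `name`. */
  def catalogConf(name: String, url: String, driver: String): Map[String, String] = Map(
    s"spark.sql.catalog.$name" -> "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog",
    s"spark.sql.catalog.$name.url" -> url,
    s"spark.sql.catalog.$name.driver" -> driver
  )

  /** Spark SQL statements for a namespace and table round trip, in execution order. */
  def ddlRoundTrip(catalog: String, ns: String, table: String): List[String] = {
    val t = s"$catalog.$ns.$table"
    List(
      s"CREATE NAMESPACE IF NOT EXISTS $catalog.$ns",
      s"CREATE TABLE $t (id INT, name STRING)",
      s"ALTER TABLE $t ADD COLUMNS (score DOUBLE)",
      s"DROP TABLE $t",
      s"DROP NAMESPACE $catalog.$ns"
    )
  }
}

// Placeholder connection details
val pgConf = JdbcCatalogSetup.catalogConf("pg", "jdbc:postgresql://localhost:5432/test", "org.postgresql.Driver")
val statements = JdbcCatalogSetup.ddlRoundTrip("pg", "test_ns", "users")
```

With a live session, each entry in `pgConf` could be passed to `SparkSession.builder().config(key, value)` and each statement run with `spark.sql`. A test would then check the catalog state, for example with `SHOW TABLES IN pg.test_ns`, between steps.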

### Kerberos Authentication Testing

Test suite for secure database authentication using the Kerberos protocol.

```scala { .api }
/**
 * Kerberos authentication integration test suite
 * Tests secure authentication with Kerberos-enabled databases
 */
class KerberosTestSuite extends DockerJDBCIntegrationSuite {

  /** Test Kerberos login functionality */
  def testKerberosLogin(): Unit

  /** Test secure JDBC connections */
  def testSecureJdbcConnection(): Unit

  /** Test Kerberos ticket renewal */
  def testTicketRenewal(): Unit

  /** Test delegation token support */
  def testDelegationTokens(): Unit

  /** Test principal mapping */
  def testPrincipalMapping(): Unit

  /** Test cross-realm authentication */
  def testCrossRealmAuth(): Unit
}
```
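Principal-mapping and cross-realm tests usually need to split a principal of the form `primary/instance@REALM` and compare its parts with expected values. The sketch below shows that parsing step using a hypothetical `ParsedPrincipal` type, which is not part of the suite. The JDK's `javax.security.auth.kerberos.KerberosPrincipal` also exposes `getRealm()`. The hand-rolled version here keeps the instance component as a separate field.

```scala
// Hypothetical type for illustration; not part of the documented suite API
final case class ParsedPrincipal(primary: String, instance: Option[String], realm: String)

object ParsedPrincipal {
  /** Parses "primary[/instance]@REALM"; returns None for malformed input. */
  def parse(s: String): Option[ParsedPrincipal] = s.split("@", -1) match {
    case Array(name, realm) if name.nonEmpty && realm.nonEmpty =>
      name.split("/", -1) match {
        case Array(p) if p.nonEmpty                  => Some(ParsedPrincipal(p, None, realm))
        case Array(p, i) if p.nonEmpty && i.nonEmpty => Some(ParsedPrincipal(p, Some(i), realm))
        case _                                       => None
      }
    case _ => None
  }
}

val service = ParsedPrincipal.parse("spark/hadoop@EXAMPLE.COM")
// A cross-realm test would compare the parsed realm with the realm it expects
val sameRealm = service.exists(_.realm == "EXAMPLE.COM")
```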

```scala { .api }
/**
 * Test Kerberos login process
 * Validates TGT acquisition and validation
 */
def testKerberosLogin(): Unit

/**
 * Test secure JDBC connection establishment
 * Validates JDBC connection with Kerberos authentication
 */
def testSecureJdbcConnection(): Unit

/**
 * Test automatic ticket renewal
 * Validates long-running job ticket renewal
 */
def testTicketRenewal(): Unit

/**
 * Test delegation token generation and usage
 * Validates token-based authentication for distributed jobs
 */
def testDelegationTokens(): Unit
```
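A ticket-renewal test has to know when a renewal should happen, so it can run a job long enough to cross that point. The sketch below uses `java.time` and a hypothetical `TicketRenewal` helper. It assumes a policy of renewing once 80% of the ticket lifetime has elapsed. That fraction is an illustrative assumption, not a documented setting of this suite.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper; the 80% renewal point is an assumption for illustration
object TicketRenewal {
  /** Offset from issue time at which renewal is attempted: 4/5 of the lifetime. */
  def renewalOffset(lifetime: Duration): Duration = lifetime.multipliedBy(4).dividedBy(5)

  /** Instant at which a ticket issued at `issuedAt` should be renewed. */
  def renewAt(issuedAt: Instant, lifetime: Duration): Instant =
    issuedAt.plus(renewalOffset(lifetime))

  /** True if a job running for `jobDuration` passes the renewal point,
    * so a renewal test must observe at least one renewal. */
  def needsRenewal(lifetime: Duration, jobDuration: Duration): Boolean =
    jobDuration.compareTo(renewalOffset(lifetime)) > 0
}

val issued = Instant.parse("2024-01-01T00:00:00Z")
val renewal = TicketRenewal.renewAt(issued, Duration.ofHours(8)) // 6 h 24 min after issue
```

Integer `multipliedBy`/`dividedBy` avoids the floating-point rounding that `lifetime.toMillis * 0.8` could introduce.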

### Join Pushdown Optimization Testing

Test suite for validating Spark's join pushdown optimization features with JDBC sources.

```scala { .api }
/**
 * Join pushdown optimization test suite
 * Tests Spark's ability to push joins down to the database
 */
class JoinPushdownTestSuite extends DockerJDBCIntegrationSuite {

  /** Test simple join pushdown */
  def testSimpleJoinPushdown(): Unit

  /** Test complex join scenarios */
  def testComplexJoinPushdown(): Unit

  /** Test join pushdown performance improvements */
  def testJoinPushdownPerformance(): Unit

  /** Test join pushdown with filters */
  def testJoinPushdownWithFilters(): Unit

  /** Test join pushdown limitations */
  def testJoinPushdownLimitations(): Unit

  /** Test cross-database join behavior */
  def testCrossDatabaseJoins(): Unit
}
```

```scala { .api }
/**
 * Test basic join pushdown optimization
 * Validates that simple joins are pushed to database
 */
def testSimpleJoinPushdown(): Unit

/**
 * Test complex join scenarios
 * Validates multi-table joins, outer joins, etc.
 */
def testComplexJoinPushdown(): Unit

/**
 * Test performance improvements from join pushdown
 * Measures execution time and data transfer reduction
 */
def testJoinPushdownPerformance(): Unit

/**
 * Test join pushdown with WHERE clause filters
 * Validates combined predicate and join pushdown
 */
def testJoinPushdownWithFilters(): Unit

/**
 * Test scenarios where join pushdown cannot be applied
 * Validates fallback to Spark-side joins
 */
def testJoinPushdownLimitations(): Unit
```
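One way to check for a fallback to Spark-side joins is to look for Spark's own physical join operators in the executed plan. These are `BroadcastHashJoin`, `SortMergeJoin`, `ShuffledHashJoin`, `BroadcastNestedLoopJoin`, and `CartesianProduct`. If none of them appears, the result is consistent with the join having been pushed down to the database. The sketch below wraps this check in a hypothetical `JoinPlanCheck` object. The plan fragments in it are illustrative and do not reproduce exact Spark output.

```scala
// Hypothetical helper for illustration; not part of the documented suite API
object JoinPlanCheck {
  /** Physical operators Spark uses when it executes a join itself. */
  val SparkJoinOperators: Seq[String] = Seq(
    "BroadcastHashJoin", "SortMergeJoin", "ShuffledHashJoin",
    "BroadcastNestedLoopJoin", "CartesianProduct")

  /** Spark-side join operators that appear in a physical plan string.
    * An empty result is consistent with the join having been pushed down. */
  def sparkSideJoins(physicalPlan: String): Seq[String] =
    SparkJoinOperators.filter(op => physicalPlan.contains(op))
}

// Illustrative plan fragments; real text varies between Spark versions
val fallbackPlan =
  """*(3) SortMergeJoin [id#1], [id#5], Inner
    |:- *(1) Scan JDBCRelation(table1)
    |+- *(2) Scan JDBCRelation(table2)""".stripMargin
val pushedPlan = "*(1) Project [id#1, v#2]\n+- BatchScan t1_t2[id#1, v#2]"
```

In a real test, the plan string would come from `df.queryExecution.executedPlan.toString`.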

### Performance Testing

Test suite for measuring and validating database operation performance.

```scala { .api }
/**
 * Performance testing suite for database operations
 * Measures query execution times, throughput, and resource usage
 */
class PerformanceTestSuite extends DockerJDBCIntegrationSuite {

  /** Test query execution performance */
  def testQueryPerformance(): Unit

  /** Test bulk data loading performance */
  def testBulkLoadPerformance(): Unit

  /** Test concurrent connection performance */
  def testConcurrentConnections(): Unit

  /** Test large result set handling */
  def testLargeResultSets(): Unit

  /** Test connection pooling efficiency */
  def testConnectionPooling(): Unit

  /** Test memory usage optimization */
  def testMemoryUsage(): Unit
}
```
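A single timing is easily skewed by JIT warm-up and container I/O noise. The sketch below is a hypothetical `PerfSampler.medianMillis` helper, which is not part of the documented API. It first runs the operation a few times without timing it, then times a series of runs and reports the median. The warm-up and run counts in the usage line are arbitrary.

```scala
// Hypothetical helper for illustration; not part of the documented suite API
object PerfSampler {
  /** Runs `op` `warmups` times untimed, then `runs` times timed, and returns
    * the median elapsed time in milliseconds (upper median for even `runs`). */
  def medianMillis(warmups: Int, runs: Int)(op: => Unit): Long = {
    require(runs > 0, "runs must be positive")
    (1 to warmups).foreach(_ => op)
    val samples = (1 to runs).map { _ =>
      val start = System.nanoTime()
      op
      (System.nanoTime() - start) / 1000000L
    }.sorted
    samples(samples.length / 2)
  }
}

var invocations = 0
val median = PerfSampler.medianMillis(warmups = 2, runs = 5) { invocations += 1 }
```

The median is less sensitive than the mean to one-off pauses such as garbage collection or a slow container start.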

### Integration Testing Utilities

Utility functions and helpers for advanced integration testing scenarios.

```scala { .api }
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

/**
 * Utility object for advanced integration testing
 * Provides helper functions for complex test scenarios
 */
object IntegrationTestUtil {

  /**
   * Run a test against multiple database types
   * @param databases List of database types to test
   * @param testFunction Test function to execute, called once per database type
   */
  def testAcrossDatabases(databases: List[String])(testFunction: String => Unit): Unit

  /**
   * Measure the execution time of an operation
   * @param operation Operation to measure
   * @return The operation's result paired with its execution time in milliseconds
   */
  def measureExecutionTime[T](operation: => T): (T, Long)

  /**
   * Compare query results across databases
   * @param sql SQL query to execute
   * @param databases List of databases to compare
   * @return Comparison result
   */
  def compareQueryResults(sql: String, databases: List[String]): QueryComparisonResult

  /**
   * Generate test data for performance testing
   * @param rowCount Number of rows to generate
   * @param schema Schema definition
   * @return Generated test data
   */
  def generateTestData(rowCount: Int, schema: StructType): DataFrame

  /**
   * Validate that a query plan contains the expected optimizations
   * @param df DataFrame to analyze
   * @param expectedOptimizations List of expected optimizations
   * @return true if all optimizations are present
   */
  def validateQueryPlan(df: DataFrame, expectedOptimizations: List[String]): Boolean
}
```
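`measureExecutionTime` returns a pair rather than a bare number, so callers get the operation's result together with the time it took. The sketch below shows one way to meet that signature, placed in a hypothetical `TimingUtil` object so that it stands alone. It uses `System.nanoTime`, which is monotonic, unlike `System.currentTimeMillis`.

```scala
// Hypothetical stand-alone version of the documented signature
object TimingUtil {
  /** Evaluates `operation` once and returns its result together with the
    * elapsed wall-clock time in milliseconds. */
  def measureExecutionTime[T](operation: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = operation
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    (result, elapsedMs)
  }
}

val (total, elapsedMs) = TimingUtil.measureExecutionTime((1 to 1000).sum)
```

Because `operation` is a by-name parameter, its evaluation happens inside the timed region rather than at the call site.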

## Types

```scala { .api }
import java.time.Duration

case class QueryComparisonResult(
  isIdentical: Boolean,
  rowCountDifferences: Map[String, Long],
  schemaDifferences: Map[String, List[String]],
  dataDifferences: Map[String, List[String]]
)

case class PerformanceMetrics(
  executionTimeMs: Long,
  rowsProcessed: Long,
  bytesRead: Long,
  memoryUsedMB: Long,
  cpuTimeMs: Long
)

case class OptimizationResult(
  optimizationType: String,
  isApplied: Boolean,
  performanceGain: Option[Double],
  details: Map[String, Any]
)

case class KerberosConfig(
  realm: String,
  kdc: String,
  principal: String,
  keytab: String,
  ticketLifetime: Duration // java.time.Duration, e.g. Duration.ofHours(8)
)
```
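Below is a minimal sketch of comparison logic that could populate `QueryComparisonResult`. It assumes that each database's result has already been collected into column names plus rows. The hypothetical `ResultComparison.compare` treats one database as the baseline and ignores row order. It compares values by their string form, so `1` and `1.0` count as different. Reading `rowCountDifferences` as "row count minus the baseline's row count" is also an assumption.

```scala
// Mirrors the QueryComparisonResult type documented above
final case class QueryComparisonResult(
  isIdentical: Boolean,
  rowCountDifferences: Map[String, Long],
  schemaDifferences: Map[String, List[String]],
  dataDifferences: Map[String, List[String]]
)

// Hypothetical helper; not the documented compareQueryResults implementation
object ResultComparison {
  type Columns = List[String]
  type Rows = List[List[Any]]

  /** Order-insensitive string form of a result's rows. */
  private def normalize(rows: Rows): List[String] = rows.map(_.mkString("|")).sorted

  /** Compares every database's result against `baseline`. "+x" marks something
    * only the other database has, "-x" something only the baseline has. */
  def compare(results: Map[String, (Columns, Rows)], baseline: String): QueryComparisonResult = {
    val (baseCols, baseRows) = results(baseline)
    val base = normalize(baseRows)
    val others = results - baseline

    val rowCounts: Map[String, Long] = others.collect {
      case (db, (_, rows)) if rows.size != baseRows.size => db -> (rows.size - baseRows.size).toLong
    }
    val schemas: Map[String, List[String]] = others.map { case (db, (cols, _)) =>
      db -> (cols.diff(baseCols).map("+" + _) ++ baseCols.diff(cols).map("-" + _))
    }.filter(_._2.nonEmpty)
    val data: Map[String, List[String]] = others.map { case (db, (_, rows)) =>
      val norm = normalize(rows)
      db -> (norm.diff(base).map("+" + _) ++ base.diff(norm).map("-" + _))
    }.filter(_._2.nonEmpty)

    QueryComparisonResult(rowCounts.isEmpty && schemas.isEmpty && data.isEmpty, rowCounts, schemas, data)
  }
}

val comparison = ResultComparison.compare(
  Map(
    "postgresql" -> ((List("id", "name"), List(List(1, "a"), List(2, "b")))),
    "mysql"      -> ((List("id", "name"), List(List(2, "b"), List(1, "a")))),
    "oracle"     -> ((List("ID", "NAME"), List(List(1, "a"))))
  ),
  baseline = "postgresql"
)
```

In this example, `mysql` matches the baseline despite the different row order. `oracle` is reported for its upper-case column names, its missing row, and its smaller row count. A real suite would likely normalize identifier case before comparing schemas.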

## Advanced Usage Examples

### Cross-Database Testing

```scala
class DatabaseCompatibilityTest extends CrossDatabaseQuerySuite {
  override def supportedDatabases: List[String] = List("postgresql", "mysql", "oracle")

  test("test date functions across databases") {
    val testQuery = "SELECT CURRENT_DATE, EXTRACT(YEAR FROM CURRENT_DATE)"

    val results = supportedDatabases.map { dbType =>
      withDatabase(dbType) {
        spark.sql(testQuery).collect()
      }
    }

    // Every database should return the same number of rows
    assert(results.map(_.length).distinct.length == 1)
  }
}
```

### Performance Benchmarking

```scala
import IntegrationTestUtil.measureExecutionTime

class JoinPerformanceTest extends JoinPushdownTestSuite {
  test("measure join pushdown performance improvement") {
    val key = "spark.sql.jdbc.pushDownJoin"
    val query = "SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id"
    val original = spark.conf.getOption(key)

    try {
      spark.conf.set(key, "true")
      val (rowsWithPushdown, timeWithPushdown) = measureExecutionTime {
        spark.sql(query).count()
      }

      // Disable join pushdown and rerun the same query
      spark.conf.set(key, "false")
      val (rowsWithoutPushdown, timeWithoutPushdown) = measureExecutionTime {
        spark.sql(query).count()
      }

      assert(rowsWithPushdown == rowsWithoutPushdown)
      // A single run per mode is noisy; treat this as a coarse check
      assert(timeWithPushdown < timeWithoutPushdown)
    } finally {
      // Restore the original setting so later tests are unaffected
      original.fold(spark.conf.unset(key))(spark.conf.set(key, _))
    }
  }
}
```

### Kerberos Authentication

```scala
import java.time.Duration

class SecureConnectionTest extends KerberosTestSuite {
  test("test secure connection with Kerberos") {
    val kerberosConfig = KerberosConfig(
      realm = "EXAMPLE.COM",
      kdc = "kdc.example.com",
      principal = "spark/hadoop@EXAMPLE.COM",
      keytab = "/etc/security/keytabs/spark.keytab",
      ticketLifetime = Duration.ofHours(8)
    )

    // withKerberosAuth and getSecureJdbcUrl are not among the members listed above;
    // they are assumed to be provided by the Kerberos test base class
    withKerberosAuth(kerberosConfig) {
      val df = spark.read
        .format("jdbc")
        .option("url", getSecureJdbcUrl())
        .option("dbtable", "secure_table")
        .load()

      assert(df.count() > 0)
    }
  }
}
```
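This section does not define `withKerberosAuth`. One common way to prepare a JVM for a keytab-based Kerberos login is a JAAS entry for the JDK's `Krb5LoginModule`, together with the `java.security.krb5.realm` and `java.security.krb5.kdc` system properties. The sketch below renders both from `KerberosConfig` in a hypothetical `KerberosLoginSetup` object. The entry name `SparkClient` is illustrative. Many setups supply a `krb5.conf` file instead of the two system properties.

```scala
import java.time.Duration

// Mirrors the KerberosConfig type documented above
final case class KerberosConfig(
  realm: String,
  kdc: String,
  principal: String,
  keytab: String,
  ticketLifetime: Duration
)

// Hypothetical helper for illustration; not part of the documented suite API
object KerberosLoginSetup {
  /** JAAS login entry that logs in from a keytab with the JDK's Krb5LoginModule. */
  def jaasEntry(entryName: String, cfg: KerberosConfig): String =
    s"""$entryName {
       |  com.sun.security.auth.module.Krb5LoginModule required
       |  useKeyTab=true
       |  keyTab="${cfg.keytab}"
       |  principal="${cfg.principal}"
       |  storeKey=true
       |  doNotPrompt=true;
       |};""".stripMargin

  /** JDK system properties naming the default realm and KDC (they must be set together). */
  def krb5SystemProperties(cfg: KerberosConfig): Map[String, String] = Map(
    "java.security.krb5.realm" -> cfg.realm,
    "java.security.krb5.kdc" -> cfg.kdc
  )
}

val cfg = KerberosConfig("EXAMPLE.COM", "kdc.example.com", "spark/hadoop@EXAMPLE.COM",
  "/etc/security/keytabs/spark.keytab", Duration.ofHours(8))
val jaas = KerberosLoginSetup.jaasEntry("SparkClient", cfg)
```

The rendered entry would typically be written to a file, and the `java.security.auth.login.config` system property would point to that file before the JDBC connection is opened.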