# Advanced Testing Features

Specialized testing capabilities including cross-database compatibility, DataSource V2 integration, Kerberos authentication, and join pushdown optimization testing. These advanced features enable comprehensive validation of Spark's database integration capabilities.

## Capabilities

### Cross-Database Query Testing

Test suite for validating query compatibility and data operations across different database systems.

```scala { .api }
/**
 * Cross-database query compatibility test suite
 * Tests queries that should work consistently across different databases
 */
class CrossDatabaseQuerySuite extends DockerJDBCIntegrationSuite {

  /** List of database types to test against */
  def supportedDatabases: List[String]

  /** Test cross-database joins */
  def testCrossDbJoins(): Unit

  /** Test data type mapping between databases */
  def testDataTypeMapping(): Unit

  /** Test SQL dialect compatibility */
  def testSqlDialectCompatibility(): Unit

  /** Test common SQL functions across databases */
  def testCommonSqlFunctions(): Unit

  /** Test aggregate operations consistency */
  def testAggregateOperations(): Unit

  /** Test transaction behavior across databases */
  def testTransactionBehavior(): Unit
}
```

**Usage Examples:**

```scala
class MyQueryCompatibilityTest extends CrossDatabaseQuerySuite {
  override def supportedDatabases = List("postgresql", "mysql", "sqlserver")

  test("test standard SQL functions") {
    supportedDatabases.foreach { dbType =>
      withDatabase(dbType) {
        val df = spark.sql("SELECT UPPER(name), LENGTH(name) FROM users")
        assert(df.count() > 0)
      }
    }
  }
}
```

### DataSource V2 Integration Testing

Test suite for Spark's DataSource V2 API integration with JDBC data sources.

```scala { .api }
/**
 * DataSource V2 API integration test suite
 * Tests modern Spark DataSource API with JDBC sources
 */
class DataSourceV2TestSuite extends DockerJDBCIntegrationSuite {

  /** Test namespace operations (CREATE/DROP database) */
  def testNamespaceOperations(): Unit

  /** Test table operations (CREATE/DROP/ALTER table) */
  def testTableOperations(): Unit

  /** Test partition handling */
  def testPartitionHandling(): Unit

  /** Test catalog operations */
  def testCatalogOperations(): Unit

  /** Test streaming integration */
  def testStreamingIntegration(): Unit

  /** Test column pruning optimization */
  def testColumnPruning(): Unit

  /** Test predicate pushdown */
  def testPredicatePushdown(): Unit
}
```

```scala { .api }
/**
 * Test namespace CRUD operations
 * Validates CREATE DATABASE, DROP DATABASE operations
 */
def testNamespaceOperations(): Unit

/**
 * Test table CRUD operations
 * Validates CREATE TABLE, DROP TABLE, ALTER TABLE operations
 */
def testTableOperations(): Unit

/**
 * Test table partition management
 * Validates partition-aware reads and writes
 */
def testPartitionHandling(): Unit

/**
 * Test catalog integration
 * Validates catalog API with JDBC sources
 */
def testCatalogOperations(): Unit

/**
 * Test streaming read/write operations
 * Validates structured streaming with JDBC
 */
def testStreamingIntegration(): Unit
```

### Kerberos Authentication Testing

Test suite for secure database authentication using Kerberos protocol.

```scala { .api }
/**
 * Kerberos authentication integration test suite
 * Tests secure authentication with Kerberos-enabled databases
 */
class KerberosTestSuite extends DockerJDBCIntegrationSuite {

  /** Test Kerberos login functionality */
  def testKerberosLogin(): Unit

  /** Test secure JDBC connections */
  def testSecureJdbcConnection(): Unit

  /** Test Kerberos ticket renewal */
  def testTicketRenewal(): Unit

  /** Test delegation token support */
  def testDelegationTokens(): Unit

  /** Test principal mapping */
  def testPrincipalMapping(): Unit

  /** Test cross-realm authentication */
  def testCrossRealmAuth(): Unit
}
```

```scala { .api }
/**
 * Test Kerberos login process
 * Validates TGT acquisition and validation
 */
def testKerberosLogin(): Unit

/**
 * Test secure JDBC connection establishment
 * Validates JDBC connection with Kerberos authentication
 */
def testSecureJdbcConnection(): Unit

/**
 * Test automatic ticket renewal
 * Validates long-running job ticket renewal
 */
def testTicketRenewal(): Unit

/**
 * Test delegation token generation and usage
 * Validates token-based authentication for distributed jobs
 */
def testDelegationTokens(): Unit
```

### Join Pushdown Optimization Testing

Test suite for validating Spark's join pushdown optimization features with JDBC sources.

```scala { .api }
/**
 * Join pushdown optimization test suite
 * Tests Spark's ability to push joins down to the database
 */
class JoinPushdownTestSuite extends DockerJDBCIntegrationSuite {

  /** Test simple join pushdown */
  def testSimpleJoinPushdown(): Unit

  /** Test complex join scenarios */
  def testComplexJoinPushdown(): Unit

  /** Test join pushdown performance improvements */
  def testJoinPushdownPerformance(): Unit

  /** Test join pushdown with filters */
  def testJoinPushdownWithFilters(): Unit

  /** Test join pushdown limitations */
  def testJoinPushdownLimitations(): Unit

  /** Test cross-database join behavior */
  def testCrossDatabaseJoins(): Unit
}
```

```scala { .api }
/**
 * Test basic join pushdown optimization
 * Validates that simple joins are pushed to database
 */
def testSimpleJoinPushdown(): Unit

/**
 * Test complex join scenarios
 * Validates multi-table joins, outer joins, etc.
 */
def testComplexJoinPushdown(): Unit

/**
 * Test performance improvements from join pushdown
 * Measures execution time and data transfer reduction
 */
def testJoinPushdownPerformance(): Unit

/**
 * Test join pushdown with WHERE clause filters
 * Validates combined predicate and join pushdown
 */
def testJoinPushdownWithFilters(): Unit

/**
 * Test scenarios where join pushdown cannot be applied
 * Validates fallback to Spark-side joins
 */
def testJoinPushdownLimitations(): Unit
```

### Performance Testing

Test suite for measuring and validating database operation performance.

```scala { .api }
/**
 * Performance testing suite for database operations
 * Measures query execution times, throughput, and resource usage
 */
class PerformanceTestSuite extends DockerJDBCIntegrationSuite {

  /** Test query execution performance */
  def testQueryPerformance(): Unit

  /** Test bulk data loading performance */
  def testBulkLoadPerformance(): Unit

  /** Test concurrent connection performance */
  def testConcurrentConnections(): Unit

  /** Test large result set handling */
  def testLargeResultSets(): Unit

  /** Test connection pooling efficiency */
  def testConnectionPooling(): Unit

  /** Test memory usage optimization */
  def testMemoryUsage(): Unit
}
```

### Integration Testing Utilities

Utility functions and helpers for advanced integration testing scenarios.

```scala { .api }
/**
 * Utility object for advanced integration testing
 * Provides helper functions for complex test scenarios
 */
object IntegrationTestUtil {

  /**
   * Run test against multiple database types
   * @param databases List of database types to test
   * @param testFunction Test function to execute
   */
  def testAcrossDatabases(databases: List[String])(testFunction: String => Unit): Unit

  /**
   * Measure query execution time
   * @param operation Operation to measure
   * @return Execution time in milliseconds
   */
  def measureExecutionTime[T](operation: => T): (T, Long)

  /**
   * Compare query results across databases
   * @param sql SQL query to execute
   * @param databases List of databases to compare
   * @return Comparison result
   */
  def compareQueryResults(sql: String, databases: List[String]): QueryComparisonResult

  /**
   * Generate test data for performance testing
   * @param rowCount Number of rows to generate
   * @param schema Schema definition
   * @return Generated test data
   */
  def generateTestData(rowCount: Int, schema: StructType): DataFrame

  /**
   * Validate query plan contains expected optimizations
   * @param df DataFrame to analyze
   * @param expectedOptimizations List of expected optimizations
   * @return true if all optimizations are present
   */
  def validateQueryPlan(df: DataFrame, expectedOptimizations: List[String]): Boolean
}
```

## Types

```scala { .api }
case class QueryComparisonResult(
  isIdentical: Boolean,
  rowCountDifferences: Map[String, Long],
  schemaDifferences: Map[String, List[String]],
  dataDifferences: Map[String, List[String]]
)

case class PerformanceMetrics(
  executionTimeMs: Long,
  rowsProcessed: Long,
  bytesRead: Long,
  memoryUsedMB: Long,
  cpuTimeMs: Long
)

case class OptimizationResult(
  optimizationType: String,
  isApplied: Boolean,
  performanceGain: Option[Double],
  details: Map[String, Any]
)

case class KerberosConfig(
  realm: String,
  kdc: String,
  principal: String,
  keytab: String,
  ticketLifetime: Duration
)
```

## Advanced Usage Examples

### Cross-Database Testing

```scala
class DatabaseCompatibilityTest extends CrossDatabaseQuerySuite {
  override def supportedDatabases = List("postgresql", "mysql", "oracle")

  test("test date functions across databases") {
    val testQuery = "SELECT CURRENT_DATE, EXTRACT(YEAR FROM CURRENT_DATE)"

    val results = supportedDatabases.map { dbType =>
      withDatabase(dbType) {
        spark.sql(testQuery).collect()
      }
    }

    // Validate all databases return same logical results
    assert(results.map(_.length).distinct.length == 1)
  }
}
```

### Performance Benchmarking

```scala
class JoinPerformanceTest extends JoinPushdownTestSuite {
  test("measure join pushdown performance improvement") {
    val (resultWithPushdown, timeWithPushdown) = measureExecutionTime {
      spark.sql("SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id").count()
    }

    // Disable join pushdown
    spark.conf.set("spark.sql.jdbc.pushDownJoin", "false")

    val (resultWithoutPushdown, timeWithoutPushdown) = measureExecutionTime {
      spark.sql("SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id").count()
    }

    assert(resultWithPushdown == resultWithoutPushdown)
    assert(timeWithPushdown < timeWithoutPushdown)
  }
}
```

### Kerberos Authentication

```scala
class SecureConnectionTest extends KerberosTestSuite {
  test("test secure connection with Kerberos") {
    val kerberosConfig = KerberosConfig(
      realm = "EXAMPLE.COM",
      kdc = "kdc.example.com",
      principal = "spark/hadoop@EXAMPLE.COM",
      keytab = "/etc/security/keytabs/spark.keytab",
      ticketLifetime = Duration.ofHours(8)
    )

    withKerberosAuth(kerberosConfig) {
      val df = spark.read
        .format("jdbc")
        .option("url", getSecureJdbcUrl())
        .option("dbtable", "secure_table")
        .load()

      assert(df.count() > 0)
    }
  }
}
```