
# Configuration Management

Comprehensive configuration management for Spark clusters and database connections supporting PostgreSQL, Elasticsearch, Neo4j, and Kafka messaging. The configuration framework provides property-based settings with environment variable overrides and standardized connection patterns for enterprise data infrastructure.

## Capabilities


### Spark RDBMS Configuration

Manages configuration for PySpark connections to relational database management systems with JDBC drivers, providing standardized database connectivity for data processing workflows.

```python { .api }
class SparkRDBMSConfig:
    """
    Configurations for PySpark Relational Database Management System support.

    Constants:
    - DEFAULT_JDBC_URL = "jdbc:postgresql://postgres:5432/db"
    - DEFAULT_JDBC_DRIVER = "org.postgresql.Driver"
    - DEFAULT_USER = "postgres"
    - DEFAULT_PASSWORD = "password"
    """

    def __init__(self) -> None:
        """Initialize with spark-rdbms.properties"""
        ...

    def jdbc_url(self) -> str:
        """JDBC URL for database connection"""
        ...

    def jdbc_driver(self) -> str:
        """JDBC driver class name"""
        ...

    def user(self) -> str:
        """RDBMS user"""
        ...

    def password(self) -> str:
        """RDBMS user password"""
        ...
```

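A minimal sketch of using these accessors on their own, independent of a Spark session, assuming the `aissemble_core_config` import path used in the usage examples below:

```python
from aissemble_core_config import SparkRDBMSConfig

# Load settings from spark-rdbms.properties (falling back to the documented defaults)
rdbms = SparkRDBMSConfig()

# Collect the connection settings into a plain dict, e.g. for spark.read JDBC options
jdbc_options = {
    "url": rdbms.jdbc_url(),        # e.g. "jdbc:postgresql://postgres:5432/db"
    "driver": rdbms.jdbc_driver(),  # e.g. "org.postgresql.Driver"
    "user": rdbms.user(),
    "password": rdbms.password(),
}

print(jdbc_options["url"])
```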

### Spark Elasticsearch Configuration

Comprehensive Elasticsearch integration for Spark with support for cluster discovery, authentication, security settings, and performance tuning options for large-scale search and analytics workloads.

```python { .api }
class SparkElasticsearchConfig:
    """
    Configurations for PySpark Elasticsearch support.

    Constants:
    - SPARK_ES_NODES = "spark.es.nodes"
    - SPARK_ES_PORT = "spark.es.port"
    - ES_NODES_PATH_PREFIX = "es.nodes.path.prefix"
    - ES_NODES_DISCOVERY = "es.nodes.discovery"
    - ES_NODES_CLIENT_ONLY = "es.nodes.client.only"
    - ES_NODES_DATA_ONLY = "es.nodes.data.only"
    - ES_NODES_INGEST_ONLY = "es.nodes.ingest.only"
    - ES_NODES_WAN_ONLY = "es.nodes.wan.only"
    - ES_HTTP_TIMEOUT = "es.http.timeout"
    - ES_HTTP_RETRIES = "es.http.retries"
    - ES_NET_HTTP_AUTH_USER = "es.net.http.auth.user"
    - ES_NET_HTTP_AUTH_PASS = "es.net.http.auth.pass"
    """

    def __init__(self) -> None:
        """Initialize with spark-elasticsearch.properties"""
        ...

    def spark_es_nodes(self) -> str:
        """List of Elasticsearch nodes (default: "localhost")"""
        ...

    def spark_es_port(self) -> str:
        """HTTP/REST port (default: "9200")"""
        ...

    def es_nodes_path_prefix(self) -> str:
        """Prefix for requests"""
        ...

    def es_nodes_discovery(self) -> str:
        """Node discovery setting"""
        ...

    def es_nodes_client_only(self) -> str:
        """Client nodes only setting"""
        ...

    def es_nodes_data_only(self) -> str:
        """Data nodes only setting"""
        ...

    def es_nodes_ingest_only(self) -> str:
        """Ingest nodes only setting"""
        ...

    def es_nodes_wan_only(self) -> str:
        """WAN only setting"""
        ...

    def es_http_timeout(self) -> str:
        """HTTP timeout setting"""
        ...

    def es_http_retries(self) -> str:
        """HTTP retries setting"""
        ...

    def es_net_http_auth_user(self) -> str:
        """Basic auth username"""
        ...

    def es_net_http_auth_pass(self) -> str:
        """Basic auth password"""
        ...

    def get_es_configs(self) -> dict:
        """Returns all Elasticsearch configurations"""
        ...

    def add_optional_config(self, configs: dict, config_key: str, config_value: str) -> None:
        """Adds optional configuration"""
        ...
```

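A short sketch of combining `get_es_configs()` with `add_optional_config()`, for example to attach basic-auth credentials only when they are present; whether unset optional properties come back empty is an assumption here:

```python
from aissemble_core_config import SparkElasticsearchConfig

es_config = SparkElasticsearchConfig()

# Start from the configured Elasticsearch options
es_options = es_config.get_es_configs()

# Attach basic-auth credentials only if they were configured
# (assumes unset optional properties are returned as empty/falsy values)
auth_user = es_config.es_net_http_auth_user()
auth_pass = es_config.es_net_http_auth_pass()
if auth_user and auth_pass:
    es_config.add_optional_config(es_options, SparkElasticsearchConfig.ES_NET_HTTP_AUTH_USER, auth_user)
    es_config.add_optional_config(es_options, SparkElasticsearchConfig.ES_NET_HTTP_AUTH_PASS, auth_pass)

print(f"Target cluster: {es_config.spark_es_nodes()}:{es_config.spark_es_port()}")
```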

### Spark Neo4j Configuration

Advanced configuration management for Neo4j graph database integration with Spark, including authentication methods, encryption settings, and connection optimization for graph analytics workflows.

```python { .api }
class SparkNeo4jConfig:
    """
    Configurations for Spark Neo4j support.

    Constants:
    - URL = "url"
    - AUTHENTICATION_TYPE = "authentication.type"
    - AUTHENTICATION_BASIC_USERNAME = "authentication.basic.username"
    - AUTHENTICATION_BASIC_PASSWORD = "authentication.basic.password"
    - AUTHENTICATION_KERBEROS_TICKET = "authentication.kerberos.ticket"
    - AUTHENTICATION_CUSTOM_PRINCIPAL = "authentication.custom.principal"
    - AUTHENTICATION_CUSTOM_CREDENTIALS = "authentication.custom.credentials"
    - AUTHENTICATION_CUSTOM_REALM = "authentication.custom.realm"
    - ENCRYPTION_ENABLED = "encryption.enabled"
    - ENCRYPTION_TRUST_STRATEGY = "encryption.trust.strategy"
    - ENCRYPTION_CA_CERTIFICATE_PATH = "encryption.ca.certificate.path"
    - CONNECTION_MAX_LIFETIME_MSECS = "connection.max.lifetime.msecs"
    - CONNECTION_LIVENESS_TIMEOUT_MSECS = "connection.liveness.timeout.msecs"
    - CONNECTION_ACQUISITION_TIMEOUT_MSECS = "connection.acquisition.timeout.msecs"
    - CONNECTION_TIMEOUT_MSECS = "connection.timeout.msecs"
    - NEO4J_FORMAT = "org.neo4j.spark.DataSource"
    - LABELS_OPTION = "labels"
    """

    def __init__(self) -> None:
        """Initialize with spark-neo4j.properties"""
        ...

    def url(self) -> str:
        """Neo4j instance URL (default: "bolt://neo4j:7687")"""
        ...

    def authentication_type(self) -> str:
        """Authentication method (default: "basic")"""
        ...

    def authentication_basic_username(self) -> str:
        """Basic auth username (default: "neo4j")"""
        ...

    def authentication_basic_password(self) -> str:
        """Basic auth password (default: "p455w0rd")"""
        ...

    def authentication_kerberos_ticket(self) -> str:
        """Kerberos ticket"""
        ...

    def authentication_custom_principal(self) -> str:
        """Custom principal"""
        ...

    def authentication_custom_credentials(self) -> str:
        """Custom credentials"""
        ...

    def authentication_custom_realm(self) -> str:
        """Custom realm"""
        ...

    def encryption_enabled(self) -> str:
        """Encryption enabled setting"""
        ...

    def encryption_trust_strategy(self) -> str:
        """Trust strategy setting"""
        ...

    def encryption_ca_certificate_path(self) -> str:
        """Certificate path"""
        ...

    def connection_max_lifetime_msecs(self) -> str:
        """Connection lifetime"""
        ...

    def connection_liveness_timeout_msecs(self) -> str:
        """Liveness timeout"""
        ...

    def connection_acquisition_timeout_msecs(self) -> str:
        """Acquisition timeout"""
        ...

    def connection_timeout_msecs(self) -> str:
        """Connection timeout"""
        ...

    def get_spark_options(self) -> Dict[str, str]:
        """Returns spark options for Neo4j"""
        ...
```

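A brief sketch of how the individual accessors relate to the bundled `get_spark_options()` map; the exact contents of the map depend on which properties are set:

```python
from aissemble_core_config import SparkNeo4jConfig

neo4j_config = SparkNeo4jConfig()

# The accessors mirror the property keys listed above
print(f"Neo4j URL:       {neo4j_config.url()}")                  # default "bolt://neo4j:7687"
print(f"Auth type:       {neo4j_config.authentication_type()}")  # default "basic"
print(f"Encryption:      {neo4j_config.encryption_enabled()}")
print(f"Connect timeout: {neo4j_config.connection_timeout_msecs()} ms")

# get_spark_options() bundles the configured values into the option map
# passed to spark.read/write with format SparkNeo4jConfig.NEO4J_FORMAT
neo4j_options = neo4j_config.get_spark_options()
```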

### Messaging Configuration

Configuration management for Kafka messaging systems providing standardized connection settings for distributed messaging and event streaming in data processing pipelines.

```python { .api }
class MessagingConfig:
    """
    Configurations for messaging connections.
    """

    def __init__(self) -> None:
        """Initialize with messaging.properties"""
        ...

    def server(self) -> str:
        """Returns server address (default: "kafka-cluster:9093")"""
        ...

    def metadata_topic(self) -> str:
        """Returns topic for metadata (default: "metadata-ingest")"""
        ...
```

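Since this package targets Spark, a hedged sketch of pointing Spark Structured Streaming at the configured broker and topic may be useful alongside the kafka-python example below; it assumes the matching spark-sql-kafka integration package is on the Spark classpath:

```python
from aissemble_core_config import MessagingConfig
from pyspark.sql import SparkSession

messaging_config = MessagingConfig()

# Requires the Spark Kafka integration package on the classpath,
# e.g. org.apache.spark:spark-sql-kafka-0-10_2.12 matching your Spark version
spark = SparkSession.builder.appName("MetadataStream").getOrCreate()

metadata_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", messaging_config.server())  # default "kafka-cluster:9093"
    .option("subscribe", messaging_config.metadata_topic())        # default "metadata-ingest"
    .option("startingOffsets", "latest")
    .load()
)

# Kafka records arrive as binary key/value columns; decode the value for inspection
query = (
    metadata_stream.selectExpr("CAST(value AS STRING) AS metadata_json")
    .writeStream
    .format("console")
    .start()
)
```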

## Usage Examples


### Basic Database Configuration

```python
from aissemble_core_config import SparkRDBMSConfig
from pyspark.sql import SparkSession

# Initialize database configuration
db_config = SparkRDBMSConfig()

# Create Spark session with database connectivity
spark = SparkSession.builder \
    .appName("DataProcessingJob") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.0") \
    .getOrCreate()

# Read data from PostgreSQL
df = spark.read \
    .format("jdbc") \
    .option("url", db_config.jdbc_url()) \
    .option("driver", db_config.jdbc_driver()) \
    .option("dbtable", "customer_transactions") \
    .option("user", db_config.user()) \
    .option("password", db_config.password()) \
    .load()

print(f"Connected to database: {db_config.jdbc_url()}")
print(f"Loaded {df.count()} records")

# Apply your transformations here; the identity transform is a placeholder
processed_df = df

# Write processed data back
processed_df.write \
    .format("jdbc") \
    .option("url", db_config.jdbc_url()) \
    .option("driver", db_config.jdbc_driver()) \
    .option("dbtable", "processed_transactions") \
    .option("user", db_config.user()) \
    .option("password", db_config.password()) \
    .mode("append") \
    .save()
```


### Elasticsearch Integration

```python
from aissemble_core_config import SparkElasticsearchConfig
from pyspark.sql import SparkSession

# Initialize Elasticsearch configuration
es_config = SparkElasticsearchConfig()

# Create Spark session with Elasticsearch support
spark = SparkSession.builder \
    .appName("ElasticsearchAnalytics") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.8.0") \
    .getOrCreate()

# Get all Elasticsearch configurations
es_configs = es_config.get_es_configs()

# Read data from Elasticsearch
df = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .options(**es_configs) \
    .option("es.resource", "logs-2024/doc") \
    .option("es.query", '{"query":{"range":{"timestamp":{"gte":"2024-01-01"}}}}') \
    .load()

print(f"Connected to Elasticsearch: {es_config.spark_es_nodes()}:{es_config.spark_es_port()}")
print(f"Loaded {df.count()} log records")

# Placeholder: replace with an aggregation that produces a "result_id" column
aggregated_df = df

# Write aggregated results back to Elasticsearch
aggregated_df.write \
    .format("org.elasticsearch.spark.sql") \
    .options(**es_configs) \
    .option("es.resource", "analytics-results/doc") \
    .option("es.mapping.id", "result_id") \
    .mode("append") \
    .save()
```


### Neo4j Graph Analytics

```python
from aissemble_core_config import SparkNeo4jConfig
from pyspark.sql import SparkSession

# Initialize Neo4j configuration
neo4j_config = SparkNeo4jConfig()

# Create Spark session with Neo4j connector
spark = SparkSession.builder \
    .appName("GraphAnalytics") \
    .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:5.0.1_for_spark_3") \
    .getOrCreate()

# Get Neo4j connection options
neo4j_options = neo4j_config.get_spark_options()

# Read nodes from Neo4j
users_df = spark.read \
    .format(SparkNeo4jConfig.NEO4J_FORMAT) \
    .options(**neo4j_options) \
    .option(SparkNeo4jConfig.LABELS_OPTION, "User") \
    .load()

# Read relationships
relationships_df = spark.read \
    .format(SparkNeo4jConfig.NEO4J_FORMAT) \
    .options(**neo4j_options) \
    .option("relationship", "FRIENDS_WITH") \
    .option("relationship.source.labels", "User") \
    .option("relationship.target.labels", "User") \
    .load()

print(f"Connected to Neo4j: {neo4j_config.url()}")
print(f"Loaded {users_df.count()} users and {relationships_df.count()} relationships")

# Analyze graph structure; the source-id column name depends on the
# relationship schema produced by your connector version
degree_centrality = relationships_df \
    .groupBy("source.id") \
    .count() \
    .withColumnRenamed("count", "degree") \
    .orderBy("degree", ascending=False)

# Write analysis results back to Neo4j
degree_centrality.write \
    .format(SparkNeo4jConfig.NEO4J_FORMAT) \
    .options(**neo4j_options) \
    .option(SparkNeo4jConfig.LABELS_OPTION, "UserAnalytics") \
    .option("node.keys", "user_id") \
    .mode("overwrite") \
    .save()
```


### Kafka Messaging Integration

```python
from aissemble_core_config import MessagingConfig
from kafka import KafkaProducer, KafkaConsumer
import json

# Initialize messaging configuration
messaging_config = MessagingConfig()

# Create Kafka producer
producer = KafkaProducer(
    bootstrap_servers=[messaging_config.server()],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send metadata to Kafka
metadata_message = {
    "pipeline_id": "data-processing-001",
    "status": "started",
    "timestamp": "2024-09-05T10:30:00Z",
    "records_processed": 0
}

producer.send(messaging_config.metadata_topic(), metadata_message)
producer.flush()

print(f"Sent message to Kafka server: {messaging_config.server()}")
print(f"Topic: {messaging_config.metadata_topic()}")

# Create Kafka consumer for monitoring
consumer = KafkaConsumer(
    messaging_config.metadata_topic(),
    bootstrap_servers=[messaging_config.server()],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest'
)

# Monitor metadata messages
for message in consumer:
    metadata = message.value
    print(f"Received metadata: {metadata['pipeline_id']} - {metadata['status']}")

    # Process metadata message
    if metadata['status'] == 'completed':
        print(f"Pipeline completed, processed {metadata['records_processed']} records")
        break
```


### Multi-Database Configuration Manager

```python
from aissemble_core_config import SparkRDBMSConfig, SparkElasticsearchConfig, SparkNeo4jConfig, MessagingConfig
from pyspark.sql import SparkSession

class MultiDatabaseManager:
    """Utility class for managing multiple database configurations"""

    def __init__(self):
        self.rdbms_config = SparkRDBMSConfig()
        self.es_config = SparkElasticsearchConfig()
        self.neo4j_config = SparkNeo4jConfig()
        self.messaging_config = MessagingConfig()

    def create_spark_session(self, app_name: str) -> SparkSession:
        """Create Spark session with all database connectors"""
        return SparkSession.builder \
            .appName(app_name) \
            .config("spark.jars.packages",
                    "org.postgresql:postgresql:42.5.0,"
                    "org.elasticsearch:elasticsearch-spark-30_2.12:8.8.0,"
                    "org.neo4j:neo4j-connector-apache-spark_2.12:5.0.1_for_spark_3") \
            .getOrCreate()

    def get_all_configs(self) -> dict:
        """Get unified configuration dictionary"""
        return {
            "rdbms": {
                "url": self.rdbms_config.jdbc_url(),
                "driver": self.rdbms_config.jdbc_driver(),
                "user": self.rdbms_config.user(),
                "password": self.rdbms_config.password()
            },
            "elasticsearch": self.es_config.get_es_configs(),
            "neo4j": self.neo4j_config.get_spark_options(),
            "messaging": {
                "server": self.messaging_config.server(),
                "metadata_topic": self.messaging_config.metadata_topic()
            }
        }

    def test_connections(self):
        """Print a summary of the configured endpoints (no connections are opened)"""
        configs = self.get_all_configs()

        print("Configuration Summary:")
        print(f"PostgreSQL: {configs['rdbms']['url']}")
        print(f"Elasticsearch: {self.es_config.spark_es_nodes()}:{self.es_config.spark_es_port()}")
        print(f"Neo4j: {self.neo4j_config.url()}")
        print(f"Kafka: {configs['messaging']['server']}")

# Usage example
db_manager = MultiDatabaseManager()
spark = db_manager.create_spark_session("MultiDatabaseApp")

# Get all configurations
all_configs = db_manager.get_all_configs()
print("All database configurations loaded successfully")

# Print the configuration summary
db_manager.test_connections()
```


## Best Practices


### Configuration Management

- Use property files for environment-specific settings
- Override with environment variables for containerized deployments
- Validate configurations before creating Spark sessions (see the sketch below)
- Use connection pooling for high-throughput applications

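A minimal sketch of that validation step, using only the accessors documented above; the set of required settings is illustrative:

```python
from aissemble_core_config import SparkRDBMSConfig, SparkElasticsearchConfig

def validate_required(name: str, value: str) -> None:
    """Fail fast with a clear message instead of a cryptic connector error later."""
    if not value:
        raise ValueError(f"Missing required configuration value: {name}")

rdbms = SparkRDBMSConfig()
es = SparkElasticsearchConfig()

# Illustrative required settings; adjust to what your pipeline actually uses
validate_required("jdbc_url", rdbms.jdbc_url())
validate_required("jdbc_driver", rdbms.jdbc_driver())
validate_required("spark.es.nodes", es.spark_es_nodes())

# Only build the (expensive) Spark session once configuration looks sane
# spark = SparkSession.builder.appName("ValidatedJob").getOrCreate()
```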

### Security Considerations

- Store sensitive credentials in a secure configuration or secrets management system (see the sketch below)
- Use encrypted connections for production deployments
- Implement proper authentication for all database connections
- Rotate credentials regularly and review access periodically

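One hedged approach to keeping secrets out of property files is to prefer an injected environment variable (or a secrets-manager lookup) and fall back to the configured value; the variable name below is illustrative:

```python
import os

from aissemble_core_config import SparkRDBMSConfig

rdbms = SparkRDBMSConfig()

# Prefer an injected secret (e.g. from a Kubernetes Secret or vault sidecar)
# over the value baked into spark-rdbms.properties. The variable name is illustrative.
db_password = os.environ.get("RDBMS_PASSWORD", rdbms.password())

jdbc_options = {
    "url": rdbms.jdbc_url(),
    "user": rdbms.user(),
    "password": db_password,
}
```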

### Performance Optimization

- Configure connection timeouts appropriately
- Use batch operations for bulk data processing
- Monitor connection pool utilization
- Implement retry logic for transient failures (see the sketch below)
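
A generic retry-with-backoff sketch for transient failures; the operation being retried and the exception types worth catching are application-specific:

```python
import time

def with_retries(operation, attempts: int = 3, base_delay_secs: float = 2.0):
    """Run `operation`, retrying with exponential backoff on failure."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as error:  # narrow this to the transient errors you expect
            if attempt == attempts:
                raise
            delay = base_delay_secs * (2 ** (attempt - 1))
            print(f"Attempt {attempt} failed ({error}); retrying in {delay:.0f}s")
            time.sleep(delay)

# Example: wrap a JDBC read that occasionally hits transient network errors
# df = with_retries(lambda: spark.read.format("jdbc").options(**jdbc_options)
#                   .option("dbtable", "customer_transactions").load())
```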