# Configuration Management

Comprehensive configuration management for Spark clusters, database connections (PostgreSQL, Elasticsearch, Neo4j), and messaging systems. The configuration framework provides property-based settings with environment variable overrides and standardized connection patterns for enterprise data infrastructure.

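As a rough illustration of that precedence (environment variable over property file over built-in default), consider the sketch below. The `resolve` helper, the `SPARK_RDBMS_JDBC_URL` variable name, and the lookup order are illustrative assumptions only; the framework's own property loader handles this internally.

```python
import os
from typing import Optional

# Illustrative only: the real framework resolves properties internally.
def resolve(env_var: str, property_value: Optional[str], default: str) -> str:
    """Return the first defined value: environment override, then property file, then default."""
    return os.environ.get(env_var) or property_value or default

# e.g. a value sourced from spark-rdbms.properties could be overridden at deploy time:
jdbc_url = resolve("SPARK_RDBMS_JDBC_URL", None, "jdbc:postgresql://postgres:5432/db")
```
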
## Capabilities

### Spark RDBMS Configuration

Manages configuration for PySpark connections to relational database management systems with JDBC drivers, providing standardized database connectivity for data processing workflows.

```python { .api }
class SparkRDBMSConfig:
    """
    Configurations for PySpark Relational Database Management System support.

    Constants:
    - DEFAULT_JDBC_URL = "jdbc:postgresql://postgres:5432/db"
    - DEFAULT_JDBC_DRIVER = "org.postgresql.Driver"
    - DEFAULT_USER = "postgres"
    - DEFAULT_PASSWORD = "password"
    """

    def __init__(self) -> None:
        """Initialize with spark-rdbms.properties"""
        ...

    def jdbc_url(self) -> str:
        """JDBC URL for database connection"""
        ...

    def jdbc_driver(self) -> str:
        """JDBC driver class name"""
        ...

    def user(self) -> str:
        """RDBMS user"""
        ...

    def password(self) -> str:
        """RDBMS user password"""
        ...
```

### Spark Elasticsearch Configuration

Comprehensive Elasticsearch integration for Spark with support for cluster discovery, authentication, security settings, and performance tuning options for large-scale search and analytics workloads.

```python { .api }
class SparkElasticsearchConfig:
    """
    Configurations for PySpark Elasticsearch support.

    Constants:
    - SPARK_ES_NODES = "spark.es.nodes"
    - SPARK_ES_PORT = "spark.es.port"
    - ES_NODES_PATH_PREFIX = "es.nodes.path.prefix"
    - ES_NODES_DISCOVERY = "es.nodes.discovery"
    - ES_NODES_CLIENT_ONLY = "es.nodes.client.only"
    - ES_NODES_DATA_ONLY = "es.nodes.data.only"
    - ES_NODES_INGEST_ONLY = "es.nodes.ingest.only"
    - ES_NODES_WAN_ONLY = "es.nodes.wan.only"
    - ES_HTTP_TIMEOUT = "es.http.timeout"
    - ES_HTTP_RETRIES = "es.http.retries"
    - ES_NET_HTTP_AUTH_USER = "es.net.http.auth.user"
    - ES_NET_HTTP_AUTH_PASS = "es.net.http.auth.pass"
    """

    def __init__(self) -> None:
        """Initialize with spark-elasticsearch.properties"""
        ...

    def spark_es_nodes(self) -> str:
        """List of Elasticsearch nodes (default: "localhost")"""
        ...

    def spark_es_port(self) -> str:
        """HTTP/REST port (default: "9200")"""
        ...

    def es_nodes_path_prefix(self) -> str:
        """Prefix for requests"""
        ...

    def es_nodes_discovery(self) -> str:
        """Node discovery setting"""
        ...

    def es_nodes_client_only(self) -> str:
        """Client nodes only setting"""
        ...

    def es_nodes_data_only(self) -> str:
        """Data nodes only setting"""
        ...

    def es_nodes_ingest_only(self) -> str:
        """Ingest nodes only setting"""
        ...

    def es_nodes_wan_only(self) -> str:
        """WAN only setting"""
        ...

    def es_http_timeout(self) -> str:
        """HTTP timeout setting"""
        ...

    def es_http_retries(self) -> str:
        """HTTP retries setting"""
        ...

    def es_net_http_auth_user(self) -> str:
        """Basic auth username"""
        ...

    def es_net_http_auth_pass(self) -> str:
        """Basic auth password"""
        ...

    def get_es_configs(self) -> dict:
        """Returns all Elasticsearch configurations"""
        ...

    def add_optional_config(self, configs: dict, config_key: str, config_value: str) -> None:
        """Adds optional configuration"""
        ...
```

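`add_optional_config` is useful when a setting should only be passed to the connector if it has actually been configured. A brief sketch of that pattern follows; whether `get_es_configs()` already includes the path prefix is not documented here, so treat this purely as an illustration of the call.

```python
from aissemble_core_config import SparkElasticsearchConfig

es_config = SparkElasticsearchConfig()
configs = es_config.get_es_configs()

# Only pass the path prefix to the connector when one has been configured
prefix = es_config.es_nodes_path_prefix()
if prefix:
    es_config.add_optional_config(configs, SparkElasticsearchConfig.ES_NODES_PATH_PREFIX, prefix)
```
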
### Spark Neo4j Configuration

Advanced configuration management for Neo4j graph database integration with Spark, including authentication methods, encryption settings, and connection optimization for graph analytics workflows.

```python { .api }
class SparkNeo4jConfig:
    """
    Configurations for Spark Neo4j support.

    Constants:
    - URL = "url"
    - AUTHENTICATION_TYPE = "authentication.type"
    - AUTHENTICATION_BASIC_USERNAME = "authentication.basic.username"
    - AUTHENTICATION_BASIC_PASSWORD = "authentication.basic.password"
    - AUTHENTICATION_KERBEROS_TICKET = "authentication.kerberos.ticket"
    - AUTHENTICATION_CUSTOM_PRINCIPAL = "authentication.custom.principal"
    - AUTHENTICATION_CUSTOM_CREDENTIALS = "authentication.custom.credentials"
    - AUTHENTICATION_CUSTOM_REALM = "authentication.custom.realm"
    - ENCRYPTION_ENABLED = "encryption.enabled"
    - ENCRYPTION_TRUST_STRATEGY = "encryption.trust.strategy"
    - ENCRYPTION_CA_CERTIFICATE_PATH = "encryption.ca.certificate.path"
    - CONNECTION_MAX_LIFETIME_MSECS = "connection.max.lifetime.msecs"
    - CONNECTION_LIVENESS_TIMEOUT_MSECS = "connection.liveness.timeout.msecs"
    - CONNECTION_ACQUISITION_TIMEOUT_MSECS = "connection.acquisition.timeout.msecs"
    - CONNECTION_TIMEOUT_MSECS = "connection.timeout.msecs"
    - NEO4J_FORMAT = "org.neo4j.spark.DataSource"
    - LABELS_OPTION = "labels"
    """

    def __init__(self) -> None:
        """Initialize with spark-neo4j.properties"""
        ...

    def url(self) -> str:
        """Neo4j instance URL (default: "bolt://neo4j:7687")"""
        ...

    def authentication_type(self) -> str:
        """Authentication method (default: "basic")"""
        ...

    def authentication_basic_username(self) -> str:
        """Basic auth username (default: "neo4j")"""
        ...

    def authentication_basic_password(self) -> str:
        """Basic auth password (default: "p455w0rd")"""
        ...

    def authentication_kerberos_ticket(self) -> str:
        """Kerberos ticket"""
        ...

    def authentication_custom_principal(self) -> str:
        """Custom principal"""
        ...

    def authentication_custom_credentials(self) -> str:
        """Custom credentials"""
        ...

    def authentication_custom_realm(self) -> str:
        """Custom realm"""
        ...

    def encryption_enabled(self) -> str:
        """Encryption enabled setting"""
        ...

    def encryption_trust_strategy(self) -> str:
        """Trust strategy setting"""
        ...

    def encryption_ca_certificate_path(self) -> str:
        """Certificate path"""
        ...

    def connection_max_lifetime_msecs(self) -> str:
        """Connection lifetime"""
        ...

    def connection_liveness_timeout_msecs(self) -> str:
        """Liveness timeout"""
        ...

    def connection_acquisition_timeout_msecs(self) -> str:
        """Acquisition timeout"""
        ...

    def connection_timeout_msecs(self) -> str:
        """Connection timeout"""
        ...

    def get_spark_options(self) -> Dict[str, str]:
        """Returns spark options for Neo4j"""
        ...
```

### Messaging Configuration

Configuration management for Kafka messaging systems providing standardized connection settings for distributed messaging and event streaming in data processing pipelines.

```python { .api }
class MessagingConfig:
    """
    Configurations for messaging connections.
    """

    def __init__(self) -> None:
        """Initialize with messaging.properties"""
        ...

    def server(self) -> str:
        """Returns server address (default: "kafka-cluster:9093")"""
        ...

    def metadata_topic(self) -> str:
        """Returns topic for metadata (default: "metadata-ingest")"""
        ...
```

## Usage Examples

### Basic Database Configuration

```python
from aissemble_core_config import SparkRDBMSConfig
from pyspark.sql import SparkSession

# Initialize database configuration
db_config = SparkRDBMSConfig()

# Create Spark session with database connectivity
spark = SparkSession.builder \
    .appName("DataProcessingJob") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.0") \
    .getOrCreate()

# Read data from PostgreSQL
df = spark.read \
    .format("jdbc") \
    .option("url", db_config.jdbc_url()) \
    .option("driver", db_config.jdbc_driver()) \
    .option("dbtable", "customer_transactions") \
    .option("user", db_config.user()) \
    .option("password", db_config.password()) \
    .load()

print(f"Connected to database: {db_config.jdbc_url()}")
print(f"Loaded {df.count()} records")

# Apply transformations as needed; the identity transform keeps the example runnable
processed_df = df

# Write processed data back
processed_df.write \
    .format("jdbc") \
    .option("url", db_config.jdbc_url()) \
    .option("driver", db_config.jdbc_driver()) \
    .option("dbtable", "processed_transactions") \
    .option("user", db_config.user()) \
    .option("password", db_config.password()) \
    .mode("append") \
    .save()
```

### Elasticsearch Integration

```python
from aissemble_core_config import SparkElasticsearchConfig
from pyspark.sql import SparkSession

# Initialize Elasticsearch configuration
es_config = SparkElasticsearchConfig()

# Create Spark session with Elasticsearch support
spark = SparkSession.builder \
    .appName("ElasticsearchAnalytics") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.8.0") \
    .getOrCreate()

# Get all Elasticsearch configurations
es_configs = es_config.get_es_configs()

# Read data from Elasticsearch
df = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .options(**es_configs) \
    .option("es.resource", "logs-2024/doc") \
    .option("es.query", '{"query":{"range":{"timestamp":{"gte":"2024-01-01"}}}}') \
    .load()

print(f"Connected to Elasticsearch: {es_config.spark_es_nodes()}:{es_config.spark_es_port()}")
print(f"Loaded {df.count()} log records")

# Aggregate the logs (adapt to your schema); the grouping key doubles as the
# document id referenced by es.mapping.id below
aggregated_df = df.groupBy("timestamp") \
    .count() \
    .withColumnRenamed("timestamp", "result_id")

# Write aggregated results back to Elasticsearch
aggregated_df.write \
    .format("org.elasticsearch.spark.sql") \
    .options(**es_configs) \
    .option("es.resource", "analytics-results/doc") \
    .option("es.mapping.id", "result_id") \
    .mode("append") \
    .save()
```

### Neo4j Graph Analytics

```python
from aissemble_core_config import SparkNeo4jConfig
from pyspark.sql import SparkSession

# Initialize Neo4j configuration
neo4j_config = SparkNeo4jConfig()

# Create Spark session with Neo4j connector
spark = SparkSession.builder \
    .appName("GraphAnalytics") \
    .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:5.0.1_for_spark_3") \
    .getOrCreate()

# Get Neo4j connection options
neo4j_options = neo4j_config.get_spark_options()

# Read nodes from Neo4j
users_df = spark.read \
    .format(SparkNeo4jConfig.NEO4J_FORMAT) \
    .options(**neo4j_options) \
    .option(SparkNeo4jConfig.LABELS_OPTION, "User") \
    .load()

# Read relationships
relationships_df = spark.read \
    .format(SparkNeo4jConfig.NEO4J_FORMAT) \
    .options(**neo4j_options) \
    .option("relationship", "FRIENDS_WITH") \
    .option("relationship.source.labels", "User") \
    .option("relationship.target.labels", "User") \
    .load()

print(f"Connected to Neo4j: {neo4j_config.url()}")
print(f"Loaded {users_df.count()} users and {relationships_df.count()} relationships")

# Analyze graph structure: degree per source node. The connector exposes the
# source node's internal id as the "<source.id>" column; rename it first so the
# grouping key matches the node key used in the write below.
degree_centrality = relationships_df \
    .withColumnRenamed("<source.id>", "user_id") \
    .groupBy("user_id") \
    .count() \
    .withColumnRenamed("count", "degree") \
    .orderBy("degree", ascending=False)

# Write analysis results back to Neo4j
degree_centrality.write \
    .format(SparkNeo4jConfig.NEO4J_FORMAT) \
    .options(**neo4j_options) \
    .option(SparkNeo4jConfig.LABELS_OPTION, "UserAnalytics") \
    .option("node.keys", "user_id") \
    .mode("overwrite") \
    .save()
```

### Kafka Messaging Integration

```python
from aissemble_core_config import MessagingConfig
from kafka import KafkaProducer, KafkaConsumer
import json

# Initialize messaging configuration
messaging_config = MessagingConfig()

# Create Kafka producer
producer = KafkaProducer(
    bootstrap_servers=[messaging_config.server()],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send metadata to Kafka
metadata_message = {
    "pipeline_id": "data-processing-001",
    "status": "started",
    "timestamp": "2024-09-05T10:30:00Z",
    "records_processed": 0
}

producer.send(messaging_config.metadata_topic(), metadata_message)
producer.flush()

print(f"Sent message to Kafka server: {messaging_config.server()}")
print(f"Topic: {messaging_config.metadata_topic()}")

# Create Kafka consumer for monitoring
consumer = KafkaConsumer(
    messaging_config.metadata_topic(),
    bootstrap_servers=[messaging_config.server()],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest'
)

# Monitor metadata messages
for message in consumer:
    metadata = message.value
    print(f"Received metadata: {metadata['pipeline_id']} - {metadata['status']}")

    # Process metadata message
    if metadata['status'] == 'completed':
        print(f"Pipeline completed, processed {metadata['records_processed']} records")
        break
```

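The same `MessagingConfig` values can also drive a Spark Structured Streaming source instead of the standalone kafka-python client shown above. A brief sketch follows; the `spark-sql-kafka-0-10` coordinates are an example and should match your Spark version.

```python
from aissemble_core_config import MessagingConfig
from pyspark.sql import SparkSession

messaging_config = MessagingConfig()

# Example connector coordinates; align the version with your Spark deployment
spark = SparkSession.builder \
    .appName("MetadataStream") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .getOrCreate()

# Subscribe to the metadata topic using the configured broker address
metadata_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", messaging_config.server()) \
    .option("subscribe", messaging_config.metadata_topic()) \
    .option("startingOffsets", "latest") \
    .load()

# Kafka values arrive as bytes; cast to string before downstream parsing
query = metadata_stream.selectExpr("CAST(value AS STRING) AS value") \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()
```
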
### Multi-Database Configuration Manager

```python
from aissemble_core_config import SparkRDBMSConfig, SparkElasticsearchConfig, SparkNeo4jConfig, MessagingConfig
from pyspark.sql import SparkSession

class MultiDatabaseManager:
    """Utility class for managing multiple database configurations"""

    def __init__(self):
        self.rdbms_config = SparkRDBMSConfig()
        self.es_config = SparkElasticsearchConfig()
        self.neo4j_config = SparkNeo4jConfig()
        self.messaging_config = MessagingConfig()

    def create_spark_session(self, app_name: str) -> SparkSession:
        """Create Spark session with all database connectors"""
        return SparkSession.builder \
            .appName(app_name) \
            .config("spark.jars.packages",
                    "org.postgresql:postgresql:42.5.0,"
                    "org.elasticsearch:elasticsearch-spark-30_2.12:8.8.0,"
                    "org.neo4j:neo4j-connector-apache-spark_2.12:5.0.1_for_spark_3") \
            .getOrCreate()

    def get_all_configs(self) -> dict:
        """Get unified configuration dictionary"""
        return {
            "rdbms": {
                "url": self.rdbms_config.jdbc_url(),
                "driver": self.rdbms_config.jdbc_driver(),
                "user": self.rdbms_config.user(),
                "password": self.rdbms_config.password()
            },
            "elasticsearch": self.es_config.get_es_configs(),
            "neo4j": self.neo4j_config.get_spark_options(),
            "messaging": {
                "server": self.messaging_config.server(),
                "metadata_topic": self.messaging_config.metadata_topic()
            }
        }

    def print_connection_summary(self):
        """Print a summary of all configured connection targets"""
        configs = self.get_all_configs()

        print("Configuration Summary:")
        print(f"PostgreSQL: {configs['rdbms']['url']}")
        print(f"Elasticsearch: {self.es_config.spark_es_nodes()}:{self.es_config.spark_es_port()}")
        print(f"Neo4j: {self.neo4j_config.url()}")
        print(f"Kafka: {configs['messaging']['server']}")

# Usage example
db_manager = MultiDatabaseManager()
spark = db_manager.create_spark_session("MultiDatabaseApp")

# Get all configurations
all_configs = db_manager.get_all_configs()
print("All database configurations loaded successfully")

# Print the connection summary
db_manager.print_connection_summary()
```

## Best Practices

### Configuration Management
- Use property files for environment-specific settings
- Override with environment variables for containerized deployments
- Validate configurations before creating Spark sessions (a sketch follows this list)
- Use connection pooling for high-throughput applications

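A minimal validation sketch using only the getters documented above; the specific checks (non-empty URL, non-default password, expected URL scheme) are illustrative assumptions about what "valid" means for a given deployment.

```python
from aissemble_core_config import SparkRDBMSConfig, SparkNeo4jConfig

def validate_configs() -> list:
    """Collect obvious configuration problems before a Spark session is created."""
    problems = []

    rdbms = SparkRDBMSConfig()
    if not rdbms.jdbc_url():
        problems.append("RDBMS JDBC URL is not set")
    if rdbms.password() == "password":
        problems.append("RDBMS password is still the default value")

    neo4j = SparkNeo4jConfig()
    if not neo4j.url().startswith(("bolt://", "neo4j://")):
        problems.append(f"Unexpected Neo4j URL scheme: {neo4j.url()}")

    return problems

issues = validate_configs()
if issues:
    raise ValueError("Configuration validation failed: " + "; ".join(issues))
```
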
### Security Considerations
- Store sensitive credentials in secure configuration systems
- Use encrypted connections for production deployments
- Implement proper authentication for all database connections
- Rotate credentials regularly and review access

### Performance Optimization
- Configure connection timeouts appropriately
- Use batch operations for bulk data processing
- Monitor connection pool utilization
- Implement retry logic for transient failures (see the sketch below)

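As a closing illustration of the retry guidance, a small generic helper; the backoff values and the decision to retry on any exception are assumptions to adapt to the failure modes of your connectors.

```python
import time

def with_retries(action, attempts: int = 3, backoff_seconds: float = 2.0):
    """Run a callable, retrying with linear backoff on transient failures."""
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as error:  # narrow this to connector-specific exceptions in practice
            if attempt == attempts:
                raise
            print(f"Attempt {attempt} failed ({error}); retrying in {backoff_seconds * attempt:.0f}s")
            time.sleep(backoff_seconds * attempt)

# Example, reusing the objects from the usage sections above:
# df = with_retries(lambda: spark.read.format("jdbc")
#                   .option("url", db_config.jdbc_url())
#                   .option("dbtable", "customer_transactions")
#                   .option("user", db_config.user())
#                   .option("password", db_config.password())
#                   .load())
```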