tessl/pypi-aissemble-foundation-core-python

Core foundational classes and utilities for the aiSSEMBLE platform, providing authentication, metadata management, configuration, file storage, and policy management capabilities.

Configuration Management

Configuration management for Spark clusters and database connections, supporting PostgreSQL, Elasticsearch, Neo4j, and Kafka messaging. The framework provides property-based settings with environment variable overrides and standardized connection patterns for enterprise data infrastructure.
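The property-plus-override pattern described above can be sketched as follows. The resolver function and the key name are illustrative assumptions for this sketch, not the library's actual API:

```python
import os

# Hypothetical illustration of the override pattern: a value read from a
# properties file can be superseded by an environment variable of the same
# name, which is the usual approach for containerized deployments.
def resolve_property(name: str, file_value: str) -> str:
    # Environment variables take precedence over the properties file.
    return os.environ.get(name, file_value)

# With no override set, the file value is used:
jdbc_url = resolve_property("JDBC_URL", "jdbc:postgresql://postgres:5432/db")
```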

Capabilities

Spark RDBMS Configuration

Manages configuration for PySpark connections to relational database management systems with JDBC drivers, providing standardized database connectivity for data processing workflows.

class SparkRDBMSConfig:
    """
    Configurations for PySpark Relational Database Management System support.
    
    Constants:
    - DEFAULT_JDBC_URL = "jdbc:postgresql://postgres:5432/db"
    - DEFAULT_JDBC_DRIVER = "org.postgresql.Driver"
    - DEFAULT_USER = "postgres"
    - DEFAULT_PASSWORD = "password"
    """
    
    def __init__(self) -> None:
        """Initialize with spark-rdbms.properties"""
        ...
    
    def jdbc_url(self) -> str:
        """JDBC URL for database connection"""
        ...
    
    def jdbc_driver(self) -> str:
        """JDBC driver class name"""
        ...
    
    def user(self) -> str:
        """RDBMS user"""
        ...
    
    def password(self) -> str:
        """RDBMS user password"""
        ...

Spark Elasticsearch Configuration

Comprehensive Elasticsearch integration for Spark with support for cluster discovery, authentication, security settings, and performance tuning options for large-scale search and analytics workloads.

class SparkElasticsearchConfig:
    """
    Configurations for PySpark Elasticsearch support.
    
    Constants:
    - SPARK_ES_NODES = "spark.es.nodes"
    - SPARK_ES_PORT = "spark.es.port"
    - ES_NODES_PATH_PREFIX = "es.nodes.path.prefix"
    - ES_NODES_DISCOVERY = "es.nodes.discovery"
    - ES_NODES_CLIENT_ONLY = "es.nodes.client.only"
    - ES_NODES_DATA_ONLY = "es.nodes.data.only"
    - ES_NODES_INGEST_ONLY = "es.nodes.ingest.only"
    - ES_NODES_WAN_ONLY = "es.nodes.wan.only"
    - ES_HTTP_TIMEOUT = "es.http.timeout"
    - ES_HTTP_RETRIES = "es.http.retries"
    - ES_NET_HTTP_AUTH_USER = "es.net.http.auth.user"
    - ES_NET_HTTP_AUTH_PASS = "es.net.http.auth.pass"
    """
    
    def __init__(self) -> None:
        """Initialize with spark-elasticsearch.properties"""
        ...
    
    def spark_es_nodes(self) -> str:
        """List of Elasticsearch nodes (default: "localhost")"""
        ...
    
    def spark_es_port(self) -> str:
        """HTTP/REST port (default: "9200")"""
        ...
    
    def es_nodes_path_prefix(self) -> str:
        """Prefix for requests"""
        ...
    
    def es_nodes_discovery(self) -> str:
        """Node discovery setting"""
        ...
    
    def es_nodes_client_only(self) -> str:
        """Client nodes only setting"""
        ...
    
    def es_nodes_data_only(self) -> str:
        """Data nodes only setting"""
        ...
    
    def es_nodes_ingest_only(self) -> str:
        """Ingest nodes only setting"""
        ...
    
    def es_nodes_wan_only(self) -> str:
        """WAN only setting"""
        ...
    
    def es_http_timeout(self) -> str:
        """HTTP timeout setting"""
        ...
    
    def es_http_retries(self) -> str:
        """HTTP retries setting"""
        ...
    
    def es_net_http_auth_user(self) -> str:
        """Basic auth username"""
        ...
    
    def es_net_http_auth_pass(self) -> str:
        """Basic auth password"""
        ...
    
    def get_es_configs(self) -> dict:
        """Returns all Elasticsearch configurations"""
        ...
    
    def add_optional_config(self, configs: dict, config_key: str, config_value: str) -> None:
        """Adds optional configuration"""
        ...
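A minimal sketch of the likely intent behind add_optional_config (an illustrative stand-in, not the library implementation): an entry is only added when a value is actually set, so unset optional properties do not leak into the final Spark options.

```python
# Illustrative stand-in for add_optional_config: skip unset values so the
# resulting options dict contains only properties that were configured.
def add_optional_config(configs: dict, config_key: str, config_value: str) -> None:
    if config_value:  # None or empty string means "not configured"
        configs[config_key] = config_value

configs = {"spark.es.nodes": "localhost", "spark.es.port": "9200"}
add_optional_config(configs, "es.net.http.auth.user", "analyst")
add_optional_config(configs, "es.http.timeout", None)  # not added
```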

Spark Neo4j Configuration

Advanced configuration management for Neo4j graph database integration with Spark including authentication methods, encryption settings, and connection optimization for graph analytics workflows.

class SparkNeo4jConfig:
    """
    Configurations for Spark Neo4j support.
    
    Constants:
    - URL = "url"
    - AUTHENTICATION_TYPE = "authentication.type"
    - AUTHENTICATION_BASIC_USERNAME = "authentication.basic.username"
    - AUTHENTICATION_BASIC_PASSWORD = "authentication.basic.password"
    - AUTHENTICATION_KERBEROS_TICKET = "authentication.kerberos.ticket"
    - AUTHENTICATION_CUSTOM_PRINCIPAL = "authentication.custom.principal"
    - AUTHENTICATION_CUSTOM_CREDENTIALS = "authentication.custom.credentials"
    - AUTHENTICATION_CUSTOM_REALM = "authentication.custom.realm"
    - ENCRYPTION_ENABLED = "encryption.enabled"
    - ENCRYPTION_TRUST_STRATEGY = "encryption.trust.strategy"
    - ENCRYPTION_CA_CERTIFICATE_PATH = "encryption.ca.certificate.path"
    - CONNECTION_MAX_LIFETIME_MSECS = "connection.max.lifetime.msecs"
    - CONNECTION_LIVENESS_TIMEOUT_MSECS = "connection.liveness.timeout.msecs"
    - CONNECTION_ACQUISITION_TIMEOUT_MSECS = "connection.acquisition.timeout.msecs"
    - CONNECTION_TIMEOUT_MSECS = "connection.timeout.msecs"
    - NEO4J_FORMAT = "org.neo4j.spark.DataSource"
    - LABELS_OPTION = "labels"
    """
    
    def __init__(self) -> None:
        """Initialize with spark-neo4j.properties"""
        ...
    
    def url(self) -> str:
        """Neo4j instance URL (default: "bolt://neo4j:7687")"""
        ...
    
    def authentication_type(self) -> str:
        """Authentication method (default: "basic")"""
        ...
    
    def authentication_basic_username(self) -> str:
        """Basic auth username (default: "neo4j")"""
        ...
    
    def authentication_basic_password(self) -> str:
        """Basic auth password (default: "p455w0rd")"""
        ...
    
    def authentication_kerberos_ticket(self) -> str:
        """Kerberos ticket"""
        ...
    
    def authentication_custom_principal(self) -> str:
        """Custom principal"""
        ...
    
    def authentication_custom_credentials(self) -> str:
        """Custom credentials"""
        ...
    
    def authentication_custom_realm(self) -> str:
        """Custom realm"""
        ...
    
    def encryption_enabled(self) -> str:
        """Encryption enabled setting"""
        ...
    
    def encryption_trust_strategy(self) -> str:
        """Trust strategy setting"""
        ...
    
    def encryption_ca_certificate_path(self) -> str:
        """Certificate path"""
        ...
    
    def connection_max_lifetime_msecs(self) -> str:
        """Connection lifetime"""
        ...
    
    def connection_liveness_timeout_msecs(self) -> str:
        """Liveness timeout"""
        ...
    
    def connection_acquisition_timeout_msecs(self) -> str:
        """Acquisition timeout"""
        ...
    
    def connection_timeout_msecs(self) -> str:
        """Connection timeout"""
        ...
    
    def get_spark_options(self) -> dict:
        """Returns Spark options for Neo4j"""
        ...

Messaging Configuration

Configuration management for Kafka messaging systems providing standardized connection settings for distributed messaging and event streaming in data processing pipelines.

class MessagingConfig:
    """
    Configurations for messaging connections.
    """
    
    def __init__(self) -> None:
        """Initialize with messaging.properties"""
        ...
    
    def server(self) -> str:
        """Returns server address (default: "kafka-cluster:9093")"""
        ...
    
    def metadata_topic(self) -> str:
        """Returns topic for metadata (default: "metadata-ingest")"""
        ...

Usage Examples

Basic Database Configuration

from aissemble_core_config import SparkRDBMSConfig
from pyspark.sql import SparkSession

# Initialize database configuration
db_config = SparkRDBMSConfig()

# Create Spark session with database connectivity
spark = SparkSession.builder \
    .appName("DataProcessingJob") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.0") \
    .getOrCreate()

# Read data from PostgreSQL
df = spark.read \
    .format("jdbc") \
    .option("url", db_config.jdbc_url()) \
    .option("driver", db_config.jdbc_driver()) \
    .option("dbtable", "customer_transactions") \
    .option("user", db_config.user()) \
    .option("password", db_config.password()) \
    .load()

print(f"Connected to database: {db_config.jdbc_url()}")
print(f"Loaded {df.count()} records")

# Apply transformations (placeholder; substitute real logic), then write back
processed_df = df  # stand-in for the actual processed DataFrame
processed_df.write \
    .format("jdbc") \
    .option("url", db_config.jdbc_url()) \
    .option("driver", db_config.jdbc_driver()) \
    .option("dbtable", "processed_transactions") \
    .option("user", db_config.user()) \
    .option("password", db_config.password()) \
    .mode("append") \
    .save()

Elasticsearch Integration

from aissemble_core_config import SparkElasticsearchConfig
from pyspark.sql import SparkSession

# Initialize Elasticsearch configuration
es_config = SparkElasticsearchConfig()

# Create Spark session with Elasticsearch support
spark = SparkSession.builder \
    .appName("ElasticsearchAnalytics") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.8.0") \
    .getOrCreate()

# Get all Elasticsearch configurations
es_configs = es_config.get_es_configs()

# Read data from Elasticsearch
df = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .options(**es_configs) \
    .option("es.resource", "logs-2024/doc") \
    .option("es.query", '{"query":{"range":{"timestamp":{"gte":"2024-01-01"}}}}') \
    .load()

print(f"Connected to Elasticsearch: {es_config.spark_es_nodes()}:{es_config.spark_es_port()}")
print(f"Loaded {df.count()} log records")

# Build the aggregated results (placeholder; the frame must include the
# "result_id" column used as the document id below)
aggregated_df = df  # stand-in for the actual aggregation logic
aggregated_df.write \
    .format("org.elasticsearch.spark.sql") \
    .options(**es_configs) \
    .option("es.resource", "analytics-results/doc") \
    .option("es.mapping.id", "result_id") \
    .mode("append") \
    .save()

Neo4j Graph Analytics

from aissemble_core_config import SparkNeo4jConfig
from pyspark.sql import SparkSession

# Initialize Neo4j configuration
neo4j_config = SparkNeo4jConfig()

# Create Spark session with Neo4j connector
spark = SparkSession.builder \
    .appName("GraphAnalytics") \
    .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:5.0.1_for_spark_3") \
    .getOrCreate()

# Get Neo4j connection options
neo4j_options = neo4j_config.get_spark_options()

# Read nodes from Neo4j
users_df = spark.read \
    .format(SparkNeo4jConfig.NEO4J_FORMAT) \
    .options(**neo4j_options) \
    .option(SparkNeo4jConfig.LABELS_OPTION, "User") \
    .load()

# Read relationships
relationships_df = spark.read \
    .format(SparkNeo4jConfig.NEO4J_FORMAT) \
    .options(**neo4j_options) \
    .option("relationship", "FRIENDS_WITH") \
    .option("relationship.source.labels", "User") \
    .option("relationship.target.labels", "User") \
    .load()

print(f"Connected to Neo4j: {neo4j_config.url()}")
print(f"Loaded {users_df.count()} users and {relationships_df.count()} relationships")

# Analyze graph structure; the grouping column follows the example schema
# above and is renamed to match the node key used on write
degree_centrality = relationships_df \
    .groupBy("source.id") \
    .count() \
    .withColumnRenamed("source.id", "user_id") \
    .withColumnRenamed("count", "degree") \
    .orderBy("degree", ascending=False)

# Write analysis results back to Neo4j
degree_centrality.write \
    .format(SparkNeo4jConfig.NEO4J_FORMAT) \
    .options(**neo4j_options) \
    .option(SparkNeo4jConfig.LABELS_OPTION, "UserAnalytics") \
    .option("node.keys", "user_id") \
    .mode("overwrite") \
    .save()

Kafka Messaging Integration

from aissemble_core_config import MessagingConfig
from kafka import KafkaProducer, KafkaConsumer
import json

# Initialize messaging configuration
messaging_config = MessagingConfig()

# Create Kafka producer
producer = KafkaProducer(
    bootstrap_servers=[messaging_config.server()],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send metadata to Kafka
metadata_message = {
    "pipeline_id": "data-processing-001",
    "status": "started",
    "timestamp": "2024-09-05T10:30:00Z",
    "records_processed": 0
}

producer.send(messaging_config.metadata_topic(), metadata_message)
producer.flush()

print(f"Sent message to Kafka server: {messaging_config.server()}")
print(f"Topic: {messaging_config.metadata_topic()}")

# Create Kafka consumer for monitoring
consumer = KafkaConsumer(
    messaging_config.metadata_topic(),
    bootstrap_servers=[messaging_config.server()],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest'
)

# Monitor metadata messages
for message in consumer:
    metadata = message.value
    print(f"Received metadata: {metadata['pipeline_id']} - {metadata['status']}")
    
    # Process metadata message
    if metadata['status'] == 'completed':
        print(f"Pipeline completed, processed {metadata['records_processed']} records")
        break

Multi-Database Configuration Manager

from aissemble_core_config import SparkRDBMSConfig, SparkElasticsearchConfig, SparkNeo4jConfig, MessagingConfig
from pyspark.sql import SparkSession

class MultiDatabaseManager:
    """Utility class for managing multiple database configurations"""
    
    def __init__(self):
        self.rdbms_config = SparkRDBMSConfig()
        self.es_config = SparkElasticsearchConfig()
        self.neo4j_config = SparkNeo4jConfig()
        self.messaging_config = MessagingConfig()
        
    def create_spark_session(self, app_name: str) -> SparkSession:
        """Create Spark session with all database connectors"""
        return SparkSession.builder \
            .appName(app_name) \
            .config("spark.jars.packages", 
                   "org.postgresql:postgresql:42.5.0,"
                   "org.elasticsearch:elasticsearch-spark-30_2.12:8.8.0,"
                   "org.neo4j:neo4j-connector-apache-spark_2.12:5.0.1_for_spark_3") \
            .getOrCreate()
    
    def get_all_configs(self) -> dict:
        """Get unified configuration dictionary"""
        return {
            "rdbms": {
                "url": self.rdbms_config.jdbc_url(),
                "driver": self.rdbms_config.jdbc_driver(),
                "user": self.rdbms_config.user(),
                "password": self.rdbms_config.password()
            },
            "elasticsearch": self.es_config.get_es_configs(),
            "neo4j": self.neo4j_config.get_spark_options(),
            "messaging": {
                "server": self.messaging_config.server(),
                "metadata_topic": self.messaging_config.metadata_topic()
            }
        }
    
    def test_connections(self):
        """Print a summary of configured endpoints (no live connectivity check is performed)"""
        configs = self.get_all_configs()
        
        print("Configuration Summary:")
        print(f"PostgreSQL: {configs['rdbms']['url']}")
        print(f"Elasticsearch: {self.es_config.spark_es_nodes()}:{self.es_config.spark_es_port()}")
        print(f"Neo4j: {self.neo4j_config.url()}")
        print(f"Kafka: {configs['messaging']['server']}")

# Usage example
db_manager = MultiDatabaseManager()
spark = db_manager.create_spark_session("MultiDatabaseApp")

# Get all configurations
all_configs = db_manager.get_all_configs()
print("All database configurations loaded successfully")

# Test connections
db_manager.test_connections()

Best Practices

Configuration Management

  • Use property files for environment-specific settings
  • Override with environment variables for containerized deployments
  • Validate configurations before creating Spark sessions
  • Use connection pooling for high-throughput applications
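The "validate configurations before creating Spark sessions" advice can be sketched as a small pre-flight check. The helper below is hypothetical, not part of aissemble_core_config; the point is to fail fast before paying the cost of starting a session:

```python
# Hypothetical pre-flight validation: collect all problems at once so a
# misconfigured deployment fails with a complete, actionable message.
def validate_jdbc_settings(url: str, driver: str, user: str) -> list:
    problems = []
    if not url.startswith("jdbc:"):
        problems.append(f"JDBC URL must start with 'jdbc:': {url!r}")
    if not driver:
        problems.append("JDBC driver class is required")
    if not user:
        problems.append("database user is required")
    return problems

issues = validate_jdbc_settings(
    "jdbc:postgresql://postgres:5432/db", "org.postgresql.Driver", "postgres"
)
if issues:
    raise ValueError("; ".join(issues))
```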

Security Considerations

  • Store sensitive credentials in secure configuration systems
  • Use encrypted connections for production deployments
  • Implement proper authentication for all database connections
  • Rotate credentials regularly and review access

Performance Optimization

  • Configure connection timeouts appropriately
  • Use batch operations for bulk data processing
  • Monitor connection pool utilization
  • Implement retry logic for transient failures
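The retry-logic advice can be sketched as a simple wrapper with exponential backoff. This is illustrative only; production code would narrow the exception types to the client library's transient errors and add logging:

```python
import time

# Retry a callable on transient connection failures, doubling the delay
# between attempts; the final failure is re-raised to the caller.
def with_retries(operation, attempts: int = 3, base_delay: float = 0.1):
    for attempt in range(attempts):
        try:
            return operation()
        except ConnectionError:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))  # exponential backoff
```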

Install with Tessl CLI

npx tessl i tessl/pypi-aissemble-foundation-core-python
