CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-dmetasoul--lakesoul-common

Core utilities and metadata management library for the LakeSoul lakehouse framework providing database connection management, RBAC authorization, protobuf serialization, and native library integration.

Pending
Overview
Eval results
Files

configuration-management.mddocs/

Configuration Management

The configuration management system provides centralized configuration for database connections, authorization settings, operational parameters, and LakeSoul-specific options. The system supports both global and local configuration with environment variable and system property support.

Capabilities

GlobalConfig Class

Singleton configuration manager for system-wide settings, particularly authorization configuration.

/**
 * Global configuration management for LakeSoul system
 * Singleton pattern with database-backed configuration storage
 */
public class GlobalConfig {
    // Configuration keys
    public static final String authZEnabledKey = "lakesoul.authz.enabled";
    public static final boolean authZEnabledDefault = false;
    public static final String authZCasbinModelKey = "lakesoul.authz.casbin.model";
    
    /**
     * Get singleton GlobalConfig instance
     * Loads configuration from database on first access
     * @return GlobalConfig singleton instance
     */
    public static synchronized GlobalConfig get();
    
    /**
     * Check if authorization is enabled
     * @return boolean true if authorization is enabled, false otherwise
     */
    public boolean isAuthZEnabled();
    
    /**
     * Set authorization enabled status
     * @param enabled true to enable authorization, false to disable
     */
    public void setAuthZEnabled(boolean enabled);
    
    /**
     * Get Casbin model configuration
     * @return String path to Casbin model file or model content
     */
    public String getAuthZCasbinModel();
}

DBConfig Class

Abstract class containing database and LakeSoul operational constants and configuration keys.

/**
 * Configuration constants for LakeSoul database operations
 * Contains operational parameters and formatting constants
 */
public abstract class DBConfig {
    // Namespace and partitioning constants
    public static final String LAKESOUL_DEFAULT_NAMESPACE = "default";
    public static final String LAKESOUL_NAMESPACE_LEVEL_SPLITTER = ".";
    public static final String LAKESOUL_NULL_STRING = "__L@KE$OUL_NULL__";
    public static final String LAKESOUL_PARTITION_SPLITTER_OF_RANGE_AND_HASH = ";";
    public static final String LAKESOUL_RANGE_PARTITION_SPLITTER = ",";
    public static final String LAKESOUL_NON_PARTITION_TABLE_PART_DESC = "-5";
    
    // Operational constants
    public static final int MAX_COMMIT_ATTEMPTS = 3;
    public static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
    public static final int DEFAULT_QUERY_TIMEOUT = 60000;
    
    // Table metadata property keys
    public static final String LAKESOUL_TABLE_PROPERTY_PREFIX = "lakesoul.";
    public static final String LAKESOUL_HASH_PARTITION_NUM = "lakesoul.hash.partition.num";
    public static final String LAKESOUL_RANGE_PARTITION_COLUMNS = "lakesoul.range.partition.columns";
}

TableInfoProperty Constants

Nested class in DBConfig containing table-specific property constants.

/**
 * Table information property constants
 * Keys for table metadata properties stored as JSON
 */
public static class TableInfoProperty {
    /** Hash bucket number for hash partitioning */
    public static final String HASH_BUCKET_NUM = "hashBucketNum";
    
    /** Dropped column names (comma-separated) */
    public static final String DROPPED_COLUMN = "droppedColumn";
    
    /** Delimiter for dropped column names */
    public static final String DROPPED_COLUMN_SPLITTER = ",";
    
    /** Timestamp of last schema change */
    public static final String LAST_TABLE_SCHEMA_CHANGE_TIME = "last_schema_change_time";
    
    /** Table format specification (parquet, delta, etc.) */
    public static final String TABLE_FORMAT = "table.format";
    
    /** Compression codec for data files */
    public static final String COMPRESSION_CODEC = "compression.codec";
    
    /** Data retention period in days */
    public static final String RETENTION_DAYS = "retention.days";
    
    /** Table owner information */
    public static final String TABLE_OWNER = "table.owner";
    
    /** Table creation timestamp */
    public static final String CREATION_TIME = "creation.time";
    
    /** Last access timestamp */
    public static final String LAST_ACCESS_TIME = "last.access.time";
}

LakeSoulOptions (Scala Object)

Scala object containing configuration constants and options for LakeSoul operations.

/**
 * Configuration constants and options for LakeSoul operations
 * Centralized location for all LakeSoul-specific configuration keys
 */
object LakeSoulOptions {
    // Write operation options
    val REPLACE_WHERE_OPTION = "replaceWhere"
    val MERGE_SCHEMA_OPTION = "mergeSchema"
    val OVERWRITE_SCHEMA_OPTION = "overwriteSchema"
    val UPSERT_OPTION = "upsert"
    val DELETE_WHERE_OPTION = "deleteWhere"
    
    // Partitioning options
    val PARTITION_BY = "__partition_columns"
    val RANGE_PARTITIONS = "rangePartitions"
    val HASH_PARTITIONS = "hashPartitions"
    val HASH_BUCKET_NUM = "hashBucketNum"
    val DYNAMIC_PARTITION = "dynamicPartition"
    
    // Table naming options
    val SHORT_TABLE_NAME = "shortTableName" 
    val TABLE_PATH = "tablePath"
    val TABLE_NAMESPACE = "tableNamespace"
    
    // Read operation options
    val READ_TYPE = "readtype"
    val START_VERSION = "startVersion"
    val END_VERSION = "endVersion"
    val START_TIMESTAMP = "startTimestamp"
    val END_TIMESTAMP = "endTimestamp"
    val INCREMENTAL_PARTITION_DESC = "incrementalPartitionDesc"
    
    // Data format options
    val DATA_FORMAT = "dataFormat"
    val COMPRESSION = "compression"
    val FILE_SIZE_THRESHOLD = "fileSizeThreshold"
    val ROW_GROUP_SIZE = "rowGroupSize"
    val DICTIONARY_ENCODING = "dictionaryEncoding"
    
    // Performance tuning options
    val BATCH_SIZE = "batchSize"
    val PARALLELISM = "parallelism"
    val MEMORY_FRACTION = "memoryFraction"
    val SPILL_THRESHOLD = "spillThreshold"
    
    // CDC (Change Data Capture) options
    val CDC_CHANGE_COLUMN = "cdcChangeColumn"
    val CDC_PARTITION_COLUMN = "cdcPartitionColumn"
    val CDC_START_TIMESTAMP = "cdcStartTimestamp"
    val CDC_END_TIMESTAMP = "cdcEndTimestamp"
}

ReadType Constants

Nested object in LakeSoulOptions defining read operation types.

/**
 * Read operation type constants
 * Defines different modes for reading LakeSoul tables
 */
object ReadType {
    /** Full table read including all historical data */
    val FULL_READ = "fullread"
    
    /** Snapshot read at specific point in time */
    val SNAPSHOT_READ = "snapshot"
    
    /** Incremental read of changes between time ranges */
    val INCREMENTAL_READ = "incremental"
    
    /** Stream read for continuous processing */
    val STREAM_READ = "stream"
    
    /** Change data capture read */
    val CDC_READ = "cdc"
}

Database Configuration

Database connection configuration through DBUtil and DataBaseProperty classes.

/**
 * Utility class for database configuration management
 * Handles configuration loading and database connection parameters
 */
public class DBUtil {
    // Database connection configuration keys
    public static final String urlKey = "lakesoul.pg.url";
    public static final String usernameKey = "lakesoul.pg.username";
    public static final String passwordKey = "lakesoul.pg.password";
    public static final String driverKey = "lakesoul.pg.driver";
    public static final String domainKey = "lakesoul.current.domain";
    
    // Connection pool configuration keys
    public static final String maxPoolSizeKey = "lakesoul.pg.pool.maxSize";
    public static final String minPoolSizeKey = "lakesoul.pg.pool.minSize";
    public static final String connectionTimeoutKey = "lakesoul.pg.pool.connectionTimeout";
    public static final String idleTimeoutKey = "lakesoul.pg.pool.idleTimeout";
    public static final String maxLifetimeKey = "lakesoul.pg.pool.maxLifetime";
    
    // Default values
    public static final String DEFAULT_DRIVER = "org.postgresql.Driver";
    public static final String DEFAULT_DOMAIN = "public";
    public static final int DEFAULT_MAX_POOL_SIZE = 10;
    public static final int DEFAULT_MIN_POOL_SIZE = 2;
    public static final long DEFAULT_CONNECTION_TIMEOUT = 30000L;
    public static final long DEFAULT_IDLE_TIMEOUT = 600000L;
    public static final long DEFAULT_MAX_LIFETIME = 1800000L;
    
    /**
     * Get database connection information from configuration
     * Loads from system properties, environment variables, or configuration files
     * @return DataBaseProperty object with connection details
     */
    public static DataBaseProperty getDBInfo();
    
    /**
     * Fill HikariCP configuration with connection parameters
     * @param config HikariConfig object to populate
     */
    public static void fillDataSourceConfig(HikariConfig config);
    
    /**
     * Convert Java UUID to protobuf UUID
     * @param uuid Java UUID instance
     * @return Uuid protobuf UUID representation
     */
    public static Uuid toProtoUuid(UUID uuid);
    
    /**
     * Convert protobuf UUID to Java UUID
     * @param uuid Protobuf UUID instance
     * @return UUID Java UUID representation
     */
    public static UUID toJavaUUID(Uuid uuid);
    
    /**
     * Get configuration value with fallback to default
     * @param key Configuration key
     * @param defaultValue Default value if key not found
     * @return String configuration value
     */
    public static String getConfigValue(String key, String defaultValue);
    
    /**
     * Get boolean configuration value
     * @param key Configuration key
     * @param defaultValue Default value if key not found
     * @return boolean configuration value
     */
    public static boolean getBooleanConfigValue(String key, boolean defaultValue);
    
    /**
     * Get integer configuration value
     * @param key Configuration key
     * @param defaultValue Default value if key not found
     * @return int configuration value
     */
    public static int getIntConfigValue(String key, int defaultValue);
    
    /**
     * Get long configuration value
     * @param key Configuration key
     * @param defaultValue Default value if key not found
     * @return long configuration value
     */
    public static long getLongConfigValue(String key, long defaultValue);
}

/**
 * Data transfer object for database connection properties
 * Contains all necessary information for establishing database connections
 */
public class DataBaseProperty {
    /**
     * Get JDBC driver class name
     * @return String driver class name
     */
    public String getDriver();
    
    /**
     * Set JDBC driver class name
     * @param driver Driver class name
     */
    public void setDriver(String driver);
    
    /**
     * Get database connection URL
     * @return String JDBC connection URL
     */
    public String getUrl();
    
    /**
     * Set database connection URL
     * @param url JDBC connection URL
     */
    public void setUrl(String url);
    
    /**
     * Get database username
     * @return String username for authentication
     */
    public String getUsername();
    
    /**
     * Set database username
     * @param username Username for authentication
     */
    public void setUsername(String username);
    
    /**
     * Get database password
     * @return String password for authentication
     */
    public String getPassword();
    
    /**
     * Set database password
     * @param password Password for authentication
     */
    public void setPassword(String password);
    
    /**
     * Get database name
     * @return String target database name
     */
    public String getDbName();
    
    /**
     * Set database name
     * @param dbName Target database name
     */
    public void setDbName(String dbName);
    
    /**
     * Get database host
     * @return String hostname or IP address
     */
    public String getHost();
    
    /**
     * Set database host
     * @param host Hostname or IP address
     */
    public void setHost(String host);
    
    /**
     * Get database port
     * @return int port number
     */
    public int getPort();
    
    /**
     * Set database port
     * @param port Port number
     */
    public void setPort(int port);
    
    /**
     * Get maximum commit attempt count
     * @return int maximum retry attempts for commits
     */
    public int getMaxCommitAttempt();
    
    /**
     * Set maximum commit attempt count
     * @param maxCommitAttempt Maximum retry attempts
     */
    public void setMaxCommitAttempt(int maxCommitAttempt);
    
    /**
     * String representation of database properties
     * Password is masked for security
     * @return String formatted connection details
     */
    public String toString();
}

Usage Examples:

import com.dmetasoul.lakesoul.meta.*;
import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
import java.util.Properties;

public class ConfigurationExample {
    
    public void globalConfigurationExample() {
        // Get global configuration
        GlobalConfig globalConfig = GlobalConfig.get();
        
        // Check authorization configuration
        if (globalConfig.isAuthZEnabled()) {
            System.out.println("Authorization enabled");
            System.out.println("Casbin model: " + globalConfig.getAuthZCasbinModel());
            
            // Initialize authorization system
            initializeAuthorization();
        } else {
            System.out.println("Authorization disabled");
        }
        
        // Programmatically enable authorization
        globalConfig.setAuthZEnabled(true);
        System.out.println("Authorization now enabled: " + globalConfig.isAuthZEnabled());
    }
    
    public void databaseConfigurationExample() {
        // Get database configuration
        DataBaseProperty dbProps = DBUtil.getDBInfo();
        
        System.out.println("Database Configuration:");
        System.out.println("  Driver: " + dbProps.getDriver());
        System.out.println("  URL: " + dbProps.getUrl());
        System.out.println("  Host: " + dbProps.getHost());
        System.out.println("  Port: " + dbProps.getPort());
        System.out.println("  Database: " + dbProps.getDbName());
        System.out.println("  Username: " + dbProps.getUsername());
        System.out.println("  Max Commit Attempts: " + dbProps.getMaxCommitAttempt());
        
        // Create custom database configuration
        DataBaseProperty customProps = new DataBaseProperty();
        customProps.setDriver("org.postgresql.Driver");
        customProps.setHost("localhost");
        customProps.setPort(5432);
        customProps.setDbName("lakesoul_prod");
        customProps.setUsername("lakesoul_user");
        customProps.setPassword("secure_password");
        customProps.setMaxCommitAttempt(5);
        
        // Build URL from components
        String url = String.format("jdbc:postgresql://%s:%d/%s", 
            customProps.getHost(), 
            customProps.getPort(), 
            customProps.getDbName());
        customProps.setUrl(url);
        
        System.out.println("Custom configuration: " + customProps.toString());
    }
    
    public void lakeSoulOptionsExample() {
        // Create configuration map for LakeSoul operations
        Map<String, String> options = new HashMap<>();
        
        // Write operation configuration
        options.put(LakeSoulOptions.MERGE_SCHEMA_OPTION(), "true");
        options.put(LakeSoulOptions.REPLACE_WHERE_OPTION(), "date >= '2023-01-01'");
        options.put(LakeSoulOptions.UPSERT_OPTION(), "true");
        
        // Partitioning configuration
        options.put(LakeSoulOptions.RANGE_PARTITIONS(), "date,region");
        options.put(LakeSoulOptions.HASH_PARTITIONS(), "user_id");
        options.put(LakeSoulOptions.HASH_BUCKET_NUM(), "16");
        
        // Table configuration  
        options.put(LakeSoulOptions.SHORT_TABLE_NAME(), "user_events");
        options.put(LakeSoulOptions.TABLE_NAMESPACE(), "analytics");
        
        // Read operation configuration
        options.put(LakeSoulOptions.READ_TYPE(), LakeSoulOptions.ReadType.INCREMENTAL_READ());
        options.put(LakeSoulOptions.START_TIMESTAMP(), "1672531200000"); // 2023-01-01
        options.put(LakeSoulOptions.END_TIMESTAMP(), "1672617600000");   // 2023-01-02
        
        // Performance tuning
        options.put(LakeSoulOptions.BATCH_SIZE(), "10000");
        options.put(LakeSoulOptions.PARALLELISM(), "8");
        options.put(LakeSoulOptions.MEMORY_FRACTION(), "0.8");
        
        // Data format options
        options.put(LakeSoulOptions.DATA_FORMAT(), "parquet");
        options.put(LakeSoulOptions.COMPRESSION(), "snappy");
        options.put(LakeSoulOptions.FILE_SIZE_THRESHOLD(), "134217728"); // 128MB
        
        System.out.println("LakeSoul options configured: " + options.size() + " settings");
        
        // Use options in operations
        processWithOptions(options);
    }
    
    public void tablePropertiesExample() {
        // Create table properties using DBConfig constants
        JSONObject tableProps = new JSONObject();
        
        // Hash partitioning configuration
        tableProps.put(DBConfig.TableInfoProperty.HASH_BUCKET_NUM, "32");
        
        // Schema evolution settings
        tableProps.put(DBConfig.TableInfoProperty.LAST_TABLE_SCHEMA_CHANGE_TIME, 
                      String.valueOf(System.currentTimeMillis()));
        
        // Data format and compression
        tableProps.put(DBConfig.TableInfoProperty.TABLE_FORMAT, "parquet");
        tableProps.put(DBConfig.TableInfoProperty.COMPRESSION_CODEC, "zstd");
        
        // Retention and ownership
        tableProps.put(DBConfig.TableInfoProperty.RETENTION_DAYS, "90");
        tableProps.put(DBConfig.TableInfoProperty.TABLE_OWNER, "data_team");
        tableProps.put(DBConfig.TableInfoProperty.CREATION_TIME, 
                      String.valueOf(System.currentTimeMillis()));
        
        // Dropped columns (if any)
        List<String> droppedColumns = Arrays.asList("old_column1", "deprecated_field");
        tableProps.put(DBConfig.TableInfoProperty.DROPPED_COLUMN, 
                      String.join(DBConfig.TableInfoProperty.DROPPED_COLUMN_SPLITTER, droppedColumns));
        
        System.out.println("Table properties: " + tableProps.toJSONString());
        
        // Use in table creation
        createTableWithProperties("table_001", tableProps);
    }
    
    public void environmentBasedConfigurationExample() {
        // Get configuration from various sources with fallbacks
        
        // Database configuration with environment fallbacks
        String dbUrl = DBUtil.getConfigValue(DBUtil.urlKey, 
            System.getenv().getOrDefault("LAKESOUL_PG_URL", 
                "jdbc:postgresql://localhost:5432/lakesoul"));
                
        String dbUser = DBUtil.getConfigValue(DBUtil.usernameKey,
            System.getenv().getOrDefault("LAKESOUL_PG_USERNAME", "lakesoul"));
            
        String dbPassword = DBUtil.getConfigValue(DBUtil.passwordKey,
            System.getenv().getOrDefault("LAKESOUL_PG_PASSWORD", "lakesoul"));
            
        String domain = DBUtil.getConfigValue(DBUtil.domainKey,
            System.getenv().getOrDefault("LAKESOUL_DOMAIN", "public"));
        
        // Connection pool configuration
        int maxPoolSize = DBUtil.getIntConfigValue(DBUtil.maxPoolSizeKey, 
            Integer.parseInt(System.getenv().getOrDefault("LAKESOUL_PG_POOL_MAX_SIZE", "10")));
            
        long connectionTimeout = DBUtil.getLongConfigValue(DBUtil.connectionTimeoutKey,
            Long.parseLong(System.getenv().getOrDefault("LAKESOUL_PG_CONNECTION_TIMEOUT", "30000")));
        
        // Authorization configuration
        boolean authEnabled = DBUtil.getBooleanConfigValue(GlobalConfig.authZEnabledKey,
            Boolean.parseBoolean(System.getenv().getOrDefault("LAKESOUL_AUTHZ_ENABLED", "false")));
        
        System.out.println("Configuration loaded from environment:");
        System.out.println("  DB URL: " + maskUrl(dbUrl));
        System.out.println("  DB User: " + dbUser);
        System.out.println("  Domain: " + domain);
        System.out.println("  Max Pool Size: " + maxPoolSize);
        System.out.println("  Connection Timeout: " + connectionTimeout + "ms");
        System.out.println("  Authorization Enabled: " + authEnabled);
    }
    
    public void configurationValidationExample() {
        try {
            // Validate database configuration
            DataBaseProperty dbProps = DBUtil.getDBInfo();
            validateDatabaseConfig(dbProps);
            
            // Validate global configuration
            GlobalConfig globalConfig = GlobalConfig.get();
            validateGlobalConfig(globalConfig);
            
            System.out.println("Configuration validation passed");
            
        } catch (IllegalArgumentException e) {
            System.err.println("Configuration validation failed: " + e.getMessage());
            System.exit(1);
        }
    }
    
    private void validateDatabaseConfig(DataBaseProperty dbProps) {
        if (dbProps.getUrl() == null || dbProps.getUrl().isEmpty()) {
            throw new IllegalArgumentException("Database URL is required");
        }
        
        if (dbProps.getUsername() == null || dbProps.getUsername().isEmpty()) {
            throw new IllegalArgumentException("Database username is required");
        }
        
        if (dbProps.getPassword() == null || dbProps.getPassword().isEmpty()) {
            throw new IllegalArgumentException("Database password is required");
        }
        
        if (dbProps.getPort() <= 0 || dbProps.getPort() > 65535) {
            throw new IllegalArgumentException("Invalid database port: " + dbProps.getPort());
        }
        
        if (dbProps.getMaxCommitAttempt() <= 0) {
            throw new IllegalArgumentException("Max commit attempts must be positive");
        }
    }
    
    private void validateGlobalConfig(GlobalConfig config) {
        if (config.isAuthZEnabled()) {
            String casbinModel = config.getAuthZCasbinModel();
            if (casbinModel == null || casbinModel.isEmpty()) {
                throw new IllegalArgumentException("Casbin model is required when authorization is enabled");
            }
        }
    }
    
    private String maskUrl(String url) {
        // Mask password in URL for logging
        if (url != null && url.contains("://")) {
            return url.replaceAll("://[^:]*:[^@]*@", "://***:***@");
        }
        return url;
    }
    
    private void initializeAuthorization() {
        // Implementation for authorization setup
    }
    
    private void processWithOptions(Map<String, String> options) {
        // Implementation using LakeSoul options
    }
    
    private void createTableWithProperties(String tableId, JSONObject properties) {
        // Implementation for table creation
    }
}

Configuration Files:

System Properties (application.properties):

# Database connection
lakesoul.pg.url=jdbc:postgresql://localhost:5432/lakesoul
lakesoul.pg.username=lakesoul_user
lakesoul.pg.password=lakesoul_password
lakesoul.pg.driver=org.postgresql.Driver

# Connection pool settings
lakesoul.pg.pool.maxSize=20
lakesoul.pg.pool.minSize=5
lakesoul.pg.pool.connectionTimeout=30000
lakesoul.pg.pool.idleTimeout=600000
lakesoul.pg.pool.maxLifetime=1800000

# Security domain
lakesoul.current.domain=production

# Authorization settings
lakesoul.authz.enabled=true
lakesoul.authz.casbin.model=/config/rbac_model.conf
lakesoul.authz.casbin.policy=/config/policy.csv

# Native operations
lakesoul.native.metadata.query.enabled=true
lakesoul.native.metadata.update.enabled=true
lakesoul.native.metadata.max.retry.attempts=3

Environment Variables:

# Database configuration
export LAKESOUL_PG_URL="jdbc:postgresql://prod-db:5432/lakesoul"
export LAKESOUL_PG_USERNAME="lakesoul_prod"
export LAKESOUL_PG_PASSWORD="secure_prod_password"

# Security
export LAKESOUL_DOMAIN="production"
export LAKESOUL_AUTHZ_ENABLED="true"

# Performance tuning
export LAKESOUL_PG_POOL_MAX_SIZE="50"
export LAKESOUL_PG_POOL_MIN_SIZE="10"

Configuration Priority:

The configuration system follows this priority order (highest to lowest):

  1. System Properties (-Dlakesoul.pg.url=...)
  2. Environment Variables (LAKESOUL_PG_URL)
  3. Configuration Files (application.properties, lakesoul.conf)
  4. Database Configuration (global_config table)
  5. Default Values (built-in defaults)

Thread Safety:

Configuration management is designed for concurrent access:

  • Singleton Initialization: Thread-safe lazy initialization with double-checked locking
  • Immutable Configuration: Configuration objects are immutable after creation
  • Read-Only Access: Most configuration access is read-only after initialization
  • Global State: Global configuration changes are synchronized
  • Environment Independence: No shared mutable state between threads

Install with Tessl CLI

npx tessl i tessl/maven-com-dmetasoul--lakesoul-common

docs

authorization-security.md

configuration-management.md

database-connection.md

entity-models.md

index.md

metadata-management.md

native-operations.md

scala-functional-api.md

tile.json