Core utilities and metadata management library for the LakeSoul lakehouse framework, providing database connection management, RBAC authorization, protobuf serialization, and native library integration.
The configuration management system provides centralized configuration for database connections, authorization settings, operational parameters, and LakeSoul-specific options. It supports both global and local configuration, with values sourced from system properties and environment variables.
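The layered lookup described above (system property first, then environment variable, then a built-in default) can be sketched as follows. `ConfigLookup` and its parameter names are illustrative, not part of the library API:

```java
public class ConfigLookup {
    // Resolve a setting: JVM system property first, then an environment
    // variable, then a hard-coded default. Key names are illustrative.
    static String lookup(String propKey, String envKey, String defaultValue) {
        String v = System.getProperty(propKey);
        if (v != null) return v;
        v = System.getenv(envKey);
        if (v != null) return v;
        return defaultValue;
    }

    public static void main(String[] args) {
        // With neither source set, the default wins
        System.out.println(lookup("lakesoul.pg.url", "LAKESOUL_PG_URL",
                "jdbc:postgresql://localhost:5432/lakesoul"));
    }
}
```

DBUtil.getConfigValue and its typed variants follow the same fallback idea, with configuration files as an additional source.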
Singleton configuration manager for system-wide settings, particularly authorization configuration.
/**
* Global configuration management for LakeSoul system
* Singleton pattern with database-backed configuration storage
*/
public class GlobalConfig {
// Configuration keys
public static final String authZEnabledKey = "lakesoul.authz.enabled";
public static final boolean authZEnabledDefault = false;
public static final String authZCasbinModelKey = "lakesoul.authz.casbin.model";
/**
* Get singleton GlobalConfig instance
* Loads configuration from database on first access
* @return GlobalConfig singleton instance
*/
public static synchronized GlobalConfig get();
/**
* Check if authorization is enabled
* @return boolean true if authorization is enabled, false otherwise
*/
public boolean isAuthZEnabled();
/**
* Set authorization enabled status
* @param enabled true to enable authorization, false to disable
*/
public void setAuthZEnabled(boolean enabled);
/**
* Get Casbin model configuration
* @return String path to Casbin model file or model content
*/
public String getAuthZCasbinModel();
}
Abstract class containing database and LakeSoul operational constants and configuration keys.
/**
* Configuration constants for LakeSoul database operations
* Contains operational parameters and formatting constants
*/
public abstract class DBConfig {
// Namespace and partitioning constants
public static final String LAKESOUL_DEFAULT_NAMESPACE = "default";
public static final String LAKESOUL_NAMESPACE_LEVEL_SPLITTER = ".";
public static final String LAKESOUL_NULL_STRING = "__L@KE$OUL_NULL__";
public static final String LAKESOUL_PARTITION_SPLITTER_OF_RANGE_AND_HASH = ";";
public static final String LAKESOUL_RANGE_PARTITION_SPLITTER = ",";
public static final String LAKESOUL_NON_PARTITION_TABLE_PART_DESC = "-5";
// Operational constants
public static final int MAX_COMMIT_ATTEMPTS = 3;
public static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
public static final int DEFAULT_QUERY_TIMEOUT = 60000;
// Table metadata property keys
public static final String LAKESOUL_TABLE_PROPERTY_PREFIX = "lakesoul.";
public static final String LAKESOUL_HASH_PARTITION_NUM = "lakesoul.hash.partition.num";
public static final String LAKESOUL_RANGE_PARTITION_COLUMNS = "lakesoul.range.partition.columns";
}
Nested class in DBConfig containing table-specific property constants.
/**
* Table information property constants
* Keys for table metadata properties stored as JSON
*/
public static class TableInfoProperty {
/** Hash bucket number for hash partitioning */
public static final String HASH_BUCKET_NUM = "hashBucketNum";
/** Dropped column names (comma-separated) */
public static final String DROPPED_COLUMN = "droppedColumn";
/** Delimiter for dropped column names */
public static final String DROPPED_COLUMN_SPLITTER = ",";
/** Timestamp of last schema change */
public static final String LAST_TABLE_SCHEMA_CHANGE_TIME = "last_schema_change_time";
/** Table format specification (parquet, delta, etc.) */
public static final String TABLE_FORMAT = "table.format";
/** Compression codec for data files */
public static final String COMPRESSION_CODEC = "compression.codec";
/** Data retention period in days */
public static final String RETENTION_DAYS = "retention.days";
/** Table owner information */
public static final String TABLE_OWNER = "table.owner";
/** Table creation timestamp */
public static final String CREATION_TIME = "creation.time";
/** Last access timestamp */
public static final String LAST_ACCESS_TIME = "last.access.time";
}
Scala object containing configuration constants and options for LakeSoul operations.
/**
* Configuration constants and options for LakeSoul operations
* Centralized location for all LakeSoul-specific configuration keys
*/
object LakeSoulOptions {
// Write operation options
val REPLACE_WHERE_OPTION = "replaceWhere"
val MERGE_SCHEMA_OPTION = "mergeSchema"
val OVERWRITE_SCHEMA_OPTION = "overwriteSchema"
val UPSERT_OPTION = "upsert"
val DELETE_WHERE_OPTION = "deleteWhere"
// Partitioning options
val PARTITION_BY = "__partition_columns"
val RANGE_PARTITIONS = "rangePartitions"
val HASH_PARTITIONS = "hashPartitions"
val HASH_BUCKET_NUM = "hashBucketNum"
val DYNAMIC_PARTITION = "dynamicPartition"
// Table naming options
val SHORT_TABLE_NAME = "shortTableName"
val TABLE_PATH = "tablePath"
val TABLE_NAMESPACE = "tableNamespace"
// Read operation options
val READ_TYPE = "readtype"
val START_VERSION = "startVersion"
val END_VERSION = "endVersion"
val START_TIMESTAMP = "startTimestamp"
val END_TIMESTAMP = "endTimestamp"
val INCREMENTAL_PARTITION_DESC = "incrementalPartitionDesc"
// Data format options
val DATA_FORMAT = "dataFormat"
val COMPRESSION = "compression"
val FILE_SIZE_THRESHOLD = "fileSizeThreshold"
val ROW_GROUP_SIZE = "rowGroupSize"
val DICTIONARY_ENCODING = "dictionaryEncoding"
// Performance tuning options
val BATCH_SIZE = "batchSize"
val PARALLELISM = "parallelism"
val MEMORY_FRACTION = "memoryFraction"
val SPILL_THRESHOLD = "spillThreshold"
// CDC (Change Data Capture) options
val CDC_CHANGE_COLUMN = "cdcChangeColumn"
val CDC_PARTITION_COLUMN = "cdcPartitionColumn"
val CDC_START_TIMESTAMP = "cdcStartTimestamp"
val CDC_END_TIMESTAMP = "cdcEndTimestamp"
}
Nested object in LakeSoulOptions defining read operation types.
/**
* Read operation type constants
* Defines different modes for reading LakeSoul tables
*/
object ReadType {
/** Full table read including all historical data */
val FULL_READ = "fullread"
/** Snapshot read at specific point in time */
val SNAPSHOT_READ = "snapshot"
/** Incremental read of changes between time ranges */
val INCREMENTAL_READ = "incremental"
/** Stream read for continuous processing */
val STREAM_READ = "stream"
/** Change data capture read */
val CDC_READ = "cdc"
}
Database connection configuration through DBUtil and DataBaseProperty classes.
/**
* Utility class for database configuration management
* Handles configuration loading and database connection parameters
*/
public class DBUtil {
// Database connection configuration keys
public static final String urlKey = "lakesoul.pg.url";
public static final String usernameKey = "lakesoul.pg.username";
public static final String passwordKey = "lakesoul.pg.password";
public static final String driverKey = "lakesoul.pg.driver";
public static final String domainKey = "lakesoul.current.domain";
// Connection pool configuration keys
public static final String maxPoolSizeKey = "lakesoul.pg.pool.maxSize";
public static final String minPoolSizeKey = "lakesoul.pg.pool.minSize";
public static final String connectionTimeoutKey = "lakesoul.pg.pool.connectionTimeout";
public static final String idleTimeoutKey = "lakesoul.pg.pool.idleTimeout";
public static final String maxLifetimeKey = "lakesoul.pg.pool.maxLifetime";
// Default values
public static final String DEFAULT_DRIVER = "org.postgresql.Driver";
public static final String DEFAULT_DOMAIN = "public";
public static final int DEFAULT_MAX_POOL_SIZE = 10;
public static final int DEFAULT_MIN_POOL_SIZE = 2;
public static final long DEFAULT_CONNECTION_TIMEOUT = 30000L;
public static final long DEFAULT_IDLE_TIMEOUT = 600000L;
public static final long DEFAULT_MAX_LIFETIME = 1800000L;
/**
* Get database connection information from configuration
* Loads from system properties, environment variables, or configuration files
* @return DataBaseProperty object with connection details
*/
public static DataBaseProperty getDBInfo();
/**
* Fill HikariCP configuration with connection parameters
* @param config HikariConfig object to populate
*/
public static void fillDataSourceConfig(HikariConfig config);
/**
* Convert Java UUID to protobuf UUID
* @param uuid Java UUID instance
* @return Uuid protobuf UUID representation
*/
public static Uuid toProtoUuid(UUID uuid);
/**
* Convert protobuf UUID to Java UUID
* @param uuid Protobuf UUID instance
* @return UUID Java UUID representation
*/
public static UUID toJavaUUID(Uuid uuid);
/**
* Get configuration value with fallback to default
* @param key Configuration key
* @param defaultValue Default value if key not found
* @return String configuration value
*/
public static String getConfigValue(String key, String defaultValue);
/**
* Get boolean configuration value
* @param key Configuration key
* @param defaultValue Default value if key not found
* @return boolean configuration value
*/
public static boolean getBooleanConfigValue(String key, boolean defaultValue);
/**
* Get integer configuration value
* @param key Configuration key
* @param defaultValue Default value if key not found
* @return int configuration value
*/
public static int getIntConfigValue(String key, int defaultValue);
/**
* Get long configuration value
* @param key Configuration key
* @param defaultValue Default value if key not found
* @return long configuration value
*/
public static long getLongConfigValue(String key, long defaultValue);
}
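The toProtoUuid/toJavaUUID helpers round-trip a 128-bit UUID through a protobuf message. The proto Uuid field layout is an assumption here; the sketch below shows the underlying idea with plain longs, which is how java.util.UUID exposes its bits:

```java
import java.util.UUID;

public class UuidBits {
    // Split a Java UUID into the two 64-bit halves a proto message would carry
    static long[] toBits(UUID u) {
        return new long[] { u.getMostSignificantBits(), u.getLeastSignificantBits() };
    }

    // Reassemble a UUID from the two halves
    static UUID fromBits(long hi, long lo) {
        return new UUID(hi, lo);
    }

    public static void main(String[] args) {
        UUID u = UUID.randomUUID();
        long[] b = toBits(u);
        System.out.println(u.equals(fromBits(b[0], b[1]))); // round-trip preserves the value
    }
}
```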
/**
* Data transfer object for database connection properties
* Contains all necessary information for establishing database connections
*/
public class DataBaseProperty {
/**
* Get JDBC driver class name
* @return String driver class name
*/
public String getDriver();
/**
* Set JDBC driver class name
* @param driver Driver class name
*/
public void setDriver(String driver);
/**
* Get database connection URL
* @return String JDBC connection URL
*/
public String getUrl();
/**
* Set database connection URL
* @param url JDBC connection URL
*/
public void setUrl(String url);
/**
* Get database username
* @return String username for authentication
*/
public String getUsername();
/**
* Set database username
* @param username Username for authentication
*/
public void setUsername(String username);
/**
* Get database password
* @return String password for authentication
*/
public String getPassword();
/**
* Set database password
* @param password Password for authentication
*/
public void setPassword(String password);
/**
* Get database name
* @return String target database name
*/
public String getDbName();
/**
* Set database name
* @param dbName Target database name
*/
public void setDbName(String dbName);
/**
* Get database host
* @return String hostname or IP address
*/
public String getHost();
/**
* Set database host
* @param host Hostname or IP address
*/
public void setHost(String host);
/**
* Get database port
* @return int port number
*/
public int getPort();
/**
* Set database port
* @param port Port number
*/
public void setPort(int port);
/**
* Get maximum commit attempt count
* @return int maximum retry attempts for commits
*/
public int getMaxCommitAttempt();
/**
* Set maximum commit attempt count
* @param maxCommitAttempt Maximum retry attempts
*/
public void setMaxCommitAttempt(int maxCommitAttempt);
/**
* String representation of database properties
* Password is masked for security
* @return String formatted connection details
*/
public String toString();
}
Usage Examples:
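As a warm-up, the splitter constants from DBConfig can be used to compose a range-partition description string. The exact on-disk description format is an assumption for illustration:

```java
public class PartitionDescSketch {
    static final String RANGE_SPLITTER = ",";              // LAKESOUL_RANGE_PARTITION_SPLITTER
    static final String NULL_STRING = "__L@KE$OUL_NULL__"; // LAKESOUL_NULL_STRING sentinel

    // Join "col=value" pairs with the range splitter; null partition values
    // are replaced by the sentinel. Format is an illustrative assumption.
    static String rangeDesc(String[][] cols) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < cols.length; i++) {
            if (i > 0) sb.append(RANGE_SPLITTER);
            String v = cols[i][1] == null ? NULL_STRING : cols[i][1];
            sb.append(cols[i][0]).append('=').append(v);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(rangeDesc(new String[][]{{"date", "2023-01-01"}, {"region", null}}));
        // → date=2023-01-01,region=__L@KE$OUL_NULL__
    }
}
```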
import com.dmetasoul.lakesoul.meta.*;
import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
import com.alibaba.fastjson.JSONObject;
import java.util.*;
public class ConfigurationExample {
public void globalConfigurationExample() {
// Get global configuration
GlobalConfig globalConfig = GlobalConfig.get();
// Check authorization configuration
if (globalConfig.isAuthZEnabled()) {
System.out.println("Authorization enabled");
System.out.println("Casbin model: " + globalConfig.getAuthZCasbinModel());
// Initialize authorization system
initializeAuthorization();
} else {
System.out.println("Authorization disabled");
}
// Programmatically enable authorization
globalConfig.setAuthZEnabled(true);
System.out.println("Authorization now enabled: " + globalConfig.isAuthZEnabled());
}
public void databaseConfigurationExample() {
// Get database configuration
DataBaseProperty dbProps = DBUtil.getDBInfo();
System.out.println("Database Configuration:");
System.out.println(" Driver: " + dbProps.getDriver());
System.out.println(" URL: " + dbProps.getUrl());
System.out.println(" Host: " + dbProps.getHost());
System.out.println(" Port: " + dbProps.getPort());
System.out.println(" Database: " + dbProps.getDbName());
System.out.println(" Username: " + dbProps.getUsername());
System.out.println(" Max Commit Attempts: " + dbProps.getMaxCommitAttempt());
// Create custom database configuration
DataBaseProperty customProps = new DataBaseProperty();
customProps.setDriver("org.postgresql.Driver");
customProps.setHost("localhost");
customProps.setPort(5432);
customProps.setDbName("lakesoul_prod");
customProps.setUsername("lakesoul_user");
customProps.setPassword("secure_password");
customProps.setMaxCommitAttempt(5);
// Build URL from components
String url = String.format("jdbc:postgresql://%s:%d/%s",
customProps.getHost(),
customProps.getPort(),
customProps.getDbName());
customProps.setUrl(url);
System.out.println("Custom configuration: " + customProps.toString());
}
public void lakeSoulOptionsExample() {
// Create configuration map for LakeSoul operations
Map<String, String> options = new HashMap<>();
// Write operation configuration
options.put(LakeSoulOptions.MERGE_SCHEMA_OPTION(), "true");
options.put(LakeSoulOptions.REPLACE_WHERE_OPTION(), "date >= '2023-01-01'");
options.put(LakeSoulOptions.UPSERT_OPTION(), "true");
// Partitioning configuration
options.put(LakeSoulOptions.RANGE_PARTITIONS(), "date,region");
options.put(LakeSoulOptions.HASH_PARTITIONS(), "user_id");
options.put(LakeSoulOptions.HASH_BUCKET_NUM(), "16");
// Table configuration
options.put(LakeSoulOptions.SHORT_TABLE_NAME(), "user_events");
options.put(LakeSoulOptions.TABLE_NAMESPACE(), "analytics");
// Read operation configuration
options.put(LakeSoulOptions.READ_TYPE(), LakeSoulOptions.ReadType.INCREMENTAL_READ());
options.put(LakeSoulOptions.START_TIMESTAMP(), "1672531200000"); // 2023-01-01
options.put(LakeSoulOptions.END_TIMESTAMP(), "1672617600000"); // 2023-01-02
// Performance tuning
options.put(LakeSoulOptions.BATCH_SIZE(), "10000");
options.put(LakeSoulOptions.PARALLELISM(), "8");
options.put(LakeSoulOptions.MEMORY_FRACTION(), "0.8");
// Data format options
options.put(LakeSoulOptions.DATA_FORMAT(), "parquet");
options.put(LakeSoulOptions.COMPRESSION(), "snappy");
options.put(LakeSoulOptions.FILE_SIZE_THRESHOLD(), "134217728"); // 128MB
System.out.println("LakeSoul options configured: " + options.size() + " settings");
// Use options in operations
processWithOptions(options);
}
public void tablePropertiesExample() {
// Create table properties using DBConfig constants
JSONObject tableProps = new JSONObject();
// Hash partitioning configuration
tableProps.put(DBConfig.TableInfoProperty.HASH_BUCKET_NUM, "32");
// Schema evolution settings
tableProps.put(DBConfig.TableInfoProperty.LAST_TABLE_SCHEMA_CHANGE_TIME,
String.valueOf(System.currentTimeMillis()));
// Data format and compression
tableProps.put(DBConfig.TableInfoProperty.TABLE_FORMAT, "parquet");
tableProps.put(DBConfig.TableInfoProperty.COMPRESSION_CODEC, "zstd");
// Retention and ownership
tableProps.put(DBConfig.TableInfoProperty.RETENTION_DAYS, "90");
tableProps.put(DBConfig.TableInfoProperty.TABLE_OWNER, "data_team");
tableProps.put(DBConfig.TableInfoProperty.CREATION_TIME,
String.valueOf(System.currentTimeMillis()));
// Dropped columns (if any)
List<String> droppedColumns = Arrays.asList("old_column1", "deprecated_field");
tableProps.put(DBConfig.TableInfoProperty.DROPPED_COLUMN,
String.join(DBConfig.TableInfoProperty.DROPPED_COLUMN_SPLITTER, droppedColumns));
System.out.println("Table properties: " + tableProps.toJSONString());
// Use in table creation
createTableWithProperties("table_001", tableProps);
}
public void environmentBasedConfigurationExample() {
// Get configuration from various sources with fallbacks
// Database configuration with environment fallbacks
String dbUrl = DBUtil.getConfigValue(DBUtil.urlKey,
System.getenv().getOrDefault("LAKESOUL_PG_URL",
"jdbc:postgresql://localhost:5432/lakesoul"));
String dbUser = DBUtil.getConfigValue(DBUtil.usernameKey,
System.getenv().getOrDefault("LAKESOUL_PG_USERNAME", "lakesoul"));
String dbPassword = DBUtil.getConfigValue(DBUtil.passwordKey,
System.getenv().getOrDefault("LAKESOUL_PG_PASSWORD", "lakesoul"));
String domain = DBUtil.getConfigValue(DBUtil.domainKey,
System.getenv().getOrDefault("LAKESOUL_DOMAIN", "public"));
// Connection pool configuration
int maxPoolSize = DBUtil.getIntConfigValue(DBUtil.maxPoolSizeKey,
Integer.parseInt(System.getenv().getOrDefault("LAKESOUL_PG_POOL_MAX_SIZE", "10")));
long connectionTimeout = DBUtil.getLongConfigValue(DBUtil.connectionTimeoutKey,
Long.parseLong(System.getenv().getOrDefault("LAKESOUL_PG_CONNECTION_TIMEOUT", "30000")));
// Authorization configuration
boolean authEnabled = DBUtil.getBooleanConfigValue(GlobalConfig.authZEnabledKey,
Boolean.parseBoolean(System.getenv().getOrDefault("LAKESOUL_AUTHZ_ENABLED", "false")));
System.out.println("Configuration loaded from environment:");
System.out.println(" DB URL: " + maskUrl(dbUrl));
System.out.println(" DB User: " + dbUser);
System.out.println(" Domain: " + domain);
System.out.println(" Max Pool Size: " + maxPoolSize);
System.out.println(" Connection Timeout: " + connectionTimeout + "ms");
System.out.println(" Authorization Enabled: " + authEnabled);
}
public void configurationValidationExample() {
try {
// Validate database configuration
DataBaseProperty dbProps = DBUtil.getDBInfo();
validateDatabaseConfig(dbProps);
// Validate global configuration
GlobalConfig globalConfig = GlobalConfig.get();
validateGlobalConfig(globalConfig);
System.out.println("Configuration validation passed");
} catch (IllegalArgumentException e) {
System.err.println("Configuration validation failed: " + e.getMessage());
System.exit(1);
}
}
private void validateDatabaseConfig(DataBaseProperty dbProps) {
if (dbProps.getUrl() == null || dbProps.getUrl().isEmpty()) {
throw new IllegalArgumentException("Database URL is required");
}
if (dbProps.getUsername() == null || dbProps.getUsername().isEmpty()) {
throw new IllegalArgumentException("Database username is required");
}
if (dbProps.getPassword() == null || dbProps.getPassword().isEmpty()) {
throw new IllegalArgumentException("Database password is required");
}
if (dbProps.getPort() <= 0 || dbProps.getPort() > 65535) {
throw new IllegalArgumentException("Invalid database port: " + dbProps.getPort());
}
if (dbProps.getMaxCommitAttempt() <= 0) {
throw new IllegalArgumentException("Max commit attempts must be positive");
}
}
private void validateGlobalConfig(GlobalConfig config) {
if (config.isAuthZEnabled()) {
String casbinModel = config.getAuthZCasbinModel();
if (casbinModel == null || casbinModel.isEmpty()) {
throw new IllegalArgumentException("Casbin model is required when authorization is enabled");
}
}
}
private String maskUrl(String url) {
// Mask password in URL for logging
if (url != null && url.contains("://")) {
return url.replaceAll("://[^:]*:[^@]*@", "://***:***@");
}
return url;
}
private void initializeAuthorization() {
// Implementation for authorization setup
}
private void processWithOptions(Map<String, String> options) {
// Implementation using LakeSoul options
}
private void createTableWithProperties(String tableId, JSONObject properties) {
// Implementation for table creation
}
}
Configuration Files:
System Properties (application.properties):
# Database connection
lakesoul.pg.url=jdbc:postgresql://localhost:5432/lakesoul
lakesoul.pg.username=lakesoul_user
lakesoul.pg.password=lakesoul_password
lakesoul.pg.driver=org.postgresql.Driver
# Connection pool settings
lakesoul.pg.pool.maxSize=20
lakesoul.pg.pool.minSize=5
lakesoul.pg.pool.connectionTimeout=30000
lakesoul.pg.pool.idleTimeout=600000
lakesoul.pg.pool.maxLifetime=1800000
# Security domain
lakesoul.current.domain=production
# Authorization settings
lakesoul.authz.enabled=true
lakesoul.authz.casbin.model=/config/rbac_model.conf
lakesoul.authz.casbin.policy=/config/policy.csv
# Native operations
lakesoul.native.metadata.query.enabled=true
lakesoul.native.metadata.update.enabled=true
lakesoul.native.metadata.max.retry.attempts=3
Environment Variables:
# Database configuration
export LAKESOUL_PG_URL="jdbc:postgresql://prod-db:5432/lakesoul"
export LAKESOUL_PG_USERNAME="lakesoul_prod"
export LAKESOUL_PG_PASSWORD="secure_prod_password"
# Security
export LAKESOUL_DOMAIN="production"
export LAKESOUL_AUTHZ_ENABLED="true"
# Performance tuning
export LAKESOUL_PG_POOL_MAX_SIZE="50"
export LAKESOUL_PG_POOL_MIN_SIZE="10"
Configuration Priority:
The configuration system follows this priority order (highest to lowest):
1. Command-line system properties (-Dlakesoul.pg.url=...)
2. Environment variables (LAKESOUL_PG_URL)
3. Configuration files (application.properties, lakesoul.conf)
Thread Safety:
Configuration management is designed for concurrent access: GlobalConfig.get() is synchronized, so the singleton can be safely obtained and its settings read from multiple threads.
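The synchronized lazy-singleton pattern that GlobalConfig.get() documents can be sketched as follows. This is a minimal illustration of the pattern, not the actual GlobalConfig implementation (which also loads configuration from the database on first access):

```java
public class SingletonSketch {
    private static SingletonSketch instance;

    private SingletonSketch() {}

    // The class-level lock makes lazy initialization safe when several
    // threads race to obtain the instance for the first time.
    public static synchronized SingletonSketch get() {
        if (instance == null) {
            instance = new SingletonSketch();
        }
        return instance;
    }

    public static void main(String[] args) throws InterruptedException {
        final SingletonSketch[] seen = new SingletonSketch[2];
        Thread t1 = new Thread(() -> seen[0] = SingletonSketch.get());
        Thread t2 = new Thread(() -> seen[1] = SingletonSketch.get());
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(seen[0] == seen[1]); // both threads observe the same instance
    }
}
```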
Install with Tessl CLI
npx tessl i tessl/maven-com-dmetasoul--lakesoul-common