Core utilities and metadata management library for the LakeSoul lakehouse framework providing database connection management, RBAC authorization, protobuf serialization, and native library integration.
—
The native operations system provides high-performance metadata operations through JNR-FFI integration with Rust components. This system offers connection pooling, retry logic, and asynchronous operations for scalable metadata access with significantly improved performance over traditional JDBC operations.
The primary interface for native metadata operations, providing singleton access with automatic connection management and retry logic.
/**
* Java client for native metadata operations with connection pooling and retry logic
* Provides high-performance alternative to JDBC-based metadata operations
*/
public class NativeMetadataJavaClient implements AutoCloseable {
/**
* Get singleton instance of native metadata client
* Initializes connection pool and runtime on first access
* @return NativeMetadataJavaClient singleton instance
*/
public static NativeMetadataJavaClient getInstance();
/**
* Shutdown singleton instance and cleanup resources
* Should be called during application shutdown
*/
public static void shutDownInstance();
}
Static methods for common metadata operations with automatic connection management and retry logic.
/**
* Insert operation using native client
* @param insertType Type of DAO insert operation to perform
* @param jniWrapper JniWrapper containing entity data to insert
* @return Integer result code (0 for success, non-zero for error)
*/
public static Integer insert(NativeUtils.CodedDaoType insertType, JniWrapper jniWrapper);
/**
* Query operation using native client
* @param queryType Type of DAO query operation to perform
* @param params List of string parameters for the query
* @return JniWrapper containing query results
*/
public static JniWrapper query(NativeUtils.CodedDaoType queryType, List<String> params);
/**
* Update operation using native client
* @param updateType Type of DAO update operation to perform
* @param params List of string parameters for the update
* @return Integer result code (0 for success, non-zero for error)
*/
public static Integer update(NativeUtils.CodedDaoType updateType, List<String> params);
/**
* Scalar query operation using native client
* @param queryScalarType Type of DAO scalar query operation to perform
* @param params List of string parameters for the query
* @return List<String> scalar results
*/
public static List<String> queryScalar(NativeUtils.CodedDaoType queryScalarType, List<String> params);
/**
* Clean all metadata (for testing purposes)
* WARNING: This will delete all metadata in the system
* @return int result code (0 for success)
*/
public static int cleanMeta();
Methods for creating and managing split descriptors for table data access.
/**
* Create split description array for table access
* @param tableName Name of the table
* @param namespace Namespace containing the table
* @return List<SplitDesc> split descriptors for table data access
*/
public List<SplitDesc> createSplitDescArray(String tableName, String namespace);
/**
* Close native client and cleanup resources
* Implements AutoCloseable for try-with-resources usage
*/
public void close();
Utility class providing configuration constants and operation type definitions for native operations.
/**
* Utility class for native operations and DAO type definitions
* Contains configuration flags and operation type mappings
*/
public class NativeUtils {
/** Flag indicating whether native metadata queries are enabled */
public static final boolean NATIVE_METADATA_QUERY_ENABLED;
/** Flag indicating whether native metadata updates are enabled */
public static final boolean NATIVE_METADATA_UPDATE_ENABLED;
/** Maximum number of retry attempts for native operations */
public static final int NATIVE_METADATA_MAX_RETRY_ATTEMPTS;
/** Parameter delimiter for native operation parameters */
public static final String PARAM_DELIM;
}
Enumeration defining all available DAO operation types with codes and parameter requirements.
/**
* Enumeration of all DAO operation types with operation codes
* Maps high-level operations to native function calls
*/
public enum CodedDaoType {
// Table operations
SELECT_TABLE_INFO_BY_TABLE_ID(101, 1),
SELECT_TABLE_INFO_BY_TABLE_PATH(102, 1),
SELECT_TABLE_INFO_BY_TABLE_NAME_NAMESPACE(103, 2),
INSERT_TABLE_INFO(104, 0),
UPDATE_TABLE_INFO_BY_TABLE_ID(105, 4),
DELETE_TABLE_INFO(106, 2),
LIST_TABLE_NAME_BY_NAMESPACE(107, 1),
LIST_TABLE_PATH_BY_NAMESPACE(108, 1),
// Partition operations
SELECT_PARTITION_INFO_BY_TABLE_ID_PARTITION_DESC(201, 2),
SELECT_PARTITION_INFO_BY_TABLE_ID_PARTITION_DESC_VERSION(202, 3),
SELECT_PARTITION_INFO_BY_TABLE_ID(203, 1),
INSERT_PARTITION_INFO(204, 0),
DELETE_PARTITION_INFO_BY_TABLE_ID(205, 1),
DELETE_PARTITION_INFO_BY_TABLE_ID_PARTITION_DESC(206, 2),
// Data commit operations
INSERT_DATA_COMMIT_INFO(301, 0),
SELECT_DATA_COMMIT_INFO_BY_TABLE_ID_PARTITION_DESC_COMMIT_LIST(302, -1),
DELETE_DATA_COMMIT_INFO_BY_TABLE_ID(303, 1),
DELETE_DATA_COMMIT_INFO_BY_TABLE_ID_PARTITION_DESC(304, 2),
// Namespace operations
INSERT_NAMESPACE(401, 0),
SELECT_NAMESPACE_BY_NAMESPACE(402, 1),
DELETE_NAMESPACE_BY_NAMESPACE(403, 1),
LIST_NAMESPACES(404, 0),
UPDATE_NAMESPACE_PROPERTIES(405, 2),
// Utility operations
CLEAN_META_FOR_TEST(901, 0);
private final int code;
private final int parameterCount;
/**
* Get operation code for native function mapping
* @return int operation code
*/
public int getCode();
/**
* Get required parameter count (-1 for variable parameters)
* @return int parameter count requirement
*/
public int getParameterCount();
}
The LibLakeSoulMetaData interface provides direct access to native Rust functions via JNR-FFI.
/**
* JNR-FFI interface for native Rust library integration
* Provides direct access to high-performance metadata operations
*/
public interface LibLakeSoulMetaData extends Library {
/**
* Create Tokio async runtime for native operations
* @return Pointer to runtime instance
*/
Pointer create_tokio_runtime();
/**
* Free Tokio async runtime
* @param runtime Pointer to runtime instance to free
*/
void free_tokio_runtime(Pointer runtime);
/**
* Create PostgreSQL client with async runtime
* @param runtime Pointer to Tokio runtime
* @param config Database connection configuration string
* @return Pointer to client instance
*/
Pointer create_tokio_postgres_client(Pointer runtime, String config);
/**
* Free PostgreSQL client
* @param client Pointer to client instance to free
*/
void free_tokio_postgres_client(Pointer client);
/**
* Execute query operation asynchronously
* @param runtime Pointer to Tokio runtime
* @param client Pointer to PostgreSQL client
* @param sql SQL query string
* @param callback Callback for handling results
*/
void execute_query(Pointer runtime, Pointer client, String sql, Pointer callback);
/**
* Execute update operation asynchronously
* @param runtime Pointer to Tokio runtime
* @param client Pointer to PostgreSQL client
* @param sql SQL update string
* @param callback Callback for handling results
*/
void execute_update(Pointer runtime, Pointer client, String sql, Pointer callback);
/**
* Execute scalar query operation asynchronously
* @param runtime Pointer to Tokio runtime
* @param client Pointer to PostgreSQL client
* @param sql SQL query string
* @param callback Callback for handling scalar results
*/
void execute_query_scalar(Pointer runtime, Pointer client, String sql, Pointer callback);
/**
* Execute insert operation asynchronously
* @param runtime Pointer to Tokio runtime
* @param client Pointer to PostgreSQL client
* @param sql SQL insert string
* @param callback Callback for handling results
*/
void execute_insert(Pointer runtime, Pointer client, String sql, Pointer callback);
/**
* Create split description array for table data access
* @return Pointer to split description array
*/
Pointer create_split_desc_array();
/**
* Clean all metadata for testing purposes
* @param runtime Pointer to Tokio runtime
* @param client Pointer to PostgreSQL client
* @param callback Callback for handling results
*/
void clean_meta_for_test(Pointer runtime, Pointer client, Pointer callback);
/**
* Debug function for native library diagnostics
*/
void debug();
}
Callback interfaces for handling asynchronous native operation results.
/**
* Generic callback interface for native operations
* @param <T> Result type
*/
public interface Callback<T> extends com.sun.jna.Callback {
void invoke(T result);
}
/**
* Boolean result callback for native operations
*/
public interface BooleanCallback extends Callback<Boolean> {
void invoke(Boolean result);
}
/**
* Integer result callback for native operations
*/
public interface IntegerCallback extends Callback<Integer> {
void invoke(Integer result);
}
/**
* String result callback for native operations
*/
public interface StringCallback extends Callback<String> {
void invoke(String result);
}
/**
* Void callback for operations without return values
*/
public interface VoidCallback extends Callback<Void> {
void invoke();
}
Utility class for loading the native library using JNR-FFI.
/**
* Loads the native library using JNR-FFI
* Handles library loading and initialization
*/
public class JnrLoader {
/**
* Get loaded native library instance
* Performs lazy loading and initialization on first access
* @return LibLakeSoulMetaData interface to native library
*/
public static LibLakeSoulMetaData get();
}
Data transfer object for split/partition descriptions used in data access operations.
/**
* Data transfer object for split/partition descriptions
* Contains information needed for efficient data access
*/
public class SplitDesc {
/**
* Get file paths for this split
* @return List<String> data file paths
*/
public List<String> getFilePaths();
/**
* Set file paths for this split
* @param filePaths List of data file paths
*/
public void setFilePaths(List<String> filePaths);
/**
* Get primary key columns
* @return List<String> primary key column names
*/
public List<String> getPrimaryKeys();
/**
* Set primary key columns
* @param primaryKeys List of primary key column names
*/
public void setPrimaryKeys(List<String> primaryKeys);
/**
* Get partition description
* @return String partition identifier
*/
public String getPartitionDesc();
/**
* Set partition description
* @param partitionDesc Partition identifier
*/
public void setPartitionDesc(String partitionDesc);
/**
* Get table schema
* @return String JSON schema definition
*/
public String getTableSchema();
/**
* Set table schema
* @param tableSchema JSON schema definition
*/
public void setTableSchema(String tableSchema);
/**
* String representation of split description
* @return String formatted split information
*/
public String toString();
}
Usage Examples:
import com.dmetasoul.lakesoul.meta.entity.*;
import com.dmetasoul.lakesoul.meta.jnr.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class NativeOperationsExample {
/**
 * Demonstrates the high-level native operations API: querying table
 * metadata by id and inserting a newly built table entry.
 */
public void basicNativeOperationsExample() {
    try {
        // Look up table metadata for a single table id via the native client.
        JniWrapper queryResult = NativeMetadataJavaClient.query(
                NativeUtils.CodedDaoType.SELECT_TABLE_INFO_BY_TABLE_ID,
                Arrays.asList("table_123"));
        System.out.println("Query completed successfully");

        // Build the entity describing the table to register.
        TableInfo tableInfo = TableInfo.newBuilder()
                .setTableId("new_table_001")
                .setTableNamespace("analytics")
                .setTableName("events")
                .setTablePath("/data/events")
                .build();
        JniWrapper payload = JniWrapper.newBuilder()
                .addTableInfo(tableInfo)
                .build();

        // Per the documented contract, zero means success; any other code is an error.
        Integer code = NativeMetadataJavaClient.insert(
                NativeUtils.CodedDaoType.INSERT_TABLE_INFO,
                payload);
        if (code == 0) {
            System.out.println("Insert successful");
        } else {
            System.err.println("Insert failed with code: " + code);
        }
    } catch (Exception e) {
        System.err.println("Native operation failed: " + e.getMessage());
        e.printStackTrace();
    }
}
/**
 * Demonstrates creating split descriptors for a table, using the
 * AutoCloseable client in a try-with-resources block so native
 * resources are released on exit.
 */
public void splitDescriptionExample() {
    try (NativeMetadataJavaClient client = NativeMetadataJavaClient.getInstance()) {
        // One SplitDesc per split; print the access details of each.
        for (SplitDesc split : client.createSplitDescArray("user_events", "analytics")) {
            System.out.println("Split partition: " + split.getPartitionDesc());
            System.out.println("File paths: " + split.getFilePaths());
            System.out.println("Primary keys: " + split.getPrimaryKeys());
            System.out.println("Schema: " + split.getTableSchema());
            System.out.println("------------------------");
        }
    } catch (Exception e) {
        System.err.println("Split description creation failed: " + e.getMessage());
    }
}
/**
 * Demonstrates low-level access to the native library: create a Tokio
 * runtime and PostgreSQL client, run one asynchronous query, wait for
 * its callback, and free the native resources.
 *
 * Fix over the previous example: a fixed {@code Thread.sleep(1000)} is an
 * unreliable way to wait for an async callback (it may return before the
 * query completes, or waste time after it). The callback now signals a
 * {@link CountDownLatch} which is awaited with a bounded timeout.
 */
public void lowLevelNativeExample() {
    LibLakeSoulMetaData nativeLib = JnrLoader.get();
    // Native resources are not garbage-collected; free them in the finally block.
    Pointer runtime = nativeLib.create_tokio_runtime();
    Pointer client = nativeLib.create_tokio_postgres_client(
            runtime,
            "postgresql://user:pass@localhost:5432/lakesoul"
    );
    try {
        // The callback counts the latch down so we know the async query finished.
        CountDownLatch done = new CountDownLatch(1);
        // NOTE(review): execute_query is documented with a Pointer callback
        // parameter — confirm the JNR binding accepts a Callback instance here.
        LibLakeSoulMetaData.StringCallback callback = new LibLakeSoulMetaData.StringCallback() {
            @Override
            public void invoke(String result) {
                System.out.println("Query result: " + result);
                done.countDown();
            }
        };
        nativeLib.execute_query(
                runtime,
                client,
                "SELECT table_id, table_name FROM table_info LIMIT 10",
                callback
        );
        // Bounded wait so a lost callback cannot hang the example forever.
        if (!done.await(10, TimeUnit.SECONDS)) {
            System.err.println("Timed out waiting for query callback");
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag for callers higher up the stack.
        Thread.currentThread().interrupt();
        System.err.println("Operation interrupted: " + e.getMessage());
    } finally {
        // Free in reverse order of creation: client first, then runtime.
        nativeLib.free_tokio_postgres_client(client);
        nativeLib.free_tokio_runtime(runtime);
    }
}
/**
 * Demonstrates checking the native-operation feature flags and routing
 * each operation type to either the native path or the JDBC fallback.
 */
public void configurationExample() {
    // Query path selection: guard-first, fall through to the native path.
    if (!NativeUtils.NATIVE_METADATA_QUERY_ENABLED) {
        System.out.println("Native queries disabled, falling back to JDBC");
        performJdbcQuery();
    } else {
        System.out.println("Native queries enabled");
        System.out.println("Max retry attempts: " + NativeUtils.NATIVE_METADATA_MAX_RETRY_ATTEMPTS);
        performNativeQuery();
    }
    // Update path selection follows the same pattern.
    if (!NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
        System.out.println("Native updates disabled, using JDBC");
        performJdbcUpdate();
    } else {
        System.out.println("Native updates enabled");
        performNativeUpdate();
    }
}
/**
 * Demonstrates issuing a sequence of per-table queries followed by a
 * scalar query listing all namespaces.
 */
public void batchOperationsExample() {
    try {
        // One native query per table id; results are collected in order.
        List<JniWrapper> results = new ArrayList<>();
        for (String tableId : Arrays.asList("table_001", "table_002", "table_003")) {
            results.add(NativeMetadataJavaClient.query(
                    NativeUtils.CodedDaoType.SELECT_TABLE_INFO_BY_TABLE_ID,
                    Arrays.asList(tableId)));
        }
        System.out.println("Batch query completed: " + results.size() + " results");

        // Scalar queries return plain strings instead of entity wrappers.
        List<String> namespaces = NativeMetadataJavaClient.queryScalar(
                NativeUtils.CodedDaoType.LIST_NAMESPACES,
                new ArrayList<>());
        System.out.println("Available namespaces: " + namespaces);
    } catch (Exception e) {
        System.err.println("Batch operations failed: " + e.getMessage());
    }
}
/**
 * Demonstrates the recommended error-handling pattern: catch
 * RuntimeException from native operations and degrade to the JDBC path,
 * keeping a broader catch for anything unexpected.
 */
public void errorHandlingExample() {
    try {
        // This lookup may fail if the table does not exist.
        JniWrapper result = NativeMetadataJavaClient.query(
                NativeUtils.CodedDaoType.SELECT_TABLE_INFO_BY_TABLE_ID,
                Arrays.asList("non_existent_table"));
        // Process result...
    } catch (RuntimeException e) {
        // Native operations may surface failures as RuntimeException.
        System.err.println("Native operation error: " + e.getMessage());
        // Retry with backoff or degrade gracefully to the JDBC path.
        fallbackToJdbcOperation();
    } catch (Exception e) {
        System.err.println("Unexpected error: " + e.getMessage());
        e.printStackTrace();
    }
}
/** Placeholder: run a metadata query through the native client path. */
private void performNativeQuery() {
// Implementation for native query
}
/** Placeholder: run a metadata query through the JDBC fallback path. */
private void performJdbcQuery() {
// Implementation for JDBC fallback
}
/** Placeholder: run a metadata update through the native client path. */
private void performNativeUpdate() {
// Implementation for native update
}
/** Placeholder: run a metadata update through the JDBC fallback path. */
private void performJdbcUpdate() {
// Implementation for JDBC update
}
/** Placeholder: fall back to JDBC after a failed native operation. */
private void fallbackToJdbcOperation() {
// Implementation for JDBC fallback
}
}
Performance Characteristics:
Native operations provide significant performance improvements over traditional JDBC-based metadata access.
Configuration Requirements:
Native operations can be configured via system properties:
# Enable/disable native operations
lakesoul.native.metadata.query.enabled=true
lakesoul.native.metadata.update.enabled=true
# Retry configuration
lakesoul.native.metadata.max.retry.attempts=3
# Connection configuration
lakesoul.pg.url=jdbc:postgresql://localhost:5432/lakesoul
lakesoul.pg.username=lakesoul_user
lakesoul.pg.password=lakesoul_password
Thread Safety:
Native operations are designed for safe concurrent usage.
Install with Tessl CLI
npx tessl i tessl/maven-com-dmetasoul--lakesoul-common