Core utilities and metadata management library for the LakeSoul lakehouse framework providing database connection management, RBAC authorization, protobuf serialization, and native library integration.
—
The Scala Functional API provides high-level Scala objects for functional programming style operations with data file management and metadata operations. This API offers a more idiomatic Scala interface to LakeSoul's metadata system with immutable data structures, functional composition, and type-safe operations.
High-level data operations and file management with functional programming patterns.
/**
 * High-level data operations and file management
 * Provides functional programming interface for data file operations
 *
 * NOTE(review): only public signatures are shown in this section — method
 * bodies are omitted. Behavior notes below should be confirmed against the
 * implementation.
 */
object DataOperation {
/** Database manager instance for metadata operations.
 *  Shared singleton; thread-safety of DBManager is not established from this
 *  view — TODO confirm before concurrent use. */
val dbManager = new DBManager
/**
 * Get all data file information for a table
 * @param tableId Unique table identifier
 * @return Array[DataFileInfo] array of file information objects
 */
def getTableDataInfo(tableId: String): Array[DataFileInfo]
/**
 * Get data file information for specific partitions
 * (overload of getTableDataInfo taking a Java list for interop callers)
 * @param partitionList Java list of PartitionInfo objects
 * @return Array[DataFileInfo] array of file information objects
 */
def getTableDataInfo(partitionList: util.List[PartitionInfo]): Array[DataFileInfo]
/**
 * Get data file information for partition array
 * @param partition_info_arr Array of PartitionInfoScala objects
 * @return Array[DataFileInfo] array of file information objects
 */
def getTableDataInfo(partition_info_arr: Array[PartitionInfoScala]): Array[DataFileInfo]
/**
 * Get data file information for table with partition filtering
 * @param tableId Unique table identifier
 * @param partitions List of partition values to filter by
 * @return Array[DataFileInfo] filtered array of file information
 */
def getTableDataInfo(tableId: String, partitions: List[String]): Array[DataFileInfo]
/**
 * Get incremental data file information between timestamps
 * @param table_id Unique table identifier
 * @param partition_desc Partition description (empty for all partitions)
 * @param startTimestamp Start timestamp in milliseconds
 * @param endTimestamp End timestamp in milliseconds (0 for current time)
 * @param readType Type of read operation (snapshot, incremental, etc.);
 *                 see LakeSoulOptions.ReadType for the accepted values
 * @return Array[DataFileInfo] incremental file information
 */
def getIncrementalPartitionDataInfo(table_id: String, partition_desc: String,
startTimestamp: Long, endTimestamp: Long,
readType: String): Array[DataFileInfo]
/**
 * Get data file information for single partition
 * @param partition_info PartitionInfoScala object with partition details
 * @return ArrayBuffer[DataFileInfo] mutable buffer of file information
 *         NOTE(review): returns a mutable buffer — callers should avoid
 *         sharing it across threads without copying.
 */
def getSinglePartitionDataInfo(partition_info: PartitionInfoScala): ArrayBuffer[DataFileInfo]
/**
 * Drop data commit information for specific commit
 * @param table_id Unique table identifier
 * @param range Partition range/description
 * @param commit_id UUID of commit to drop
 */
def dropDataInfoData(table_id: String, range: String, commit_id: UUID): Unit
/**
 * Drop all data commit information for table
 * @param table_id Unique table identifier
 */
def dropDataInfoData(table_id: String): Unit
}High-level metadata version management and table operations with functional API patterns.
/**
 * High-level metadata version management and table operations
 * Provides Scala-idiomatic interface for metadata management
 *
 * NOTE(review): only public signatures are shown in this section — method
 * bodies are omitted. Behavior notes below should be confirmed against the
 * implementation.
 */
object MetaVersion {
/** Database manager instance for metadata operations.
 *  Shared singleton; thread-safety of DBManager is not established from this
 *  view — TODO confirm before concurrent use. */
val dbManager = new DBManager()
/**
 * Create new namespace
 * @param namespace Namespace name to create
 */
def createNamespace(namespace: String): Unit
/**
 * List all available namespaces
 * @return Array[String] array of namespace names
 */
def listNamespaces(): Array[String]
/**
 * Check if namespace exists
 * @param table_namespace Namespace name to check
 * @return Boolean true if namespace exists, false otherwise
 */
def isNamespaceExists(table_namespace: String): Boolean
/**
 * Drop namespace by name
 * @param namespace Namespace name to drop
 */
def dropNamespaceByNamespace(namespace: String): Unit
/**
 * Check if table exists by name (default namespace)
 * @param table_name Table name to check
 * @return Boolean true if table exists, false otherwise
 */
def isTableExists(table_name: String): Boolean
/**
 * Check if specific table ID exists for table
 * @param table_name Table name
 * @param table_id Table ID to verify
 * @return Boolean true if table ID matches, false otherwise
 */
def isTableIdExists(table_name: String, table_id: String): Boolean
/**
 * Check if short table name exists in namespace
 * @param short_table_name Short table name
 * @param table_namespace Table namespace
 * @return (Boolean, String) tuple of (exists, table_path)
 */
def isShortTableNameExists(short_table_name: String, table_namespace: String): (Boolean, String)
/**
 * Get table path from short table name
 * @param short_table_name Short table name
 * @param table_namespace Table namespace
 * @return String table path or null if not found
 *         NOTE(review): returns null rather than Option — callers should
 *         wrap with Option(...) at the call site.
 */
def getTablePathFromShortTableName(short_table_name: String, table_namespace: String): String
/**
 * Create new table with comprehensive configuration
 * @param table_namespace Table namespace
 * @param table_path Storage path for table
 * @param short_table_name User-friendly table name
 * @param table_id Unique table identifier
 * @param table_schema JSON schema definition
 * @param range_column Range partition column names
 * @param hash_column Hash partition column names
 * @param configuration Table configuration map
 * @param bucket_num Number of hash buckets
 */
def createNewTable(table_namespace: String, table_path: String, short_table_name: String,
table_id: String, table_schema: String, range_column: String,
hash_column: String, configuration: Map[String, String], bucket_num: Int): Unit
/**
 * List tables in specified namespaces
 * @param namespace Array of namespace names to list from
 * @return util.List[String] Java list of table paths
 */
def listTables(namespace: Array[String]): util.List[String]
/**
 * Update table schema and configuration
 * @param table_name Table name
 * @param table_id Table ID
 * @param table_schema New schema definition
 * @param config Updated configuration
 * @param new_read_version New read version
 */
def updateTableSchema(table_name: String, table_id: String, table_schema: String,
config: Map[String, String], new_read_version: Int): Unit
/**
 * Delete table information
 * @param table_name Table name
 * @param table_id Table ID
 * @param table_namespace Table namespace
 */
def deleteTableInfo(table_name: String, table_id: String, table_namespace: String): Unit
/**
 * Get single partition information
 * @param table_id Table ID
 * @param range_value Partition range value
 * @param range_id Partition range ID
 * @return PartitionInfoScala partition information
 */
def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfoScala
/**
 * Get partition information for specific version
 * @param table_id Table ID
 * @param range_value Partition range value
 * @param version Specific version number
 * @return Array[PartitionInfoScala] partition information array
 */
def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): Array[PartitionInfoScala]
/**
 * Get all versions of a partition
 * @param table_id Table ID
 * @param range_value Partition range value
 * @return Array[PartitionInfoScala] all partition versions
 */
def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfoScala]
/**
 * Get all partition information for table
 * @param table_id Table ID
 * @return Array[PartitionInfoScala] all partitions for table
 */
def getAllPartitionInfoScala(table_id: String): Array[PartitionInfoScala]
/**
 * Convert Java PartitionInfo list to Scala array
 * @param partitionList Java list of PartitionInfo
 * @return Array[PartitionInfoScala] converted Scala array
 */
def convertPartitionInfoScala(partitionList: util.List[PartitionInfo]): Array[PartitionInfoScala]
/**
 * Rollback partition to specific version
 * @param table_id Table ID
 * @param range_value Partition range value
 * @param toVersion Target version to rollback to
 */
def rollbackPartitionInfoByVersion(table_id: String, range_value: String, toVersion: Int): Unit
/**
 * Delete all partition information for table
 * @param table_id Table ID
 */
def deletePartitionInfoByTableId(table_id: String): Unit
/**
 * Delete specific partition information
 * @param table_id Table ID
 * @param range_value Partition range value
 * @param range_id Partition range ID
 */
def deletePartitionInfoByRangeId(table_id: String, range_value: String, range_id: String): Unit
/**
 * Get latest timestamp for partition.
 * NOTE(review): "Lasted" follows the upstream method spelling; it means
 * "latest". The name cannot be corrected without breaking callers.
 * @param table_id Table ID
 * @param range_value Partition range value
 * @return Long latest timestamp in milliseconds
 */
def getLastedTimestamp(table_id: String, range_value: String): Long
/**
 * Get latest version up to specific time (spelling follows upstream)
 * @param table_id Table ID
 * @param range_value Partition range value
 * @param utcMills Timestamp in UTC milliseconds
 * @return Int latest version number
 */
def getLastedVersionUptoTime(table_id: String, range_value: String, utcMills: Long): Int
/**
 * Clean metadata up to specific time and get files to delete
 * @param table_id Table ID
 * @param range_value Partition range value
 * @param utcMills Timestamp in UTC milliseconds
 * @return List[String] list of file paths to delete
 */
def cleanMetaUptoTime(table_id: String, range_value: String, utcMills: Long): List[String]
/**
 * Clean all metadata (for testing only — destructive; do not call in
 * production code paths)
 */
def cleanMeta(): Unit
}Scala-specific data types for functional programming with LakeSoul metadata.
/**
 * Data transfer object describing a single data file in a LakeSoul table.
 * Immutable case class; derived fields are computed lazily so construction
 * stays cheap.
 */
case class DataFileInfo(
    range_partitions: String,
    path: String,
    file_op: String,
    size: Long,
    modification_time: Long = -1L,
    file_exist_cols: String = ""
) {
  /**
   * Bucket ID parsed from the file name, computed on first access only.
   * Fails fast via sys.error when the name does not follow the bucketed
   * naming convention.
   * @return Int bucket ID extracted from the filename
   */
  lazy val file_bucket_id: Int = {
    val fileName = new Path(path).getName
    BucketingUtils
      .getBucketId(fileName)
      .getOrElse(sys.error(s"Invalid bucket file $path"))
  }

  /**
   * Range version string: partition description joined to the columns
   * present in this file with a dash.
   * @return String combined range and column information
   */
  lazy val range_version: String = s"$range_partitions-$file_exist_cols"

  /**
   * Produce a copy of this file info marked for cleanup.
   * @param deleteTime timestamp when the file should be deleted
   * @return DataFileInfo new instance with updated modification time
   */
  def expire(deleteTime: Long): DataFileInfo = copy(modification_time = deleteTime)

  /**
   * Hash code over the identifying fields only (partition, path, op),
   * used for deduplication. Still consistent with the generated case-class
   * equals: instances equal under equals share these three fields.
   * @return Int hash code
   */
  override def hashCode(): Int = Objects.hash(range_partitions, path, file_op)
}
/**
 * Scala-side view of partition metadata.
 * Immutable case class suitable for functional composition via copy.
 */
case class PartitionInfoScala(
    table_id: String,
    range_value: String,
    version: Int = -1,
    read_files: Array[UUID] = Array.empty[UUID],
    expression: String = "",
    commit_op: String = ""
) {
  /**
   * Human-readable rendering of this partition.
   * NOTE(review): the label reads "table_name" but the value printed is the
   * table id — the output format is preserved exactly in case callers rely
   * on it.
   * @return String formatted partition information
   */
  override def toString: String =
    s"partition info: {\ntable_name: $table_id,\nrange_value: $range_value}"
}
// Utility object for bucketed file handling with functional operations.
/**
 * Helpers for working with bucketed file names.
 * A bucketed file name ends in `_<digits>`, optionally followed by an
 * extension (e.g. `part-0001_00042.parquet`).
 */
object BucketingUtils {
  // Entire name must end with `_<digits>` plus an optional `.<ext>` suffix.
  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

  /**
   * Extract the bucket ID from a bucketed file name.
   * Uses an entire-input match, identical in semantics to the regex
   * extractor pattern.
   * @param fileName name of the bucketed file
   * @return Some(bucketId) when the name matches the bucketed pattern,
   *         None otherwise
   */
  def getBucketId(fileName: String): Option[Int] = {
    val matcher = bucketedFileName.pattern.matcher(fileName)
    if (matcher.matches()) Some(matcher.group(1).toInt) else None
  }
}
// Usage Examples:
import com.dmetasoul.lakesoul.meta.{DataOperation, MetaVersion, LakeSoulOptions}
import java.util.UUID
import scala.collection.JavaConverters._
/**
 * Usage examples for the LakeSoul Scala functional API.
 * NOTE(review): illustrative only — depends on DataOperation, MetaVersion
 * and LakeSoulOptions from the surrounding project and is not runnable
 * standalone.
 */
object ScalaFunctionalAPIExample {
// Demonstrates namespace/table lifecycle calls against MetaVersion.
def basicTableOperationsExample(): Unit = {
// Create namespace using functional API
MetaVersion.createNamespace("analytics")
// Check if table exists
val tableExists = MetaVersion.isTableExists("user_events")
if (!tableExists) {
// Create new table with Scala Map for configuration
val config = Map(
"format" -> "parquet",
"compression" -> "snappy",
"retention.days" -> "30"
)
MetaVersion.createNewTable(
table_namespace = "analytics",
table_path = "/data/analytics/user_events",
short_table_name = "user_events",
table_id = "tbl_user_events_001",
table_schema = """{"type":"struct","fields":[...]}""",
range_column = "date,hour",
hash_column = "user_id",
configuration = config,
bucket_num = 16
)
println("Table created successfully")
}
// List all namespaces
val namespaces = MetaVersion.listNamespaces()
println(s"Available namespaces: ${namespaces.mkString(", ")}")
// Get table path from short name
// NOTE(review): may return null when not found — wrap in Option(...) in real code
val tablePath = MetaVersion.getTablePathFromShortTableName("user_events", "analytics")
println(s"Table path: $tablePath")
}
// Demonstrates file listing and incremental reads via DataOperation.
def dataFileOperationsExample(): Unit = {
val tableId = "tbl_user_events_001"
// Get all data files for table
val allFiles = DataOperation.getTableDataInfo(tableId)
println(s"Total files: ${allFiles.length}")
// Process files functionally
val parquetFiles = allFiles
.filter(_.path.endsWith(".parquet"))
.filter(_.file_op == "add")
.sortBy(_.modification_time)
println(s"Parquet files: ${parquetFiles.length}")
// Get incremental data between timestamps
val startTime = System.currentTimeMillis() - 86400000L // 24 hours ago
val endTime = System.currentTimeMillis()
val incrementalFiles = DataOperation.getIncrementalPartitionDataInfo(
table_id = tableId,
partition_desc = "date=2023-01-01",
startTimestamp = startTime,
endTimestamp = endTime,
readType = LakeSoulOptions.ReadType.INCREMENTAL_READ
)
println(s"Incremental files: ${incrementalFiles.length}")
// Calculate total size of files
// guard against division by zero on an empty result set
val totalSize = incrementalFiles.map(_.size).sum
val avgSize = if (incrementalFiles.nonEmpty) totalSize / incrementalFiles.length else 0L
println(s"Total size: $totalSize bytes, Average: $avgSize bytes")
}
// Demonstrates partition version inspection via MetaVersion.
def partitionOperationsExample(): Unit = {
val tableId = "tbl_user_events_001"
// Get all partitions for table
val partitions = MetaVersion.getAllPartitionInfoScala(tableId)
println(s"Total partitions: ${partitions.length}")
// Group partitions by commit operation
val partitionsByOp = partitions.groupBy(_.commit_op)
partitionsByOp.foreach { case (op, parts) =>
println(s"$op: ${parts.length} partitions")
}
// Find partitions with multiple versions
val partitionVersions = partitions.groupBy(_.range_value)
val multiVersionPartitions = partitionVersions.filter(_._2.length > 1)
println(s"Multi-version partitions: ${multiVersionPartitions.size}")
multiVersionPartitions.foreach { case (rangeValue, versions) =>
// sortedVersions is non-empty here (filter above guarantees length > 1),
// so head/last are safe
val sortedVersions = versions.sortBy(_.version)
val latestVersion = sortedVersions.last.version
val oldestVersion = sortedVersions.head.version
println(s"Partition $rangeValue: versions $oldestVersion to $latestVersion")
// Get specific version details
val versionDetails = MetaVersion.getSinglePartitionInfoForVersion(
tableId, rangeValue, latestVersion
)
println(s"  Latest version has ${versionDetails.length} entries")
}
}
// Demonstrates a pure transformation pipeline over DataFileInfo values.
def functionalDataProcessingExample(): Unit = {
val tableId = "tbl_user_events_001"
// Get data files and process functionally
val dataFiles = DataOperation.getTableDataInfo(tableId)
// Functional pipeline for data processing
// NOTE(review): on Scala 2.13+ mapValues returns a lazy view and is
// deprecated — append .toMap for strict semantics when migrating
val processedFiles = dataFiles
.filter(_.file_op == "add") // Only added files
.filter(_.size > 1024) // Minimum size filter
.map(file => file.copy(path = file.path.toLowerCase)) // Normalize paths
.groupBy(_.range_partitions) // Group by partition
.mapValues(files => files.sortBy(_.modification_time)) // Sort by time
.mapValues(files => files.take(10)) // Take latest 10 per partition
processedFiles.foreach { case (partition, files) =>
println(s"Partition $partition:")
files.foreach { file =>
val bucketId = BucketingUtils.getBucketId(new Path(file.path).getName)
println(s"  File: ${file.path}, Size: ${file.size}, Bucket: ${bucketId.getOrElse("N/A")}")
}
}
// Calculate statistics
// single pass fold accumulating (count, total size, max size)
val stats = dataFiles.foldLeft((0, 0L, 0L)) { case ((count, totalSize, maxSize), file) =>
(count + 1, totalSize + file.size, math.max(maxSize, file.size))
}
val (fileCount, totalSize, maxSize) = stats
println(s"Statistics: $fileCount files, ${totalSize / 1024 / 1024}MB total, ${maxSize / 1024 / 1024}MB max")
}
// Demonstrates timestamp/version queries and metadata cleanup.
def timeBasedOperationsExample(): Unit = {
val tableId = "tbl_user_events_001"
val partitionDesc = "date=2023-01-01"
// Get latest timestamp for partition
val latestTimestamp = MetaVersion.getLastedTimestamp(tableId, partitionDesc)
println(s"Latest timestamp: $latestTimestamp")
// Get version at specific time
val specificTime = System.currentTimeMillis() - 3600000L // 1 hour ago
val versionAtTime = MetaVersion.getLastedVersionUptoTime(tableId, partitionDesc, specificTime)
println(s"Version at time $specificTime: $versionAtTime")
// Clean old metadata and get files to delete
val cleanupTime = System.currentTimeMillis() - 7 * 86400000L // 7 days ago
val filesToDelete = MetaVersion.cleanMetaUptoTime(tableId, partitionDesc, cleanupTime)
println(s"Files to delete: ${filesToDelete.length}")
filesToDelete.take(5).foreach(println) // Show first 5
// Process cleanup with functional operations
val deletesByExtension = filesToDelete
.groupBy(path => path.substring(path.lastIndexOf('.') + 1))
.mapValues(_.size)
println("Files to delete by extension:")
deletesByExtension.foreach { case (ext, count) =>
println(s"  .$ext: $count files")
}
}
// Demonstrates Try-based error handling around metadata calls.
def errorHandlingExample(): Unit = {
import scala.util.{Try, Success, Failure}
// Functional error handling with Try
val tableResult = Try {
MetaVersion.isTableExists("non_existent_table")
}
tableResult match {
case Success(exists) =>
println(s"Table check completed: $exists")
case Failure(exception) =>
println(s"Table check failed: ${exception.getMessage}")
}
// Chain operations with error handling
val chainedOperation = for {
namespaces <- Try(MetaVersion.listNamespaces())
firstNamespace = namespaces.headOption.getOrElse("default")
tables <- Try(MetaVersion.listTables(Array(firstNamespace)))
} yield (firstNamespace, tables.asScala.toList)
chainedOperation match {
case Success((namespace, tables)) =>
println(s"Namespace: $namespace, Tables: ${tables.length}")
case Failure(exception) =>
println(s"Chained operation failed: ${exception.getMessage}")
}
}
// Demonstrates immutable updates on the case-class data types via copy.
def immutableDataExample(): Unit = {
// Working with immutable data structures
val originalFile = DataFileInfo(
range_partitions = "date=2023-01-01",
path = "/data/file001.parquet",
file_op = "add",
size = 1024000L,
modification_time = System.currentTimeMillis(),
file_exist_cols = "[\"col1\",\"col2\"]"
)
// Create variations using copy
val expiredFile = originalFile.expire(System.currentTimeMillis() + 86400000L)
val renamedFile = originalFile.copy(path = "/data/renamed_file001.parquet")
println(s"Original: ${originalFile.path}")
println(s"Expired: ${expiredFile.modification_time}")
println(s"Renamed: ${renamedFile.path}")
// Functional composition with case classes
val partitionInfo = PartitionInfoScala(
table_id = "tbl_001",
range_value = "date=2023-01-01",
version = 1,
read_files = Array(UUID.randomUUID(), UUID.randomUUID()),
expression = "date >= '2023-01-01'",
commit_op = "AppendCommit"
)
// Create new version
val nextVersion = partitionInfo.copy(
version = partitionInfo.version + 1,
read_files = partitionInfo.read_files :+ UUID.randomUUID(),
commit_op = "CompactionCommit"
)
println(s"Original version: ${partitionInfo.version}")
println(s"Next version: ${nextVersion.version}")
println(s"Files: ${partitionInfo.read_files.length} -> ${nextVersion.read_files.length}")
}
}Functional Programming Patterns:
The Scala API embraces functional programming principles: immutable case classes, transformations expressed with collection combinators (map, filter, groupBy, foldLeft), and composition via copy instead of mutation.
Type Safety:
The Scala API provides compile-time type safety through statically typed method signatures and immutable data structures such as DataFileInfo and PartitionInfoScala.
Performance Characteristics:
The Scala API maintains high performance through lazy evaluation of derived fields (for example, file_bucket_id) and array-based result types for bulk metadata queries.
Install with Tessl CLI
npx tessl i tessl/maven-com-dmetasoul--lakesoul-common