tessl/maven-com-dmetasoul--lakesoul-common

Core utilities and metadata management library for the LakeSoul lakehouse framework providing database connection management, RBAC authorization, protobuf serialization, and native library integration.

Scala Functional API

The Scala Functional API provides high-level Scala objects for functional-style data file management and metadata operations. It offers a more idiomatic Scala interface to LakeSoul's metadata system, built on immutable data structures, functional composition, and type-safe operations.

Capabilities

DataOperation Object

High-level data operations and file management with functional programming patterns.

/**
 * High-level data operations and file management
 * Provides functional programming interface for data file operations
 */
object DataOperation {
    /** Database manager instance for metadata operations */
    val dbManager = new DBManager
    
    /**
     * Get all data file information for a table
     * @param tableId Unique table identifier
     * @return Array[DataFileInfo] array of file information objects
     */
    def getTableDataInfo(tableId: String): Array[DataFileInfo]
    
    /**
     * Get data file information for specific partitions
     * @param partitionList Java list of PartitionInfo objects
     * @return Array[DataFileInfo] array of file information objects
     */
    def getTableDataInfo(partitionList: util.List[PartitionInfo]): Array[DataFileInfo]
    
    /**
     * Get data file information for partition array
     * @param partition_info_arr Array of PartitionInfoScala objects
     * @return Array[DataFileInfo] array of file information objects
     */
    def getTableDataInfo(partition_info_arr: Array[PartitionInfoScala]): Array[DataFileInfo]
    
    /**
     * Get data file information for table with partition filtering
     * @param tableId Unique table identifier
     * @param partitions List of partition values to filter by
     * @return Array[DataFileInfo] filtered array of file information
     */
    def getTableDataInfo(tableId: String, partitions: List[String]): Array[DataFileInfo]
    
    /**
     * Get incremental data file information between timestamps
     * @param table_id Unique table identifier
     * @param partition_desc Partition description (empty for all partitions)
     * @param startTimestamp Start timestamp in milliseconds
     * @param endTimestamp End timestamp in milliseconds (0 for current time)
     * @param readType Type of read operation (snapshot, incremental, etc.)
     * @return Array[DataFileInfo] incremental file information
     */
    def getIncrementalPartitionDataInfo(table_id: String, partition_desc: String, 
                                       startTimestamp: Long, endTimestamp: Long, 
                                       readType: String): Array[DataFileInfo]
    
    /**
     * Get data file information for single partition
     * @param partition_info PartitionInfoScala object with partition details
     * @return ArrayBuffer[DataFileInfo] mutable buffer of file information
     */
    def getSinglePartitionDataInfo(partition_info: PartitionInfoScala): ArrayBuffer[DataFileInfo]
    
    /**
     * Drop data commit information for specific commit
     * @param table_id Unique table identifier
     * @param range Partition range/description
     * @param commit_id UUID of commit to drop
     */
    def dropDataInfoData(table_id: String, range: String, commit_id: UUID): Unit
    
    /**
     * Drop all data commit information for table
     * @param table_id Unique table identifier
     */
    def dropDataInfoData(table_id: String): Unit
}

MetaVersion Object

High-level metadata version management and table operations with functional API patterns.

/**
 * High-level metadata version management and table operations
 * Provides Scala-idiomatic interface for metadata management
 */
object MetaVersion {
    /** Database manager instance for metadata operations */
    val dbManager = new DBManager()
    
    /**
     * Create new namespace
     * @param namespace Namespace name to create
     */
    def createNamespace(namespace: String): Unit
    
    /**
     * List all available namespaces
     * @return Array[String] array of namespace names
     */
    def listNamespaces(): Array[String]
    
    /**
     * Check if namespace exists
     * @param table_namespace Namespace name to check
     * @return Boolean true if namespace exists, false otherwise
     */
    def isNamespaceExists(table_namespace: String): Boolean
    
    /**
     * Drop namespace by name
     * @param namespace Namespace name to drop
     */
    def dropNamespaceByNamespace(namespace: String): Unit
    
    /**
     * Check if table exists by name (default namespace)
     * @param table_name Table name to check
     * @return Boolean true if table exists, false otherwise
     */
    def isTableExists(table_name: String): Boolean
    
    /**
     * Check if specific table ID exists for table
     * @param table_name Table name
     * @param table_id Table ID to verify
     * @return Boolean true if table ID matches, false otherwise
     */
    def isTableIdExists(table_name: String, table_id: String): Boolean
    
    /**
     * Check if short table name exists in namespace
     * @param short_table_name Short table name
     * @param table_namespace Table namespace
     * @return (Boolean, String) tuple of (exists, table_path)
     */
    def isShortTableNameExists(short_table_name: String, table_namespace: String): (Boolean, String)
    
    /**
     * Get table path from short table name
     * @param short_table_name Short table name
     * @param table_namespace Table namespace
     * @return String table path or null if not found
     */
    def getTablePathFromShortTableName(short_table_name: String, table_namespace: String): String
    
    /**
     * Create new table with comprehensive configuration
     * @param table_namespace Table namespace
     * @param table_path Storage path for table
     * @param short_table_name User-friendly table name
     * @param table_id Unique table identifier
     * @param table_schema JSON schema definition
     * @param range_column Range partition column names
     * @param hash_column Hash partition column names  
     * @param configuration Table configuration map
     * @param bucket_num Number of hash buckets
     */
    def createNewTable(table_namespace: String, table_path: String, short_table_name: String,
                      table_id: String, table_schema: String, range_column: String,
                      hash_column: String, configuration: Map[String, String], bucket_num: Int): Unit
    
    /**
     * List tables in specified namespaces
     * @param namespace Array of namespace names to list from
     * @return util.List[String] Java list of table paths
     */
    def listTables(namespace: Array[String]): util.List[String]
    
    /**
     * Update table schema and configuration
     * @param table_name Table name
     * @param table_id Table ID
     * @param table_schema New schema definition
     * @param config Updated configuration
     * @param new_read_version New read version
     */
    def updateTableSchema(table_name: String, table_id: String, table_schema: String, 
                         config: Map[String, String], new_read_version: Int): Unit
    
    /**
     * Delete table information
     * @param table_name Table name
     * @param table_id Table ID
     * @param table_namespace Table namespace
     */
    def deleteTableInfo(table_name: String, table_id: String, table_namespace: String): Unit
    
    /**
     * Get single partition information
     * @param table_id Table ID
     * @param range_value Partition range value
     * @param range_id Partition range ID
     * @return PartitionInfoScala partition information
     */
    def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfoScala
    
    /**
     * Get partition information for specific version
     * @param table_id Table ID
     * @param range_value Partition range value
     * @param version Specific version number
     * @return Array[PartitionInfoScala] partition information array
     */
    def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): Array[PartitionInfoScala]
    
    /**
     * Get all versions of a partition
     * @param table_id Table ID
     * @param range_value Partition range value
     * @return Array[PartitionInfoScala] all partition versions
     */
    def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfoScala]
    
    /**
     * Get all partition information for table
     * @param table_id Table ID
     * @return Array[PartitionInfoScala] all partitions for table
     */
    def getAllPartitionInfoScala(table_id: String): Array[PartitionInfoScala]
    
    /**
     * Convert Java PartitionInfo list to Scala array
     * @param partitionList Java list of PartitionInfo
     * @return Array[PartitionInfoScala] converted Scala array
     */
    def convertPartitionInfoScala(partitionList: util.List[PartitionInfo]): Array[PartitionInfoScala]
    
    /**
     * Rollback partition to specific version
     * @param table_id Table ID
     * @param range_value Partition range value
     * @param toVersion Target version to rollback to
     */
    def rollbackPartitionInfoByVersion(table_id: String, range_value: String, toVersion: Int): Unit
    
    /**
     * Delete all partition information for table
     * @param table_id Table ID
     */
    def deletePartitionInfoByTableId(table_id: String): Unit
    
    /**
     * Delete specific partition information
     * @param table_id Table ID
     * @param range_value Partition range value
     * @param range_id Partition range ID
     */
    def deletePartitionInfoByRangeId(table_id: String, range_value: String, range_id: String): Unit
    
    /**
     * Get latest timestamp for partition
     * @param table_id Table ID
     * @param range_value Partition range value
     * @return Long latest timestamp in milliseconds
     */
    def getLastedTimestamp(table_id: String, range_value: String): Long
    
    /**
     * Get latest version up to specific time
     * @param table_id Table ID
     * @param range_value Partition range value
     * @param utcMills Timestamp in UTC milliseconds
     * @return Int latest version number
     */
    def getLastedVersionUptoTime(table_id: String, range_value: String, utcMills: Long): Int
    
    /**
     * Clean metadata up to specific time and get files to delete
     * @param table_id Table ID
     * @param range_value Partition range value
     * @param utcMills Timestamp in UTC milliseconds
     * @return List[String] list of file paths to delete
     */
    def cleanMetaUptoTime(table_id: String, range_value: String, utcMills: Long): List[String]
    
    /**
     * Clean all metadata (for testing)
     */
    def cleanMeta(): Unit
}
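
The time-based lookups above can be illustrated without a metadata database. The sketch below mirrors the semantics of getLastedVersionUptoTime against an in-memory version history, modeled as (version, commitTimestampMillis) pairs; the object and method names here are assumptions for illustration, not part of the API.

```scala
// Illustrative stand-in for MetaVersion.getLastedVersionUptoTime semantics:
// given a partition's version history, find the newest version committed
// at or before a cutoff timestamp.
object VersionLookupSketch {
  /** Highest version committed at or before utcMills, or -1 if none. */
  def latestVersionUptoTime(history: Seq[(Int, Long)], utcMills: Long): Int =
    history
      .collect { case (version, ts) if ts <= utcMills => version }
      .sorted
      .lastOption
      .getOrElse(-1)
}

val history = Seq((1, 1000L), (2, 2000L), (3, 3000L))
val versionAtCutoff = VersionLookupSketch.latestVersionUptoTime(history, 2500L) // 2
val versionBeforeAny = VersionLookupSketch.latestVersionUptoTime(history, 500L) // -1
```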

Scala Data Types

Scala-specific data types for functional programming with LakeSoul metadata.

/**
 * Data transfer object for file information
 * Immutable case class with functional operations
 */
case class DataFileInfo(
    range_partitions: String,
    path: String, 
    file_op: String,
    size: Long,
    modification_time: Long = -1L,
    file_exist_cols: String = ""
) {
    /**
     * Get bucket ID from file path
     * Uses lazy evaluation for performance
     * @return Int bucket ID extracted from filename
     */
    lazy val file_bucket_id: Int = BucketingUtils.getBucketId(new Path(path).getName)
        .getOrElse(sys.error(s"Invalid bucket file $path"))
    
    /**
     * Get range version string combining partitions and columns
     * @return String combined range and column information
     */
    lazy val range_version: String = range_partitions + "-" + file_exist_cols
    
    /**
     * Create expired version of file info for cleanup
     * @param deleteTime Timestamp when file should be deleted
     * @return DataFileInfo new instance with updated modification time
     */
    def expire(deleteTime: Long): DataFileInfo = this.copy(modification_time = deleteTime)
    
    /**
     * Hash code based on key fields for deduplication
     * @return Int hash code
     */
    override def hashCode(): Int = Objects.hash(range_partitions, path, file_op)
}
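
The copy-based update pattern used by DataFileInfo can be shown with a simplified stand-in that drops the Hadoop Path and BucketingUtils dependencies (FileInfoSketch below is hypothetical, not part of the library):

```scala
// Simplified stand-in for DataFileInfo, keeping only the immutable-update
// behavior: expire() returns a new instance, the receiver is never mutated.
case class FileInfoSketch(
    range_partitions: String,
    path: String,
    file_op: String,
    size: Long,
    modification_time: Long = -1L,
    file_exist_cols: String = ""
) {
  // Computed once, on first access
  lazy val range_version: String = range_partitions + "-" + file_exist_cols

  def expire(deleteTime: Long): FileInfoSketch = this.copy(modification_time = deleteTime)
}

val original = FileInfoSketch("date=2023-01-01", "/data/f_00001.parquet", "add", 1024L)
val expired  = original.expire(9999L)
// original.modification_time is still -1L; expired.modification_time is 9999L
```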

/**
 * Scala representation of partition information
 * Immutable case class for functional composition
 */
case class PartitionInfoScala(
    table_id: String,
    range_value: String, 
    version: Int = -1,
    read_files: Array[UUID] = Array.empty[UUID],
    expression: String = "",
    commit_op: String = ""
) {
    /**
     * String representation of partition info
     * @return String formatted partition information
     */
    override def toString: String = {
        s"partition info: {\ntable_name: $table_id,\nrange_value: $range_value}"
    }
}

BucketingUtils Object

Utility object for bucketed file handling with functional operations.

/**
 * Utility functions for bucketed file handling
 * Provides functional operations for bucket ID extraction
 */
object BucketingUtils {
    // Regular expression for extracting bucket ID from filenames
    private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r
    
    /**
     * Extract bucket ID from filename
     * @param fileName Name of the bucketed file
     * @return Option[Int] Some(bucketId) if valid bucketed file, None otherwise
     */
    def getBucketId(fileName: String): Option[Int] = fileName match {
        case bucketedFileName(bucketId) => Some(bucketId.toInt)
        case _ => None
    }
}
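
The extraction logic is self-contained and can be exercised directly; the regex below copies the pattern documented above:

```scala
// Bucket-id extraction as documented for BucketingUtils: the last "_<digits>"
// group before an optional extension is the bucket id.
val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

def getBucketId(fileName: String): Option[Int] = fileName match {
  case bucketedFileName(bucketId) => Some(bucketId.toInt)
  case _                          => None
}

val bucketed = getBucketId("part-00000_00042.parquet") // Some(42)
val plain    = getBucketId("README.md")                // None
```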

Usage Examples:

import com.dmetasoul.lakesoul.meta.{BucketingUtils, DataFileInfo, DataOperation, LakeSoulOptions, MetaVersion, PartitionInfoScala}
import org.apache.hadoop.fs.Path
import java.util.UUID
import scala.collection.JavaConverters._

object ScalaFunctionalAPIExample {
    
    def basicTableOperationsExample(): Unit = {
        // Create namespace using functional API
        MetaVersion.createNamespace("analytics")
        
        // Check if table exists
        val tableExists = MetaVersion.isTableExists("user_events")
        if (!tableExists) {
            // Create new table with Scala Map for configuration
            val config = Map(
                "format" -> "parquet",
                "compression" -> "snappy",
                "retention.days" -> "30"
            )
            
            MetaVersion.createNewTable(
                table_namespace = "analytics",
                table_path = "/data/analytics/user_events", 
                short_table_name = "user_events",
                table_id = "tbl_user_events_001",
                table_schema = """{"type":"struct","fields":[...]}""",
                range_column = "date,hour",
                hash_column = "user_id", 
                configuration = config,
                bucket_num = 16
            )
            
            println("Table created successfully")
        }
        
        // List all namespaces
        val namespaces = MetaVersion.listNamespaces()
        println(s"Available namespaces: ${namespaces.mkString(", ")}")
        
        // Get table path from short name
        val tablePath = MetaVersion.getTablePathFromShortTableName("user_events", "analytics")
        println(s"Table path: $tablePath")
    }
    
    def dataFileOperationsExample(): Unit = {
        val tableId = "tbl_user_events_001"
        
        // Get all data files for table
        val allFiles = DataOperation.getTableDataInfo(tableId)
        println(s"Total files: ${allFiles.length}")
        
        // Process files functionally
        val parquetFiles = allFiles
            .filter(_.path.endsWith(".parquet"))
            .filter(_.file_op == "add")
            .sortBy(_.modification_time)
        
        println(s"Parquet files: ${parquetFiles.length}")
        
        // Get incremental data between timestamps
        val startTime = System.currentTimeMillis() - 86400000L // 24 hours ago
        val endTime = System.currentTimeMillis()
        
        val incrementalFiles = DataOperation.getIncrementalPartitionDataInfo(
            table_id = tableId,
            partition_desc = "date=2023-01-01", 
            startTimestamp = startTime,
            endTimestamp = endTime,
            readType = LakeSoulOptions.ReadType.INCREMENTAL_READ
        )
        
        println(s"Incremental files: ${incrementalFiles.length}")
        
        // Calculate total size of files
        val totalSize = incrementalFiles.map(_.size).sum
        val avgSize = if (incrementalFiles.nonEmpty) totalSize / incrementalFiles.length else 0L
        
        println(s"Total size: $totalSize bytes, Average: $avgSize bytes")
    }
    
    def partitionOperationsExample(): Unit = {
        val tableId = "tbl_user_events_001"
        
        // Get all partitions for table
        val partitions = MetaVersion.getAllPartitionInfoScala(tableId)
        println(s"Total partitions: ${partitions.length}")
        
        // Group partitions by commit operation
        val partitionsByOp = partitions.groupBy(_.commit_op)
        partitionsByOp.foreach { case (op, parts) =>
            println(s"$op: ${parts.length} partitions")
        }
        
        // Find partitions with multiple versions
        val partitionVersions = partitions.groupBy(_.range_value)
        val multiVersionPartitions = partitionVersions.filter(_._2.length > 1)
        
        println(s"Multi-version partitions: ${multiVersionPartitions.size}")
        
        multiVersionPartitions.foreach { case (rangeValue, versions) =>
            val sortedVersions = versions.sortBy(_.version)
            val latestVersion = sortedVersions.last.version
            val oldestVersion = sortedVersions.head.version
            
            println(s"Partition $rangeValue: versions $oldestVersion to $latestVersion")
            
            // Get specific version details
            val versionDetails = MetaVersion.getSinglePartitionInfoForVersion(
                tableId, rangeValue, latestVersion
            )
            println(s"  Latest version has ${versionDetails.length} entries")
        }
    }
    
    def functionalDataProcessingExample(): Unit = {
        val tableId = "tbl_user_events_001"
        
        // Get data files and process functionally
        val dataFiles = DataOperation.getTableDataInfo(tableId)
        
        // Functional pipeline for data processing
        val processedFiles = dataFiles
            .filter(_.file_op == "add")                           // Only added files
            .filter(_.size > 1024)                                // Minimum size filter
            .map(file => file.copy(path = file.path.toLowerCase)) // Normalize paths
            .groupBy(_.range_partitions)                          // Group by partition
            .map { case (partition, files) =>                     // Sort by time, keep latest 10
                partition -> files.sortBy(_.modification_time).take(10)
            }
        
        processedFiles.foreach { case (partition, files) =>
            println(s"Partition $partition:")
            files.foreach { file =>
                val bucketId = BucketingUtils.getBucketId(new Path(file.path).getName)
                println(s"  File: ${file.path}, Size: ${file.size}, Bucket: ${bucketId.getOrElse("N/A")}")
            }
        }
        
        // Calculate statistics
        val stats = dataFiles.foldLeft((0, 0L, 0L)) { case ((count, totalSize, maxSize), file) =>
            (count + 1, totalSize + file.size, math.max(maxSize, file.size))
        }
        
        val (fileCount, totalSize, maxSize) = stats
        println(s"Statistics: $fileCount files, ${totalSize / 1024 / 1024}MB total, ${maxSize / 1024 / 1024}MB max")
    }
    
    def timeBasedOperationsExample(): Unit = {
        val tableId = "tbl_user_events_001"
        val partitionDesc = "date=2023-01-01"
        
        // Get latest timestamp for partition
        val latestTimestamp = MetaVersion.getLastedTimestamp(tableId, partitionDesc)
        println(s"Latest timestamp: $latestTimestamp")
        
        // Get version at specific time
        val specificTime = System.currentTimeMillis() - 3600000L // 1 hour ago
        val versionAtTime = MetaVersion.getLastedVersionUptoTime(tableId, partitionDesc, specificTime)
        println(s"Version at time $specificTime: $versionAtTime")
        
        // Clean old metadata and get files to delete
        val cleanupTime = System.currentTimeMillis() - 7 * 86400000L // 7 days ago
        val filesToDelete = MetaVersion.cleanMetaUptoTime(tableId, partitionDesc, cleanupTime)
        
        println(s"Files to delete: ${filesToDelete.length}")
        filesToDelete.take(5).foreach(println) // Show first 5
        
        // Process cleanup with functional operations
        val deletesByExtension = filesToDelete
            .groupBy(path => path.substring(path.lastIndexOf('.') + 1))
            .map { case (ext, paths) => ext -> paths.size }
        
        println("Files to delete by extension:")
        deletesByExtension.foreach { case (ext, count) =>
            println(s"  .$ext: $count files")
        }
    }
    
    def errorHandlingExample(): Unit = {
        import scala.util.{Try, Success, Failure}
        
        // Functional error handling with Try
        val tableResult = Try {
            MetaVersion.isTableExists("non_existent_table")
        }
        
        tableResult match {
            case Success(exists) => 
                println(s"Table check completed: $exists")
            case Failure(exception) => 
                println(s"Table check failed: ${exception.getMessage}")
        }
        
        // Chain operations with error handling
        val chainedOperation = for {
            namespaces <- Try(MetaVersion.listNamespaces())
            firstNamespace = namespaces.headOption.getOrElse("default")
            tables <- Try(MetaVersion.listTables(Array(firstNamespace)))
        } yield (firstNamespace, tables.asScala.toList)
        
        chainedOperation match {
            case Success((namespace, tables)) =>
                println(s"Namespace: $namespace, Tables: ${tables.length}")
            case Failure(exception) =>
                println(s"Chained operation failed: ${exception.getMessage}")
        }
    }
    
    def immutableDataExample(): Unit = {
        // Working with immutable data structures
        val originalFile = DataFileInfo(
            range_partitions = "date=2023-01-01",
            path = "/data/file001.parquet",
            file_op = "add",
            size = 1024000L,
            modification_time = System.currentTimeMillis(),
            file_exist_cols = "[\"col1\",\"col2\"]"
        )
        
        // Create variations using copy
        val expiredFile = originalFile.expire(System.currentTimeMillis() + 86400000L)
        val renamedFile = originalFile.copy(path = "/data/renamed_file001.parquet")
        
        println(s"Original: ${originalFile.path}")
        println(s"Expired: ${expiredFile.modification_time}")
        println(s"Renamed: ${renamedFile.path}")
        
        // Functional composition with case classes
        val partitionInfo = PartitionInfoScala(
            table_id = "tbl_001",
            range_value = "date=2023-01-01",
            version = 1,
            read_files = Array(UUID.randomUUID(), UUID.randomUUID()),
            expression = "date >= '2023-01-01'",
            commit_op = "AppendCommit"
        )
        
        // Create new version
        val nextVersion = partitionInfo.copy(
            version = partitionInfo.version + 1,
            read_files = partitionInfo.read_files :+ UUID.randomUUID(),
            commit_op = "CompactionCommit"
        )
        
        println(s"Original version: ${partitionInfo.version}")
        println(s"Next version: ${nextVersion.version}")
        println(s"Files: ${partitionInfo.read_files.length} -> ${nextVersion.read_files.length}")
    }
}

Functional Programming Patterns:

The Scala API embraces functional programming principles:

  1. Immutable Data Structures: All case classes are immutable with copy methods for modifications
  2. Pure Functions: Most operations are side-effect free and return new instances
  3. Functional Composition: Operations can be chained and composed easily
  4. Option Types: Safe handling of nullable values with Option[T]
  5. Collection Operations: Rich collection API with map, filter, fold, etc.
  6. Pattern Matching: Extensive use of pattern matching for type-safe operations
  7. Lazy Evaluation: Lazy vals for expensive computations
  8. Higher-Order Functions: Functions that take other functions as parameters
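
Several of these patterns compose naturally in one pipeline. The sketch below applies them to a minimal stand-in for DataFileInfo (FileSketch and its fields are assumptions for illustration, not part of the API):

```scala
// Minimal stand-in for a data file record
case class FileSketch(partition: String, path: String, op: String, size: Long)

val sampleFiles = Seq(
  FileSketch("date=2023-01-01", "/d/a.parquet", "add", 2048L),
  FileSketch("date=2023-01-01", "/d/b.parquet", "del",  512L),
  FileSketch("date=2023-01-02", "/d/c.parquet", "add", 4096L)
)

// Pure pipeline: filter, group, aggregate -- no element is mutated in place
val bytesPerPartition: Map[String, Long] =
  sampleFiles
    .filter(_.op == "add")                              // pure predicate
    .groupBy(_.partition)                               // functional composition
    .map { case (p, fs) => p -> fs.map(_.size).sum }    // per-group aggregation

// Option-based lookup instead of null
val missing = bytesPerPartition.getOrElse("date=1999-01-01", 0L)
```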

Type Safety:

The Scala API provides compile-time type safety:

  • Strong Typing: All operations are strongly typed with compile-time checking
  • Generic Types: Type-safe collections and operations
  • Case Class Validation: Automatic equals, hashCode, and toString implementations
  • Pattern Matching: Exhaustive pattern matching with compiler warnings
  • Explicit Converters: JavaConverters provide safe, explicit conversions between Java and Scala collections

Performance Characteristics:

The Scala API maintains high performance:

  • Cheap Copies: case-class copy shares references to unchanged fields rather than deep-copying data
  • Lazy Evaluation: Expensive operations are computed only when needed
  • Collection Optimization: Scala collections are optimized for functional operations
  • JVM Integration: Seamless integration with Java components
  • Memory Efficient: Case classes and collections minimize memory allocation

Install with Tessl CLI

npx tessl i tessl/maven-com-dmetasoul--lakesoul-common
