# Scala Functional API

The Scala Functional API exposes LakeSoul's metadata system through high-level Scala objects for data file management and metadata operations. It offers an idiomatic Scala interface built on case classes, composable collection operations, and statically typed signatures.

## Capabilities

### DataOperation Object

`DataOperation` looks up the data files behind a table or partition and drops data commit information.

```scala { .api }
/**
 * High-level data operations and file management
 * Provides functional programming interface for data file operations
 */
object DataOperation {
  /** Database manager instance for metadata operations */
  val dbManager = new DBManager

  /**
   * Get all data file information for a table
   * @param tableId Unique table identifier
   * @return Array[DataFileInfo] array of file information objects
   */
  def getTableDataInfo(tableId: String): Array[DataFileInfo]

  /**
   * Get data file information for specific partitions
   * @param partitionList Java list of PartitionInfo objects
   * @return Array[DataFileInfo] array of file information objects
   */
  def getTableDataInfo(partitionList: util.List[PartitionInfo]): Array[DataFileInfo]

  /**
   * Get data file information for partition array
   * @param partition_info_arr Array of PartitionInfoScala objects
   * @return Array[DataFileInfo] array of file information objects
   */
  def getTableDataInfo(partition_info_arr: Array[PartitionInfoScala]): Array[DataFileInfo]

  /**
   * Get data file information for table with partition filtering
   * @param tableId Unique table identifier
   * @param partitions List of partition values to filter by
   * @return Array[DataFileInfo] filtered array of file information
   */
  def getTableDataInfo(tableId: String, partitions: List[String]): Array[DataFileInfo]

  /**
   * Get incremental data file information between timestamps
   * @param table_id Unique table identifier
   * @param partition_desc Partition description (empty for all partitions)
   * @param startTimestamp Start timestamp in milliseconds
   * @param endTimestamp End timestamp in milliseconds (0 for current time)
   * @param readType Type of read operation (snapshot, incremental, etc.)
   * @return Array[DataFileInfo] incremental file information
   */
  def getIncrementalPartitionDataInfo(table_id: String, partition_desc: String,
                                      startTimestamp: Long, endTimestamp: Long,
                                      readType: String): Array[DataFileInfo]

  /**
   * Get data file information for single partition
   * @param partition_info PartitionInfoScala object with partition details
   * @return ArrayBuffer[DataFileInfo] mutable buffer of file information
   */
  def getSinglePartitionDataInfo(partition_info: PartitionInfoScala): ArrayBuffer[DataFileInfo]

  /**
   * Drop data commit information for specific commit
   * @param table_id Unique table identifier
   * @param range Partition range/description
   * @param commit_id UUID of commit to drop
   */
  def dropDataInfoData(table_id: String, range: String, commit_id: UUID): Unit

  /**
   * Drop all data commit information for table
   * @param table_id Unique table identifier
   */
  def dropDataInfoData(table_id: String): Unit
}
```
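
The timestamp parameters of `getIncrementalPartitionDataInfo` are plain `Long` values in epoch milliseconds. The following is a minimal sketch of deriving such a window with `java.time` instead of hand-written constants. `TimeWindow` and `lastHours` are hypothetical helpers for illustration and are not part of LakeSoul.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper, not part of LakeSoul: an epoch-millisecond window
// suitable for the startTimestamp/endTimestamp parameters.
final case class TimeWindow(startMillis: Long, endMillis: Long) {
  require(startMillis <= endMillis, "start must not be after end")
}

object TimeWindow {
  /** Window covering the `hours` hours that end at `now`. */
  def lastHours(hours: Long, now: Instant = Instant.now()): TimeWindow = {
    val end = now.toEpochMilli
    TimeWindow(end - Duration.ofHours(hours).toMillis, end)
  }
}
```

A caller would build `val w = TimeWindow.lastHours(24)` and pass `w.startMillis` and `w.endMillis` as the two timestamp arguments.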

### MetaVersion Object

`MetaVersion` manages namespaces, tables, and partition versions in the metadata store.

```scala { .api }
/**
 * High-level metadata version management and table operations
 * Provides Scala-idiomatic interface for metadata management
 */
object MetaVersion {
  /** Database manager instance for metadata operations */
  val dbManager = new DBManager()

  /**
   * Create new namespace
   * @param namespace Namespace name to create
   */
  def createNamespace(namespace: String): Unit

  /**
   * List all available namespaces
   * @return Array[String] array of namespace names
   */
  def listNamespaces(): Array[String]

  /**
   * Check if namespace exists
   * @param table_namespace Namespace name to check
   * @return Boolean true if namespace exists, false otherwise
   */
  def isNamespaceExists(table_namespace: String): Boolean

  /**
   * Drop namespace by name
   * @param namespace Namespace name to drop
   */
  def dropNamespaceByNamespace(namespace: String): Unit

  /**
   * Check if table exists by name (default namespace)
   * @param table_name Table name to check
   * @return Boolean true if table exists, false otherwise
   */
  def isTableExists(table_name: String): Boolean

  /**
   * Check if specific table ID exists for table
   * @param table_name Table name
   * @param table_id Table ID to verify
   * @return Boolean true if table ID matches, false otherwise
   */
  def isTableIdExists(table_name: String, table_id: String): Boolean

  /**
   * Check if short table name exists in namespace
   * @param short_table_name Short table name
   * @param table_namespace Table namespace
   * @return (Boolean, String) tuple of (exists, table_path)
   */
  def isShortTableNameExists(short_table_name: String, table_namespace: String): (Boolean, String)

  /**
   * Get table path from short table name
   * @param short_table_name Short table name
   * @param table_namespace Table namespace
   * @return String table path or null if not found
   */
  def getTablePathFromShortTableName(short_table_name: String, table_namespace: String): String

  /**
   * Create new table with comprehensive configuration
   * @param table_namespace Table namespace
   * @param table_path Storage path for table
   * @param short_table_name User-friendly table name
   * @param table_id Unique table identifier
   * @param table_schema JSON schema definition
   * @param range_column Range partition column names
   * @param hash_column Hash partition column names
   * @param configuration Table configuration map
   * @param bucket_num Number of hash buckets
   */
  def createNewTable(table_namespace: String, table_path: String, short_table_name: String,
                     table_id: String, table_schema: String, range_column: String,
                     hash_column: String, configuration: Map[String, String], bucket_num: Int): Unit

  /**
   * List tables in specified namespaces
   * @param namespace Array of namespace names to list from
   * @return util.List[String] Java list of table paths
   */
  def listTables(namespace: Array[String]): util.List[String]

  /**
   * Update table schema and configuration
   * @param table_name Table name
   * @param table_id Table ID
   * @param table_schema New schema definition
   * @param config Updated configuration
   * @param new_read_version New read version
   */
  def updateTableSchema(table_name: String, table_id: String, table_schema: String,
                        config: Map[String, String], new_read_version: Int): Unit

  /**
   * Delete table information
   * @param table_name Table name
   * @param table_id Table ID
   * @param table_namespace Table namespace
   */
  def deleteTableInfo(table_name: String, table_id: String, table_namespace: String): Unit

  /**
   * Get single partition information
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param range_id Partition range ID
   * @return PartitionInfoScala partition information
   */
  def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfoScala

  /**
   * Get partition information for specific version
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param version Specific version number
   * @return Array[PartitionInfoScala] partition information array
   */
  def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): Array[PartitionInfoScala]

  /**
   * Get all versions of a partition
   * @param table_id Table ID
   * @param range_value Partition range value
   * @return Array[PartitionInfoScala] all partition versions
   */
  def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfoScala]

  /**
   * Get all partition information for table
   * @param table_id Table ID
   * @return Array[PartitionInfoScala] all partitions for table
   */
  def getAllPartitionInfoScala(table_id: String): Array[PartitionInfoScala]

  /**
   * Convert Java PartitionInfo list to Scala array
   * @param partitionList Java list of PartitionInfo
   * @return Array[PartitionInfoScala] converted Scala array
   */
  def convertPartitionInfoScala(partitionList: util.List[PartitionInfo]): Array[PartitionInfoScala]

  /**
   * Rollback partition to specific version
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param toVersion Target version to rollback to
   */
  def rollbackPartitionInfoByVersion(table_id: String, range_value: String, toVersion: Int): Unit

  /**
   * Delete all partition information for table
   * @param table_id Table ID
   */
  def deletePartitionInfoByTableId(table_id: String): Unit

  /**
   * Delete specific partition information
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param range_id Partition range ID
   */
  def deletePartitionInfoByRangeId(table_id: String, range_value: String, range_id: String): Unit

  /**
   * Get latest timestamp for partition
   * @param table_id Table ID
   * @param range_value Partition range value
   * @return Long latest timestamp in milliseconds
   */
  def getLastedTimestamp(table_id: String, range_value: String): Long

  /**
   * Get latest version up to specific time
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param utcMills Timestamp in UTC milliseconds
   * @return Int latest version number
   */
  def getLastedVersionUptoTime(table_id: String, range_value: String, utcMills: Long): Int

  /**
   * Clean metadata up to specific time and get files to delete
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param utcMills Timestamp in UTC milliseconds
   * @return List[String] list of file paths to delete
   */
  def cleanMetaUptoTime(table_id: String, range_value: String, utcMills: Long): List[String]

  /**
   * Clean all metadata (for testing)
   */
  def cleanMeta(): Unit
}
```
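
`getTablePathFromShortTableName` is documented to return `null` when no table is found. One way to keep that `null` from leaking into calling code is to wrap the result in `Option` at the call site. The sketch below uses a stand-in lookup with made-up data so that it runs on its own; `PathLookupSketch`, `rawLookup`, `findPath`, and `describe` are hypothetical names, not LakeSoul APIs.

```scala
// Stand-in for MetaVersion.getTablePathFromShortTableName, which is documented
// to return null when nothing is found. The lookup table is made up.
object PathLookupSketch {
  private val knownPaths =
    Map(("analytics", "user_events") -> "/data/analytics/user_events")

  // Mimics the documented contract: a path, or null if not found.
  def rawLookup(shortTableName: String, namespace: String): String =
    knownPaths.get((namespace, shortTableName)).orNull

  // Option(...) turns null into None, so callers must handle the missing case.
  def findPath(shortTableName: String, namespace: String): Option[String] =
    Option(rawLookup(shortTableName, namespace))

  def describe(shortTableName: String, namespace: String): String =
    findPath(shortTableName, namespace) match {
      case Some(path) => s"$namespace.$shortTableName -> $path"
      case None       => s"$namespace.$shortTableName not found"
    }
}
```

With the real API, the same wrapping would read `Option(MetaVersion.getTablePathFromShortTableName(name, namespace))`.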

### Scala Data Types

Case classes that carry file and partition metadata on the Scala side.

```scala { .api }
/**
 * Data transfer object for file information
 * Immutable case class with functional operations
 */
case class DataFileInfo(
  range_partitions: String,
  path: String,
  file_op: String,
  size: Long,
  modification_time: Long = -1L,
  file_exist_cols: String = ""
) {
  /**
   * Get bucket ID from file path
   * Uses lazy evaluation for performance
   * @return Int bucket ID extracted from filename
   */
  lazy val file_bucket_id: Int = BucketingUtils.getBucketId(new Path(path).getName)
    .getOrElse(sys.error(s"Invalid bucket file $path"))

  /**
   * Get range version string combining partitions and columns
   * @return String combined range and column information
   */
  lazy val range_version: String = range_partitions + "-" + file_exist_cols

  /**
   * Create expired version of file info for cleanup
   * @param deleteTime Timestamp when file should be deleted
   * @return DataFileInfo new instance with updated modification time
   */
  def expire(deleteTime: Long): DataFileInfo = this.copy(modification_time = deleteTime)

  /**
   * Hash code based on key fields for deduplication
   * @return Int hash code
   */
  override def hashCode(): Int = Objects.hash(range_partitions, path, file_op)
}

/**
 * Scala representation of partition information
 * Immutable case class for functional composition
 */
case class PartitionInfoScala(
  table_id: String,
  range_value: String,
  version: Int = -1,
  read_files: Array[UUID] = Array.empty[UUID],
  expression: String = "",
  commit_op: String = ""
) {
  /**
   * String representation of partition info
   * @return String formatted partition information
   */
  override def toString: String = {
    s"partition info: {\ntable_name: $table_id,\nrange_value: $range_value}"
  }
}
```
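
`DataFileInfo` overrides `hashCode` to use only `range_partitions`, `path`, and `file_op`, but as shown it does not override `equals`, so the compiler-generated `equals` still compares every field. The sketch below illustrates the consequence with a simplified stand-in class: two entries can share a hash code and still be distinct members of a `Set`. `FileInfoSketch` and `dedupeByKey` are hypothetical and only mirror the shape documented above.

```scala
import java.util.Objects

// Simplified stand-in for DataFileInfo with the same hashCode override.
final case class FileInfoSketch(
    range_partitions: String,
    path: String,
    file_op: String,
    size: Long,
    modification_time: Long = -1L) {
  def expire(deleteTime: Long): FileInfoSketch = copy(modification_time = deleteTime)
  override def hashCode(): Int = Objects.hash(range_partitions, path, file_op)
}

object FileInfoSketch {
  /** Keep the first entry seen for each (partition, path, op) key. */
  def dedupeByKey(files: Seq[FileInfoSketch]): Seq[FileInfoSketch] =
    files.groupBy(f => (f.range_partitions, f.path, f.file_op)).values.map(_.head).toSeq
}
```

Deduplicating on the key fields therefore needs an explicit key, as in `dedupeByKey`, rather than relying on `Set` or `distinct`.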

### BucketingUtils Object

`BucketingUtils` extracts the bucket ID from a bucketed file name and returns it as an `Option[Int]`.

```scala { .api }
/**
 * Utility functions for bucketed file handling
 * Provides functional operations for bucket ID extraction
 */
object BucketingUtils {
  // Regular expression for extracting bucket ID from filenames
  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

  /**
   * Extract bucket ID from filename
   * @param fileName Name of the bucketed file
   * @return Option[Int] Some(bucketId) if valid bucketed file, None otherwise
   */
  def getBucketId(fileName: String): Option[Int] = fileName match {
    case bucketedFileName(bucketId) => Some(bucketId.toInt)
    case _ => None
  }
}
```
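
To make the pattern concrete: the bucket ID is the run of digits that follows the last underscore and precedes either the first dot after it or the end of the name. The sketch below copies the documented regular expression into a standalone object so it can be tried without LakeSoul on the classpath. `BucketIdSketch` and the file names used with it are invented for illustration.

```scala
object BucketIdSketch {
  // Same pattern as documented above, copied so the sketch is self-contained.
  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

  // A Regex used as an extractor must match the whole string.
  def getBucketId(fileName: String): Option[Int] = fileName match {
    case bucketedFileName(bucketId) => Some(bucketId.toInt)
    case _                          => None
  }
}
```

For example, `BucketIdSketch.getBucketId("part-00000-abc_00003.c000.parquet")` yields `Some(3)`, while `"data.parquet"` has no underscore and yields `None`.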

**Usage Examples:**

```scala
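// Assumption: the data types and BucketingUtils live in the same package as
// DataOperation and MetaVersion; adjust the import if your version differs.
import com.dmetasoul.lakesoul.meta.{BucketingUtils, DataFileInfo, DataOperation, LakeSoulOptions, MetaVersion, PartitionInfoScala}
import java.util.UUID
// Assumption: Path in the examples below is Hadoop's org.apache.hadoop.fs.Path.
import org.apache.hadoop.fs.Path
import scala.collection.JavaConverters._

object ScalaFunctionalAPIExample {
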
  def basicTableOperationsExample(): Unit = {
    // Create namespace using functional API
    MetaVersion.createNamespace("analytics")

    // Check if table exists
    val tableExists = MetaVersion.isTableExists("user_events")
    if (!tableExists) {
      // Create new table with Scala Map for configuration
      val config = Map(
        "format" -> "parquet",
        "compression" -> "snappy",
        "retention.days" -> "30"
      )

      MetaVersion.createNewTable(
        table_namespace = "analytics",
        table_path = "/data/analytics/user_events",
        short_table_name = "user_events",
        table_id = "tbl_user_events_001",
        table_schema = """{"type":"struct","fields":[...]}""",
        range_column = "date,hour",
        hash_column = "user_id",
        configuration = config,
        bucket_num = 16
      )

      println("Table created successfully")
    }

    // List all namespaces
    val namespaces = MetaVersion.listNamespaces()
    println(s"Available namespaces: ${namespaces.mkString(", ")}")

    // Get table path from short name
    val tablePath = MetaVersion.getTablePathFromShortTableName("user_events", "analytics")
    println(s"Table path: $tablePath")
  }

  def dataFileOperationsExample(): Unit = {
    val tableId = "tbl_user_events_001"

    // Get all data files for table
    val allFiles = DataOperation.getTableDataInfo(tableId)
    println(s"Total files: ${allFiles.length}")

    // Process files functionally
    val parquetFiles = allFiles
      .filter(_.path.endsWith(".parquet"))
      .filter(_.file_op == "add")
      .sortBy(_.modification_time)

    println(s"Parquet files: ${parquetFiles.length}")

    // Get incremental data between timestamps
    val startTime = System.currentTimeMillis() - 86400000L // 24 hours ago
    val endTime = System.currentTimeMillis()

    val incrementalFiles = DataOperation.getIncrementalPartitionDataInfo(
      table_id = tableId,
      partition_desc = "date=2023-01-01",
      startTimestamp = startTime,
      endTimestamp = endTime,
      readType = LakeSoulOptions.ReadType.INCREMENTAL_READ
    )

    println(s"Incremental files: ${incrementalFiles.length}")

    // Calculate total size of files
    val totalSize = incrementalFiles.map(_.size).sum
    val avgSize = if (incrementalFiles.nonEmpty) totalSize / incrementalFiles.length else 0L

    println(s"Total size: $totalSize bytes, Average: $avgSize bytes")
  }

  def partitionOperationsExample(): Unit = {
    val tableId = "tbl_user_events_001"

    // Get all partitions for table
    val partitions = MetaVersion.getAllPartitionInfoScala(tableId)
    println(s"Total partitions: ${partitions.length}")

    // Group partitions by commit operation
    val partitionsByOp = partitions.groupBy(_.commit_op)
    partitionsByOp.foreach { case (op, parts) =>
      println(s"$op: ${parts.length} partitions")
    }

    // Find partitions with multiple versions
    val partitionVersions = partitions.groupBy(_.range_value)
    val multiVersionPartitions = partitionVersions.filter(_._2.length > 1)

    println(s"Multi-version partitions: ${multiVersionPartitions.size}")

    multiVersionPartitions.foreach { case (rangeValue, versions) =>
      val sortedVersions = versions.sortBy(_.version)
      val latestVersion = sortedVersions.last.version
      val oldestVersion = sortedVersions.head.version

      println(s"Partition $rangeValue: versions $oldestVersion to $latestVersion")

      // Get specific version details
      val versionDetails = MetaVersion.getSinglePartitionInfoForVersion(
        tableId, rangeValue, latestVersion
      )
      println(s"  Latest version has ${versionDetails.length} entries")
    }
  }

  def functionalDataProcessingExample(): Unit = {
    val tableId = "tbl_user_events_001"

    // Get data files and process functionally
    val dataFiles = DataOperation.getTableDataInfo(tableId)

    // Functional pipeline for data processing
    val processedFiles = dataFiles
      .filter(_.file_op == "add") // Only added files
      .filter(_.size > 1024) // Minimum size filter
      .map(file => file.copy(path = file.path.toLowerCase)) // Normalize paths
      .groupBy(_.range_partitions) // Group by partition
      .mapValues(files => files.sortBy(-_.modification_time)) // Sort newest first
      .mapValues(files => files.take(10)) // Take latest 10 per partition

    processedFiles.foreach { case (partition, files) =>
      println(s"Partition $partition:")
      files.foreach { file =>
        val bucketId = BucketingUtils.getBucketId(new Path(file.path).getName)
        println(s"  File: ${file.path}, Size: ${file.size}, Bucket: ${bucketId.getOrElse("N/A")}")
      }
    }

    // Calculate statistics
    val stats = dataFiles.foldLeft((0, 0L, 0L)) { case ((count, totalSize, maxSize), file) =>
      (count + 1, totalSize + file.size, math.max(maxSize, file.size))
    }

    val (fileCount, totalSize, maxSize) = stats
    println(s"Statistics: $fileCount files, ${totalSize / 1024 / 1024}MB total, ${maxSize / 1024 / 1024}MB max")
  }

  def timeBasedOperationsExample(): Unit = {
    val tableId = "tbl_user_events_001"
    val partitionDesc = "date=2023-01-01"

    // Get latest timestamp for partition
    val latestTimestamp = MetaVersion.getLastedTimestamp(tableId, partitionDesc)
    println(s"Latest timestamp: $latestTimestamp")

    // Get version at specific time
    val specificTime = System.currentTimeMillis() - 3600000L // 1 hour ago
    val versionAtTime = MetaVersion.getLastedVersionUptoTime(tableId, partitionDesc, specificTime)
    println(s"Version at time $specificTime: $versionAtTime")

    // Clean old metadata and get files to delete
    val cleanupTime = System.currentTimeMillis() - 7 * 86400000L // 7 days ago
    val filesToDelete = MetaVersion.cleanMetaUptoTime(tableId, partitionDesc, cleanupTime)

    println(s"Files to delete: ${filesToDelete.length}")
    filesToDelete.take(5).foreach(println) // Show first 5

    // Process cleanup with functional operations
    val deletesByExtension = filesToDelete
      .groupBy(path => path.substring(path.lastIndexOf('.') + 1))
      .mapValues(_.size)

    println("Files to delete by extension:")
    deletesByExtension.foreach { case (ext, count) =>
      println(s"  .$ext: $count files")
    }
  }

  def errorHandlingExample(): Unit = {
    import scala.util.{Try, Success, Failure}

    // Functional error handling with Try
    val tableResult = Try {
      MetaVersion.isTableExists("non_existent_table")
    }

    tableResult match {
      case Success(exists) =>
        println(s"Table check completed: $exists")
      case Failure(exception) =>
        println(s"Table check failed: ${exception.getMessage}")
    }

    // Chain operations with error handling
    val chainedOperation = for {
      namespaces <- Try(MetaVersion.listNamespaces())
      firstNamespace = namespaces.headOption.getOrElse("default")
      tables <- Try(MetaVersion.listTables(Array(firstNamespace)))
    } yield (firstNamespace, tables.asScala.toList)

    chainedOperation match {
      case Success((namespace, tables)) =>
        println(s"Namespace: $namespace, Tables: ${tables.length}")
      case Failure(exception) =>
        println(s"Chained operation failed: ${exception.getMessage}")
    }
  }

  def immutableDataExample(): Unit = {
    // Working with immutable data structures
    val originalFile = DataFileInfo(
      range_partitions = "date=2023-01-01",
      path = "/data/file001.parquet",
      file_op = "add",
      size = 1024000L,
      modification_time = System.currentTimeMillis(),
      file_exist_cols = "[\"col1\",\"col2\"]"
    )

    // Create variations using copy
    val expiredFile = originalFile.expire(System.currentTimeMillis() + 86400000L)
    val renamedFile = originalFile.copy(path = "/data/renamed_file001.parquet")

    println(s"Original: ${originalFile.path}")
    println(s"Expired: ${expiredFile.modification_time}")
    println(s"Renamed: ${renamedFile.path}")

    // Functional composition with case classes
    val partitionInfo = PartitionInfoScala(
      table_id = "tbl_001",
      range_value = "date=2023-01-01",
      version = 1,
      read_files = Array(UUID.randomUUID(), UUID.randomUUID()),
      expression = "date >= '2023-01-01'",
      commit_op = "AppendCommit"
    )

    // Create new version
    val nextVersion = partitionInfo.copy(
      version = partitionInfo.version + 1,
      read_files = partitionInfo.read_files :+ UUID.randomUUID(),
      commit_op = "CompactionCommit"
    )

    println(s"Original version: ${partitionInfo.version}")
    println(s"Next version: ${nextVersion.version}")
    println(s"Files: ${partitionInfo.read_files.length} -> ${nextVersion.read_files.length}")
  }
}
```
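
**Functional Programming Patterns:**

The Scala API follows these functional programming conventions:

1. **Immutable Data Structures**: `DataFileInfo` and `PartitionInfoScala` are case classes with `val` fields, and changes go through `copy`. Note that `read_files` is an `Array`, whose elements can still be mutated in place.
2. **Pure Functions**: Methods on the data types, such as `expire`, return new instances without side effects. Calls on `MetaVersion` and `DataOperation` read from or write to the metadata store.
3. **Functional Composition**: Results are ordinary Scala collections, so calls chain with `filter`, `map`, `groupBy`, and similar operations.
4. **Option Types**: `BucketingUtils.getBucketId` returns `Option[Int]` instead of a sentinel value. Methods documented to return `null`, such as `getTablePathFromShortTableName`, can be wrapped with `Option(...)`.
5. **Collection Operations**: Arrays and lists returned by the API support `map`, `filter`, `foldLeft`, and the rest of the standard collection API.
6. **Pattern Matching**: `getBucketId` matches file names against a regular expression extractor, and tuple results such as the `(Boolean, String)` from `isShortTableNameExists` can be destructured the same way.
7. **Lazy Evaluation**: `file_bucket_id` and `range_version` are `lazy val`s, computed on first access.
8. **Higher-Order Functions**: Collection methods such as `sortBy` and `groupBy` take functions as arguments.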
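
Several of these patterns can be combined in a few lines. The sketch below groups records by partition, keeps the newest `n` per group, and computes size statistics in one pass with `foldLeft`. `FileEntry` and `PipelineSketch` are hypothetical stand-ins for illustration, not LakeSoul types. The grouping step uses `map` on the grouped result instead of `mapValues`, which returns a lazy view in Scala 2.12 and is deprecated in 2.13.

```scala
// Hypothetical record type standing in for DataFileInfo.
final case class FileEntry(partition: String, path: String, size: Long, modifiedAt: Long)

object PipelineSketch {
  /** Newest `n` entries per partition, newest first. */
  def latestPerPartition(files: Seq[FileEntry], n: Int): Map[String, Seq[FileEntry]] =
    files
      .groupBy(_.partition)
      .map { case (partition, entries) =>
        partition -> entries.sortBy(e => -e.modifiedAt).take(n)
      }

  /** (count, total bytes, largest file) in a single pass with foldLeft. */
  def sizeStats(files: Seq[FileEntry]): (Int, Long, Long) =
    files.foldLeft((0, 0L, 0L)) { case ((count, total, max), f) =>
      (count + 1, total + f.size, math.max(max, f.size))
    }
}
```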
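
**Type Safety:**

The Scala API is checked at compile time:

- **Strong Typing**: Every method has an explicit parameter and return type, so mismatched arguments fail at compile time. Identifiers such as `table_id` and `readType` are plain `String`s and are not validated by the compiler.
- **Generic Types**: Results are typed collections such as `Array[DataFileInfo]`, `Array[PartitionInfoScala]`, and `Option[Int]`.
- **Case Class Support**: The compiler generates `equals`, `hashCode`, and `toString` for case classes. `DataFileInfo` overrides `hashCode`, and `PartitionInfoScala` overrides `toString`.
- **Pattern Matching**: The compiler warns about non-exhaustive matches on sealed types such as `Option` and `Try`.
- **Java Interop**: Java collections returned by the API, such as the `util.List[String]` from `listTables`, are converted explicitly with `.asScala`. `convertPartitionInfoScala` converts a Java `PartitionInfo` list to a Scala array.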
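
As a minimal sketch of converting at the Java boundary, the example below turns a `java.util.List` into a Scala `List` with `scala.collection.JavaConverters`, the same import used in the usage examples. `InteropSketch` and `listTablesStub` are hypothetical stand-ins for a call such as `MetaVersion.listTables`, and the paths are made up. On Scala 2.13 and later, `scala.jdk.CollectionConverters` is the non-deprecated replacement for this import.

```scala
import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.JavaConverters._

object InteropSketch {
  // Stand-in for a call that returns a Java list of table paths.
  def listTablesStub(): JList[String] = {
    val tables = new JArrayList[String]()
    tables.add("/data/analytics/user_events")
    tables.add("/data/analytics/sessions")
    tables
  }

  // Convert once at the boundary; the rest of the code sees a Scala List.
  def tableNames(): List[String] =
    listTablesStub().asScala.toList.map(_.split('/').last)
}
```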

**Performance Characteristics:**

Design choices in the Scala API that affect performance:

- **Zero-Copy Operations**: `copy` creates a new instance that shares unchanged field values by reference instead of duplicating them.
- **Lazy Evaluation**: `lazy val` members such as `file_bucket_id` are computed on first access and then cached.
- **Collection Optimization**: Results are returned as `Array`s, which give indexed access and support the standard collection operations.
- **JVM Integration**: Both objects delegate to a Java `DBManager` instance in the same JVM.
- **Memory Efficient**: Case classes compile to plain JVM classes, and `Long` and `Int` fields are stored as primitives.
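
The lazy evaluation and reference sharing described above can be observed directly. In the sketch below, a counter records how often a `lazy val` body runs, and `eq` checks that `copy` reuses the same array. `PartitionSketch` is a hypothetical type that only mirrors the shape of `PartitionInfoScala`.

```scala
import java.util.UUID

// Hypothetical type mirroring the shape of PartitionInfoScala.
final case class PartitionSketch(range_value: String, version: Int, read_files: Array[UUID]) {
  // Evaluated on first access only, then cached.
  lazy val fileCount: Int = {
    PartitionSketch.evaluations += 1
    read_files.length
  }
}

object PartitionSketch {
  // Counter used only to make the lazy evaluation observable.
  var evaluations: Int = 0
}
```

Because `copy` shares the `read_files` array, an in-place update to that array is visible through both instances. Appending with `:+`, as the usage examples do, allocates a new array and avoids this.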