# Scala Functional API

The Scala Functional API provides high-level Scala objects for data file management and metadata operations in a functional programming style. It offers a more idiomatic Scala interface to LakeSoul's metadata system, with immutable data structures, functional composition, and type-safe operations.

## Capabilities

### DataOperation Object

High-level data operations and file management with functional programming patterns.

```scala { .api }
/**
 * High-level data operations and file management
 * Provides functional programming interface for data file operations
 */
object DataOperation {
  /** Database manager instance for metadata operations */
  val dbManager = new DBManager

  /**
   * Get all data file information for a table
   * @param tableId Unique table identifier
   * @return Array[DataFileInfo] array of file information objects
   */
  def getTableDataInfo(tableId: String): Array[DataFileInfo]

  /**
   * Get data file information for specific partitions
   * @param partitionList Java list of PartitionInfo objects
   * @return Array[DataFileInfo] array of file information objects
   */
  def getTableDataInfo(partitionList: util.List[PartitionInfo]): Array[DataFileInfo]

  /**
   * Get data file information for partition array
   * @param partition_info_arr Array of PartitionInfoScala objects
   * @return Array[DataFileInfo] array of file information objects
   */
  def getTableDataInfo(partition_info_arr: Array[PartitionInfoScala]): Array[DataFileInfo]

  /**
   * Get data file information for table with partition filtering
   * @param tableId Unique table identifier
   * @param partitions List of partition values to filter by
   * @return Array[DataFileInfo] filtered array of file information
   */
  def getTableDataInfo(tableId: String, partitions: List[String]): Array[DataFileInfo]

  /**
   * Get incremental data file information between timestamps
   * @param table_id Unique table identifier
   * @param partition_desc Partition description (empty for all partitions)
   * @param startTimestamp Start timestamp in milliseconds
   * @param endTimestamp End timestamp in milliseconds (0 for current time)
   * @param readType Type of read operation (snapshot, incremental, etc.)
   * @return Array[DataFileInfo] incremental file information
   */
  def getIncrementalPartitionDataInfo(table_id: String, partition_desc: String,
                                      startTimestamp: Long, endTimestamp: Long,
                                      readType: String): Array[DataFileInfo]

  /**
   * Get data file information for single partition
   * @param partition_info PartitionInfoScala object with partition details
   * @return ArrayBuffer[DataFileInfo] mutable buffer of file information
   */
  def getSinglePartitionDataInfo(partition_info: PartitionInfoScala): ArrayBuffer[DataFileInfo]

  /**
   * Drop data commit information for specific commit
   * @param table_id Unique table identifier
   * @param range Partition range/description
   * @param commit_id UUID of commit to drop
   */
  def dropDataInfoData(table_id: String, range: String, commit_id: UUID): Unit

  /**
   * Drop all data commit information for table
   * @param table_id Unique table identifier
   */
  def dropDataInfoData(table_id: String): Unit
}
```
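The timestamp parameters of `getIncrementalPartitionDataInfo` may be easier to reason about with a small model. The sketch below is not LakeSoul's implementation: `FileStub`, `resolveEnd`, and `inWindow` are hypothetical names, and the half-open window `[start, end)` is an assumption. It only illustrates the documented convention that an `endTimestamp` of 0 stands for the current time.

```scala
object IncrementalWindowSketch {
  // Hypothetical stand-in for DataFileInfo, reduced to the fields used here.
  final case class FileStub(path: String, modification_time: Long)

  // Documented convention: endTimestamp == 0 means "up to the current time".
  // `now` is passed in explicitly so the function stays deterministic.
  def resolveEnd(endTimestamp: Long, now: Long): Long =
    if (endTimestamp == 0L) now else endTimestamp

  // Assumption: the window is half-open, [start, end).
  def inWindow(files: Array[FileStub], start: Long, endTimestamp: Long, now: Long): Array[FileStub] = {
    val end = resolveEnd(endTimestamp, now)
    files.filter(f => f.modification_time >= start && f.modification_time < end)
  }
}
```

Passing the clock value as a parameter keeps the helper pure, which makes the boundary behaviour straightforward to test.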

### MetaVersion Object

High-level metadata version management and table operations with functional API patterns.

```scala { .api }
/**
 * High-level metadata version management and table operations
 * Provides Scala-idiomatic interface for metadata management
 */
object MetaVersion {
  /** Database manager instance for metadata operations */
  val dbManager = new DBManager()

  /**
   * Create new namespace
   * @param namespace Namespace name to create
   */
  def createNamespace(namespace: String): Unit

  /**
   * List all available namespaces
   * @return Array[String] array of namespace names
   */
  def listNamespaces(): Array[String]

  /**
   * Check if namespace exists
   * @param table_namespace Namespace name to check
   * @return Boolean true if namespace exists, false otherwise
   */
  def isNamespaceExists(table_namespace: String): Boolean

  /**
   * Drop namespace by name
   * @param namespace Namespace name to drop
   */
  def dropNamespaceByNamespace(namespace: String): Unit

  /**
   * Check if table exists by name (default namespace)
   * @param table_name Table name to check
   * @return Boolean true if table exists, false otherwise
   */
  def isTableExists(table_name: String): Boolean

  /**
   * Check if specific table ID exists for table
   * @param table_name Table name
   * @param table_id Table ID to verify
   * @return Boolean true if table ID matches, false otherwise
   */
  def isTableIdExists(table_name: String, table_id: String): Boolean

  /**
   * Check if short table name exists in namespace
   * @param short_table_name Short table name
   * @param table_namespace Table namespace
   * @return (Boolean, String) tuple of (exists, table_path)
   */
  def isShortTableNameExists(short_table_name: String, table_namespace: String): (Boolean, String)

  /**
   * Get table path from short table name
   * @param short_table_name Short table name
   * @param table_namespace Table namespace
   * @return String table path or null if not found
   */
  def getTablePathFromShortTableName(short_table_name: String, table_namespace: String): String

  /**
   * Create new table with comprehensive configuration
   * @param table_namespace Table namespace
   * @param table_path Storage path for table
   * @param short_table_name User-friendly table name
   * @param table_id Unique table identifier
   * @param table_schema JSON schema definition
   * @param range_column Range partition column names
   * @param hash_column Hash partition column names
   * @param configuration Table configuration map
   * @param bucket_num Number of hash buckets
   */
  def createNewTable(table_namespace: String, table_path: String, short_table_name: String,
                     table_id: String, table_schema: String, range_column: String,
                     hash_column: String, configuration: Map[String, String], bucket_num: Int): Unit

  /**
   * List tables in specified namespaces
   * @param namespace Array of namespace names to list from
   * @return util.List[String] Java list of table paths
   */
  def listTables(namespace: Array[String]): util.List[String]

  /**
   * Update table schema and configuration
   * @param table_name Table name
   * @param table_id Table ID
   * @param table_schema New schema definition
   * @param config Updated configuration
   * @param new_read_version New read version
   */
  def updateTableSchema(table_name: String, table_id: String, table_schema: String,
                        config: Map[String, String], new_read_version: Int): Unit

  /**
   * Delete table information
   * @param table_name Table name
   * @param table_id Table ID
   * @param table_namespace Table namespace
   */
  def deleteTableInfo(table_name: String, table_id: String, table_namespace: String): Unit

  /**
   * Get single partition information
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param range_id Partition range ID
   * @return PartitionInfoScala partition information
   */
  def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfoScala

  /**
   * Get partition information for specific version
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param version Specific version number
   * @return Array[PartitionInfoScala] partition information array
   */
  def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): Array[PartitionInfoScala]

  /**
   * Get all versions of a partition
   * @param table_id Table ID
   * @param range_value Partition range value
   * @return Array[PartitionInfoScala] all partition versions
   */
  def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfoScala]

  /**
   * Get all partition information for table
   * @param table_id Table ID
   * @return Array[PartitionInfoScala] all partitions for table
   */
  def getAllPartitionInfoScala(table_id: String): Array[PartitionInfoScala]

  /**
   * Convert Java PartitionInfo list to Scala array
   * @param partitionList Java list of PartitionInfo
   * @return Array[PartitionInfoScala] converted Scala array
   */
  def convertPartitionInfoScala(partitionList: util.List[PartitionInfo]): Array[PartitionInfoScala]

  /**
   * Rollback partition to specific version
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param toVersion Target version to rollback to
   */
  def rollbackPartitionInfoByVersion(table_id: String, range_value: String, toVersion: Int): Unit

  /**
   * Delete all partition information for table
   * @param table_id Table ID
   */
  def deletePartitionInfoByTableId(table_id: String): Unit

  /**
   * Delete specific partition information
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param range_id Partition range ID
   */
  def deletePartitionInfoByRangeId(table_id: String, range_value: String, range_id: String): Unit

  /**
   * Get latest timestamp for partition
   * @param table_id Table ID
   * @param range_value Partition range value
   * @return Long latest timestamp in milliseconds
   */
  def getLastedTimestamp(table_id: String, range_value: String): Long

  /**
   * Get latest version up to specific time
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param utcMills Timestamp in UTC milliseconds
   * @return Int latest version number
   */
  def getLastedVersionUptoTime(table_id: String, range_value: String, utcMills: Long): Int

  /**
   * Clean metadata up to specific time and get files to delete
   * @param table_id Table ID
   * @param range_value Partition range value
   * @param utcMills Timestamp in UTC milliseconds
   * @return List[String] list of file paths to delete
   */
  def cleanMetaUptoTime(table_id: String, range_value: String, utcMills: Long): List[String]

  /**
   * Clean all metadata (for testing)
   */
  def cleanMeta(): Unit
}
```
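`getTablePathFromShortTableName` is documented as returning `null` when no table is found, and `isShortTableNameExists` returns an `(exists, table_path)` tuple. Callers who prefer `Option` can wrap both at the boundary. The sketch below is a minimal illustration under stated assumptions: `lookupPath` and its in-memory `catalog` are hypothetical stand-ins for the real metadata call, and `findPath` and `fromExistsTuple` are illustrative helper names, not part of the API.

```scala
object TablePathSketch {
  // Hypothetical in-memory catalog standing in for the metadata store.
  private val catalog = Map(("analytics", "user_events") -> "/data/analytics/user_events")

  // Mimics the documented contract: the path, or null when the table is unknown.
  def lookupPath(shortTableName: String, namespace: String): String =
    catalog.get((namespace, shortTableName)).orNull

  // Option(x) is None when x is null, so the null never escapes this function.
  def findPath(shortTableName: String, namespace: String): Option[String] =
    Option(lookupPath(shortTableName, namespace))

  // Converts an (exists, path) tuple into an Option of the path.
  def fromExistsTuple(result: (Boolean, String)): Option[String] = result match {
    case (true, path) => Option(path)
    case _            => None
  }
}
```

With this wrapper in place, downstream code can use `map`, `getOrElse`, or a `for` comprehension instead of null checks.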

### Scala Data Types

Scala-specific data types for functional programming with LakeSoul metadata.

```scala { .api }
/**
 * Data transfer object for file information
 * Immutable case class with functional operations
 */
case class DataFileInfo(
  range_partitions: String,
  path: String,
  file_op: String,
  size: Long,
  modification_time: Long = -1L,
  file_exist_cols: String = ""
) {
  /**
   * Get bucket ID from file path
   * Uses lazy evaluation for performance
   * @return Int bucket ID extracted from filename
   */
  lazy val file_bucket_id: Int = BucketingUtils.getBucketId(new Path(path).getName)
    .getOrElse(sys.error(s"Invalid bucket file $path"))

  /**
   * Get range version string combining partitions and columns
   * @return String combined range and column information
   */
  lazy val range_version: String = range_partitions + "-" + file_exist_cols

  /**
   * Create expired version of file info for cleanup
   * @param deleteTime Timestamp when file should be deleted
   * @return DataFileInfo new instance with updated modification time
   */
  def expire(deleteTime: Long): DataFileInfo = this.copy(modification_time = deleteTime)

  /**
   * Hash code based on key fields for deduplication
   * @return Int hash code
   */
  override def hashCode(): Int = Objects.hash(range_partitions, path, file_op)
}

/**
 * Scala representation of partition information
 * Immutable case class for functional composition
 */
case class PartitionInfoScala(
  table_id: String,
  range_value: String,
  version: Int = -1,
  read_files: Array[UUID] = Array.empty[UUID],
  expression: String = "",
  commit_op: String = ""
) {
  /**
   * String representation of partition info
   * @return String formatted partition information
   */
  override def toString: String = {
    s"partition info: {\ntable_name: $table_id,\nrange_value: $range_value}"
  }
}
```
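One detail of `DataFileInfo` is worth spelling out: it overrides `hashCode` over three key fields, but a Scala case class still gets its compiler-generated `equals`, which compares every field. Two entries for the same file with different `modification_time` values therefore share a hash code yet are not equal. The sketch below demonstrates this with `FileInfo`, a hypothetical stand-in that mirrors the documented fields (omitting `file_bucket_id` so it needs no Hadoop dependency). `dedupeByKey` is an illustrative helper, not part of the API.

```scala
import java.util.Objects

object FileInfoSketch {
  // Hypothetical stand-in mirroring DataFileInfo's fields; file_bucket_id is omitted
  // so that the sketch does not depend on Hadoop's Path.
  final case class FileInfo(
      range_partitions: String,
      path: String,
      file_op: String,
      size: Long,
      modification_time: Long = -1L,
      file_exist_cols: String = ""
  ) {
    def expire(deleteTime: Long): FileInfo = copy(modification_time = deleteTime)
    override def hashCode(): Int = Objects.hash(range_partitions, path, file_op)
  }

  // Keeps one entry per (range_partitions, path, file_op) key: the most recently modified.
  def dedupeByKey(files: Seq[FileInfo]): Seq[FileInfo] =
    files
      .groupBy(f => (f.range_partitions, f.path, f.file_op))
      .values
      .map(_.maxBy(_.modification_time))
      .toSeq
}
```

Because `equals` still distinguishes the two entries, a `Set` does not collapse them; deduplicating on the key fields needs an explicit grouping like the one above.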

### BucketingUtils Object

Utility object for bucketed file handling with functional operations.

```scala { .api }
/**
 * Utility functions for bucketed file handling
 * Provides functional operations for bucket ID extraction
 */
object BucketingUtils {
  // Regular expression for extracting bucket ID from filenames
  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

  /**
   * Extract bucket ID from filename
   * @param fileName Name of the bucketed file
   * @return Option[Int] Some(bucketId) if valid bucketed file, None otherwise
   */
  def getBucketId(fileName: String): Option[Int] = fileName match {
    case bucketedFileName(bucketId) => Some(bucketId.toInt)
    case _ => None
  }
}
```
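To see which names the pattern accepts, the sketch below copies the documented regular expression into a standalone object and adds `groupByBucket`, a hypothetical helper that is not part of the API. The file names are made up for illustration.

```scala
object BucketIdSketch {
  // Same pattern as documented above: a "_<digits>" suffix, optionally followed by an extension.
  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

  def getBucketId(fileName: String): Option[Int] = fileName match {
    case bucketedFileName(bucketId) => Some(bucketId.toInt)
    case _                          => None
  }

  // Groups file names by bucket ID; names without a bucket suffix are dropped.
  def groupByBucket(fileNames: Seq[String]): Map[Int, Seq[String]] =
    fileNames
      .flatMap(name => getBucketId(name).map(id => id -> name))
      .groupBy(_._1)
      .map { case (id, pairs) => id -> pairs.map(_._2) }
}
```

Returning `Option[Int]` lets callers skip non-bucketed files with `flatMap` instead of catching an exception, whereas `DataFileInfo.file_bucket_id` turns the `None` case into an error.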

**Usage Examples:**

```scala
import com.dmetasoul.lakesoul.meta.{DataOperation, MetaVersion, LakeSoulOptions}
// Assumption: the types documented above live in the same package as DataOperation.
import com.dmetasoul.lakesoul.meta.{BucketingUtils, DataFileInfo, PartitionInfoScala}
// Assumption: Path is Hadoop's org.apache.hadoop.fs.Path.
import org.apache.hadoop.fs.Path
import java.util.UUID
import scala.collection.JavaConverters._

object ScalaFunctionalAPIExample {

  def basicTableOperationsExample(): Unit = {
    // Create namespace using functional API
    MetaVersion.createNamespace("analytics")

    // Check if table exists
    val tableExists = MetaVersion.isTableExists("user_events")
    if (!tableExists) {
      // Create new table with Scala Map for configuration
      val config = Map(
        "format" -> "parquet",
        "compression" -> "snappy",
        "retention.days" -> "30"
      )

      MetaVersion.createNewTable(
        table_namespace = "analytics",
        table_path = "/data/analytics/user_events",
        short_table_name = "user_events",
        table_id = "tbl_user_events_001",
        table_schema = """{"type":"struct","fields":[...]}""",
        range_column = "date,hour",
        hash_column = "user_id",
        configuration = config,
        bucket_num = 16
      )

      println("Table created successfully")
    }

    // List all namespaces
    val namespaces = MetaVersion.listNamespaces()
    println(s"Available namespaces: ${namespaces.mkString(", ")}")

    // Get table path from short name
    val tablePath = MetaVersion.getTablePathFromShortTableName("user_events", "analytics")
    println(s"Table path: $tablePath")
  }

  def dataFileOperationsExample(): Unit = {
    val tableId = "tbl_user_events_001"

    // Get all data files for table
    val allFiles = DataOperation.getTableDataInfo(tableId)
    println(s"Total files: ${allFiles.length}")

    // Process files functionally
    val parquetFiles = allFiles
      .filter(_.path.endsWith(".parquet"))
      .filter(_.file_op == "add")
      .sortBy(_.modification_time)

    println(s"Parquet files: ${parquetFiles.length}")

    // Get incremental data between timestamps
    val startTime = System.currentTimeMillis() - 86400000L // 24 hours ago
    val endTime = System.currentTimeMillis()

    val incrementalFiles = DataOperation.getIncrementalPartitionDataInfo(
      table_id = tableId,
      partition_desc = "date=2023-01-01",
      startTimestamp = startTime,
      endTimestamp = endTime,
      readType = LakeSoulOptions.ReadType.INCREMENTAL_READ
    )

    println(s"Incremental files: ${incrementalFiles.length}")

    // Calculate total size of files
    val totalSize = incrementalFiles.map(_.size).sum
    val avgSize = if (incrementalFiles.nonEmpty) totalSize / incrementalFiles.length else 0L

    println(s"Total size: $totalSize bytes, Average: $avgSize bytes")
  }

  def partitionOperationsExample(): Unit = {
    val tableId = "tbl_user_events_001"

    // Get all partitions for table
    val partitions = MetaVersion.getAllPartitionInfoScala(tableId)
    println(s"Total partitions: ${partitions.length}")

    // Group partitions by commit operation
    val partitionsByOp = partitions.groupBy(_.commit_op)
    partitionsByOp.foreach { case (op, parts) =>
      println(s"$op: ${parts.length} partitions")
    }

    // Find partitions with multiple versions
    val partitionVersions = partitions.groupBy(_.range_value)
    val multiVersionPartitions = partitionVersions.filter(_._2.length > 1)

    println(s"Multi-version partitions: ${multiVersionPartitions.size}")

    multiVersionPartitions.foreach { case (rangeValue, versions) =>
      val sortedVersions = versions.sortBy(_.version)
      val latestVersion = sortedVersions.last.version
      val oldestVersion = sortedVersions.head.version

      println(s"Partition $rangeValue: versions $oldestVersion to $latestVersion")

      // Get specific version details
      val versionDetails = MetaVersion.getSinglePartitionInfoForVersion(
        tableId, rangeValue, latestVersion
      )
      println(s"  Latest version has ${versionDetails.length} entries")
    }
  }

  def functionalDataProcessingExample(): Unit = {
    val tableId = "tbl_user_events_001"

    // Get data files and process functionally
    val dataFiles = DataOperation.getTableDataInfo(tableId)

    // Functional pipeline for data processing
    val processedFiles = dataFiles
      .filter(_.file_op == "add") // Only added files
      .filter(_.size > 1024) // Minimum size filter
      .map(file => file.copy(path = file.path.toLowerCase)) // Normalize paths
      .groupBy(_.range_partitions) // Group by partition
      .mapValues(files => files.sortBy(_.modification_time)) // Sort by time, oldest first
      .mapValues(files => files.takeRight(10)) // Keep the latest 10 per partition

    processedFiles.foreach { case (partition, files) =>
      println(s"Partition $partition:")
      files.foreach { file =>
        val bucketId = BucketingUtils.getBucketId(new Path(file.path).getName)
        println(s"  File: ${file.path}, Size: ${file.size}, Bucket: ${bucketId.getOrElse("N/A")}")
      }
    }

    // Calculate statistics in a single pass: (count, total size, max size)
    val stats = dataFiles.foldLeft((0, 0L, 0L)) { case ((count, totalSize, maxSize), file) =>
      (count + 1, totalSize + file.size, math.max(maxSize, file.size))
    }

    val (fileCount, totalSize, maxSize) = stats
    println(s"Statistics: $fileCount files, ${totalSize / 1024 / 1024}MB total, ${maxSize / 1024 / 1024}MB max")
  }

  def timeBasedOperationsExample(): Unit = {
    val tableId = "tbl_user_events_001"
    val partitionDesc = "date=2023-01-01"

    // Get latest timestamp for partition
    val latestTimestamp = MetaVersion.getLastedTimestamp(tableId, partitionDesc)
    println(s"Latest timestamp: $latestTimestamp")

    // Get version at specific time
    val specificTime = System.currentTimeMillis() - 3600000L // 1 hour ago
    val versionAtTime = MetaVersion.getLastedVersionUptoTime(tableId, partitionDesc, specificTime)
    println(s"Version at time $specificTime: $versionAtTime")

    // Clean old metadata and get files to delete
    val cleanupTime = System.currentTimeMillis() - 7 * 86400000L // 7 days ago
    val filesToDelete = MetaVersion.cleanMetaUptoTime(tableId, partitionDesc, cleanupTime)

    println(s"Files to delete: ${filesToDelete.length}")
    filesToDelete.take(5).foreach(println) // Show first 5

    // Process cleanup with functional operations
    val deletesByExtension = filesToDelete
      .groupBy(path => path.substring(path.lastIndexOf('.') + 1))
      .mapValues(_.size)

    println("Files to delete by extension:")
    deletesByExtension.foreach { case (ext, count) =>
      println(s"  .$ext: $count files")
    }
  }

  def errorHandlingExample(): Unit = {
    import scala.util.{Try, Success, Failure}

    // Functional error handling with Try
    val tableResult = Try {
      MetaVersion.isTableExists("non_existent_table")
    }

    tableResult match {
      case Success(exists) =>
        println(s"Table check completed: $exists")
      case Failure(exception) =>
        println(s"Table check failed: ${exception.getMessage}")
    }

    // Chain operations with error handling
    val chainedOperation = for {
      namespaces <- Try(MetaVersion.listNamespaces())
      firstNamespace = namespaces.headOption.getOrElse("default")
      tables <- Try(MetaVersion.listTables(Array(firstNamespace)))
    } yield (firstNamespace, tables.asScala.toList)

    chainedOperation match {
      case Success((namespace, tables)) =>
        println(s"Namespace: $namespace, Tables: ${tables.length}")
      case Failure(exception) =>
        println(s"Chained operation failed: ${exception.getMessage}")
    }
  }

  def immutableDataExample(): Unit = {
    // Working with immutable data structures
    val originalFile = DataFileInfo(
      range_partitions = "date=2023-01-01",
      path = "/data/file001.parquet",
      file_op = "add",
      size = 1024000L,
      modification_time = System.currentTimeMillis(),
      file_exist_cols = "[\"col1\",\"col2\"]"
    )

    // Create variations using copy
    val expiredFile = originalFile.expire(System.currentTimeMillis() + 86400000L)
    val renamedFile = originalFile.copy(path = "/data/renamed_file001.parquet")

    println(s"Original: ${originalFile.path}")
    println(s"Expired: ${expiredFile.modification_time}")
    println(s"Renamed: ${renamedFile.path}")

    // Functional composition with case classes
    val partitionInfo = PartitionInfoScala(
      table_id = "tbl_001",
      range_value = "date=2023-01-01",
      version = 1,
      read_files = Array(UUID.randomUUID(), UUID.randomUUID()),
      expression = "date >= '2023-01-01'",
      commit_op = "AppendCommit"
    )

    // Create new version
    val nextVersion = partitionInfo.copy(
      version = partitionInfo.version + 1,
      read_files = partitionInfo.read_files :+ UUID.randomUUID(),
      commit_op = "CompactionCommit"
    )

    println(s"Original version: ${partitionInfo.version}")
    println(s"Next version: ${nextVersion.version}")
    println(s"Files: ${partitionInfo.read_files.length} -> ${nextVersion.read_files.length}")
  }
}
```

**Functional Programming Patterns:**

The Scala API embraces functional programming principles:

1. **Immutable Data Structures**: The case classes have immutable fields; `copy` (and helpers such as `expire`) return modified instances
2. **Pure Functions**: Operations on the case classes are side-effect free and return new instances, whereas `MetaVersion` and `DataOperation` methods such as `createNamespace` or `dropDataInfoData` modify metadata
3. **Functional Composition**: Operations can be chained and composed easily
4. **Option Types**: Safe handling of missing values with `Option[T]`, as in `BucketingUtils.getBucketId`
5. **Collection Operations**: Rich collection API with `map`, `filter`, `fold`, etc.
6. **Pattern Matching**: Extensive use of pattern matching for type-safe operations
7. **Lazy Evaluation**: Lazy vals for expensive computations
8. **Higher-Order Functions**: Functions that take other functions as parameters
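A few of these patterns can be observed in a dependency-free sketch. Everything below is hypothetical and independent of LakeSoul: `Tracked` shows that a `lazy val` body runs once on first access (the `evaluations` counter exists only to make that visible), `sizeStats` is a single-pass fold, and `selectPaths` is a higher-order function that takes its predicate as a parameter.

```scala
object PatternsSketch {
  // Counts how often the lazy computation runs, purely to make laziness observable.
  var evaluations: Int = 0

  final case class Tracked(path: String) {
    // Computed on first access only, then cached for this instance.
    lazy val nameLength: Int = {
      evaluations += 1
      path.substring(path.lastIndexOf('/') + 1).length
    }
  }

  // Fold: accumulates (count, total, max) in a single pass over the sizes.
  def sizeStats(sizes: Seq[Long]): (Int, Long, Long) =
    sizes.foldLeft((0, 0L, 0L)) { case ((count, total, max), size) =>
      (count + 1, total + size, math.max(max, size))
    }

  // Higher-order function: the caller supplies the predicate.
  def selectPaths(paths: Seq[String])(keep: String => Boolean): Seq[String] =
    paths.filter(keep)
}
```

Note that each instance produced by `copy` starts with its own uninitialized lazy values, so a cached result is not carried over to the copy.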

**Type Safety:**

The Scala API provides compile-time type safety:

- **Strong Typing**: All operations are strongly typed with compile-time checking
- **Generic Types**: Type-safe collections and operations
- **Case Class Validation**: Compiler-generated `equals`, `hashCode`, and `toString`, except where a class overrides them (`DataFileInfo.hashCode`, `PartitionInfoScala.toString`)
- **Pattern Matching**: Compiler warnings for non-exhaustive matches on sealed types such as `Option`
- **Java/Scala Conversions**: Explicit `asScala` conversions between Java and Scala collections via `scala.collection.JavaConverters`
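The conversion between Java and Scala types matters for methods such as `listTables`, which returns a `java.util.List[String]`. The sketch below uses `listTablesStub`, a hypothetical stand-in that returns fixed paths, to show the explicit `asScala` step. It imports `scala.collection.JavaConverters` to match the usage examples; on Scala 2.13 and later that import is deprecated in favor of `scala.jdk.CollectionConverters`.

```scala
import java.util
import scala.collection.JavaConverters._

object InteropSketch {
  // Hypothetical stand-in for a method that, like listTables, returns a Java list.
  def listTablesStub(): util.List[String] =
    util.Arrays.asList("/data/analytics/user_events", "/data/analytics/sessions")

  // asScala wraps the Java list; toList copies it into an immutable Scala List.
  def tableNames(paths: util.List[String]): List[String] =
    paths.asScala.toList.map(p => p.substring(p.lastIndexOf('/') + 1))
}
```

Copying into a `List` right at the boundary means later changes to the underlying Java list cannot affect the Scala side.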

**Performance Characteristics:**

The Scala API maintains high performance:

- **Zero-Copy Operations**: `copy` on a case class reuses references to unchanged fields rather than duplicating them
- **Lazy Evaluation**: Expensive values such as `file_bucket_id` are computed only when first accessed
- **Collection Optimization**: Scala collections are optimized for functional operations
- **JVM Integration**: Seamless integration with Java components
- **Memory Efficient**: Case classes and collections minimize memory allocation
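The zero-copy point can be checked directly, since a case class `copy` is shallow. In the sketch below, `PartitionStub` is a hypothetical stand-in mirroring part of `PartitionInfoScala`, and `bump` is an illustrative helper. Because arrays are mutable, the shared `read_files` array also means a write through one instance is visible through the other.

```scala
import java.util.UUID

object ShallowCopySketch {
  // Hypothetical stand-in mirroring part of PartitionInfoScala.
  final case class PartitionStub(
      table_id: String,
      range_value: String,
      version: Int = -1,
      read_files: Array[UUID] = Array.empty[UUID]
  )

  // Only `version` changes; every other field, including the array, is shared by reference.
  def bump(p: PartitionStub): PartitionStub = p.copy(version = p.version + 1)
}
```

Appending with `:+`, as the usage example does, allocates a new array, so the original instance keeps its own `read_files` unchanged.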