or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

algorithm-operators.md · environment-management.md · index.md · linear-algebra.md · parameter-system.md · pipeline-base-classes.md · pipeline-framework.md · utility-libraries.md

docs/utility-libraries.md

0

# Utility Libraries

1

2

Comprehensive utility libraries for table operations, vector parsing, data conversion, and statistical operations. Essential support classes for common ML operations and data processing tasks.

3

4

## Capabilities

5

6

### TableUtil Class

7

8

Comprehensive utility class for table content and schema operations.

9

10

```java { .api }

11

/**

12

* Utility class for table content and schema operations

13

* Provides validation, column manipulation, and formatting functions

14

*/

15

public class TableUtil {

16

17

// Table and column utilities

18

19

/** Generate unique temporary table name */

20

public static String getTempTableName();

21

22

/** Find column index in string array */

23

public static int findColIndex(String[] tableCols, String targetCol);

24

25

/** Find column index in table schema */

26

public static int findColIndex(TableSchema tableSchema, String targetCol);

27

28

/** Find multiple column indices in string array */

29

public static int[] findColIndices(String[] tableCols, String[] targetCols);

30

31

/** Find multiple column indices in table schema */

32

public static int[] findColIndices(TableSchema tableSchema, String[] targetCols);

33

34

/** Find column types for specified columns */

35

public static TypeInformation<?>[] findColTypes(TableSchema tableSchema, String[] targetCols);

36

37

/** Find single column type */

38

public static TypeInformation<?> findColType(TableSchema tableSchema, String targetCol);

39

40

// Type checking utilities

41

42

/** Check if data type is supported numeric type */

43

public static boolean isSupportedNumericType(TypeInformation<?> dataType);

44

45

/** Check if data type is string type */

46

public static boolean isString(TypeInformation<?> dataType);

47

48

/** Check if data type is vector type */

49

public static boolean isVector(TypeInformation<?> dataType);

50

51

// Column validation utilities

52

53

/** Assert that selected columns exist in table */

54

public static void assertSelectedColExist(String[] tableCols, String... selectedCols);

55

56

/** Assert that selected columns are numerical */

57

public static void assertNumericalCols(TableSchema tableSchema, String... selectedCols);

58

59

/** Assert that selected columns are string type */

60

public static void assertStringCols(TableSchema tableSchema, String... selectedCols);

61

62

/** Assert that selected columns are vector type */

63

public static void assertVectorCols(TableSchema tableSchema, String... selectedCols);

64

65

// Column filtering utilities

66

67

/** Get all string columns from schema */

68

public static String[] getStringCols(TableSchema tableSchema);

69

70

/** Get string columns excluding specified columns */

71

public static String[] getStringCols(TableSchema tableSchema, String[] excludeCols);

72

73

/** Get all numeric columns from schema */

74

public static String[] getNumericCols(TableSchema tableSchema);

75

76

/** Get numeric columns excluding specified columns */

77

public static String[] getNumericCols(TableSchema tableSchema, String[] excludeCols);

78

79

/** Get categorical columns from feature columns */

80

public static String[] getCategoricalCols(TableSchema tableSchema,

81

String[] featureCols,

82

String[] categoricalCols);

83

84

// Formatting utilities

85

86

/** Format column names as markdown table header */

87

public static String formatTitle(String[] colNames);

88

89

/** Format table row as markdown */

90

public static String formatRows(Row row);

91

92

/** Format table data as markdown table */

93

public static String format(String[] colNames, List<Row> data);

94

95

/** Convert column names to SQL SELECT clause */

96

public static String columnsToSqlClause(String[] colNames);

97

}

98

```

99

100

**Usage Examples:**

101

102

```java

103

import org.apache.flink.ml.common.utils.TableUtil;

104

import org.apache.flink.table.api.TableSchema;

105

import org.apache.flink.types.Row;

106

107

// Schema validation

108

TableSchema schema = // ... your table schema

109

String targetCol = "features";

110

111

// Check if column exists and get its type

112

int colIndex = TableUtil.findColIndex(schema, targetCol);

113

TypeInformation<?> colType = TableUtil.findColType(schema, targetCol);

114

115

// Validate column types

116

TableUtil.assertNumericalCols(schema, "age", "salary", "score");

117

TableUtil.assertVectorCols(schema, "features", "embeddings");

118

119

// Get columns by type

120

String[] numericCols = TableUtil.getNumericCols(schema);

121

String[] stringCols = TableUtil.getStringCols(schema, new String[]{"id"}); // Exclude 'id'

122

123

// Type checking

124

boolean isNumeric = TableUtil.isSupportedNumericType(colType);

125

boolean isVector = TableUtil.isVector(colType);

126

127

// Format table data for display

128

String[] headers = {"Name", "Age", "Score"};

129

List<Row> data = Arrays.asList(

130

Row.of("Alice", 25, 95.0),

131

Row.of("Bob", 30, 87.5)

132

);

133

String markdown = TableUtil.format(headers, data);

134

System.out.println(markdown);

135

/*

136

| Name | Age | Score |

137

|------|-----|-------|

138

| Alice | 25 | 95.0 |

139

| Bob | 30 | 87.5 |

140

*/

141

```

142

143

### VectorTypes Class

144

145

Constants class providing built-in vector type information for Flink's type system.

146

147

```java { .api }

148

/**

149

* Built-in vector type information constants

150

* Used for table schema definition and type checking

151

*/

152

public class VectorTypes {

153

154

/** Type information for DenseVector */

155

public static final TypeInformation<DenseVector> DENSE_VECTOR;

156

157

/** Type information for SparseVector */

158

public static final TypeInformation<SparseVector> SPARSE_VECTOR;

159

160

/** Type information for general Vector (base class) */

161

public static final TypeInformation<Vector> VECTOR;

162

}

163

```

164

165

**Usage Examples:**

166

167

```java

168

import org.apache.flink.ml.common.utils.VectorTypes;

169

import org.apache.flink.table.api.DataTypes;

170

import org.apache.flink.table.api.TableSchema;

171

172

// Create table schema with vector columns

173

TableSchema schema = TableSchema.builder()

174

.field("id", DataTypes.BIGINT())

175

.field("dense_features", VectorTypes.DENSE_VECTOR)

176

.field("sparse_features", VectorTypes.SPARSE_VECTOR)

177

.field("generic_vector", VectorTypes.VECTOR)

178

.build();

179

180

// Type checking with vector types

181

TypeInformation<?> colType = schema.getFieldDataTypes()[1].bridgedTo(VectorTypes.DENSE_VECTOR);

182

boolean isDenseVector = colType.equals(VectorTypes.DENSE_VECTOR);

183

```

184

185

### Data Conversion Utilities

186

187

Utilities for converting between different Flink data representations.

188

189

#### DataSetConversionUtil Class

190

191

```java { .api }

192

/**

193

* Utility class for DataSet conversion operations

194

* Provides methods for converting between DataSet and other formats

195

*/

196

public class DataSetConversionUtil {

197

// Methods for DataSet conversions

198

// Implementation details depend on specific conversion needs

199

}

200

```

201

202

#### DataStreamConversionUtil Class

203

204

```java { .api }

205

/**

206

* Utility class for DataStream conversion operations

207

* Provides methods for converting between DataStream and other formats

208

*/

209

public class DataStreamConversionUtil {

210

// Methods for DataStream conversions

211

// Implementation details depend on specific conversion needs

212

}

213

```

214

215

### OutputColsHelper Class

216

217

Helper class for managing output column configurations in ML operations.

218

219

```java { .api }

220

/**

221

* Helper class for output column management

222

* Assists with column naming and schema generation

223

*/

224

public class OutputColsHelper {

225

// Methods for managing output column configurations

226

// Used internally by ML operators for column management

227

}

228

```

229

230

### Mapper Framework Utilities

231

232

Utilities for row-wise data transformations and model applications.

233

234

#### Mapper Abstract Class

235

236

```java { .api }

237

/**

238

* Abstract class for row-wise transformations

239

* Base class for implementing custom row mappers

240

*/

241

public abstract class Mapper implements Serializable {

242

243

/** Input schema field names */

244

private String[] dataFieldNames;

245

246

/** Input schema field types */

247

private DataType[] dataFieldTypes;

248

249

/** Mapper parameters */

250

protected Params params;

251

252

/** Create mapper with schema and parameters */

253

public Mapper(TableSchema dataSchema, Params params);

254

255

/** Get input data schema */

256

public TableSchema getDataSchema();

257

258

/** Transform input row to output row (must implement) */

259

public abstract Row map(Row row);

260

261

/** Get output schema (must implement) */

262

public abstract TableSchema getOutputSchema();

263

}

264

```

265

266

**Implementation Example:**

267

268

```java

269

public class ScalingMapper extends Mapper {

270

271

public ScalingMapper(TableSchema dataSchema, Params params) {

272

super(dataSchema, params);

273

}

274

275

@Override

276

public Row map(Row row) {

277

// Extract scaling parameters

278

double scaleFactor = params.get(SCALE_FACTOR);

279

String inputCol = params.get(INPUT_COL);

280

281

// Apply scaling transformation

282

int colIndex = // ... find column index

283

double originalValue = (double) row.getField(colIndex); // getField returns Object, so a cast is required

284

double scaledValue = originalValue * scaleFactor;

285

286

// Create output row

287

Row output = Row.copy(row);

288

output.setField(colIndex, scaledValue);

289

290

return output;

291

}

292

293

@Override

294

public TableSchema getOutputSchema() {

295

// Return schema with same structure as input

296

return getDataSchema();

297

}

298

}

299

```

300

301

#### MapperAdapter Class

302

303

```java { .api }

304

/**

305

* Adapter for integrating Mapper with operator framework

306

* Converts row-wise mappers to table operations

307

*/

308

public class MapperAdapter {

309

// Methods for adapter functionality

310

// Bridges between Mapper and BatchOperator/StreamOperator

311

}

312

```

313

314

#### ModelMapper Abstract Class

315

316

```java { .api }

317

/**

318

* Abstract class for model-based row transformations

319

* Extends Mapper with model data support

320

*/

321

public abstract class ModelMapper extends Mapper {

322

// Additional functionality for model-based row transformations

323

// Includes model data loading and management

324

}

325

```

326

327

#### ModelMapperAdapter Class

328

329

```java { .api }

330

/**

331

* Adapter for integrating ModelMapper with operator framework

332

* Converts model-based row mappers to table operations

333

*/

334

public class ModelMapperAdapter {

335

// Methods for model mapper adapter functionality

336

// Bridges between ModelMapper and prediction operators

337

}

338

```

339

340

### Model Source Utilities

341

342

Utilities for loading and managing model data from different sources.

343

344

#### ModelSource Interface

345

346

```java { .api }

347

/**

348

* Interface for loading models from different sources

349

* Abstracts model data retrieval mechanisms

350

*/

351

public interface ModelSource extends Serializable {

352

353

/** Get model rows from runtime context */

354

List<Row> getModelRows(RuntimeContext runtimeContext);

355

}

356

```

357

358

#### BroadcastVariableModelSource Class

359

360

```java { .api }

361

/**

362

* Load models from Flink broadcast variables

363

* Efficient for distributing small to medium-sized models

364

*/

365

public class BroadcastVariableModelSource implements ModelSource {

366

// Implementation for broadcast variable model loading

367

// Used when model data fits in memory and needs wide distribution

368

}

369

```

370

371

#### RowsModelSource Class

372

373

```java { .api }

374

/**

375

* Load models from row collections

376

* Direct model data access from in-memory collections

377

*/

378

public class RowsModelSource implements ModelSource {

379

// Implementation for direct row-based model loading

380

// Used for simple model data scenarios

381

}

382

```

383

384

**Usage Examples:**

385

386

```java

387

// Using model sources in custom operators

388

public class MyPredictionOp extends BatchOperator<MyPredictionOp> {

389

private ModelSource modelSource;

390

391

public MyPredictionOp setModelSource(ModelSource modelSource) {

392

this.modelSource = modelSource;

393

return this;

394

}

395

396

@Override

397

public MyPredictionOp linkFrom(BatchOperator<?>... inputs) {

398

// In the actual operator function:

399

// List<Row> modelRows = modelSource.getModelRows(runtimeContext);

400

// ... use model data for predictions

401

402

return this;

403

}

404

}

405

406

// Create model sources

407

ModelSource broadcastSource = new BroadcastVariableModelSource(/* broadcast name */);

408

ModelSource rowsSource = new RowsModelSource(/* model rows */);

409

410

// Use in operator

411

MyPredictionOp predictor = new MyPredictionOp()

412

.setModelSource(broadcastSource);

413

```

414

415

## Utility Integration Patterns

416

417

### Schema Validation Pipeline

418

419

Combine multiple utilities for robust schema validation:

420

421

```java

422

public class SchemaValidator {

423

424

public static void validateMLSchema(TableSchema schema,

425

String[] requiredCols,

426

String[] numericCols,

427

String[] vectorCols) {

428

// Check required columns exist

429

TableUtil.assertSelectedColExist(schema.getFieldNames(), requiredCols);

430

431

// Validate numeric columns

432

if (numericCols != null) {

433

TableUtil.assertNumericalCols(schema, numericCols);

434

}

435

436

// Validate vector columns

437

if (vectorCols != null) {

438

TableUtil.assertVectorCols(schema, vectorCols);

439

}

440

}

441

}

442

443

// Usage in ML component

444

@Override

445

protected MyModel fit(BatchOperator<?> input) {

446

// Validate input schema

447

SchemaValidator.validateMLSchema(

448

input.getSchema(),

449

new String[]{"features", "label"}, // Required columns

450

new String[]{"label"}, // Must be numeric

451

new String[]{"features"} // Must be vector

452

);

453

454

// Proceed with training

455

// ...

456

}

457

```

458

459

### Type-Safe Column Operations

460

461

Use utilities for type-safe column manipulation:

462

463

```java

464

public class TypeSafeColumnOps {

465

466

public static String[] getCompatibleColumns(TableSchema schema,

467

TypeInformation<?> targetType) {

468

return Arrays.stream(schema.getFieldNames())

469

.filter(col -> {

470

TypeInformation<?> colType = TableUtil.findColType(schema, col);

471

return colType.equals(targetType);

472

})

473

.toArray(String[]::new);

474

}

475

476

public static Map<String, TypeInformation<?>> getColumnTypeMap(TableSchema schema) {

477

String[] names = schema.getFieldNames();

478

TypeInformation<?>[] types = schema.getFieldTypes();

479

480

Map<String, TypeInformation<?>> typeMap = new HashMap<>();

481

for (int i = 0; i < names.length; i++) {

482

typeMap.put(names[i], types[i]);

483

}

484

return typeMap;

485

}

486

}

487

488

// Usage

489

String[] vectorCols = TypeSafeColumnOps.getCompatibleColumns(schema, VectorTypes.VECTOR);

490

String[] denseCols = TypeSafeColumnOps.getCompatibleColumns(schema, VectorTypes.DENSE_VECTOR);

491

```

492

493

### Data Formatting and Inspection

494

495

Utilities for debugging and data inspection:

496

497

```java

498

public class DataInspector {

499

500

public static void printTableSummary(TableSchema schema, List<Row> sampleData) {

501

// Print schema information

502

System.out.println("Table Schema:");

503

for (int i = 0; i < schema.getFieldCount(); i++) {

504

String name = schema.getFieldNames()[i];

505

TypeInformation<?> type = schema.getFieldTypes()[i];

506

System.out.println(" " + name + ": " + type);

507

}

508

509

// Print sample data

510

if (!sampleData.isEmpty()) {

511

System.out.println("\nSample Data:");

512

String formatted = TableUtil.format(schema.getFieldNames(), sampleData);

513

System.out.println(formatted);

514

}

515

516

// Print column type summary

517

String[] numericCols = TableUtil.getNumericCols(schema);

518

String[] stringCols = TableUtil.getStringCols(schema);

519

520

System.out.println("\nColumn Types:");

521

System.out.println(" Numeric: " + Arrays.toString(numericCols));

522

System.out.println(" String: " + Arrays.toString(stringCols));

523

}

524

}

525

```

526

527

These utility libraries provide essential support for building robust ML applications with proper validation, type safety, and debugging capabilities.