or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdsources-sinks.mdsql-integration.mdtable-environment.mdtable-operations.mdtype-system.mduser-defined-functions.mdwindow-operations.md

type-system.mddocs/

0

# Type System

1

2

The Flink Table API provides a rich type system supporting primitive types, complex types, and temporal types. The Types object serves as the main factory for creating type information used throughout the API.

3

4

## Capabilities

5

6

### Primitive Types

7

8

Basic data types for common values.

9

10

```scala { .api }

11

object Types {

12

/**

13

* String/VARCHAR type for text data

14

*/

15

val STRING: TypeInformation[String]

16

17

/**

18

* Boolean type for true/false values

19

*/

20

val BOOLEAN: TypeInformation[java.lang.Boolean]

21

22

/**

23

* TINYINT type for small integers (-128 to 127)

24

*/

25

val BYTE: TypeInformation[java.lang.Byte]

26

27

/**

28

* SMALLINT type for short integers (-32,768 to 32,767)

29

*/

30

val SHORT: TypeInformation[java.lang.Short]

31

32

/**

33

* INT/INTEGER type for standard integers

34

*/

35

val INT: TypeInformation[java.lang.Integer]

36

37

/**

38

* BIGINT type for large integers

39

*/

40

val LONG: TypeInformation[java.lang.Long]

41

42

/**

43

* FLOAT/REAL type for single-precision floating point

44

*/

45

val FLOAT: TypeInformation[java.lang.Float]

46

47

/**

48

* DOUBLE type for double-precision floating point

49

*/

50

val DOUBLE: TypeInformation[java.lang.Double]

51

52

/**

53

* DECIMAL type for high-precision decimal numbers

54

*/

55

val DECIMAL: TypeInformation[java.math.BigDecimal]

56

}

57

```

58

59

**Usage Examples:**

60

61

```scala

62

import org.apache.flink.table.api.Types

63

64

// Using primitive types in schema definition

65

val schema = new TableSchema(

66

Array("name", "age", "salary", "active"),

67

Array(Types.STRING, Types.INT, Types.DOUBLE, Types.BOOLEAN)

68

)

69

70

// Type information for table sources

71

val csvSource = new CsvTableSource(

72

"/path/to/data.csv",

73

Array("id", "name", "score"),

74

Array(Types.LONG, Types.STRING, Types.DOUBLE)

75

)

76

```

77

78

### Date and Time Types

79

80

Temporal types for handling date, time, and timestamp data.

81

82

```scala { .api }

83

object Types {

84

/**

85

* DATE type for calendar dates (year, month, day)

86

*/

87

val SQL_DATE: TypeInformation[java.sql.Date]

88

89

/**

90

* TIME type for time of day (hour, minute, second)

91

*/

92

val SQL_TIME: TypeInformation[java.sql.Time]

93

94

/**

95

* TIMESTAMP type for date and time with millisecond precision

96

*/

97

val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]

98

99

/**

100

* INTERVAL type for month-based intervals

101

*/

102

val INTERVAL_MONTHS: TypeInformation[java.lang.Integer]

103

104

/**

105

* INTERVAL type for millisecond-based intervals

106

*/

107

val INTERVAL_MILLIS: TypeInformation[java.lang.Long]

108

}

109

```

110

111

**Usage Examples:**

112

113

```scala

114

// Event table with timestamps

115

val eventSchema = new TableSchema(

116

Array("event_id", "event_time", "event_date", "duration"),

117

Array(Types.LONG, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.INTERVAL_MILLIS)

118

)

119

120

// Using in SQL queries

121

val events = tEnv.sqlQuery("""

122

SELECT event_id,

123

DATE_FORMAT(event_time, 'yyyy-MM-dd') as date,

124

event_time + INTERVAL '1' HOUR as next_hour

125

FROM Events

126

""")

127

```

128

129

### Complex Types

130

131

Structured types for handling nested and collection data.

132

133

```scala { .api }

134

object Types {

135

/**

136

* ROW type with anonymous fields (field names: f0, f1, f2, ...)

137

* @param types Field types in order

138

* @returns Row type information

139

*/

140

def ROW(types: TypeInformation[_]*): TypeInformation[Row]

141

142

/**

143

* ROW type with named fields

144

* @param fieldNames Field names array

145

* @param types Field types array

146

* @returns Named row type information

147

*/

148

def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]

149

150

/**

151

* Array type for primitive elements

152

* @param elementType Type of array elements

153

* @returns Primitive array type information

154

*/

155

def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_]

156

157

/**

158

* Array type for object elements

159

* @param elementType Type of array elements

160

* @returns Object array type information

161

*/

162

def OBJECT_ARRAY[E](elementType: TypeInformation[E]): TypeInformation[Array[E]]

163

164

/**

165

* Map type for key-value pairs

166

* @param keyType Type of map keys

167

* @param valueType Type of map values

168

* @returns Map type information

169

*/

170

def MAP[K, V](keyType: TypeInformation[K], valueType: TypeInformation[V]): TypeInformation[java.util.Map[K, V]]

171

172

/**

173

* Multiset type (map with element counts)

174

* @param elementType Type of multiset elements

175

* @returns Multiset type information

176

*/

177

def MULTISET[E](elementType: TypeInformation[E]): TypeInformation[java.util.Map[E, java.lang.Integer]]

178

}

179

```

180

181

**Usage Examples:**

182

183

```scala

184

// Row type with named fields

185

val personType = Types.ROW(

186

Array("name", "age", "address"),

187

Array(Types.STRING, Types.INT, Types.STRING)

188

)

189

190

// Nested row type

191

val addressType = Types.ROW(

192

Array("street", "city", "zipCode"),

193

Array(Types.STRING, Types.STRING, Types.STRING)

194

)

195

196

val personWithAddressType = Types.ROW(

197

Array("name", "age", "address"),

198

Array(Types.STRING, Types.INT, addressType)

199

)

200

201

// Collection types

202

val stringArrayType = Types.OBJECT_ARRAY(Types.STRING)

203

val intMapType = Types.MAP(Types.STRING, Types.INT)

204

val tagMultisetType = Types.MULTISET(Types.STRING)

205

206

// Complex nested structure

207

val orderType = Types.ROW(

208

Array("orderId", "items", "customerInfo", "metadata"),

209

Array(

210

Types.LONG,

211

Types.OBJECT_ARRAY(Types.ROW(Array("name", "quantity"), Array(Types.STRING, Types.INT))),

212

personType,

213

Types.MAP(Types.STRING, Types.STRING)

214

)

215

)

216

```

217

218

### Schema Management

219

220

Define and manage table schemas with type information.

221

222

```scala { .api }

223

/**

224

* Represents the schema of a table with column names and types

225

*/

226

class TableSchema {

227

/**

228

* Creates a new table schema

229

* @param columnNames Array of column names

230

* @param columnTypes Array of corresponding column types

231

*/

232

def this(columnNames: Array[String], columnTypes: Array[TypeInformation[_]])

233

234

/**

235

* Gets the column names of the schema

236

* @returns Array of column names

237

*/

238

def getColumnNames: Array[String]

239

240

/**

241

* Gets all type information as an array

242

* @returns Array of column type information

243

*/

244

def getTypes: Array[TypeInformation[_]]

245

246

/**

247

* Gets the number of columns in the schema

248

* @returns Column count

249

*/

250

def getColumnCount: Int

251

252

/**

253

* Gets the type information for a specific column by name

254

* @param columnName Name of the column

255

* @returns Optional type information for the column

256

*/

257

def getType(columnName: String): Option[TypeInformation[_]]

258

259

/**

260

* Gets the type information for a specific column by index

261

* @param columnIndex Index of the column (0-based)

262

* @returns Optional type information for the column

263

*/

264

def getType(columnIndex: Int): Option[TypeInformation[_]]

265

266

/**

267

* Gets the column name for a specific column index

268

* @param columnIndex Index of the column (0-based)

269

* @returns Optional column name

270

*/

271

def getColumnName(columnIndex: Int): Option[String]

272

273

/**

274

* Converts time attributes to proper TIMESTAMP types

275

* @returns TableSchema with time attributes replaced by TIMESTAMP

276

*/

277

def withoutTimeAttributes: TableSchema

278

279

/**

280

* Creates a deep copy of the TableSchema

281

* @returns New TableSchema instance with copied data

282

*/

283

def copy: TableSchema

284

}

285

286

/**

287

* Factory methods for creating TableSchema instances

288

*/

289

object TableSchema {

290

/**

291

* Creates a TableSchema from TypeInformation

292

* @param typeInfo Type information to convert

293

* @returns TableSchema created from the type information

294

*/

295

def fromTypeInfo(typeInfo: TypeInformation[_]): TableSchema

296

297

/**

298

* Creates a new TableSchemaBuilder for fluent schema construction

299

* @returns New TableSchemaBuilder instance

300

*/

301

def builder(): TableSchemaBuilder

302

}

303

304

/**

305

* Builder for constructing TableSchema instances

306

*/

307

class TableSchemaBuilder {

308

/**

309

* Adds a field to the schema being built

310

* @param name Field name

311

* @param tpe Field type information

312

* @returns This builder for method chaining

313

*/

314

def field(name: String, tpe: TypeInformation[_]): TableSchemaBuilder

315

316

/**

317

* Builds the TableSchema from added fields

318

* @returns New TableSchema instance

319

*/

320

def build(): TableSchema

321

}

322

```

323

324

**Usage Examples:**

325

326

```scala

327

// Create schema for user table

328

val userSchema = new TableSchema(

329

Array("userId", "userName", "profile", "tags", "settings"),

330

Array(

331

Types.LONG,

332

Types.STRING,

333

Types.ROW(Array("email", "phone"), Array(Types.STRING, Types.STRING)),

334

Types.OBJECT_ARRAY(Types.STRING),

335

Types.MAP(Types.STRING, Types.STRING)

336

)

337

)

338

339

// Schema inspection

340

val columnNames = userSchema.getColumnNames // Array("userId", "userName", ...)

341

val columnCount = userSchema.getColumnCount // 5

342

val userIdType = userSchema.getType("userId") // Some(Types.LONG)

343

val firstColumnName = userSchema.getColumnName(0) // Some("userId")

344

val allTypes = userSchema.getTypes // Array of TypeInformation

345

346

// Builder pattern usage

347

val builtSchema = TableSchema.builder()

348

.field("id", Types.LONG)

349

.field("name", Types.STRING)

350

.field("score", Types.DOUBLE)

351

.build()

352

353

// Create schema from TypeInformation

354

val schemaFromType = TableSchema.fromTypeInfo(Types.ROW(Types.STRING, Types.INT))

355

356

// Create copy and remove time attributes

357

val schemaCopy = userSchema.copy

358

val withoutTime = userSchema.withoutTimeAttributes

359

360

// Use schema in table source

361

val jsonSource = new JsonTableSource("/path/to/users.json", userSchema)

362

```

363

364

### Type Conversion and Casting

365

366

Convert between different type representations.

367

368

```scala { .api }

369

/**

370

* Generic Row class for representing structured data

371

*/

372

case class Row(fields: Any*) {

373

/**

374

* Gets the field value at the specified position

375

* @param pos Field position (0-based)

376

* @returns Field value

377

*/

378

def getField(pos: Int): Any

379

380

/**

381

* Gets the number of fields in the row

382

* @returns Field count

383

*/

384

def getArity: Int

385

386

/**

387

* Sets the field value at the specified position

388

* @param pos Field position (0-based)

389

* @param value New field value

390

*/

391

def setField(pos: Int, value: Any): Unit

392

}

393

394

object Row {

395

/**

396

* Creates a new row with the specified field values

397

* @param values Field values in order

398

* @returns New Row instance

399

*/

400

def of(values: Any*): Row

401

}

402

```

403

404

**Usage Examples:**

405

406

```scala

407

// Creating rows programmatically

408

val userRow = Row.of(

409

1L, // userId: LONG

410

"john_doe", // userName: STRING

411

Row.of("john@example.com", "555-1234"), // profile: ROW

412

Array("admin", "user"), // tags: ARRAY

413

Map("theme" -> "dark", "lang" -> "en").asJava // settings: MAP

414

)

415

416

// Accessing row fields

417

val userId = userRow.getField(0).asInstanceOf[Long]

418

val profile = userRow.getField(2).asInstanceOf[Row]

419

val email = profile.getField(0).asInstanceOf[String]

420

421

// Using in table transformations

422

val dataSet = env.fromElements(userRow)

423

val table = dataSet.toTable(tEnv, 'userId, 'userName, 'profile, 'tags, 'settings)

424

```

425

426

### Custom Type Registration

427

428

Register custom types for specialized data handling.

429

430

```scala { .api }

431

/**

432

* Type information interface for custom types

433

*/

434

trait TypeInformation[T] {

435

/**

436

* Gets the Java class of the type

437

* @returns Java class

438

*/

439

def getTypeClass: Class[T]

440

441

/**

442

* Checks if the type is a basic type

443

* @returns True if basic type, false otherwise

444

*/

445

def isBasicType: Boolean

446

447

/**

448

* Gets the total number of fields for composite types

449

* @returns Field count for composite types, 1 for basic types

450

*/

451

def getTotalFields: Int

452

}

453

```

454

455

**Usage Examples:**

456

457

```scala

458

// Custom POJO type

459

case class Customer(id: Long, name: String, email: String, age: Int)

460

461

// Create type information for custom type (automatic in Scala)

462

val customerTypeInfo = createTypeInformation[Customer]

463

464

// Use in table operations

465

val customers: DataSet[Customer] = env.fromElements(

466

Customer(1, "Alice", "alice@example.com", 25),

467

Customer(2, "Bob", "bob@example.com", 30)

468

)

469

470

val customerTable = customers.toTable(tEnv, 'id, 'name, 'email, 'age)

471

```

472

473

## Type Compatibility and Conversion

474

475

The type system provides automatic conversions between compatible types and explicit casting operations.

476

477

```scala { .api }

478

// Implicit conversions are available for compatible types

479

val stringToInt = table.select('stringField.cast(Types.INT))

480

val timestampToString = table.select('timestampField.cast(Types.STRING))

481

482

// Type validation in expressions

483

val typedExpression = 'age.cast(Types.DOUBLE) * 1.5

484

```

485

486

## Types

487

488

```scala { .api }

489

object Types

490

class TableSchema

491

case class Row

492

trait TypeInformation[T]

493

494

// Built-in type constants

495

val STRING: TypeInformation[String]

496

val BOOLEAN: TypeInformation[java.lang.Boolean]

497

val BYTE: TypeInformation[java.lang.Byte]

498

val SHORT: TypeInformation[java.lang.Short]

499

val INT: TypeInformation[java.lang.Integer]

500

val LONG: TypeInformation[java.lang.Long]

501

val FLOAT: TypeInformation[java.lang.Float]

502

val DOUBLE: TypeInformation[java.lang.Double]

503

val DECIMAL: TypeInformation[java.math.BigDecimal]

504

val SQL_DATE: TypeInformation[java.sql.Date]

505

val SQL_TIME: TypeInformation[java.sql.Time]

506

val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]

507

val INTERVAL_MONTHS: TypeInformation[java.lang.Integer]

508

val INTERVAL_MILLIS: TypeInformation[java.lang.Long]

509

```