0
# Type System
1
2
The Flink Table API provides a rich type system supporting primitive types, complex types, and temporal types. The Types object serves as the main factory for creating type information used throughout the API.
3
4
## Capabilities
5
6
### Primitive Types
7
8
Basic data types for common values.
9
10
```scala { .api }
11
object Types {
12
/**
13
* String/VARCHAR type for text data
14
*/
15
val STRING: TypeInformation[String]
16
17
/**
18
* Boolean type for true/false values
19
*/
20
val BOOLEAN: TypeInformation[java.lang.Boolean]
21
22
/**
23
* TINYINT type for small integers (-128 to 127)
24
*/
25
val BYTE: TypeInformation[java.lang.Byte]
26
27
/**
28
* SMALLINT type for short integers (-32,768 to 32,767)
29
*/
30
val SHORT: TypeInformation[java.lang.Short]
31
32
/**
33
* INT/INTEGER type for standard integers
34
*/
35
val INT: TypeInformation[java.lang.Integer]
36
37
/**
38
* BIGINT type for large integers
39
*/
40
val LONG: TypeInformation[java.lang.Long]
41
42
/**
43
* FLOAT/REAL type for single-precision floating point
44
*/
45
val FLOAT: TypeInformation[java.lang.Float]
46
47
/**
48
* DOUBLE type for double-precision floating point
49
*/
50
val DOUBLE: TypeInformation[java.lang.Double]
51
52
/**
53
* DECIMAL type for high-precision decimal numbers
54
*/
55
val DECIMAL: TypeInformation[java.math.BigDecimal]
56
}
57
```
58
59
**Usage Examples:**
60
61
```scala
62
import org.apache.flink.table.api.Types
63
64
// Using primitive types in schema definition
65
val schema = new TableSchema(
66
Array("name", "age", "salary", "active"),
67
Array(Types.STRING, Types.INT, Types.DOUBLE, Types.BOOLEAN)
68
)
69
70
// Type information for table sources
71
val csvSource = new CsvTableSource(
72
"/path/to/data.csv",
73
Array("id", "name", "score"),
74
Array(Types.LONG, Types.STRING, Types.DOUBLE)
75
)
76
```
77
78
### Date and Time Types
79
80
Temporal types for handling date, time, and timestamp data.
81
82
```scala { .api }
83
object Types {
84
/**
85
* DATE type for calendar dates (year, month, day)
86
*/
87
val SQL_DATE: TypeInformation[java.sql.Date]
88
89
/**
90
* TIME type for time of day (hour, minute, second)
91
*/
92
val SQL_TIME: TypeInformation[java.sql.Time]
93
94
/**
95
* TIMESTAMP type for date and time with millisecond precision
96
*/
97
val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
98
99
/**
100
* INTERVAL type for month-based intervals
101
*/
102
val INTERVAL_MONTHS: TypeInformation[java.lang.Integer]
103
104
/**
105
* INTERVAL type for millisecond-based intervals
106
*/
107
val INTERVAL_MILLIS: TypeInformation[java.lang.Long]
108
}
109
```
110
111
**Usage Examples:**
112
113
```scala
114
// Event table with timestamps
115
val eventSchema = new TableSchema(
116
Array("event_id", "event_time", "event_date", "duration"),
117
Array(Types.LONG, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.INTERVAL_MILLIS)
118
)
119
120
// Using in SQL queries
121
val events = tEnv.sqlQuery("""
122
SELECT event_id,
123
DATE_FORMAT(event_time, 'yyyy-MM-dd') as date,
124
event_time + INTERVAL '1' HOUR as next_hour
125
FROM Events
126
""")
127
```
128
129
### Complex Types
130
131
Structured types for handling nested and collection data.
132
133
```scala { .api }
134
object Types {
135
/**
136
* ROW type with anonymous fields (field names: f0, f1, f2, ...)
137
* @param types Field types in order
138
* @returns Row type information
139
*/
140
def ROW(types: TypeInformation[_]*): TypeInformation[Row]
141
142
/**
143
* ROW type with named fields
144
* @param fieldNames Field names array
145
* @param types Field types array
146
* @returns Named row type information
147
*/
148
def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]
149
150
/**
151
* Array type for primitive elements
152
* @param elementType Type of array elements
153
* @returns Primitive array type information
154
*/
155
def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_]
156
157
/**
158
* Array type for object elements
159
* @param elementType Type of array elements
160
* @returns Object array type information
161
*/
162
def OBJECT_ARRAY[E](elementType: TypeInformation[E]): TypeInformation[Array[E]]
163
164
/**
165
* Map type for key-value pairs
166
* @param keyType Type of map keys
167
* @param valueType Type of map values
168
* @returns Map type information
169
*/
170
def MAP[K, V](keyType: TypeInformation[K], valueType: TypeInformation[V]): TypeInformation[java.util.Map[K, V]]
171
172
/**
173
* Multiset type (map with element counts)
174
* @param elementType Type of multiset elements
175
* @returns Multiset type information
176
*/
177
def MULTISET[E](elementType: TypeInformation[E]): TypeInformation[java.util.Map[E, java.lang.Integer]]
178
}
179
```
180
181
**Usage Examples:**
182
183
```scala
184
// Row type with named fields
185
val personType = Types.ROW(
186
Array("name", "age", "address"),
187
Array(Types.STRING, Types.INT, Types.STRING)
188
)
189
190
// Nested row type
191
val addressType = Types.ROW(
192
Array("street", "city", "zipCode"),
193
Array(Types.STRING, Types.STRING, Types.STRING)
194
)
195
196
val personWithAddressType = Types.ROW(
197
Array("name", "age", "address"),
198
Array(Types.STRING, Types.INT, addressType)
199
)
200
201
// Collection types
202
val stringArrayType = Types.OBJECT_ARRAY(Types.STRING)
203
val intMapType = Types.MAP(Types.STRING, Types.INT)
204
val tagMultisetType = Types.MULTISET(Types.STRING)
205
206
// Complex nested structure
207
val orderType = Types.ROW(
208
Array("orderId", "items", "customerInfo", "metadata"),
209
Array(
210
Types.LONG,
211
Types.OBJECT_ARRAY(Types.ROW(Array("name", "quantity"), Array(Types.STRING, Types.INT))),
212
personType,
213
Types.MAP(Types.STRING, Types.STRING)
214
)
215
)
216
```
217
218
### Schema Management
219
220
Define and manage table schemas with type information.
221
222
```scala { .api }
223
/**
224
* Represents the schema of a table with column names and types
225
*/
226
class TableSchema {
227
/**
228
* Creates a new table schema
229
* @param columnNames Array of column names
230
* @param columnTypes Array of corresponding column types
231
*/
232
def this(columnNames: Array[String], columnTypes: Array[TypeInformation[_]])
233
234
/**
235
* Gets the column names of the schema
236
* @returns Array of column names
237
*/
238
def getColumnNames: Array[String]
239
240
/**
241
* Gets all type information as an array
242
* @returns Array of column type information
243
*/
244
def getTypes: Array[TypeInformation[_]]
245
246
/**
247
* Gets the number of columns in the schema
248
* @returns Column count
249
*/
250
def getColumnCount: Int
251
252
/**
253
* Gets the type information for a specific column by name
254
* @param columnName Name of the column
255
* @returns Optional type information for the column
256
*/
257
def getType(columnName: String): Option[TypeInformation[_]]
258
259
/**
260
* Gets the type information for a specific column by index
261
* @param columnIndex Index of the column (0-based)
262
* @returns Optional type information for the column
263
*/
264
def getType(columnIndex: Int): Option[TypeInformation[_]]
265
266
/**
267
* Gets the column name for a specific column index
268
* @param columnIndex Index of the column (0-based)
269
* @returns Optional column name
270
*/
271
def getColumnName(columnIndex: Int): Option[String]
272
273
/**
274
* Converts time attributes to proper TIMESTAMP types
275
* @returns TableSchema with time attributes replaced by TIMESTAMP
276
*/
277
def withoutTimeAttributes: TableSchema
278
279
/**
280
* Creates a deep copy of the TableSchema
281
* @returns New TableSchema instance with copied data
282
*/
283
def copy: TableSchema
284
}
285
286
/**
287
* Factory methods for creating TableSchema instances
288
*/
289
object TableSchema {
290
/**
291
* Creates a TableSchema from TypeInformation
292
* @param typeInfo Type information to convert
293
* @returns TableSchema created from the type information
294
*/
295
def fromTypeInfo(typeInfo: TypeInformation[_]): TableSchema
296
297
/**
298
* Creates a new TableSchemaBuilder for fluent schema construction
299
* @returns New TableSchemaBuilder instance
300
*/
301
def builder(): TableSchemaBuilder
302
}
303
304
/**
305
* Builder for constructing TableSchema instances
306
*/
307
class TableSchemaBuilder {
308
/**
309
* Adds a field to the schema being built
310
* @param name Field name
311
* @param tpe Field type information
312
* @returns This builder for method chaining
313
*/
314
def field(name: String, tpe: TypeInformation[_]): TableSchemaBuilder
315
316
/**
317
* Builds the TableSchema from added fields
318
* @returns New TableSchema instance
319
*/
320
def build(): TableSchema
321
}
322
```
323
324
**Usage Examples:**
325
326
```scala
327
// Create schema for user table
328
val userSchema = new TableSchema(
329
Array("userId", "userName", "profile", "tags", "settings"),
330
Array(
331
Types.LONG,
332
Types.STRING,
333
Types.ROW(Array("email", "phone"), Array(Types.STRING, Types.STRING)),
334
Types.OBJECT_ARRAY(Types.STRING),
335
Types.MAP(Types.STRING, Types.STRING)
336
)
337
)
338
339
// Schema inspection
340
val columnNames = userSchema.getColumnNames // Array("userId", "userName", ...)
341
val columnCount = userSchema.getColumnCount // 5
342
val userIdType = userSchema.getType("userId") // Some(Types.LONG)
343
val firstColumnName = userSchema.getColumnName(0) // Some("userId")
344
val allTypes = userSchema.getTypes // Array of TypeInformation
345
346
// Builder pattern usage
347
val builtSchema = TableSchema.builder()
348
.field("id", Types.LONG)
349
.field("name", Types.STRING)
350
.field("score", Types.DOUBLE)
351
.build()
352
353
// Create schema from TypeInformation
354
val schemaFromType = TableSchema.fromTypeInfo(Types.ROW(Types.STRING, Types.INT))
355
356
// Create copy and remove time attributes
357
val schemaCopy = userSchema.copy
358
val withoutTime = userSchema.withoutTimeAttributes
359
360
// Use schema in table source
361
val jsonSource = new JsonTableSource("/path/to/users.json", userSchema)
362
```
363
364
### Type Conversion and Casting
365
366
Convert between different type representations.
367
368
```scala { .api }
369
/**
370
* Generic Row class for representing structured data
371
*/
372
case class Row(fields: Any*) {
373
/**
374
* Gets the field value at the specified position
375
* @param pos Field position (0-based)
376
* @returns Field value
377
*/
378
def getField(pos: Int): Any
379
380
/**
381
* Gets the number of fields in the row
382
* @returns Field count
383
*/
384
def getArity: Int
385
386
/**
387
* Sets the field value at the specified position
388
* @param pos Field position (0-based)
389
* @param value New field value
390
*/
391
def setField(pos: Int, value: Any): Unit
392
}
393
394
object Row {
395
/**
396
* Creates a new row with the specified field values
397
* @param values Field values in order
398
* @returns New Row instance
399
*/
400
def of(values: Any*): Row
401
}
402
```
403
404
**Usage Examples:**
405
406
```scala
407
// Creating rows programmatically
408
val userRow = Row.of(
409
1L, // userId: LONG
410
"john_doe", // userName: STRING
411
Row.of("john@example.com", "555-1234"), // profile: ROW
412
Array("admin", "user"), // tags: ARRAY
413
Map("theme" -> "dark", "lang" -> "en").asJava // settings: MAP
414
)
415
416
// Accessing row fields
417
val userId = userRow.getField(0).asInstanceOf[Long]
418
val profile = userRow.getField(2).asInstanceOf[Row]
419
val email = profile.getField(0).asInstanceOf[String]
420
421
// Using in table transformations
422
val dataSet = env.fromElements(userRow)
423
val table = dataSet.toTable(tEnv, 'userId, 'userName, 'profile, 'tags, 'settings)
424
```
425
426
### Custom Type Registration
427
428
Register custom types for specialized data handling.
429
430
```scala { .api }
431
/**
432
* Type information interface for custom types
433
*/
434
trait TypeInformation[T] {
435
/**
436
* Gets the Java class of the type
437
* @returns Java class
438
*/
439
def getTypeClass: Class[T]
440
441
/**
442
* Checks if the type is a basic type
443
* @returns True if basic type, false otherwise
444
*/
445
def isBasicType: Boolean
446
447
/**
448
* Gets the total number of fields for composite types
449
* @returns Field count for composite types, 1 for basic types
450
*/
451
def getTotalFields: Int
452
}
453
```
454
455
**Usage Examples:**
456
457
```scala
458
// Custom POJO type
459
case class Customer(id: Long, name: String, email: String, age: Int)
460
461
// Create type information for custom type (automatic in Scala)
462
val customerTypeInfo = createTypeInformation[Customer]
463
464
// Use in table operations
465
val customers: DataSet[Customer] = env.fromElements(
466
Customer(1, "Alice", "alice@example.com", 25),
467
Customer(2, "Bob", "bob@example.com", 30)
468
)
469
470
val customerTable = customers.toTable(tEnv, 'id, 'name, 'email, 'age)
471
```
472
473
## Type Compatibility and Conversion
474
475
The type system provides automatic conversions between compatible types and explicit casting operations.
476
477
```scala { .api }
478
// Implicit conversions are available for compatible types
479
val stringToInt = table.select('stringField.cast(Types.INT))
480
val timestampToString = table.select('timestampField.cast(Types.STRING))
481
482
// Type validation in expressions
483
val typedExpression = 'age.cast(Types.DOUBLE) * 1.5
484
```
485
486
## Types
487
488
```scala { .api }
489
object Types
490
class TableSchema
491
case class Row
492
trait TypeInformation[T]
493
494
// Built-in type constants
495
val STRING: TypeInformation[String]
496
val BOOLEAN: TypeInformation[java.lang.Boolean]
497
val BYTE: TypeInformation[java.lang.Byte]
498
val SHORT: TypeInformation[java.lang.Short]
499
val INT: TypeInformation[java.lang.Integer]
500
val LONG: TypeInformation[java.lang.Long]
501
val FLOAT: TypeInformation[java.lang.Float]
502
val DOUBLE: TypeInformation[java.lang.Double]
503
val DECIMAL: TypeInformation[java.math.BigDecimal]
504
val SQL_DATE: TypeInformation[java.sql.Date]
505
val SQL_TIME: TypeInformation[java.sql.Time]
506
val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
507
val INTERVAL_MONTHS: TypeInformation[java.lang.Integer]
508
val INTERVAL_MILLIS: TypeInformation[java.lang.Long]
509
```