# Type System

Apache Flink's Table API provides a type system covering primitive types, nested structures, temporal types, and user-defined types. Each data type combines a logical type, which describes the values a column can hold, with a conversion class, which is the Java class used to exchange those values with user code. Flink can serialize values of all of these types.

## Capabilities

### Primitive Data Types

Boolean, integer, floating-point, and fixed-precision decimal types for basic values.

```java { .api }
class DataTypes {
    /**
     * Boolean data type
     * @return DataType for BOOLEAN values
     */
    static DataType BOOLEAN();

    /**
     * 8-bit signed integer data type
     * @return DataType for TINYINT values (-128 to 127)
     */
    static DataType TINYINT();

    /**
     * 16-bit signed integer data type
     * @return DataType for SMALLINT values (-32,768 to 32,767)
     */
    static DataType SMALLINT();

    /**
     * 32-bit signed integer data type
     * @return DataType for INT values (-2,147,483,648 to 2,147,483,647)
     */
    static DataType INT();

    /**
     * 64-bit signed integer data type
     * @return DataType for BIGINT values
     *         (-9,223,372,036,854,775,808 to 9,223,372,036,854,775,807)
     */
    static DataType BIGINT();

    /**
     * 32-bit IEEE 754 floating-point data type
     * @return DataType for FLOAT values
     */
    static DataType FLOAT();

    /**
     * 64-bit IEEE 754 floating-point data type
     * @return DataType for DOUBLE values
     */
    static DataType DOUBLE();

    /**
     * Decimal data type with fixed precision and scale
     * @param precision Total number of digits (1 to 38)
     * @param scale Number of digits after the decimal point (0 to precision)
     * @return DataType for DECIMAL values
     */
    static DataType DECIMAL(int precision, int scale);
}
```

**Usage Examples:**

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

// Primitive types in a schema definition
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("active", DataTypes.BOOLEAN())
    .column("score", DataTypes.FLOAT())
    .column("balance", DataTypes.DECIMAL(10, 2))
    .build();

// The same types in SQL DDL (tableEnv is an existing TableEnvironment)
tableEnv.executeSql(
    "CREATE TABLE accounts (" +
    "  account_id BIGINT," +
    "  is_active BOOLEAN," +
    "  credit_score FLOAT," +
    "  balance DECIMAL(10, 2)" +
    ") WITH (...)"
);
```
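
A DECIMAL(p, s) value has at most p digits in total, s of them after the decimal point, so DECIMAL(10, 2) tops out at 99999999.99. The plain-Java sketch below checks whether a `java.math.BigDecimal` (Flink's default conversion class for DECIMAL) fits a given precision and scale without rounding. `DecimalFit` is a hypothetical helper for illustration, not part of Flink.

```java
import java.math.BigDecimal;

// Hypothetical helper (not a Flink API): checks whether a value fits
// DECIMAL(precision, scale) without rounding.
final class DecimalFit {
    static boolean fits(BigDecimal value, int precision, int scale) {
        if (precision < 1 || scale < 0 || scale > precision) {
            throw new IllegalArgumentException("require precision >= 1 and 0 <= scale <= precision");
        }
        if (value.signum() == 0) {
            return true; // zero fits every DECIMAL type
        }
        // Ignore trailing zeros: 12.50 needs only one fractional digit.
        BigDecimal v = value.stripTrailingZeros();
        int fractionalDigits = Math.max(v.scale(), 0);
        int integerDigits = Math.max(v.precision() - v.scale(), 0);
        return fractionalDigits <= scale && integerDigits <= precision - scale;
    }

    public static void main(String[] args) {
        System.out.println(fits(new BigDecimal("99999999.99"), 10, 2));  // true: largest DECIMAL(10, 2) value
        System.out.println(fits(new BigDecimal("100000000.00"), 10, 2)); // false: nine integer digits
        System.out.println(fits(new BigDecimal("0.005"), 10, 2));        // false: three fractional digits
    }
}
```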

### String and Binary Types

Character string and binary string types with fixed or maximum lengths.

```java { .api }
class DataTypes {
    /**
     * Fixed-length character string data type
     * @param length Number of characters (code points)
     * @return DataType for CHAR values
     */
    static DataType CHAR(int length);

    /**
     * Variable-length character string data type
     * @param length Maximum number of characters (code points)
     * @return DataType for VARCHAR values
     */
    static DataType VARCHAR(int length);

    /**
     * Variable-length character string with the maximum length;
     * equivalent to VARCHAR(2147483647)
     * @return DataType for STRING values
     */
    static DataType STRING();

    /**
     * Fixed-length binary string data type
     * @param length Number of bytes
     * @return DataType for BINARY values
     */
    static DataType BINARY(int length);

    /**
     * Variable-length binary string data type
     * @param length Maximum number of bytes
     * @return DataType for VARBINARY values
     */
    static DataType VARBINARY(int length);

    /**
     * Variable-length binary string with the maximum length;
     * equivalent to VARBINARY(2147483647)
     * @return DataType for BYTES values
     */
    static DataType BYTES();
}
```

**Usage Examples:**

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

// String and binary types in a table definition
Schema userSchema = Schema.newBuilder()
    .column("username", DataTypes.VARCHAR(50))
    .column("first_name", DataTypes.STRING())
    .column("country_code", DataTypes.CHAR(2))
    .column("profile_image", DataTypes.BYTES())
    .build();
```
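
CHAR(n) values always hold exactly n characters, while VARCHAR(n) values hold at most n. The plain-Java sketch below models SQL-standard cast semantics for the two types: right-pad with spaces for CHAR, truncate for both. `CharSemantics` is a hypothetical helper, not Flink's cast implementation, and Flink's actual cast behavior has varied across versions, so treat it as an illustration of the type definitions. Lengths are counted in Unicode code points.

```java
// Hypothetical helper (not a Flink API) modeling how a string is adapted to
// CHAR(n) or VARCHAR(n) under SQL-standard cast semantics.
final class CharSemantics {
    // Length in Unicode code points, which is how CHAR/VARCHAR lengths are counted.
    static int length(String s) {
        return s.codePointCount(0, s.length());
    }

    // Keeps at most n code points.
    static String truncate(String s, int n) {
        if (n < 1) {
            throw new IllegalArgumentException("length must be at least 1");
        }
        return length(s) <= n ? s : s.substring(0, s.offsetByCodePoints(0, n));
    }

    // CHAR(n): truncate, then right-pad with spaces to exactly n code points.
    static String toChar(String s, int n) {
        StringBuilder padded = new StringBuilder(truncate(s, n));
        for (int i = length(padded.toString()); i < n; i++) {
            padded.append(' ');
        }
        return padded.toString();
    }

    // VARCHAR(n): truncate only; shorter values keep their own length.
    static String toVarchar(String s, int n) {
        return truncate(s, n);
    }

    public static void main(String[] args) {
        System.out.println("[" + toChar("D", 2) + "]");           // [D ]
        System.out.println("[" + toVarchar("alexandra", 5) + "]"); // [alexa]
    }
}
```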

### Temporal Data Types

Date, time, timestamp, and interval types for temporal data processing.

```java { .api }
class DataTypes {
    /**
     * Date data type (year-month-day)
     * @return DataType for DATE values
     */
    static DataType DATE();

    /**
     * Time data type (hour:minute:second[.fractional]) with the default precision of 0
     * @return DataType for TIME values
     */
    static DataType TIME();

    /**
     * Time data type with precision
     * @param precision Number of fractional-second digits (0-9)
     * @return DataType for TIME values with the specified precision
     */
    static DataType TIME(int precision);

    /**
     * Timestamp data type (date and time without time zone) with the default precision of 6
     * @return DataType for TIMESTAMP values
     */
    static DataType TIMESTAMP();

    /**
     * Timestamp data type with precision
     * @param precision Number of fractional-second digits (0-9)
     * @return DataType for TIMESTAMP values with the specified precision
     */
    static DataType TIMESTAMP(int precision);

    /**
     * Timestamp with time zone data type (default precision 6).
     * Declared in the API, but not supported by the Flink planner at runtime;
     * use TIMESTAMP_WITH_LOCAL_TIME_ZONE for time-zone-aware columns.
     * @return DataType for TIMESTAMP WITH TIME ZONE values
     */
    static DataType TIMESTAMP_WITH_TIME_ZONE();

    /**
     * Timestamp with time zone and precision (same planner restriction as above)
     * @param precision Number of fractional-second digits (0-9)
     * @return DataType for TIMESTAMP WITH TIME ZONE values with the specified precision
     */
    static DataType TIMESTAMP_WITH_TIME_ZONE(int precision);

    /**
     * Timestamp with local time zone: an instant that is displayed in the
     * session time zone (default precision 6)
     * @return DataType for TIMESTAMP WITH LOCAL TIME ZONE values
     */
    static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE();

    /**
     * Timestamp with local time zone and precision
     * @param precision Number of fractional-second digits (0-9)
     * @return DataType for TIMESTAMP WITH LOCAL TIME ZONE values with the specified precision
     */
    static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precision);

    /**
     * Interval data type with a single resolution. Day-time resolutions are
     * DAY(), HOUR(), MINUTE(), and SECOND(); year-month resolutions are YEAR() and MONTH().
     * @param resolution Resolution created by one of the factory methods above
     * @return DataType for day-time or year-month interval values
     */
    static DataType INTERVAL(DataTypes.Resolution resolution);

    /**
     * Interval data type spanning a range of resolutions, for example
     * INTERVAL(DAY(), SECOND()) or INTERVAL(YEAR(), MONTH())
     * @param upperResolution Leading (largest) resolution
     * @param lowerResolution Trailing (smallest) resolution
     * @return DataType for day-time or year-month interval values
     */
    static DataType INTERVAL(DataTypes.Resolution upperResolution, DataTypes.Resolution lowerResolution);
}
```
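
The precision argument fixes how many fractional-second digits a value keeps: TIMESTAMP(3) stores milliseconds and TIMESTAMP(9) nanoseconds. The plain-Java sketch below illustrates this with `java.time`, the package of Flink's default conversion classes for these types (`LocalDateTime` for TIMESTAMP, `Instant` for TIMESTAMP_WITH_LOCAL_TIME_ZONE). `TemporalPrecision` is hypothetical, and dropping extra digits by truncation rather than rounding is an assumption of the sketch. The second method shows why a local-time-zone timestamp displays differently depending on the session time zone.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper (not a Flink API) illustrating fractional-second precision
// and local-time-zone rendering with java.time.
final class TemporalPrecision {
    // Keeps only `precision` fractional-second digits (0-9) by truncation.
    static LocalDateTime truncate(LocalDateTime t, int precision) {
        if (precision < 0 || precision > 9) {
            throw new IllegalArgumentException("precision must be between 0 and 9");
        }
        int unit = 1;
        for (int i = precision; i < 9; i++) {
            unit *= 10; // nanoseconds represented by the last kept digit
        }
        return t.withNano((t.getNano() / unit) * unit);
    }

    // An instant-based timestamp has no wall-clock value of its own; the value a
    // user sees depends on the session time zone used to render it.
    static LocalDateTime render(Instant instant, ZoneId sessionZone) {
        return instant.atZone(sessionZone).toLocalDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2024, 1, 1, 12, 0, 0, 123_456_789);
        System.out.println(truncate(t, 3)); // 2024-01-01T12:00:00.123
        Instant i = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(render(i, ZoneId.of("UTC")));           // 2024-01-01T00:00
        System.out.println(render(i, ZoneId.of("Asia/Shanghai"))); // 2024-01-01T08:00
    }
}
```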
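
**Usage Examples:**

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

// Temporal types in an event table
Schema eventSchema = Schema.newBuilder()
    .column("event_id", DataTypes.BIGINT())
    .column("event_date", DataTypes.DATE())
    .column("event_time", DataTypes.TIME(3))
    .column("created_at", DataTypes.TIMESTAMP(3))
    // Instant-based timestamp, displayed in the session time zone
    .column("created_at_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
    .column("session_duration", DataTypes.INTERVAL(DataTypes.SECOND()))
    // Event time on created_at, tolerating 5 seconds of out-of-order data
    .watermark("created_at", $("created_at").minus(lit(5).seconds()))
    .build();
```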
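
Day-time and year-month intervals form separate families because a month has no fixed length. Flink's default conversion classes for the two families are `java.time.Duration` and `java.time.Period`, so the difference can be shown in plain Java: adding one month is calendar arithmetic, while adding 30 days adds a fixed amount of time. `IntervalFamilies` is a hypothetical name used only for this illustration.

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.Period;

// Hypothetical demo (not a Flink API) contrasting the two interval families.
final class IntervalFamilies {
    // Day-time interval: a fixed amount of time.
    static LocalDateTime plusDayTime(LocalDateTime start, Duration interval) {
        return start.plus(interval);
    }

    // Year-month interval: calendar arithmetic in whole months.
    static LocalDate plusYearMonth(LocalDate start, int years, int months) {
        return start.plus(Period.of(years, months, 0));
    }

    public static void main(String[] args) {
        // One month after January 31 is clamped to the end of February.
        System.out.println(plusYearMonth(LocalDate.of(2024, 1, 31), 0, 1)); // 2024-02-29
        // Thirty days is always the same length, regardless of month boundaries.
        System.out.println(plusDayTime(LocalDateTime.of(2024, 1, 31, 0, 0), Duration.ofDays(30))); // 2024-03-01T00:00
    }
}
```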

### Complex Data Types

Nested structures, including arrays, maps, rows, and multisets, for modeling complex data.

```java { .api }
class DataTypes {
    /**
     * Array data type
     * @param elementType Data type of the array elements
     * @return DataType for ARRAY values
     */
    static DataType ARRAY(DataType elementType);

    /**
     * Map data type
     * @param keyType Data type of the map keys
     * @param valueType Data type of the map values
     * @return DataType for MAP values
     */
    static DataType MAP(DataType keyType, DataType valueType);

    /**
     * Row data type (a sequence of named fields)
     * @param fields Fields defining the row structure
     * @return DataType for ROW values
     */
    static DataType ROW(DataTypes.Field... fields);

    /**
     * Creates a field for a row data type
     * @param name Field name
     * @param dataType Field data type
     * @return Field definition
     */
    static DataTypes.Field FIELD(String name, DataType dataType);

    /**
     * Multiset data type (a bag that allows duplicate elements)
     * @param elementType Data type of the multiset elements
     * @return DataType for MULTISET values
     */
    static DataType MULTISET(DataType elementType);
}
```
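
A MULTISET keeps each distinct element once together with its number of occurrences, which is why Flink's default Java representation for MULTISET<T> is a `java.util.Map<T, Integer>`. The plain-Java sketch below builds that representation from a list; `MultisetSketch` is a hypothetical helper, not a Flink API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Flink API): element -> multiplicity, the shape
// used for MULTISET values on the Java side.
final class MultisetSketch {
    static <T> Map<T, Integer> toMultiset(List<T> elements) {
        Map<T, Integer> counts = new LinkedHashMap<>(); // keeps first-seen order for readable output
        for (T element : elements) {
            counts.merge(element, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(toMultiset(Arrays.asList("a", "b", "a", "c", "a"))); // {a=3, b=1, c=1}
    }
}
```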

**Usage Examples:**

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.types.DataType;

// Nested row type reused as an array element
DataType addressType = DataTypes.ROW(
    DataTypes.FIELD("street", DataTypes.STRING()),
    DataTypes.FIELD("city", DataTypes.STRING()),
    DataTypes.FIELD("zipcode", DataTypes.STRING()),
    DataTypes.FIELD("country", DataTypes.STRING())
);

Schema customerSchema = Schema.newBuilder()
    .column("customer_id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("addresses", DataTypes.ARRAY(addressType))
    .column("preferences", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
    .column("order_history", DataTypes.ARRAY(DataTypes.BIGINT()))
    .build();

// Accessing nested values in queries: at() indexes arrays (starting at 1)
// and looks up map keys; get() reads a row field by name
Table customers = tableEnv.from("customers");
Table result = customers.select(
    $("customer_id"),
    $("name"),
    $("addresses").at(1).get("city").as("primary_city"),
    $("preferences").at("language").as("preferred_language")
);
```
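
Array positions in Table API expressions start at 1, so `$("addresses").at(1)` reads the first address. The plain-Java sketch below mirrors 1-based array access and map lookup over `java.util` collections. `NestedAccess` and its `Address` class are hypothetical, and returning null for a missing position or key (imitating SQL NULL) is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helpers (not a Flink API) mirroring 1-based array access and map
// lookup. Returning null for a missing position or key is an assumption that
// imitates SQL NULL.
final class NestedAccess {
    // Stand-in for the ROW<street, city, zipcode, country> address type.
    static final class Address {
        final String street;
        final String city;
        final String zipcode;
        final String country;

        Address(String street, String city, String zipcode, String country) {
            this.street = street;
            this.city = city;
            this.zipcode = zipcode;
            this.country = country;
        }
    }

    // ARRAY access: SQL position 1 is Java index 0.
    static <T> T at(List<T> array, int position) {
        if (array == null || position < 1 || position > array.size()) {
            return null;
        }
        return array.get(position - 1);
    }

    // MAP access by key.
    static <K, V> V at(Map<K, V> map, K key) {
        return map == null ? null : map.get(key);
    }

    public static void main(String[] args) {
        List<Address> addresses = Arrays.asList(
                new Address("1 Main St", "Berlin", "10115", "DE"),
                new Address("2 High St", "Paris", "75001", "FR"));
        System.out.println(at(addresses, 1).city); // Berlin
        Map<String, String> preferences = Collections.singletonMap("language", "en");
        System.out.println(at(preferences, "language")); // en
    }
}
```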

### Schema Definition

Schema definition with physical, computed, and metadata columns, a primary key, and watermarks.

```java { .api }
class Schema {
    /**
     * Creates a new schema builder
     * @return Builder for constructing schemas
     */
    static Builder newBuilder();

    static class Builder {
        /**
         * Adds a physical column to the schema
         * @param columnName Name of the column
         * @param dataType Data type of the column
         * @return Builder for method chaining
         */
        Builder column(String columnName, AbstractDataType<?> dataType);

        /**
         * Adds a computed column defined by an expression
         * @param columnName Name of the computed column
         * @param expression SQL expression defining the column
         * @return Builder for method chaining
         */
        Builder columnByExpression(String columnName, String expression);

        /**
         * Adds a metadata column whose metadata key is the column name
         * @param columnName Name of the metadata column
         * @param dataType Data type of the metadata
         * @return Builder for method chaining
         */
        Builder columnByMetadata(String columnName, AbstractDataType<?> dataType);

        /**
         * Adds a metadata column with an explicit metadata key
         * @param columnName Name of the metadata column
         * @param dataType Data type of the metadata
         * @param metadataKey Connector-specific key of the metadata field
         * @return Builder for method chaining
         */
        Builder columnByMetadata(String columnName, AbstractDataType<?> dataType, String metadataKey);

        /**
         * Adds a watermark specification for event time
         * @param columnName Name of the event-time (rowtime) column
         * @param watermarkExpression Expression that computes the watermark from a row
         * @return Builder for method chaining
         */
        Builder watermark(String columnName, Expression watermarkExpression);

        /**
         * Declares the primary key. Flink does not enforce it (NOT ENFORCED);
         * it declares that the data is unique per key.
         * @param columnNames Names of the columns forming the primary key
         * @return Builder for method chaining
         */
        Builder primaryKey(String... columnNames);

        /**
         * Builds the schema
         * @return Constructed (unresolved) Schema
         */
        Schema build();
    }
}
```
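
The builder records columns in declaration order, and references between them, such as primary key columns, are checked when the schema is resolved. The plain-Java sketch below imitates the fluent pattern with a hypothetical `MiniSchema` class that tracks only column names and kinds. Its two checks (unique column names, and primary key columns that are declared physical columns) are assumptions chosen for illustration, not a complete list of Flink's validation rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Flink's Schema class): a fluent builder that records
// column kinds in declaration order and validates the primary key on build().
final class MiniSchema {
    enum Kind { PHYSICAL, COMPUTED, METADATA }

    final Map<String, Kind> columns;
    final List<String> primaryKey;

    private MiniSchema(Map<String, Kind> columns, List<String> primaryKey) {
        this.columns = columns;
        this.primaryKey = primaryKey;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        private final Map<String, Kind> columns = new LinkedHashMap<>();
        private final List<String> primaryKey = new ArrayList<>();

        Builder column(String name) { return add(name, Kind.PHYSICAL); }
        Builder columnByExpression(String name) { return add(name, Kind.COMPUTED); }
        Builder columnByMetadata(String name) { return add(name, Kind.METADATA); }

        Builder primaryKey(String... names) {
            primaryKey.addAll(Arrays.asList(names));
            return this;
        }

        private Builder add(String name, Kind kind) {
            // Assumed rule: column names must be unique.
            if (columns.putIfAbsent(name, kind) != null) {
                throw new IllegalArgumentException("Duplicate column: " + name);
            }
            return this;
        }

        MiniSchema build() {
            // Assumed rule: every primary key column is a declared physical column.
            for (String name : primaryKey) {
                if (columns.get(name) != Kind.PHYSICAL) {
                    throw new IllegalStateException("Primary key column is not a physical column: " + name);
                }
            }
            return new MiniSchema(new LinkedHashMap<>(columns), new ArrayList<>(primaryKey));
        }
    }

    public static void main(String[] args) {
        MiniSchema schema = newBuilder()
                .column("order_id")
                .columnByExpression("total_amount")
                .columnByMetadata("kafka_offset")
                .primaryKey("order_id")
                .build();
        System.out.println(schema.columns); // {order_id=PHYSICAL, total_amount=COMPUTED, kafka_offset=METADATA}
    }
}
```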

**Usage Examples:**

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

// Schema using every kind of column
Schema orderSchema = Schema.newBuilder()
    // Physical columns (order_id is declared NOT NULL because it is the primary key)
    .column("order_id", DataTypes.BIGINT().notNull())
    .column("customer_id", DataTypes.BIGINT())
    .column("product_id", DataTypes.BIGINT())
    .column("quantity", DataTypes.INT())
    .column("unit_price", DataTypes.DECIMAL(10, 2))
    .column("order_time", DataTypes.TIMESTAMP(3))

    // Computed columns
    .columnByExpression("total_amount", "quantity * unit_price")
    .columnByExpression("order_year", "EXTRACT(YEAR FROM order_time)")

    // Metadata columns; the keys follow the Kafka connector's metadata
    .columnByMetadata("kafka_partition", DataTypes.INT(), "partition")
    .columnByMetadata("kafka_offset", DataTypes.BIGINT(), "offset")
    .columnByMetadata("kafka_timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3), "timestamp")

    // Watermark for event-time processing, tolerating 30 seconds of out-of-order events
    .watermark("order_time", $("order_time").minus(lit(30).seconds()))

    // Primary key (not enforced by Flink; whether it is accepted depends on the connector)
    .primaryKey("order_id")

    .build();
```
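
The watermark expression `order_time - 30 seconds` declares a bounded out-of-orderness of 30 seconds: the watermark trails the largest `order_time` seen so far by 30 seconds and never moves backwards. The plain-Java sketch below tracks that value and flags rows whose timestamp is not after the current watermark, which event-time operators such as windows treat as late (the exact lateness rule is operator-specific). `BoundedOutOfOrdernessSketch` is hypothetical, not Flink's watermark generator; it also updates the watermark on every row, whereas Flink emits watermarks periodically.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical sketch (not Flink's implementation) of a bounded out-of-orderness
// watermark: watermark = max(rowtime seen so far) - delay, never decreasing.
final class BoundedOutOfOrdernessSketch {
    private final Duration delay;
    private LocalDateTime watermark; // null until the first row arrives

    BoundedOutOfOrdernessSketch(Duration delay) {
        this.delay = delay;
    }

    // Returns true if the row is late, i.e. not after the current watermark.
    boolean onRow(LocalDateTime rowtime) {
        boolean late = watermark != null && !rowtime.isAfter(watermark);
        LocalDateTime candidate = rowtime.minus(delay);
        if (watermark == null || candidate.isAfter(watermark)) {
            watermark = candidate; // only ever moves forward
        }
        return late;
    }

    LocalDateTime currentWatermark() {
        return watermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(Duration.ofSeconds(30));
        LocalDateTime t0 = LocalDateTime.of(2024, 1, 1, 12, 0, 0);
        wm.onRow(t0.plusSeconds(60));                     // watermark becomes 12:00:30
        System.out.println(wm.onRow(t0.plusSeconds(40))); // false: 12:00:40 is after the watermark
        System.out.println(wm.onRow(t0.plusSeconds(10))); // true: 12:00:10 is behind the watermark
        System.out.println(wm.currentWatermark());        // 2024-01-01T12:00:30
    }
}
```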

### Type Conversion and Bridging

Methods for inspecting a data type and for changing its nullability or its conversion class, the Java class used to represent its values.

```java { .api }
abstract class DataType {
    /**
     * Gets the logical type of this data type
     * @return LogicalType describing the logical structure, including nullability
     */
    LogicalType getLogicalType();

    /**
     * Gets the conversion class used for the physical representation
     * @return Class used when converting values to and from Java objects
     */
    Class<?> getConversionClass();

    /**
     * Creates a non-nullable version of this data type
     * @return DataType that does not accept null values
     */
    DataType notNull();

    /**
     * Creates a nullable version of this data type (data types are nullable by default)
     * @return DataType that accepts null values
     */
    DataType nullable();

    /**
     * Bridges this data type to a different conversion class; the class must be
     * supported by the logical type (for example java.sql.Timestamp for TIMESTAMP)
     * @param newConversionClass New class for the physical representation
     * @return DataType with the new conversion class
     */
    DataType bridgedTo(Class<?> newConversionClass);
}
```
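
Bridging changes only the Java class used at the API boundary; the logical type and the value it describes stay the same. For TIMESTAMP, Flink's default conversion class is `java.time.LocalDateTime`, and `java.sql.Timestamp` is a supported alternative. The plain-JDK sketch below, with the hypothetical class name `BridgingRoundTrip` and no Flink calls, shows that converting between the two keeps the wall-clock value down to the nanosecond.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Plain-JDK illustration (no Flink calls): the two Java classes a TIMESTAMP value
// can be bridged to carry the same wall-clock value, including nanoseconds.
final class BridgingRoundTrip {
    static Timestamp toSqlTimestamp(LocalDateTime value) {
        return Timestamp.valueOf(value);
    }

    static LocalDateTime toLocalDateTime(Timestamp value) {
        return value.toLocalDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime original = LocalDateTime.of(2024, 1, 1, 12, 0, 0, 123_456_789);
        Timestamp bridged = toSqlTimestamp(original);
        System.out.println(bridged.getNanos());                        // 123456789
        System.out.println(toLocalDateTime(bridged).equals(original)); // true
    }
}
```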
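
**Usage Examples:**

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;

// Bridging to a non-default conversion class: TIMESTAMP defaults to
// java.time.LocalDateTime, but java.sql.Timestamp is also supported
DataType legacyTimestampType = DataTypes.TIMESTAMP(3)
    .bridgedTo(java.sql.Timestamp.class)
    .notNull();

// Type inspection
DataType stringType = DataTypes.STRING();
LogicalType logicalType = stringType.getLogicalType();
Class<?> conversionClass = stringType.getConversionClass(); // String.class
boolean isNullable = logicalType.isNullable();             // true: data types are nullable by default
```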

### JSON and Raw Types

Flink has no dedicated JSON data type; the `LogicalTypeRoot` enumeration under Types contains no JSON root. JSON documents are usually stored in STRING columns and queried with built-in JSON functions such as `JSON_VALUE` and `JSON_QUERY`. Java objects that the type system cannot describe can be stored as RAW values, which Flink treats as opaque data handled by a serializer.

```java { .api }
class DataTypes {
    /**
     * Raw data type for arbitrary serialized objects
     * @param originatingClass Class of the original object
     * @param serializer TypeSerializer for the object
     * @return DataType for raw serialized values
     */
    static <T> DataType RAW(Class<T> originatingClass, TypeSerializer<T> serializer);

    /**
     * Raw data type backed by type information
     * @param typeInformation TypeInformation for the object
     * @return Unresolved data type that becomes a RAW type when the schema is resolved
     */
    static <T> UnresolvedDataType RAW(TypeInformation<T> typeInformation);
}
```

**Usage Examples:**

```java
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.types.AbstractDataType;

// JSON documents stored as STRING columns
Schema documentSchema = Schema.newBuilder()
    .column("doc_id", DataTypes.BIGINT())
    .column("metadata", DataTypes.STRING())
    .column("content", DataTypes.STRING())
    .build();

// Extracting a field with a built-in JSON function
Table authors = tableEnv.sqlQuery(
    "SELECT doc_id, JSON_VALUE(metadata, '$.author') AS author FROM documents");

// Raw type for custom objects
public class CustomEvent {
    public String eventType;
    public long timestamp;
    public Map<String, Object> payload;
}

// AbstractDataType<?> accepts both resolved and unresolved data types
AbstractDataType<?> customEventType = DataTypes.RAW(
    TypeInformation.of(CustomEvent.class)
);
```
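
A RAW column is a black box to SQL: Flink passes the value around without looking inside it and relies on the type's serializer to turn it into bytes and back. The plain-Java sketch below imitates that contract with hypothetical `serialize`/`deserialize` methods for a simplified event class (the `payload` map is left out to keep it short). It is not Flink's `TypeSerializer` interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch (not Flink's TypeSerializer): a RAW value is a byte array
// that only the matching serializer can turn back into an object.
final class RawSketch {
    // Simplified stand-in for CustomEvent (payload omitted).
    static final class SimpleEvent {
        final String eventType;
        final long timestamp;

        SimpleEvent(String eventType, long timestamp) {
            this.eventType = eventType;
            this.timestamp = timestamp;
        }
    }

    static byte[] serialize(SimpleEvent event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(event.eventType);
            out.writeLong(event.timestamp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // opaque to SQL: no column structure is visible
    }

    static SimpleEvent deserialize(byte[] raw) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            String eventType = in.readUTF();
            long timestamp = in.readLong();
            return new SimpleEvent(eventType, timestamp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = serialize(new SimpleEvent("click", 42L));
        System.out.println(raw.length);                 // 15
        System.out.println(deserialize(raw).eventType); // click
    }
}
```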

## Types

### Core Type Classes

```java { .api }
abstract class DataType {
    LogicalType getLogicalType();
    Class<?> getConversionClass();
    List<DataType> getChildren();
    DataType notNull();
    DataType nullable();
    DataType bridgedTo(Class<?> newConversionClass);
}

class AtomicDataType extends DataType {
    // For primitive and other atomic types; has no children
}

class CollectionDataType extends DataType {
    DataType getElementDataType();
    // For arrays and multisets
}

class FieldsDataType extends DataType {
    // For row and structured types; getChildren() returns one data type per field
}

class KeyValueDataType extends DataType {
    DataType getKeyDataType();
    DataType getValueDataType();
    // For map types
}
```
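
The subclasses differ in how many children they expose: atomic types have none, collection types one element type, key-value types a key and a value type, and fields types one child per field. The sketch below is a hypothetical and much-simplified model of that tree (not Flink's classes; the summary format is the sketch's own) that renders a type recursively.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, much-simplified model of the data type tree (not Flink's classes).
final class TypeTreeSketch {
    abstract static class Node {
        abstract String summary();
    }

    // Atomic types have no children (INT, STRING, ...).
    static final class AtomicNode extends Node {
        private final String name;
        AtomicNode(String name) { this.name = name; }
        @Override String summary() { return name; }
    }

    // Collection types have one element child (ARRAY, MULTISET).
    static final class CollectionNode extends Node {
        private final String kind;
        private final Node element;
        CollectionNode(String kind, Node element) { this.kind = kind; this.element = element; }
        @Override String summary() { return kind + "<" + element.summary() + ">"; }
    }

    // Key-value types have a key child and a value child (MAP).
    static final class KeyValueNode extends Node {
        private final Node key;
        private final Node value;
        KeyValueNode(Node key, Node value) { this.key = key; this.value = value; }
        @Override String summary() { return "MAP<" + key.summary() + ", " + value.summary() + ">"; }
    }

    // Fields types have one named child per field (ROW).
    static final class FieldsNode extends Node {
        private final List<String> names;
        private final List<Node> fields;
        FieldsNode(List<String> names, List<Node> fields) {
            if (names.size() != fields.size()) {
                throw new IllegalArgumentException("each field needs exactly one name");
            }
            this.names = names;
            this.fields = fields;
        }
        @Override String summary() {
            StringBuilder sb = new StringBuilder("ROW<");
            for (int i = 0; i < names.size(); i++) {
                if (i > 0) sb.append(", ");
                sb.append(names.get(i)).append(' ').append(fields.get(i).summary());
            }
            return sb.append('>').toString();
        }
    }

    public static void main(String[] args) {
        Node address = new FieldsNode(
                Arrays.asList("street", "city"),
                Arrays.asList(new AtomicNode("STRING"), new AtomicNode("STRING")));
        System.out.println(new CollectionNode("ARRAY", address).summary()); // ARRAY<ROW<street STRING, city STRING>>
        System.out.println(new KeyValueNode(new AtomicNode("STRING"), new AtomicNode("STRING")).summary()); // MAP<STRING, STRING>
    }
}
```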

### Logical Type System

```java { .api }
abstract class LogicalType {
    boolean isNullable();
    LogicalTypeRoot getTypeRoot();
    String asSummaryString();

    // Type checking methods
    boolean is(LogicalTypeRoot typeRoot);
    boolean isAnyOf(LogicalTypeRoot... typeRoots);
    boolean is(LogicalTypeFamily family);
    boolean isAnyOf(LogicalTypeFamily... families);
}

enum LogicalTypeRoot {
    CHAR, VARCHAR, BOOLEAN, BINARY, VARBINARY,
    DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT,
    FLOAT, DOUBLE, DATE, TIME_WITHOUT_TIME_ZONE,
    TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE,
    TIMESTAMP_WITH_LOCAL_TIME_ZONE, INTERVAL_YEAR_MONTH,
    INTERVAL_DAY_TIME, ARRAY, MULTISET, MAP, ROW,
    DISTINCT_TYPE, STRUCTURED_TYPE, NULL, RAW, SYMBOL,
    UNRESOLVED
}
```
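
A logical type root belongs to one or more families, which is what `isAnyOf(LogicalTypeFamily...)` tests. The sketch below reproduces the idea with `EnumSet` over a hypothetical subset of roots. The family names are borrowed from Flink's `LogicalTypeFamily`, but the mapping is simplified (Flink assigns most roots to several families), and `TypeFamilySketch` is not a Flink class.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch (not Flink's classes): a subset of type roots grouped into
// a simplified set of families.
final class TypeFamilySketch {
    enum Root { TINYINT, SMALLINT, INTEGER, BIGINT, DECIMAL, FLOAT, DOUBLE, CHAR, VARCHAR, DATE }

    enum Family { EXACT_NUMERIC, APPROXIMATE_NUMERIC, CHARACTER_STRING }

    static Set<Family> familiesOf(Root root) {
        switch (root) {
            case TINYINT: case SMALLINT: case INTEGER: case BIGINT: case DECIMAL:
                return EnumSet.of(Family.EXACT_NUMERIC);
            case FLOAT: case DOUBLE:
                return EnumSet.of(Family.APPROXIMATE_NUMERIC);
            case CHAR: case VARCHAR:
                return EnumSet.of(Family.CHARACTER_STRING);
            default:
                return EnumSet.noneOf(Family.class); // e.g. DATE belongs to none of these
        }
    }

    // True if the root belongs to at least one of the given families.
    static boolean isAnyOf(Root root, Family... families) {
        Set<Family> own = familiesOf(root);
        for (Family family : families) {
            if (own.contains(family)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isAnyOf(Root.DECIMAL, Family.EXACT_NUMERIC, Family.APPROXIMATE_NUMERIC)); // true
        System.out.println(isAnyOf(Root.DATE, Family.EXACT_NUMERIC));                                 // false
    }
}
```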

### Schema Information

```java { .api }
class ResolvedSchema {
    List<Column> getColumns();
    Optional<Column> getColumn(String name);
    List<String> getColumnNames();
    List<DataType> getColumnDataTypes();
    Optional<UniqueConstraint> getPrimaryKey();
    List<WatermarkSpec> getWatermarkSpecs();
}

abstract class Column {
    String getName();
    DataType getDataType();
    Optional<String> getComment();
    boolean isPhysical();
    boolean isPersisted();
    // Concrete kinds: Column.PhysicalColumn, Column.ComputedColumn, Column.MetadataColumn
}

class WatermarkSpec {
    String getRowtimeAttribute();
    ResolvedExpression getWatermarkExpression();
}
```
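
When a table is used as a sink, only some of its columns are written: physical columns and non-virtual metadata columns are persisted, while computed columns are derived when reading. The hypothetical plain-Java model below (not Flink's `Column` class) makes that split explicit; its `VIRTUAL_METADATA` kind stands for a metadata column declared as virtual.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model (not Flink's Column class) of which columns a sink persists.
final class PersistedColumns {
    enum Kind { PHYSICAL, COMPUTED, METADATA, VIRTUAL_METADATA }

    static final class Col {
        final String name;
        final Kind kind;

        Col(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }

        boolean isPhysical() {
            return kind == Kind.PHYSICAL;
        }

        // Physical and non-virtual metadata columns are written; computed and
        // virtual metadata columns are not.
        boolean isPersisted() {
            return kind == Kind.PHYSICAL || kind == Kind.METADATA;
        }
    }

    static List<String> persistedNames(List<Col> columns) {
        List<String> names = new ArrayList<>();
        for (Col column : columns) {
            if (column.isPersisted()) {
                names.add(column.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<Col> columns = Arrays.asList(
                new Col("order_id", Kind.PHYSICAL),
                new Col("total_amount", Kind.COMPUTED),
                new Col("kafka_timestamp", Kind.METADATA),
                new Col("kafka_offset", Kind.VIRTUAL_METADATA));
        System.out.println(persistedNames(columns)); // [order_id, kafka_timestamp]
    }
}
```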