0
# Data Types System
1
2
Comprehensive type system for defining table schemas, supporting primitive types, temporal types, and complex nested structures in Apache Flink's Table API.
3
4
## Capabilities
5
6
### DataTypes Factory
7
8
`DataTypes` is the central factory class for creating every Table API data type; each static method returns a `DataType` (or a `Field` for use inside `ROW`).
9
10
```java { .api }
11
// Boolean and numeric types
12
public static DataType BOOLEAN();
13
public static DataType TINYINT();
14
public static DataType SMALLINT();
15
public static DataType INT();
16
public static DataType BIGINT();
17
public static DataType FLOAT();
18
public static DataType DOUBLE();
19
20
/**
21
* Create a decimal type with precision and scale
22
* @param precision Total number of digits
23
* @param scale Number of digits after decimal point
24
* @return DECIMAL data type
25
*/
26
public static DataType DECIMAL(int precision, int scale);
27
28
// String and binary types
29
public static DataType CHAR(int length);
30
public static DataType VARCHAR(int length);
31
public static DataType STRING();
32
public static DataType BINARY(int length);
33
public static DataType VARBINARY(int length);
34
public static DataType BYTES();
35
36
// Temporal types
37
public static DataType DATE();
38
public static DataType TIME();
39
public static DataType TIME(int precision);
40
public static DataType TIMESTAMP();
41
public static DataType TIMESTAMP(int precision);
42
public static DataType TIMESTAMP_WITH_TIME_ZONE();
43
public static DataType TIMESTAMP_WITH_TIME_ZONE(int precision);
44
public static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE();
45
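public static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precision);
// Shorthand for TIMESTAMP_WITH_LOCAL_TIME_ZONE (used in the examples below)
public static DataType TIMESTAMP_LTZ();
public static DataType TIMESTAMP_LTZ(int precision);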
46
47
// Interval types
48
public static DataType INTERVAL(DataTypes.Resolution resolution);
49
public static DataType INTERVAL(DataTypes.Resolution from, DataTypes.Resolution to);
50
51
/**
52
* Create an array type
53
* @param elementType Type of array elements
54
* @return ARRAY data type
55
*/
56
public static DataType ARRAY(DataType elementType);
57
58
/**
59
* Create a map type
60
* @param keyType Type of map keys
61
* @param valueType Type of map values
62
* @return MAP data type
63
*/
64
public static DataType MAP(DataType keyType, DataType valueType);
65
66
/**
67
* Create a multiset type (bag of elements)
68
* @param elementType Type of multiset elements
69
* @return MULTISET data type
70
*/
71
public static DataType MULTISET(DataType elementType);
72
73
/**
74
* Create a row/struct type with named fields
75
* @param fields Field definitions
76
* @return ROW data type
77
*/
78
public static DataType ROW(Field... fields);
79
80
/**
81
* Create a row type with field names and types
82
* @param fieldNames Array of field names
83
* @param fieldTypes Array of corresponding field types
84
* @return ROW data type
85
*/
86
public static DataType ROW(String[] fieldNames, DataType[] fieldTypes);
87
88
/**
89
* Create a field definition for row types
90
* @param name Field name
91
* @param type Field data type
92
* @return Field definition
93
*/
94
public static Field FIELD(String name, DataType type);
95
96
/**
97
* Create data type from Java class
98
* @param clazz Java class to convert
99
* @return Corresponding DataType
100
*/
101
public static DataType of(Class<?> clazz);
102
103
/**
104
* Create data type from type string
105
* @param typeString String representation of the type
106
* @return Parsed DataType
107
*/
108
public static DataType of(String typeString);
109
```
110
111
**Usage Examples:**
112
113
```java
114
// Primitive types
115
DataType userId = DataTypes.BIGINT();
116
DataType userName = DataTypes.STRING();
117
DataType balance = DataTypes.DECIMAL(10, 2);
118
DataType isActive = DataTypes.BOOLEAN();
119
DataType createdAt = DataTypes.TIMESTAMP(3);
120
121
// Complex types
122
DataType tags = DataTypes.ARRAY(DataTypes.STRING());
123
DataType attributes = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());
124
125
// Row type for nested structure
126
DataType address = DataTypes.ROW(
127
DataTypes.FIELD("street", DataTypes.STRING()),
128
DataTypes.FIELD("city", DataTypes.STRING()),
129
DataTypes.FIELD("zipcode", DataTypes.STRING())
130
);
131
132
// User record with nested address
133
DataType userRecord = DataTypes.ROW(
134
DataTypes.FIELD("id", DataTypes.BIGINT()),
135
DataTypes.FIELD("name", DataTypes.STRING()),
136
DataTypes.FIELD("address", address),
137
DataTypes.FIELD("tags", tags)
138
);
139
```
140
141
### Schema Definition
142
143
`Schema.newBuilder()` returns a builder for defining table schemas with physical, computed, and metadata columns, watermarks, and primary key constraints.
144
145
```java { .api }
146
/**
147
* Create a new schema builder
148
* @return Builder instance for constructing schema
149
*/
150
public static Builder newBuilder();
151
152
public static class Builder {
153
/**
154
* Add a physical column to the schema
155
* @param name Column name
156
* @param type Column data type
157
* @return Builder for method chaining
158
*/
159
public Builder column(String name, DataType type);
160
161
/**
162
* Add a computed column defined by an expression
163
* @param name Column name
164
* @param expression Expression for computing the column value
165
* @return Builder for method chaining
166
*/
167
public Builder columnByExpression(String name, Expression expression);
168
169
/**
170
* Add a metadata column for accessing connector metadata
171
* @param name Column name
172
* @param type Column data type
173
* @return Builder for method chaining
174
*/
175
public Builder columnByMetadata(String name, DataType type);
176
177
/**
178
* Add a metadata column with explicit metadata key
179
* @param name Column name
180
* @param type Column data type
181
* @param key Metadata key from the connector
182
* @return Builder for method chaining
183
*/
184
public Builder columnByMetadata(String name, DataType type, String key);
185
186
/**
187
* Add a metadata column with virtual flag
188
* @param name Column name
189
* @param type Column data type
190
* @param key Metadata key from the connector
191
* @param isVirtual Whether the column is virtual (not persisted)
192
* @return Builder for method chaining
193
*/
194
public Builder columnByMetadata(String name, DataType type, String key, boolean isVirtual);
195
196
/**
197
* Define a watermark strategy for event time processing
198
* @param columnName Name of the time column
199
* @param watermarkExpression Expression for watermark generation
200
* @return Builder for method chaining
201
*/
202
public Builder watermark(String columnName, Expression watermarkExpression);
203
204
/**
205
* Define a primary key constraint
206
* @param columnNames Names of columns forming the primary key
207
* @return Builder for method chaining
208
*/
209
public Builder primaryKey(String... columnNames);
210
211
/**
212
* Define a named primary key constraint
213
* @param constraintName Name for the primary key constraint
214
* @param columnNames Names of columns forming the primary key
215
* @return Builder for method chaining
216
*/
217
public Builder primaryKeyNamed(String constraintName, String... columnNames);
218
219
/**
220
* Build the final schema
221
* @return Constructed Schema instance
222
*/
223
public Schema build();
224
}
225
```
226
227
**Schema Usage Examples:**
228
229
```java
230
// Basic schema with columns and primary key
231
Schema userSchema = Schema.newBuilder()
232
.column("id", DataTypes.BIGINT())
233
.column("name", DataTypes.STRING())
234
.column("email", DataTypes.STRING())
235
.column("created_at", DataTypes.TIMESTAMP(3))
236
.primaryKey("id")
237
.build();
238
239
// Schema with computed column and watermark for streaming
240
Schema eventSchema = Schema.newBuilder()
241
.column("user_id", DataTypes.BIGINT())
242
.column("event_type", DataTypes.STRING())
243
.column("event_time", DataTypes.TIMESTAMP_LTZ(3))
244
.column("payload", DataTypes.STRING())
245
.columnByExpression("hour_of_day", $("event_time").extract(DateTimeUnit.HOUR))
246
.watermark("event_time", $("event_time").minus(lit(5).seconds()))
247
.build();
248
249
// Schema with metadata columns for Kafka connector
250
Schema kafkaSchema = Schema.newBuilder()
251
.column("user_id", DataTypes.BIGINT())
252
.column("message", DataTypes.STRING())
253
.columnByMetadata("kafka_topic", DataTypes.STRING(), "topic")
254
.columnByMetadata("kafka_partition", DataTypes.INT(), "partition")
255
.columnByMetadata("kafka_offset", DataTypes.BIGINT(), "offset")
256
.columnByMetadata("kafka_timestamp", DataTypes.TIMESTAMP_LTZ(3), "timestamp")
257
.build();
258
```
259
260
### Type Information Classes
261
262
Type information and utilities for working with data types at runtime.
263
264
```java { .api }
265
/**
266
* Abstract base class for all data types
267
*/
268
public abstract class DataType {
269
/**
270
* Get the logical type information
271
* @return LogicalType instance
272
*/
273
public LogicalType getLogicalType();
274
275
/**
276
* Get the Java class that represents this type
277
* @return Java Class<?> for this data type
278
*/
279
public Class<?> getConversionClass();
280
281
/**
282
* Create a nullable version of this type
283
* @return DataType that accepts null values
284
*/
285
public DataType nullable();
286
287
/**
288
* Create a non-nullable version of this type
289
* @return DataType that does not accept null values
290
*/
291
public DataType notNull();
292
293
/**
294
* Check if this type accepts null values
295
* @return true if nullable, false otherwise
296
*/
297
public boolean isNullable();
298
}
299
300
/**
301
* Represents a field in a row type
302
*/
303
public final class Field {
304
/**
305
* Get the field name
306
* @return Field name
307
*/
308
public String getName();
309
310
/**
311
* Get the field data type
312
* @return DataType of this field
313
*/
314
public DataType getType();
315
316
/**
317
* Get the field description
318
* @return Optional description of the field
319
*/
320
public Optional<String> getDescription();
321
}
322
323
/**
324
* Resolved schema containing all column and constraint information
325
*/
326
public interface ResolvedSchema {
327
/**
328
* Get the number of columns
329
* @return Column count
330
*/
331
public int getColumnCount();
332
333
/**
334
* Get column names in order
335
* @return List of column names
336
*/
337
public List<String> getColumnNames();
338
339
/**
340
* Get column data types in order
341
* @return List of DataType instances
342
*/
343
public List<DataType> getColumnDataTypes();
344
345
/**
346
* Get a specific column by index
347
* @param index Column index
348
* @return Column information
349
*/
350
public Column getColumn(int index);
351
352
/**
353
* Get a specific column by name
354
* @param name Column name
355
* @return Optional Column information
356
*/
357
public Optional<Column> getColumn(String name);
358
359
/**
360
* Get primary key constraint
361
* @return Optional primary key constraint
362
*/
363
public Optional<UniqueConstraint> getPrimaryKey();
364
365
/**
366
* Get all watermark specifications
367
* @return List of watermark specifications
368
*/
369
public List<WatermarkSpec> getWatermarkSpecs();
370
}
371
```
372
373
### Built-in Type Conversions
374
375
Utilities for converting between the legacy `TypeInformation` representation and `DataType`.
376
377
```java { .api }
378
/**
379
* Type conversion utilities
380
*/
381
public class TypeConversions {
382
/**
383
* Convert from legacy TypeInformation to DataType
384
* @param typeInfo Legacy TypeInformation
385
* @return Equivalent DataType
386
*/
387
public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo);
388
389
/**
390
* Convert DataType to legacy TypeInformation
391
* @param dataType DataType to convert
392
* @return Equivalent TypeInformation
393
*/
394
public static TypeInformation<?> fromDataTypeToLegacyInfo(DataType dataType);
395
}
396
```
397
398
### JSON Type Support
399
400
Enumerations that control the behavior of JSON processing functions (type checks, null handling, and empty/error handling).
401
402
```java { .api }
403
/**
404
* JSON type enumeration for JSON processing functions
405
*/
406
public enum JsonType {
407
VALUE,
408
ARRAY,
409
OBJECT
410
}
411
412
/**
413
* JSON null handling behavior
414
*/
415
public enum JsonOnNull {
416
NULL,
417
ABSENT
418
}
419
420
/**
421
* JSON value extraction behavior on empty or error
422
*/
423
public enum JsonValueOnEmptyOrError {
424
NULL,
425
ERROR,
426
DEFAULT_VALUE
427
}
428
```
429
430
### Common Type Patterns
431
432
**Temporal Types with Precision:**
433
434
```java
435
// High precision timestamps for financial data
436
DataType orderTime = DataTypes.TIMESTAMP(9); // nanosecond precision
437
DataType tradeTime = DataTypes.TIMESTAMP_LTZ(6); // microsecond precision, WITH LOCAL TIME ZONE semantics
438
439
// Date and time types
440
DataType birthDate = DataTypes.DATE();
441
DataType appointmentTime = DataTypes.TIME(3); // millisecond precision
442
```
443
444
**Complex Nested Structures:**
445
446
```java
447
// E-commerce order structure
448
DataType orderItem = DataTypes.ROW(
449
DataTypes.FIELD("product_id", DataTypes.BIGINT()),
450
DataTypes.FIELD("quantity", DataTypes.INT()),
451
DataTypes.FIELD("price", DataTypes.DECIMAL(10, 2))
452
);
453
454
DataType order = DataTypes.ROW(
455
DataTypes.FIELD("order_id", DataTypes.BIGINT()),
456
DataTypes.FIELD("customer_id", DataTypes.BIGINT()),
457
DataTypes.FIELD("items", DataTypes.ARRAY(orderItem)),
458
DataTypes.FIELD("total_amount", DataTypes.DECIMAL(12, 2)),
459
DataTypes.FIELD("order_date", DataTypes.TIMESTAMP(3))
460
);
461
```
462
463
**Map Types for Dynamic Data:**
464
465
```java
466
// Configuration or metadata as key-value pairs
467
DataType config = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());
468
DataType metrics = DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE());
469
470
// Multi-language text support
471
DataType translations = DataTypes.MAP(
472
DataTypes.STRING(), // language code
473
DataTypes.STRING() // translated text
474
);
475
```