# Table API

SQL-based stream processing integration built on a dynamic table sink factory. Supports DDL-based configuration and comprehensive option validation for table-based Elasticsearch operations.

## Capabilities

### Connector Identifier

The Table API uses the `elasticsearch-6` connector identifier for creating Elasticsearch sinks via DDL.

```sql { .api }
CREATE TABLE sink_table (
  column1 datatype,
  column2 datatype,
  ...
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'target-index',
  'document-type' = '_doc'
);
```
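
For reference, a minimal sketch of registering such a table from a Java program, using Flink's standard `StreamTableEnvironment` (the table name and columns here are illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RegisterSinkSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // The 'elasticsearch-6' identifier triggers discovery of the
        // connector's dynamic table sink factory.
        tEnv.executeSql(
            "CREATE TABLE sink_table ("
            + "  id BIGINT,"
            + "  message STRING"
            + ") WITH ("
            + "  'connector' = 'elasticsearch-6',"
            + "  'hosts' = 'http://localhost:9200',"
            + "  'index' = 'target-index',"
            + "  'document-type' = '_doc'"
            + ")");
    }
}
```
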
### Configuration Options

Comprehensive configuration options for Elasticsearch Table API integration.

#### Required Options

```java { .api }
// Required configuration options
'connector' = 'elasticsearch-6'   // Connector identifier
'hosts' = 'http://host:port;...'  // Elasticsearch hosts (semicolon-separated)
'index' = 'index-name'            // Target Elasticsearch index
'document-type' = 'type-name'     // Document type (use '_doc' for ES 6.x+)
```
#### Optional Options

```java { .api }
// Bulk processing options
'bulk.flush.max.actions' = '1000'          // Max actions per bulk request
'bulk.flush.max.size' = '2mb'              // Max size per bulk request
'bulk.flush.interval' = '1s'               // Bulk flush interval
'bulk.flush.backoff.type' = 'EXPONENTIAL'  // Backoff type: CONSTANT or EXPONENTIAL
'bulk.flush.backoff.max-retries' = '3'     // Max backoff retries
'bulk.flush.backoff.delay' = '30s'         // Backoff delay

// Connection options
'connection.max-retry-timeout' = '30s'     // Max retry timeout
'connection.path-prefix' = '/path'         // URL path prefix

// Authentication options (basic auth is enabled only when both are set)
'username' = 'elastic'                     // Username for basic auth
'password' = 'password'                    // Password for basic auth

// Advanced options
'format' = 'json'                          // Serialization format
'failure-handler' = 'class.name.FailureHandler'  // Custom failure handler class
'sink.flush-on-checkpoint' = 'true'        // Flush pending requests on checkpoint
'sink.key-delimiter' = '_'                 // Primary key delimiter
```
**Usage Examples:**

```sql
-- Basic table sink
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'user_behavior',
  'document-type' = '_doc'
);

-- Advanced configuration with bulk settings
CREATE TABLE product_events (
  product_id BIGINT,
  event_type STRING,
  user_id BIGINT,
  event_time TIMESTAMP(3),
  properties MAP<STRING, STRING>
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://es-node1:9200;http://es-node2:9200;http://es-node3:9200',
  'index' = 'product_events',
  'document-type' = '_doc',
  'bulk.flush.max.actions' = '500',
  'bulk.flush.max.size' = '1mb',
  'bulk.flush.interval' = '5s',
  'bulk.flush.backoff.type' = 'EXPONENTIAL',
  'bulk.flush.backoff.max-retries' = '5',
  'bulk.flush.backoff.delay' = '100ms'
);

-- With authentication
CREATE TABLE secure_logs (
  log_id STRING,
  timestamp_field TIMESTAMP(3),
  level STRING,
  message STRING,
  source STRING
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'https://secure-es:9200',
  'index' = 'application_logs',
  'document-type' = '_doc',
  'username' = 'flink_user',
  'password' = 'secure_password',
  'connection.max-retry-timeout' = '60s'
);

-- Insert data into Elasticsearch
INSERT INTO user_behavior
SELECT user_id, item_id, category_id, behavior, event_time
FROM kafka_source;
```
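
The same pipeline can also be expressed through the Java Table API instead of an `INSERT INTO` statement; a minimal sketch, assuming the `kafka_source` and `user_behavior` tables were registered via DDL as above:

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class InsertSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Project the source columns in the order expected by the sink table.
        Table source = tEnv.from("kafka_source")
            .select($("user_id"), $("item_id"), $("category_id"), $("behavior"), $("event_time"));

        // Streams the selected rows into the Elasticsearch-backed table.
        source.executeInsert("user_behavior");
    }
}
```
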
### Primary Key Handling

The Elasticsearch connector supports primary key configuration for document ID generation: the declared key fields are used to build the document `_id`.

```sql { .api }
-- Table with primary key (used for the document _id)
CREATE TABLE users (
  user_id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  email STRING,
  registration_time TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'users',
  'document-type' = '_doc'
);

-- Composite primary key (fields concatenated with the delimiter)
CREATE TABLE user_sessions (
  user_id BIGINT,
  session_id STRING,
  start_time TIMESTAMP(3),
  duration_minutes INT,
  PRIMARY KEY (user_id, session_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'user_sessions',
  'document-type' = '_doc',
  'sink.key-delimiter' = '#' -- Results in document IDs like "123#abc-def-456"
);
```
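
The resulting `_id` for the composite-key table can be pictured as plain string concatenation; this is an illustration of the documented behavior, not actual connector code:

```java
public class DocumentIdIllustration {
    public static void main(String[] args) {
        // Key fields in declaration order, joined by 'sink.key-delimiter'.
        long userId = 123L;
        String sessionId = "abc-def-456";
        String delimiter = "#";

        String documentId = userId + delimiter + sessionId;
        System.out.println(documentId); // prints "123#abc-def-456"
    }
}
```
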
### Data Type Mapping

Flink data types are mapped to Elasticsearch field types automatically: rows are serialized to JSON, and the field types are then resolved by Elasticsearch's dynamic mapping (unless an explicit index mapping exists).

```java { .api }
// Flink to Elasticsearch type mapping
TINYINT, SMALLINT, INTEGER -> integer
BIGINT                     -> long
FLOAT                      -> float
DOUBLE                     -> double
BOOLEAN                    -> boolean
STRING, VARCHAR, CHAR      -> text (with keyword sub-field under default dynamic mapping)
DECIMAL                    -> scaled_float or double
DATE                       -> date
TIME                       -> text/keyword (serialized as a string; ES 6 has no time type)
TIMESTAMP                  -> date with format
ARRAY<T>                   -> array of T
MAP<STRING, T>             -> object with T values
ROW                        -> nested object
BYTES                      -> binary
```
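
To make the mapping concrete, here is the kind of JSON document one row might serialize to. This is an illustrative sketch assuming the default 'json' format; the field names and timestamp layout are hypothetical:

```java
public class DocumentShapeIllustration {
    public static void main(String[] args) {
        // A row (user_id BIGINT, behavior STRING, ts TIMESTAMP(3), tags ARRAY<STRING>)
        // would serialize to a JSON document along these lines:
        String document = "{"
            + "\"user_id\": 42,"                     // BIGINT        -> long
            + "\"behavior\": \"click\","             // STRING        -> text/keyword
            + "\"ts\": \"2024-03-15 10:30:00.000\"," // TIMESTAMP(3)  -> date
            + "\"tags\": [\"a\", \"b\"]"             // ARRAY<STRING> -> array
            + "}";
        System.out.println(document);
    }
}
```
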
### Dynamic Index and Type

The target index (and, with caveats, the document type) can be derived dynamically from record content.

```sql { .api }
-- Dynamic index based on event time
CREATE TABLE time_partitioned_events (
  event_id STRING,
  event_type STRING,
  event_time TIMESTAMP(3),
  data MAP<STRING, STRING>
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'events-{event_time|yyyy-MM-dd}', -- Dynamic index by date
  'document-type' = '_doc'
);

-- Dynamic document type. Note: Elasticsearch 6.x allows only a single
-- mapping type per index, so per-record types are of limited practical use.
CREATE TABLE categorized_docs (
  doc_id STRING,
  category STRING,
  content STRING,
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'documents',
  'document-type' = '{category}' -- Dynamic type based on the category field
);
```
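
How an index pattern resolves for a given record can be sketched with plain Java date formatting. This illustrates the pattern semantics only, assuming `{field|format}` uses `DateTimeFormatter`-style patterns:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class IndexPatternIllustration {
    public static void main(String[] args) {
        // A record with event_time = 2024-03-15T10:30 and the pattern
        // 'events-{event_time|yyyy-MM-dd}' would be routed to a daily index.
        LocalDateTime eventTime = LocalDateTime.of(2024, 3, 15, 10, 30);
        String index = "events-" + eventTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        System.out.println(index); // prints "events-2024-03-15"
    }
}
```
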
### Error Handling in Table API

The Table API supports the same failure handling mechanisms as the DataStream API, configured through the `failure-handler` option.

```sql { .api }
-- Using the built-in retry failure handler
CREATE TABLE resilient_sink (
  id BIGINT,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'data',
  'document-type' = '_doc',
  'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler',
  'bulk.flush.backoff.type' = 'EXPONENTIAL',
  'bulk.flush.backoff.max-retries' = '5'
);

-- Using the ignoring failure handler (drops failed records)
CREATE TABLE lenient_sink (
  id BIGINT,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'data',
  'document-type' = '_doc',
  'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler'
);
```
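
A custom handler referenced by class name would implement Flink's `ActionRequestFailureHandler` interface. A minimal sketch follows; the class name `LoggingFailureHandler` is hypothetical, and a no-argument constructor is assumed to be required for reflective instantiation:

```java
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;

// Referenced via 'failure-handler' = 'com.example.LoggingFailureHandler'.
public class LoggingFailureHandler implements ActionRequestFailureHandler {

    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        // Log and drop the failed request instead of failing the job.
        // Rethrowing 'failure' here would fail the sink instead.
        System.err.println("Dropping request after failure (HTTP "
            + restStatusCode + "): " + failure.getMessage());
    }
}
```
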
### Elasticsearch6DynamicSinkFactory

Internal factory class that creates dynamic table sinks for the Table API.

```java { .api }
/**
 * A DynamicTableSinkFactory for discovering Elasticsearch6DynamicSink.
 */
@Internal
public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {

    /**
     * Creates a dynamic table sink from the given context.
     * @param context Factory context with table schema and options
     * @return Configured Elasticsearch6DynamicSink
     */
    public DynamicTableSink createDynamicTableSink(Context context);

    /**
     * Factory identifier for connector discovery.
     * @return "elasticsearch-6"
     */
    public String factoryIdentifier();

    /**
     * Required configuration options.
     * @return Set of required ConfigOption objects
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Optional configuration options.
     * @return Set of optional ConfigOption objects
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```
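
Users never call this factory directly; it is discovered through Flink's factory SPI when a `CREATE TABLE` statement names the `elasticsearch-6` connector, and `requiredOptions()` drives option validation. A minimal sketch of that validation path (assuming Flink 1.13+ defaults; the table below deliberately omits the required 'hosts' option):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;

public class ValidationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // CREATE TABLE only registers metadata; option validation happens
        // when the planner asks the factory for a sink.
        tEnv.executeSql(
            "CREATE TABLE bad_sink (id BIGINT) WITH ("
            + " 'connector' = 'elasticsearch-6',"
            + " 'index' = 'target-index',"
            + " 'document-type' = '_doc')");

        try {
            tEnv.executeSql("INSERT INTO bad_sink VALUES (1)");
        } catch (ValidationException e) {
            // Expected: the factory rejects the table because 'hosts' is missing.
            System.err.println("Validation failed: " + e.getMessage());
        }
    }
}
```
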