SQL-based stream processing integration backed by a dynamic table sink factory. Supports DDL-based configuration and option validation for table-based Elasticsearch writes.
The Table API uses the elasticsearch-6 connector identifier for creating Elasticsearch sinks via DDL.
CREATE TABLE sink_table (
column1 datatype,
column2 datatype,
...
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'target-index',
'document-type' = '_doc'
);

Comprehensive configuration options for Elasticsearch Table API integration:
// Required configuration options
'connector' = 'elasticsearch-6' // Connector identifier
'hosts' = 'http://host:port;...' // Elasticsearch hosts (semicolon-separated)
'index' = 'index-name' // Target Elasticsearch index
'document-type' = 'type-name' // Document type (use '_doc' for ES 6.x+)

// Bulk processing options
'bulk.flush.max.actions' = '1000' // Max actions per bulk request
'bulk.flush.max.size' = '2mb' // Max size per bulk request
'bulk.flush.interval' = '1s' // Bulk flush interval
'bulk.flush.backoff.type' = 'EXPONENTIAL' // Backoff type: CONSTANT/EXPONENTIAL
'bulk.flush.backoff.max-retries' = '3' // Max backoff retries
'bulk.flush.backoff.delay' = '30s' // Backoff delay
// Connection options
'connection.max-retry-timeout' = '30s' // Max retry timeout
'connection.path-prefix' = '/path' // URL path prefix
// Authentication options (username and password must be provided together)
'username' = 'elastic' // Username for basic auth
'password' = 'password' // Password for basic auth
// Advanced options
'format' = 'json' // Serialization format
'failure-handler' = 'class.name.FailureHandler' // Custom failure handler class
'sink.flush-on-checkpoint' = 'true' // Flush on checkpoint
'sink.key-delimiter' = '_' // Primary key delimiter

Usage Examples:
-- Basic table sink
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'user_behavior',
'document-type' = '_doc'
);
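The DDL above is typically executed from a Table API program. A minimal sketch, assuming Flink's TableEnvironment and an already registered source table named kafka_source (both the class name and the source table are illustrative, not part of this connector):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ElasticsearchTableJob {
    public static void main(String[] args) {
        // The elasticsearch-6 factory is discovered from the 'connector' option
        // when the CREATE TABLE statement is executed.
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        tableEnv.executeSql(
                "CREATE TABLE user_behavior (\n"
                        + "  user_id BIGINT,\n"
                        + "  item_id BIGINT,\n"
                        + "  category_id BIGINT,\n"
                        + "  behavior STRING,\n"
                        + "  ts TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'elasticsearch-6',\n"
                        + "  'hosts' = 'http://localhost:9200',\n"
                        + "  'index' = 'user_behavior',\n"
                        + "  'document-type' = '_doc'\n"
                        + ")");

        // Assumes kafka_source was registered elsewhere in the same session.
        tableEnv.executeSql(
                "INSERT INTO user_behavior "
                        + "SELECT user_id, item_id, category_id, behavior, event_time FROM kafka_source");
    }
}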
-- Advanced configuration with bulk settings
CREATE TABLE product_events (
product_id BIGINT,
event_type STRING,
user_id BIGINT,
event_time TIMESTAMP(3),
properties MAP<STRING, STRING>
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://es-node1:9200;http://es-node2:9200;http://es-node3:9200',
'index' = 'product_events',
'document-type' = '_doc',
'bulk.flush.max.actions' = '500',
'bulk.flush.max.size' = '1mb',
'bulk.flush.interval' = '5s',
'bulk.flush.backoff.type' = 'EXPONENTIAL',
'bulk.flush.backoff.max-retries' = '5',
'bulk.flush.backoff.delay' = '100ms'
);
-- With authentication
CREATE TABLE secure_logs (
log_id STRING,
timestamp_field TIMESTAMP(3),
level STRING,
message STRING,
source STRING
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'https://secure-es:9200',
'index' = 'application_logs',
'document-type' = '_doc',
'username' = 'flink_user',
'password' = 'secure_password',
'connection.max-retry-timeout' = '60s'
);
-- Insert data into Elasticsearch
INSERT INTO user_behavior
SELECT user_id, item_id, category_id, behavior, event_time
FROM kafka_source;

The Elasticsearch connector supports primary key configuration for document ID generation.
-- Table with primary key (used for document _id)
CREATE TABLE users (
user_id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
email STRING,
registration_time TIMESTAMP(3)
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'users',
'document-type' = '_doc'
);
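Because the declared key becomes the document _id, an updating query result (for example the output of a GROUP BY) is written as upserts: later rows for the same user_id overwrite the earlier document. A hedged sketch, reusing the tableEnv from the earlier example and an illustrative source table raw_users (the aggregation functions here are only for demonstration):

// The GROUP BY produces an updating result; since 'users' declares user_id as
// PRIMARY KEY, each change is written as an upsert keyed by user_id.
tableEnv.executeSql(
        "INSERT INTO users "
                + "SELECT user_id, MAX(name) AS name, MAX(email) AS email, "
                + "MAX(registration_time) AS registration_time "
                + "FROM raw_users "
                + "GROUP BY user_id");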
-- Composite primary key (concatenated with delimiter)
CREATE TABLE user_sessions (
user_id BIGINT,
session_id STRING,
start_time TIMESTAMP(3),
duration_minutes INT,
PRIMARY KEY (user_id, session_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'user_sessions',
'document-type' = '_doc',
'sink.key-delimiter' = '#' -- Results in document ID like: "123#abc-def-456"
);

With the json format, Flink rows are serialized to JSON documents; Elasticsearch then derives field types from the index mapping or its dynamic mapping, typically as follows.
// Flink to Elasticsearch type mapping
TINYINT, SMALLINT, INTEGER -> integer
BIGINT -> long
FLOAT -> float
DOUBLE -> double
BOOLEAN -> boolean
STRING, VARCHAR, CHAR -> text (with a keyword sub-field under dynamic mapping)
DECIMAL -> scaled_float or double
DATE -> date
TIME -> keyword/text (Elasticsearch has no dedicated time type)
TIMESTAMP -> date with format
ARRAY<T> -> array of T
MAP<STRING, T> -> object with T values
ROW -> nested object
BYTES -> binary

Support for dynamic index and document type based on record content.
-- Dynamic index based on event time
CREATE TABLE time_partitioned_events (
event_id STRING,
event_type STRING,
event_time TIMESTAMP(3),
data MAP<STRING, STRING>
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'events-{event_time|yyyy-MM-dd}', -- Dynamic index by date
'document-type' = '_doc'
);
-- Dynamic document type (note: Elasticsearch 6 allows only one mapping type per index)
CREATE TABLE categorized_docs (
doc_id STRING,
category STRING,
content STRING,
created_at TIMESTAMP(3)
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'documents',
'document-type' = '{category}' -- Dynamic type based on category field
);

The Table API supports the same failure handling mechanisms as the DataStream API through configuration.
-- Using built-in retry failure handler
CREATE TABLE resilient_sink (
id BIGINT,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'data',
'document-type' = '_doc',
'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler',
'bulk.flush.backoff.type' = 'EXPONENTIAL',
'bulk.flush.backoff.max-retries' = '5'
);
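Beyond the built-in handlers, 'failure-handler' can name a custom class. A hedged sketch of such a handler, modeled on the ActionRequestFailureHandler interface from Flink's DataStream Elasticsearch connector (the specific exception choices are illustrative):

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

/** Hedged sketch of a custom failure handler; assumes a public no-arg constructor. */
public class CustomFailureHandler implements ActionRequestFailureHandler {

    @Override
    public void onFailure(
            ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
            throws Throwable {
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            // Bulk queue was full: re-queue the request and let the backoff settings apply.
            indexer.add(action);
        } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
            // Malformed document: drop the request instead of failing the job.
        } else {
            // Any other failure: rethrow and fail the sink.
            throw failure;
        }
    }
}

The class must be on the job classpath and is referenced from the DDL as 'failure-handler' = 'com.example.CustomFailureHandler' (package name illustrative).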
-- Using ignoring failure handler (drops failed records)
CREATE TABLE lenient_sink (
id BIGINT,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'data',
'document-type' = '_doc',
'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler'
);

Internal factory class that creates dynamic table sinks for the Table API.
/**
* A DynamicTableSinkFactory for discovering Elasticsearch6DynamicSink.
*/
@Internal
public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
/**
* Create dynamic table sink from context.
* @param context Factory context with table schema and options
* @return Configured Elasticsearch6DynamicSink
*/
public DynamicTableSink createDynamicTableSink(Context context);
/**
* Factory identifier for connector discovery.
* @return "elasticsearch-6"
*/
public String factoryIdentifier();
/**
* Required configuration options.
* @return Set of required ConfigOption objects
*/
public Set<ConfigOption<?>> requiredOptions();
/**
* Optional configuration options.
* @return Set of optional ConfigOption objects
*/
public Set<ConfigOption<?>> optionalOptions();
}
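For orientation, the overall shape of such a factory is sketched below. This is a hedged, simplified skeleton (the option declarations and class name are illustrative), not the actual Elasticsearch6DynamicSinkFactory implementation; factories of this kind are discovered via Java SPI through an entry in META-INF/services/org.apache.flink.table.factories.Factory.

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;

/** Hedged sketch of a DynamicTableSinkFactory skeleton for illustration only. */
public class SketchSinkFactory implements DynamicTableSinkFactory {

    // Illustrative option declarations; the real factory defines its own set.
    private static final ConfigOption<String> HOSTS =
            ConfigOptions.key("hosts").stringType().noDefaultValue();
    private static final ConfigOption<String> INDEX =
            ConfigOptions.key("index").stringType().noDefaultValue();
    private static final ConfigOption<String> DOCUMENT_TYPE =
            ConfigOptions.key("document-type").stringType().noDefaultValue();
    private static final ConfigOption<String> USERNAME =
            ConfigOptions.key("username").stringType().noDefaultValue();
    private static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("password").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        return "elasticsearch-6"; // matches the 'connector' value in the DDL
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> required = new HashSet<>();
        required.add(HOSTS);
        required.add(INDEX);
        required.add(DOCUMENT_TYPE);
        return required;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optional = new HashSet<>();
        optional.add(USERNAME);
        optional.add(PASSWORD);
        return optional;
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // A real factory would validate the options and build the sink here;
        // omitted to keep the sketch focused on discovery and option declaration.
        throw new UnsupportedOperationException("sketch only");
    }
}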