Table API

The Table API integration lets Flink SQL write to Elasticsearch 6 through a dynamic table sink factory. Sinks are declared in DDL, and the connector validates their options before the sink is created.

Capabilities

Connector Identifier

The Table API uses the elasticsearch-6 connector identifier for creating Elasticsearch sinks via DDL.

CREATE TABLE sink_table (
  column1 datatype,
  column2 datatype,
  ...
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'target-index',
  'document-type' = '_doc'
);

Configuration Options

All options are set as string key/value pairs in the WITH clause of the table DDL.

Required Options

// Required configuration options
'connector' = 'elasticsearch-6'        // Connector identifier
'hosts' = 'http://host:port;...'       // Elasticsearch hosts (semicolon-separated)
'index' = 'index-name'                 // Target Elasticsearch index
'document-type' = 'type-name'          // Document type (use '_doc' for ES 6.x+)
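
As a rough illustration of what checking these options involves, the sketch below tests a WITH-clause map for the four required keys and splits the semicolon-separated hosts value. The class RequiredOptionsCheck and its methods are hypothetical helpers written for this example. They are not part of the connector, whose own validation may differ in detail.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for an elasticsearch-6 WITH clause; not connector code. */
final class RequiredOptionsCheck {

    // The four keys listed as required above.
    static final List<String> REQUIRED = List.of("connector", "hosts", "index", "document-type");

    /** Returns the required keys that are absent or blank in the given options. */
    static List<String> missingOptions(Map<String, String> options) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = options.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Splits a semicolon-separated 'hosts' value; each entry must be scheme://host:port. */
    static List<URI> parseHosts(String hosts) {
        List<URI> parsed = new ArrayList<>();
        for (String entry : hosts.split(";")) {
            URI uri = URI.create(entry.trim());
            if (uri.getScheme() == null || uri.getHost() == null || uri.getPort() == -1) {
                throw new IllegalArgumentException("Expected scheme://host:port but got: " + entry);
            }
            parsed.add(uri);
        }
        return parsed;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "elasticsearch-6");
        options.put("hosts", "http://es-node1:9200;http://es-node2:9200");
        options.put("index", "product_events");
        // 'document-type' is deliberately left out to show the check.
        System.out.println("missing: " + missingOptions(options));
        System.out.println("hosts: " + parseHosts(options.get("hosts")));
    }
}
```

Running such a check before submitting DDL can surface a missing key or a host entry without a scheme earlier than a failed job submission would.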

Optional Options

// Bulk processing options
'sink.bulk-flush.max-actions' = '1000'               // Max actions per bulk request
'sink.bulk-flush.max-size' = '2mb'                   // Max size per bulk request
'sink.bulk-flush.interval' = '1s'                    // Bulk flush interval
'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'   // Backoff strategy: DISABLED/CONSTANT/EXPONENTIAL
'sink.bulk-flush.backoff.max-retries' = '3'          // Max backoff retries
'sink.bulk-flush.backoff.delay' = '30s'              // Delay between backoff attempts

// Connection options
'connection.max-retry-timeout' = '30s'               // Max retry timeout
'connection.path-prefix' = '/path'                   // URL path prefix

// Authentication options (basic auth applies only when both are set)
'username' = 'elastic'                               // Username for basic auth
'password' = 'password'                              // Password for basic auth

// Advanced options
'format' = 'json'                                    // Serialization format
'failure-handler' = 'class.name.FailureHandler'      // fail, ignore, retry-rejected, or a custom handler class
'sink.flush-on-checkpoint' = 'true'                  // Flush on checkpoint
'document-id.key-delimiter' = '_'                    // Delimiter for composite primary keys
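
To make the interaction of the backoff type, retry count, and delay options concrete, here is a simplified model of the wait sequence each type produces. It assumes plain doubling for EXPONENTIAL. The Elasticsearch client applies its own growth formula, so treat the numbers as an approximation. BackoffSketch and its methods are illustrative names, not connector API.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Simplified model of bulk retry backoff; the doubling rule is an assumption. */
final class BackoffSketch {

    enum Type { CONSTANT, EXPONENTIAL }

    /** Wait before each retry: fixed for CONSTANT, doubling for EXPONENTIAL. */
    static List<Duration> delays(Type type, Duration initialDelay, int maxRetries) {
        List<Duration> result = new ArrayList<>();
        Duration current = initialDelay;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            result.add(current);
            if (type == Type.EXPONENTIAL) {
                current = current.multipliedBy(2);
            }
        }
        return result;
    }

    /** Total time spent waiting if every retry is used. */
    static Duration totalWait(List<Duration> delays) {
        Duration total = Duration.ZERO;
        for (Duration d : delays) {
            total = total.plus(d);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Duration> exponential = delays(Type.EXPONENTIAL, Duration.ofMillis(100), 5);
        List<Duration> constant = delays(Type.CONSTANT, Duration.ofSeconds(30), 3);
        System.out.println("exponential total ms: " + totalWait(exponential).toMillis());
        System.out.println("constant total s: " + totalWait(constant).getSeconds());
    }
}
```

Under this model, a 100 ms delay with 5 exponential retries waits about 3.1 seconds in total, while a constant 30 s delay with 3 retries waits 90 seconds. That difference is worth weighing against checkpoint timeouts.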

Usage Examples:

-- Basic table sink
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'user_behavior',
  'document-type' = '_doc'
);

-- Advanced configuration with bulk settings
CREATE TABLE product_events (
  product_id BIGINT,
  event_type STRING,
  user_id BIGINT,
  event_time TIMESTAMP(3),
  properties MAP<STRING, STRING>
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://es-node1:9200;http://es-node2:9200;http://es-node3:9200',
  'index' = 'product_events',
  'document-type' = '_doc',
  'sink.bulk-flush.max-actions' = '500',
  'sink.bulk-flush.max-size' = '1mb',
  'sink.bulk-flush.interval' = '5s',
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
  'sink.bulk-flush.backoff.max-retries' = '5',
  'sink.bulk-flush.backoff.delay' = '100ms'
);

-- With authentication
CREATE TABLE secure_logs (
  log_id STRING,
  timestamp_field TIMESTAMP(3),
  level STRING,
  message STRING,
  source STRING
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'https://secure-es:9200',
  'index' = 'application_logs',
  'document-type' = '_doc',
  'username' = 'flink_user',
  'password' = 'secure_password',
  'connection.max-retry-timeout' = '60s'
);

-- Insert data into Elasticsearch
INSERT INTO user_behavior
SELECT user_id, item_id, category_id, behavior, event_time
FROM kafka_source;

Primary Key Handling

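When a table declares a primary key, the connector derives each document's _id from the key fields, so repeated writes for the same key update the same document. Without a primary key, Elasticsearch generates the document IDs and the sink only appends.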

-- Table with primary key (used for document _id)
CREATE TABLE users (
  user_id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  email STRING,
  registration_time TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'users',
  'document-type' = '_doc'
);

-- Composite primary key (concatenated with delimiter)
CREATE TABLE user_sessions (
  user_id BIGINT,
  session_id STRING,
  start_time TIMESTAMP(3),
  duration_minutes INT,
  PRIMARY KEY (user_id, session_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'user_sessions',
  'document-type' = '_doc',
  'document-id.key-delimiter' = '#'  -- Results in document ID like: "123#abc-def-456"
);
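
The composite-key behavior described above amounts to joining the key field values with the configured delimiter. The following minimal sketch shows that idea in plain Java. DocumentIdSketch is a hypothetical name, and the connector's actual key extraction works on its internal row format rather than on Object values.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Illustrative only: joins primary key values into a document ID. */
final class DocumentIdSketch {

    /** Concatenates the key fields in declaration order, separated by the delimiter. */
    static String documentId(String delimiter, Object... keyFields) {
        return Arrays.stream(keyFields)
                .map(field -> String.valueOf(field))
                .collect(Collectors.joining(delimiter));
    }

    public static void main(String[] args) {
        // Mirrors the user_sessions table: PRIMARY KEY (user_id, session_id) with '#'.
        System.out.println(documentId("#", 123L, "abc-def-456")); // prints 123#abc-def-456
    }
}
```

Choose a delimiter that cannot occur in the key values. With '_' as the delimiter, the keys ("a_b", "c") and ("a", "b_c") would both produce the ID a_b_c and overwrite each other.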

Data Type Mapping

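Flink does not set Elasticsearch field types directly. Rows are serialized with the configured format (JSON by default), and the index mapping, whether explicit or dynamic, determines the resulting field types. The list below shows typical target types; define an explicit mapping where exact types matter.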

// Flink to Elasticsearch type mapping (typical target types)
TINYINT, SMALLINT, INTEGER -> integer
BIGINT -> long
FLOAT -> float
DOUBLE -> double
BOOLEAN -> boolean
STRING, VARCHAR, CHAR -> text/keyword (based on the index mapping)
DECIMAL -> scaled_float or double
DATE -> date
TIME -> date with a time-only format, or keyword (Elasticsearch has no time type)
TIMESTAMP -> date with format
ARRAY<T> -> array of T
MAP<STRING, T> -> object with T values
ROW -> nested object
BYTES -> binary

Dynamic Index and Type

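Index names, and in the second example the document type, can reference a record field in braces so that each record is routed by its own content. A timestamp field can carry a date format after a | separator, as in {event_time|yyyy-MM-dd}.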

-- Dynamic index based on event time
CREATE TABLE time_partitioned_events (
  event_id STRING,
  event_type STRING,
  event_time TIMESTAMP(3),
  data MAP<STRING, STRING>
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'events-{event_time|yyyy-MM-dd}',  -- Dynamic index by date
  'document-type' = '_doc'
);

-- Dynamic document type (check connector support; indices created in ES 6.x allow only one mapping type)
CREATE TABLE categorized_docs (
  doc_id STRING,
  category STRING,
  content STRING,
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'documents',
  'document-type' = '{category}'  -- Dynamic type based on category field
);
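
The placeholder syntax in the index option can be pictured as a substitution applied per record. The sketch below resolves {field} and {field|date-format} placeholders against a record held as a map. It is an assumption-laden illustration: DynamicIndexSketch is a hypothetical class, it accepts any number of placeholders, and it formats only LocalDateTime values, which may not match the connector's own rules.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative resolver for index patterns such as events-{event_time|yyyy-MM-dd}. */
final class DynamicIndexSketch {

    // Matches {field} or {field|date-format}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^|}]+)(?:\\|([^}]+))?\\}");

    /** Replaces each placeholder with the value of the named field in the record. */
    static String resolve(String indexPattern, Map<String, Object> record) {
        Matcher matcher = PLACEHOLDER.matcher(indexPattern);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String field = matcher.group(1);
            String format = matcher.group(2);
            Object value = record.get(field);
            if (value == null) {
                throw new IllegalArgumentException("No value for field: " + field);
            }
            // Apply the date format only to timestamp values; other values use toString().
            String text = (format != null && value instanceof LocalDateTime)
                    ? ((LocalDateTime) value).format(DateTimeFormatter.ofPattern(format))
                    : String.valueOf(value);
            matcher.appendReplacement(out, Matcher.quoteReplacement(text));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> record = Map.of("event_time", LocalDateTime.of(2024, 3, 15, 10, 30));
        System.out.println(resolve("events-{event_time|yyyy-MM-dd}", record)); // prints events-2024-03-15
    }
}
```

Date-based index names like this create one index per day, so retention can be handled by dropping whole indices.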

Error Handling in Table API

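The Table API exposes the same failure handling mechanisms as the DataStream API through the 'failure-handler' option. The option takes the fully qualified class name of a handler, as in the examples below, and also accepts the short names fail, ignore, and retry-rejected.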

-- Using built-in retry failure handler
CREATE TABLE resilient_sink (
  id BIGINT,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'data',
  'document-type' = '_doc',
  'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler',
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
  'sink.bulk-flush.backoff.max-retries' = '5'
);

-- Using ignoring failure handler (drops failed records)
CREATE TABLE lenient_sink (
  id BIGINT,
  data STRING,
  ts TIMESTAMP(3)  
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'data',
  'document-type' = '_doc',
  'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler'
);
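
The difference between the handlers above can be summarized as a decision made for each failed request. The sketch below models that decision in plain Java. It is a simplification: the Policy and Outcome enums are invented for illustration, and HTTP status 429 stands in for the rejected-execution check that the real retry handler performs on the exception itself.

```java
/** Simplified model of failure handler behavior; not the connector's handler interface. */
final class FailurePolicySketch {

    enum Policy { FAIL, IGNORE, RETRY_REJECTED }

    enum Outcome { FAIL_JOB, DROP_RECORD, REQUEUE_RECORD }

    // Assumption: a full bulk queue is reported as HTTP 429 (Too Many Requests).
    static final int TOO_MANY_REQUESTS = 429;

    /** Decides what happens to a request that Elasticsearch did not accept. */
    static Outcome decide(Policy policy, int restStatusCode) {
        switch (policy) {
            case IGNORE:
                // Drops every failed record, whatever the cause.
                return Outcome.DROP_RECORD;
            case RETRY_REJECTED:
                // Re-queues only capacity rejections; other errors still fail the job.
                return restStatusCode == TOO_MANY_REQUESTS
                        ? Outcome.REQUEUE_RECORD
                        : Outcome.FAIL_JOB;
            case FAIL:
            default:
                return Outcome.FAIL_JOB;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(Policy.RETRY_REJECTED, 429));
        System.out.println(decide(Policy.RETRY_REJECTED, 400));
        System.out.println(decide(Policy.IGNORE, 400));
    }
}
```

The ignoring handler trades delivery guarantees for availability: malformed documents are lost silently, so it suits best-effort sinks such as debug logs rather than data that must be complete.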

Elasticsearch6DynamicSinkFactory

Internal factory class that creates dynamic table sinks for the Table API.

/**
 * A DynamicTableSinkFactory for discovering Elasticsearch6DynamicSink.
 */
@Internal
public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
    /**
     * Create dynamic table sink from context.
     * @param context Factory context with table schema and options
     * @return Configured Elasticsearch6DynamicSink
     */
    public DynamicTableSink createDynamicTableSink(Context context);

    /**
     * Factory identifier for connector discovery.
     * @return "elasticsearch-6"
     */
    public String factoryIdentifier();

    /**
     * Required configuration options.
     * @return Set of required ConfigOption objects
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Optional configuration options.
     * @return Set of optional ConfigOption objects
     */
    public Set<ConfigOption<?>> optionalOptions();
}