
# Table API

SQL-based stream processing integration backed by a dynamic table sink factory. Supports DDL-based configuration with validation for table-based Elasticsearch operations.

## Capabilities

### Connector Identifier

The Table API uses the `elasticsearch-6` connector identifier for creating Elasticsearch sinks via DDL.

```sql { .api }
CREATE TABLE sink_table (
  column1 datatype,
  column2 datatype,
  ...
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'target-index',
  'document-type' = '_doc'
);
```

### Configuration Options

Comprehensive configuration options for Elasticsearch Table API integration.

#### Required Options

```java { .api }
// Required configuration options
'connector' = 'elasticsearch-6'  // Connector identifier
'hosts' = 'http://host:port;...' // Elasticsearch hosts (semicolon-separated)
'index' = 'index-name'           // Target Elasticsearch index
'document-type' = 'type-name'    // Document type (use '_doc' for ES 6.x+)
```
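Since `hosts` is a single semicolon-separated string, the sink has to split it into individual endpoints before building its REST client. A minimal sketch of that parsing (the `parseHosts` helper is our own illustration, not part of the connector's API):

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class HostsParser {
    // Splits a 'hosts' value like "http://h1:9200;http://h2:9200" into
    // scheme/host/port triples, as the sink must do before creating clients.
    public static List<String[]> parseHosts(String hosts) {
        List<String[]> result = new ArrayList<>();
        for (String raw : hosts.split(";")) {
            URI uri = URI.create(raw.trim());
            result.add(new String[] {
                uri.getScheme(), uri.getHost(), String.valueOf(uri.getPort())
            });
        }
        return result;
    }

    public static void main(String[] args) {
        for (String[] h : parseHosts("http://es-node1:9200;https://es-node2:9443")) {
            System.out.println(h[0] + "|" + h[1] + "|" + h[2]);
        }
    }
}
```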

#### Optional Options

```java { .api }
// Bulk processing options
'bulk.flush.max.actions' = '1000'         // Max actions per bulk request
'bulk.flush.max.size' = '2mb'             // Max size per bulk request
'bulk.flush.interval' = '1s'              // Bulk flush interval
'bulk.flush.backoff.type' = 'EXPONENTIAL' // Backoff type: CONSTANT/EXPONENTIAL
'bulk.flush.backoff.max-retries' = '3'    // Max backoff retries
'bulk.flush.backoff.delay' = '30s'        // Backoff delay

// Connection options
'connection.max-retry-timeout' = '30s' // Max retry timeout
'connection.path-prefix' = '/path'     // URL path prefix

// Authentication options (both must be provided together)
'username' = 'elastic'  // Username for basic auth
'password' = 'password' // Password for basic auth

// Advanced options
'format' = 'json'                               // Serialization format
'failure-handler' = 'class.name.FailureHandler' // Custom failure handler class
'sink.flush-on-checkpoint' = 'true'             // Flush on checkpoint
'sink.key-delimiter' = '_'                      // Primary key delimiter
```
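Size and duration options such as `'bulk.flush.max.size' = '2mb'` are plain strings with a unit suffix. The connector relies on Flink's own option parsing; the sketch below only illustrates the idea of turning such a string into bytes, and is not the connector's actual implementation:

```java
import java.util.Locale;

public class OptionUnits {
    // Illustrative parser for size strings like '2mb' or '1024b'.
    public static long parseSizeBytes(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1;
        if (v.endsWith("kb"))      { multiplier = 1024L;                v = v.substring(0, v.length() - 2); }
        else if (v.endsWith("mb")) { multiplier = 1024L * 1024;         v = v.substring(0, v.length() - 2); }
        else if (v.endsWith("gb")) { multiplier = 1024L * 1024 * 1024;  v = v.substring(0, v.length() - 2); }
        else if (v.endsWith("b"))  {                                    v = v.substring(0, v.length() - 1); }
        return Long.parseLong(v.trim()) * multiplier;
    }

    public static void main(String[] args) {
        System.out.println(parseSizeBytes("2mb"));   // 2097152
        System.out.println(parseSizeBytes("1024b")); // 1024
    }
}
```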

**Usage Examples:**

```sql
-- Basic table sink
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'user_behavior',
  'document-type' = '_doc'
);

-- Advanced configuration with bulk settings
CREATE TABLE product_events (
  product_id BIGINT,
  event_type STRING,
  user_id BIGINT,
  event_time TIMESTAMP(3),
  properties MAP<STRING, STRING>
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://es-node1:9200;http://es-node2:9200;http://es-node3:9200',
  'index' = 'product_events',
  'document-type' = '_doc',
  'bulk.flush.max.actions' = '500',
  'bulk.flush.max.size' = '1mb',
  'bulk.flush.interval' = '5s',
  'bulk.flush.backoff.type' = 'EXPONENTIAL',
  'bulk.flush.backoff.max-retries' = '5',
  'bulk.flush.backoff.delay' = '100ms'
);

-- With authentication
CREATE TABLE secure_logs (
  log_id STRING,
  timestamp_field TIMESTAMP(3),
  level STRING,
  message STRING,
  source STRING
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'https://secure-es:9200',
  'index' = 'application_logs',
  'document-type' = '_doc',
  'username' = 'flink_user',
  'password' = 'secure_password',
  'connection.max-retry-timeout' = '60s'
);

-- Insert data into Elasticsearch
INSERT INTO user_behavior
SELECT user_id, item_id, category_id, behavior, event_time
FROM kafka_source;
```

### Primary Key Handling

The Elasticsearch connector supports primary key configuration for document ID generation.

```sql { .api }
-- Table with primary key (used for document _id)
CREATE TABLE users (
  user_id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  email STRING,
  registration_time TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'users',
  'document-type' = '_doc'
);

-- Composite primary key (concatenated with delimiter)
CREATE TABLE user_sessions (
  user_id BIGINT,
  session_id STRING,
  start_time TIMESTAMP(3),
  duration_minutes INT,
  PRIMARY KEY (user_id, session_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'user_sessions',
  'document-type' = '_doc',
  'sink.key-delimiter' = '#' -- Results in document ID like: "123#abc-def-456"
);
```
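For a composite key, the document `_id` is formed by joining the key field values with the configured `sink.key-delimiter`. A rough sketch of that behavior (the `documentId` helper and the field values are ours, for illustration only):

```java
import java.util.List;

public class KeyDelimiterDemo {
    // Approximates how a composite primary key becomes a document _id:
    // the key field values joined by the 'sink.key-delimiter' option.
    public static String documentId(List<String> keyValues, String delimiter) {
        return String.join(delimiter, keyValues);
    }

    public static void main(String[] args) {
        // PRIMARY KEY (user_id, session_id) with 'sink.key-delimiter' = '#'
        System.out.println(documentId(List.of("123", "abc-def-456"), "#")); // 123#abc-def-456
    }
}
```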

### Data Type Mapping

Flink data types are automatically mapped to Elasticsearch field types.

```java { .api }
// Flink to Elasticsearch type mapping
TINYINT, SMALLINT, INTEGER -> integer
BIGINT -> long
FLOAT -> float
DOUBLE -> double
BOOLEAN -> boolean
STRING, VARCHAR, CHAR -> text/keyword (based on content)
DECIMAL -> scaled_float or double
DATE -> date
TIME -> time
TIMESTAMP -> date with format
ARRAY<T> -> array of T
MAP<STRING, T> -> object with T values
ROW -> nested object
BYTES -> binary
```

### Dynamic Index and Type

Support for dynamic index and document type based on record content.

```sql { .api }
-- Dynamic index based on event time
CREATE TABLE time_partitioned_events (
  event_id STRING,
  event_type STRING,
  event_time TIMESTAMP(3),
  data MAP<STRING, STRING>
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'events-{event_time|yyyy-MM-dd}', -- Dynamic index by date
  'document-type' = '_doc'
);

-- Dynamic document type (if supported by ES version)
CREATE TABLE categorized_docs (
  doc_id STRING,
  category STRING,
  content STRING,
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'documents',
  'document-type' = '{category}' -- Dynamic type based on category field
);
```
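An index pattern such as `events-{event_time|yyyy-MM-dd}` is resolved per record: the referenced field's value is formatted with the given pattern and substituted into the index name. A minimal sketch of that substitution (the `resolveIndex` helper is our own, not the connector's API):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DynamicIndexDemo {
    // Illustrates how 'events-{event_time|yyyy-MM-dd}' resolves per record:
    // the event_time field is formatted and appended to the static prefix.
    public static String resolveIndex(String prefix, LocalDateTime eventTime, String pattern) {
        return prefix + eventTime.format(DateTimeFormatter.ofPattern(pattern));
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2024, 3, 15, 10, 30);
        System.out.println(resolveIndex("events-", ts, "yyyy-MM-dd")); // events-2024-03-15
    }
}
```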

### Error Handling in Table API

The Table API supports the same failure handling mechanisms as the DataStream API through configuration.

```sql { .api }
-- Using built-in retry failure handler
CREATE TABLE resilient_sink (
  id BIGINT,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'data',
  'document-type' = '_doc',
  'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler',
  'bulk.flush.backoff.type' = 'EXPONENTIAL',
  'bulk.flush.backoff.max-retries' = '5'
);

-- Using ignoring failure handler (drops failed records)
CREATE TABLE lenient_sink (
  id BIGINT,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'data',
  'document-type' = '_doc',
  'failure-handler' = 'org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler'
);
```
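With `bulk.flush.backoff.type' = 'EXPONENTIAL'`, each retry roughly doubles the previous wait, starting from `bulk.flush.backoff.delay`, for up to `max-retries` attempts. The exact schedule is implementation-defined; the sketch below only illustrates the doubling shape, not the connector's actual timing code:

```java
public class BackoffDemo {
    // Sketch of an exponential backoff schedule: attempt n waits
    // baseDelay * 2^n milliseconds (attempt 0 waits the base delay).
    public static long delayForAttempt(long baseDelayMillis, int attempt) {
        return baseDelayMillis * (1L << attempt);
    }

    public static void main(String[] args) {
        long base = 100; // e.g. 'bulk.flush.backoff.delay' = '100ms'
        for (int attempt = 0; attempt < 5; attempt++) { // e.g. max-retries = 5
            System.out.println("attempt " + attempt + ": " + delayForAttempt(base, attempt) + "ms");
        }
    }
}
```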

### Elasticsearch6DynamicSinkFactory

Internal factory class that creates dynamic table sinks for the Table API.

```java { .api }
/**
 * A DynamicTableSinkFactory for discovering Elasticsearch6DynamicSink.
 */
@Internal
public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
    /**
     * Create a dynamic table sink from the factory context.
     * @param context Factory context with table schema and options
     * @return Configured Elasticsearch6DynamicSink
     */
    public DynamicTableSink createDynamicTableSink(Context context);

    /**
     * Factory identifier for connector discovery.
     * @return "elasticsearch-6"
     */
    public String factoryIdentifier();

    /**
     * Required configuration options.
     * @return Set of required ConfigOption objects
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Optional configuration options.
     * @return Set of optional ConfigOption objects
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```