or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-processing.md client-configuration.md datastream-api.md failure-handling.md index.md table-api.md

datastream-api.mddocs/

0

# DataStream API

1

2

Core streaming sink functionality for programmatic Flink jobs. Provides ElasticsearchSink with builder pattern configuration for bulk processing, failure handling, and client customization.

3

4

## Capabilities

5

6

### ElasticsearchSink

7

8

Main sink class for sending streaming data to Elasticsearch 6.x clusters. Uses builder pattern for configuration and extends ElasticsearchSinkBase with RestHighLevelClient integration.

9

10

```java { .api }

11

/**

12

* Elasticsearch 6.x sink that requests multiple ActionRequests against a cluster

13

* for each incoming element.

14

* @param <T> Type of the elements handled by this sink

15

*/

16

@PublicEvolving

17

public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {

18

// Private constructor - use Builder pattern

19

}

20

```

21

22

### ElasticsearchSink.Builder

23

24

Builder pattern for creating ElasticsearchSink instances with full configuration control.

25

26

```java { .api }

27

/**

28

* A builder for creating an ElasticsearchSink.

29

* @param <T> Type of the elements handled by the sink this builder creates.

30

*/

31

@PublicEvolving

32

public static class Builder<T> { // nested inside ElasticsearchSink

33

/**

34

* Creates a new ElasticsearchSink that connects to the cluster using a RestHighLevelClient.

35

* @param httpHosts The list of HttpHost to which the RestHighLevelClient connects.

36

* @param elasticsearchSinkFunction This is used to generate multiple ActionRequest from the incoming element.

37

*/

38

public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction);

39

40

/**

41

* Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to disable it.

42

* @param numMaxActions the maximum number of actions to buffer per bulk request.

43

*/

44

public void setBulkFlushMaxActions(int numMaxActions);

45

46

/**

47

* Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to disable it.

48

* @param maxSizeMb the maximum size of buffered actions, in mb.

49

*/

50

public void setBulkFlushMaxSizeMb(int maxSizeMb);

51

52

/**

53

* Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.

54

* @param intervalMillis the bulk flush interval, in milliseconds.

55

*/

56

public void setBulkFlushInterval(long intervalMillis);

57

58

/**

59

* Sets whether or not to enable bulk flush backoff behaviour.

60

* @param enabled whether or not to enable backoffs.

61

*/

62

public void setBulkFlushBackoff(boolean enabled);

63

64

/**

65

* Sets the type of back off to use when flushing bulk requests.

66

* @param flushBackoffType the backoff type to use.

67

*/

68

public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType);

69

70

/**

71

* Sets the maximum number of retries for a backoff attempt when flushing bulk requests.

72

* @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests

73

*/

74

public void setBulkFlushBackoffRetries(int maxRetries);

75

76

/**

77

* Sets the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.

78

* @param delayMillis the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.

79

*/

80

public void setBulkFlushBackoffDelay(long delayMillis);

81

82

/**

83

* Sets a failure handler for action requests.

84

* @param failureHandler This is used to handle failed ActionRequest.

85

*/

86

public void setFailureHandler(ActionRequestFailureHandler failureHandler);

87

88

/**

89

* Sets a REST client factory for custom client configuration.

90

* @param restClientFactory the factory that configures the rest client.

91

*/

92

public void setRestClientFactory(RestClientFactory restClientFactory);

93

94

/**

95

* Creates the Elasticsearch sink.

96

* @return the created Elasticsearch sink.

97

*/

98

public ElasticsearchSink<T> build();

99

}

100

```

101

102

**Usage Examples:**

103

104

```java

105

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;

106

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

107

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.FlushBackoffType;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;

108

import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

109

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

110

import org.apache.http.HttpHost;

111

import org.elasticsearch.action.index.IndexRequest;

112

import org.elasticsearch.client.Requests;

113

114

import java.util.ArrayList;

115

import java.util.HashMap;

116

import java.util.List;

117

import java.util.Map;

118

119

// Basic sink creation

120

List<HttpHost> httpHosts = new ArrayList<>();

121

httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

122

httpHosts.add(new HttpHost("127.0.0.1", 9201, "http"));

123

124

ElasticsearchSinkFunction<MyData> sinkFunction = new ElasticsearchSinkFunction<MyData>() {

125

@Override

126

public void process(MyData element, RuntimeContext ctx, RequestIndexer indexer) {

127

Map<String, Object> json = new HashMap<>();

128

json.put("id", element.getId());

129

json.put("name", element.getName());

130

json.put("timestamp", element.getTimestamp());

131

132

IndexRequest request = Requests.indexRequest()

133

.index("my-index")

134

.type("_doc")

135

.id(String.valueOf(element.getId()))

136

.source(json);

137

138

indexer.add(request);

139

}

140

};

141

142

ElasticsearchSink<MyData> sink = new ElasticsearchSink.Builder<>(

143

httpHosts,

144

sinkFunction

145

).build();

146

147

// Advanced configuration

148

ElasticsearchSink.Builder<MyData> builder = new ElasticsearchSink.Builder<>(

149

httpHosts,

150

sinkFunction

151

);

152

builder.setBulkFlushMaxActions(1000); // Flush after 1000 actions

153

builder.setBulkFlushMaxSizeMb(5); // Or after 5MB

154

builder.setBulkFlushInterval(30000); // Or after 30 seconds

155

builder.setBulkFlushBackoff(true); // Enable backoff

156

builder.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);

157

builder.setBulkFlushBackoffRetries(3);

158

builder.setBulkFlushBackoffDelay(1000);

159

builder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

160

ElasticsearchSink<MyData> advancedSink = builder.build(); // setters return void, so they cannot be chained

161

162

// Add to DataStream

163

DataStream<MyData> dataStream = ...; // your data stream

164

dataStream.addSink(advancedSink);

165

```

166

167

### ElasticsearchSinkFunction

168

169

User-defined function interface for converting stream elements into Elasticsearch ActionRequests.

170

171

```java { .api }

172

/**

173

* Creates multiple ActionRequests from an element in a stream.

174

* This is used by sinks to prepare elements for sending them to Elasticsearch.

175

* @param <T> The type of the element handled by this ElasticsearchSinkFunction

176

*/

177

@PublicEvolving

178

public interface ElasticsearchSinkFunction<T> extends Serializable, Function {

179

/**

180

* Initialization method for the function. It is called once before the actual working process methods.

181

*/

182

default void open() throws Exception {}

183

184

/**

185

* Tear-down method for the function. It is called when the sink closes.

186

*/

187

default void close() throws Exception {}

188

189

/**

190

* Process the incoming element to produce multiple ActionRequests.

191

* The produced requests should be added to the provided RequestIndexer.

192

* @param element incoming element to process

193

* @param ctx runtime context containing information about the sink instance

194

* @param indexer request indexer that ActionRequest should be added to

195

*/

196

void process(T element, RuntimeContext ctx, RequestIndexer indexer);

197

}

198

```

199

200

**Usage Examples:**

201

202

```java

203

// Simple single-request function

204

ElasticsearchSinkFunction<String> simpleSinkFunction = new ElasticsearchSinkFunction<String>() {

205

@Override

206

public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {

207

Map<String, Object> json = new HashMap<>();

208

json.put("data", element);

209

json.put("timestamp", System.currentTimeMillis());

210

211

IndexRequest request = Requests.indexRequest()

212

.index("logs")

213

.type("_doc")

214

.source(json);

215

216

indexer.add(request);

217

}

218

};

219

220

// Multiple requests per element

221

ElasticsearchSinkFunction<Transaction> multiRequestFunction = new ElasticsearchSinkFunction<Transaction>() {

222

@Override

223

public void process(Transaction transaction, RuntimeContext ctx, RequestIndexer indexer) {

224

// Index the transaction

225

Map<String, Object> transactionDoc = new HashMap<>();

226

transactionDoc.put("id", transaction.getId());

227

transactionDoc.put("amount", transaction.getAmount());

228

transactionDoc.put("currency", transaction.getCurrency());

229

230

IndexRequest transactionRequest = Requests.indexRequest()

231

.index("transactions")

232

.type("_doc")

233

.id(transaction.getId())

234

.source(transactionDoc);

235

236

// Update user balance

237

Map<String, Object> balanceUpdate = new HashMap<>();

238

balanceUpdate.put("balance", transaction.getNewBalance());

239

balanceUpdate.put("last_updated", System.currentTimeMillis());

240

241

UpdateRequest balanceRequest = new UpdateRequest()

242

.index("users")

243

.type("_doc")

244

.id(transaction.getUserId())

245

.doc(balanceUpdate);

246

247

indexer.add(transactionRequest);
indexer.add(balanceRequest);

248

}

249

};

250

251

// With lifecycle methods

252

ElasticsearchSinkFunction<Event> lifecycleSinkFunction = new ElasticsearchSinkFunction<Event>() {

253

private ObjectMapper objectMapper;

254

255

@Override

256

public void open() throws Exception {

257

objectMapper = new ObjectMapper();

258

}

259

260

@Override

261

public void process(Event event, RuntimeContext ctx, RequestIndexer indexer) {

262

try {

263

String json = objectMapper.writeValueAsString(event);

264

Map<String, Object> source = objectMapper.readValue(json, HashMap.class);

265

266

IndexRequest request = Requests.indexRequest()

267

.index("events")

268

.type("_doc")

269

.source(source);

270

271

indexer.add(request);

272

} catch (Exception e) {

273

throw new RuntimeException("Failed to serialize event", e);

274

}

275

}

276

277

@Override

278

public void close() throws Exception {

279

// Cleanup resources if needed

280

}

281

};

282

```

283

284

### RequestIndexer

285

286

Interface for adding ActionRequests to be sent to Elasticsearch as part of bulk operations.

287

288

```java { .api }

289

/**

290

* Users add multiple delete, index or update requests to a RequestIndexer to prepare them

291

* for sending to an Elasticsearch cluster.

292

*/

293

@PublicEvolving

294

public interface RequestIndexer {

295

/**

296

* Add multiple DeleteRequest to the indexer to prepare for sending requests to Elasticsearch.

297

* @param deleteRequests The multiple DeleteRequest to add.

298

*/

299

void add(DeleteRequest... deleteRequests);

300

301

/**

302

* Add multiple IndexRequest to the indexer to prepare for sending requests to Elasticsearch.

303

* @param indexRequests The multiple IndexRequest to add.

304

*/

305

void add(IndexRequest... indexRequests);

306

307

/**

308

* Add multiple UpdateRequest to the indexer to prepare for sending requests to Elasticsearch.

309

* @param updateRequests The multiple UpdateRequest to add.

310

*/

311

void add(UpdateRequest... updateRequests);

312

313

/**

314

* Add multiple ActionRequest to the indexer to prepare for sending requests to Elasticsearch.

315

* @param actionRequests The multiple ActionRequest to add.

316

* @deprecated use the DeleteRequest, IndexRequest or UpdateRequest methods

317

*/

318

@Deprecated

319

default void add(ActionRequest... actionRequests) { /* implementation omitted */ }

320

}

321

```