or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-connector-elasticsearch-2-12

Apache Flink connector for Elasticsearch 1.x that enables streaming data ingestion into Elasticsearch clusters

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-connector-elasticsearch_2.12@1.8.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-elasticsearch-2-12@1.8.0

0

# Flink Elasticsearch Connector

1

2

The Apache Flink Elasticsearch connector enables streaming data ingestion from Flink data streams into **Elasticsearch 1.x clusters only**. It provides fault-tolerant, exactly-once processing guarantees with configurable bulk processing and flexible failure handling strategies.

3

4

**Important**: This connector is specifically designed for Elasticsearch 1.x and is not compatible with newer versions of Elasticsearch.

5

6

## Package Information

7

8

- **Package Name**: flink-connector-elasticsearch_2.12

9

- **Package Type**: maven

10

- **Language**: Java

11

- **Elasticsearch Version**: 1.x (specifically 1.7.1)

12

- **Installation**:

13

```xml

14

<dependency>

15

<groupId>org.apache.flink</groupId>

16

<artifactId>flink-connector-elasticsearch_2.12</artifactId>

17

<version>1.8.3</version>

18

</dependency>

19

```

20

21

## Core Imports

22

23

```java

24

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;

25

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;

26

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;

27

import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

28

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

29

import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;

30

import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;

31

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

32

```

33

34

## Basic Usage

35

36

```java

37

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;

38

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;

39

import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

40

import org.apache.flink.api.common.functions.RuntimeContext;

41

import org.elasticsearch.action.index.IndexRequest;

42

import org.elasticsearch.client.Requests;

43

44

import java.util.HashMap;

45

import java.util.Map;

46

47

// Configure Elasticsearch connection using configuration constants

48

Map<String, String> config = new HashMap<>();

49

config.put("cluster.name", "my-cluster");

50

config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");

51

config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");

52

config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "5000");

53

54

// Create sink function

55

ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {

56

@Override

57

public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {

58

Map<String, Object> json = new HashMap<>();

59

json.put("data", element);

60

json.put("timestamp", System.currentTimeMillis());

61

62

IndexRequest request = Requests.indexRequest()

63

.index("my-index")

64

.type("my-type")

65

.source(json);

66

67

indexer.add(request);

68

}

69

};

70

71

// Create and add sink to stream

72

ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction);

73

dataStream.addSink(sink);

74

```

75

76

## Architecture

77

78

The connector is built around several key components:

79

80

- **ElasticsearchSink**: Main sink implementation that connects to Elasticsearch clusters

81

- **Connection Modes**: Supports both embedded Node and TransportClient connection modes

82

- **Bulk Processing**: Uses Elasticsearch's BulkProcessor for efficient batch operations

83

- **Failure Handling**: Configurable strategies for handling failed requests

84

- **Type Safety**: Generic type support for processing different data types

85

- **Version-Specific Bridge**: Uses Elasticsearch1ApiCallBridge for 1.x compatibility

86

87

## Capabilities

88

89

### Elasticsearch Sink

90

91

Main sink implementation for connecting Flink streams to Elasticsearch 1.x clusters.

92

93

```java { .api }

94

/**

95

* Elasticsearch 1.x sink that requests multiple ActionRequests against a cluster for each incoming element.

96

* @param <T> Type of the elements handled by this sink

97

*/

98

public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {

99

100

/**

101

* Creates a new ElasticsearchSink that connects using an embedded Node (deprecated constructor).

102

* @param userConfig The map of user settings for constructing the Node and BulkProcessor

103

* @param indexRequestBuilder Function to generate IndexRequest from incoming elements

104

* @deprecated Deprecated since version 1.2, to be removed at version 2.0. Use ElasticsearchSinkFunction instead.

105

*/

106

@Deprecated

107

public ElasticsearchSink(

108

Map<String, String> userConfig,

109

IndexRequestBuilder<T> indexRequestBuilder

110

);

111

112

/**

113

* Creates a new ElasticsearchSink that connects using a TransportClient (deprecated constructor).

114

* @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor

115

* @param transportAddresses The addresses of Elasticsearch nodes to connect to

116

* @param indexRequestBuilder Function to generate IndexRequest from incoming elements

117

* @deprecated Deprecated since version 1.2, to be removed at version 2.0. Use ElasticsearchSinkFunction instead.

118

*/

119

@Deprecated

120

public ElasticsearchSink(

121

Map<String, String> userConfig,

122

List<TransportAddress> transportAddresses,

123

IndexRequestBuilder<T> indexRequestBuilder

124

);

125

126

/**

127

* Creates a new ElasticsearchSink that connects using an embedded Node.

128

* The sink will block and wait for a cluster to come online.

129

* @param userConfig The map of user settings for constructing the embedded Node and BulkProcessor.

130

* Important settings include "cluster.name" and bulk processing configuration.

131

* @param elasticsearchSinkFunction Function to generate multiple ActionRequests from incoming elements

132

*/

133

public ElasticsearchSink(

134

Map<String, String> userConfig,

135

ElasticsearchSinkFunction<T> elasticsearchSinkFunction

136

);

137

138

/**

139

* Creates a new ElasticsearchSink that connects using a TransportClient.

140

* The sink will fail if no cluster can be connected to.

141

* @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor.

142

* Important settings include "cluster.name" and bulk processing configuration.

143

* @param transportAddresses The addresses of Elasticsearch nodes to connect to using a TransportClient

144

* @param elasticsearchSinkFunction Function to generate multiple ActionRequests from incoming elements

145

*/

146

public ElasticsearchSink(

147

Map<String, String> userConfig,

148

List<TransportAddress> transportAddresses,

149

ElasticsearchSinkFunction<T> elasticsearchSinkFunction

150

);

151

152

/**

153

* Creates a new ElasticsearchSink with custom failure handling using an embedded Node.

154

* @param userConfig The map of user settings for constructing the Node and BulkProcessor

155

* @param elasticsearchSinkFunction Function to generate ActionRequests from incoming elements

156

* @param failureHandler Handler for failed ActionRequests

157

*/

158

public ElasticsearchSink(

159

Map<String, String> userConfig,

160

ElasticsearchSinkFunction<T> elasticsearchSinkFunction,

161

ActionRequestFailureHandler failureHandler

162

);

163

164

/**

165

* Creates a new ElasticsearchSink with custom failure handling using a TransportClient.

166

* @param userConfig The map of user settings for constructing the TransportClient and BulkProcessor

167

* @param transportAddresses The addresses of Elasticsearch nodes to connect to

168

* @param elasticsearchSinkFunction Function to generate ActionRequests from incoming elements

169

* @param failureHandler Handler for failed ActionRequests

170

*/

171

public ElasticsearchSink(

172

Map<String, String> userConfig,

173

List<TransportAddress> transportAddresses,

174

ElasticsearchSinkFunction<T> elasticsearchSinkFunction,

175

ActionRequestFailureHandler failureHandler

176

);

177

}

178

```

179

180

### Elasticsearch Sink Function

181

182

Interface for processing stream elements into Elasticsearch action requests.

183

184

```java { .api }

185

/**

186

* Creates multiple ActionRequests from an element in a stream.

187

* @param <T> The type of the element handled by this ElasticsearchSinkFunction

188

*/

189

@PublicEvolving

190

public interface ElasticsearchSinkFunction<T> extends Serializable, Function {

191

192

/**

193

* Process the incoming element to produce multiple ActionRequests.

194

* The produced requests should be added to the provided RequestIndexer.

195

* @param element incoming element to process

196

* @param ctx runtime context containing information about the sink instance

197

* @param indexer request indexer that ActionRequest should be added to

198

*/

199

void process(T element, RuntimeContext ctx, RequestIndexer indexer);

200

}

201

```

202

203

### Request Indexer

204

205

Interface for adding requests to be sent to Elasticsearch.

206

207

```java { .api }

208

/**

209

* Users add multiple delete, index or update requests to a RequestIndexer to prepare

210

* them for sending to an Elasticsearch cluster.

211

*/

212

@PublicEvolving

213

public interface RequestIndexer {

214

215

/**

216

* Add multiple DeleteRequest to the indexer to prepare for sending requests to Elasticsearch.

217

* @param deleteRequests The multiple DeleteRequest to add.

218

*/

219

void add(DeleteRequest... deleteRequests);

220

221

/**

222

* Add multiple IndexRequest to the indexer to prepare for sending requests to Elasticsearch.

223

* @param indexRequests The multiple IndexRequest to add.

224

*/

225

void add(IndexRequest... indexRequests);

226

227

/**

228

* Add multiple UpdateRequest to the indexer to prepare for sending requests to Elasticsearch.

229

* @param updateRequests The multiple UpdateRequest to add.

230

*/

231

void add(UpdateRequest... updateRequests);

232

233

/**

234

* Add multiple ActionRequest to the indexer to prepare for sending requests to Elasticsearch.

235

* @param actionRequests The multiple ActionRequest to add.

236

* @deprecated use the DeleteRequest, IndexRequest or UpdateRequest methods

237

*/

238

@Deprecated

239

default void add(ActionRequest... actionRequests) {

240

for (ActionRequest actionRequest : actionRequests) {

241

if (actionRequest instanceof IndexRequest) {

242

add((IndexRequest) actionRequest);

243

} else if (actionRequest instanceof DeleteRequest) {

244

add((DeleteRequest) actionRequest);

245

} else if (actionRequest instanceof UpdateRequest) {

246

add((UpdateRequest) actionRequest);

247

} else {

248

throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");

249

}

250

}

251

}

252

}

253

```

254

255

### Action Request Failure Handler

256

257

Interface defining how failed ActionRequests should be handled.

258

259

```java { .api }

260

/**

261

* An implementation of ActionRequestFailureHandler is provided by the user to define how failed

262

* ActionRequests should be handled, e.g. dropping them, reprocessing malformed documents, or

263

* simply requesting them to be sent to Elasticsearch again if the failure is only temporary.

264

*/

265

@PublicEvolving

266

public interface ActionRequestFailureHandler extends Serializable {

267

268

/**

269

* Handle a failed ActionRequest.

270

* @param action the ActionRequest that failed due to the failure

271

* @param failure the cause of failure

272

* @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)

273

* @param indexer request indexer to re-add the failed action, if intended to do so

274

* @throws Throwable if the sink should fail on this failure, the implementation should rethrow

275

* the exception or a custom one

276

*/

277

void onFailure(

278

ActionRequest action,

279

Throwable failure,

280

int restStatusCode,

281

RequestIndexer indexer

282

) throws Throwable;

283

}

284

```

285

286

## Failure Handler Implementations

287

288

### NoOpFailureHandler

289

290

Default failure handler that simply rethrows failures, causing the sink to fail.

291

292

```java { .api }

293

/**

294

* An ActionRequestFailureHandler that simply fails the sink on any failures.

295

*/

296

@Internal

297

public class NoOpFailureHandler implements ActionRequestFailureHandler {

298

299

@Override

300

public void onFailure(

301

ActionRequest action,

302

Throwable failure,

303

int restStatusCode,

304

RequestIndexer indexer

305

) throws Throwable {

306

// simply fail the sink

307

throw failure;

308

}

309

}

310

```

311

312

### IgnoringFailureHandler

313

314

Failure handler that ignores all failures and drops affected requests.

315

316

```java { .api }

317

/**

318

* Ignores all kinds of failures and drops the affected ActionRequest.

319

*/

320

@Internal

321

public class IgnoringFailureHandler implements ActionRequestFailureHandler {

322

323

@Override

324

public void onFailure(

325

ActionRequest action,

326

Throwable failure,

327

int restStatusCode,

328

RequestIndexer indexer

329

) {

330

// ignore failure

331

}

332

}

333

```

334

335

### RetryRejectedExecutionFailureHandler

336

337

Failure handler that retries requests failed due to temporary EsRejectedExecutionException.

338

339

```java { .api }

340

/**

341

* An ActionRequestFailureHandler that re-adds requests that failed due to temporary

342

* EsRejectedExecutionExceptions (which means that Elasticsearch node queues are currently full),

343

* and fails for all other failures.

344

*/

345

@PublicEvolving

346

public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {

347

348

@Override

349

public void onFailure(

350

ActionRequest action,

351

Throwable failure,

352

int restStatusCode,

353

RequestIndexer indexer

354

) throws Throwable {

355

if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {

356

indexer.add(action);

357

} else {

358

// rethrow all other failures

359

throw failure;

360

}

361

}

362

}

363

```

364

365

## Configuration Constants

366

367

### Configuration Keys

368

369

The following constants define configuration keys used by the connector:

370

371

```java { .api }

372

// From ElasticsearchSinkBase class

373

public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";

374

public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";

375

public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";

376

public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";

377

public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";

378

public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";

379

public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";

380

```

381

382

### Configuration Options

383

384

#### Bulk Processing Configuration

385

386

- `bulk.flush.max.actions`: Maximum amount of elements to buffer (no default - uses Elasticsearch BulkProcessor defaults)

387

- `bulk.flush.max.size.mb`: Maximum amount of data (in megabytes) to buffer (no default - uses Elasticsearch BulkProcessor defaults)

388

- `bulk.flush.interval.ms`: Interval at which to flush data regardless of other settings (no default - uses Elasticsearch BulkProcessor defaults)

389

390

#### Backoff Configuration

391

392

- `bulk.flush.backoff.enable`: Enable backoff retries (default: true)

393

- `bulk.flush.backoff.type`: Backoff type - "CONSTANT" or "EXPONENTIAL" (default: EXPONENTIAL)

394

- `bulk.flush.backoff.retries`: Maximum retry count (default: 8)

395

- `bulk.flush.backoff.delay`: Delay in milliseconds (default: 50)

396

397

**Important Note**: Elasticsearch 1.x does not support backoff retries. The BulkProcessor in Elasticsearch 1.x does not provide backoff retry functionality, so these configuration options are parsed but have no effect and will log a warning when specified.

398

399

#### Cluster Configuration

400

401

- `cluster.name`: Name of the Elasticsearch cluster to connect to

402

- `http.enabled`: HTTP access setting (automatically set to false for embedded nodes)

403

404

## Deprecated Interfaces

405

406

### IndexRequestBuilder

407

408

Creates IndexRequest objects from stream elements. This interface is deprecated in favor of ElasticsearchSinkFunction.

409

410

```java { .api }

411

/**

412

* Function that creates an IndexRequest from an element in a Stream.

413

* This is used by ElasticsearchSink to prepare elements for sending them to Elasticsearch.

414

* @param <T> The type of the element handled by this IndexRequestBuilder

415

* @deprecated Deprecated since version 1.2, to be removed at version 2.0.

416

* Please create a ElasticsearchSink using a ElasticsearchSinkFunction instead.

417

*/

418

@Deprecated

419

public interface IndexRequestBuilder<T> extends Function, Serializable {

420

421

/**

422

* Creates an IndexRequest from an element.

423

* @param element The element that needs to be turned in to an IndexRequest

424

* @param ctx The Flink RuntimeContext of the ElasticsearchSink

425

* @return The constructed IndexRequest

426

*/

427

IndexRequest createIndexRequest(T element, RuntimeContext ctx);

428

}

429

```

430

431

## Enums and Types

432

433

### FlushBackoffType

434

435

Backoff strategy for bulk flush retries.

436

437

```java { .api }

438

/**

439

* Backoff strategy types for bulk flush retries.

440

*/

441

public enum FlushBackoffType {

442

/** Constant delay between retries */

443

CONSTANT,

444

/** Exponentially increasing delay between retries */

445

EXPONENTIAL

446

}

447

```

448

449

### BulkFlushBackoffPolicy

450

451

Configuration for bulk flush backoff behavior.

452

453

```java { .api }

454

/**

455

* Configuration for bulk flush backoff behavior.

456

*/

457

public static class BulkFlushBackoffPolicy implements Serializable {

458

/** The backoff type (CONSTANT or EXPONENTIAL) */

459

private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;

460

461

/** Maximum number of retry attempts */

462

private int maxRetryCount = 8;

463

464

/** Delay in milliseconds between retries */

465

private long delayMillis = 50L;

466

467

// Constructor and getter/setter methods...

468

}

469

```

470

471

### Exception Types

472

473

```java { .api }

474

// Exception thrown when Elasticsearch queues are full

475

import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

476

477

// Flink utility for exception handling

478

import org.apache.flink.util.ExceptionUtils;

479

```

480

481

### Core Types

482

483

```java { .api }

484

// Elasticsearch client types from elasticsearch 1.x dependency (version 1.7.1)

485

import org.elasticsearch.action.ActionRequest;

486

import org.elasticsearch.action.delete.DeleteRequest;

487

import org.elasticsearch.action.index.IndexRequest;

488

import org.elasticsearch.action.update.UpdateRequest;

489

import org.elasticsearch.action.bulk.BulkProcessor;

490

import org.elasticsearch.action.bulk.BulkRequest;

491

import org.elasticsearch.action.bulk.BulkResponse;

492

import org.elasticsearch.action.bulk.BulkItemResponse;

493

import org.elasticsearch.client.Client;

494

import org.elasticsearch.client.Requests;

495

import org.elasticsearch.client.transport.TransportClient;

496

import org.elasticsearch.common.transport.TransportAddress;

497

import org.elasticsearch.common.transport.InetSocketTransportAddress;

498

import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

499

import org.elasticsearch.node.Node;

500

import org.elasticsearch.rest.RestStatus;

501

502

// Flink runtime types

503

import org.apache.flink.api.common.functions.RuntimeContext;

504

import org.apache.flink.api.common.functions.Function;

505

import org.apache.flink.annotation.PublicEvolving;

506

import org.apache.flink.annotation.Internal;

507

import org.apache.flink.util.ExceptionUtils;

508

509

// Java standard types

510

import java.io.Serializable;

511

import java.util.List;

512

import java.util.Map;

513

import java.util.HashMap;

514

import java.util.Arrays;

515

```

516

517

## Connection Modes

518

519

### Embedded Node Mode

520

521

Uses a local Elasticsearch Node for communication. The sink will block and wait for a cluster to come online.

522

523

```java

524

// Configuration for embedded node

525

Map<String, String> config = new HashMap<>();

526

config.put("cluster.name", "my-cluster");

527

528

ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction);

529

```

530

531

### TransportClient Mode

532

533

Uses a TransportClient with specified addresses. The sink will fail if no cluster connection can be established.

534

535

```java

536

import org.elasticsearch.common.transport.InetSocketTransportAddress;

537

import org.elasticsearch.common.transport.TransportAddress;

538

539

// Configuration for transport client

540

Map<String, String> config = new HashMap<>();

541

config.put("cluster.name", "my-cluster");

542

543

List<TransportAddress> transportAddresses = Arrays.asList(

544

new InetSocketTransportAddress("localhost", 9300),

545

new InetSocketTransportAddress("localhost", 9301)

546

);

547

548

ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, transportAddresses, sinkFunction);

549

```

550

551

## Error Handling

552

553

The connector provides flexible error handling through the ActionRequestFailureHandler interface:

554

555

```java

556

// Custom failure handler example

557

ActionRequestFailureHandler customHandler = new ActionRequestFailureHandler() {

558

@Override

559

public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {

560

if (restStatusCode == 429) {

561

// Retry on rate limiting

562

indexer.add(action);

563

} else if (restStatusCode >= 400 && restStatusCode < 500) {

564

// Drop malformed requests (client errors)

565

System.err.println("Dropping malformed request: " + failure.getMessage());

566

} else {

567

// Fail on other errors

568

throw failure;

569

}

570

}

571

};

572

573

ElasticsearchSink<String> sink = new ElasticsearchSink<>(config, sinkFunction, customHandler);

574

```

575

576

## Usage Examples

577

578

### Basic String Indexing

579

580

```java

581

ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {

582

@Override

583

public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {

584

Map<String, Object> json = new HashMap<>();

585

json.put("message", element);

586

json.put("timestamp", System.currentTimeMillis());

587

588

IndexRequest request = Requests.indexRequest()

589

.index("logs")

590

.type("message")

591

.source(json);

592

593

indexer.add(request);

594

}

595

};

596

```

597

598

### Complex Object Processing

599

600

```java

601

public class User {

602

public String name;

603

public int age;

604

public String email;

605

// constructors, getters, setters...

606

}

607

608

ElasticsearchSinkFunction<User> userSinkFunction = new ElasticsearchSinkFunction<User>() {

609

@Override

610

public void process(User user, RuntimeContext ctx, RequestIndexer indexer) {

611

Map<String, Object> json = new HashMap<>();

612

json.put("name", user.name);

613

json.put("age", user.age);

614

json.put("email", user.email);

615

json.put("indexed_at", System.currentTimeMillis());

616

617

// Index request

618

IndexRequest indexRequest = Requests.indexRequest()

619

.index("users")

620

.type("user")

621

.id(user.email) // Use email as document ID

622

.source(json);

623

624

indexer.add(indexRequest);

625

626

// Optional: Also create update request for upsert behavior

627

UpdateRequest updateRequest = Requests.updateRequest()

628

.index("users")

629

.type("user")

630

.id(user.email)

631

.doc(json)

632

.upsert(json);

633

634

indexer.add(updateRequest);

635

}

636

};

637

```

638

639

### Using Configuration Constants

640

641

```java

642

// Recommended approach using configuration constants

643

Map<String, String> config = new HashMap<>();

644

config.put("cluster.name", "production-cluster");

645

config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");

646

config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "10");

647

config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "30000");

648

649

// Note: Backoff configurations will be ignored in Elasticsearch 1.x but can be set

650

config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, "true");

651

config.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");

652

653

ElasticsearchSink<MyEvent> sink = new ElasticsearchSink<>(

654

config,

655

mySinkFunction,

656

new RetryRejectedExecutionFailureHandler()

657

);

658

```

659

660

### Multiple Index Operations

661

662

```java

663

ElasticsearchSinkFunction<Event> eventSinkFunction = new ElasticsearchSinkFunction<Event>() {

664

@Override

665

public void process(Event event, RuntimeContext ctx, RequestIndexer indexer) {

666

// Index to main events index

667

Map<String, Object> eventJson = new HashMap<>();

668

eventJson.put("type", event.getType());

669

eventJson.put("data", event.getData());

670

eventJson.put("timestamp", event.getTimestamp());

671

672

IndexRequest eventRequest = Requests.indexRequest()

673

.index("events")

674

.type("event")

675

.source(eventJson);

676

indexer.add(eventRequest);

677

678

// Also index to daily partitioned index

679

String dailyIndex = "events-" + event.getTimestamp().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));

680

IndexRequest dailyRequest = Requests.indexRequest()

681

.index(dailyIndex)

682

.type("event")

683

.source(eventJson);

684

indexer.add(dailyRequest);

685

}

686

};

687

```