Core streaming sink functionality for programmatic Flink jobs. Provides ElasticsearchSink with builder pattern configuration for bulk processing, failure handling, and client customization.
Main sink class for sending streaming data to Elasticsearch 6.x clusters. Uses builder pattern for configuration and extends ElasticsearchSinkBase with RestHighLevelClient integration.
/**
 * Elasticsearch 6.x sink that requests multiple ActionRequests against a cluster
 * for each incoming element.
 *
 * <p>The constructor is private; instances are obtained through the builder, e.g.
 * {@code new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction).build()}.
 *
 * @param <T> Type of the elements handled by this sink
 */
@PublicEvolving
public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
    // Private constructor - use Builder pattern
}

Builder pattern for creating ElasticsearchSink instances with full configuration control.
/**
 * A builder for creating an ElasticsearchSink.
 *
 * <p>Every setter below returns {@code void}, so calls cannot be chained: keep a reference
 * to the builder, invoke the setters on it, and finish with {@code build()}.
 *
 * @param <T> Type of the elements handled by the sink this builder creates.
 */
@PublicEvolving
public static class ElasticsearchSink.Builder<T> {
    /**
     * Creates a new ElasticsearchSink that connects to the cluster using a RestHighLevelClient.
     *
     * @param httpHosts The list of HttpHost to which the RestHighLevelClient connects.
     * @param elasticsearchSinkFunction This is used to generate multiple ActionRequest from the incoming element.
     */
    public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction);

    /**
     * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to disable it.
     *
     * @param numMaxActions the maximum number of actions to buffer per bulk request.
     */
    public void setBulkFlushMaxActions(int numMaxActions);

    /**
     * Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to disable it.
     *
     * @param maxSizeMb the maximum size of buffered actions, in mb.
     */
    public void setBulkFlushMaxSizeMb(int maxSizeMb);

    /**
     * Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
     *
     * @param intervalMillis the bulk flush interval, in milliseconds.
     */
    public void setBulkFlushInterval(long intervalMillis);

    /**
     * Sets whether or not to enable bulk flush backoff behaviour.
     *
     * @param enabled whether or not to enable backoffs.
     */
    public void setBulkFlushBackoff(boolean enabled);

    /**
     * Sets the type of back off to use when flushing bulk requests.
     *
     * @param flushBackoffType the backoff type to use.
     */
    public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType);

    /**
     * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
     *
     * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests
     */
    public void setBulkFlushBackoffRetries(int maxRetries);

    /**
     * Sets the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
     *
     * @param delayMillis the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
     */
    public void setBulkFlushBackoffDelay(long delayMillis);

    /**
     * Sets a failure handler for action requests.
     *
     * @param failureHandler This is used to handle failed ActionRequest.
     */
    public void setFailureHandler(ActionRequestFailureHandler failureHandler);

    /**
     * Sets a REST client factory for custom client configuration.
     *
     * @param restClientFactory the factory that configures the rest client.
     */
    public void setRestClientFactory(RestClientFactory restClientFactory);

    /**
     * Creates the Elasticsearch sink.
     *
     * @return the created Elasticsearch sink.
     */
    public ElasticsearchSink<T> build();
}

Usage Examples:
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.FlushBackoffType;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
// Basic sink creation: list the Elasticsearch nodes the REST client should connect to.
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("127.0.0.1", 9201, "http"));

// Maps every MyData element to one index request against "my-index".
ElasticsearchSinkFunction<MyData> sinkFunction = new ElasticsearchSinkFunction<MyData>() {
    @Override
    public void process(MyData element, RuntimeContext ctx, RequestIndexer indexer) {
        // Document body: one entry per field that should be indexed.
        Map<String, Object> document = new HashMap<>();
        document.put("id", element.getId());
        document.put("name", element.getName());
        document.put("timestamp", element.getTimestamp());

        // The element id doubles as the document id.
        indexer.add(
                Requests.indexRequest()
                        .index("my-index")
                        .type("_doc")
                        .id(String.valueOf(element.getId()))
                        .source(document));
    }
};

// Hosts and sink function are the only mandatory builder arguments.
ElasticsearchSink.Builder<MyData> basicBuilder =
        new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
ElasticsearchSink<MyData> sink = basicBuilder.build();
// Advanced configuration
ElasticsearchSink<MyData> advancedSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setBulkFlushMaxActions(1000) // Flush after 1000 actions
.setBulkFlushMaxSizeMb(5) // Or after 5MB
.setBulkFlushInterval(30000) // Or after 30 seconds
.setBulkFlushBackoff(true) // Enable backoff
.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL)
.setBulkFlushBackoffRetries(3)
.setBulkFlushBackoffDelay(1000)
.setFailureHandler(new RetryRejectedExecutionFailureHandler())
.build();
// Add to DataStream
DataStream<MyData> dataStream = // ... your data stream
dataStream.addSink(advancedSink);User-defined function interface for converting stream elements into Elasticsearch ActionRequests.
/**
 * Creates multiple ActionRequests from an element in a stream.
 * This is used by sinks to prepare elements for sending them to Elasticsearch.
 *
 * <p>The interface extends {@code Serializable}, so implementations must be serializable.
 * Only {@code process} has to be implemented; {@code open} and {@code close} default to no-ops.
 *
 * @param <T> The type of the element handled by this ElasticsearchSinkFunction
 */
@PublicEvolving
public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
    /**
     * Initialization method for the function. It is called once before the actual working process methods.
     * The default implementation does nothing.
     *
     * @throws Exception if initialization fails
     */
    default void open() throws Exception {}

    /**
     * Tear-down method for the function. It is called when the sink closes.
     * The default implementation does nothing.
     *
     * @throws Exception if tear-down fails
     */
    default void close() throws Exception {}

    /**
     * Process the incoming element to produce multiple ActionRequests.
     * The produced requests should be added to the provided RequestIndexer.
     *
     * @param element incoming element to process
     * @param ctx runtime context containing information about the sink instance
     * @param indexer request indexer that ActionRequest should be added to
     */
    void process(T element, RuntimeContext ctx, RequestIndexer indexer);
}

Usage Examples:
// Simple single-request function: each incoming string becomes one document in "logs".
ElasticsearchSinkFunction<String> simpleSinkFunction = new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // Wrap the raw string together with the wall-clock time at which it was processed.
        Map<String, Object> document = new HashMap<>();
        document.put("data", element);
        document.put("timestamp", System.currentTimeMillis());

        // No explicit id is set on the request.
        indexer.add(
                Requests.indexRequest()
                        .index("logs")
                        .type("_doc")
                        .source(document));
    }
};
// Multiple requests per element
ElasticsearchSinkFunction<Transaction> multiRequestFunction = new ElasticsearchSinkFunction<Transaction>() {
    @Override
    public void process(Transaction transaction, RuntimeContext ctx, RequestIndexer indexer) {
        // Index the transaction
        Map<String, Object> transactionDoc = new HashMap<>();
        transactionDoc.put("id", transaction.getId());
        transactionDoc.put("amount", transaction.getAmount());
        transactionDoc.put("currency", transaction.getCurrency());
        IndexRequest transactionRequest = Requests.indexRequest()
                .index("transactions")
                .type("_doc")
                .id(transaction.getId())
                .source(transactionDoc);

        // Update user balance
        Map<String, Object> balanceUpdate = new HashMap<>();
        balanceUpdate.put("balance", transaction.getNewBalance());
        balanceUpdate.put("last_updated", System.currentTimeMillis());
        // The Requests helper offers no update-request factory, so the UpdateRequest is
        // constructed directly from index, type and document id.
        UpdateRequest balanceRequest =
                new UpdateRequest("users", "_doc", transaction.getUserId())
                        .doc(balanceUpdate);

        // Add each request through its typed overload. Passing an IndexRequest and an
        // UpdateRequest in a single call would only match the deprecated
        // add(ActionRequest...) method.
        indexer.add(transactionRequest);
        indexer.add(balanceRequest);
    }
};
// With lifecycle methods
ElasticsearchSinkFunction<Event> lifecycleSinkFunction = new ElasticsearchSinkFunction<Event>() {
    // Assigned in open() rather than when the function object is constructed.
    private ObjectMapper objectMapper;

    @Override
    public void open() throws Exception {
        objectMapper = new ObjectMapper();
    }

    @Override
    public void process(Event event, RuntimeContext ctx, RequestIndexer indexer) {
        try {
            // Round-trip the event through its JSON text to obtain a map for the document body.
            Map<String, Object> document =
                    objectMapper.readValue(objectMapper.writeValueAsString(event), HashMap.class);
            indexer.add(
                    Requests.indexRequest()
                            .index("events")
                            .type("_doc")
                            .source(document));
        } catch (Exception cause) {
            // Rethrown unchecked with the original exception kept as the cause.
            throw new RuntimeException("Failed to serialize event", cause);
        }
    }

    @Override
    public void close() throws Exception {
        // Cleanup resources if needed
    }
};

Interface for adding ActionRequests to be sent to Elasticsearch as part of bulk operations.
/**
 * Users add multiple delete, index or update requests to a RequestIndexer to prepare them
 * for sending to an Elasticsearch cluster.
 */
@PublicEvolving
public interface RequestIndexer {
    /**
     * Add multiple DeleteRequest to the indexer to prepare for sending requests to Elasticsearch.
     *
     * @param deleteRequests The multiple DeleteRequest to add.
     */
    void add(DeleteRequest... deleteRequests);

    /**
     * Add multiple IndexRequest to the indexer to prepare for sending requests to Elasticsearch.
     *
     * @param indexRequests The multiple IndexRequest to add.
     */
    void add(IndexRequest... indexRequests);

    /**
     * Add multiple UpdateRequest to the indexer to prepare for sending requests to Elasticsearch.
     *
     * @param updateRequests The multiple UpdateRequest to add.
     */
    void add(UpdateRequest... updateRequests);

    /**
     * Add multiple ActionRequest to the indexer to prepare for sending requests to Elasticsearch.
     *
     * <p>Each request is forwarded, in order, to the typed overload matching its runtime type.
     *
     * @param actionRequests The multiple ActionRequest to add.
     * @throws IllegalArgumentException if a request is not an IndexRequest, DeleteRequest
     *     or UpdateRequest
     * @deprecated use the DeleteRequest, IndexRequest or UpdateRequest methods
     */
    @Deprecated
    default void add(ActionRequest... actionRequests) {
        // A default method must have a body: dispatch on the concrete request type.
        for (ActionRequest actionRequest : actionRequests) {
            if (actionRequest instanceof IndexRequest) {
                add((IndexRequest) actionRequest);
            } else if (actionRequest instanceof DeleteRequest) {
                add((DeleteRequest) actionRequest);
            } else if (actionRequest instanceof UpdateRequest) {
                add((UpdateRequest) actionRequest);
            } else {
                throw new IllegalArgumentException(
                        "RequestIndexer only supports Index, Delete and Update requests");
            }
        }
    }
}