Apache Flink connector for Elasticsearch 6.x that provides streaming sink functionality.
Configurable bulk request processing with batching, buffering, and timing controls. Supports backoff strategies and retry mechanisms for handling cluster load.
Methods available on ElasticsearchSink.Builder for configuring bulk processing behavior.
/**
* Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to disable it.
* @param numMaxActions the maximum number of actions to buffer per bulk request.
*/
public void setBulkFlushMaxActions(int numMaxActions);
/**
* Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to disable it.
* @param maxSizeMb the maximum size of buffered actions, in mb.
*/
public void setBulkFlushMaxSizeMb(int maxSizeMb);
/**
* Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
* @param intervalMillis the bulk flush interval, in milliseconds.
*/
public void setBulkFlushInterval(long intervalMillis);
/**
* Sets whether or not to enable bulk flush backoff behaviour.
* @param enabled whether or not to enable backoffs.
*/
public void setBulkFlushBackoff(boolean enabled);
/**
* Sets the type of back off to use when flushing bulk requests.
* @param flushBackoffType the backoff type to use.
*/
public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType);
/**
* Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
* @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests
*/
public void setBulkFlushBackoffRetries(int maxRetries);
/**
* Sets the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
* @param delayMillis the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
*/
public void setBulkFlushBackoffDelay(long delayMillis);

Usage Examples:
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.FlushBackoffType;
// High-throughput configuration
ElasticsearchSink<MyData> highThroughputSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setBulkFlushMaxActions(5000) // Buffer up to 5000 actions
.setBulkFlushMaxSizeMb(10) // Or 10MB of data
.setBulkFlushInterval(30000) // Or flush every 30 seconds
.setBulkFlushBackoff(true) // Enable backoff on rejection
.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL)
.setBulkFlushBackoffRetries(5) // Up to 5 retries
.setBulkFlushBackoffDelay(200) // Starting with 200ms delay
.build();
// Low-latency configuration
ElasticsearchSink<MyData> lowLatencySink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setBulkFlushMaxActions(100) // Smaller batches
.setBulkFlushMaxSizeMb(1) // 1MB max size
.setBulkFlushInterval(1000) // Flush every second
.setBulkFlushBackoff(true)
.setBulkFlushBackoffType(FlushBackoffType.CONSTANT)
.setBulkFlushBackoffRetries(3)
.setBulkFlushBackoffDelay(100) // Constant 100ms delay
.build();
// Memory-constrained configuration
ElasticsearchSink<MyData> memoryConstrainedSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setBulkFlushMaxActions(500) // Moderate batch size
.setBulkFlushMaxSizeMb(2) // Small memory footprint
.setBulkFlushInterval(5000) // 5 second intervals
.setBulkFlushBackoff(false) // Disable backoff to fail fast
.build();
// Disable all limits (flush only on checkpoint)
// The Builder setters are declared void, so they cannot be chained; each one is
// invoked as its own statement and build() is called last.
ElasticsearchSink.Builder<MyData> checkpointOnlyBuilder =
    new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
checkpointOnlyBuilder.setBulkFlushMaxActions(-1); // No action limit
checkpointOnlyBuilder.setBulkFlushMaxSizeMb(-1); // No size limit
checkpointOnlyBuilder.setBulkFlushInterval(-1); // No time limit
ElasticsearchSink<MyData> checkpointOnlySink = checkpointOnlyBuilder.build();

Advanced backoff configuration for handling Elasticsearch cluster load and rejection scenarios.
/**
* Used to control whether the retry delay should increase exponentially or remain constant.
* A value of this type is what {@code setBulkFlushBackoffType} on the sink builder and
* {@code setBackoffType} on {@code BulkFlushBackoffPolicy} accept.
*/
@PublicEvolving
public enum FlushBackoffType {
CONSTANT, // Fixed delay between retries
EXPONENTIAL // Exponentially increasing delay
}
/**
* Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to
* resource constraints (i.e. the client's internal thread pool is full), the backoff policy
* decides how long the bulk processor will wait before the operation is retried internally.
*/
public static class BulkFlushBackoffPolicy implements Serializable {
/**
* Get the backoff type (CONSTANT or EXPONENTIAL).
* @return the backoff type
*/
public FlushBackoffType getBackoffType();
/**
* Get the maximum number of retry attempts.
* @return the maximum retry count
*/
public int getMaxRetryCount();
/**
* Get the initial delay in milliseconds.
* @return the delay in milliseconds
*/
public long getDelayMillis();
/**
* Set the backoff type.
* @param backoffType the backoff type to use
*/
public void setBackoffType(FlushBackoffType backoffType);
/**
* Set the maximum number of retry attempts.
* @param maxRetryCount the maximum retry count (must be >= 0)
*/
public void setMaxRetryCount(int maxRetryCount);
/**
* Set the initial delay between retry attempts.
* @param delayMillis the delay in milliseconds (must be >= 0)
*/
public void setDelayMillis(long delayMillis);
}

Usage Examples:
// Exponential backoff configuration
ElasticsearchSink<Event> exponentialBackoffSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setBulkFlushBackoff(true)
.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL)
.setBulkFlushBackoffRetries(8) // Up to 8 retries
.setBulkFlushBackoffDelay(50) // Start with 50ms, then 100ms, 200ms, 400ms, etc.
.build();
// Constant backoff configuration
ElasticsearchSink<Event> constantBackoffSink = new ElasticsearchSink.Builder<>(
httpHosts,
sinkFunction
)
.setBulkFlushBackoff(true)
.setBulkFlushBackoffType(FlushBackoffType.CONSTANT)
.setBulkFlushBackoffRetries(5) // Up to 5 retries
.setBulkFlushBackoffDelay(1000) // Always wait 1 second between retries
.build();
// Aggressive backoff for high-load scenarios
// The Builder setters are declared void, so they cannot be chained; each one is
// invoked as its own statement and build() is called last.
ElasticsearchSink.Builder<Event> aggressiveBackoffBuilder =
    new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
aggressiveBackoffBuilder.setBulkFlushBackoff(true);
aggressiveBackoffBuilder.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);
aggressiveBackoffBuilder.setBulkFlushBackoffRetries(10); // Many retries
aggressiveBackoffBuilder.setBulkFlushBackoffDelay(25); // Start small: 25ms, 50ms, 100ms, 200ms, 400ms, 800ms, etc.
ElasticsearchSink<Event> aggressiveBackoffSink = aggressiveBackoffBuilder.build();

Constants available for bulk processing configuration when using string-based configuration.
// Bulk processor configuration keys
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";

Usage Examples:
import java.util.HashMap;
import java.util.Map;
// Configuration via properties map (useful for external configuration)
Map<String, String> bulkConfig = new HashMap<>();
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "10000");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, "true");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, "3");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, "100");
// These would be used internally by the ElasticsearchSinkBase
// when creating the BulkProcessor configuration

// In the snippets below, `builder` stands for an ElasticsearchSink.Builder instance.
// Its setters are declared void, so each call is a separate statement on the builder.

// For high-throughput scenarios
builder.setBulkFlushMaxActions(5000); // Large batches reduce overhead
builder.setBulkFlushMaxSizeMb(10); // Allow larger memory usage
builder.setBulkFlushInterval(30000); // Less frequent flushes
builder.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);
builder.setBulkFlushBackoffRetries(8); // More retries for resilience

// For low-latency scenarios
builder.setBulkFlushMaxActions(100); // Small batches for quick processing
builder.setBulkFlushMaxSizeMb(1); // Low memory usage
builder.setBulkFlushInterval(1000); // Frequent flushes
builder.setBulkFlushBackoffType(FlushBackoffType.CONSTANT);
builder.setBulkFlushBackoffRetries(3); // Fewer retries for faster failure

// Memory-constrained environments
builder.setBulkFlushMaxActions(500); // Moderate batch sizes
builder.setBulkFlushMaxSizeMb(2); // Strict size limits
builder.setBulkFlushInterval(5000); // Regular flushes to clear buffers
builder.setBulkFlushBackoff(false); // Fail fast to avoid memory buildup

// For busy Elasticsearch clusters
builder.setBulkFlushBackoff(true); // Essential for handling rejections
builder.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);
builder.setBulkFlushBackoffRetries(10); // Generous retry count
builder.setBulkFlushBackoffDelay(100); // Start with reasonable delay
builder.setBulkFlushMaxActions(1000); // Moderate batch sizes
builder.setBulkFlushInterval(15000); // Give cluster time to process

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-elasticsearch6-2-12