# Bulk Processing

Configurable bulk request processing: actions are buffered and sent in batches, with a flush triggered by action count, buffered size, or a time interval. Constant or exponential backoff retries bulk requests that a loaded cluster rejects.

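## Capabilities

### Bulk Configuration Methods

Methods available on `ElasticsearchSink.Builder` for configuring bulk processing behavior. Each setter returns `void`, so call the setters as separate statements on the builder and then call `build()`.
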
```java { .api }
/**
 * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to disable it.
 * @param numMaxActions the maximum number of actions to buffer per bulk request.
 */
public void setBulkFlushMaxActions(int numMaxActions);

/**
 * Sets the maximum size of buffered actions, in MB, per bulk request. You can pass -1 to disable it.
 * @param maxSizeMb the maximum size of buffered actions, in MB.
 */
public void setBulkFlushMaxSizeMb(int maxSizeMb);

/**
 * Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
 * @param intervalMillis the bulk flush interval, in milliseconds.
 */
public void setBulkFlushInterval(long intervalMillis);

/**
 * Sets whether or not to enable bulk flush backoff behaviour.
 * @param enabled whether or not to enable backoffs.
 */
public void setBulkFlushBackoff(boolean enabled);

/**
 * Sets the type of backoff to use when flushing bulk requests.
 * @param flushBackoffType the backoff type to use.
 */
public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType);

/**
 * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
 * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests.
 */
public void setBulkFlushBackoffRetries(int maxRetries);

/**
 * Sets the backoff delay when flushing bulk requests, in milliseconds: the fixed delay between
 * retries for CONSTANT, or the starting delay for EXPONENTIAL.
 * @param delayMillis the backoff delay, in milliseconds.
 */
public void setBulkFlushBackoffDelay(long delayMillis);
```

**Usage Examples:**

```java
import java.util.List;

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.FlushBackoffType;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;

// Assumed to exist in the surrounding job code:
//   List<HttpHost> httpHosts;                        Elasticsearch REST endpoints
//   ElasticsearchSinkFunction<MyData> sinkFunction;  turns MyData records into index requests

// High-throughput configuration
ElasticsearchSink.Builder<MyData> highThroughput = new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
highThroughput.setBulkFlushMaxActions(5000);     // Flush after 5000 actions,
highThroughput.setBulkFlushMaxSizeMb(10);        // or 10 MB of buffered data,
highThroughput.setBulkFlushInterval(30000);      // or every 30 seconds, whichever comes first
highThroughput.setBulkFlushBackoff(true);        // Retry bulk requests the cluster rejects
highThroughput.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);
highThroughput.setBulkFlushBackoffRetries(5);    // Up to 5 retries
highThroughput.setBulkFlushBackoffDelay(200);    // First retry after 200 ms
ElasticsearchSink<MyData> highThroughputSink = highThroughput.build();

// Low-latency configuration
ElasticsearchSink.Builder<MyData> lowLatency = new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
lowLatency.setBulkFlushMaxActions(100);          // Smaller batches
lowLatency.setBulkFlushMaxSizeMb(1);             // 1 MB max size
lowLatency.setBulkFlushInterval(1000);           // Flush every second
lowLatency.setBulkFlushBackoff(true);
lowLatency.setBulkFlushBackoffType(FlushBackoffType.CONSTANT);
lowLatency.setBulkFlushBackoffRetries(3);
lowLatency.setBulkFlushBackoffDelay(100);        // Constant 100 ms delay
ElasticsearchSink<MyData> lowLatencySink = lowLatency.build();

// Memory-constrained configuration
ElasticsearchSink.Builder<MyData> memoryConstrained = new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
memoryConstrained.setBulkFlushMaxActions(500);   // Moderate batch size
memoryConstrained.setBulkFlushMaxSizeMb(2);      // Small memory footprint
memoryConstrained.setBulkFlushInterval(5000);    // 5 second intervals
memoryConstrained.setBulkFlushBackoff(false);    // Fail fast: rejections go straight to the failure handler
ElasticsearchSink<MyData> memoryConstrainedSink = memoryConstrained.build();

// Disable all limits: buffered actions are flushed only when a checkpoint is taken
// (if checkpointing is enabled) and when the sink closes
ElasticsearchSink.Builder<MyData> checkpointOnly = new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
checkpointOnly.setBulkFlushMaxActions(-1);       // No action limit
checkpointOnly.setBulkFlushMaxSizeMb(-1);        // No size limit
checkpointOnly.setBulkFlushInterval(-1);         // No time limit
ElasticsearchSink<MyData> checkpointOnlySink = checkpointOnly.build();
```

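In Elasticsearch's `BulkProcessor`, the count and size limits are checked each time an action is added, so whichever enabled limit is reached first triggers the flush, while the interval (when set) flushes on a timer independently. The self-contained sketch below models only the count and size rule so the interplay is easy to try out. `FlushTriggerModel` is a hypothetical toy, not Flink's or Elasticsearch's code: it ignores the timer, request overhead, and networking, and it assumes 1 MB = 1024 × 1024 bytes.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of count- and size-based flush triggers (not Flink or Elasticsearch code).
 * It has no timer, so the interval-based flush is not modeled.
 */
final class FlushTriggerModel {
    private final int maxActions; // -1 disables the count limit
    private final long maxBytes;  // -1 disables the size limit
    private int bufferedActions = 0;
    private long bufferedBytes = 0;
    private final List<Integer> flushedBatchSizes = new ArrayList<>();

    FlushTriggerModel(int maxActions, int maxSizeMb) {
        this.maxActions = maxActions;
        // Assumption: 1 MB = 1024 * 1024 bytes
        this.maxBytes = maxSizeMb == -1 ? -1 : maxSizeMb * 1024L * 1024L;
    }

    /** Buffers one action and flushes as soon as any enabled limit is reached. */
    void add(long actionBytes) {
        bufferedActions++;
        bufferedBytes += actionBytes;
        boolean countReached = maxActions != -1 && bufferedActions >= maxActions;
        boolean sizeReached = maxBytes != -1 && bufferedBytes >= maxBytes;
        if (countReached || sizeReached) {
            flush();
        }
    }

    /** Sends everything buffered (the real sink also flushes on checkpoint and on close). */
    void flush() {
        if (bufferedActions > 0) {
            flushedBatchSizes.add(bufferedActions);
            bufferedActions = 0;
            bufferedBytes = 0;
        }
    }

    List<Integer> flushedBatchSizes() {
        return flushedBatchSizes;
    }

    int bufferedActions() {
        return bufferedActions;
    }

    public static void main(String[] args) {
        // 500 actions / 2 MB with 1 KB documents: the count limit fires first
        FlushTriggerModel smallDocs = new FlushTriggerModel(500, 2);
        for (int i = 0; i < 1200; i++) smallDocs.add(1024);
        System.out.println(smallDocs.flushedBatchSizes() + ", buffered: " + smallDocs.bufferedActions());
        // prints "[500, 500], buffered: 200"

        // Same limits with 10 KB documents: 2 MB is reached after 205 documents
        FlushTriggerModel largeDocs = new FlushTriggerModel(500, 2);
        for (int i = 0; i < 1200; i++) largeDocs.add(10 * 1024);
        System.out.println(largeDocs.flushedBatchSizes() + ", buffered: " + largeDocs.bufferedActions());
        // prints "[205, 205, 205, 205, 205], buffered: 175"

        // All limits disabled: nothing is sent until an explicit flush
        FlushTriggerModel unlimited = new FlushTriggerModel(-1, -1);
        for (int i = 0; i < 1200; i++) unlimited.add(1024);
        unlimited.flush();
        System.out.println(unlimited.flushedBatchSizes()); // prints "[1200]"
    }
}
```

With small documents the action count decides the batch size; with larger documents the size limit takes over well before the count is reached, which is why both limits are usually set together.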
### Backoff Configuration

Advanced backoff configuration for bulk requests that Elasticsearch rejects because of resource constraints, typically when a loaded cluster answers with HTTP 429 (Too Many Requests).

```java { .api }
/**
 * Used to control whether the retry delay should increase exponentially or remain constant.
 */
@PublicEvolving
public enum FlushBackoffType {
    CONSTANT,    // Fixed delay between retries
    EXPONENTIAL  // Exponentially increasing delay
}

/**
 * Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to
 * resource constraints (i.e. the client's internal thread pool is full), the backoff policy
 * decides how long the bulk processor will wait before the operation is retried internally.
 */
public static class BulkFlushBackoffPolicy implements Serializable {
    /**
     * Get the backoff type (CONSTANT or EXPONENTIAL).
     * @return the backoff type
     */
    public FlushBackoffType getBackoffType();

    /**
     * Get the maximum number of retry attempts.
     * @return the maximum retry count
     */
    public int getMaxRetryCount();

    /**
     * Get the delay in milliseconds: the fixed delay for CONSTANT, the initial delay for EXPONENTIAL.
     * @return the delay in milliseconds
     */
    public long getDelayMillis();

    /**
     * Set the backoff type.
     * @param backoffType the backoff type to use
     */
    public void setBackoffType(FlushBackoffType backoffType);

    /**
     * Set the maximum number of retry attempts.
     * @param maxRetryCount the maximum retry count (must be >= 0)
     */
    public void setMaxRetryCount(int maxRetryCount);

    /**
     * Set the delay between retry attempts (the initial delay for EXPONENTIAL).
     * @param delayMillis the delay in milliseconds (must be >= 0)
     */
    public void setDelayMillis(long delayMillis);
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.FlushBackoffType;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

// httpHosts and sinkFunction are assumed to be defined as in the previous examples.
// With EXPONENTIAL, Elasticsearch's BackoffPolicy does not double the delay:
// retry k (k = 0, 1, 2, ...) waits delay + 10 * ((int) e^(0.8k) - 1) ms.

// Exponential backoff configuration
ElasticsearchSink.Builder<Event> exponential = new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
exponential.setBulkFlushBackoff(true);
exponential.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);
exponential.setBulkFlushBackoffRetries(8);      // Up to 8 retries
exponential.setBulkFlushBackoffDelay(50);       // Waits 50, 60, 80, 150, 280, 580, 1250, 2740 ms (5190 ms total)
ElasticsearchSink<Event> exponentialBackoffSink = exponential.build();

// Constant backoff configuration
ElasticsearchSink.Builder<Event> constant = new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
constant.setBulkFlushBackoff(true);
constant.setBulkFlushBackoffType(FlushBackoffType.CONSTANT);
constant.setBulkFlushBackoffRetries(5);         // Up to 5 retries
constant.setBulkFlushBackoffDelay(1000);        // Always wait 1 second between retries
ElasticsearchSink<Event> constantBackoffSink = constant.build();

// Aggressive backoff for high-load scenarios
ElasticsearchSink.Builder<Event> aggressive = new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
aggressive.setBulkFlushBackoff(true);
aggressive.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);
aggressive.setBulkFlushBackoffRetries(10);      // Many retries
aggressive.setBulkFlushBackoffDelay(25);        // Waits 25, 35, 55, 125, 255, 555, 1225, ... ms (24420 ms for all 10)
ElasticsearchSink<Event> aggressiveBackoffSink = aggressive.build();
```

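As far as we can tell from the connector source, Flink's Elasticsearch 6 and 7 connectors translate these settings into Elasticsearch's `BackoffPolicy.constantBackoff(delay, retries)` and `BackoffPolicy.exponentialBackoff(delay, retries)`. In our reading of the Elasticsearch source, the exponential policy does not double the delay; retry k (starting at k = 0) waits `delay + 10 * ((int) Math.exp(0.8 * k) - 1)` milliseconds. The sketch below recomputes both schedules in plain Java. `BackoffSchedule` is a hypothetical helper, so check its numbers against the Elasticsearch client version you deploy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Illustrative re-implementation of the retry delays produced by Elasticsearch's
 * BackoffPolicy.constantBackoff and BackoffPolicy.exponentialBackoff, based on our
 * reading of the Elasticsearch source. Not part of Flink or Elasticsearch.
 */
final class BackoffSchedule {

    /** CONSTANT: every retry waits the configured delay. */
    static List<Long> constant(long delayMillis, int maxRetries) {
        return new ArrayList<>(Collections.nCopies(maxRetries, delayMillis));
    }

    /** EXPONENTIAL: retry k (k = 0, 1, ...) waits delay + 10 * ((int) e^(0.8 * k) - 1) ms. */
    static List<Long> exponential(long delayMillis, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        for (int k = 0; k < maxRetries; k++) {
            delays.add(delayMillis + 10L * ((int) Math.exp(0.8d * k) - 1));
        }
        return delays;
    }

    /** Total time spent waiting if every retry is used. */
    static long total(List<Long> delays) {
        long sum = 0;
        for (long d : delays) {
            sum += d;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Long> exp50 = exponential(50, 8);
        System.out.println("EXPONENTIAL, 50 ms, 8 retries: " + exp50 + " = " + total(exp50) + " ms");
        // [50, 60, 80, 150, 280, 580, 1250, 2740] = 5190 ms

        List<Long> exp25 = exponential(25, 10);
        System.out.println("EXPONENTIAL, 25 ms, 10 retries: " + exp25 + " = " + total(exp25) + " ms");
        // [25, 35, 55, 125, 255, 555, 1225, 2715, 6025, 13405] = 24420 ms

        List<Long> const1000 = constant(1000, 5);
        System.out.println("CONSTANT, 1000 ms, 5 retries: " + const1000 + " = " + total(const1000) + " ms");
        // [1000, 1000, 1000, 1000, 1000] = 5000 ms
    }
}
```

Under this formula the configured delay only shifts every wait by the same amount, while the growth term is identical for any starting value. The retry count therefore dominates the worst-case wait: ten exponential retries add roughly 24 seconds on top of ten times the configured delay.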
### Configuration Constants

String keys for bulk processing configuration, defined as constants on `ElasticsearchSinkBase`. The `ElasticsearchSink.Builder` setters store their values under these keys.

```java { .api }
// Bulk processor configuration keys
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
```

**Usage Examples:**

```java
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.*;

import java.util.HashMap;
import java.util.Map;

// Configuration via a properties map (useful for external configuration)
Map<String, String> bulkConfig = new HashMap<>();
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "10000");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, "true");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL"); // A FlushBackoffType constant name
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, "3");
bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, "100");

// The ElasticsearchSink.Builder setters populate a map like this one, and
// ElasticsearchSinkBase reads these keys when it creates the BulkProcessor.
```

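When these values come from external configuration, it can help to validate them before building the sink. The sketch below is a hypothetical helper (`BulkSettings` is not part of Flink) that parses such a map into typed fields. Its validation rules are assumptions loosely based on the javadoc above: `-1` disables the action, size, and interval limits, the action and size limits must otherwise be positive, and backoff retries and delay must be non-negative. A stand-in `BackoffType` enum replaces `FlushBackoffType` so the example compiles without Flink on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical typed view of the bulk.flush.* settings; not part of Flink.
 * Key names match the constants above; the validation rules are assumptions.
 */
final class BulkSettings {
    /** Stand-in for ElasticsearchSinkBase.FlushBackoffType, so the sketch compiles without Flink. */
    enum BackoffType { CONSTANT, EXPONENTIAL }

    // null means the key was absent, so the sink's own default would apply
    final Integer maxActions;
    final Integer maxSizeMb;
    final Long intervalMs;
    final Boolean backoffEnabled;
    final BackoffType backoffType;
    final Integer backoffRetries;
    final Long backoffDelayMs;

    BulkSettings(Map<String, String> config) {
        maxActions = toInt(config.get("bulk.flush.max.actions"), true, 1);
        maxSizeMb = toInt(config.get("bulk.flush.max.size.mb"), true, 1);
        intervalMs = toLong(config.get("bulk.flush.interval.ms"), true, 0);
        String enable = config.get("bulk.flush.backoff.enable");
        backoffEnabled = enable == null ? null : Boolean.valueOf(enable);
        String type = config.get("bulk.flush.backoff.type");
        // valueOf is case-sensitive and throws IllegalArgumentException for unknown names
        backoffType = type == null ? null : BackoffType.valueOf(type);
        backoffRetries = toInt(config.get("bulk.flush.backoff.retries"), false, 0);
        backoffDelayMs = toLong(config.get("bulk.flush.backoff.delay"), false, 0);
    }

    private static Integer toInt(String raw, boolean minusOneDisables, long min) {
        Long value = toLong(raw, minusOneDisables, min);
        return value == null ? null : Math.toIntExact(value);
    }

    /** Parses raw, accepting -1 (when it means "disabled") or any value >= min. */
    private static Long toLong(String raw, boolean minusOneDisables, long min) {
        if (raw == null) {
            return null;
        }
        long value = Long.parseLong(raw.trim());
        if (!(minusOneDisables && value == -1) && value < min) {
            throw new IllegalArgumentException("Invalid value " + value
                    + (minusOneDisables ? " (expected -1 or >= " : " (expected >= ") + min + ")");
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> bulkConfig = new HashMap<>();
        bulkConfig.put("bulk.flush.max.actions", "1000");
        bulkConfig.put("bulk.flush.max.size.mb", "-1");
        bulkConfig.put("bulk.flush.backoff.type", "EXPONENTIAL");
        BulkSettings settings = new BulkSettings(bulkConfig);
        System.out.println(settings.maxActions + " " + settings.maxSizeMb + " "
                + settings.intervalMs + " " + settings.backoffType);
        // prints "1000 -1 null EXPONENTIAL"
    }
}
```

Keeping "absent" (`null`) separate from "disabled" (`-1`) matters here: an absent key leaves the sink's default in place, while `-1` explicitly switches a limit off.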
### Best Practices

#### Performance Tuning

```java
// highThroughputBuilder and lowLatencyBuilder are ElasticsearchSink.Builder instances;
// the setters return void, so each call is a separate statement.

// For high-throughput scenarios
highThroughputBuilder.setBulkFlushMaxActions(5000);   // Large batches reduce per-request overhead
highThroughputBuilder.setBulkFlushMaxSizeMb(10);      // Allow larger memory usage
highThroughputBuilder.setBulkFlushInterval(30000);    // Less frequent flushes
highThroughputBuilder.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);
highThroughputBuilder.setBulkFlushBackoffRetries(8);  // More retries for resilience

// For low-latency scenarios
lowLatencyBuilder.setBulkFlushMaxActions(100);        // Small batches are sent sooner
lowLatencyBuilder.setBulkFlushMaxSizeMb(1);           // Low memory usage
lowLatencyBuilder.setBulkFlushInterval(1000);         // Frequent flushes
lowLatencyBuilder.setBulkFlushBackoffType(FlushBackoffType.CONSTANT);
lowLatencyBuilder.setBulkFlushBackoffRetries(3);      // Fewer retries for faster failure
```

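The trade-off between these two profiles is easiest to see as a rough estimate of how many bulk requests one sink subtask sends per second. The helper below is a hypothetical back-of-the-envelope model, not part of Flink: it assumes a steady input rate and a fixed document size, takes the smallest batch allowed by the enabled limits, and ignores retries, checkpoint flushes, and per-request overhead.

```java
/**
 * Hypothetical back-of-the-envelope model (not part of Flink): estimates batch size and
 * bulk requests per second for one sink subtask under a steady input rate and a fixed
 * document size. Ignores retries, checkpoint flushes and per-request overhead.
 */
final class FlushRateEstimate {

    /** Smallest batch allowed by the enabled limits (-1 disables a limit); infinite if none is set. */
    static double actionsPerRequest(double eventsPerSec, long docBytes,
                                    int maxActions, int maxSizeMb, long intervalMs) {
        double batch = Double.POSITIVE_INFINITY;
        if (maxActions != -1) {
            batch = Math.min(batch, maxActions);
        }
        if (maxSizeMb != -1) {
            // A flush happens once buffered bytes reach the limit (assuming 1 MB = 1024 * 1024 bytes)
            batch = Math.min(batch, Math.ceil(maxSizeMb * 1024.0 * 1024.0 / docBytes));
        }
        if (intervalMs != -1) {
            // Events that arrive within one flush interval
            batch = Math.min(batch, eventsPerSec * intervalMs / 1000.0);
        }
        return batch;
    }

    /** Estimated bulk requests per second; 0.0 when no limit is set (flushes only on checkpoint or close). */
    static double requestsPerSec(double eventsPerSec, long docBytes,
                                 int maxActions, int maxSizeMb, long intervalMs) {
        return eventsPerSec / actionsPerRequest(eventsPerSec, docBytes, maxActions, maxSizeMb, intervalMs);
    }

    public static void main(String[] args) {
        // 10,000 events/s with 1 KB documents
        System.out.println(requestsPerSec(10_000, 1024, 5000, 10, 30_000)); // high-throughput: 2.0
        System.out.println(requestsPerSec(10_000, 1024, 100, 1, 1_000));    // low-latency: 100.0
        // 4 KB documents: the 10 MB limit caps batches before the 5000-action limit
        System.out.println(actionsPerRequest(10_000, 4096, 5000, 10, 30_000)); // 2560.0
    }
}
```

Under these assumptions the high-throughput settings send about 2 bulk requests per second where the low-latency settings send about 100, which is the per-request overhead that larger batches save. With 4 KB documents, the 10 MB size limit rather than the action limit becomes the binding constraint.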
#### Memory Management

```java
// Memory-constrained environments (builder is an ElasticsearchSink.Builder)
builder.setBulkFlushMaxActions(500);   // Moderate batch sizes
builder.setBulkFlushMaxSizeMb(2);      // Strict size limit for each parallel sink instance
builder.setBulkFlushInterval(5000);    // Regular flushes to clear buffers
builder.setBulkFlushBackoff(false);    // Fail fast: rejected requests are not retried
```

#### Cluster Load Management

```java
// For busy Elasticsearch clusters (builder is an ElasticsearchSink.Builder)
builder.setBulkFlushBackoff(true);     // Retry requests the cluster rejects under load
builder.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL);
builder.setBulkFlushBackoffRetries(10); // Generous retry count
builder.setBulkFlushBackoffDelay(100); // First retry after 100 ms; all 10 retries wait about 25 s in total
builder.setBulkFlushMaxActions(1000);  // Moderate batch sizes
builder.setBulkFlushInterval(15000);   // Give the cluster time to process
```