or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-processing.md, client-configuration.md, datastream-api.md, failure-handling.md, index.md, table-api.md

bulk-processing.md (docs/)

0

# Bulk Processing

1

2

Configurable bulk request processing with batching, buffering, and timing controls. Supports backoff strategies and retry mechanisms for handling cluster load.

3

4

## Capabilities

5

6

### Bulk Configuration Methods

7

8

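Methods available on ElasticsearchSink.Builder for configuring bulk processing behavior. Note that, as the signatures below show, these setters return `void`: in real code, assign the builder to a variable, call each setter as a separate statement, and then call `build()`. The chained form used in the usage examples is shorthand and will not compile as written.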

9

10

```java { .api }

11

/**

12

* Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to disable it.

13

* @param numMaxActions the maximum number of actions to buffer per bulk request.

14

*/

15

public void setBulkFlushMaxActions(int numMaxActions);

16

17

/**

18

* Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to disable it.

19

* @param maxSizeMb the maximum size of buffered actions, in mb.

20

*/

21

public void setBulkFlushMaxSizeMb(int maxSizeMb);

22

23

/**

24

* Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.

25

* @param intervalMillis the bulk flush interval, in milliseconds.

26

*/

27

public void setBulkFlushInterval(long intervalMillis);

28

29

/**

30

* Sets whether or not to enable bulk flush backoff behaviour.

31

* @param enabled whether or not to enable backoffs.

32

*/

33

public void setBulkFlushBackoff(boolean enabled);

34

35

/**

36

* Sets the type of back off to use when flushing bulk requests.

37

* @param flushBackoffType the backoff type to use.

38

*/

39

public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType);

40

41

/**

42

* Sets the maximum number of retries for a backoff attempt when flushing bulk requests.

43

* @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests

44

*/

45

public void setBulkFlushBackoffRetries(int maxRetries);

46

47

/**

48

* Sets the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.

49

* @param delayMillis the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.

50

*/

51

public void setBulkFlushBackoffDelay(long delayMillis);

52

```

53

54

**Usage Examples:**

55

56

```java

57

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

58

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.FlushBackoffType;

59

60

// High-throughput configuration

61

ElasticsearchSink<MyData> highThroughputSink = new ElasticsearchSink.Builder<>(

62

httpHosts,

63

sinkFunction

64

)

65

.setBulkFlushMaxActions(5000) // Buffer up to 5000 actions

66

.setBulkFlushMaxSizeMb(10) // Or 10MB of data

67

.setBulkFlushInterval(30000) // Or flush every 30 seconds

68

.setBulkFlushBackoff(true) // Enable backoff on rejection

69

.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL)

70

.setBulkFlushBackoffRetries(5) // Up to 5 retries

71

.setBulkFlushBackoffDelay(200) // Starting with 200ms delay

72

.build();

73

74

// Low-latency configuration

75

ElasticsearchSink<MyData> lowLatencySink = new ElasticsearchSink.Builder<>(

76

httpHosts,

77

sinkFunction

78

)

79

.setBulkFlushMaxActions(100) // Smaller batches

80

.setBulkFlushMaxSizeMb(1) // 1MB max size

81

.setBulkFlushInterval(1000) // Flush every second

82

.setBulkFlushBackoff(true)

83

.setBulkFlushBackoffType(FlushBackoffType.CONSTANT)

84

.setBulkFlushBackoffRetries(3)

85

.setBulkFlushBackoffDelay(100) // Constant 100ms delay

86

.build();

87

88

// Memory-constrained configuration

89

ElasticsearchSink<MyData> memoryConstrainedSink = new ElasticsearchSink.Builder<>(

90

httpHosts,

91

sinkFunction

92

)

93

.setBulkFlushMaxActions(500) // Moderate batch size

94

.setBulkFlushMaxSizeMb(2) // Small memory footprint

95

.setBulkFlushInterval(5000) // 5 second intervals

96

.setBulkFlushBackoff(false) // Disable backoff to fail fast

97

.build();

98

99

// Disable all limits (flush only on checkpoint)

100

ElasticsearchSink<MyData> checkpointOnlySink = new ElasticsearchSink.Builder<>(

101

httpHosts,

102

sinkFunction

103

)

104

.setBulkFlushMaxActions(-1) // No action limit

105

.setBulkFlushMaxSizeMb(-1) // No size limit

106

.setBulkFlushInterval(-1) // No time limit

107

.build();

108

```

109

110

### Backoff Configuration

111

112

Advanced backoff configuration for handling Elasticsearch cluster load and rejection scenarios.

113

114

```java { .api }

115

/**

116

* Used to control whether the retry delay should increase exponentially or remain constant.

117

*/

118

@PublicEvolving

119

public enum FlushBackoffType {

120

CONSTANT, // Fixed delay between retries

121

EXPONENTIAL // Exponentially increasing delay

122

}

123

124

/**

125

* Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to

126

* resource constraints (i.e. the client's internal thread pool is full), the backoff policy

127

* decides how long the bulk processor will wait before the operation is retried internally.

128

*/

129

public static class BulkFlushBackoffPolicy implements Serializable {

130

/**

131

* Get the backoff type (CONSTANT or EXPONENTIAL).

132

* @return the backoff type

133

*/

134

public FlushBackoffType getBackoffType();

135

136

/**

137

* Get the maximum number of retry attempts.

138

* @return the maximum retry count

139

*/

140

public int getMaxRetryCount();

141

142

/**

143

* Get the initial delay in milliseconds.

144

* @return the delay in milliseconds

145

*/

146

public long getDelayMillis();

147

148

/**

149

* Set the backoff type.

150

* @param backoffType the backoff type to use

151

*/

152

public void setBackoffType(FlushBackoffType backoffType);

153

154

/**

155

* Set the maximum number of retry attempts.

156

* @param maxRetryCount the maximum retry count (must be >= 0)

157

*/

158

public void setMaxRetryCount(int maxRetryCount);

159

160

/**

161

* Set the initial delay between retry attempts.

162

* @param delayMillis the delay in milliseconds (must be >= 0)

163

*/

164

public void setDelayMillis(long delayMillis);

165

}

166

```

167

168

**Usage Examples:**

169

170

```java

171

// Exponential backoff configuration

172

ElasticsearchSink<Event> exponentialBackoffSink = new ElasticsearchSink.Builder<>(

173

httpHosts,

174

sinkFunction

175

)

176

.setBulkFlushBackoff(true)

177

.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL)

178

.setBulkFlushBackoffRetries(8) // Up to 8 retries

179

.setBulkFlushBackoffDelay(50) // Initial delay of 50ms; later delays grow exponentially (not a strict doubling)

180

.build();

181

182

// Constant backoff configuration

183

ElasticsearchSink<Event> constantBackoffSink = new ElasticsearchSink.Builder<>(

184

httpHosts,

185

sinkFunction

186

)

187

.setBulkFlushBackoff(true)

188

.setBulkFlushBackoffType(FlushBackoffType.CONSTANT)

189

.setBulkFlushBackoffRetries(5) // Up to 5 retries

190

.setBulkFlushBackoffDelay(1000) // Always wait 1 second between retries

191

.build();

192

193

// Aggressive backoff for high-load scenarios

194

ElasticsearchSink<Event> aggressiveBackoffSink = new ElasticsearchSink.Builder<>(

195

httpHosts,

196

sinkFunction

197

)

198

.setBulkFlushBackoff(true)

199

.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL)

200

.setBulkFlushBackoffRetries(10) // Many retries

201

.setBulkFlushBackoffDelay(25) // Start small: 25ms initial delay, growing exponentially with each retry

202

.build();

203

```

204

205

### Configuration Constants

206

207

Constants available for bulk processing configuration when using string-based configuration.

208

209

```java { .api }

210

// Bulk processor configuration keys

211

public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";

212

public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";

213

public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";

214

public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";

215

public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";

216

public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";

217

public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";

218

```

219

220

**Usage Examples:**

221

222

```java

223

import java.util.HashMap;

224

import java.util.Map;

225

226

// Configuration via properties map (useful for external configuration)

227

Map<String, String> bulkConfig = new HashMap<>();

228

bulkConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");

229

bulkConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "5");

230

bulkConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "10000");

231

bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, "true");

232

bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");

233

bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, "3");

234

bulkConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, "100");

235

236

// These would be used internally by the ElasticsearchSinkBase

237

// when creating the BulkProcessor configuration

238

```

239

240

### Best Practices

241

242

#### Performance Tuning

243

244

```java

245

// For high-throughput scenarios

246

.setBulkFlushMaxActions(5000) // Large batches reduce overhead

247

.setBulkFlushMaxSizeMb(10) // Allow larger memory usage

248

.setBulkFlushInterval(30000) // Less frequent flushes

249

.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL)

250

.setBulkFlushBackoffRetries(8) // More retries for resilience

251

252

// For low-latency scenarios

253

.setBulkFlushMaxActions(100) // Small batches for quick processing

254

.setBulkFlushMaxSizeMb(1) // Low memory usage

255

.setBulkFlushInterval(1000) // Frequent flushes

256

.setBulkFlushBackoffType(FlushBackoffType.CONSTANT)

257

.setBulkFlushBackoffRetries(3) // Fewer retries for faster failure

258

```

259

260

#### Memory Management

261

262

```java

263

// Memory-constrained environments

264

.setBulkFlushMaxActions(500) // Moderate batch sizes

265

.setBulkFlushMaxSizeMb(2) // Strict size limits

266

.setBulkFlushInterval(5000) // Regular flushes to clear buffers

267

.setBulkFlushBackoff(false) // Fail fast to avoid memory buildup

268

```

269

270

#### Cluster Load Management

271

272

```java

273

// For busy Elasticsearch clusters

274

.setBulkFlushBackoff(true) // Essential for handling rejections

275

.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL)

276

.setBulkFlushBackoffRetries(10) // Generous retry count

277

.setBulkFlushBackoffDelay(100) // Start with reasonable delay

278

.setBulkFlushMaxActions(1000) // Moderate batch sizes

279

.setBulkFlushInterval(15000) // Give cluster time to process

280

```