0
# Configuration and Connection Management
1
2
Comprehensive configuration system for customizing Cassandra connectivity, error handling, performance tuning, and DataStax mapper options. Provides flexible connection builders and centralized configuration management.
3
4
## Capabilities
5
6
### Connection Management
7
8
#### Cluster Builder
9
10
Abstract base class for configuring Cassandra cluster connections with custom settings.
11
12
```java { .api }
13
public abstract class ClusterBuilder implements Serializable {
14
public Cluster getCluster();
15
protected abstract Cluster buildCluster(Cluster.Builder builder);
16
}
17
```
18
19
**Basic Usage:**
20
21
```java
22
// Simple connection
23
ClusterBuilder simpleBuilder = new ClusterBuilder() {
24
@Override
25
protected Cluster buildCluster(Cluster.Builder builder) {
26
return builder.addContactPoint("127.0.0.1").build();
27
}
28
};
29
30
// Multiple contact points
31
ClusterBuilder multiNodeBuilder = new ClusterBuilder() {
32
@Override
33
protected Cluster buildCluster(Cluster.Builder builder) {
34
return builder
35
.addContactPoint("cassandra-1")
36
.addContactPoint("cassandra-2")
37
.addContactPoint("cassandra-3")
38
.withPort(9042)
39
.build();
40
}
41
};
42
```
43
44
**Advanced Configuration:**
45
46
```java
47
ClusterBuilder productionBuilder = new ClusterBuilder() {
48
@Override
49
protected Cluster buildCluster(Cluster.Builder builder) {
50
// Connection settings
51
SocketOptions socketOptions = new SocketOptions()
52
.setConnectTimeoutMillis(10000)
53
.setReadTimeoutMillis(10000)
54
.setKeepAlive(true)
55
.setReuseAddress(true)
56
.setTcpNoDelay(true);
57
58
// Pool settings
59
PoolingOptions poolingOptions = new PoolingOptions()
60
.setConnectionsPerHost(HostDistance.LOCAL, 8, 16)
61
.setConnectionsPerHost(HostDistance.REMOTE, 4, 8)
62
.setMaxRequestsPerConnection(HostDistance.LOCAL, 1024)
63
.setMaxRequestsPerConnection(HostDistance.REMOTE, 512);
64
65
// Retry and reconnection policies
66
RetryPolicy retryPolicy = new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE);
67
ReconnectionPolicy reconnectionPolicy = new ExponentialReconnectionPolicy(1000, 30000);
68
69
return builder
70
.addContactPoints("cassandra-1", "cassandra-2", "cassandra-3")
71
.withPort(9042)
72
.withCredentials("username", "password")
73
.withSocketOptions(socketOptions)
74
.withPoolingOptions(poolingOptions)
75
.withRetryPolicy(retryPolicy)
76
.withReconnectionPolicy(reconnectionPolicy)
77
.withQueryOptions(new QueryOptions()
78
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
79
.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
80
.build();
81
}
82
};
83
```
84
85
**SSL Configuration:**
86
87
```java
88
ClusterBuilder sslBuilder = new ClusterBuilder() {
89
@Override
90
protected Cluster buildCluster(Cluster.Builder builder) {
91
SSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
92
.withSSLContext(createSSLContext())
93
.build();
94
95
return builder
96
.addContactPoint("secure-cassandra.example.com")
97
.withPort(9142)
98
.withSSL(sslOptions)
99
.withCredentials("username", "password")
100
.build();
101
}
102
103
private SSLContext createSSLContext() {
104
// SSL context creation logic
105
// ...
106
}
107
};
108
```
109
110
### Error Handling
111
112
#### Failure Handler Interface
113
114
Interface for defining custom error handling strategies.
115
116
```java { .api }
117
public interface CassandraFailureHandler extends Serializable {
118
void onFailure(Throwable failure) throws IOException;
119
}
120
```
121
122
**Built-in Handler:**
123
124
```java
125
// Default no-op handler (re-throws all exceptions)
126
public class NoOpCassandraFailureHandler implements CassandraFailureHandler {
127
public void onFailure(Throwable failure) throws IOException;
128
}
129
```
130
131
**Custom Failure Handlers:**
132
133
```java
134
// Retry on timeout, fail on other errors
135
CassandraFailureHandler retryHandler = new CassandraFailureHandler() {
136
@Override
137
public void onFailure(Throwable failure) throws IOException {
138
if (failure instanceof WriteTimeoutException) {
139
logger.warn("Write timeout occurred, continuing processing", failure);
140
return; // Don't re-throw, continue processing
141
}
142
if (failure instanceof ReadTimeoutException) {
143
logger.warn("Read timeout occurred, continuing processing", failure);
144
return;
145
}
146
// Fail the sink for other types of errors
147
throw new IOException("Cassandra operation failed", failure);
148
}
149
};
150
151
// Log all errors but continue processing
152
CassandraFailureHandler logOnlyHandler = new CassandraFailureHandler() {
153
@Override
154
public void onFailure(Throwable failure) throws IOException {
155
logger.error("Cassandra operation failed, but continuing", failure);
156
// Don't re-throw, continue processing
157
}
158
};
159
160
// Fail fast on any error
161
CassandraFailureHandler failFastHandler = new CassandraFailureHandler() {
162
@Override
163
public void onFailure(Throwable failure) throws IOException {
164
throw new IOException("Failing fast on Cassandra error", failure);
165
}
166
};
167
168
// Conditional error handling
169
CassandraFailureHandler conditionalHandler = new CassandraFailureHandler() {
170
private final AtomicInteger errorCount = new AtomicInteger(0);
171
172
@Override
173
public void onFailure(Throwable failure) throws IOException {
174
int count = errorCount.incrementAndGet();
175
176
if (count > 100) {
177
throw new IOException("Too many errors (" + count + "), failing sink", failure);
178
}
179
180
if (failure instanceof OverloadedException) {
181
logger.warn("Cassandra overloaded, backing off", failure);
182
try {
183
Thread.sleep(1000); // Simple backoff
184
} catch (InterruptedException e) {
185
Thread.currentThread().interrupt();
186
throw new IOException("Interrupted during backoff", e);
187
}
188
return;
189
}
190
191
logger.warn("Cassandra error #{}, continuing", count, failure);
192
}
193
};
194
```
195
196
### Sink Configuration
197
198
#### Base Configuration
199
200
Centralized configuration object for sink behavior and performance tuning.
201
202
```java { .api }
203
public final class CassandraSinkBaseConfig implements Serializable {
204
public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;
205
public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE;
206
public static final boolean DEFAULT_IGNORE_NULL_FIELDS = false;
207
208
public int getMaxConcurrentRequests();
209
public Duration getMaxConcurrentRequestsTimeout();
210
public boolean getIgnoreNullFields();
211
212
public static Builder newBuilder();
213
}
214
```
215
216
#### Configuration Builder
217
218
Builder pattern for creating sink configuration objects.
219
220
```java { .api }
221
public static class Builder {
222
public Builder setMaxConcurrentRequests(int maxConcurrentRequests);
223
public Builder setMaxConcurrentRequestsTimeout(Duration timeout);
224
public Builder setIgnoreNullFields(boolean ignoreNullFields);
225
public CassandraSinkBaseConfig build();
226
}
227
```
228
229
**Configuration Examples:**
230
231
```java
232
// High-throughput configuration
233
CassandraSinkBaseConfig highThroughputConfig = CassandraSinkBaseConfig.newBuilder()
234
.setMaxConcurrentRequests(500)
235
.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(10))
236
.setIgnoreNullFields(true) // Avoid tombstones
237
.build();
238
239
// Conservative configuration for stability
240
CassandraSinkBaseConfig conservativeConfig = CassandraSinkBaseConfig.newBuilder()
241
.setMaxConcurrentRequests(50)
242
.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(30))
243
.setIgnoreNullFields(false)
244
.build();
245
246
// Default configuration
247
CassandraSinkBaseConfig defaultConfig = CassandraSinkBaseConfig.newBuilder().build();
248
```
249
250
### DataStax Mapper Configuration
251
252
#### Mapper Options Interface
253
254
Interface for configuring DataStax object mapper behavior for POJO operations.
255
256
```java { .api }
257
public interface MapperOptions extends Serializable {
258
Mapper.Option[] getMapperOptions();
259
}
260
```
261
262
**Common Mapper Options:**
263
264
```java
265
// TTL and timestamp options
266
MapperOptions ttlOptions = new MapperOptions() {
267
@Override
268
public Mapper.Option[] getMapperOptions() {
269
return new Mapper.Option[] {
270
Mapper.Option.ttl(3600), // 1 hour TTL
271
Mapper.Option.timestamp(System.currentTimeMillis())
272
};
273
}
274
};
275
276
// Consistency level options
277
MapperOptions consistencyOptions = new MapperOptions() {
278
@Override
279
public Mapper.Option[] getMapperOptions() {
280
return new Mapper.Option[] {
281
Mapper.Option.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM),
282
Mapper.Option.serialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)
283
};
284
}
285
};
286
287
// Null field handling
288
MapperOptions nullHandlingOptions = new MapperOptions() {
289
@Override
290
public Mapper.Option[] getMapperOptions() {
291
return new Mapper.Option[] {
292
Mapper.Option.saveNullFields(false), // Don't save null fields
293
Mapper.Option.ifNotExists(true) // Use IF NOT EXISTS
294
};
295
}
296
};
297
298
// Conditional writes
299
MapperOptions conditionalOptions = new MapperOptions() {
300
@Override
301
public Mapper.Option[] getMapperOptions() {
302
return new Mapper.Option[] {
303
Mapper.Option.ifNotExists(true),
304
// Note: ifExists() and custom IF conditions are also available
305
};
306
}
307
};
308
309
// Combined options
310
MapperOptions productionOptions = new MapperOptions() {
311
@Override
312
public Mapper.Option[] getMapperOptions() {
313
return new Mapper.Option[] {
314
Mapper.Option.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM),
315
Mapper.Option.ttl(86400), // 24 hours TTL
316
Mapper.Option.saveNullFields(false),
317
Mapper.Option.timestamp(System.currentTimeMillis())
318
};
319
}
320
};
321
```
322
323
## Configuration Patterns
324
325
### Environment-Specific Configuration
326
327
```java
328
public class CassandraConfigFactory {
329
330
public static ClusterBuilder createClusterBuilder(String environment) {
331
switch (environment.toLowerCase()) {
332
case "development":
333
return createDevelopmentBuilder();
334
case "staging":
335
return createStagingBuilder();
336
case "production":
337
return createProductionBuilder();
338
default:
339
throw new IllegalArgumentException("Unknown environment: " + environment);
340
}
341
}
342
343
private static ClusterBuilder createDevelopmentBuilder() {
344
return new ClusterBuilder() {
345
@Override
346
protected Cluster buildCluster(Cluster.Builder builder) {
347
return builder
348
.addContactPoint("localhost")
349
.withPort(9042)
350
.build();
351
}
352
};
353
}
354
355
private static ClusterBuilder createProductionBuilder() {
356
return new ClusterBuilder() {
357
@Override
358
protected Cluster buildCluster(Cluster.Builder builder) {
359
return builder
360
.addContactPoints(
361
System.getenv("CASSANDRA_HOST_1"),
362
System.getenv("CASSANDRA_HOST_2"),
363
System.getenv("CASSANDRA_HOST_3")
364
)
365
.withPort(Integer.parseInt(System.getenv("CASSANDRA_PORT")))
366
.withCredentials(
367
System.getenv("CASSANDRA_USERNAME"),
368
System.getenv("CASSANDRA_PASSWORD")
369
)
370
.withSocketOptions(new SocketOptions()
371
.setConnectTimeoutMillis(10000)
372
.setReadTimeoutMillis(10000))
373
.withQueryOptions(new QueryOptions()
374
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
375
.build();
376
}
377
};
378
}
379
}
380
```
381
382
### Performance Tuning Guidelines
383
384
```java
385
// For high-throughput workloads
386
CassandraSinkBaseConfig highThroughput = CassandraSinkBaseConfig.newBuilder()
387
.setMaxConcurrentRequests(1000) // High concurrency
388
.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(5)) // Fast timeout
389
.setIgnoreNullFields(true) // Reduce tombstones
390
.build();
391
392
// For low-latency workloads
393
CassandraSinkBaseConfig lowLatency = CassandraSinkBaseConfig.newBuilder()
394
.setMaxConcurrentRequests(100) // Lower concurrency for consistency
395
.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(1)) // Very fast timeout
396
.setIgnoreNullFields(true)
397
.build();
398
399
// For reliable workloads
400
CassandraSinkBaseConfig reliable = CassandraSinkBaseConfig.newBuilder()
401
.setMaxConcurrentRequests(50) // Conservative concurrency
402
.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(30)) // Generous timeout
403
.setIgnoreNullFields(false) // Allow nulls if needed
404
.build();
405
```
406
407
### Security Configuration
408
409
```java
410
// Authentication and SSL
411
ClusterBuilder secureBuilder = new ClusterBuilder() {
412
@Override
413
protected Cluster buildCluster(Cluster.Builder builder) {
414
// Load SSL context from keystore
415
SSLContext sslContext = loadSSLContext();
416
SSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
417
.withSSLContext(sslContext)
418
.build();
419
420
return builder
421
.addContactPoints("secure-cassandra.example.com")
422
.withPort(9142)
423
.withCredentials("service-user", loadPassword())
424
.withSSL(sslOptions)
425
.withAuthProvider(new PlainTextAuthProvider("service-user", loadPassword()))
426
.build();
427
}
428
429
private SSLContext loadSSLContext() {
430
// Load SSL configuration from files or environment
431
// ...
432
}
433
434
private String loadPassword() {
435
// Load password from secure source
436
return System.getenv("CASSANDRA_PASSWORD");
437
}
438
};
439
```