or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-connectors.md, configuration.md, index.md, streaming-sinks.md, table-api.md, write-ahead-logging.md

configuration.mddocs/

0

# Configuration and Connection Management

1

2

A comprehensive configuration system for customizing Cassandra connectivity, error handling, performance tuning, and DataStax mapper options. It provides flexible connection builders and centralized configuration management.

3

4

## Capabilities

5

6

### Connection Management

7

8

#### Cluster Builder

9

10

Abstract base class for configuring Cassandra cluster connections with custom settings.

11

12

```java { .api }

13

public abstract class ClusterBuilder implements Serializable {

14

public Cluster getCluster();

15

protected abstract Cluster buildCluster(Cluster.Builder builder);

16

}

17

```

18

19

**Basic Usage:**

20

21

```java

22

// Simple connection

23

ClusterBuilder simpleBuilder = new ClusterBuilder() {

24

@Override

25

protected Cluster buildCluster(Cluster.Builder builder) {

26

return builder.addContactPoint("127.0.0.1").build();

27

}

28

};

29

30

// Multiple contact points

31

ClusterBuilder multiNodeBuilder = new ClusterBuilder() {

32

@Override

33

protected Cluster buildCluster(Cluster.Builder builder) {

34

return builder

35

.addContactPoint("cassandra-1")

36

.addContactPoint("cassandra-2")

37

.addContactPoint("cassandra-3")

38

.withPort(9042)

39

.build();

40

}

41

};

42

```

43

44

**Advanced Configuration:**

45

46

```java

47

ClusterBuilder productionBuilder = new ClusterBuilder() {

48

@Override

49

protected Cluster buildCluster(Cluster.Builder builder) {

50

// Connection settings

51

SocketOptions socketOptions = new SocketOptions()

52

.setConnectTimeoutMillis(10000)

53

.setReadTimeoutMillis(10000)

54

.setKeepAlive(true)

55

.setReuseAddress(true)

56

.setTcpNoDelay(true);

57

58

// Pool settings

59

PoolingOptions poolingOptions = new PoolingOptions()

60

.setConnectionsPerHost(HostDistance.LOCAL, 8, 16)

61

.setConnectionsPerHost(HostDistance.REMOTE, 4, 8)

62

.setMaxRequestsPerConnection(HostDistance.LOCAL, 1024)

63

.setMaxRequestsPerConnection(HostDistance.REMOTE, 512);

64

65

// Retry and reconnection policies

66

RetryPolicy retryPolicy = new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE);

67

ReconnectionPolicy reconnectionPolicy = new ExponentialReconnectionPolicy(1000, 30000);

68

69

return builder

70

.addContactPoints("cassandra-1", "cassandra-2", "cassandra-3")

71

.withPort(9042)

72

.withCredentials("username", "password")

73

.withSocketOptions(socketOptions)

74

.withPoolingOptions(poolingOptions)

75

.withRetryPolicy(retryPolicy)

76

.withReconnectionPolicy(reconnectionPolicy)

77

.withQueryOptions(new QueryOptions()

78

.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)

79

.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))

80

.build();

81

}

82

};

83

```

84

85

**SSL Configuration:**

86

87

```java

88

ClusterBuilder sslBuilder = new ClusterBuilder() {

89

@Override

90

protected Cluster buildCluster(Cluster.Builder builder) {

91

SSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()

92

.withSSLContext(createSSLContext())

93

.build();

94

95

return builder

96

.addContactPoint("secure-cassandra.example.com")

97

.withPort(9142)

98

.withSSL(sslOptions)

99

.withCredentials("username", "password")

100

.build();

101

}

102

103

private SSLContext createSSLContext() {

104

// SSL context creation logic

105

// ...

106

}

107

};

108

```

109

110

### Error Handling

111

112

#### Failure Handler Interface

113

114

Interface for defining custom error handling strategies.

115

116

```java { .api }

117

public interface CassandraFailureHandler extends Serializable {

118

void onFailure(Throwable failure) throws IOException;

119

}

120

```

121

122

**Built-in Handler:**

123

124

```java

125

// Default handler: performs no recovery and re-throws every failure, which fails the sink

126

public class NoOpCassandraFailureHandler implements CassandraFailureHandler {

127

public void onFailure(Throwable failure) throws IOException;

128

}

129

```

130

131

**Custom Failure Handlers:**

132

133

```java

134

// Tolerate timeouts (the failed write is skipped, not retried); fail on other errors

135

CassandraFailureHandler retryHandler = new CassandraFailureHandler() {

136

@Override

137

public void onFailure(Throwable failure) throws IOException {

138

if (failure instanceof WriteTimeoutException) {

139

logger.warn("Write timeout occurred, continuing processing", failure);

140

return; // Don't re-throw, continue processing

141

}

142

if (failure instanceof ReadTimeoutException) {

143

logger.warn("Read timeout occurred, continuing processing", failure);

144

return;

145

}

146

// Fail the sink for other types of errors

147

throw new IOException("Cassandra operation failed", failure);

148

}

149

};

150

151

// Log all errors but continue processing

152

CassandraFailureHandler logOnlyHandler = new CassandraFailureHandler() {

153

@Override

154

public void onFailure(Throwable failure) throws IOException {

155

logger.error("Cassandra operation failed, but continuing", failure);

156

// Don't re-throw, continue processing

157

}

158

};

159

160

// Fail fast on any error

161

CassandraFailureHandler failFastHandler = new CassandraFailureHandler() {

162

@Override

163

public void onFailure(Throwable failure) throws IOException {

164

throw new IOException("Failing fast on Cassandra error", failure);

165

}

166

};

167

168

// Conditional error handling

169

CassandraFailureHandler conditionalHandler = new CassandraFailureHandler() {

170

private final AtomicInteger errorCount = new AtomicInteger(0);

171

172

@Override

173

public void onFailure(Throwable failure) throws IOException {

174

int count = errorCount.incrementAndGet();

175

176

if (count > 100) {

177

throw new IOException("Too many errors (" + count + "), failing sink", failure);

178

}

179

180

if (failure instanceof OverloadedException) {

181

logger.warn("Cassandra overloaded, backing off", failure);

182

try {

183

Thread.sleep(1000); // Simple backoff

184

} catch (InterruptedException e) {

185

Thread.currentThread().interrupt();

186

throw new IOException("Interrupted during backoff", e);

187

}

188

return;

189

}

190

191

logger.warn("Cassandra error #{}, continuing", count, failure);

192

}

193

};

194

```

195

196

### Sink Configuration

197

198

#### Base Configuration

199

200

Centralized configuration object for sink behavior and performance tuning.

201

202

```java { .api }

203

public final class CassandraSinkBaseConfig implements Serializable {

204

public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;

205

public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE;

206

public static final boolean DEFAULT_IGNORE_NULL_FIELDS = false;

207

208

public int getMaxConcurrentRequests();

209

public Duration getMaxConcurrentRequestsTimeout();

210

public boolean getIgnoreNullFields();

211

212

public static Builder newBuilder();

213

}

214

```

215

216

#### Configuration Builder

217

218

Builder pattern for creating sink configuration objects.

219

220

```java { .api }

221

public static class Builder {

222

public Builder setMaxConcurrentRequests(int maxConcurrentRequests);

223

public Builder setMaxConcurrentRequestsTimeout(Duration timeout);

224

public Builder setIgnoreNullFields(boolean ignoreNullFields);

225

public CassandraSinkBaseConfig build();

226

}

227

```

228

229

**Configuration Examples:**

230

231

```java

232

// High-throughput configuration

233

CassandraSinkBaseConfig highThroughputConfig = CassandraSinkBaseConfig.newBuilder()

234

.setMaxConcurrentRequests(500)

235

.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(10))

236

.setIgnoreNullFields(true) // Avoid tombstones

237

.build();

238

239

// Conservative configuration for stability

240

CassandraSinkBaseConfig conservativeConfig = CassandraSinkBaseConfig.newBuilder()

241

.setMaxConcurrentRequests(50)

242

.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(30))

243

.setIgnoreNullFields(false)

244

.build();

245

246

// Default configuration

247

CassandraSinkBaseConfig defaultConfig = CassandraSinkBaseConfig.newBuilder().build();

248

```

249

250

### DataStax Mapper Configuration

251

252

#### Mapper Options Interface

253

254

Interface for configuring DataStax object mapper behavior for POJO operations.

255

256

```java { .api }

257

public interface MapperOptions extends Serializable {

258

Mapper.Option[] getMapperOptions();

259

}

260

```

261

262

**Common Mapper Options:**

263

264

```java

265

// TTL and timestamp options

266

MapperOptions ttlOptions = new MapperOptions() {

267

@Override

268

public Mapper.Option[] getMapperOptions() {

269

return new Mapper.Option[] {

270

Mapper.Option.ttl(3600), // 1 hour TTL

271

Mapper.Option.timestamp(System.currentTimeMillis())

272

};

273

}

274

};

275

276

// Consistency level options

277

MapperOptions consistencyOptions = new MapperOptions() {

278

@Override

279

public Mapper.Option[] getMapperOptions() {

280

return new Mapper.Option[] {

281

Mapper.Option.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM),

282

Mapper.Option.serialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)

283

};

284

}

285

};

286

287

// Null field handling

288

MapperOptions nullHandlingOptions = new MapperOptions() {

289

@Override

290

public Mapper.Option[] getMapperOptions() {

291

return new Mapper.Option[] {

292

Mapper.Option.saveNullFields(false), // Don't save null fields

293

Mapper.Option.ifNotExists(true) // Use IF NOT EXISTS

294

};

295

}

296

};

297

298

// Conditional writes

299

MapperOptions conditionalOptions = new MapperOptions() {

300

@Override

301

public Mapper.Option[] getMapperOptions() {

302

return new Mapper.Option[] {

303

Mapper.Option.ifNotExists(true),

304

// Note: Mapper.Option provides no ifExists() or custom IF-condition factory; ifNotExists is the only conditional option

305

};

306

}

307

};

308

309

// Combined options

310

MapperOptions productionOptions = new MapperOptions() {

311

@Override

312

public Mapper.Option[] getMapperOptions() {

313

return new Mapper.Option[] {

314

Mapper.Option.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM),

315

Mapper.Option.ttl(86400), // 24 hours TTL

316

Mapper.Option.saveNullFields(false),

317

Mapper.Option.timestamp(System.currentTimeMillis())

318

};

319

}

320

};

321

```

322

323

## Configuration Patterns

324

325

### Environment-Specific Configuration

326

327

```java

328

public class CassandraConfigFactory {

329

330

public static ClusterBuilder createClusterBuilder(String environment) {

331

switch (environment.toLowerCase()) {

332

case "development":

333

return createDevelopmentBuilder();

334

case "staging":

335

return createStagingBuilder();

336

case "production":

337

return createProductionBuilder();

338

default:

339

throw new IllegalArgumentException("Unknown environment: " + environment);

340

}

341

}

342

343

private static ClusterBuilder createDevelopmentBuilder() {

344

return new ClusterBuilder() {

345

@Override

346

protected Cluster buildCluster(Cluster.Builder builder) {

347

return builder

348

.addContactPoint("localhost")

349

.withPort(9042)

350

.build();

351

}

352

};

353

}

354

355

private static ClusterBuilder createProductionBuilder() {

356

return new ClusterBuilder() {

357

@Override

358

protected Cluster buildCluster(Cluster.Builder builder) {

359

return builder

360

.addContactPoints(

361

System.getenv("CASSANDRA_HOST_1"),

362

System.getenv("CASSANDRA_HOST_2"),

363

System.getenv("CASSANDRA_HOST_3")

364

)

365

.withPort(Integer.parseInt(System.getenv("CASSANDRA_PORT")))

366

.withCredentials(

367

System.getenv("CASSANDRA_USERNAME"),

368

System.getenv("CASSANDRA_PASSWORD")

369

)

370

.withSocketOptions(new SocketOptions()

371

.setConnectTimeoutMillis(10000)

372

.setReadTimeoutMillis(10000))

373

.withQueryOptions(new QueryOptions()

374

.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))

375

.build();

376

}

377

};

378

}

379

}

380

```

381

382

### Performance Tuning Guidelines

383

384

```java

385

// For high-throughput workloads

386

CassandraSinkBaseConfig highThroughput = CassandraSinkBaseConfig.newBuilder()

387

.setMaxConcurrentRequests(1000) // High concurrency

388

.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(5)) // Fast timeout

389

.setIgnoreNullFields(true) // Reduce tombstones

390

.build();

391

392

// For low-latency workloads

393

CassandraSinkBaseConfig lowLatency = CassandraSinkBaseConfig.newBuilder()

394

.setMaxConcurrentRequests(100) // Lower concurrency for consistency

395

.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(1)) // Very fast timeout

396

.setIgnoreNullFields(true)

397

.build();

398

399

// For reliable workloads

400

CassandraSinkBaseConfig reliable = CassandraSinkBaseConfig.newBuilder()

401

.setMaxConcurrentRequests(50) // Conservative concurrency

402

.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(30)) // Generous timeout

403

.setIgnoreNullFields(false) // Allow nulls if needed

404

.build();

405

```

406

407

### Security Configuration

408

409

```java

410

// Authentication and SSL

411

ClusterBuilder secureBuilder = new ClusterBuilder() {

412

@Override

413

protected Cluster buildCluster(Cluster.Builder builder) {

414

// Load SSL context from keystore

415

SSLContext sslContext = loadSSLContext();

416

SSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()

417

.withSSLContext(sslContext)

418

.build();

419

420

return builder

421

.addContactPoints("secure-cassandra.example.com")

422

.withPort(9142)

423

.withCredentials("service-user", loadPassword())

424

.withSSL(sslOptions)

425

.withAuthProvider(new PlainTextAuthProvider("service-user", loadPassword()))

426

.build();

427

}

428

429

private SSLContext loadSSLContext() {

430

// Load SSL configuration from files or environment

431

// ...

432

}

433

434

private String loadPassword() {

435

// Load password from secure source

436

return System.getenv("CASSANDRA_PASSWORD");

437

}

438

};

439

```