Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases
—
Comprehensive configuration system for customizing Cassandra connectivity, error handling, performance tuning, and DataStax mapper options. Provides flexible connection builders and centralized configuration management.
Abstract base class for configuring Cassandra cluster connections with custom settings.
public abstract class ClusterBuilder implements Serializable {
public Cluster getCluster();
protected abstract Cluster buildCluster(Cluster.Builder builder);
}

Basic Usage:
// Simple connection
ClusterBuilder simpleBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
};
// Multiple contact points
ClusterBuilder multiNodeBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra-1")
.addContactPoint("cassandra-2")
.addContactPoint("cassandra-3")
.withPort(9042)
.build();
}
};

Advanced Configuration:
ClusterBuilder productionBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
// Connection settings
SocketOptions socketOptions = new SocketOptions()
.setConnectTimeoutMillis(10000)
.setReadTimeoutMillis(10000)
.setKeepAlive(true)
.setReuseAddress(true)
.setTcpNoDelay(true);
// Pool settings
PoolingOptions poolingOptions = new PoolingOptions()
.setConnectionsPerHost(HostDistance.LOCAL, 8, 16)
.setConnectionsPerHost(HostDistance.REMOTE, 4, 8)
.setMaxRequestsPerConnection(HostDistance.LOCAL, 1024)
.setMaxRequestsPerConnection(HostDistance.REMOTE, 512);
// Retry and reconnection policies
RetryPolicy retryPolicy = new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE);
ReconnectionPolicy reconnectionPolicy = new ExponentialReconnectionPolicy(1000, 30000);
return builder
.addContactPoints("cassandra-1", "cassandra-2", "cassandra-3")
.withPort(9042)
.withCredentials("username", "password")
.withSocketOptions(socketOptions)
.withPoolingOptions(poolingOptions)
.withRetryPolicy(retryPolicy)
.withReconnectionPolicy(reconnectionPolicy)
.withQueryOptions(new QueryOptions()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
.build();
}
};

SSL Configuration:
ClusterBuilder sslBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
SSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
.withSSLContext(createSSLContext())
.build();
return builder
.addContactPoint("secure-cassandra.example.com")
.withPort(9142)
.withSSL(sslOptions)
.withCredentials("username", "password")
.build();
}
private SSLContext createSSLContext() {
// SSL context creation logic — build and return an SSLContext here
// (e.g. load keystore/truststore; implementation omitted for brevity)
}
};

Interface for defining custom error handling strategies.
public interface CassandraFailureHandler extends Serializable {
void onFailure(Throwable failure) throws IOException;
}

Built-in Handler:
// Default no-op handler (re-throws all exceptions)
public class NoOpCassandraFailureHandler implements CassandraFailureHandler {
public void onFailure(Throwable failure) throws IOException;
}

Custom Failure Handlers:
// Retry on timeout, fail on other errors
CassandraFailureHandler retryHandler = new CassandraFailureHandler() {
@Override
public void onFailure(Throwable failure) throws IOException {
if (failure instanceof WriteTimeoutException) {
logger.warn("Write timeout occurred, continuing processing", failure);
return; // Don't re-throw, continue processing
}
if (failure instanceof ReadTimeoutException) {
logger.warn("Read timeout occurred, continuing processing", failure);
return;
}
// Fail the sink for other types of errors
throw new IOException("Cassandra operation failed", failure);
}
};
// Log all errors but continue processing
CassandraFailureHandler logOnlyHandler = new CassandraFailureHandler() {
@Override
public void onFailure(Throwable failure) throws IOException {
logger.error("Cassandra operation failed, but continuing", failure);
// Don't re-throw, continue processing
}
};
// Fail fast on any error
CassandraFailureHandler failFastHandler = new CassandraFailureHandler() {
@Override
public void onFailure(Throwable failure) throws IOException {
throw new IOException("Failing fast on Cassandra error", failure);
}
};
// Conditional error handling
CassandraFailureHandler conditionalHandler = new CassandraFailureHandler() {
private final AtomicInteger errorCount = new AtomicInteger(0);
@Override
public void onFailure(Throwable failure) throws IOException {
int count = errorCount.incrementAndGet();
if (count > 100) {
throw new IOException("Too many errors (" + count + "), failing sink", failure);
}
if (failure instanceof OverloadedException) {
logger.warn("Cassandra overloaded, backing off", failure);
try {
Thread.sleep(1000); // Simple backoff
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted during backoff", e);
}
return;
}
logger.warn("Cassandra error #{}, continuing", count, failure);
}
};

Centralized configuration object for sink behavior and performance tuning.
public final class CassandraSinkBaseConfig implements Serializable {
public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;
public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE;
public static final boolean DEFAULT_IGNORE_NULL_FIELDS = false;
public int getMaxConcurrentRequests();
public Duration getMaxConcurrentRequestsTimeout();
public boolean getIgnoreNullFields();
public static Builder newBuilder();
}

Builder pattern for creating sink configuration objects.
public static class Builder {
public Builder setMaxConcurrentRequests(int maxConcurrentRequests);
public Builder setMaxConcurrentRequestsTimeout(Duration timeout);
public Builder setIgnoreNullFields(boolean ignoreNullFields);
public CassandraSinkBaseConfig build();
}

Configuration Examples:
// High-throughput configuration
CassandraSinkBaseConfig highThroughputConfig = CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(500)
.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(10))
.setIgnoreNullFields(true) // Avoid tombstones
.build();
// Conservative configuration for stability
CassandraSinkBaseConfig conservativeConfig = CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(50)
.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(30))
.setIgnoreNullFields(false)
.build();
// Default configuration
CassandraSinkBaseConfig defaultConfig = CassandraSinkBaseConfig.newBuilder().build();

Interface for configuring DataStax object mapper behavior for POJO operations.
public interface MapperOptions extends Serializable {
Mapper.Option[] getMapperOptions();
}

Common Mapper Options:
// TTL and timestamp options
MapperOptions ttlOptions = new MapperOptions() {
@Override
public Mapper.Option[] getMapperOptions() {
return new Mapper.Option[] {
Mapper.Option.ttl(3600), // 1 hour TTL
Mapper.Option.timestamp(System.currentTimeMillis())
};
}
};
// Consistency level options
MapperOptions consistencyOptions = new MapperOptions() {
@Override
public Mapper.Option[] getMapperOptions() {
return new Mapper.Option[] {
Mapper.Option.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM),
Mapper.Option.serialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)
};
}
};
// Null field handling
MapperOptions nullHandlingOptions = new MapperOptions() {
@Override
public Mapper.Option[] getMapperOptions() {
return new Mapper.Option[] {
Mapper.Option.saveNullFields(false), // Don't save null fields
Mapper.Option.ifNotExists(true) // Use IF NOT EXISTS
};
}
};
// Conditional writes
MapperOptions conditionalOptions = new MapperOptions() {
@Override
public Mapper.Option[] getMapperOptions() {
return new Mapper.Option[] {
Mapper.Option.ifNotExists(true),
// Note: ifExists() and custom IF conditions are also available
};
}
};
// Combined options
MapperOptions productionOptions = new MapperOptions() {
@Override
public Mapper.Option[] getMapperOptions() {
return new Mapper.Option[] {
Mapper.Option.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM),
Mapper.Option.ttl(86400), // 24 hours TTL
Mapper.Option.saveNullFields(false),
Mapper.Option.timestamp(System.currentTimeMillis())
};
}
};

public class CassandraConfigFactory {
public static ClusterBuilder createClusterBuilder(String environment) {
switch (environment.toLowerCase()) {
case "development":
return createDevelopmentBuilder();
case "staging":
return createStagingBuilder();
case "production":
return createProductionBuilder();
default:
throw new IllegalArgumentException("Unknown environment: " + environment);
}
}
private static ClusterBuilder createDevelopmentBuilder() {
return new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("localhost")
.withPort(9042)
.build();
}
};
}
// Note: createStagingBuilder() (referenced above) follows the same pattern as the
// other environment builders; its implementation is omitted for brevity.
private static ClusterBuilder createProductionBuilder() {
return new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoints(
System.getenv("CASSANDRA_HOST_1"),
System.getenv("CASSANDRA_HOST_2"),
System.getenv("CASSANDRA_HOST_3")
)
.withPort(Integer.parseInt(System.getenv("CASSANDRA_PORT")))
.withCredentials(
System.getenv("CASSANDRA_USERNAME"),
System.getenv("CASSANDRA_PASSWORD")
)
.withSocketOptions(new SocketOptions()
.setConnectTimeoutMillis(10000)
.setReadTimeoutMillis(10000))
.withQueryOptions(new QueryOptions()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
.build();
}
};
}
}

// For high-throughput workloads
CassandraSinkBaseConfig highThroughput = CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(1000) // High concurrency
.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(5)) // Fast timeout
.setIgnoreNullFields(true) // Reduce tombstones
.build();
// For low-latency workloads
CassandraSinkBaseConfig lowLatency = CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(100) // Lower concurrency for consistency
.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(1)) // Very fast timeout
.setIgnoreNullFields(true)
.build();
// For reliable workloads
CassandraSinkBaseConfig reliable = CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(50) // Conservative concurrency
.setMaxConcurrentRequestsTimeout(Duration.ofSeconds(30)) // Generous timeout
.setIgnoreNullFields(false) // Allow nulls if needed
.build();

// Authentication and SSL
ClusterBuilder secureBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
// Load SSL context from keystore
SSLContext sslContext = loadSSLContext();
SSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
.withSSLContext(sslContext)
.build();
return builder
.addContactPoints("secure-cassandra.example.com")
.withPort(9142)
.withCredentials("service-user", loadPassword())
.withSSL(sslOptions)
// Note: withCredentials already installs a PlainTextAuthProvider, so the explicit
// withAuthProvider call below is redundant — use one or the other, not both.
.withAuthProvider(new PlainTextAuthProvider("service-user", loadPassword()))
.build();
}
private SSLContext loadSSLContext() {
// Load SSL configuration from files or environment and return the resulting
// SSLContext (implementation omitted for brevity)
}
private String loadPassword() {
// Load password from secure source
return System.getenv("CASSANDRA_PASSWORD");
}
};

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra