Apache Flink connector for Apache Cassandra that provides both streaming and batch data integration capabilities
—
The ClusterBuilder abstract class provides a flexible configuration system for customizing Cassandra cluster connections. It supports authentication, SSL, load balancing, and all advanced connection parameters available in the DataStax Cassandra driver.
Abstract configuration class that encapsulates Cassandra cluster connection setup.
/**
 * Serializable base class for configuring Cassandra cluster connections.
 *
 * <p>Implementations define the connection parameters by overriding
 * {@link #buildCluster(Cluster.Builder)}.
 */
public abstract class ClusterBuilder implements Serializable {

    /**
     * Returns a configured Cassandra cluster instance.
     *
     * <p>Hands a fresh {@code Cluster.builder()} to
     * {@link #buildCluster(Cluster.Builder)} and returns whatever that method builds.
     *
     * @return the Cluster produced by the buildCluster implementation
     */
    public Cluster getCluster() {
        return buildCluster(Cluster.builder());
    }

    /**
     * Configures Cassandra cluster connection parameters.
     *
     * <p>Override this method to customize connection settings.
     *
     * @param builder DataStax Cluster.Builder instance to configure
     * @return configured Cluster instance
     */
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}
Simple connection to a single Cassandra node with default settings.
// Minimal builder: one contact point ("localhost") on port 9042, nothing else customized.
ClusterBuilder basicBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster.Builder configured = builder.addContactPoint("localhost");
        configured = configured.withPort(9042);
        return configured.build();
    }
};
Connection to multiple Cassandra nodes for high availability.
// Registers three contact points, in order, before setting the shared port.
ClusterBuilder clusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        String[] contactPoints = {
            "cassandra1.example.com",
            "cassandra2.example.com",
            "cassandra3.example.com"
        };

        Cluster.Builder configured = builder;
        for (String contactPoint : contactPoints) {
            configured = configured.addContactPoint(contactPoint);
        }
        return configured.withPort(9042).build();
    }
};
Connection with username/password authentication.
// Single contact point plus username/password credentials.
ClusterBuilder authBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // Where to connect.
        Cluster.Builder endpoint = builder
            .addContactPoint("secure-cassandra.example.com")
            .withPort(9042);

        // Who to connect as.
        Cluster.Builder authenticated =
            endpoint.withCredentials("cassandra_user", "secure_password");

        return authenticated.build();
    }
};
Secure connection using SSL encryption.
import com.datastax.driver.core.JdkSSLOptions;
import javax.net.ssl.SSLContext;
// SSL-enabled builder: wraps the JVM's default SSLContext in JdkSSLOptions and
// rethrows any failure during setup as a RuntimeException.
ClusterBuilder sslBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        try {
            // Obtain the SSL context (how it is provisioned depends on your SSL setup).
            SSLContext jvmDefaultContext = SSLContext.getDefault();
            JdkSSLOptions encryption = JdkSSLOptions.builder()
                .withSSLContext(jvmDefaultContext)
                .build();

            Cluster.Builder secured = builder
                .addContactPoint("ssl-cassandra.example.com")
                .withPort(9142) // typical SSL port
                .withSSL(encryption);
            secured = secured.withCredentials("ssl_user", "ssl_password");

            return secured.build();
        } catch (Exception sslFailure) {
            throw new RuntimeException("Failed to configure SSL", sslFailure);
        }
    }
};
Comprehensive configuration with connection pooling, timeouts, and retry policies.
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.*;
// Production-style builder combining pooling, socket timeouts, retry/reconnection
// policies, DC-aware load balancing, credentials and compression.
ClusterBuilder advancedBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // Configure connection pooling (core/max connections and max in-flight
        // requests per connection, separately for LOCAL and REMOTE hosts)
        PoolingOptions poolingOptions = new PoolingOptions()
            .setConnectionsPerHost(HostDistance.LOCAL, 2, 8)
            .setConnectionsPerHost(HostDistance.REMOTE, 1, 4)
            .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
            .setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);

        // Configure socket options (timeouts)
        SocketOptions socketOptions = new SocketOptions()
            .setConnectTimeoutMillis(5000)
            .setReadTimeoutMillis(12000)
            .setKeepAlive(true)
            .setTcpNoDelay(true);

        // Configure retry and reconnection policies.
        // A RetryPolicy and a ReconnectionPolicy are different types:
        // ExponentialReconnectionPolicy is only a ReconnectionPolicy, so the
        // retry policy uses the driver's DefaultRetryPolicy instead.
        RetryPolicy retryPolicy = DefaultRetryPolicy.INSTANCE;
        ReconnectionPolicy reconnectionPolicy = new ExponentialReconnectionPolicy(1000, 10000);

        // Configure load balancing policy
        LoadBalancingPolicy loadBalancingPolicy = DCAwareRoundRobinPolicy.builder()
            .withLocalDc("datacenter1")
            .build();

        return builder
            .addContactPoint("cassandra1.prod.example.com")
            .addContactPoint("cassandra2.prod.example.com")
            .addContactPoint("cassandra3.prod.example.com")
            .withPort(9042)
            .withCredentials("prod_user", "prod_password")
            .withPoolingOptions(poolingOptions)
            .withSocketOptions(socketOptions)
            .withRetryPolicy(retryPolicy)
            .withReconnectionPolicy(reconnectionPolicy)
            .withLoadBalancingPolicy(loadBalancingPolicy)
            .withCompression(ProtocolOptions.Compression.SNAPPY)
            .build();
    }
};
Configuration that adapts based on environment variables or properties.
// Builder driven by environment variables:
//   CASSANDRA_HOSTS    comma-separated contact points (falls back to "localhost")
//   CASSANDRA_PORT     port number (falls back to 9042)
//   CASSANDRA_USER / CASSANDRA_PASSWORD   credentials, applied only when both are set
ClusterBuilder environmentBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // Read configuration from the environment
        String cassandraHosts = System.getenv("CASSANDRA_HOSTS");
        String cassandraPort = System.getenv("CASSANDRA_PORT");
        String cassandraUser = System.getenv("CASSANDRA_USER");
        String cassandraPassword = System.getenv("CASSANDRA_PASSWORD");

        // Tolerate surrounding whitespace in the port value; a non-numeric value
        // still fails with NumberFormatException.
        int port = cassandraPort != null ? Integer.parseInt(cassandraPort.trim()) : 9042;
        Cluster.Builder clusterBuilder = builder.withPort(port);

        // Add every non-blank contact point; blank entries (e.g. from "a,,b" or an
        // empty variable) are skipped instead of being passed to the driver.
        int contactPointCount = 0;
        if (cassandraHosts != null) {
            for (String host : cassandraHosts.split(",")) {
                String contactPoint = host.trim();
                if (!contactPoint.isEmpty()) {
                    clusterBuilder.addContactPoint(contactPoint);
                    contactPointCount++;
                }
            }
        }
        if (contactPointCount == 0) {
            // Nothing usable was configured: use the same default as an unset variable.
            clusterBuilder.addContactPoint("localhost");
        }

        // Add authentication if provided
        if (cassandraUser != null && cassandraPassword != null) {
            clusterBuilder.withCredentials(cassandraUser, cassandraPassword);
        }

        return clusterBuilder.build();
    }
};
Configuration that automatically connects to a specific keyspace.
/**
 * ClusterBuilder that verifies a keyspace is accessible before returning the cluster.
 *
 * <p>Declared as a named class because an anonymous class cannot declare a
 * constructor, and the keyspace name has to be passed in.
 */
public class KeyspaceClusterBuilder extends ClusterBuilder {

    private final String keyspace;

    /**
     * @param keyspace name of the keyspace to verify; it is concatenated into a
     *     {@code USE} statement, so it must come from trusted configuration
     */
    public KeyspaceClusterBuilder(String keyspace) {
        this.keyspace = keyspace;
    }

    /**
     * Builds the cluster and checks the keyspace by issuing {@code USE <keyspace>}
     * on a temporary session.
     *
     * @param builder DataStax Cluster.Builder instance to configure
     * @return the configured Cluster once the keyspace check has succeeded
     * @throws RuntimeException if the {@code USE} statement fails
     */
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster cluster = builder
            .addContactPoint("cassandra.example.com")
            .withPort(9042)
            .withCredentials("app_user", "app_password")
            .build();

        boolean verified = false;
        try (Session session = cluster.connect()) {
            // Verify keyspace exists
            try {
                session.execute("USE " + keyspace);
            } catch (Exception e) {
                throw new RuntimeException("Keyspace '" + keyspace + "' not accessible", e);
            }
            verified = true;
        } finally {
            if (!verified) {
                // The caller never receives the cluster on failure, so close it here.
                cluster.close();
            }
        }
        return cluster;
    }
}

// Usage
ClusterBuilder analyticsBuilder = new KeyspaceClusterBuilder("analytics");
Configure appropriate connection pool sizes based on your workload:
Set appropriate timeouts for your network environment:
Configure robust retry policies for production environments:
DefaultRetryPolicy or custom policies for specific scenarios
Follow security best practices:
Enable connection monitoring:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra-2-10