
tessl/maven-org-apache-flink--flink-connector-cassandra-2-10

Apache Flink connector for Apache Cassandra that provides both streaming and batch data integration capabilities


Connection Configuration

The ClusterBuilder abstract class tells the connector how to connect to Cassandra. Because implementations receive the DataStax driver's Cluster.Builder, they can configure authentication, SSL, load balancing, and any other connection option that builder exposes.

Capabilities

ClusterBuilder Base Class

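Abstract configuration class that encapsulates Cassandra cluster connection setup. The connector serializes the builder together with the job and calls getCluster() where a connection is needed, typically when a sink or input format is opened on a task manager. Implementations and all of their fields must therefore be serializable.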

package org.apache.flink.streaming.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import java.io.Serializable;

/**
 * Abstract class for configuring Cassandra cluster connections.
 * Implementations define connection parameters in buildCluster.
 */
public abstract class ClusterBuilder implements Serializable {
    /**
     * Returns a configured Cassandra cluster instance.
     * Each call passes a fresh Cluster.Builder to buildCluster, so a new Cluster is created.
     * @return fully configured Cluster instance ready for use
     */
    public Cluster getCluster() {
        return buildCluster(Cluster.builder());
    }

    /**
     * Configures Cassandra cluster connection parameters.
     * Override this method to customize connection settings.
     * @param builder DataStax Cluster.Builder instance for configuration
     * @return configured Cluster instance
     */
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}
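Because the builder is shipped to the task managers, it helps to check in a unit test that a builder survives Java serialization before deploying a job. The sketch below is plain JDK code under stated assumptions: SettingsBuilder and HostListBuilder are hypothetical stand-ins that mirror the getCluster()/buildCluster() template method above, producing a list of host names instead of a driver Cluster so that it runs without Cassandra.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ClusterBuilder: same template-method shape,
// but it produces a list of host names instead of a driver Cluster.
abstract class SettingsBuilder implements Serializable {
    public List<String> getSettings() {
        // A fresh "builder" on every call, mirroring getCluster()
        return buildSettings(new ArrayList<>());
    }

    protected abstract List<String> buildSettings(List<String> builder);
}

// A named class serializes only its own fields. An anonymous subclass created
// inside an instance method may also reference the enclosing object, which
// would then have to be serializable as well.
class HostListBuilder extends SettingsBuilder {
    private final String[] hosts;

    HostListBuilder(String... hosts) {
        this.hosts = hosts.clone();
    }

    @Override
    protected List<String> buildSettings(List<String> builder) {
        for (String host : hosts) {
            builder.add(host);
        }
        return builder;
    }
}

final class SerializationCheck {
    private SerializationCheck() {}

    // Writes the object with Java serialization and reads it back,
    // failing if any reachable field is not serializable.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed for " + value.getClass().getName(), e);
        }
    }
}
```

With the real connector, the same round trip can be applied to a ClusterBuilder subclass. Preferring a named class over an anonymous one keeps the serialized form limited to the builder's own fields.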

Configuration Examples

Basic Connection

Simple connection to a single Cassandra node with default settings.

ClusterBuilder basicBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("localhost")
            .withPort(9042)
            .build();
    }
};

Multi-Node Cluster

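Connection to multiple Cassandra nodes for high availability. Contact points are only used for the initial connection: listing several lets the driver start even if one node is down, after which it discovers the remaining nodes of the cluster automatically.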

ClusterBuilder clusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra1.example.com")
            .addContactPoint("cassandra2.example.com")
            .addContactPoint("cassandra3.example.com")
            .withPort(9042)
            .build();
    }
};

Authentication Configuration

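Connection with username/password authentication. withCredentials installs the driver's PlainTextAuthProvider, which matches servers configured with PasswordAuthenticator. The literal credentials below are placeholders; avoid hard-coding real ones.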

ClusterBuilder authBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("secure-cassandra.example.com")
            .withPort(9042)
            .withCredentials("cassandra_user", "secure_password")
            .build();
    }
};

SSL/TLS Configuration

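Encrypted connection using SSL/TLS. SSLContext.getDefault() uses the JVM's default truststore (configurable through the javax.net.ssl.trustStore system properties), so the server certificates must be trusted there on every machine that runs buildCluster.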

import com.datastax.driver.core.JdkSSLOptions;
import javax.net.ssl.SSLContext;

ClusterBuilder sslBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        try {
            // Configure SSL context (implementation depends on your SSL setup)
            SSLContext sslContext = SSLContext.getDefault();
            JdkSSLOptions sslOptions = JdkSSLOptions.builder()
                .withSSLContext(sslContext)
                .build();
            
            return builder
                .addContactPoint("ssl-cassandra.example.com")
                .withPort(9142) // native_transport_port_ssl, if configured; many clusters serve TLS on 9042
                .withSSL(sslOptions)
                .withCredentials("ssl_user", "ssl_password")
                .build();
        } catch (Exception e) {
            throw new RuntimeException("Failed to configure SSL", e);
        }
    }
};
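SSLContext.getDefault() is convenient but ties trust to JVM-wide settings. A common alternative is to build the context from a dedicated truststore. The sketch below uses only JDK APIs (KeyStore, TrustManagerFactory, SSLContext); the class name TrustStoreSslContext and the idea of shipping a truststore file to the task managers are assumptions, not part of the connector.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper that builds a TLS context from an explicit truststore.
final class TrustStoreSslContext {
    private TrustStoreSslContext() {}

    // Loads a truststore file (for example JKS or PKCS12) from disk.
    static KeyStore loadTrustStore(Path path, String type, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore trustStore = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(path)) {
            trustStore.load(in, password);
        }
        return trustStore;
    }

    // Trusts only the certificates in the given store. No key managers are
    // set, so no client certificate is presented.
    static SSLContext fromTrustStore(KeyStore trustStore) throws GeneralSecurityException {
        TrustManagerFactory tmf =
            TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, tmf.getTrustManagers(), null);
        return context;
    }
}
```

The returned context could replace SSLContext.getDefault() in the example above, e.g. JdkSSLOptions.builder().withSSLContext(TrustStoreSslContext.fromTrustStore(store)).build(). Since buildCluster runs on the task managers, the truststore path must be valid there.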

Advanced Connection Configuration

Comprehensive configuration with connection pooling, socket timeouts, retry and reconnection policies, datacenter-aware load balancing, and compression.

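import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.*;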

ClusterBuilder advancedBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // Configure connection pooling
        PoolingOptions poolingOptions = new PoolingOptions()
            .setConnectionsPerHost(HostDistance.LOCAL, 2, 8)
            .setConnectionsPerHost(HostDistance.REMOTE, 1, 4)
            .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
            .setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
        
        // Configure socket options (timeouts)
        SocketOptions socketOptions = new SocketOptions()
            .setConnectTimeoutMillis(5000)
            .setReadTimeoutMillis(12000)
            .setKeepAlive(true)
            .setTcpNoDelay(true);
        
        // Configure retry and reconnection policies. A retry policy decides whether a
        // failed request is retried; the reconnection policy controls backoff to down hosts.
        RetryPolicy retryPolicy = DefaultRetryPolicy.INSTANCE;
        ReconnectionPolicy reconnectionPolicy = new ExponentialReconnectionPolicy(1000, 10000);
        
        // Configure load balancing policy
        LoadBalancingPolicy loadBalancingPolicy = DCAwareRoundRobinPolicy.builder()
            .withLocalDc("datacenter1")
            .build();
        
        return builder
            .addContactPoint("cassandra1.prod.example.com")
            .addContactPoint("cassandra2.prod.example.com")
            .addContactPoint("cassandra3.prod.example.com")
            .withPort(9042)
            .withCredentials("prod_user", "prod_password")
            .withPoolingOptions(poolingOptions)
            .withSocketOptions(socketOptions)
            .withRetryPolicy(retryPolicy)
            .withReconnectionPolicy(reconnectionPolicy)
            .withLoadBalancingPolicy(loadBalancingPolicy)
            .withCompression(ProtocolOptions.Compression.SNAPPY) // requires snappy-java on the classpath
            .build();
    }
};

Environment-Based Configuration

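Configuration that adapts to environment variables. Because buildCluster runs where the connection is opened (typically on the task managers rather than on the client that submits the job), the variables must be set in that environment.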

ClusterBuilder environmentBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        // Read connection settings from environment variables
        String cassandraHosts = System.getenv("CASSANDRA_HOSTS");
        String cassandraPort = System.getenv("CASSANDRA_PORT");
        String cassandraUser = System.getenv("CASSANDRA_USER");
        String cassandraPassword = System.getenv("CASSANDRA_PASSWORD");
        
        // Split multiple hosts
        String[] hosts = cassandraHosts != null ? cassandraHosts.split(",") : new String[]{"localhost"};
        int port = cassandraPort != null ? Integer.parseInt(cassandraPort) : 9042;
        
        Cluster.Builder clusterBuilder = builder.withPort(port);
        
        // Add all contact points
        for (String host : hosts) {
            clusterBuilder.addContactPoint(host.trim());
        }
        
        // Add authentication if provided
        if (cassandraUser != null && cassandraPassword != null) {
            clusterBuilder.withCredentials(cassandraUser, cassandraPassword);
        }
        
        return clusterBuilder.build();
    }
};
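Reading System.getenv() directly inside buildCluster makes the logic hard to test. One option is to move the parsing into a plain function over a Map, as in the sketch below. CassandraEnvSettings and fromMap are hypothetical names; the variable names and defaults (localhost, 9042) match the example above, while skipping blank host entries and rejecting invalid ports are additions of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical value class for the connection settings read from the environment.
final class CassandraEnvSettings {
    final List<String> hosts;
    final int port;
    final String user;     // null when credentials are not fully configured
    final String password; // null when credentials are not fully configured

    private CassandraEnvSettings(List<String> hosts, int port, String user, String password) {
        this.hosts = hosts;
        this.port = port;
        this.user = user;
        this.password = password;
    }

    // Parses the same variables as the builder above from any map:
    // System.getenv() in production, a plain HashMap in tests.
    static CassandraEnvSettings fromMap(Map<String, String> env) {
        List<String> hosts = new ArrayList<>();
        String rawHosts = env.get("CASSANDRA_HOSTS");
        if (rawHosts != null) {
            for (String host : rawHosts.split(",")) {
                String trimmed = host.trim();
                if (!trimmed.isEmpty()) {
                    hosts.add(trimmed); // skip blank entries such as a trailing comma
                }
            }
        }
        if (hosts.isEmpty()) {
            hosts.add("localhost");
        }

        int port = 9042;
        String rawPort = env.get("CASSANDRA_PORT");
        if (rawPort != null) {
            try {
                port = Integer.parseInt(rawPort.trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("CASSANDRA_PORT is not a number: " + rawPort, e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("CASSANDRA_PORT out of range: " + port);
            }
        }

        // Credentials are used only when both parts are present, as in the builder above.
        String user = env.get("CASSANDRA_USER");
        String password = env.get("CASSANDRA_PASSWORD");
        if (user == null || password == null) {
            user = null;
            password = null;
        }
        return new CassandraEnvSettings(Collections.unmodifiableList(hosts), port, user, password);
    }
}
```

Inside buildCluster, CassandraEnvSettings.fromMap(System.getenv()) would then supply the hosts, port, and optional credentials.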

Keyspace-Specific Configuration

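A reusable, named subclass that takes the keyspace as a constructor argument and fails fast if that keyspace is not accessible when the cluster is built. The returned Cluster itself is not bound to the keyspace, so queries should still use fully qualified keyspace.table names.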

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

// A named subclass: anonymous classes cannot declare constructors
public class KeyspaceClusterBuilder extends ClusterBuilder {
    private final String keyspace;

    public KeyspaceClusterBuilder(String keyspace) {
        this.keyspace = keyspace;
    }

    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster cluster = builder
            .addContactPoint("cassandra.example.com")
            .withPort(9042)
            .withCredentials("app_user", "app_password")
            .build();

        // Verify the keyspace exists and is usable; the probe session is closed afterwards
        try (Session session = cluster.connect()) {
            session.execute("USE " + keyspace);
        } catch (Exception e) {
            cluster.close(); // do not leak the cluster's connections on failure
            throw new RuntimeException("Keyspace '" + keyspace + "' not accessible", e);
        }

        return cluster;
    }
}

// Usage
ClusterBuilder analyticsBuilder = new KeyspaceClusterBuilder("analytics");
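Because the keyspace name is concatenated into a CQL statement, the constructor could reject unexpected input up front. The helper below is a hypothetical sketch that accepts only unquoted identifiers: a letter followed by letters, digits, or underscores, at most 48 characters (Cassandra's length limit for keyspace names). Quoted, case-sensitive names are deliberately not supported here.

```java
import java.util.regex.Pattern;

// Hypothetical helper: accepts only unquoted CQL keyspace identifiers.
final class KeyspaceNames {
    // Assumption: a letter followed by letters, digits or underscores,
    // 48 characters at most.
    private static final Pattern UNQUOTED_KEYSPACE =
        Pattern.compile("[A-Za-z][A-Za-z0-9_]{0,47}");

    private KeyspaceNames() {}

    // Returns the name unchanged if valid, otherwise throws.
    static String requireValid(String keyspace) {
        if (keyspace == null || !UNQUOTED_KEYSPACE.matcher(keyspace).matches()) {
            throw new IllegalArgumentException("Invalid keyspace name: " + keyspace);
        }
        return keyspace;
    }
}
```

In the builder's constructor this would read this.keyspace = KeyspaceNames.requireValid(keyspace); so that a bad name fails when the job is defined rather than on the task managers.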

Configuration Best Practices

Connection Pooling

Configure appropriate connection pool sizes based on your workload:

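  • Local connections: 2-8 connections per host for balanced throughput
  • Remote connections: 1-4 connections per host to reduce network overhead
  • Max requests per connection: protocol v3 and later allow up to 32768 in-flight requests per connection; the 32768 (local) and 2000 (remote) used above are upper limits rather than recommended values, and the driver's defaults are much lower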
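The pool settings multiply: one host can carry at most (max connections) × (max requests per connection) in-flight requests before further requests have to wait for a free slot. With the values from the advanced example, that is 8 × 32,768 = 262,144 for local hosts and 4 × 2,000 = 8,000 for remote hosts. The hypothetical helper below only makes this arithmetic explicit for sizing discussions; it does not call the driver.

```java
// Hypothetical helper for the pool-sizing arithmetic; it does not use the driver.
final class PoolCapacity {
    private PoolCapacity() {}

    // Upper bound on concurrent in-flight requests to one host:
    // every connection is open and each carries its maximum number of requests.
    static long maxInFlightPerHost(int maxConnectionsPerHost, int maxRequestsPerConnection) {
        if (maxConnectionsPerHost < 1 || maxRequestsPerConnection < 1) {
            throw new IllegalArgumentException("pool sizes must be positive");
        }
        // Widen to long before multiplying to avoid int overflow
        return (long) maxConnectionsPerHost * maxRequestsPerConnection;
    }
}
```

Comparing this bound with what a single Cassandra node can actually absorb is usually more informative than tuning either number in isolation.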

Timeout Configuration

Set appropriate timeouts for your network environment:

  • Connect timeout: 5-10 seconds for initial connection establishment
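  • Read timeout: 10-30 seconds depending on query complexity; keep it higher than the server-side request timeouts in cassandra.yaml so that the coordinator, rather than the client, reports slow queries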
  • Keep-alive: Enable to maintain long-lived connections

Retry and Reconnection

Configure robust retry policies for production environments:

  • Reconnection backoff: ExponentialReconnectionPolicy(1000, 10000) waits 1 second before the first reconnection attempt and doubles the delay up to a maximum of 10 seconds
  • Retry policies: Use DefaultRetryPolicy or custom policies for specific scenarios
  • Circuit breaker: Consider a circuit breaker pattern to avoid cascading failures
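The reconnection schedule from the advanced example can be worked out by hand: with ExponentialReconnectionPolicy(1000, 10000) the delay starts at 1 second, doubles after each failed attempt, and is capped at 10 seconds, giving 1, 2, 4, 8, 10, 10, ... seconds. The plain-Java model below reproduces that capped doubling; BackoffSchedule is a hypothetical name, and the model ignores anything a driver version may add on top, such as jitter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a capped exponential backoff schedule (no jitter).
final class BackoffSchedule {
    private BackoffSchedule() {}

    // Delay before each of the first `attempts` reconnection attempts:
    // baseDelayMs, 2 * baseDelayMs, 4 * baseDelayMs, ... capped at maxDelayMs.
    static List<Long> delays(long baseDelayMs, long maxDelayMs, int attempts) {
        if (baseDelayMs <= 0 || maxDelayMs < baseDelayMs || attempts < 0) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        List<Long> result = new ArrayList<>(attempts);
        long delay = baseDelayMs;
        for (int i = 0; i < attempts; i++) {
            result.add(delay);
            // Double the delay, clamping to the cap without risking overflow
            delay = delay > maxDelayMs / 2 ? maxDelayMs : delay * 2;
        }
        return result;
    }
}
```

BackoffSchedule.delays(1000, 10000, 6) yields the six delays listed above, which makes it easy to estimate how long a host can be unreachable before the driver's retries settle at the maximum interval.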

Security Configuration

Follow security best practices:

  • Use authentication: Always use credentials in production environments
  • Enable SSL/TLS: Encrypt connections, especially over public networks
  • Certificate validation: Properly validate SSL certificates in production
  • Principle of least privilege: Create dedicated users with minimal required permissions

Monitoring and Logging

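Enable connection monitoring. The driver collects metrics by default and exposes them through Cluster.getMetrics() (they can be turned off with Cluster.Builder.withoutMetrics()); track at least: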

  • Connection metrics: Monitor connection pool utilization
  • Query metrics: Track query latency and error rates
  • Host metrics: Monitor per-host connection status
  • Error logging: Log connection failures and retry attempts

Common Configuration Issues

Connection Failures

  • DNS resolution: Ensure contact point hostnames resolve correctly
  • Network connectivity: Verify network routes and firewall rules
  • Port accessibility: Confirm Cassandra ports are open and accessible
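The checks above can be scripted from the machine that will run buildCluster (for a Flink job, a task manager host). The sketch below uses only java.net: it resolves a contact point with InetAddress.getAllByName and attempts a plain TCP connection with a timeout. ContactPointCheck is a hypothetical helper; a successful connection only shows that the port is reachable, not that Cassandra's native protocol is answering on it.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical troubleshooting helper for contact points.
final class ContactPointCheck {
    private ContactPointCheck() {}

    // DNS check: every address the name resolves to, or an empty list.
    static List<InetAddress> resolve(String host) {
        try {
            return Arrays.asList(InetAddress.getAllByName(host));
        } catch (UnknownHostException e) {
            return Collections.emptyList(); // the name does not resolve from this machine
        }
    }

    // Port check: true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unresolvable
        }
    }
}
```

For example, resolve("cassandra1.example.com") succeeding while isPortOpen("cassandra1.example.com", 9042, 5000) returns false points at a firewall rule or a closed port rather than at DNS.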

Authentication Issues

  • User permissions: Verify user has required permissions on target keyspace
  • Password expiration: Check if passwords have expired
  • Authentication method: Ensure authentication method matches cluster configuration

Performance Issues

  • Pool exhaustion: Monitor connection pool utilization and adjust sizes
  • Timeout issues: Increase timeouts for complex queries or slow networks
  • Load balancing: Ensure proper distribution across cluster nodes

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-cassandra-2-10
